satorbis_kit.pgstac.manager package¶
STAC Ingestion Manager - High-level API for raster stacking and STAC ingestion workflows.
This module provides both class-based and function-based APIs for submitting STAC ingestion jobs.
Class-based API¶
>>> from satorbis_kit.pgstac.manager import STACIngestionManager
>>>
>>> # For Airflow backend
>>> manager = STACIngestionManager.from_airflow(
... airflow_base_url="https://airflow.example.com",
... airflow_username="admin",
... airflow_password="secret",
... )
>>> job_id = manager.ingest_rasters(
... raster_s3_urls=["s3://bucket/TEST_COL/path/20240401.tif"],
... collection="TEST_COL",
... )
>>>
>>> # For spatial engine backend
>>> manager = STACIngestionManager.from_spatial_engine(
... base_url="https://api.example.com",
... api_key="your-key",
... )
>>> job_id = manager.ingest_rasters(
... raster_s3_urls=["s3://bucket/TEST_COL/path/20240401.tif"],
... collection="TEST_COL",
... )
Function-based API¶
>>> from satorbis_kit.pgstac.manager import stack_rasters_and_ingest_via_airflow
>>>
>>> job_id = stack_rasters_and_ingest_via_airflow(
... s3_urls=["s3://bucket/TEST_COL/path/20240401.tif"],
... collection_name="TEST_COL",
... airflow_base_url="https://airflow.example.com",
... airflow_username="admin",
... airflow_password="secret",
... )
>>> from satorbis_kit.pgstac.manager import stack_rasters_and_ingest_via_spatial_engine
>>>
>>> job_id = stack_rasters_and_ingest_via_spatial_engine(
... s3_urls=["s3://bucket/TEST_COL/path/20240401.tif"],
... collection_name="TEST_COL",
... base_url="https://api.example.com",
... api_key="your-key",
... )
- class satorbis_kit.pgstac.manager.STACIngestionManager(client: AbstractSTACClient, upload_handler: AbstractRasterUploadHandler | None = None)[source]¶
Bases:
BaseSTACIngestionManagerManager for STAC raster ingestion workflows.
This is the main public API for STAC ingestion. It provides factory methods for creating manager instances configured for different backends: - Airflow: Direct DAG triggering for smaller jobs - Spatial Engine: SQS-based queueing for large-scale jobs
The manager handles: - Input validation - Batch submission and chunking - Cloud storage configuration - Raster asset uploads with STAC-compliant naming
See also
from_airflow(): Create manager for Airflow backendfrom_spatial_engine(): Create manager for spatial engine backendstack_rasters_and_ingest_via_airflow(): Function-based API for Airflowstack_rasters_and_ingest_via_spatial_engine(): Function-based API for spatial engine
- classmethod from_airflow(airflow_base_url: str, airflow_username: str, airflow_password: str, upload_handler: AbstractRasterUploadHandler | None = None) STACIngestionManager[source]¶
Create manager for direct Airflow DAG triggering.
Use this method when you want to trigger Airflow DAGs directly. Suitable for smaller batch jobs.
- Parameters:
airflow_base_url – Base URL for Airflow API (required)
airflow_username – Username for Airflow authentication (required)
airflow_password – Password for Airflow authentication (required)
upload_handler – Optional custom path builder for uploads
- Returns:
STACIngestionManager configured for Airflow
Example
>>> manager = STACIngestionManager.from_airflow( ... airflow_base_url="https://airflow.example.com", ... airflow_username="admin", ... airflow_password="secret", ... ) >>> job_id = manager.ingest_rasters( ... raster_s3_urls=["s3://bucket/COL/20240401.tif"], ... collection="TEST_COL", ... )
- classmethod from_spatial_engine(base_url: str | None = None, api_key: str | None = None, timeout: int = 30, upload_handler: AbstractRasterUploadHandler | None = None) STACIngestionManager[source]¶
Create manager for spatial engine API with SQS queueing.
Use this method when you want to submit jobs through a spatial engine API (e.g., OpenEO) which handles SQS queueing. Better for large batch jobs and scalability.
- Parameters:
base_url – Base URL for spatial engine API (defaults to https://dev.openeo.satsure.co)
api_key – Optional API key for Bearer token authentication
timeout – Request timeout in seconds (default: 30)
upload_handler – Optional custom path builder for uploads
- Returns:
STACIngestionManager configured for spatial engine
Example
>>> manager = STACIngestionManager.from_spatial_engine( ... base_url="https://api.example.com", ... api_key="your-api-key", ... ) >>> job_id = manager.ingest_rasters( ... raster_s3_urls=["s3://bucket/COL/20240401.tif"], ... collection="TEST_COL", ... )
- satorbis_kit.pgstac.manager.stack_rasters_and_ingest_via_airflow(s3_urls: List[str], collection_name: str, airflow_base_url: str, airflow_username: str, airflow_password: str, convert_to_cog: bool | None = None, cog_profile: str | None = None, cog_profile_options: Dict[str, Any] | None = None, cog_overview_level: int | None = None, ingestion_batch_size: int | None = None, ttl: int | None = None, **kwargs: Any) str | List[str][source]¶
Convenience function to submit raster ingestion via Airflow.
This function provides a simple, function-based interface for basic use cases using direct Airflow DAG triggering.
For OpenEO/SQS-based submission, use
stack_rasters_and_ingest_via_spatial_engine(). For more control and advanced features, useSTACIngestionManagerclass directly.- Parameters:
s3_urls – List of S3 URLs to raster files
collection_name – STAC collection name
airflow_base_url – Airflow API URL (required)
airflow_username – Username for authentication (required)
airflow_password – Password for authentication (required)
convert_to_cog – Whether to convert to COG format (optional)
cog_profile – COG profile name (e.g., ‘lzw’, ‘deflate’) (optional)
cog_profile_options – Profile options for cog_translate (optional)
cog_overview_level – Number of overview levels (optional)
ingestion_batch_size – Batch size for ingestion (optional, default: 100)
ttl – Time To Live in days (optional). Integer representing days (e.g., ttl=10 means 10 days). If not provided, defaults to 30 days. Note: In this function API, passing ttl=None will also default to 30 days. To explicitly skip TTL for permanent items, use the class-based API with ttl=None. The value is stored as an expiry date string in properties[“ss:ttl”] in STAC metadata, computed as today’s date + ttl days (ISO format YYYY-MM-DD).
**kwargs – Additional keyword arguments (for future extensions)
- Returns:
Job ID(s) for tracking the ingestion workflow.
- Return type:
Union[str, List[str]]
- Raises:
ValidationError – If any input parameters are invalid
APIError – If Airflow API request fails
- satorbis_kit.pgstac.manager.stack_rasters_and_ingest_via_spatial_engine(s3_urls: List[str], collection_name: str, base_url: str | None = None, api_key: str | None = None, convert_to_cog: bool | None = None, cog_profile: str | None = None, cog_profile_options: Dict[str, Any] | None = None, cog_overview_level: int | None = None, ingestion_batch_size: int | None = None, timeout: int = 30, ttl: int | None = None, **kwargs: Any) str | List[str][source]¶
Convenience function to submit raster ingestion via spatial engine API.
This function provides a simple, function-based interface for submitting jobs through a spatial engine API (e.g., OpenEO) with SQS queueing. Better for large batch jobs.
For direct Airflow triggering, use
stack_rasters_and_ingest(). For more control and advanced features, useSTACIngestionManagerclass directly.- Parameters:
s3_urls – List of S3 URLs to raster files
collection_name – STAC collection name
base_url – Spatial engine API base URL (defaults to https://dev.openeo.satsure.co)
api_key – Optional API key for Bearer token authentication
convert_to_cog – Whether to convert to COG format (optional)
cog_profile – COG profile name (e.g., ‘lzw’, ‘deflate’) (optional)
cog_profile_options – Profile options for cog_translate (optional)
cog_overview_level – Number of overview levels (optional)
ingestion_batch_size – Batch size for ingestion (optional, default: 100)
timeout – Request timeout in seconds (default: 30)
ttl – Time To Live in days (optional). Integer representing days (e.g., ttl=10 means 10 days). If not provided, defaults to 30 days. Note: In this function API, passing ttl=None will also default to 30 days. To explicitly skip TTL for permanent items, use the class-based API with ttl=None. The value is stored as properties[“ss:ttl”] in STAC metadata.
**kwargs – Additional keyword arguments (for future extensions)
- Returns:
Job ID(s) for tracking the ingestion workflow.
- Return type:
Union[str, List[str]]
- Raises:
ValidationError – If any input parameters are invalid
APIError – If spatial engine API request fails
Submodules¶
- satorbis_kit.pgstac.manager.api module
- satorbis_kit.pgstac.manager.ingestion_manager module
BaseSTACIngestionManagerBaseSTACIngestionManager.clientBaseSTACIngestionManager.build_stac_remote_path()BaseSTACIngestionManager.configure_storage()BaseSTACIngestionManager.get_job_status()BaseSTACIngestionManager.ingest_rasters()BaseSTACIngestionManager.set_upload_handler()BaseSTACIngestionManager.upload_raster_asset()
- satorbis_kit.pgstac.manager.stac_manager module