Source code for satorbis_kit.pgstac.manager.stac_manager
"""Public STAC Manager with factory methods for different ingestion backends.
This module provides the public-facing :class:`STACIngestionManager` class that extends
:class:`~.ingestion_manager.BaseSTACIngestionManager` with factory methods for creating
manager instances configured for Airflow or compute engine.
"""
from typing import Optional
from ..clients import AirflowSTACClient, OpenEOSTACClient
from ..uploader import AbstractRasterUploadHandler
from .ingestion_manager import BaseSTACIngestionManager
[docs]
class STACIngestionManager(BaseSTACIngestionManager):
"""Manager for STAC raster ingestion workflows.
This is the main public API for STAC ingestion. It provides factory methods
for creating manager instances configured for different backends:
- Airflow: Direct DAG triggering for smaller jobs
- Spatial Engine: SQS-based queueing for large-scale jobs
The manager handles:
- Input validation
- Batch submission and chunking
- Cloud storage configuration
- Raster asset uploads with STAC-compliant naming
See Also:
- :meth:`from_airflow`: Create manager for Airflow backend
- :meth:`from_spatial_engine`: Create manager for spatial engine backend
- :func:`~.api.stack_rasters_and_ingest_via_airflow`: Function-based API for Airflow
- :func:`~.api.stack_rasters_and_ingest_via_spatial_engine`: Function-based API for spatial engine
"""
[docs]
@classmethod
def from_airflow(
cls,
airflow_base_url: str,
airflow_username: str,
airflow_password: str,
upload_handler: Optional[AbstractRasterUploadHandler] = None,
) -> "STACIngestionManager":
"""Create manager for direct Airflow DAG triggering.
Use this method when you want to trigger Airflow DAGs directly.
Suitable for smaller batch jobs.
Args:
airflow_base_url: Base URL for Airflow API (required)
airflow_username: Username for Airflow authentication (required)
airflow_password: Password for Airflow authentication (required)
upload_handler: Optional custom path builder for uploads
Returns:
STACIngestionManager configured for Airflow
Example:
>>> manager = STACIngestionManager.from_airflow(
... airflow_base_url="https://airflow.example.com",
... airflow_username="admin",
... airflow_password="secret",
... )
>>> job_id = manager.ingest_rasters(
... raster_s3_urls=["s3://bucket/COL/20240401.tif"],
... collection="TEST_COL",
... )
"""
client = AirflowSTACClient(
base_url=airflow_base_url,
username=airflow_username,
password=airflow_password,
)
return cls(client=client, upload_handler=upload_handler)
[docs]
@classmethod
def from_spatial_engine(
cls,
base_url: Optional[str] = None,
api_key: Optional[str] = None,
timeout: int = 30,
upload_handler: Optional[AbstractRasterUploadHandler] = None,
) -> "STACIngestionManager":
"""Create manager for spatial engine API with SQS queueing.
Use this method when you want to submit jobs through a spatial engine API
(e.g., OpenEO) which handles SQS queueing. Better for large batch jobs
and scalability.
Args:
base_url: Base URL for spatial engine API (defaults to https://dev.openeo.satsure.co)
api_key: Optional API key for Bearer token authentication
timeout: Request timeout in seconds (default: 30)
upload_handler: Optional custom path builder for uploads
Returns:
STACIngestionManager configured for spatial engine
Example:
>>> manager = STACIngestionManager.from_spatial_engine(
... base_url="https://api.example.com",
... api_key="your-api-key",
... )
>>> job_id = manager.ingest_rasters(
... raster_s3_urls=["s3://bucket/COL/20240401.tif"],
... collection="TEST_COL",
... )
"""
client = OpenEOSTACClient(
base_url=base_url or "https://dev.openeo.satsure.co",
api_key=api_key,
timeout=timeout,
)
return cls(client=client, upload_handler=upload_handler)