Source code for satorbis_kit.pgstac.manager.stac_manager

"""Public STAC Manager with factory methods for different ingestion backends.

This module provides the public-facing :class:`STACIngestionManager` class that extends
:class:`~.ingestion_manager.BaseSTACIngestionManager` with factory methods for creating
manager instances configured for the Airflow or spatial engine backends.
"""

from typing import Optional

from ..clients import AirflowSTACClient, OpenEOSTACClient
from ..uploader import AbstractRasterUploadHandler
from .ingestion_manager import BaseSTACIngestionManager


class STACIngestionManager(BaseSTACIngestionManager):
    """Public entry point for STAC raster ingestion workflows.

    Instances are normally obtained through one of the factory classmethods
    below, each of which wires the manager to a different backend client:

    - Airflow: Direct DAG triggering for smaller jobs
    - Spatial Engine: SQS-based queueing for large-scale jobs

    Responsibilities of the manager:

    - Input validation
    - Batch submission and chunking
    - Cloud storage configuration
    - Raster asset uploads with STAC-compliant naming

    See Also:
        - :meth:`from_airflow`: Build a manager backed by Airflow
        - :meth:`from_spatial_engine`: Build a manager backed by the spatial engine
        - :func:`~.api.stack_rasters_and_ingest_via_airflow`:
          Function-based API for Airflow
        - :func:`~.api.stack_rasters_and_ingest_via_spatial_engine`:
          Function-based API for spatial engine
    """

    @classmethod
    def from_airflow(
        cls,
        airflow_base_url: str,
        airflow_username: str,
        airflow_password: str,
        upload_handler: Optional[AbstractRasterUploadHandler] = None,
    ) -> "STACIngestionManager":
        """Build a manager that triggers Airflow DAGs directly.

        An :class:`AirflowSTACClient` is created from the supplied connection
        details and handed to the manager constructor together with
        ``upload_handler``. Suitable for smaller batch jobs.

        Args:
            airflow_base_url: Base URL for the Airflow API (required)
            airflow_username: Username for Airflow authentication (required)
            airflow_password: Password for Airflow authentication (required)
            upload_handler: Optional custom path builder for uploads;
                forwarded to the constructor unchanged

        Returns:
            STACIngestionManager configured for Airflow

        Example:
            >>> manager = STACIngestionManager.from_airflow(
            ...     airflow_base_url="https://airflow.example.com",
            ...     airflow_username="admin",
            ...     airflow_password="secret",
            ... )
            >>> job_id = manager.ingest_rasters(
            ...     raster_s3_urls=["s3://bucket/COL/20240401.tif"],
            ...     collection="TEST_COL",
            ... )
        """
        return cls(
            client=AirflowSTACClient(
                base_url=airflow_base_url,
                username=airflow_username,
                password=airflow_password,
            ),
            upload_handler=upload_handler,
        )

    @classmethod
    def from_spatial_engine(
        cls,
        base_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout: int = 30,
        upload_handler: Optional[AbstractRasterUploadHandler] = None,
    ) -> "STACIngestionManager":
        """Build a manager that submits jobs through the spatial engine API.

        An :class:`OpenEOSTACClient` is created and handed to the manager
        constructor together with ``upload_handler``. Use this when jobs
        should go through a spatial engine API (e.g., OpenEO) which handles
        SQS queueing; better for large batch jobs and scalability.

        Args:
            base_url: Base URL for the spatial engine API. Any falsy value
                (``None`` or an empty string) falls back to
                https://dev.openeo.satsure.co
            api_key: Optional API key for Bearer token authentication
            timeout: Request timeout in seconds (default: 30)
            upload_handler: Optional custom path builder for uploads;
                forwarded to the constructor unchanged

        Returns:
            STACIngestionManager configured for spatial engine

        Example:
            >>> manager = STACIngestionManager.from_spatial_engine(
            ...     base_url="https://api.example.com",
            ...     api_key="your-api-key",
            ... )
            >>> job_id = manager.ingest_rasters(
            ...     raster_s3_urls=["s3://bucket/COL/20240401.tif"],
            ...     collection="TEST_COL",
            ... )
        """
        # Truthiness check (not ``is None``) so an empty string also selects
        # the default endpoint.
        endpoint = base_url if base_url else "https://dev.openeo.satsure.co"
        engine_client = OpenEOSTACClient(
            base_url=endpoint,
            api_key=api_key,
            timeout=timeout,
        )
        return cls(client=engine_client, upload_handler=upload_handler)