# Source code for satorbis_kit.pgstac.manager.ingestion_manager

"""Core STAC Ingestion Manager implementation.

This module contains the core :class:`BaseSTACIngestionManager` class that provides
the base implementation for submitting raster ingestion jobs to STAC via Airflow or compute engine.
"""

import logging
from datetime import date, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from ...storage import CloudObjectStore, StorageError
from ..clients import AbstractSTACClient
from ..models import DEFAULT_COSTING_LABELS, IngestionConfig
from ..uploader import AbstractRasterUploadHandler, DateInput, RasterUploadService
from ..validations import validate_inputs

logger = logging.getLogger(__name__)

LocalPath = Union[str, Path]

# Sentinel value to distinguish "not provided" from "explicitly None" for TTL
_TTL_NOT_PROVIDED = object()


class BaseSTACIngestionManager:
    """Base manager for STAC raster ingestion workflows.

    This is the base class that provides the core implementation for STAC
    ingestion. End users should use
    :class:`~.stac_manager.STACIngestionManager` instead, which adds factory
    methods for creating manager instances.

    This class provides:

    - Input validation and batch processing
    - Cloud storage configuration
    - Raster asset uploads with STAC-compliant naming
    - Job submission and status tracking

    Attributes:
        client: STAC client instance for API communication (Airflow or OpenEO)
        MAX_URLS_PER_JOB: Largest number of raster URLs sent in one ingestion
            job; longer inputs are split into chunks of this size.
        DEFAULT_TTL_DAYS: TTL (in days) used by :meth:`ingest_rasters` when the
            caller does not pass ``ttl`` at all.

    Note:
        This is a base class. Use
        :meth:`~.stac_manager.STACIngestionManager.from_airflow` or
        :meth:`~.stac_manager.STACIngestionManager.from_spatial_engine` to
        create instances.
    """

    MAX_URLS_PER_JOB = 1000
    DEFAULT_TTL_DAYS = 30

    def __init__(
        self,
        client: AbstractSTACClient,
        upload_handler: Optional[AbstractRasterUploadHandler] = None,
    ):
        """Initialize STAC Ingestion Manager with a STAC client.

        Note:
            Use factory methods `from_airflow()` or `from_spatial_engine()`
            instead of calling this constructor directly.

        Args:
            client: STAC client instance (Airflow or OpenEO)
            upload_handler: Optional custom path builder used when uploading
                local rasters before ingestion.
        """
        self.client = client
        # Storage stays unset until configure_storage() is called.
        self._storage: Optional[CloudObjectStore] = None
        self._upload_service = RasterUploadService(upload_handler)

    def ingest_rasters(
        self,
        raster_s3_urls: List[str],
        collection: str,
        ingestion_batch_size: Optional[int] = None,
        convert_to_cog: Optional[bool] = None,
        cog_profile: Optional[str] = None,
        cog_profile_options: Optional[Dict[str, Any]] = None,
        cog_overview_level: Optional[int] = None,
        lineage: Optional[Any] = None,
        ttl: Any = _TTL_NOT_PROVIDED,
    ) -> Union[str, List[str]]:
        """Submit raster stacking and STAC ingestion job.

        This method validates all inputs, builds the configuration, and
        submits the job to the configured client. It returns a job ID that can
        be used to track progress.

        If the number of raster S3 URLs exceeds ``MAX_URLS_PER_JOB`` (1000),
        the list is split into chunks of that size and one job is submitted
        per chunk. In this case, a list of job IDs is returned.

        Args:
            raster_s3_urls: List of S3 URLs to raster files
            collection: STAC collection name
            ingestion_batch_size: Batch size for ingestion (optional,
                default: 100)
            convert_to_cog: Whether to convert to COG format (optional)
            cog_profile: COG profile name (e.g., 'lzw', 'deflate') (optional)
            cog_profile_options: Profile options for cog_translate (optional)
            cog_overview_level: Number of overview levels (optional)
            lineage: Optional lineage information
            ttl: Time To Live in days (optional). Integer representing days
                (e.g., ttl=10 means 10 days). If not provided, defaults to
                ``DEFAULT_TTL_DAYS`` (30 days). If None, TTL is skipped for
                permanent items. The value is stored as an expiry date string
                in properties["ss:ttl"] in STAC metadata, computed as today's
                date + ttl days (ISO format YYYY-MM-DD).

        Returns:
            Union[str, List[str]]: Unique job ID(s) for tracking the ingestion
            workflow. Returns a single string if one job is submitted, or a
            list of strings if multiple jobs are submitted due to large input
            size (>1000 URLs).

        Raises:
            ValidationError: If any input parameters are invalid
            APIError: If API request fails
        """
        # The sentinel separates "argument omitted" (apply the default TTL)
        # from an explicit ``ttl=None`` (no expiry at all).
        ttl_was_provided = ttl is not _TTL_NOT_PROVIDED

        validated = validate_inputs(
            raster_s3_urls=raster_s3_urls,
            collection=collection,
            ingestion_batch_size=ingestion_batch_size,
            convert_to_cog=convert_to_cog,
            cog_profile=cog_profile,
            cog_profile_options=cog_profile_options,
            cog_overview_level=cog_overview_level,
            lineage=lineage,
            ttl=ttl if ttl_was_provided else None,
        )

        # Omitted -> default; explicit None -> None (skip); integer -> as given.
        final_ttl = validated.ttl if ttl_was_provided else self.DEFAULT_TTL_DAYS

        full_urls = validated.raster_s3_urls
        chunk_size = self.MAX_URLS_PER_JOB
        total = len(full_urls)

        if total <= chunk_size:
            logger.info("Ingesting %d rasters in single job", total)
            job_id = self._submit_job(validated, full_urls, final_ttl)
            logger.info("Submitted ingestion job: %s", job_id)
            return job_id

        logger.info(
            "Ingesting %d rasters in %d-URL chunks (total: %d chunks)",
            total,
            chunk_size,
            (total + chunk_size - 1) // chunk_size,
        )

        job_ids: List[str] = []
        chunk_starts = range(0, total, chunk_size)
        for chunk_number, start in enumerate(chunk_starts, start=1):
            chunk_urls = full_urls[start : start + chunk_size]
            try:
                job_id = self._submit_job(validated, chunk_urls, final_ttl)
            except Exception:
                # Earlier chunks are already running remotely; record their
                # IDs so they are not lost when the error propagates.
                if job_ids:
                    logger.error(
                        "Chunk %d failed to submit; %d earlier job(s) were "
                        "already submitted: %s",
                        chunk_number,
                        len(job_ids),
                        job_ids,
                    )
                raise
            job_ids.append(job_id)
            logger.debug(
                "Submitted chunk %d with job_id: %s", chunk_number, job_id
            )

        logger.info("Submitted %d ingestion jobs", len(job_ids))
        return job_ids

    def _submit_job(
        self,
        validated: Any,
        raster_s3_urls: List[str],
        ttl: Optional[int],
    ) -> str:
        """Build the config for one set of URLs, submit it, return the job ID.

        Args:
            validated: Result of ``validate_inputs``; supplies every setting
                except the URL list and the TTL.
            raster_s3_urls: URLs to include in this job.
            ttl: Final TTL in days, or None to skip the expiry property.
        """
        config = self._build_config(
            raster_s3_urls=raster_s3_urls,
            collection=validated.collection,
            ingestion_batch_size=validated.ingestion_batch_size,
            convert_to_cog=validated.convert_to_cog,
            cog_profile=validated.cog_profile,
            cog_profile_options=validated.cog_profile_options,
            cog_overview_level=validated.cog_overview_level,
            lineage=validated.lineage,
            ttl=ttl,
        )
        response = self.client.submit_ingestion(config)
        return response.job_id

    # ------------------------------------------------------------------
    # Cloud storage helpers
    # ------------------------------------------------------------------

    def configure_storage(
        self,
        provider: str,
        **kwargs: Any,
    ) -> CloudObjectStore:
        """Initialize cloud storage credentials for uploading local rasters.

        Args:
            provider: Either "aws" or "azure" (case-insensitive).
            **kwargs: Provider specific keyword arguments, forwarded unchanged
                to ``CloudObjectStore.from_aws`` / ``CloudObjectStore.from_azure``.

        Returns:
            Configured CloudObjectStore instance.

        Raises:
            StorageError: If ``provider`` is neither "aws" nor "azure".
        """
        provider_lower = provider.lower()

        if provider_lower == "aws":
            self._storage = CloudObjectStore.from_aws(**kwargs)
            logger.debug("Configured AWS storage: bucket=%s", kwargs.get("bucket"))
        elif provider_lower == "azure":
            self._storage = CloudObjectStore.from_azure(**kwargs)
            logger.debug(
                "Configured Azure storage: account=%s, container=%s",
                kwargs.get("account_name"),
                kwargs.get("container"),
            )
        else:
            raise StorageError("provider must be either 'aws' or 'azure'")

        return self._storage

    def set_upload_handler(self, handler: AbstractRasterUploadHandler) -> None:
        """Override the default STAC upload path builder."""
        self._upload_service.set_handler(handler)
        logger.debug("Set custom upload handler: %s", handler.__class__.__name__)

    def build_stac_remote_path(
        self,
        collection: str,
        filename: str,
        subfolder: Optional[str] = None,
    ) -> str:
        """Return the STAC-compliant remote path for a raster."""
        return self._upload_service.build_remote_path(collection, filename, subfolder)

    def upload_raster_asset(
        self,
        local_path: LocalPath,
        *,
        collection: str,
        acquisition_date: DateInput,
        tile: Optional[str] = None,
        subfolder: Optional[str] = None,
        overwrite: bool = False,
        ensure_unique: bool = True,
    ) -> str:
        """Upload a raster by deriving the STAC filename from acquisition data.

        Args:
            local_path: Path to local raster file
            collection: STAC collection name
            acquisition_date: Date used to build the filename
            tile: Optional tile identifier appended to the filename
            subfolder: Optional nested folder under the collection
            overwrite: Whether to overwrite existing remote objects
            ensure_unique: When True (default) a UUID is appended to avoid
                collisions

        Returns:
            str: Remote cloud URL of uploaded file

        Raises:
            StorageError: If ``configure_storage`` has not been called yet.
        """
        storage = self._ensure_storage()
        remote_url = self._upload_service.upload_raster_asset(
            storage=storage,
            local_path=local_path,
            collection=collection,
            acquisition_date=acquisition_date,
            tile=tile,
            subfolder=subfolder,
            overwrite=overwrite,
            ensure_unique=ensure_unique,
        )
        logger.info(
            "Uploaded raster asset: %s -> %s", Path(local_path).name, remote_url
        )
        return remote_url

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get status of a submitted ingestion job.

        Args:
            job_id: Job ID returned from ingest_rasters

        Returns:
            Dictionary with job status information

        Raises:
            APIError: If API request fails
        """
        return self.client.get_job_status(job_id)

    def _build_config(
        self,
        raster_s3_urls: List[str],
        collection: str,
        ingestion_batch_size: Optional[int],
        convert_to_cog: Optional[bool],
        cog_profile: Optional[str],
        cog_profile_options: Optional[Dict[str, Any]],
        cog_overview_level: Optional[int],
        lineage: Optional[Any],
        ttl: Optional[int],
    ) -> IngestionConfig:
        """Build ingestion configuration from validated inputs.

        Only options that are not None are written to ``parameters``. When
        ``ttl`` is not None, an expiry date (today + ``ttl`` days, ISO
        ``YYYY-MM-DD``) is stored under ``parameters["properties"]["ss:ttl"]``;
        when it is None no ``properties`` entry is added.
        """
        optional_settings = {
            "lineage": lineage,
            "convert_to_cog": convert_to_cog,
            "cog_profile": cog_profile,
            "cog_profile_options": cog_profile_options,
            "cog_overview_level": cog_overview_level,
        }
        parameters: Dict[str, Any] = {
            key: value
            for key, value in optional_settings.items()
            if value is not None
        }

        if ttl is not None:
            expiry_date = date.today() + timedelta(days=ttl)
            parameters["properties"] = {"ss:ttl": expiry_date.isoformat()}

        return IngestionConfig(
            raster_s3_urls=raster_s3_urls,
            collection=collection,
            costing_labels=DEFAULT_COSTING_LABELS.copy(),
            ingestion_batch_size=ingestion_batch_size,
            parameters=parameters,
        )

    def _ensure_storage(self) -> CloudObjectStore:
        """Return the configured storage, or raise if none has been set up.

        Raises:
            StorageError: If ``configure_storage`` has not been called.
        """
        # Compare against None explicitly: a configured store must never be
        # mistaken for "missing" just because it evaluates falsy.
        if self._storage is None:
            raise StorageError(
                "Cloud storage credentials not configured. "
                "Call 'configure_storage' with AWS or Azure parameters before uploading."
            )
        return self._storage