"""Core STAC Ingestion Manager implementation.
This module contains the core :class:`BaseSTACIngestionManager` class that provides
the base implementation for submitting raster ingestion jobs to STAC via Airflow or a compute engine.
"""
import logging
from datetime import date, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from ...storage import CloudObjectStore, StorageError
from ..clients import AbstractSTACClient
from ..models import DEFAULT_COSTING_LABELS, IngestionConfig
from ..uploader import AbstractRasterUploadHandler, DateInput, RasterUploadService
from ..validations import validate_inputs
logger = logging.getLogger(__name__)
LocalPath = Union[str, Path]
# Sentinel value to distinguish "not provided" from "explicitly None" for TTL
_TTL_NOT_PROVIDED = object()
[docs]
class BaseSTACIngestionManager:
    """Base manager for STAC raster ingestion workflows.

    This is the base class that provides the core implementation for STAC
    ingestion. End users should use
    :class:`~.stac_manager.STACIngestionManager` instead, which adds factory
    methods for creating manager instances.

    This class provides:

    - Input validation and batch processing
    - Cloud storage configuration
    - Raster asset uploads with STAC-compliant naming
    - Job submission and status tracking

    Attributes:
        client: STAC client instance for API communication (Airflow or OpenEO)

    Note:
        This is a base class. Use
        :meth:`~.stac_manager.STACIngestionManager.from_airflow` or
        :meth:`~.stac_manager.STACIngestionManager.from_spatial_engine` to
        create instances.
    """
def __init__(
    self,
    client: AbstractSTACClient,
    upload_handler: Optional[AbstractRasterUploadHandler] = None,
):
    """Create a manager bound to the given STAC client.

    Prefer the ``from_airflow()`` / ``from_spatial_engine()`` factory
    methods over calling this constructor directly.

    Args:
        client: STAC client instance (Airflow or OpenEO) that receives
            every job submission and status query.
        upload_handler: Optional custom path builder used when uploading
            local rasters before ingestion. It is handed to
            :class:`RasterUploadService` exactly as given, ``None`` included.
    """
    # Backend used by ``ingest_rasters`` and ``get_job_status``.
    self.client = client
    # No object store is attached at construction time; ``_ensure_storage``
    # raises until one has been assigned.
    self._storage: Optional[CloudObjectStore] = None
    # Service that builds remote paths and performs raster uploads.
    self._upload_service = RasterUploadService(upload_handler)
[docs]
def ingest_rasters(
    self,
    raster_s3_urls: List[str],
    collection: str,
    ingestion_batch_size: Optional[int] = None,
    convert_to_cog: Optional[bool] = None,
    cog_profile: Optional[str] = None,
    cog_profile_options: Optional[Dict[str, Any]] = None,
    cog_overview_level: Optional[int] = None,
    lineage: Optional[Any] = None,
    ttl: Any = _TTL_NOT_PROVIDED,
) -> Union[str, List[str]]:
    """Validate the inputs and submit one or more STAC ingestion jobs.

    All arguments are passed through ``validate_inputs``; the validated
    values are turned into an ingestion configuration and handed to the
    configured client.

    Up to and including 1000 URLs are submitted as a single job. Longer
    lists are cut into consecutive slices of at most 1000 URLs, one job is
    submitted per slice, and the job IDs are returned in submission order.

    Args:
        raster_s3_urls: List of S3 URLs to raster files.
        collection: STAC collection name.
        ingestion_batch_size: Batch size for ingestion (optional; the
            documented default is 100, which is not applied in this method).
        convert_to_cog: Whether to convert to COG format (optional).
        cog_profile: COG profile name, e.g. ``'lzw'`` or ``'deflate'``
            (optional).
        cog_profile_options: Profile options for cog_translate (optional).
        cog_overview_level: Number of overview levels (optional).
        lineage: Optional lineage information.
        ttl: Time To Live in days (optional), e.g. ``ttl=10`` means 10 days.
            When the argument is omitted, 30 days is used. When ``None`` is
            passed explicitly, no TTL is written and the items are treated
            as permanent. Otherwise the validated value is used. The TTL
            ends up in ``properties["ss:ttl"]`` as an ISO ``YYYY-MM-DD``
            expiry date computed as today's date plus ``ttl`` days.

    Returns:
        Union[str, List[str]]: A single job ID string when one job was
        submitted, or a list of job ID strings when the input was split
        into several jobs (more than 1000 URLs).

    Raises:
        ValidationError: If any input parameters are invalid.
        APIError: If API request fails.
    """
    # The sentinel default lets us tell "argument omitted" apart from an
    # explicit ``ttl=None``; validation only ever sees None or the real value.
    ttl_omitted = ttl is _TTL_NOT_PROVIDED

    checked = validate_inputs(
        raster_s3_urls=raster_s3_urls,
        collection=collection,
        ingestion_batch_size=ingestion_batch_size,
        convert_to_cog=convert_to_cog,
        cog_profile=cog_profile,
        cog_profile_options=cog_profile_options,
        cog_overview_level=cog_overview_level,
        lineage=lineage,
        ttl=None if ttl_omitted else ttl,
    )

    # Omitted -> 30 days; explicit None -> None (no TTL); otherwise the
    # validated number of days.
    effective_ttl = 30 if ttl_omitted else checked.ttl

    # Everything except the URL list is identical for every submitted job.
    shared_settings = {
        "collection": checked.collection,
        "ingestion_batch_size": checked.ingestion_batch_size,
        "convert_to_cog": checked.convert_to_cog,
        "cog_profile": checked.cog_profile,
        "cog_profile_options": checked.cog_profile_options,
        "cog_overview_level": checked.cog_overview_level,
        "lineage": checked.lineage,
        "ttl": effective_ttl,
    }

    def submit(batch):
        """Build the config for ``batch`` and return the submitted job's ID."""
        job_config = self._build_config(raster_s3_urls=batch, **shared_settings)
        return self.client.submit_ingestion(job_config).job_id

    urls = checked.raster_s3_urls
    url_count = len(urls)
    max_urls_per_job = 1000

    if url_count <= max_urls_per_job:
        logger.info("Ingesting %d rasters in single job", url_count)
        single_job_id = submit(urls)
        logger.info("Submitted ingestion job: %s", single_job_id)
        return single_job_id

    # Ceiling division: a trailing partial slice still needs its own job.
    full_slices, leftover = divmod(url_count, max_urls_per_job)
    logger.info(
        "Ingesting %d rasters in %d-URL chunks (total: %d chunks)",
        url_count,
        max_urls_per_job,
        full_slices + (1 if leftover else 0),
    )

    submitted_ids = []
    slice_starts = range(0, url_count, max_urls_per_job)
    for chunk_number, start in enumerate(slice_starts, start=1):
        chunk_job_id = submit(urls[start : start + max_urls_per_job])
        submitted_ids.append(chunk_job_id)
        logger.debug(
            "Submitted chunk %d with job_id: %s",
            chunk_number,
            chunk_job_id,
        )

    logger.info("Submitted %d ingestion jobs", len(submitted_ids))
    return submitted_ids
# ------------------------------------------------------------------
# Cloud storage helpers
# ------------------------------------------------------------------
[docs]
def set_upload_handler(self, handler: AbstractRasterUploadHandler) -> None:
    """Replace the upload handler used by the raster upload service.

    Args:
        handler: Handler passed to the upload service's ``set_handler``.
    """
    upload_service = self._upload_service
    upload_service.set_handler(handler)
    # Only the class name is logged, and only after the swap succeeded.
    handler_name = handler.__class__.__name__
    logger.debug("Set custom upload handler: %s", handler_name)
[docs]
def build_stac_remote_path(
    self,
    collection: str,
    filename: str,
    subfolder: Optional[str] = None,
) -> str:
    """Return the remote path the upload service builds for a raster.

    This is a thin delegation to the upload service's
    ``build_remote_path``; the arguments are forwarded positionally and
    the result is returned unchanged.

    Args:
        collection: STAC collection name.
        filename: Name of the raster file.
        subfolder: Optional nested folder; ``None`` is forwarded as-is.

    Returns:
        str: The path produced by the upload service.
    """
    path_builder = self._upload_service.build_remote_path
    remote_path = path_builder(collection, filename, subfolder)
    return remote_path
[docs]
def upload_raster_asset(
    self,
    local_path: LocalPath,
    *,
    collection: str,
    acquisition_date: DateInput,
    tile: Optional[str] = None,
    subfolder: Optional[str] = None,
    overwrite: bool = False,
    ensure_unique: bool = True,
) -> str:
    """Upload a local raster through the upload service and return its URL.

    The configured cloud store is resolved first, so the call fails before
    any upload work starts when no storage has been configured. All other
    arguments are forwarded unchanged to the upload service, which derives
    the STAC filename from the acquisition data.

    Args:
        local_path: Path to local raster file.
        collection: STAC collection name.
        acquisition_date: Date used to build the filename.
        tile: Optional tile identifier appended to the filename.
        subfolder: Optional nested folder under the collection.
        overwrite: Whether to overwrite existing remote objects.
        ensure_unique: When True (default) a UUID is appended to avoid
            collisions.

    Returns:
        str: Remote cloud URL of uploaded file.

    Raises:
        StorageError: If no cloud storage has been configured.
    """
    # Resolve storage before anything else so misconfiguration fails fast.
    store = self._ensure_storage()

    upload_kwargs = {
        "storage": store,
        "local_path": local_path,
        "collection": collection,
        "acquisition_date": acquisition_date,
        "tile": tile,
        "subfolder": subfolder,
        "overwrite": overwrite,
        "ensure_unique": ensure_unique,
    }
    uploaded_url = self._upload_service.upload_raster_asset(**upload_kwargs)

    # Log only the file name, not the full local path.
    source_name = Path(local_path).name
    logger.info("Uploaded raster asset: %s -> %s", source_name, uploaded_url)
    return uploaded_url
[docs]
def get_job_status(self, job_id: str) -> Dict[str, Any]:
    """Look up the status of a previously submitted ingestion job.

    The query is delegated to the configured client and its answer is
    returned unchanged.

    Args:
        job_id: Job ID returned from ``ingest_rasters``.

    Returns:
        Dictionary with job status information.

    Raises:
        APIError: If API request fails.
    """
    backend = self.client
    status = backend.get_job_status(job_id)
    return status
def _build_config(
    self,
    raster_s3_urls: List[str],
    collection: str,
    ingestion_batch_size: Optional[int],
    convert_to_cog: Optional[bool],
    cog_profile: Optional[str],
    cog_profile_options: Optional[Dict[str, Any]],
    cog_overview_level: Optional[int],
    lineage: Optional[Any],
    ttl: Optional[int],
) -> IngestionConfig:
    """Assemble the :class:`IngestionConfig` for one job submission.

    Optional settings that are ``None`` are left out of the ``parameters``
    dict entirely rather than being sent as nulls. When ``ttl`` is not
    ``None`` an expiry date (today's date plus ``ttl`` days, ISO
    ``YYYY-MM-DD``) is stored under ``parameters["properties"]["ss:ttl"]``;
    when ``ttl`` is ``None`` no ``properties`` entry is created.

    Args:
        raster_s3_urls: URLs covered by this job.
        collection: STAC collection name.
        ingestion_batch_size: Batch size, passed through unchanged.
        convert_to_cog: Optional COG conversion flag.
        cog_profile: Optional COG profile name.
        cog_profile_options: Optional COG profile options.
        cog_overview_level: Optional number of overview levels.
        lineage: Optional lineage information.
        ttl: Number of days until expiry, or ``None`` for no TTL.

    Returns:
        IngestionConfig: Configuration carrying a fresh copy of
        ``DEFAULT_COSTING_LABELS`` and the assembled parameters.
    """
    # Listed in the order the keys are inserted into ``parameters``.
    optional_settings = (
        ("lineage", lineage),
        ("convert_to_cog", convert_to_cog),
        ("cog_profile", cog_profile),
        ("cog_profile_options", cog_profile_options),
        ("cog_overview_level", cog_overview_level),
    )
    parameters: Dict[str, Any] = {
        key: value for key, value in optional_settings if value is not None
    }

    if ttl is not None:
        # The TTL is stored as an absolute expiry date, not a day count.
        expires_on = date.today() + timedelta(days=ttl)
        parameters["properties"] = {"ss:ttl": expires_on.isoformat()}

    return IngestionConfig(
        raster_s3_urls=raster_s3_urls,
        collection=collection,
        costing_labels=DEFAULT_COSTING_LABELS.copy(),
        ingestion_batch_size=ingestion_batch_size,
        parameters=parameters,
    )
def _ensure_storage(self) -> CloudObjectStore:
    """Return the configured cloud object store, or fail if there is none.

    Returns:
        CloudObjectStore: The store currently held in ``self._storage``.

    Raises:
        StorageError: If ``self._storage`` is still ``None``, i.e. no cloud
            storage has been configured on this manager.
    """
    storage = self._storage
    # Compare against None explicitly instead of relying on truthiness: a
    # configured store whose class defines ``__len__``/``__bool__`` could
    # evaluate as falsy and would wrongly be reported as "not configured".
    if storage is None:
        raise StorageError(
            "Cloud storage credentials not configured. "
            "Call 'configure_storage' with AWS or Azure parameters before uploading."
        )
    return storage