Source code for satorbis_kit.pgstac.manager.api

"""Function-based convenience API for STAC ingestion.

This module provides simple function-based helpers for common ingestion workflows,
serving as a lightweight alternative to the class-based :class:`STACIngestionManager` API.
"""

from typing import Any, Dict, List, Optional, Union

from .ingestion_manager import _TTL_NOT_PROVIDED
from .stac_manager import STACIngestionManager


def stack_rasters_and_ingest_via_airflow(
    s3_urls: List[str],
    collection_name: str,
    airflow_base_url: str,
    airflow_username: str,
    airflow_password: str,
    convert_to_cog: Optional[bool] = None,
    cog_profile: Optional[str] = None,
    cog_profile_options: Optional[Dict[str, Any]] = None,
    cog_overview_level: Optional[int] = None,
    ingestion_batch_size: Optional[int] = None,
    ttl: Optional[int] = None,
    **kwargs: Any,
) -> Union[str, List[str]]:
    """Submit a raster ingestion job by triggering an Airflow DAG directly.

    Thin function-style wrapper for basic use cases: it builds an
    ``AirflowSTACClient`` from the given credentials, hands it to a
    :class:`STACIngestionManager`, and forwards the request to
    ``ingest_rasters``.

    For OpenEO/SQS-based submission, use
    :func:`stack_rasters_and_ingest_via_spatial_engine`. For more control and
    advanced features, use the :class:`STACIngestionManager` class directly.

    Args:
        s3_urls: List of S3 URLs to raster files.
        collection_name: STAC collection name.
        airflow_base_url: Airflow API URL (required).
        airflow_username: Username for authentication (required).
        airflow_password: Password for authentication (required).
        convert_to_cog: Whether to convert to COG format (optional).
        cog_profile: COG profile name, e.g. ``'lzw'`` or ``'deflate'``
            (optional).
        cog_profile_options: Profile options for cog_translate (optional).
        cog_overview_level: Number of overview levels (optional).
        ingestion_batch_size: Batch size for ingestion (optional,
            default: 100).
        ttl: Time To Live in days (optional), e.g. ``ttl=10`` means 10 days.
            If not provided, defaults to 30 days. In this function API an
            explicit ``ttl=None`` is treated the same as omitting the
            argument, so it also defaults to 30 days; to skip TTL for
            permanent items, use the class-based API with ``ttl=None``.
            The value is stored as an expiry date string in
            ``properties["ss:ttl"]`` in STAC metadata, computed as today's
            date + ``ttl`` days (ISO format YYYY-MM-DD).
        **kwargs: Accepted for future extensions. Currently these are not
            forwarded to the client or the manager.

    Returns:
        Job ID(s) for tracking the ingestion workflow.

    Raises:
        ValidationError: If any input parameters are invalid.
        APIError: If the Airflow API request fails.
    """
    # Deferred to call time: a module-level import would be circular.
    from ..clients import AirflowSTACClient

    # ``None`` is this wrapper's own default, so an omitted ttl and an
    # explicit ``ttl=None`` are indistinguishable here. Both are mapped to
    # the "not provided" sentinel so the manager applies its default TTL.
    if ttl is None:
        effective_ttl = _TTL_NOT_PROVIDED
    else:
        effective_ttl = ttl

    airflow_client = AirflowSTACClient(
        base_url=airflow_base_url,
        username=airflow_username,
        password=airflow_password,
    )
    ingestion_manager = STACIngestionManager(client=airflow_client)

    return ingestion_manager.ingest_rasters(
        raster_s3_urls=s3_urls,
        collection=collection_name,
        ingestion_batch_size=ingestion_batch_size,
        convert_to_cog=convert_to_cog,
        cog_profile=cog_profile,
        cog_profile_options=cog_profile_options,
        cog_overview_level=cog_overview_level,
        ttl=effective_ttl,
    )
def stack_rasters_and_ingest_via_spatial_engine(
    s3_urls: List[str],
    collection_name: str,
    base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    convert_to_cog: Optional[bool] = None,
    cog_profile: Optional[str] = None,
    cog_profile_options: Optional[Dict[str, Any]] = None,
    cog_overview_level: Optional[int] = None,
    ingestion_batch_size: Optional[int] = None,
    timeout: int = 30,
    ttl: Optional[int] = None,
    **kwargs: Any,
) -> Union[str, List[str]]:
    """Submit a raster ingestion job through a spatial engine API.

    Simple, function-based interface for submitting jobs through a spatial
    engine API (e.g., OpenEO) with SQS queueing; better suited to large
    batch jobs. It builds an ``OpenEOSTACClient``, hands it to a
    :class:`STACIngestionManager`, and forwards the request to
    ``ingest_rasters``.

    For direct Airflow triggering, use
    :func:`stack_rasters_and_ingest_via_airflow`. For more control and
    advanced features, use the :class:`STACIngestionManager` class directly.

    Args:
        s3_urls: List of S3 URLs to raster files.
        collection_name: STAC collection name.
        base_url: Spatial engine API base URL. Any falsy value (``None`` or
            an empty string) falls back to
            ``https://dev.openeo.satsure.co``.
        api_key: Optional API key for Bearer token authentication.
        convert_to_cog: Whether to convert to COG format (optional).
        cog_profile: COG profile name, e.g. ``'lzw'`` or ``'deflate'``
            (optional).
        cog_profile_options: Profile options for cog_translate (optional).
        cog_overview_level: Number of overview levels (optional).
        ingestion_batch_size: Batch size for ingestion (optional,
            default: 100).
        timeout: Request timeout in seconds (default: 30).
        ttl: Time To Live in days (optional), e.g. ``ttl=10`` means 10 days.
            If not provided, defaults to 30 days. In this function API an
            explicit ``ttl=None`` is treated the same as omitting the
            argument, so it also defaults to 30 days; to skip TTL for
            permanent items, use the class-based API with ``ttl=None``.
            The value is stored as ``properties["ss:ttl"]`` in STAC
            metadata.
        **kwargs: Accepted for future extensions. Currently these are not
            forwarded to the client or the manager.

    Returns:
        Job ID(s) for tracking the ingestion workflow.

    Raises:
        ValidationError: If any input parameters are invalid.
        APIError: If the spatial engine API request fails.
    """
    # Deferred to call time: a module-level import would be circular.
    from ..clients import OpenEOSTACClient

    client = OpenEOSTACClient(
        # ``or`` (not an ``is None`` check) so an empty string also gets the
        # default endpoint.
        base_url=base_url or "https://dev.openeo.satsure.co",
        api_key=api_key,
        timeout=timeout,
    )
    manager = STACIngestionManager(client=client)

    # ``None`` is this wrapper's own default, so an omitted ttl and an
    # explicit ``ttl=None`` cannot be told apart here. Both are mapped to the
    # "not provided" sentinel so the manager applies its default TTL rather
    # than treating ``None`` as "no expiry".
    ttl_to_pass = _TTL_NOT_PROVIDED if ttl is None else ttl

    return manager.ingest_rasters(
        raster_s3_urls=s3_urls,
        collection=collection_name,
        ingestion_batch_size=ingestion_batch_size,
        convert_to_cog=convert_to_cog,
        cog_profile=cog_profile,
        cog_profile_options=cog_profile_options,
        cog_overview_level=cog_overview_level,
        ttl=ttl_to_pass,
    )