"""Function-based convenience API for STAC ingestion.
This module provides simple function-based helpers for common ingestion workflows,
serving as a lightweight alternative to the class-based :class:`STACIngestionManager` API.
"""
from typing import Any, Dict, List, Optional, Union
from .ingestion_manager import _TTL_NOT_PROVIDED
from .stac_manager import STACIngestionManager
[docs]
def stack_rasters_and_ingest_via_airflow(
    s3_urls: List[str],
    collection_name: str,
    airflow_base_url: str,
    airflow_username: str,
    airflow_password: str,
    convert_to_cog: Optional[bool] = None,
    cog_profile: Optional[str] = None,
    cog_profile_options: Optional[Dict[str, Any]] = None,
    cog_overview_level: Optional[int] = None,
    ingestion_batch_size: Optional[int] = None,
    ttl: Optional[int] = None,
    **kwargs: Any,
) -> Union[str, List[str]]:
    """Submit a raster ingestion job by triggering Airflow directly.

    Thin function-style wrapper: it builds an ``AirflowSTACClient`` from the
    given credentials, wraps it in a :class:`STACIngestionManager`, and
    forwards the remaining options to ``ingest_rasters``.

    For OpenEO/SQS-based submission, use
    :func:`stack_rasters_and_ingest_via_spatial_engine`. For more control and
    advanced features, use the :class:`STACIngestionManager` class directly.

    Args:
        s3_urls: List of S3 URLs to raster files.
        collection_name: STAC collection name.
        airflow_base_url: Airflow API URL (required).
        airflow_username: Username for authentication (required).
        airflow_password: Password for authentication (required).
        convert_to_cog: Whether to convert to COG format (optional).
        cog_profile: COG profile name, e.g. ``'lzw'`` or ``'deflate'``
            (optional).
        cog_profile_options: Profile options for cog_translate (optional).
        cog_overview_level: Number of overview levels (optional).
        ingestion_batch_size: Batch size for ingestion (optional). ``None``
            is forwarded unchanged; the documented default of 100 is not
            applied in this function.
        ttl: Time To Live in days (optional), e.g. ``ttl=10`` means 10 days.
            If not provided, defaults to 30 days. In this function API an
            explicit ``ttl=None`` is indistinguishable from omitting the
            argument, so it also yields the 30-day default. To skip TTL for
            permanent items, use the class-based API with ``ttl=None``. The
            value is stored as an expiry date string in
            ``properties["ss:ttl"]`` in STAC metadata, computed as today's
            date + ttl days (ISO format YYYY-MM-DD).
        **kwargs: Accepted for future extensions; currently not used by this
            function.

    Returns:
        Union[str, List[str]]: Job ID(s) for tracking the ingestion workflow.

    Raises:
        ValidationError: If any input parameters are invalid.
        APIError: If the Airflow API request fails.
    """
    # Deferred to call time: a module-level import would be circular.
    from ..clients import AirflowSTACClient

    airflow_client = AirflowSTACClient(
        base_url=airflow_base_url,
        username=airflow_username,
        password=airflow_password,
    )
    ingestion_manager = STACIngestionManager(client=airflow_client)

    # Both an omitted ttl and an explicit ttl=None arrive here as None, so
    # both are forwarded as the "not provided" sentinel.
    ingest_options = {
        "raster_s3_urls": s3_urls,
        "collection": collection_name,
        "ingestion_batch_size": ingestion_batch_size,
        "convert_to_cog": convert_to_cog,
        "cog_profile": cog_profile,
        "cog_profile_options": cog_profile_options,
        "cog_overview_level": cog_overview_level,
        "ttl": ttl if ttl is not None else _TTL_NOT_PROVIDED,
    }
    return ingestion_manager.ingest_rasters(**ingest_options)
[docs]
def stack_rasters_and_ingest_via_spatial_engine(
    s3_urls: List[str],
    collection_name: str,
    base_url: Optional[str] = None,
    api_key: Optional[str] = None,
    convert_to_cog: Optional[bool] = None,
    cog_profile: Optional[str] = None,
    cog_profile_options: Optional[Dict[str, Any]] = None,
    cog_overview_level: Optional[int] = None,
    ingestion_batch_size: Optional[int] = None,
    timeout: int = 30,
    ttl: Optional[int] = None,
    **kwargs: Any,
) -> Union[str, List[str]]:
    """Submit a raster ingestion job through the spatial engine API.

    Thin function-style wrapper for submitting jobs through a spatial engine
    API (e.g., OpenEO) with SQS queueing; better for large batch jobs. It
    builds an ``OpenEOSTACClient``, wraps it in a
    :class:`STACIngestionManager`, and forwards the remaining options to
    ``ingest_rasters``.

    For direct Airflow triggering, use
    :func:`stack_rasters_and_ingest_via_airflow`. For more control and
    advanced features, use the :class:`STACIngestionManager` class directly.

    Args:
        s3_urls: List of S3 URLs to raster files.
        collection_name: STAC collection name.
        base_url: Spatial engine API base URL. When ``None`` or empty,
            ``https://dev.openeo.satsure.co`` is used.
        api_key: Optional API key for Bearer token authentication.
        convert_to_cog: Whether to convert to COG format (optional).
        cog_profile: COG profile name, e.g. ``'lzw'`` or ``'deflate'``
            (optional).
        cog_profile_options: Profile options for cog_translate (optional).
        cog_overview_level: Number of overview levels (optional).
        ingestion_batch_size: Batch size for ingestion (optional). ``None``
            is forwarded unchanged; the documented default of 100 is not
            applied in this function.
        timeout: Request timeout in seconds (default: 30).
        ttl: Time To Live in days (optional), e.g. ``ttl=10`` means 10 days.
            If not provided, defaults to 30 days. In this function API an
            explicit ``ttl=None`` is indistinguishable from omitting the
            argument, so it also yields the 30-day default. To skip TTL for
            permanent items, use the class-based API with ``ttl=None``. The
            value is stored as ``properties["ss:ttl"]`` in STAC metadata.
        **kwargs: Accepted for future extensions; currently not used by this
            function.

    Returns:
        Union[str, List[str]]: Job ID(s) for tracking the ingestion workflow.

    Raises:
        ValidationError: If any input parameters are invalid.
        APIError: If the spatial engine API request fails.
    """
    # Deferred to call time: a module-level import would be circular.
    from ..clients import OpenEOSTACClient

    # Any falsy base_url (None or "") falls back to the default endpoint.
    endpoint = base_url or "https://dev.openeo.satsure.co"
    engine_client = OpenEOSTACClient(
        base_url=endpoint,
        api_key=api_key,
        timeout=timeout,
    )
    ingestion_manager = STACIngestionManager(client=engine_client)

    # Both an omitted ttl and an explicit ttl=None arrive here as None, so
    # both are forwarded as the "not provided" sentinel.
    ingest_options = {
        "raster_s3_urls": s3_urls,
        "collection": collection_name,
        "ingestion_batch_size": ingestion_batch_size,
        "convert_to_cog": convert_to_cog,
        "cog_profile": cog_profile,
        "cog_profile_options": cog_profile_options,
        "cog_overview_level": cog_overview_level,
        "ttl": ttl if ttl is not None else _TTL_NOT_PROVIDED,
    }
    return ingestion_manager.ingest_rasters(**ingest_options)