Source code for satorbis_kit.pgstac.clients.airflow

"""STAC-specific Airflow client wrapper."""

from typing import Any, Dict

from ...clients.airflow import AirflowClient
from ..exceptions import APIError
from ..models import IngestionConfig, IngestionResponse
from .base import AbstractSTACClient


class AirflowSTACClient(AbstractSTACClient):
    """Client for STAC ingestion operations using Airflow.

    This class wraps the generic AirflowClient and provides STAC-specific
    functionality for submitting and tracking STAC ingestion jobs.

    Attributes:
        airflow_client: Generic Airflow client used for all API communication.
        DAG_ID: Class-level identifier of the DAG that every submit/status
            call in this client targets.
    """

    DAG_ID = "stac_item_batch_creation_dag"

    def __init__(
        self,
        base_url: str,
        username: str,
        password: str,
    ):
        """Initialize STAC Airflow client.

        The three arguments are forwarded unchanged to ``AirflowClient``.

        Args:
            base_url: Base URL for Airflow API (required)
            username: Username for basic auth (required)
            password: Password for basic auth (required)

        NOTE(review): an earlier revision of this docstring listed
        ``ImportError`` (requests library not installed) under Raises. This
        method raises nothing itself; any such error would have to come from
        ``AirflowClient`` -- confirm against that class.
        """
        self.airflow_client = AirflowClient(
            base_url=base_url,
            username=username,
            password=password,
        )

    def submit_ingestion(self, config: IngestionConfig) -> IngestionResponse:
        """Submit raster stacking and STAC ingestion request.

        Triggers a run of ``DAG_ID`` with ``config.to_payload()`` as the run
        configuration and converts the reply with
        ``IngestionResponse.from_dict``.

        Args:
            config: IngestionConfig object with job parameters

        Returns:
            IngestionResponse built from the Airflow reply.

        Raises:
            APIError: If the reply has no ``dag_run_id`` field, or if any
                other exception occurs while triggering the run or building
                the response (the original exception is attached as the
                cause).
        """
        try:
            response_data = self.airflow_client.trigger_dag_run(
                dag_id=self.DAG_ID,
                config=config.to_payload(),
            )

            # The run ID is what callers later pass to get_job_status, so a
            # reply without it is treated as an API failure.
            if "dag_run_id" not in response_data:
                raise APIError(
                    "Airflow API response missing 'dag_run_id' field",
                    response=str(response_data),
                )

            return IngestionResponse.from_dict(response_data)
        except APIError:
            # Already the package's error type: propagate untouched.
            raise
        except Exception as e:
            # Wrap everything else, keeping the root cause on the traceback.
            raise APIError(f"Failed to submit ingestion: {e}") from e

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get status of an ingestion job.

        Args:
            job_id: Job ID returned from submit_ingestion; passed to Airflow
                as the ``dag_run_id`` of a ``DAG_ID`` run.

        Returns:
            Whatever ``AirflowClient.get_dag_run_status`` returns for that
            run (annotated as a dictionary of status information).

        Raises:
            APIError: If the status lookup fails for any reason (the original
                exception is attached as the cause).
        """
        try:
            return self.airflow_client.get_dag_run_status(
                dag_id=self.DAG_ID,
                dag_run_id=job_id,
            )
        except APIError:
            # Already the package's error type: propagate untouched.
            raise
        except Exception as e:
            # Wrap everything else, keeping the root cause on the traceback.
            raise APIError(f"Failed to get job status: {e}") from e