Source code for satorbis_kit.pgstac.clients.airflow
"""STAC-specific Airflow client wrapper."""
from typing import Any, Dict
from ...clients.airflow import AirflowClient
from ..exceptions import APIError
from ..models import IngestionConfig, IngestionResponse
from .base import AbstractSTACClient
[docs]
class AirflowSTACClient(AbstractSTACClient):
    """Client for STAC ingestion operations using Airflow.

    This class wraps the generic AirflowClient and provides STAC-specific
    functionality for submitting and tracking STAC ingestion jobs.

    Attributes:
        DAG_ID: Class-level identifier of the Airflow DAG that every
            request issued by this client targets.
        airflow_client: Generic Airflow client for API communication.
    """

    DAG_ID = "stac_item_batch_creation_dag"

    def __init__(
        self,
        base_url: str,
        username: str,
        password: str,
    ) -> None:
        """Initialize STAC Airflow client.

        Args:
            base_url: Base URL for Airflow API (required)
            username: Username for basic auth (required)
            password: Password for basic auth (required)

        Raises:
            ImportError: Presumably propagated from AirflowClient when the
                requests library is not installed -- TODO confirm against
                the AirflowClient implementation.
        """
        self.airflow_client = AirflowClient(
            base_url=base_url,
            username=username,
            password=password,
        )

    def submit_ingestion(self, config: IngestionConfig) -> IngestionResponse:
        """Submit raster stacking and STAC ingestion request.

        Triggers a run of ``DAG_ID`` with ``config.to_payload()`` as the run
        configuration and converts the Airflow response into an
        IngestionResponse.

        Args:
            config: IngestionConfig object with job parameters

        Returns:
            IngestionResponse built from the Airflow response payload.

        Raises:
            APIError: If the response lacks a 'dag_run_id' field, or if any
                other exception occurs while triggering the run or building
                the response (the original exception is attached as the
                cause).
        """
        try:
            response_data = self.airflow_client.trigger_dag_run(
                dag_id=self.DAG_ID,
                config=config.to_payload(),
            )
            # The run identifier is what callers later pass to
            # get_job_status, so a response without it is unusable.
            if "dag_run_id" not in response_data:
                raise APIError(
                    "Airflow API response missing 'dag_run_id' field",
                    response=str(response_data),
                )
            return IngestionResponse.from_dict(response_data)
        except APIError:
            # Already the domain error type; propagate untouched.
            raise
        except Exception as e:
            # Wrap anything else, keeping the root cause explicitly chained.
            raise APIError(f"Failed to submit ingestion: {e}") from e

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get status of an ingestion job.

        Args:
            job_id: Job ID returned from submit_ingestion; passed to Airflow
                as the DAG run identifier.

        Returns:
            Dictionary with job status information, as returned by the
            underlying Airflow client.

        Raises:
            APIError: If the status lookup fails for any reason (the original
                exception is attached as the cause).
        """
        try:
            return self.airflow_client.get_dag_run_status(
                dag_id=self.DAG_ID,
                dag_run_id=job_id,
            )
        except APIError:
            # Already the domain error type; propagate untouched.
            raise
        except Exception as e:
            # Wrap anything else, keeping the root cause explicitly chained.
            raise APIError(f"Failed to get job status: {e}") from e