Source code for satorbis_kit.pgstac.clients.openeo

"""STAC-specific OpenEO API client for SQS-based ingestion."""

from typing import Any, Dict, Optional

import requests

from ..exceptions import APIError
from ..models import IngestionConfig, IngestionResponse
from .base import AbstractSTACClient


class OpenEOSTACClient(AbstractSTACClient):
    """Client for STAC ingestion operations using OpenEO API with SQS.

    This client submits STAC ingestion jobs to an OpenEO API endpoint that
    handles SQS queueing, providing better scalability for large batch jobs
    compared to direct Airflow triggering.

    Attributes:
        base_url: Base URL for OpenEO API (trailing slashes stripped)
        api_key: Optional API key sent as a Bearer token
        timeout: Request timeout in seconds applied to every call
        session: Requests session for HTTP communication
    """

    # Batch size placed in the payload when the config leaves it as None.
    DEFAULT_INGESTION_BATCH_SIZE = 100

    def __init__(
        self,
        base_url: str,
        api_key: Optional[str] = None,
        timeout: int = 30,
    ):
        """Initialize OpenEO STAC client.

        Args:
            base_url: Base URL for OpenEO API. Trailing slashes are removed
                so endpoint paths can be appended directly.
            api_key: Optional API key for Bearer token authentication
            timeout: Request timeout in seconds (default: 30)
        """
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout

        # Create session with common headers
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
        )

        # Only attach an Authorization header when a key was supplied.
        if self.api_key:
            self.session.headers.update({"Authorization": f"Bearer {self.api_key}"})

    def submit_ingestion(self, config: IngestionConfig) -> IngestionResponse:
        """Submit raster stacking and STAC ingestion request via OpenEO API.

        Args:
            config: IngestionConfig object with job parameters

        Returns:
            IngestionResponse with job ID and status

        Raises:
            APIError: If the HTTP request fails, the response body is not
                valid JSON, or the response lacks a 'job_id' field
        """
        endpoint = f"{self.base_url}/ss-core/v1/ingestion/submit-sqs-job"

        # Build payload for OpenEO API (different format than Airflow)
        payload = self._build_openeo_payload(config)

        # Keep the try body to the calls that talk to the server. ValueError
        # is caught as well because response.json() raises a ValueError
        # subclass on a malformed body, and not every requests release makes
        # that subclass a RequestException.
        try:
            response = self.session.post(
                endpoint,
                json=payload,
                timeout=self.timeout,
            )
            response.raise_for_status()
            response_data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            raise APIError(f"Failed to submit ingestion to OpenEO API: {e}") from e

        # OpenEO API returns:
        # {"job_id": "...", "user_id": "...", "status": "...",
        #  "created": "...", "sqs_messages_sent": ...}
        # A non-object body (e.g. null or a list) cannot carry a job ID, so
        # it is reported the same way as a missing field.
        if not isinstance(response_data, dict) or "job_id" not in response_data:
            raise APIError(
                "OpenEO API response missing 'job_id' field",
                response=str(response_data),
            )

        sqs_messages_sent = response_data.get("sqs_messages_sent", "N/A")

        # Everything except the fields promoted to top-level attributes is
        # passed through untouched in ``details``.
        return IngestionResponse(
            job_id=response_data["job_id"],
            status=response_data.get("status", "submitted"),
            message=f"Job submitted successfully. SQS messages sent: {sqs_messages_sent}",
            details={
                k: v
                for k, v in response_data.items()
                if k not in ("job_id", "status", "message")
            },
        )

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get status of an ingestion job from OpenEO API.

        Args:
            job_id: Job ID returned from submit_ingestion

        Returns:
            Decoded JSON body of the status response

        Raises:
            APIError: If the HTTP request fails or the response body is not
                valid JSON
        """
        endpoint = f"{self.base_url}/ss-core/v1/ingestion/jobs/{job_id}/status"

        # ValueError covers JSON decode failures that are not surfaced as a
        # RequestException by every requests release.
        try:
            response = self.session.get(
                endpoint,
                timeout=self.timeout,
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            raise APIError(f"Failed to get job status from OpenEO API: {e}") from e

    def _build_openeo_payload(self, config: IngestionConfig) -> Dict[str, Any]:
        """Build payload for OpenEO API format.

        The OpenEO API expects a flatter structure compared to Airflow's
        nested "conf" wrapper.

        Args:
            config: IngestionConfig object

        Returns:
            Dictionary payload ready for OpenEO API
        """
        # Use defaults if None. An explicit ``is not None`` test is used so
        # that a caller-supplied 0 or {} is forwarded as given.
        batch_size = (
            config.ingestion_batch_size
            if config.ingestion_batch_size is not None
            else self.DEFAULT_INGESTION_BATCH_SIZE
        )
        params = config.parameters if config.parameters is not None else {}

        payload = {
            "raster_s3_urls": config.raster_s3_urls,
            "collection": config.collection,
            "ingestion_batch_size": batch_size,
            "parameters": params,
        }

        # Include costing labels if provided (omitted when empty or None)
        if config.costing_labels:
            payload["costing_labels"] = config.costing_labels

        return payload