Source code for satorbis_kit.pgstac.clients.openeo
"""STAC-specific OpenEO API client for SQS-based ingestion."""
from typing import Any, Dict, Optional
import requests
from ..exceptions import APIError
from ..models import IngestionConfig, IngestionResponse
from .base import AbstractSTACClient
[docs]
class OpenEOSTACClient(AbstractSTACClient):
    """Client for STAC ingestion operations using OpenEO API with SQS.

    This client submits STAC ingestion jobs to an OpenEO API endpoint that
    handles SQS queueing, providing better scalability for large batch jobs
    compared to direct Airflow triggering.

    Attributes:
        base_url: Base URL for OpenEO API, stored without trailing slashes
        api_key: API key sent as a Bearer token, or None when not provided
        timeout: Request timeout in seconds applied to every HTTP call
        session: Requests session for HTTP communication
    """

    # Endpoint paths, appended to ``base_url``.
    _SUBMIT_PATH = "/ss-core/v1/ingestion/submit-sqs-job"
    _STATUS_PATH = "/ss-core/v1/ingestion/jobs/{job_id}/status"

    # Batch size sent when the config leaves ``ingestion_batch_size`` as None.
    _DEFAULT_BATCH_SIZE = 100

    def __init__(
        self,
        base_url: str,
        api_key: Optional[str] = None,
        timeout: int = 30,
    ):
        """Initialize OpenEO STAC client.

        Args:
            base_url: Base URL for OpenEO API. Trailing slashes are stripped.
            api_key: Optional API key for Bearer token authentication
            timeout: Request timeout in seconds (default: 30)
        """
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.timeout = timeout

        # Create session with common headers
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
        )
        # The Authorization header is only set for a non-empty key.
        if self.api_key:
            self.session.headers.update({"Authorization": f"Bearer {self.api_key}"})

    def submit_ingestion(self, config: IngestionConfig) -> IngestionResponse:
        """Submit raster stacking and STAC ingestion request via OpenEO API.

        Args:
            config: IngestionConfig object with job parameters

        Returns:
            IngestionResponse with job ID and status. Response fields other
            than ``job_id``, ``status`` and ``message`` are passed through
            in ``details``.

        Raises:
            APIError: If the request fails, the server returns an HTTP error
                status, the body is not valid JSON, or the decoded body has
                no ``job_id`` field
        """
        endpoint = f"{self.base_url}{self._SUBMIT_PATH}"
        # Build payload for OpenEO API (different format than Airflow)
        payload = self._build_openeo_payload(config)

        try:
            response = self.session.post(
                endpoint,
                json=payload,
                timeout=self.timeout,
            )
            response.raise_for_status()
            response_data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # RequestException: network/HTTP-level errors from the endpoint.
            # ValueError: body that is not JSON (older requests releases
            # raise a plain ValueError subclass from ``Response.json()``).
            raise APIError(f"Failed to submit ingestion to OpenEO API: {e}") from e

        # OpenEO API returns:
        # {"job_id": "...", "user_id": "...", "status": "...",
        #  "created": "...", "sqs_messages_sent": ...}
        # Anything that is not a JSON object carrying "job_id" is unusable.
        if not isinstance(response_data, dict) or "job_id" not in response_data:
            raise APIError(
                "OpenEO API response missing 'job_id' field",
                response=str(response_data),
            )

        messages_sent = response_data.get("sqs_messages_sent", "N/A")
        return IngestionResponse(
            job_id=response_data["job_id"],
            status=response_data.get("status", "submitted"),
            message=f"Job submitted successfully. SQS messages sent: {messages_sent}",
            details={
                k: v
                for k, v in response_data.items()
                if k not in ("job_id", "status", "message")
            },
        )

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get status of an ingestion job from OpenEO API.

        Args:
            job_id: Job ID returned from submit_ingestion

        Returns:
            Decoded JSON body of the status response

        Raises:
            APIError: If the request fails, the server returns an HTTP error
                status, or the body is not valid JSON
        """
        endpoint = f"{self.base_url}{self._STATUS_PATH.format(job_id=job_id)}"

        try:
            response = self.session.get(
                endpoint,
                timeout=self.timeout,
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # Same error translation as submit_ingestion: transport/HTTP
            # failures and undecodable bodies both surface as APIError.
            raise APIError(f"Failed to get job status from OpenEO API: {e}") from e

    def _build_openeo_payload(self, config: IngestionConfig) -> Dict[str, Any]:
        """Build payload for OpenEO API format.

        The OpenEO API expects a flatter structure compared to Airflow's
        nested "conf" wrapper.

        Args:
            config: IngestionConfig object

        Returns:
            Dictionary payload ready for OpenEO API. ``costing_labels`` is
            included only when the config provides a non-empty value.
        """
        # Use defaults if None. An explicit 0 or {} is kept as given.
        batch_size = (
            config.ingestion_batch_size
            if config.ingestion_batch_size is not None
            else self._DEFAULT_BATCH_SIZE
        )
        params = config.parameters if config.parameters is not None else {}

        payload = {
            "raster_s3_urls": config.raster_s3_urls,
            "collection": config.collection,
            "ingestion_batch_size": batch_size,
            "parameters": params,
        }

        # Include costing labels if provided
        if config.costing_labels:
            payload["costing_labels"] = config.costing_labels

        return payload