# Source code for satorbis_kit.pgstac.models.response
"""Internal ingestion response model."""
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
[docs]
class IngestionResponse(BaseModel):
    """Response wrapper used internally for results from ingestion backends.

    Fields not declared on the model are rejected by the constructor
    (``extra="forbid"``). :meth:`from_dict` collects unrecognised payload
    keys into ``details`` before constructing the model.
    """

    # Identifier of the ingestion job.
    job_id: str
    # Job state string.
    status: str
    # Optional message accompanying the response.
    message: Optional[str] = None
    # Free-form additional data; defaults to a fresh empty dict per instance.
    details: Dict[str, Any] = Field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "IngestionResponse":
        """Build an instance from an Airflow DAG-trigger response dict.

        Args:
            data: Response payload. ``dag_run_id`` is required and becomes
                ``job_id``. ``state`` becomes ``status`` (``"submitted"``
                when the key is absent). ``message`` is copied if present.
                Every other key is stored under ``details``.

        Returns:
            A new ``IngestionResponse`` populated from ``data``.

        Raises:
            KeyError: If ``data`` has no ``dag_run_id`` key.
        """
        mapped_keys = ("dag_run_id", "state", "message")

        run_id = data["dag_run_id"]
        run_state = data.get("state", "submitted")
        run_message = data.get("message")

        # Anything without a dedicated field is kept, in payload order.
        leftovers = {}
        for key, value in data.items():
            if key not in mapped_keys:
                leftovers[key] = value

        return cls(
            job_id=run_id,
            status=run_state,
            message=run_message,
            details=leftovers,
        )

    # Model configuration: reject undeclared fields.
    model_config = {
        "extra": "forbid",
    }