Source code for satorbis_kit.pgstac.models.response

"""Internal ingestion response model."""

from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


[docs] class IngestionResponse(BaseModel): """Internal response wrapper returned by ingestion backends.""" job_id: str status: str message: Optional[str] = None details: Dict[str, Any] = Field(default_factory=dict)
[docs] @classmethod def from_dict(cls, data: Dict[str, Any]) -> "IngestionResponse": """Create from an Airflow DAG-trigger response dict.""" return cls( job_id=data["dag_run_id"], status=data.get("state", "submitted"), message=data.get("message"), details={k: v for k, v in data.items() if k not in ("dag_run_id", "state", "message")}, )
model_config = { "extra": "forbid", }