Source code for satorbis_kit.pgstac.validations.inputs
"""Input validation for STAC ingestion workflows."""
import re
from datetime import date
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

from pydantic import BaseModel, Field, field_validator

from ..exceptions import ValidationError
from ..models.constants import FILENAME_PATTERNS, VALID_COG_PROFILES
[docs]
class IngestionInput(BaseModel):
    """Validated input parameters for STAC ingestion.

    This Pydantic model validates all user inputs before submitting to Airflow.
    It performs comprehensive validation including:

    - S3 URL format and structure
    - Filename patterns matching STAC GenericMetaExtractor
    - Date validation in filenames (range checks plus real calendar validity)
    - Collection name format
    - COG profile and options
    - Batch size constraints
    - TTL (Time To Live) validation

    All validation errors are raised with detailed, actionable messages.

    Attributes:
        raster_s3_urls: List of S3 URLs to raster files (required).
        collection: STAC collection name (required).
        ingestion_batch_size: Batch size for ingestion (optional). The field
            itself defaults to None. NOTE(review): earlier documentation
            mentioned a default of 100; presumably that is applied downstream
            of this model -- confirm.
        convert_to_cog: Whether to convert to COG format (optional).
        cog_profile: COG compression profile (optional). Matched
            case-insensitively against VALID_COG_PROFILES; the value is
            returned exactly as supplied.
        cog_profile_options: Profile options for cog_translate (optional).
        cog_overview_level: Number of overview levels (optional).
        lineage: Lineage information (optional).
        ttl: Time To Live in days (optional). Must be a positive integer
            (booleans are rejected). The field itself defaults to None, which
            means TTL is skipped for permanent items. NOTE(review): earlier
            documentation mentioned a 30-day default when the argument is
            omitted; presumably that is applied by the caller -- confirm.

    Example:
        >>> validated = IngestionInput(
        ...     raster_s3_urls=["s3://bucket/TEST_COL/path/20240401.tif"],
        ...     collection="TEST_COL",
        ...     convert_to_cog=True,
        ...     cog_profile="lzw",
        ... )
        >>> # Also accepts .tiff extension
        >>> validated = IngestionInput(
        ...     raster_s3_urls=["s3://bucket/TEST_COL/path/20240401.tiff"],
        ...     collection="TEST_COL",
        ... )
    """

    # Required fields
    raster_s3_urls: List[str] = Field(
        ..., min_length=1, description="List of S3 URLs to raster files"
    )
    collection: str = Field(
        ...,
        min_length=1,
        max_length=100,
        pattern=r"^[a-zA-Z0-9_-]+$",
        description="STAC collection name",
    )

    # Optional fields
    ingestion_batch_size: Optional[int] = Field(
        None, gt=0, description="Batch size for ingestion"
    )
    convert_to_cog: Optional[bool] = Field(
        None, description="Whether to convert to COG format"
    )
    cog_profile: Optional[str] = Field(None, description="COG profile name")
    cog_profile_options: Optional[Dict[str, Any]] = Field(
        None, description="Profile options for cog_translate"
    )
    cog_overview_level: Optional[int] = Field(
        None, ge=0, description="Number of overview levels"
    )
    lineage: Optional[Any] = Field(None, description="Lineage information")
    ttl: Optional[int] = Field(
        None,
        gt=0,
        description="Time To Live in days. None to skip TTL for permanent items.",
    )

    @field_validator("raster_s3_urls")
    @classmethod
    def validate_s3_urls(cls, v: List[str]) -> List[str]:
        """Validate each S3 URL for format, structure, and filename patterns.

        Args:
            v: List of S3 URLs to validate

        Returns:
            Validated list of S3 URLs

        Raises:
            ValueError: If the list is empty or any URL fails validation. The
                message names the index of the first offending URL.
        """
        if not v:
            raise ValueError("raster_s3_urls list cannot be empty")
        for i, url in enumerate(v):
            try:
                cls._validate_single_s3_url(url)
            except ValueError as e:
                # Chain the original error so the root cause stays visible.
                raise ValueError(f"Invalid S3 URL at index {i}: {e}") from e
        return v

    @classmethod
    def _validate_single_s3_url(cls, url: str) -> None:
        """Validate a single S3 URL against all requirements.

        Validates:
            - S3 URL scheme (s3://, s3a://, s3n://)
            - Bucket and object key presence
            - .tif or .tiff file extension
            - Path structure (product_id/path/filename.tif or filename.tiff)
            - Filename pattern matching STAC GenericMetaExtractor
            - Date format and validity in filename, including that the
              day actually exists in the given month/year

        Args:
            url: S3 URL to validate

        Raises:
            ValueError: If URL fails any validation check
        """
        if not url:
            raise ValueError("S3 URL cannot be empty")
        if not isinstance(url, str):
            raise ValueError(f"S3 URL must be a string, got {type(url).__name__}")

        parsed = urlparse(url)

        # Check for s3:// scheme
        if parsed.scheme not in ("s3", "s3a", "s3n"):
            raise ValueError(
                f"Invalid S3 URL scheme: '{parsed.scheme}'. "
                f"Expected 's3://', 's3a://', or 's3n://'"
            )

        # Check for bucket name
        if not parsed.netloc:
            raise ValueError(f"S3 URL missing bucket name: {url}")

        # Check for object key
        if not parsed.path or parsed.path == "/":
            raise ValueError(f"S3 URL missing object key: {url}")

        # Extract the object key (path without leading slash)
        object_key = parsed.path.lstrip("/")

        # Validate file extension (accept both .tif and .tiff)
        if not object_key.endswith((".tif", ".tiff")):
            raise ValueError(f"S3 URL must point to a .tif or .tiff file. Got: {url}")

        # Validate path structure: must have at least product_id/filename
        path_parts = object_key.split("/")
        if len(path_parts) < 2:
            raise ValueError(
                "S3 URL path must follow structure: "
                f"product_id/path/filename.tif or filename.tiff. Got: {url}"
            )

        # Extract filename (last part)
        filename = path_parts[-1]

        # Validate filename matches one of the expected patterns
        if not any(re.match(pattern, filename) for pattern in FILENAME_PATTERNS):
            raise ValueError(
                f"S3 URL filename does not match expected patterns. "
                f"Filename must follow one of these formats:\n"
                f" - YYYYMMDD.tif or .tiff (e.g., 20240601.tif)\n"
                f" - YYYYMMDD_TILE.tif or .tiff (e.g., 20240601_T21LWH.tif)\n"
                f" - YYYYMMDD_RID_VERSION.tif or .tiff (e.g., 20240601_RID123_VER01.tif)\n"
                f" - YYYYMMDD_data_VERSION_SEASON.tif or .tiff (e.g., 20240601_data_VER01_SEA02.tif)\n"
                f"Got filename: {filename}"
            )

        # Validate date format in filename (YYYYMMDD)
        date_match = re.match(r"^(\d{8})", filename)
        if date_match:
            date_str = date_match.group(1)
            year = int(date_str[0:4])
            month = int(date_str[4:6])
            day = int(date_str[6:8])

            # Range checks first so the most specific message is reported.
            if year < 1900 or year > 2100:
                raise ValueError(
                    f"Invalid year in filename: {year}. "
                    f"Expected year between 1900 and 2100. Filename: {filename}"
                )
            if month < 1 or month > 12:
                raise ValueError(
                    f"Invalid month in filename: {month}. "
                    f"Expected month between 01 and 12. Filename: {filename}"
                )
            if day < 1 or day > 31:
                raise ValueError(
                    f"Invalid day in filename: {day}. "
                    f"Expected day between 01 and 31. Filename: {filename}"
                )

            # The range checks above still admit impossible dates such as
            # 20240231 or 20230431; constructing a date catches those
            # (including leap-year handling for February 29).
            try:
                date(year, month, day)
            except ValueError:
                raise ValueError(
                    f"Invalid date in filename: {date_str} is not a real "
                    f"calendar date. Filename: {filename}"
                ) from None

    @field_validator("collection")
    @classmethod
    def validate_collection_name(cls, v: str) -> str:
        """Validate collection name format.

        These checks mirror the constraints declared on the ``collection``
        field (non-empty, at most 100 characters, alphanumeric plus hyphens
        and underscores).

        Args:
            v: Collection name to validate

        Returns:
            Validated collection name

        Raises:
            ValueError: If collection name is invalid
        """
        if not v:
            raise ValueError("collection name cannot be empty")
        if len(v) > 100:
            raise ValueError(
                f"collection is too long ({len(v)} chars). Maximum is 100 characters"
            )
        if not re.match(r"^[a-zA-Z0-9_-]+$", v):
            raise ValueError(
                f"collection '{v}' contains invalid characters. "
                "Only alphanumeric, hyphens, and underscores are allowed"
            )
        return v

    @field_validator("cog_profile")
    @classmethod
    def validate_cog_profile(cls, v: Optional[str]) -> Optional[str]:
        """Validate COG profile name.

        The comparison against VALID_COG_PROFILES is case-insensitive, but
        the value is returned unchanged (not lower-cased).

        Args:
            v: COG profile name to validate

        Returns:
            Validated COG profile name, or None when not provided

        Raises:
            ValueError: If profile is invalid
        """
        if v is None:
            return v
        if v.lower() not in VALID_COG_PROFILES:
            raise ValueError(
                f"Invalid cog_profile: '{v}'. "
                f"Valid options: {', '.join(sorted(VALID_COG_PROFILES))}"
            )
        return v

    @field_validator("cog_profile_options")
    @classmethod
    def validate_cog_profile_options(
        cls, v: Optional[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Validate COG profile options.

        Only the keys ``blockxsize``, ``blockysize``, ``predictor`` and
        ``quality`` are inspected; any other keys pass through untouched.
        Booleans are rejected for all four, matching the ``ttl`` validator:
        ``bool`` is a subclass of ``int``, so ``True`` would otherwise be
        silently accepted as ``1``.

        Args:
            v: COG profile options dictionary to validate

        Returns:
            Validated options dictionary, or None when not provided

        Raises:
            ValueError: If any option is invalid
        """
        if v is None:
            return v
        if not isinstance(v, dict):
            raise ValueError(
                f"cog_profile_options must be a dictionary, got {type(v).__name__}"
            )

        # Validate blockxsize / blockysize (same rule for both)
        for size_key in ("blockxsize", "blockysize"):
            if size_key in v:
                size = v[size_key]
                if isinstance(size, bool) or not isinstance(size, int) or size <= 0:
                    raise ValueError(
                        f"{size_key} must be a positive integer, got {size}"
                    )

        # Validate predictor
        if "predictor" in v:
            predictor = v["predictor"]
            if (
                isinstance(predictor, bool)
                or not isinstance(predictor, int)
                or predictor not in (1, 2, 3)
            ):
                raise ValueError(
                    f"predictor must be 1 (None), 2 (Horizontal), or 3 (FloatingPoint), got {predictor}"
                )

        # Validate quality (for JPEG/WEBP)
        if "quality" in v:
            quality = v["quality"]
            if (
                isinstance(quality, bool)
                or not isinstance(quality, int)
                or not (1 <= quality <= 100)
            ):
                raise ValueError(
                    f"quality must be an integer between 1 and 100, got {quality}"
                )
        return v

    @field_validator("ttl", mode="before")
    @classmethod
    def validate_ttl(cls, v: Any) -> Optional[int]:
        """Validate TTL value.

        Runs in "before" mode so the raw input is inspected prior to any
        Pydantic type coercion.

        Args:
            v: TTL value in days to validate (before type coercion)

        Returns:
            Validated TTL value, or None when TTL is skipped

        Raises:
            ValueError: If TTL is a boolean, not an integer, or not positive
        """
        if v is None:
            return v
        # Explicitly reject booleans since bool is a subclass of int in Python
        # Check before Pydantic coerces True/False to 1/0
        if isinstance(v, bool):
            raise ValueError(
                f"ttl must be an integer, got {type(v).__name__}. "
                "Use ttl=None to skip TTL for permanent items."
            )
        # After boolean check, validate it's an integer
        if not isinstance(v, int):
            raise ValueError(f"ttl must be an integer, got {type(v).__name__}")
        if v <= 0:
            raise ValueError(
                f"ttl must be a positive integer (days), got {v}. "
                "Use ttl=None to skip TTL for permanent items."
            )
        return v

    model_config = {
        "extra": "forbid",  # Don't allow extra fields
        "str_strip_whitespace": True,  # Strip whitespace from strings
    }
[docs]
def validate_inputs(**kwargs) -> IngestionInput:
    """Validate ingestion inputs using Pydantic.

    This is a convenience function that wraps IngestionInput validation
    and converts Pydantic errors to custom ValidationError.

    Args:
        **kwargs: Keyword arguments for ingestion (see IngestionInput for fields)

    Returns:
        Validated IngestionInput model

    Raises:
        ValidationError: If validation fails with detailed error message. The
            original exception is attached as ``__cause__``.

    Example:
        >>> validated = validate_inputs(
        ...     raster_s3_urls=["s3://bucket/TEST_COL/path/20240401.tif"],
        ...     collection="TEST_COL",
        ...     ingestion_batch_size=100,
        ... )
    """
    try:
        return IngestionInput(**kwargs)
    except Exception as e:
        # Deliberately broad: any failure while building the model is
        # reported to callers as our custom ValidationError. Chain the
        # original so the underlying error is preserved for debugging.
        raise ValidationError(str(e)) from e