Source code for satorbis_kit.pgstac.validations.inputs

"""Input validation for STAC ingestion workflows."""

import calendar
import re
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

from pydantic import BaseModel, Field, field_validator

from ..exceptions import ValidationError
from ..models.constants import FILENAME_PATTERNS, VALID_COG_PROFILES


class IngestionInput(BaseModel):
    """Validated input parameters for STAC ingestion.

    This Pydantic model validates all user inputs before submitting to Airflow.
    It performs comprehensive validation including:

    - S3 URL format and structure
    - Filename patterns matching STAC GenericMetaExtractor
    - Date validation in filenames (must be a real calendar date)
    - Collection name format
    - COG profile and options
    - Batch size constraints
    - TTL (Time To Live) validation

    All validation errors are raised with detailed, actionable messages.

    Attributes:
        raster_s3_urls: List of S3 URLs to raster files (required).
        collection: STAC collection name (required).
        ingestion_batch_size: Batch size for ingestion (optional, must be > 0).
            Left as None when omitted; NOTE(review): earlier docs mentioned a
            default of 100, which this model does not apply — presumably a
            caller does, confirm.
        convert_to_cog: Whether to convert to COG format (optional).
        cog_profile: COG compression profile (optional). Matched
            case-insensitively against VALID_COG_PROFILES and stored lowercased.
        cog_profile_options: Profile options for cog_translate (optional).
        cog_overview_level: Number of overview levels (optional, >= 0).
        lineage: Lineage information (optional).
        ttl: Time To Live in days (optional, must be a positive integer).
            None (also the value when omitted) means TTL is skipped for
            permanent items. NOTE(review): earlier docs claimed a 30-day
            default when not provided; this model does not apply one —
            confirm whether a caller does.

    Example:
        >>> validated = IngestionInput(
        ...     raster_s3_urls=["s3://bucket/TEST_COL/path/20240401.tif"],
        ...     collection="TEST_COL",
        ...     convert_to_cog=True,
        ...     cog_profile="lzw",
        ... )
        >>> # Also accepts .tiff extension
        >>> validated = IngestionInput(
        ...     raster_s3_urls=["s3://bucket/TEST_COL/path/20240401.tiff"],
        ...     collection="TEST_COL",
        ... )
    """

    # Required fields
    raster_s3_urls: List[str] = Field(
        ..., min_length=1, description="List of S3 URLs to raster files"
    )
    collection: str = Field(
        ...,
        min_length=1,
        max_length=100,
        pattern=r"^[a-zA-Z0-9_-]+$",
        description="STAC collection name",
    )

    # Optional fields
    ingestion_batch_size: Optional[int] = Field(
        None, gt=0, description="Batch size for ingestion"
    )
    convert_to_cog: Optional[bool] = Field(
        None, description="Whether to convert to COG format"
    )
    cog_profile: Optional[str] = Field(None, description="COG profile name")
    cog_profile_options: Optional[Dict[str, Any]] = Field(
        None, description="Profile options for cog_translate"
    )
    cog_overview_level: Optional[int] = Field(
        None, ge=0, description="Number of overview levels"
    )
    lineage: Optional[Any] = Field(None, description="Lineage information")
    ttl: Optional[int] = Field(
        None,
        gt=0,
        description="Time To Live in days. None to skip TTL for permanent items.",
    )

    @field_validator("raster_s3_urls")
    @classmethod
    def validate_s3_urls(cls, v: List[str]) -> List[str]:
        """Validate each S3 URL for format, structure, and filename patterns.

        Args:
            v: List of S3 URLs to validate

        Returns:
            Validated list of S3 URLs (unchanged)

        Raises:
            ValueError: If the list is empty or any URL fails validation; the
                message is prefixed with the failing URL's index.
        """
        if not v:
            raise ValueError("raster_s3_urls list cannot be empty")

        for i, url in enumerate(v):
            try:
                cls._validate_single_s3_url(url)
            except ValueError as e:
                # Chain so the original failure stays visible in tracebacks.
                raise ValueError(f"Invalid S3 URL at index {i}: {e}") from e

        return v

    @classmethod
    def _validate_single_s3_url(cls, url: str) -> None:
        """Validate a single S3 URL against all requirements.

        Validates:
        - S3 URL scheme (s3://, s3a://, s3n://)
        - Bucket and object key presence
        - .tif or .tiff file extension
        - Path structure (product_id/path/filename.tif or filename.tiff)
        - Filename pattern matching STAC GenericMetaExtractor
        - Date format and validity in filename

        Args:
            url: S3 URL to validate

        Raises:
            ValueError: If URL fails any validation check
        """
        if not url:
            raise ValueError("S3 URL cannot be empty")

        if not isinstance(url, str):
            raise ValueError(f"S3 URL must be a string, got {type(url).__name__}")

        parsed = urlparse(url)

        # Accept the plain s3 scheme and the s3a/s3n variants.
        if parsed.scheme not in ("s3", "s3a", "s3n"):
            raise ValueError(
                f"Invalid S3 URL scheme: '{parsed.scheme}'. "
                f"Expected 's3://', 's3a://', or 's3n://'"
            )

        # The bucket name is the netloc component of the URL.
        if not parsed.netloc:
            raise ValueError(f"S3 URL missing bucket name: {url}")

        if not parsed.path or parsed.path == "/":
            raise ValueError(f"S3 URL missing object key: {url}")

        # Object key is the path without its leading slash(es).
        object_key = parsed.path.lstrip("/")

        if not object_key.endswith((".tif", ".tiff")):
            raise ValueError(f"S3 URL must point to a .tif or .tiff file. Got: {url}")

        # Require at least product_id/filename in the key.
        path_parts = object_key.split("/")
        if len(path_parts) < 2:
            raise ValueError(
                f"S3 URL path must follow structure: product_id/path/filename.tif or filename.tiff. Got: {url}"
            )

        filename = path_parts[-1]

        if not any(re.match(pattern, filename) for pattern in FILENAME_PATTERNS):
            raise ValueError(
                f"S3 URL filename does not match expected patterns. "
                f"Filename must follow one of these formats:\n"
                f"  - YYYYMMDD.tif or .tiff (e.g., 20240601.tif)\n"
                f"  - YYYYMMDD_TILE.tif or .tiff (e.g., 20240601_T21LWH.tif)\n"
                f"  - YYYYMMDD_RID_VERSION.tif or .tiff (e.g., 20240601_RID123_VER01.tif)\n"
                f"  - YYYYMMDD_data_VERSION_SEASON.tif or .tiff (e.g., 20240601_data_VER01_SEA02.tif)\n"
                f"Got filename: {filename}"
            )

        cls._validate_filename_date(filename)

    @staticmethod
    def _validate_filename_date(filename: str) -> None:
        """Check that the leading YYYYMMDD of ``filename`` is a real calendar date.

        Filenames that do not start with 8 digits are skipped here.

        Args:
            filename: Bare filename (no directory components).

        Raises:
            ValueError: If the year is outside 1900-2100, the month is outside
                01-12, or the day does not exist in that month (e.g. Feb 30,
                or Feb 29 in a non-leap year).
        """
        date_match = re.match(r"^(\d{8})", filename)
        if not date_match:
            return

        date_str = date_match.group(1)
        year = int(date_str[0:4])
        month = int(date_str[4:6])
        day = int(date_str[6:8])

        if year < 1900 or year > 2100:
            raise ValueError(
                f"Invalid year in filename: {year}. "
                f"Expected year between 1900 and 2100. Filename: {filename}"
            )

        if month < 1 or month > 12:
            raise ValueError(
                f"Invalid month in filename: {month}. "
                f"Expected month between 01 and 12. Filename: {filename}"
            )

        # monthrange accounts for month length and leap years, so impossible
        # dates such as 20240231 are rejected (a flat 1..31 check let them through).
        days_in_month = calendar.monthrange(year, month)[1]
        if day < 1 or day > days_in_month:
            raise ValueError(
                f"Invalid day in filename: {day}. "
                f"Expected day between 01 and {days_in_month:02d}. Filename: {filename}"
            )

    @field_validator("collection")
    @classmethod
    def validate_collection_name(cls, v: str) -> str:
        """Validate collection name format.

        These checks mirror the ``Field(...)`` constraints declared on
        ``collection`` and are kept as a defensive second check.

        Args:
            v: Collection name to validate

        Returns:
            Validated collection name

        Raises:
            ValueError: If collection name is invalid
        """
        if not v:
            raise ValueError("collection name cannot be empty")

        if len(v) > 100:
            raise ValueError(
                f"collection is too long ({len(v)} chars). Maximum is 100 characters"
            )

        if not re.match(r"^[a-zA-Z0-9_-]+$", v):
            raise ValueError(
                f"collection '{v}' contains invalid characters. "
                "Only alphanumeric, hyphens, and underscores are allowed"
            )

        return v

    @field_validator("cog_profile")
    @classmethod
    def validate_cog_profile(cls, v: Optional[str]) -> Optional[str]:
        """Validate and normalize the COG profile name.

        The name is matched case-insensitively against VALID_COG_PROFILES and
        returned lowercased, so the stored value matches the form that was
        actually validated.

        Args:
            v: COG profile name to validate

        Returns:
            Lowercased COG profile name, or None if not given

        Raises:
            ValueError: If profile is invalid
        """
        if v is None:
            return v

        normalized = v.lower()
        if normalized not in VALID_COG_PROFILES:
            raise ValueError(
                f"Invalid cog_profile: '{v}'. "
                f"Valid options: {', '.join(sorted(VALID_COG_PROFILES))}"
            )

        return normalized

    @field_validator("cog_profile_options")
    @classmethod
    def validate_cog_profile_options(
        cls, v: Optional[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Validate COG profile options.

        Only the keys ``blockxsize``, ``blockysize``, ``predictor`` and
        ``quality`` are checked; other keys are passed through untouched.
        Booleans are rejected for integer options (``bool`` is a subclass of
        ``int``), matching how ``validate_ttl`` treats them.

        Args:
            v: COG profile options dictionary to validate

        Returns:
            Validated options dictionary

        Raises:
            ValueError: If any option is invalid
        """
        if v is None:
            return v

        if not isinstance(v, dict):
            raise ValueError(
                f"cog_profile_options must be a dictionary, got {type(v).__name__}"
            )

        def _is_int(value: Any) -> bool:
            # True/False would otherwise pass as 1/0.
            return isinstance(value, int) and not isinstance(value, bool)

        for key in ("blockxsize", "blockysize"):
            if key in v:
                size = v[key]
                if not _is_int(size) or size <= 0:
                    raise ValueError(f"{key} must be a positive integer, got {size}")

        if "predictor" in v:
            predictor = v["predictor"]
            if not _is_int(predictor) or predictor not in (1, 2, 3):
                raise ValueError(
                    f"predictor must be 1 (None), 2 (Horizontal), or 3 (FloatingPoint), got {predictor}"
                )

        # Validate quality (for JPEG/WEBP)
        if "quality" in v:
            quality = v["quality"]
            if not _is_int(quality) or not (1 <= quality <= 100):
                raise ValueError(
                    f"quality must be an integer between 1 and 100, got {quality}"
                )

        return v

    @field_validator("ttl", mode="before")
    @classmethod
    def validate_ttl(cls, v: Any) -> Optional[int]:
        """Validate TTL value.

        Runs in "before" mode so the raw input is inspected before Pydantic
        coerces it.

        Args:
            v: TTL value in days to validate (before type coercion)

        Returns:
            Validated TTL value

        Raises:
            ValueError: If TTL is invalid
        """
        if v is None:
            return v

        # Explicitly reject booleans since bool is a subclass of int in Python
        # Check before Pydantic coerces True/False to 1/0
        if isinstance(v, bool):
            raise ValueError(
                f"ttl must be an integer, got {type(v).__name__}. "
                "Use ttl=None to skip TTL for permanent items."
            )

        # After boolean check, validate it's an integer
        if not isinstance(v, int):
            raise ValueError(f"ttl must be an integer, got {type(v).__name__}")

        if v <= 0:
            raise ValueError(
                f"ttl must be a positive integer (days), got {v}. "
                "Use ttl=None to skip TTL for permanent items."
            )

        return v

    model_config = {
        "extra": "forbid",  # Don't allow extra fields
        "str_strip_whitespace": True,  # Strip whitespace from strings
    }
def validate_inputs(**kwargs) -> IngestionInput:
    """Validate ingestion inputs using Pydantic.

    This is a convenience function that wraps IngestionInput validation and
    converts any error raised while constructing the model into the package's
    custom ValidationError.

    Args:
        **kwargs: Keyword arguments for ingestion (see IngestionInput for fields)

    Returns:
        Validated IngestionInput model

    Raises:
        ValidationError: If validation fails. The message is the string form
            of the underlying error, which is also chained as ``__cause__``.

    Example:
        >>> validated = validate_inputs(
        ...     raster_s3_urls=["s3://bucket/TEST_COL/path/20240401.tif"],
        ...     collection="TEST_COL",
        ...     ingestion_batch_size=100,
        ... )
    """
    try:
        return IngestionInput(**kwargs)
    except Exception as e:
        # Broad catch is deliberate: this entry point promises a single
        # exception type to callers. Chaining keeps the original Pydantic
        # error (with per-field details) available for debugging.
        raise ValidationError(str(e)) from e