# Source code for satorbis_kit.vector_operation.wherobots_merge

"""
Wherobots Vector Merge Operations

Functions for merging vector datasets on Wherobots Cloud.
"""

from datetime import datetime
from typing import List, Optional

from .wherobots_config import (
    DEFAULT_API_KEY,
    DEFAULT_REGION,
    DEFAULT_RUNTIME,
    DEFAULT_SCRIPT_BASE_URI,
    DEFAULT_TIMEOUT_SECONDS,
)
from .wherobots_status import submit_job


def merge_vectors_wherobots(
    input_base_paths: List[str],
    output_base_path: str,
    vector_types: Optional[List[str]] = None,
    api_key: Optional[str] = None,
    region: Optional[str] = None,
    script_base_uri: Optional[str] = None,
    runtime: Optional[str] = None,
    timeout_seconds: Optional[int] = None,
    job_name_prefix: str = "vector-merge",
) -> dict:
    """
    Submit a vector merge job to Wherobots Cloud with a simplified interface.

    Builds the command line for the ``wherobots_merge_all_vectors.py`` script
    and hands it to ``submit_job``. Callers normally only supply the input and
    output paths; every connection/runtime setting falls back to the matching
    constant from ``wherobots_config``.

    Args:
        input_base_paths: List of base path patterns for the input data
            (e.g., ["s3://bucket/path1/*/*/*", "s3://bucket/path2/*/*/*"]).
            Passed to the script verbatim after ``--input-base-paths``.
        output_base_path: Base output path (e.g., "s3://bucket/output/").
        vector_types: Vector types to process, e.g.
            ["building", "habitation", "imaged_area"]. ``None``, an empty
            list, or a list whose FIRST element is "all" means "no filter":
            the ``--vector-types`` argument is omitted. The "all" sentinel is
            only recognised in first position.
        api_key: Wherobots API key. Falls back to ``DEFAULT_API_KEY``.
        region: Wherobots region. Falls back to ``DEFAULT_REGION``.
        script_base_uri: Base URI where the merge scripts are stored. Falls
            back to ``DEFAULT_SCRIPT_BASE_URI``. A trailing "/" is stripped
            before the script name is appended.
        runtime: Runtime size. Falls back to ``DEFAULT_RUNTIME`` (previously
            documented as "large" -- confirm in ``wherobots_config``).
        timeout_seconds: Job timeout in seconds. Falls back to
            ``DEFAULT_TIMEOUT_SECONDS`` (previously documented as 14400,
            i.e. 4 hours -- confirm in ``wherobots_config``).
        job_name_prefix: Prefix for the job name. Default: "vector-merge".

    Note:
        The fallbacks use ``or``, so any falsy value (``None``, ``""``, ``0``)
        selects the default, not only ``None``.

    Returns:
        Whatever ``submit_job`` returns; documented by the original authors
        as a dictionary describing the submission, including 'id' (run_id).

    Example:
        >>> result = merge_vectors_wherobots(
        ...     input_base_paths=[
        ...         "s3://bucket/QC_PASSED/matched/*/*/*",
        ...         "s3://bucket/QC_PASSED/unmatched/*/*/*",
        ...     ],
        ...     output_base_path="s3://bucket/merged/",
        ... )
        >>> run_id = result["id"]
    """
    # Use defaults (from wherobots_config) unless the caller supplied a value.
    api_key = api_key or DEFAULT_API_KEY
    region = region or DEFAULT_REGION
    script_base_uri = script_base_uri or DEFAULT_SCRIPT_BASE_URI
    runtime = runtime or DEFAULT_RUNTIME
    timeout_seconds = timeout_seconds or DEFAULT_TIMEOUT_SECONDS

    # A single script serves both the "all types" and the "specific types"
    # case; the latter is expressed through the --vector-types filter below.
    script_name = "wherobots_merge_all_vectors.py"
    script_uri = f"{script_base_uri.rstrip('/')}/{script_name}"

    # Build script arguments.
    script_args = [
        "--output-base-path",
        output_base_path,
        "--input-base-paths",
    ] + input_base_paths

    # Add the vector types filter only when specific types were requested.
    if vector_types and vector_types[0] != "all":
        script_args.extend(["--vector-types"] + vector_types)

    # Job name carries a local-time timestamp with one-second resolution.
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    job_name = f"{job_name_prefix}-{timestamp}"

    return submit_job(
        api_key=api_key,
        region=region,
        script_uri=script_uri,
        script_args=script_args,
        runtime=runtime,
        name=job_name,
        timeout_seconds=timeout_seconds,
    )
def merge_vectors_wherobots_simple(
    input_paths: List[str],
    output_path: str,
    api_key: Optional[str] = None,
    region: Optional[str] = None,
    script_base_uri: Optional[str] = None,
    runtime: Optional[str] = None,
    timeout_seconds: Optional[int] = None,
    job_name_prefix: str = "vector-merge",
) -> dict:
    """
    Submit a vector merge job to Wherobots Cloud for any list of input paths.

    Builds the command line for the ``wherobots_vector_merge.py`` script and
    hands it to ``submit_job``. The input paths are forwarded unchanged, so
    they may contain wildcards; the output format argument is always
    ``--format geoparquet``.

    Args:
        input_paths: List of input file paths/patterns (wildcards allowed),
            e.g. ["s3://bucket/path1/*.parquet",
            "s3://bucket/path2/*/*.parquet"].
        output_path: Full output path where the merged result will be saved.
        api_key: Wherobots API key. Falls back to ``DEFAULT_API_KEY``.
        region: Wherobots region. Falls back to ``DEFAULT_REGION``.
        script_base_uri: Base URI for scripts. Falls back to
            ``DEFAULT_SCRIPT_BASE_URI``. A trailing "/" is stripped before the
            script name is appended.
        runtime: Runtime size. Falls back to "tiny".
        timeout_seconds: Job timeout in seconds. Falls back to 7200 (2 hours).
        job_name_prefix: Prefix for the job name. Default: "vector-merge".

    Note:
        The fallbacks use ``or``, so any falsy value (``None``, ``""``, ``0``)
        selects the default, not only ``None``.

    Returns:
        Whatever ``submit_job`` returns; documented by the original authors
        as a dictionary with the job submission result.

    Example:
        >>> result = merge_vectors_wherobots_simple(
        ...     input_paths=[
        ...         "s3://bucket/path1/*/*/*_building.parquet",
        ...         "s3://bucket/path2/*/*/*_building.parquet",
        ...     ],
        ...     output_path="s3://bucket/merged/building_footprint_polygon/",
        ... )
    """
    # Use defaults unless the caller supplied a value.
    api_key = api_key or DEFAULT_API_KEY
    region = region or DEFAULT_REGION
    script_base_uri = script_base_uri or DEFAULT_SCRIPT_BASE_URI
    # NOTE(review): earlier documentation advertised "medium" as the default
    # runtime; the effective fallback has always been "tiny". Confirm which
    # one is intended before changing the value.
    runtime = runtime or "tiny"
    timeout_seconds = timeout_seconds or 7200

    # Use the single-dataset merge script.
    script_uri = f"{script_base_uri.rstrip('/')}/wherobots_vector_merge.py"

    # Build script arguments - paths are used as-is (they can contain wildcards).
    script_args = (
        ["--input-paths"]
        + input_paths
        + ["--output-path", output_path, "--format", "geoparquet"]
    )

    # Job name carries a local-time timestamp with one-second resolution.
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    job_name = f"{job_name_prefix}-{timestamp}"

    return submit_job(
        api_key=api_key,
        region=region,
        script_uri=script_uri,
        script_args=script_args,
        runtime=runtime,
        name=job_name,
        timeout_seconds=timeout_seconds,
    )