"""
Wherobots Vector Merge Operations
Functions for merging vector datasets on Wherobots Cloud.
"""
from datetime import datetime
from typing import List, Optional
from .wherobots_config import (
DEFAULT_API_KEY,
DEFAULT_REGION,
DEFAULT_RUNTIME,
DEFAULT_SCRIPT_BASE_URI,
DEFAULT_TIMEOUT_SECONDS,
)
from .wherobots_status import submit_job
[docs]
def merge_vectors_wherobots(
    input_base_paths: List[str],
    output_base_path: str,
    vector_types: Optional[List[str]] = None,
    api_key: Optional[str] = None,
    region: Optional[str] = None,
    script_base_uri: Optional[str] = None,
    runtime: Optional[str] = None,
    timeout_seconds: Optional[int] = None,
    job_name_prefix: str = "vector-merge",
) -> dict:
    """
    Submit a "merge all vectors" job to Wherobots Cloud.

    Builds the command line for ``wherobots_merge_all_vectors.py`` and hands it
    to ``submit_job``. Connection settings that are not supplied (or are falsy)
    fall back to the ``DEFAULT_*`` constants from ``wherobots_config``.

    Args:
        input_base_paths: Base path patterns for the input data, e.g.
            ``["s3://bucket/path1/*/*/*", "s3://bucket/path2/*/*/*"]``.
            Any iterable of strings is accepted; a bare string is rejected.
        output_base_path: Base output path, e.g. ``"s3://bucket/output/"``.
        vector_types: Vector types to pass as a ``--vector-types`` filter,
            e.g. ``["building", "habitation", "imaged_area"]``. No filter is
            passed when this is ``None``, empty, or its first element is
            ``"all"``.
        api_key: Wherobots API key. Falls back to ``DEFAULT_API_KEY``.
        region: Wherobots region. Falls back to ``DEFAULT_REGION``.
        script_base_uri: Base URI where the merge scripts are stored. Falls
            back to ``DEFAULT_SCRIPT_BASE_URI``. Trailing slashes are ignored.
        runtime: Runtime size. Falls back to ``DEFAULT_RUNTIME``.
        timeout_seconds: Job timeout in seconds. Falls back to
            ``DEFAULT_TIMEOUT_SECONDS`` (``0`` is treated as "not provided").
        job_name_prefix: Prefix of the generated job name; a
            ``YYYYmmdd-HHMMSS`` timestamp is appended.

    Returns:
        Whatever ``submit_job`` returns for the submission. It is expected to
        be a dictionary containing the run id under ``"id"`` (assumption based
        on the usage below; confirm against ``submit_job``).

    Raises:
        TypeError: If ``input_base_paths`` or ``vector_types`` is a bare string
            instead of a collection of strings.

    Example:
        >>> result = merge_vectors_wherobots(
        ...     input_base_paths=[
        ...         "s3://bucket/QC_PASSED/matched/*/*/*",
        ...         "s3://bucket/QC_PASSED/unmatched/*/*/*",
        ...     ],
        ...     output_base_path="s3://bucket/merged/",
        ... )
        >>> run_id = result["id"]
    """
    # A bare string would otherwise fail later with an opaque list-concatenation
    # error; fail early with a message that names the offending argument.
    if isinstance(input_base_paths, str):
        raise TypeError(
            "input_base_paths must be a list of path patterns, not a single string"
        )
    base_paths = list(input_base_paths)

    if not vector_types:
        selected_types = []
    elif isinstance(vector_types, str):
        raise TypeError(
            "vector_types must be a list of vector type names, not a single string"
        )
    else:
        selected_types = list(vector_types)

    # Explicit arguments win; anything falsy falls back to the configured default.
    api_key = api_key or DEFAULT_API_KEY
    region = region or DEFAULT_REGION
    script_base_uri = script_base_uri or DEFAULT_SCRIPT_BASE_URI
    runtime = runtime or DEFAULT_RUNTIME
    timeout_seconds = timeout_seconds or DEFAULT_TIMEOUT_SECONDS

    # The same script serves both the "everything" and the filtered case; the
    # selection is expressed purely through the optional --vector-types flag.
    script_uri = f"{script_base_uri.rstrip('/')}/wherobots_merge_all_vectors.py"

    script_args = [
        "--output-base-path",
        output_base_path,
        "--input-base-paths",
        *base_paths,
    ]
    # A leading "all" is the sentinel for "no filter".
    if selected_types and selected_types[0] != "all":
        script_args += ["--vector-types", *selected_types]

    # Timestamp uses the local clock of the submitting machine.
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    job_name = f"{job_name_prefix}-{timestamp}"

    return submit_job(
        api_key=api_key,
        region=region,
        script_uri=script_uri,
        script_args=script_args,
        runtime=runtime,
        name=job_name,
        timeout_seconds=timeout_seconds,
    )
[docs]
def merge_vectors_wherobots_simple(
    input_paths: List[str],
    output_path: str,
    api_key: Optional[str] = None,
    region: Optional[str] = None,
    script_base_uri: Optional[str] = None,
    runtime: Optional[str] = None,
    timeout_seconds: Optional[int] = None,
    job_name_prefix: str = "vector-merge",
) -> dict:
    """
    Submit a single-output vector merge job to Wherobots Cloud.

    Builds the command line for ``wherobots_vector_merge.py`` and hands it to
    ``submit_job``. The input paths are forwarded unchanged, so they may
    contain wildcards; the output format is always requested as
    ``geoparquet``.

    Args:
        input_paths: Input file paths/patterns, e.g.
            ``["s3://bucket/path1/*.parquet", "s3://bucket/path2/*/*.parquet"]``.
            Any iterable of strings is accepted; a bare string is rejected.
        output_path: Full output path where the merged result is written.
        api_key: Wherobots API key. Falls back to ``DEFAULT_API_KEY``.
        region: Wherobots region. Falls back to ``DEFAULT_REGION``.
        script_base_uri: Base URI where the merge scripts are stored. Falls
            back to ``DEFAULT_SCRIPT_BASE_URI``. Trailing slashes are ignored.
        runtime: Runtime size. Defaults to ``"tiny"`` when not provided.
        timeout_seconds: Job timeout in seconds. Defaults to ``7200`` when not
            provided (``0`` is treated as "not provided").
        job_name_prefix: Prefix of the generated job name; a
            ``YYYYmmdd-HHMMSS`` timestamp is appended.

    Returns:
        Whatever ``submit_job`` returns for the submission.

    Raises:
        TypeError: If ``input_paths`` is a bare string instead of a collection
            of strings.

    Example:
        >>> result = merge_vectors_wherobots_simple(
        ...     input_paths=[
        ...         "s3://bucket/path1/*/*/*_building.parquet",
        ...         "s3://bucket/path2/*/*/*_building.parquet",
        ...     ],
        ...     output_path="s3://bucket/merged/building_footprint_polygon/",
        ... )
    """
    # A bare string would otherwise fail later with an opaque list-concatenation
    # error; fail early with a message that names the offending argument.
    if isinstance(input_paths, str):
        raise TypeError(
            "input_paths must be a list of paths/patterns, not a single string"
        )
    paths = list(input_paths)

    # Explicit arguments win; anything falsy falls back to a default.
    api_key = api_key or DEFAULT_API_KEY
    region = region or DEFAULT_REGION
    script_base_uri = script_base_uri or DEFAULT_SCRIPT_BASE_URI
    # NOTE(review): runtime and timeout use literals here rather than
    # DEFAULT_RUNTIME / DEFAULT_TIMEOUT_SECONDS - presumably intentional for
    # the lighter single-output merge; confirm.
    runtime = runtime or "tiny"
    timeout_seconds = timeout_seconds or 7200

    script_uri = f"{script_base_uri.rstrip('/')}/wherobots_vector_merge.py"

    # Paths are passed through as-is so wildcard patterns reach the script intact.
    script_args = [
        "--input-paths",
        *paths,
        "--output-path",
        output_path,
        "--format",
        "geoparquet",
    ]

    # Timestamp uses the local clock of the submitting machine.
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    job_name = f"{job_name_prefix}-{timestamp}"

    return submit_job(
        api_key=api_key,
        region=region,
        script_uri=script_uri,
        script_args=script_args,
        runtime=runtime,
        name=job_name,
        timeout_seconds=timeout_seconds,
    )