# Source code for satorbis_kit.raster.patch_generator

"""
OpenEO Patch Generation Script with Overlap and Dask Support

Provides functionality to generate raster patches using OpenEO backend with
Dask-enabled parallel processing for improved performance on large datasets.
"""

from typing import Optional
import openeo
from satorbis_kit.constants import Constants


def _s3_object_name(uri: str) -> str:
    """Validate an ``s3://bucket/key`` URI and return its final key component.

    Args:
        uri: Candidate S3 URI.

    Returns:
        The text after the last ``/`` of the object key (trailing slashes
        are ignored), e.g. ``"input.tif"`` for ``"s3://bucket/dir/input.tif"``.

    Raises:
        ValueError: If ``uri`` is not a string, does not use the ``s3``
            scheme, or lacks a bucket name or an object key.
    """
    if not isinstance(uri, str):
        raise ValueError(
            f"input_s3_file must be an S3 URI string, got {type(uri).__name__}"
        )

    scheme, separator, location = uri.partition("://")
    bucket, _, key = location.partition("/")
    # A trailing slash must not produce an empty object name.
    key = key.rstrip("/")

    if scheme.lower() != "s3" or not separator or not bucket or not key:
        raise ValueError(
            "input_s3_file must be an S3 URI of the form 's3://bucket/key', "
            f"got {uri!r}"
        )
    return key.rsplit("/", 1)[-1]


def generate_patches(
    input_s3_file: str,
    aws_account_id: str = "686585748973",
    tile_width: int = 1024,
    tile_height: int = 1024,
    overlap: int = 256,
    output_resolution_cm: Optional[int] = None,
    s3_intermediate_path: str = "sentinel_patches/50cm",
    s3_bucket: str = "satsure-airflow-pipelines",
    compress: str = "LZW",
    output_format: str = "tif",
    job_title: Optional[str] = None,
    wait_for_completion: bool = False,
    cost_customer: str = "agriculture-team",
    cost_project: str = "crop-monitoring",
) -> openeo.rest.job.BatchJob:
    """
    Generate raster patches from large GeoTIFF files using the OpenEO backend.

    This function validates the input URI, connects to the OpenEO backend at
    ``Constants.DEV_URL``, builds a data cube from the backend process
    ``generate_patches`` with the given tiling parameters, creates a batch job
    carrying the cost-tracking options, and starts it. The actual splitting of
    the raster into patches (tiles) happens on the backend; this module
    describes that process as Dask-enabled for parallel processing.

    Args:
        input_s3_file (str): S3 URI to input raster file. Must have the form
            ``s3://bucket/key``.
            Example: "s3://satsure-geoscape-staging/GEOSCAPE_URBAN_BFP/M0631_Perth_Block1A_85mm_UCEM2f210_MGA2020z50_28-Oct-2024/20250109_M0631_Perth_Block1A_85mm_UCEM2f210_MGA2020z50_28-Oct-2024_tile_0001.tif"
        aws_account_id (str, optional): AWS account ID used by the backend
            for S3 access. Defaults to "686585748973".
        tile_width (int, optional): Width of each patch in pixels.
            Defaults to 1024.
        tile_height (int, optional): Height of each patch in pixels.
            Defaults to 1024.
        overlap (int, optional): Overlapping pixel count between adjacent
            tiles. Provides context at tile boundaries for ML/AI tasks.
            Defaults to 256.
        output_resolution_cm (int, optional): Output resolution in
            centimeters. Use 1000 for 10m, 500 for 5m, 50 for 50cm.
            Can be None to preserve original resolution. Defaults to None.
        s3_intermediate_path (str, optional): S3 path prefix for output
            patches. Defaults to "sentinel_patches/50cm".
        s3_bucket (str, optional): S3 bucket name for output storage.
            Defaults to "satsure-airflow-pipelines".
        compress (str, optional): Compression method (LZW, DEFLATE, NONE).
            Defaults to "LZW".
        output_format (str, optional): Output file format
            ("tif", "jpeg", "png"). Defaults to "tif".
        job_title (str, optional): Custom job title. If None, a title is
            generated from the input file name.
        wait_for_completion (bool, optional): If True, blocks until the job
            completes. If False, returns immediately after starting.
            Defaults to False.
        cost_customer (str, optional): Customer for cost tracking.
            Defaults to "agriculture-team".
        cost_project (str, optional): Project for cost tracking.
            Defaults to "crop-monitoring".

    Returns:
        openeo.rest.job.BatchJob: OpenEO batch job object that can be
        monitored and managed. Use job.status() to check status.

    Raises:
        ValueError: If ``input_s3_file`` is not a well-formed
            ``s3://bucket/key`` URI. Raised before any backend call is made.

        Errors raised by the openeo client while connecting, creating or
        running the job are propagated unchanged (exact exception types
        depend on the openeo client — confirm against its documentation).

    Examples:
        Basic usage with defaults:

        >>> from satorbis_kit.raster import generate_patches
        >>> job = generate_patches(
        ...     input_s3_file="s3://satsure-geoscape-staging/GEOSCAPE_URBAN_BFP/.tif"
        ... )
        >>> print(job.status())

        Custom tile size with preserved resolution:

        >>> job = generate_patches(
        ...     input_s3_file="s3://bucket/input.tif",
        ...     tile_width=2048,
        ...     tile_height=2048,
        ...     overlap=512,
        ...     output_resolution_cm=None,  # Preserve original resolution
        ...     s3_intermediate_path="patches/original_res",
        ...     wait_for_completion=True
        ... )

        Monitor job progress:

        >>> job = generate_patches(input_s3_file="s3://bucket/input.tif")
        >>> print(job.status())  # Check status
        >>> logs = job.logs()  # View logs
        >>> job.start_and_wait()  # Wait for completion

        Using OpenEO directly with the same backend process:

        >>> import openeo
        >>> from satorbis_kit.constants import Constants
        >>> con = openeo.connect(Constants.DEV_URL)
        >>> cube = con.datacube_from_process(
        ...     process_id="generate_patches",
        ...     input_s3_file="s3://bucket/input.tif",
        ...     tile_width=1024,
        ...     tile_height=1024,
        ...     overlap=256,
        ...     output_resolution_cm=None,
        ...     s3_intermediate_path="sentinel_patches/50cm",
        ...     s3_bucket="satsure-airflow-pipelines",
        ...     compress="LZW"
        ... )
        >>> job = cube.create_job(title="Patch Generation Job with Overlap and Dask",
        ...     job_options={
        ...         "cost_customer": "agriculture-team",
        ...         "cost_project": "crop-monitoring",
        ...     })
        >>> job.start_and_wait()

    Note:
        - Requires AWS credentials configured for S3 access
        - OpenEO backend accessible via Constants.DEV_URL
        - Output patches are stored in S3 at: s3://{s3_bucket}/{s3_intermediate_path}/
        - Uses Dask for parallel processing to improve performance
        - Cost tracking is enabled through job_options
    """
    # Fail fast on a malformed URI so no connection is opened and no batch
    # job is submitted for an input that cannot be an S3 object.
    source_name = _s3_object_name(input_s3_file)

    # Connect to OpenEO
    con = openeo.connect(
        Constants.DEV_URL,
    )

    # Build the cube from the backend's "generate_patches" process; every
    # tiling/output argument is forwarded to the backend unchanged.
    cube = con.datacube_from_process(
        process_id="generate_patches",
        input_s3_file=input_s3_file,
        aws_account_id=aws_account_id,
        tile_width=tile_width,
        tile_height=tile_height,
        overlap=overlap,
        output_resolution_cm=output_resolution_cm,
        s3_intermediate_path=s3_intermediate_path,
        s3_bucket=s3_bucket,
        compress=compress,
        output_format=output_format,
    )

    # Create job with cost tracking options
    if job_title is None:
        job_title = f"Patch Generation Job with Overlap and Dask - {source_name}"

    job = cube.create_job(
        title=job_title,
        job_options={
            "cost_customer": cost_customer,
            "cost_project": cost_project,
        },
    )

    # Start job and optionally wait
    if wait_for_completion:
        job.start_and_wait()
    else:
        job.start()

    return job