Source code for satorbis_kit.raster.merge_patches

"""
Patch merging module with OpenEO integration.

Provides functionality to merge raster patches using OpenEO backend.
"""

from typing import Optional
import openeo

from satorbis_kit.constants import Constants


[docs] def merge_patches( input_s3_path: str, output_s3_path: str, compress: str = "LZW", job_title: Optional[str] = None, wait_for_completion: bool = False, ) -> openeo.rest.job.BatchJob: """ Merge previously generated raster patches back into a single GeoTIFF file. This function takes a directory of raster patches (tiles) and merges them into a single continuous raster file using the OpenEO backend. It handles proper georeferencing and seamless stitching of patches. Args: input_s3_path (str): S3 URI to directory containing patches to merge. Must end with '/'. Example: "s3://bucket/patches/" output_s3_path (str): S3 URI for merged output file. Example: "s3://bucket/merged.tif" compress (str, optional): Compression method for output (LZW, DEFLATE, NONE). Defaults to "LZW". job_title (str, optional): Custom job title. If None, auto-generated from output filename. wait_for_completion (bool, optional): If True, blocks until job completes. If False, returns immediately after starting. Defaults to False. Returns: openeo.rest.job.BatchJob: OpenEO batch job object that can be monitored and managed. Use job.status() to check status. Raises: ConnectionError: If unable to connect to OpenEO backend. ValueError: If S3 paths are invalid or patches directory is empty. Examples: Basic merge operation: >>> from satorbis_kit.raster import merge_patches >>> job = merge_patches( ... input_s3_path="s3://satsure-airflow-pipelines/sentinel_patches/100cm/", ... output_s3_path="s3://satsure-airflow-pipelines/outputs/merged.tif" ... ) >>> print(job.status()) Wait for completion: >>> job = merge_patches( ... input_s3_path="s3://bucket/patches/", ... output_s3_path="s3://bucket/merged.tif", ... compress="DEFLATE", ... wait_for_completion=True ... ) >>> print(f"Merge complete: {job.status()}") Complete workflow (generate + merge): >>> from satorbis_kit.raster import generate_patches, merge_patches >>> >>> # Generate patches >>> gen_job = generate_patches( ... input_s3_file="s3://bucket/large_image.tif", ... tile_width=1024, ... tile_height=1024, ... s3_intermediate_path="temp_patches/", ... wait_for_completion=True ... ) >>> >>> # Merge patches back >>> merge_job = merge_patches( ... input_s3_path="s3://satsure-airflow-pipelines/temp_patches/", ... output_s3_path="s3://bucket/final_output.tif", ... wait_for_completion=True ... ) Using OpenEO directly: >>> import openeo >>> from satorbis_kit.constants import Constants >>> con = openeo.connect(Constants.DEV_URL) >>> cube = con.datacube_from_process( ... process_id="merge_patches", ... input_s3_path="s3://bucket/patches/", ... output_s3_path="s3://bucket/merged.tif", ... compress="LZW" ... ) >>> job = cube.create_job(title="Patch Merge Job") >>> job.start_and_wait() Note: - Requires AWS credentials configured for S3 access - OpenEO backend accessible via Constants.DEV_URL - Input directory must contain valid raster patches - Patches must have compatible georeferencing and overlap """ # Connect to OpenEO con = openeo.connect( Constants.DEV_URL, ) # Create datacube from patch merge process cube = con.datacube_from_process( process_id="merge_patches", input_s3_path=input_s3_path, output_s3_path=output_s3_path, compress=compress, ) # Create job with descriptive title if job_title is None: job_title = f"Patch Merge Job - {output_s3_path.split('/')[-1]}" job = cube.create_job(title=job_title) # Start job and optionally wait if wait_for_completion: job.start_and_wait() else: job.start() return job