Source code for satorbis_kit.raster.merge_patches
"""
Patch merging module with OpenEO integration.
Provides functionality to merge raster patches using OpenEO backend.
"""
from typing import Optional
import openeo
from satorbis_kit.constants import Constants
[docs]
def merge_patches(
input_s3_path: str,
output_s3_path: str,
compress: str = "LZW",
job_title: Optional[str] = None,
wait_for_completion: bool = False,
) -> openeo.rest.job.BatchJob:
"""
Merge previously generated raster patches back into a single GeoTIFF file.
This function takes a directory of raster patches (tiles) and merges them into
a single continuous raster file using the OpenEO backend. It handles proper
georeferencing and seamless stitching of patches.
Args:
input_s3_path (str): S3 URI to directory containing patches to merge.
Must end with '/'. Example: "s3://bucket/patches/"
output_s3_path (str): S3 URI for merged output file.
Example: "s3://bucket/merged.tif"
compress (str, optional): Compression method for output (LZW, DEFLATE, NONE).
Defaults to "LZW".
job_title (str, optional): Custom job title. If None, auto-generated
from output filename.
wait_for_completion (bool, optional): If True, blocks until job completes.
If False, returns immediately after starting. Defaults to False.
Returns:
openeo.rest.job.BatchJob: OpenEO batch job object that can be monitored
and managed. Use job.status() to check status.
Raises:
ConnectionError: If unable to connect to OpenEO backend.
ValueError: If S3 paths are invalid or patches directory is empty.
Examples:
Basic merge operation:
>>> from satorbis_kit.raster import merge_patches
>>> job = merge_patches(
... input_s3_path="s3://satsure-airflow-pipelines/sentinel_patches/100cm/",
... output_s3_path="s3://satsure-airflow-pipelines/outputs/merged.tif"
... )
>>> print(job.status())
Wait for completion:
>>> job = merge_patches(
... input_s3_path="s3://bucket/patches/",
... output_s3_path="s3://bucket/merged.tif",
... compress="DEFLATE",
... wait_for_completion=True
... )
>>> print(f"Merge complete: {job.status()}")
Complete workflow (generate + merge):
>>> from satorbis_kit.raster import generate_patches, merge_patches
>>>
>>> # Generate patches
>>> gen_job = generate_patches(
... input_s3_file="s3://bucket/large_image.tif",
... tile_width=1024,
... tile_height=1024,
... s3_intermediate_path="temp_patches/",
... wait_for_completion=True
... )
>>>
>>> # Merge patches back
>>> merge_job = merge_patches(
... input_s3_path="s3://satsure-airflow-pipelines/temp_patches/",
... output_s3_path="s3://bucket/final_output.tif",
... wait_for_completion=True
... )
Using OpenEO directly:
>>> import openeo
>>> from satorbis_kit.constants import Constants
>>> con = openeo.connect(Constants.DEV_URL)
>>> cube = con.datacube_from_process(
... process_id="merge_patches",
... input_s3_path="s3://bucket/patches/",
... output_s3_path="s3://bucket/merged.tif",
... compress="LZW"
... )
>>> job = cube.create_job(title="Patch Merge Job")
>>> job.start_and_wait()
Note:
- Requires AWS credentials configured for S3 access
- OpenEO backend accessible via Constants.DEV_URL
- Input directory must contain valid raster patches
- Patches must have compatible georeferencing and overlap
"""
# Connect to OpenEO
con = openeo.connect(
Constants.DEV_URL,
)
# Create datacube from patch merge process
cube = con.datacube_from_process(
process_id="merge_patches",
input_s3_path=input_s3_path,
output_s3_path=output_s3_path,
compress=compress,
)
# Create job with descriptive title
if job_title is None:
job_title = f"Patch Merge Job - {output_s3_path.split('/')[-1]}"
job = cube.create_job(title=job_title)
# Start job and optionally wait
if wait_for_completion:
job.start_and_wait()
else:
job.start()
return job