"""
OpenEO Patch Generation Script with Overlap and Dask Support
Provides functionality to generate raster patches using OpenEO backend with
Dask-enabled parallel processing for improved performance on large datasets.
"""
from typing import Optional
import openeo
from satorbis_kit.constants import Constants
def generate_patches(
    input_s3_file: str,
    aws_account_id: str = "686585748973",
    tile_width: int = 1024,
    tile_height: int = 1024,
    overlap: int = 256,
    output_resolution_cm: Optional[int] = None,
    s3_intermediate_path: str = "sentinel_patches/50cm",
    s3_bucket: str = "satsure-airflow-pipelines",
    compress: str = "LZW",
    output_format: str = "tif",
    job_title: Optional[str] = None,
    wait_for_completion: bool = False,
    cost_customer: str = "agriculture-team",
    cost_project: str = "crop-monitoring",
) -> openeo.rest.job.BatchJob:
    """
    Submit an OpenEO batch job that splits a raster into overlapping patches.

    The function validates its arguments locally, connects to the OpenEO
    backend at ``Constants.DEV_URL``, builds a data cube from the backend's
    ``generate_patches`` process, creates a batch job carrying the cost
    tracking options, and starts it. All tiling work happens on the backend
    (which this module describes as Dask-enabled); nothing is read from or
    written to S3 by this client-side function itself.

    Args:
        input_s3_file (str): S3 URI of the input raster file, e.g.
            "s3://bucket/path/to/input_tile_0001.tif".
        aws_account_id (str, optional): AWS account ID forwarded to the
            backend process. Defaults to "686585748973".
        tile_width (int, optional): Width of each patch in pixels.
            Defaults to 1024.
        tile_height (int, optional): Height of each patch in pixels.
            Defaults to 1024.
        overlap (int, optional): Overlapping pixel count between adjacent
            tiles. Must be smaller than both tile dimensions.
            Defaults to 256.
        output_resolution_cm (int, optional): Output resolution in
            centimeters (1000 for 10m, 500 for 5m, 50 for 50cm). ``None``
            is forwarded as-is and is documented to preserve the original
            resolution. Defaults to None.
        s3_intermediate_path (str, optional): S3 path prefix for output
            patches. Defaults to "sentinel_patches/50cm".
        s3_bucket (str, optional): S3 bucket name for output storage.
            Defaults to "satsure-airflow-pipelines".
        compress (str, optional): Compression method (LZW, DEFLATE, NONE).
            Forwarded unchanged to the backend. Defaults to "LZW".
        output_format (str, optional): Output file format
            ("tif", "jpeg", "png"). Forwarded unchanged to the backend.
            Defaults to "tif".
        job_title (str, optional): Custom job title. If None, a title is
            generated from the last path component of ``input_s3_file``.
        wait_for_completion (bool, optional): If True, blocks until the job
            completes (``job.start_and_wait()``). If False, the job is
            started and the function returns immediately. Defaults to False.
        cost_customer (str, optional): Customer tag passed in
            ``job_options`` for cost tracking. Defaults to
            "agriculture-team".
        cost_project (str, optional): Project tag passed in ``job_options``
            for cost tracking. Defaults to "crop-monitoring".

    Returns:
        openeo.rest.job.BatchJob: The batch job, which has already been
        started by this function. Use ``job.status()`` to check progress.

    Raises:
        ValueError: If ``input_s3_file`` is not an ``s3://`` URI with a
            bucket/key part, if ``s3_bucket`` is empty, if a tile dimension
            is not positive, if ``overlap`` is negative or not smaller than
            both tile dimensions, or if ``output_resolution_cm`` is given
            but not positive.

        Errors raised by the openeo client while connecting, creating, or
        starting the job propagate unchanged (exact exception types depend
        on the installed client version).

    Examples:
        Basic usage with defaults:

        >>> from satorbis_kit.raster import generate_patches
        >>> job = generate_patches(
        ...     input_s3_file="s3://satsure-geoscape-staging/GEOSCAPE_URBAN_BFP/input.tif"
        ... )
        >>> print(job.status())

        Custom tile size with preserved resolution, blocking until done:

        >>> job = generate_patches(
        ...     input_s3_file="s3://bucket/input.tif",
        ...     tile_width=2048,
        ...     tile_height=2048,
        ...     overlap=512,
        ...     output_resolution_cm=None,  # Preserve original resolution
        ...     s3_intermediate_path="patches/original_res",
        ...     wait_for_completion=True
        ... )

        Monitor a job that was started without waiting (the job is already
        running when it is returned, so it must not be started again):

        >>> job = generate_patches(input_s3_file="s3://bucket/input.tif")
        >>> print(job.status())  # Check status
        >>> logs = job.logs()    # View logs

        Using OpenEO directly with the same backend process:

        >>> import openeo
        >>> from satorbis_kit.constants import Constants
        >>> con = openeo.connect(Constants.DEV_URL)
        >>> cube = con.datacube_from_process(
        ...     process_id="generate_patches",
        ...     input_s3_file="s3://bucket/input.tif",
        ...     tile_width=1024,
        ...     tile_height=1024,
        ...     overlap=256,
        ...     output_resolution_cm=None,
        ...     s3_intermediate_path="sentinel_patches/50cm",
        ...     s3_bucket="satsure-airflow-pipelines",
        ...     compress="LZW"
        ... )
        >>> job = cube.create_job(
        ...     title="Patch Generation Job with Overlap and Dask",
        ...     job_options={
        ...         "cost_customer": "agriculture-team",
        ...         "cost_project": "crop-monitoring",
        ...     })
        >>> job.start_and_wait()

    Note:
        - Requires the OpenEO backend to be reachable via Constants.DEV_URL.
        - Output patches are expected at
          s3://{s3_bucket}/{s3_intermediate_path}/ (written by the backend).
        - Cost tracking tags are sent through ``job_options``.
    """
    # Fail fast on arguments that can never produce a valid job, so that no
    # cost-tracked batch job is created for a request that is doomed to fail.
    scheme, _, bucket_and_key = input_s3_file.partition("://")
    if scheme.lower() != "s3" or not bucket_and_key.strip("/"):
        raise ValueError(
            f"input_s3_file must be an S3 URI like 's3://bucket/key.tif', "
            f"got {input_s3_file!r}"
        )
    if not s3_bucket:
        raise ValueError("s3_bucket must be a non-empty bucket name")
    if tile_width <= 0 or tile_height <= 0:
        raise ValueError(
            f"tile_width and tile_height must be positive, "
            f"got {tile_width}x{tile_height}"
        )
    # An overlap equal to or larger than the tile leaves no forward stride.
    if overlap < 0 or overlap >= min(tile_width, tile_height):
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < min(tile_width, tile_height), "
            f"got {overlap}"
        )
    if output_resolution_cm is not None and output_resolution_cm <= 0:
        raise ValueError(
            f"output_resolution_cm must be positive or None, "
            f"got {output_resolution_cm}"
        )

    # Connect to the OpenEO backend.
    con = openeo.connect(
        Constants.DEV_URL,
    )

    # Build the cube from the backend-side "generate_patches" process; every
    # tiling parameter is forwarded unchanged.
    cube = con.datacube_from_process(
        process_id="generate_patches",
        input_s3_file=input_s3_file,
        aws_account_id=aws_account_id,
        tile_width=tile_width,
        tile_height=tile_height,
        overlap=overlap,
        output_resolution_cm=output_resolution_cm,
        s3_intermediate_path=s3_intermediate_path,
        s3_bucket=s3_bucket,
        compress=compress,
        output_format=output_format,
    )

    # Default title: fixed prefix plus the input file name. Trailing slashes
    # are ignored so the suffix is never empty.
    if job_title is None:
        input_name = input_s3_file.rstrip("/").rsplit("/", 1)[-1]
        job_title = f"Patch Generation Job with Overlap and Dask - {input_name}"

    # Create the job with cost tracking options attached.
    job = cube.create_job(
        title=job_title,
        job_options={
            "cost_customer": cost_customer,
            "cost_project": cost_project,
        },
    )

    # Start the job; block until it finishes only when requested.
    if wait_for_completion:
        job.start_and_wait()
    else:
        job.start()
    return job