satorbis_kit package

class satorbis_kit.CloudObjectStore(provider: str, store: obstore.store.ObjectStore, *, bucket: str | None = None, account_name: str | None = None, container: str | None = None, base_prefix: str = '')

Bases: object

Generic helper for interacting with AWS S3 or Azure Blob via obstore.

build_url(remote_path: str) → str

Return the fully qualified URL for the given remote path.
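This reference does not state the exact URL layout that build_url produces. As a rough sketch only, assuming the provider strings are "aws" and "azure" (hypothetical values) and that base_prefix is joined in front of the remote path, the result might resemble the standard S3 and Azure Blob URL forms:

```python
def build_url_sketch(provider, remote_path, *, bucket=None,
                     account_name=None, container=None, base_prefix=""):
    """Hypothetical stand-in for build_url; not the library's implementation."""
    # Join prefix and path while avoiding empty segments and doubled slashes.
    parts = [p.strip("/") for p in (base_prefix, remote_path)]
    key = "/".join(p for p in parts if p)
    if provider == "aws":  # assumed provider name
        return f"s3://{bucket}/{key}"
    if provider == "azure":  # assumed provider name
        return f"https://{account_name}.blob.core.windows.net/{container}/{key}"
    raise ValueError(f"unsupported provider: {provider!r}")


s3_url = build_url_sketch("aws", "a/b.tif", bucket="bkt", base_prefix="data/")
blob_url = build_url_sketch("azure", "/a/b.tif", account_name="acct", container="c")
```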

download_file(remote_reference: str, destination: str | Path) → Path

Download a file given either a remote path or full URL.

classmethod from_aws(*, bucket: str, region: str | None = None, access_key_id: str | None = None, secret_access_key: str | None = None, session_token: str | None = None, base_prefix: str = '') → CloudObjectStore

classmethod from_azure(*, account_name: str, container: str, account_key: str | None = None, sas_token: str | None = None, client_id: str | None = None, client_secret: str | None = None, tenant_id: str | None = None, base_prefix: str = '') → CloudObjectStore

upload_file(local_path: str | Path, *, remote_path: str, overwrite: bool = False) → str

Upload a single file to cloud storage at the given remote path.

upload_files(uploads: Sequence[Tuple[str | Path, str]], *, overwrite: bool = False) → List[str]

Upload multiple files. Each entry is a (local_path, remote_path) tuple.
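One way to prepare the uploads argument is to walk a local folder and pair each file with a remote path. The helper below is a hypothetical convenience, not part of satorbis_kit; the "*.tif" pattern and the prefix layout are assumptions chosen for illustration.

```python
from pathlib import Path
from typing import List, Tuple


def plan_uploads(local_dir, remote_prefix, pattern="*.tif") -> List[Tuple[Path, str]]:
    """Build (local_path, remote_path) tuples for every matching file."""
    root = Path(local_dir)
    prefix = remote_prefix.strip("/")
    pairs = []
    for local_path in sorted(root.rglob(pattern)):
        # Keep the folder structure below local_dir in the remote path.
        relative = local_path.relative_to(root).as_posix()
        pairs.append((local_path, f"{prefix}/{relative}" if prefix else relative))
    return pairs
```

The resulting list has the shape that upload_files describes, so it could be passed as `store.upload_files(plan_uploads("patches", "COL/2024"))`, where `store` is an existing CloudObjectStore instance.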

class satorbis_kit.PatchConfig(input_tif: str, output_folder: str, patch_size: int = 2048, overlap: int = 0, downsample_factor: float | None = 1.0, driver: str = 'GTiff', rasterio_env: dict | None = None)

Bases: object

Configuration for patch generation.

downsample_factor: float | None = 1.0
driver: str = 'GTiff'
input_tif: str
output_folder: str
overlap: int = 0
patch_size: int = 2048
rasterio_env: dict | None = None
class satorbis_kit.PatchGenerator(config: PatchConfig, backend: Backend | None = None)

Bases: object

Main class for generating patches from GeoTIFF files.

Follows OOP principles:

  • Encapsulation: All patch generation logic in one class

  • Single Responsibility: Only responsible for patch generation

  • Dependency Injection: Accepts Backend for flexibility

generate(show_progress: bool = True) → int

Generate patches from the input GeoTIFF.

Parameters:

show_progress – Whether to show progress bar

Returns:

Number of patches created

class satorbis_kit.PatchProcessor

Bases: object

Utility class for patch processing operations (SRP).

static downsample_patch(patch: numpy.ndarray, src_transform: rasterio.transform.Affine, downsample_factor: float, resampling: rasterio.enums.Resampling = rasterio.enums.Resampling.bilinear) → Tuple[numpy.ndarray, rasterio.transform.Affine]

Downsample patch by downsample_factor.

static pad_to_size(data: numpy.ndarray, target_h: int, target_w: int, dtype) → numpy.ndarray

Pad data to (bands, target_h, target_w) with zeros on bottom/right.
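The padding behaviour described above can be sketched with plain NumPy. This is an illustrative re-implementation under the assumption that the input is already shaped (bands, h, w) and is no larger than the target; it is not the library's own code.

```python
import numpy as np


def pad_to_size_sketch(data, target_h, target_w, dtype):
    """Zero-pad a (bands, h, w) array on the bottom and right edges."""
    bands, h, w = data.shape
    if h > target_h or w > target_w:
        raise ValueError("data is larger than the target size")
    out = np.zeros((bands, target_h, target_w), dtype=dtype)
    # The original pixels stay anchored at the top-left corner.
    out[:, :h, :w] = data
    return out


# An edge patch of 2 x 3 pixels padded up to a full 4 x 4 patch.
edge = np.ones((3, 2, 3), dtype="uint8")
padded = pad_to_size_sketch(edge, 4, 4, "uint8")
```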

class satorbis_kit.PatchWriter(config: PatchConfig)

Bases: object

Utility class for writing patches to disk (SRP).

write_patch(patch_id: int, x: int, y: int, src_path: str) → str

Read, process, and write a single patch.

class satorbis_kit.STACIngestionManager(client: AbstractSTACClient, upload_handler: AbstractRasterUploadHandler | None = None)

Bases: BaseSTACIngestionManager

Manager for STAC raster ingestion workflows.

This is the main public API for STAC ingestion. It provides factory methods for creating manager instances configured for different backends:

  • Airflow: Direct DAG triggering for smaller jobs

  • Spatial Engine: SQS-based queueing for large-scale jobs

The manager handles:

  • Input validation

  • Batch submission and chunking

  • Cloud storage configuration

  • Raster asset uploads with STAC-compliant naming

classmethod from_airflow(airflow_base_url: str, airflow_username: str, airflow_password: str, upload_handler: AbstractRasterUploadHandler | None = None) → STACIngestionManager

Create manager for direct Airflow DAG triggering.

Use this method when you want to trigger Airflow DAGs directly. Suitable for smaller batch jobs.

Parameters:
  • airflow_base_url – Base URL for Airflow API (required)

  • airflow_username – Username for Airflow authentication (required)

  • airflow_password – Password for Airflow authentication (required)

  • upload_handler – Optional custom path builder for uploads

Returns:

STACIngestionManager configured for Airflow

Example

>>> manager = STACIngestionManager.from_airflow(
...     airflow_base_url="https://airflow.example.com",
...     airflow_username="admin",
...     airflow_password="secret",
... )
>>> job_id = manager.ingest_rasters(
...     raster_s3_urls=["s3://bucket/COL/20240401.tif"],
...     collection="TEST_COL",
... )

classmethod from_spatial_engine(base_url: str | None = None, api_key: str | None = None, timeout: int = 30, upload_handler: AbstractRasterUploadHandler | None = None) → STACIngestionManager

Create manager for spatial engine API with SQS queueing.

Use this method to submit jobs through a spatial engine API (e.g., OpenEO) that handles SQS queueing. It scales better and is the better choice for large batch jobs.

Parameters:
  • base_url – Base URL for spatial engine API (defaults to https://dev.openeo.satsure.co)

  • api_key – Optional API key for Bearer token authentication

  • timeout – Request timeout in seconds (default: 30)

  • upload_handler – Optional custom path builder for uploads

Returns:

STACIngestionManager configured for spatial engine

Example

>>> manager = STACIngestionManager.from_spatial_engine(
...     base_url="https://api.example.com",
...     api_key="your-api-key",
... )
>>> job_id = manager.ingest_rasters(
...     raster_s3_urls=["s3://bucket/COL/20240401.tif"],
...     collection="TEST_COL",
... )

exception satorbis_kit.StorageError(message: str)

Bases: Exception

Raised when cloud storage operations fail.

satorbis_kit.create_patches(input_tif: str, output_folder: str, patch_size: int = 2048, overlap: int = 0, downsample_factor: float | None = 1.0, rasterio_env: dict | None = None, backend: str = 'dask', scheduler_address: str | None = None, n_workers: int = 4, threads_per_worker: int = 1, processes: bool = True, show_progress: bool = True) → int

High-level wrapper that manages the backend lifecycle internally.

Parameters:
  • input_tif – Path to input GeoTIFF file

  • output_folder – Path to output folder for patches

  • patch_size – Size of each patch in pixels

  • overlap – Overlap between patches in pixels

  • downsample_factor – Downsampling factor (0-1, 1.0 = no downsampling)

  • rasterio_env – Rasterio environment settings

  • backend – Backend name (‘dask’ or ‘sedona’)

  • scheduler_address – Dask scheduler address (for ‘dask’ backend)

  • n_workers – Number of workers (for ‘dask’ backend)

  • threads_per_worker – Threads per worker (for ‘dask’ backend)

  • processes – Use processes instead of threads (for ‘dask’ backend)

  • show_progress – Show progress bar

Returns:

Number of patches created
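The tiling rule behind patch_size and overlap is not spelled out in this reference. A common scheme, assumed here purely for illustration, steps the window by patch_size - overlap pixels and lets edge patches run past the raster boundary (where they would be zero-padded). The function name below is hypothetical.

```python
from typing import List, Tuple


def patch_origins(width: int, height: int,
                  patch_size: int = 2048, overlap: int = 0) -> List[Tuple[int, int]]:
    """Return the top-left (x, y) pixel offset of each patch, row by row."""
    stride = patch_size - overlap
    if stride <= 0:
        raise ValueError("overlap must be smaller than patch_size")
    return [(x, y)
            for y in range(0, height, stride)
            for x in range(0, width, stride)]


# A 5000 x 3000 pixel raster with the default 2048-pixel patches.
origins = patch_origins(5000, 3000)
```

Under this assumption the length of the list corresponds to the patch count that create_patches returns; the actual count may differ if the library drops or merges edge patches.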

satorbis_kit.parquet_to_gdb(input_path: str | Path | None = None, output_gdb: str | Path | None = None, aws_access_key_id: str | None = None, aws_secret_access_key: str | None = None, input_gdf: geopandas.GeoDataFrame | None = None) → int

Convert GeoParquet file(s) or GeoDataFrame to ESRI File Geodatabase format.

This is a simple wrapper function that handles the entire conversion process. For advanced usage, use the ESriGDB class directly.

Parameters:
  • input_path – Input source. Can be:

      • Local parquet path: “/path/to/file.parquet” or a Path object

      • S3 parquet path: “s3://bucket-name/path/to/file.parquet”

      • CSV file path: “/path/to/paths.csv” (the CSV should contain parquet paths in its first column)

      • None, if using the input_gdf parameter

  • output_gdb – Path to output ESRI File Geodatabase (.gdb file). Can be string or Path object.

  • aws_access_key_id – Optional AWS access key ID. If not provided, will use AWS_ACCESS_KEY_ID environment variable.

  • aws_secret_access_key – Optional AWS secret access key. If not provided, will use AWS_SECRET_ACCESS_KEY environment variable.

  • input_gdf – Optional GeoDataFrame to use directly. If provided, input_path will be ignored.

Returns:

Number of layers created in the geodatabase

Examples

>>> from satorbis_kit import parquet_to_gdb
>>> import geopandas as gpd
>>>
>>> # Local file
>>> layers = parquet_to_gdb(
...     input_path="/path/to/file.parquet",
...     output_gdb="/path/to/output.gdb"
... )
>>>
>>> # S3 file
>>> layers = parquet_to_gdb(
...     input_path="s3://bucket/path/to/file.parquet",
...     output_gdb="/path/to/output.gdb",
...     aws_access_key_id="your-key",
...     aws_secret_access_key="your-secret"
... )
>>>
>>> # CSV with multiple parquet paths
>>> layers = parquet_to_gdb(
...     input_path="/path/to/paths.csv",
...     output_gdb="/path/to/output.gdb"
... )
>>>
>>> # Direct GeoDataFrame
>>> gdf = gpd.read_parquet("/path/to/file.parquet")
>>> layers = parquet_to_gdb(
...     input_gdf=gdf,
...     output_gdb="/path/to/output.gdb"
... )
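For the CSV input mode, the file only needs parquet paths in its first column. The sketch below shows one way such a file could be read with the standard library; it is a hypothetical helper, and skipping rows that do not end in ".parquet" (for example a header row) is an assumption rather than documented behaviour.

```python
import csv
from typing import List


def read_parquet_paths(csv_path) -> List[str]:
    """Collect parquet paths from the first column of a CSV file."""
    paths = []
    with open(csv_path, newline="") as handle:
        for row in csv.reader(handle):
            # Ignore blank lines and rows whose first cell is not a parquet path.
            if row and row[0].strip().endswith(".parquet"):
                paths.append(row[0].strip())
    return paths
```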
satorbis_kit.stack_rasters_and_ingest_via_airflow(s3_urls: List[str], collection_name: str, airflow_base_url: str, airflow_username: str, airflow_password: str, convert_to_cog: bool | None = None, cog_profile: str | None = None, cog_profile_options: Dict[str, Any] | None = None, cog_overview_level: int | None = None, ingestion_batch_size: int | None = None, ttl: int | None = None, **kwargs: Any) → str | List[str]

Convenience function to submit raster ingestion via Airflow.

This function provides a simple, function-based interface for basic use cases using direct Airflow DAG triggering.

For OpenEO/SQS-based submission, use stack_rasters_and_ingest_via_spatial_engine(). For more control and advanced features, use the STACIngestionManager class directly.

Parameters:
  • s3_urls – List of S3 URLs to raster files

  • collection_name – STAC collection name

  • airflow_base_url – Airflow API URL (required)

  • airflow_username – Username for authentication (required)

  • airflow_password – Password for authentication (required)

  • convert_to_cog – Whether to convert to COG format (optional)

  • cog_profile – COG profile name (e.g., ‘lzw’, ‘deflate’) (optional)

  • cog_profile_options – Profile options for cog_translate (optional)

  • cog_overview_level – Number of overview levels (optional)

  • ingestion_batch_size – Batch size for ingestion (optional, default: 100)

  • ttl – Time to live in days (optional); for example, ttl=10 means 10 days. If omitted or passed as None, this function defaults to 30 days. To create permanent items with no TTL, use the class-based API with ttl=None. The value is stored in STAC metadata as an expiry date string in properties["ss:ttl"], computed as today’s date + ttl days (ISO format YYYY-MM-DD).

  • **kwargs – Additional keyword arguments (for future extensions)

Returns:

Job ID(s) for tracking the ingestion workflow.

Return type:

Union[str, List[str]]
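The ttl parameter above is described as an expiry date computed as today's date plus ttl days, in ISO format. That arithmetic can be sketched with the standard library as follows; the function name and the `today` argument (added so the result is reproducible) are hypothetical.

```python
from datetime import date, timedelta
from typing import Optional

DEFAULT_TTL_DAYS = 30  # default stated for the function-based API


def ttl_expiry(ttl: Optional[int] = None, today: Optional[date] = None) -> str:
    """Return the expiry date string that would be stored under "ss:ttl"."""
    days = DEFAULT_TTL_DAYS if ttl is None else ttl
    start = today or date.today()
    return (start + timedelta(days=days)).isoformat()


ten_days = ttl_expiry(10, today=date(2024, 4, 1))
default = ttl_expiry(None, today=date(2024, 4, 1))
```

With a start date of 2024-04-01, ttl=10 gives "2024-04-11" and the 30-day default gives "2024-05-01".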

Raises:
satorbis_kit.stack_rasters_and_ingest_via_spatial_engine(s3_urls: List[str], collection_name: str, base_url: str | None = None, api_key: str | None = None, convert_to_cog: bool | None = None, cog_profile: str | None = None, cog_profile_options: Dict[str, Any] | None = None, cog_overview_level: int | None = None, ingestion_batch_size: int | None = None, timeout: int = 30, ttl: int | None = None, **kwargs: Any) → str | List[str]

Convenience function to submit raster ingestion via spatial engine API.

This function provides a simple, function-based interface for submitting jobs through a spatial engine API (e.g., OpenEO) with SQS queueing. Better for large batch jobs.

For direct Airflow triggering, use stack_rasters_and_ingest_via_airflow(). For more control and advanced features, use the STACIngestionManager class directly.

Parameters:
  • s3_urls – List of S3 URLs to raster files

  • collection_name – STAC collection name

  • base_url – Spatial engine API base URL (defaults to https://dev.openeo.satsure.co)

  • api_key – Optional API key for Bearer token authentication

  • convert_to_cog – Whether to convert to COG format (optional)

  • cog_profile – COG profile name (e.g., ‘lzw’, ‘deflate’) (optional)

  • cog_profile_options – Profile options for cog_translate (optional)

  • cog_overview_level – Number of overview levels (optional)

  • ingestion_batch_size – Batch size for ingestion (optional, default: 100)

  • timeout – Request timeout in seconds (default: 30)

  • ttl – Time to live in days (optional); for example, ttl=10 means 10 days. If omitted or passed as None, this function defaults to 30 days. To create permanent items with no TTL, use the class-based API with ttl=None. The value is stored as properties["ss:ttl"] in STAC metadata.

  • **kwargs – Additional keyword arguments (for future extensions)

Returns:

Job ID(s) for tracking the ingestion workflow.

Return type:

Union[str, List[str]]
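The ingestion_batch_size parameter and the str | List[str] return type suggest that long URL lists are split into batches, each with its own job ID. How the library forms those batches is not documented here; the sketch below assumes simple fixed-size chunking in input order, with a hypothetical function name.

```python
from typing import List, Sequence


def chunk_urls(urls: Sequence[str], batch_size: int = 100) -> List[List[str]]:
    """Split a list of raster URLs into consecutive batches."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [list(urls[i:i + batch_size])
            for i in range(0, len(urls), batch_size)]


# 250 placeholder URLs with the documented default batch size of 100.
urls = [f"s3://bucket/COL/{i:05d}.tif" for i in range(250)]
batches = chunk_urls(urls)
```

Under this assumption, 250 URLs would yield three batches, which is consistent with a list of job IDs being returned for large submissions.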

Raises:

Subpackages

Submodules