Source code for satorbis_kit.esri_gdb

import os
import shutil
import tempfile
import time
from pathlib import Path
from typing import Optional, Union

import fsspec
import geopandas as gpd
import pandas as pd


class ESriGDB:
    """
    Convert GeoParquet data (local or S3) to ESRI File Geodatabase format.

    Supported inputs:
    - Single GeoParquet file (local or S3)
    - CSV file whose first column lists paths to multiple parquet files
    - Direct GeoDataFrame input

    The output may be a local ``.gdb`` path or an ``s3://`` URL. For S3
    output the geodatabase is first written to a local temporary directory
    and then uploaded file by file.
    """

    def __init__(
        self,
        input_path: Optional[Union[str, Path]] = None,
        output_gdb: Optional[Union[str, Path]] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        input_gdf: Optional[gpd.GeoDataFrame] = None,
    ):
        """
        Initialize ESriGDB processor.

        Args:
            input_path: Input source. Can be:
                - Local parquet path: "/path/to/file.parquet" or Path object
                - S3 parquet path: "s3://bucket-name/path/to/file.parquet"
                - CSV file path: "/path/to/paths.csv" (CSV should contain
                  parquet paths)
                - None if using input_gdf parameter
            output_gdb: Path to output ESRI File Geodatabase (.gdb). Can be a
                string or Path object; an "s3://" URL is staged locally and
                uploaded at the end of ``process``.
            aws_access_key_id: Optional AWS access key ID. When omitted,
                ``None`` is handed to the S3 filesystem, which is expected to
                fall back to its own credential discovery (e.g. environment
                variables) -- confirm against the s3fs version in use.
            aws_secret_access_key: Optional AWS secret access key; same
                fallback note as ``aws_access_key_id``.
            input_gdf: Optional GeoDataFrame to use directly. If provided,
                input_path is ignored.

        Raises:
            ValueError: If ``input_gdf`` is not a GeoDataFrame, if neither
                ``input_path`` nor ``input_gdf`` is given, or if
                ``output_gdb`` is missing.
        """
        if input_gdf is not None:
            if not isinstance(input_gdf, gpd.GeoDataFrame):
                raise ValueError("input_gdf must be a GeoDataFrame")
            self.input_path = None
            self.input_gdf = input_gdf
        elif input_path is not None:
            self.input_path = str(input_path)
            self.input_gdf = None
        else:
            raise ValueError("Either input_path or input_gdf must be provided")

        if output_gdb is None:
            raise ValueError("output_gdb must be provided")

        output_gdb_str = str(output_gdb)
        self.output_gdb_is_s3 = output_gdb_str.startswith("s3://")
        self.s3_output_gdb: Optional[str] = None
        self._temp_output_dir: Optional[Path] = None

        if self.output_gdb_is_s3:
            # Stage the geodatabase locally; it is uploaded by process().
            self.s3_output_gdb = output_gdb_str.rstrip("/")
            self._temp_output_dir = Path(tempfile.mkdtemp(prefix="esri_gdb_output_"))
            self.output_gdb = self._temp_output_dir / Path(self.s3_output_gdb).name
        else:
            self.output_gdb = Path(output_gdb)

        # Ensure output directory exists
        self.output_gdb.parent.mkdir(parents=True, exist_ok=True)

        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key

        # Populated by process() once the input has been loaded.
        self.gdf = None

    def _storage_options(self) -> dict:
        """Return the fsspec/s3fs credential options built from this instance."""
        return {
            "key": self.aws_access_key_id,
            "secret": self.aws_secret_access_key,
        }

    def _read_parquet(self, path: Union[str, Path]) -> gpd.GeoDataFrame:
        """
        Read one parquet file, passing S3 credentials only for S3 paths.

        geopandas rejects a non-empty ``storage_options`` for plain local
        paths, so the options must not be forwarded unconditionally.
        """
        if str(path).startswith("s3://"):
            return gpd.read_parquet(path, storage_options=self._storage_options())
        return gpd.read_parquet(path)

    def _download_from_s3(self, s3_path: str) -> Path:
        """
        Download a file from S3 to a temporary local path.

        The file is placed in a freshly created temporary directory; the
        caller owns that directory (``returned_path.parent``) and is
        responsible for deleting it.

        Args:
            s3_path: S3 path to file (e.g., "s3://bucket/path/file")

        Returns:
            Path to the downloaded local file
        """
        print(f"⬇️ Downloading from S3: {s3_path}")
        fs = fsspec.filesystem("s3", **self._storage_options())
        temp_dir = Path(tempfile.mkdtemp(prefix="esri_gdb_"))
        local_path = temp_dir / Path(s3_path).name
        try:
            fs.get_file(s3_path, str(local_path))
        except BaseException:
            # Do not leave an orphaned temp directory behind a failed download.
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise
        return local_path

    def _read_from_csv(
        self, csv_path: Union[str, Path], path_column: Optional[str] = None
    ) -> gpd.GeoDataFrame:
        """
        Read a CSV file containing parquet paths and combine them into one
        GeoDataFrame.

        Individual parquet files that fail to load are reported and skipped
        (best effort); the call only fails when none could be read.

        Args:
            csv_path: Path to CSV file containing parquet paths (local or
                "s3://").
            path_column: Name of the column containing parquet paths.
                If None, the first column is used.

        Returns:
            Combined GeoDataFrame from all readable parquet files

        Raises:
            ValueError: If ``path_column`` is not in the CSV, or if no
                parquet file could be read.
        """
        print(f"Reading CSV file: {csv_path}")
        csv_path_str = str(csv_path)

        downloaded_dir: Optional[Path] = None
        if csv_path_str.startswith("s3://"):
            csv_local_path = self._download_from_s3(csv_path_str)
            downloaded_dir = csv_local_path.parent
        else:
            csv_local_path = Path(csv_path_str)

        try:
            df = pd.read_csv(csv_local_path)
        finally:
            # The downloaded copy is only needed for parsing; remove it so
            # repeated runs do not accumulate temp directories.
            if downloaded_dir is not None:
                shutil.rmtree(downloaded_dir, ignore_errors=True)

        # Determine which column contains the paths
        if path_column is None:
            path_column = df.columns[0]
        if path_column not in df.columns:
            raise ValueError(
                f"Column '{path_column}' not found in CSV. Available columns: {list(df.columns)}"
            )

        parquet_paths = df[path_column].dropna().tolist()
        print(f"Found {len(parquet_paths)} parquet paths in CSV")

        # Read each parquet file and combine
        gdfs = []
        total = len(parquet_paths)
        for i, parquet_path in enumerate(parquet_paths, 1):
            print(f"Reading parquet file {i}/{total}: {parquet_path}")
            try:
                gdf = self._read_parquet(parquet_path)
            except Exception as e:  # deliberate best effort: skip unreadable files
                print(f"⚠️ Warning: Failed to read {parquet_path}: {e}")
                continue
            gdfs.append(gdf)

        if not gdfs:
            raise ValueError("No parquet files could be read from CSV")

        # Combine all GeoDataFrames
        print(f"Combining {len(gdfs)} GeoDataFrames into one...")
        combined_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
        print(f"Combined GeoDataFrame has {len(combined_gdf)} rows")
        return combined_gdf

    def _read_from_local(self, local_path: Union[str, Path]) -> gpd.GeoDataFrame:
        """
        Read a parquet file from local filesystem.

        Args:
            local_path: Path to local parquet file

        Returns:
            GeoDataFrame from the parquet file
        """
        print(f"Reading local parquet file: {local_path}")
        return gpd.read_parquet(local_path)

    def _read_from_s3(self, s3_path: str) -> gpd.GeoDataFrame:
        """
        Read a parquet file from S3.

        Args:
            s3_path: S3 path to parquet file
                (e.g., "s3://bucket/path/file.parquet")

        Returns:
            GeoDataFrame from the S3 parquet file
        """
        print(f"Reading S3 parquet file: {s3_path}")
        return gpd.read_parquet(s3_path, storage_options=self._storage_options())

    def _upload_to_s3(self) -> None:
        """
        Upload the generated geodatabase directory to S3.

        Does nothing when the output is not an S3 destination. Every file
        under the local geodatabase directory is uploaded beneath the S3
        prefix, preserving relative paths.

        Raises:
            FileNotFoundError: If the local geodatabase was never created
                (for example because no layer could be written).
        """
        if not self.output_gdb_is_s3 or self.s3_output_gdb is None:
            return
        if not self.output_gdb.exists():
            raise FileNotFoundError(f"Local geodatabase not found: {self.output_gdb}")

        print(f"⬆️ Uploading geodatabase to S3: {self.s3_output_gdb}")
        fs = fsspec.filesystem("s3", **self._storage_options())
        base_path = self.output_gdb
        target_prefix = self.s3_output_gdb

        for root, _, files in os.walk(base_path):
            for file_name in files:
                local_file = Path(root) / file_name
                relative_path = local_file.relative_to(base_path)
                # S3 keys always use forward slashes, regardless of host OS.
                remote_path = f"{target_prefix}/{relative_path.as_posix()}"
                fs.put_file(str(local_file), remote_path)

        print(f"✅ Upload complete: {self.s3_output_gdb}")

    def _load_data(self) -> gpd.GeoDataFrame:
        """
        Load data from the input source (CSV, parquet file, or GeoDataFrame).

        Dispatch order: provided GeoDataFrame, then ".csv" paths (local or
        S3), then "s3://" parquet paths, then local parquet paths.

        Returns:
            GeoDataFrame ready for processing

        Raises:
            ValueError: If neither a GeoDataFrame nor an input path is set.
        """
        # If GeoDataFrame was provided directly, use a copy so the caller's
        # object is never modified.
        if self.input_gdf is not None:
            print("Using provided GeoDataFrame directly")
            return self.input_gdf.copy()

        if self.input_path is None:
            raise ValueError("No input source provided")

        input_path = self.input_path
        if input_path.lower().endswith(".csv"):
            return self._read_from_csv(input_path)
        if input_path.startswith("s3://"):
            return self._read_from_s3(input_path)
        return self._read_from_local(input_path)

    def process(self) -> int:
        """
        Process the input data and convert it to ESRI File Geodatabase format.

        Supports multiple input types:
        - Single parquet file (local or S3)
        - CSV file with parquet paths
        - Direct GeoDataFrame

        Groups data by the ``class_name`` column and geometry type, creating
        one layer named ``<class>_<geomtype>`` (lower-cased) per combination.
        Rows with null or empty geometries are dropped before writing. A
        layer that fails to write is reported and skipped. For S3 output the
        staged geodatabase is uploaded afterwards; the local staging
        directory is removed whether or not processing succeeds.

        Returns:
            Number of layers created

        Raises:
            ValueError: If no input source is available or a CSV input
                yields no readable parquet files.
            FileNotFoundError: If S3 output was requested but no local
                geodatabase was produced.
        """
        start = time.time()
        try:
            # The staging directory is deleted at the end of every run;
            # recreate the parent so the instance can be processed again.
            self.output_gdb.parent.mkdir(parents=True, exist_ok=True)

            # Load data from the input source
            self.gdf = self._load_data()
            end = time.time()
            print(f"Data loading execution time: {end - start:.4f} seconds")

            layers_created = 0

            # Process each class and geometry type combination
            for class_name, class_gdf in self.gdf.groupby("class_name"):
                for geom_type, sub_gdf in class_gdf.groupby(class_gdf.geom_type):
                    # str() keeps non-string class labels (e.g. ints) from
                    # crashing on .lower().
                    layer_name = f"{str(class_name).lower()}_{str(geom_type).lower()}"

                    # Drop invalid or empty geometries before counting so the
                    # logged number matches what is actually written.
                    sub_gdf = sub_gdf[
                        sub_gdf.geometry.notnull() & ~sub_gdf.geometry.is_empty
                    ]
                    print(f"🟢 Writing layer: {layer_name} with {len(sub_gdf)} features")

                    try:
                        sub_gdf.to_file(
                            self.output_gdb, driver="OpenFileGDB", layer=layer_name
                        )
                    except Exception as e:  # best effort: keep writing other layers
                        print(f"❌ Failed for {layer_name}: {e}")
                    else:
                        layers_created += 1

            if self.output_gdb_is_s3:
                self._upload_to_s3()

            return layers_created
        finally:
            # Always remove the local staging area, even when loading,
            # writing or uploading raised.
            if self._temp_output_dir is not None:
                shutil.rmtree(self._temp_output_dir, ignore_errors=True)
# ============================================================================ # Simple Function API (like create_patches) # ============================================================================
def parquet_to_gdb(
    input_path: Optional[Union[str, Path]] = None,
    output_gdb: Optional[Union[str, Path]] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    input_gdf: Optional[gpd.GeoDataFrame] = None,
) -> int:
    """
    One-call conversion of GeoParquet data or a GeoDataFrame to an ESRI
    File Geodatabase.

    Builds an ``ESriGDB`` processor from the given arguments and runs it.
    Use the ``ESriGDB`` class directly when finer control is needed.

    Args:
        input_path: Where the data comes from. One of:
            - a local parquet file ("/path/to/file.parquet" or a Path)
            - an S3 parquet file ("s3://bucket-name/path/to/file.parquet")
            - a CSV file ("/path/to/paths.csv") listing parquet paths in
              its first column
            - None when ``input_gdf`` is supplied instead
        output_gdb: Destination geodatabase (.gdb) as a string or Path.
        aws_access_key_id: Optional AWS access key ID, forwarded unchanged
            to ``ESriGDB``.
        aws_secret_access_key: Optional AWS secret access key, forwarded
            unchanged to ``ESriGDB``.
        input_gdf: Optional in-memory GeoDataFrame; takes precedence over
            ``input_path`` when given.

    Returns:
        Number of layers created in the geodatabase

    Examples:
        >>> from satorbis_kit import parquet_to_gdb
        >>> import geopandas as gpd
        >>>
        >>> # Local file
        >>> layers = parquet_to_gdb(
        ...     input_path="/path/to/file.parquet",
        ...     output_gdb="/path/to/output.gdb"
        ... )
        >>>
        >>> # S3 file
        >>> layers = parquet_to_gdb(
        ...     input_path="s3://bucket/path/to/file.parquet",
        ...     output_gdb="/path/to/output.gdb",
        ...     aws_access_key_id="your-key",
        ...     aws_secret_access_key="your-secret"
        ... )
        >>>
        >>> # CSV with multiple parquet paths
        >>> layers = parquet_to_gdb(
        ...     input_path="/path/to/paths.csv",
        ...     output_gdb="/path/to/output.gdb"
        ... )
        >>>
        >>> # Direct GeoDataFrame
        >>> gdf = gpd.read_parquet("/path/to/file.parquet")
        >>> layers = parquet_to_gdb(
        ...     input_gdf=gdf,
        ...     output_gdb="/path/to/output.gdb"
        ... )
    """
    # Gather the arguments once so the constructor call stays a one-liner.
    settings = {
        "input_path": input_path,
        "output_gdb": output_gdb,
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
        "input_gdf": input_gdf,
    }
    converter = ESriGDB(**settings)
    layer_count = converter.process()
    return layer_count