import os
import shutil
import tempfile
import time
from pathlib import Path
from typing import Optional, Union
import fsspec
import geopandas as gpd
import pandas as pd
# [docs]
class ESriGDB:
    """
    Convert GeoParquet data (local or S3) into an ESRI File Geodatabase.

    Supported inputs:
        - A single GeoParquet file (local path or ``s3://`` URL)
        - A CSV file listing the paths of several parquet files
        - A GeoDataFrame passed in directly

    The output may be a local path or an ``s3://`` URL. For an S3 output the
    geodatabase is first written to a temporary local directory and then
    uploaded file by file; the temporary directory is removed when
    ``process()`` finishes, whether it succeeds or fails.
    """

    def __init__(
        self,
        input_path: Optional[Union[str, Path]] = None,
        output_gdb: Optional[Union[str, Path]] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        input_gdf: Optional[gpd.GeoDataFrame] = None,
    ):
        """
        Initialize ESriGDB processor.

        Args:
            input_path: Input source. Can be:
                - Local parquet path: "/path/to/file.parquet" or Path object
                - S3 parquet path: "s3://bucket-name/path/to/file.parquet"
                - CSV file path: "/path/to/paths.csv" (CSV should contain parquet paths)
                - None if using input_gdf parameter
            output_gdb: Path to output ESRI File Geodatabase (.gdb), either a
                local path (string or Path) or an "s3://" URL.
            aws_access_key_id: Optional AWS access key ID. When omitted, None is
                forwarded to the S3 filesystem, which presumably falls back to
                its own credential lookup (e.g. environment variables) —
                confirm against the s3fs documentation.
            aws_secret_access_key: Optional AWS secret access key; same
                fallback note as aws_access_key_id.
            input_gdf: Optional GeoDataFrame to use directly. If provided,
                input_path will be ignored.

        Raises:
            ValueError: If input_gdf is not a GeoDataFrame, if neither
                input_path nor input_gdf is given, or if output_gdb is None.
        """
        if input_gdf is not None:
            if not isinstance(input_gdf, gpd.GeoDataFrame):
                raise ValueError("input_gdf must be a GeoDataFrame")
            self.input_path = None
            self.input_gdf = input_gdf
        elif input_path is not None:
            self.input_path = str(input_path)
            self.input_gdf = None
        else:
            raise ValueError("Either input_path or input_gdf must be provided")

        if output_gdb is None:
            raise ValueError("output_gdb must be provided")

        output_gdb_str = str(output_gdb)
        self.output_gdb_is_s3 = output_gdb_str.startswith("s3://")
        self.s3_output_gdb: Optional[str] = None
        self._temp_output_dir: Optional[Path] = None

        if self.output_gdb_is_s3:
            # Write locally first; the result is uploaded at the end of process().
            self.s3_output_gdb = output_gdb_str.rstrip("/")
            self._temp_output_dir = Path(tempfile.mkdtemp(prefix="esri_gdb_output_"))
            self.output_gdb = self._temp_output_dir / Path(self.s3_output_gdb).name
        else:
            self.output_gdb = Path(output_gdb)
            # Ensure output directory exists
            self.output_gdb.parent.mkdir(parents=True, exist_ok=True)

        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key

        # Store the GeoDataFrame (will be loaded in process method)
        self.gdf: Optional[gpd.GeoDataFrame] = None

    def _storage_options(self) -> dict:
        """Return the credential options handed to fsspec / geopandas for S3 access."""
        return {
            "key": self.aws_access_key_id,
            "secret": self.aws_secret_access_key,
        }

    def _cleanup_temp_output(self) -> None:
        """Remove the temporary local output directory used for S3 outputs, if any."""
        if self._temp_output_dir is not None:
            shutil.rmtree(self._temp_output_dir, ignore_errors=True)

    def _download_from_s3(self, s3_path: str) -> Path:
        """
        Download a file from S3 to a temporary local path.

        The file is placed in a freshly created temporary directory. On success
        the caller owns that directory (``returned_path.parent``) and is
        responsible for deleting it; on failure it is removed here.

        Args:
            s3_path: S3 path to file (e.g., "s3://bucket/path/file")

        Returns:
            Path to the downloaded local file
        """
        print(f"⬇️ Downloading from S3: {s3_path}")
        fs = fsspec.filesystem("s3", **self._storage_options())
        temp_dir = Path(tempfile.mkdtemp(prefix="esri_gdb_"))
        local_path = temp_dir / Path(s3_path).name
        try:
            fs.get_file(s3_path, str(local_path))
        except Exception:
            # Do not leave an empty temp directory behind on a failed download.
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise
        return local_path

    def _read_from_csv(
        self, csv_path: Union[str, Path], path_column: Optional[str] = None
    ) -> gpd.GeoDataFrame:
        """
        Read a CSV file containing parquet paths and combine them into one GeoDataFrame.

        Parquet files that cannot be read are reported and skipped.

        Args:
            csv_path: Path to CSV file containing parquet paths (local or "s3://")
            path_column: Name of the column containing parquet paths.
                If None, will use the first column.

        Returns:
            Combined GeoDataFrame from all parquet files

        Raises:
            ValueError: If path_column is not in the CSV, or if none of the
                listed parquet files could be read.
        """
        print(f"Reading CSV file: {csv_path}")
        csv_path_str = str(csv_path)
        if csv_path_str.startswith("s3://"):
            csv_local_path = self._download_from_s3(csv_path_str)
            try:
                df = pd.read_csv(csv_local_path)
            finally:
                # The downloaded copy is only needed for parsing.
                shutil.rmtree(csv_local_path.parent, ignore_errors=True)
        else:
            df = pd.read_csv(Path(csv_path_str))

        # Determine which column contains the paths
        if path_column is None:
            path_column = df.columns[0]
        if path_column not in df.columns:
            raise ValueError(
                f"Column '{path_column}' not found in CSV. Available columns: {list(df.columns)}"
            )

        # Normalise entries to stripped strings and ignore blank cells.
        parquet_paths = [
            text
            for text in (str(value).strip() for value in df[path_column].dropna())
            if text
        ]
        print(f"Found {len(parquet_paths)} parquet paths in CSV")

        # Read each parquet file and combine
        gdfs = []
        for i, parquet_path in enumerate(parquet_paths, 1):
            print(f"Reading parquet file {i}/{len(parquet_paths)}: {parquet_path}")
            try:
                if "://" in parquet_path:
                    gdf = gpd.read_parquet(
                        parquet_path, storage_options=self._storage_options()
                    )
                else:
                    # geopandas rejects storage_options for plain local paths,
                    # so credentials are only passed for URL-style paths.
                    gdf = gpd.read_parquet(parquet_path)
            except Exception as e:
                # Best effort: one unreadable file must not abort the whole batch.
                print(f"⚠️ Warning: Failed to read {parquet_path}: {e}")
                continue
            gdfs.append(gdf)

        if not gdfs:
            raise ValueError("No parquet files could be read from CSV")

        # Combine all GeoDataFrames
        print(f"Combining {len(gdfs)} GeoDataFrames into one...")
        combined_gdf = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True))
        print(f"Combined GeoDataFrame has {len(combined_gdf)} rows")
        return combined_gdf

    def _read_from_local(self, local_path: Union[str, Path]) -> gpd.GeoDataFrame:
        """
        Read a parquet file from local filesystem.

        Args:
            local_path: Path to local parquet file

        Returns:
            GeoDataFrame from the parquet file
        """
        print(f"Reading local parquet file: {local_path}")
        return gpd.read_parquet(local_path)

    def _read_from_s3(self, s3_path: str) -> gpd.GeoDataFrame:
        """
        Read a parquet file from S3.

        Args:
            s3_path: S3 path to parquet file (e.g., "s3://bucket/path/file.parquet")

        Returns:
            GeoDataFrame from the S3 parquet file
        """
        print(f"Reading S3 parquet file: {s3_path}")
        return gpd.read_parquet(s3_path, storage_options=self._storage_options())

    def _upload_to_s3(self) -> None:
        """
        Upload the generated geodatabase directory to S3.

        Does nothing when the configured output is not an S3 path. Every file
        under the local geodatabase directory is uploaded beneath the S3
        target prefix, keeping its relative path.

        Raises:
            FileNotFoundError: If the local geodatabase does not exist.
        """
        if not self.output_gdb_is_s3 or self.s3_output_gdb is None:
            return
        if not self.output_gdb.exists():
            raise FileNotFoundError(f"Local geodatabase not found: {self.output_gdb}")

        print(f"⬆️ Uploading geodatabase to S3: {self.s3_output_gdb}")
        fs = fsspec.filesystem("s3", **self._storage_options())
        base_path = self.output_gdb
        target_prefix = self.s3_output_gdb
        for root, _, files in os.walk(base_path):
            for file_name in files:
                local_file = Path(root) / file_name
                relative_path = local_file.relative_to(base_path)
                remote_path = f"{target_prefix}/{relative_path.as_posix()}"
                fs.put_file(str(local_file), remote_path)
        print(f"✅ Upload complete: {self.s3_output_gdb}")

    def _load_data(self) -> gpd.GeoDataFrame:
        """
        Load data from the input source (CSV, parquet file, or GeoDataFrame).

        Returns:
            GeoDataFrame ready for processing

        Raises:
            ValueError: If neither a GeoDataFrame nor an input path is set.
        """
        # If GeoDataFrame was provided directly, use a copy of it
        if self.input_gdf is not None:
            print("Using provided GeoDataFrame directly")
            return self.input_gdf.copy()

        # If input_path is None, we shouldn't be here
        if self.input_path is None:
            raise ValueError("No input source provided")

        input_path = self.input_path
        # CSV is checked first so that an "s3://....csv" path is treated as a path list
        if input_path.lower().endswith(".csv"):
            return self._read_from_csv(input_path)
        if input_path.startswith("s3://"):
            return self._read_from_s3(input_path)
        # Otherwise, treat as local file
        return self._read_from_local(input_path)

    def process(self) -> int:
        """
        Process the input data and convert it to ESRI File Geodatabase format.

        Supports multiple input types:
            - Single parquet file (local or S3)
            - CSV file with parquet paths
            - Direct GeoDataFrame

        Groups data by the ``class_name`` column and by geometry type, creating
        a separate layer named ``<class_name>_<geometry type>`` (lower-cased)
        for each combination. Null and empty geometries are dropped; groups
        with nothing left are skipped. A layer that fails to write is reported
        and does not stop the remaining layers.

        For an S3 output the geodatabase is uploaded after all layers are
        written, and the temporary local directory is removed even if loading,
        writing or uploading raises.

        Returns:
            Number of layers created
        """
        layers_created = 0
        try:
            start = time.time()
            # Load data from the input source
            self.gdf = self._load_data()
            end = time.time()
            print(f"Data loading execution time: {end - start:.4f} seconds")

            # Recreate the target directory in case an earlier run cleaned it up.
            self.output_gdb.parent.mkdir(parents=True, exist_ok=True)

            # Process each class and geometry type combination
            for class_name, class_gdf in self.gdf.groupby("class_name"):
                for geom_type, sub_gdf in class_gdf.groupby(class_gdf.geom_type):
                    # str() guards against non-string class values (e.g. integers).
                    layer_name = f"{str(class_name).lower()}_{str(geom_type).lower()}"

                    # Drop invalid or empty geometries
                    sub_gdf = sub_gdf[sub_gdf.geometry.notnull() & ~sub_gdf.geometry.is_empty]
                    if sub_gdf.empty:
                        print(f"Skipping layer: {layer_name} (no non-empty geometries)")
                        continue

                    print(f"🟢 Writing layer: {layer_name} with {len(sub_gdf)} features")
                    try:
                        sub_gdf.to_file(self.output_gdb, driver="OpenFileGDB", layer=layer_name)
                        layers_created += 1
                    except Exception as e:
                        # Best effort per layer: report and continue with the rest.
                        print(f"❌ Failed for {layer_name}: {e}")

            if self.output_gdb_is_s3:
                self._upload_to_s3()
        finally:
            self._cleanup_temp_output()
        return layers_created
# ============================================================================
# Simple Function API (like create_patches)
# ============================================================================
# [docs]
def parquet_to_gdb(
    input_path: Optional[Union[str, Path]] = None,
    output_gdb: Optional[Union[str, Path]] = None,
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    input_gdf: Optional[gpd.GeoDataFrame] = None,
) -> int:
    """
    Convert GeoParquet file(s) or a GeoDataFrame to an ESRI File Geodatabase.

    One-call convenience wrapper: it builds an ``ESriGDB`` processor from the
    given arguments and runs its ``process()`` method. Use the ``ESriGDB``
    class directly for more control.

    Args:
        input_path: Where the data comes from. One of:
            - Local parquet path: "/path/to/file.parquet" or Path object
            - S3 parquet path: "s3://bucket-name/path/to/file.parquet"
            - CSV file path: "/path/to/paths.csv" (parquet paths in first column)
            - None when input_gdf is supplied instead
        output_gdb: Destination ESRI File Geodatabase (.gdb), given as a
            string or Path object.
        aws_access_key_id: Optional AWS access key ID, forwarded unchanged to
            ``ESriGDB``.
        aws_secret_access_key: Optional AWS secret access key, forwarded
            unchanged to ``ESriGDB``.
        input_gdf: Optional GeoDataFrame to convert directly. When given,
            input_path is ignored.

    Returns:
        Number of layers created in the geodatabase

    Examples:
        >>> from satorbis_kit import parquet_to_gdb
        >>> import geopandas as gpd
        >>>
        >>> # Local file
        >>> layers = parquet_to_gdb(
        ...     input_path="/path/to/file.parquet",
        ...     output_gdb="/path/to/output.gdb"
        ... )
        >>>
        >>> # S3 file
        >>> layers = parquet_to_gdb(
        ...     input_path="s3://bucket/path/to/file.parquet",
        ...     output_gdb="/path/to/output.gdb",
        ...     aws_access_key_id="your-key",
        ...     aws_secret_access_key="your-secret"
        ... )
        >>>
        >>> # CSV with multiple parquet paths
        >>> layers = parquet_to_gdb(
        ...     input_path="/path/to/paths.csv",
        ...     output_gdb="/path/to/output.gdb"
        ... )
        >>>
        >>> # Direct GeoDataFrame
        >>> gdf = gpd.read_parquet("/path/to/file.parquet")
        >>> layers = parquet_to_gdb(
        ...     input_gdf=gdf,
        ...     output_gdb="/path/to/output.gdb"
        ... )
    """
    # Collect the arguments once, then hand them to the class-based API.
    converter_kwargs = {
        "input_path": input_path,
        "output_gdb": output_gdb,
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
        "input_gdf": input_gdf,
    }
    converter = ESriGDB(**converter_kwargs)
    layer_count = converter.process()
    return layer_count