Source code for satorbis_kit.dask_utils
from contextlib import contextmanager
from typing import Iterator, Optional
from dask.distributed import Client, LocalCluster
[docs]
@contextmanager
def dask_client(
    scheduler_address: Optional[str] = None,
    n_workers: int = 4,
    threads_per_worker: int = 1,
    processes: bool = True,
) -> Iterator[Client]:
    """Factory/context manager for a Dask ``Client``.

    - If ``scheduler_address`` is provided (truthy), connects to that scheduler;
      ``n_workers``, ``threads_per_worker`` and ``processes`` are then ignored.
    - Otherwise, starts a local ``LocalCluster`` with the given worker config.
    - Set ``processes=False`` to run threads-only workers (useful on macOS to
      avoid spawn issues).

    Args:
        scheduler_address: Address of an existing scheduler to connect to.
            ``None`` or an empty string starts a ``LocalCluster`` instead.
        n_workers: Number of workers for the local cluster.
        threads_per_worker: Threads per worker for the local cluster.
        processes: Whether the local cluster's workers run as processes
            (``True``) or as threads in this process (``False``).

    Yields:
        An active ``Client``. On exit the client is closed first and then, if
        a local cluster was started here, the cluster is closed as well --
        even when closing the client raises.
    """
    client: Optional[Client] = None
    cluster: Optional[LocalCluster] = None
    try:
        if scheduler_address:
            client = Client(scheduler_address)
        else:
            # The cluster is bound before the Client is created so that a
            # failure inside Client(...) still shuts the cluster down below.
            cluster = LocalCluster(
                n_workers=n_workers,
                threads_per_worker=threads_per_worker,
                processes=processes,
            )
            client = Client(cluster)
        yield client
    finally:
        # Nested try/finally: an error while closing the client must not
        # skip closing the cluster this function created. Any exception from
        # client.close() still propagates after the cluster is closed.
        try:
            if client is not None:
                client.close()
        finally:
            if cluster is not None:
                cluster.close()