# Source code for satorbis_kit.dask_utils

from contextlib import contextmanager
from typing import Iterator, Optional

from dask.distributed import Client, LocalCluster


@contextmanager
def dask_client(
    scheduler_address: Optional[str] = None,
    n_workers: int = 4,
    threads_per_worker: int = 1,
    processes: bool = True,
) -> Iterator[Client]:
    """Factory/context manager for a Dask ``Client``.

    - If ``scheduler_address`` is provided (and non-empty), connects to that
      scheduler; the worker settings below are then ignored.
    - Otherwise, starts a local ``LocalCluster`` with the given worker config
      and connects a client to it.
    - Set ``processes=False`` to run threads-only workers (useful on macOS to
      avoid spawn issues).

    Args:
        scheduler_address: Address of an existing scheduler. ``None`` or an
            empty string selects the local-cluster path.
        n_workers: Forwarded to ``LocalCluster(n_workers=...)``.
        threads_per_worker: Forwarded to
            ``LocalCluster(threads_per_worker=...)``.
        processes: Forwarded to ``LocalCluster(processes=...)``.

    Yields:
        An active ``Client``.

    On exit the client is closed first, then the local cluster if one was
    started here. The cluster is shut down even when closing the client
    raises. Exceptions raised inside the ``with`` body propagate to the
    caller after cleanup has run.
    """
    client: Optional[Client] = None
    cluster: Optional[LocalCluster] = None
    try:
        if scheduler_address:
            client = Client(scheduler_address)
        else:
            cluster = LocalCluster(
                n_workers=n_workers,
                threads_per_worker=threads_per_worker,
                processes=processes,
            )
            client = Client(cluster)
        yield client
    finally:
        # Nested try/finally: a failure while closing the client must not
        # skip the cluster shutdown, otherwise the local workers would be
        # leaked.
        try:
            if client is not None:
                client.close()
        finally:
            if cluster is not None:
                cluster.close()