Skip to content

taps.executor.dask

DaskDistributedExecutor

DaskDistributedExecutor(
    client: Client,
    *,
    wait_for_workers: int | None = None,
    wait_for_workers_timeout: float | None = None
)

Bases: Executor

Dask task execution engine.

Parameters:

  • client (Client) –

    Dask distributed client.

  • wait_for_workers (int | None, default: None ) –

    Wait for n workers to connect to the scheduler before the constructor returns. Useful when connecting to a remote scheduler; a local cluster created by the client already ensures workers are connected.

  • wait_for_workers_timeout (float | None, default: None ) –

    Maximum seconds to wait for workers to connect to the scheduler.

Source code in taps/executor/dask.py
def __init__(
    self,
    client: Client,
    *,
    wait_for_workers: int | None = None,
    wait_for_workers_timeout: float | None = None,
) -> None:
    """Wrap a Dask client, optionally blocking until workers are available.

    Args:
        client: Dask distributed client used for all task submission.
        wait_for_workers: If given, block until this many workers have
            connected (via `client.wait_for_workers`). If None, return
            immediately.
        wait_for_workers_timeout: Timeout in seconds forwarded to
            `client.wait_for_workers`; None means no timeout.
    """
    self.client = client

    # Guard clause: nothing more to do unless the caller asked to wait.
    if wait_for_workers is None:
        return

    waiting_message = (
        f'Waiting for {wait_for_workers} Dask worker(s) to connect '
        f'to the client (timeout: {wait_for_workers_timeout})'
    )
    logger.debug(waiting_message)
    self.client.wait_for_workers(
        wait_for_workers,
        timeout=wait_for_workers_timeout,
    )
    logger.debug('Dask workers connected')

submit

submit(
    function: Callable[P, T],
    /,
    *args: args,
    **kwargs: kwargs,
) -> Future[T]

Schedule the callable to be executed.

Parameters:

  • function (Callable[P, T]) –

    Callable to execute.

  • args (args, default: () ) –

    Positional arguments.

  • kwargs (kwargs, default: {} ) –

    Keyword arguments.

Returns:

  • Future[T]

    Future-like object representing the result of the execution of the callable.

Source code in taps/executor/dask.py
def submit(
    self,
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """Schedule ``function(*args, **kwargs)`` for execution on Dask.

    Args:
        function: Callable to execute.
        args: Positional arguments forwarded to ``function``.
        kwargs: Keyword arguments forwarded to ``function``.

    Returns:
        The object returned by the Dask client's ``submit``, a \
        [`Future`][concurrent.futures.Future]-like handle on the result.
    """
    # NOTE(review): kwargs are forwarded verbatim to Client.submit, so a
    # keyword whose name matches one of Dask's own submit options (e.g.
    # `key`, `workers`, `pure`) is presumably consumed by Dask rather than
    # passed to `function` -- confirm against the Dask version in use.
    dask_client = self.client
    return dask_client.submit(function, *args, **kwargs)

map

map(
    function: Callable[P, T],
    *iterables: Iterable[args],
    timeout: float | None = None,
    chunksize: int = 1
) -> Iterator[T]

Map a function onto iterables of arguments.

Parameters:

  • function (Callable[P, T]) –

    A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[args], default: () ) –

    Variable number of iterables.

  • timeout (float | None, default: None ) –

    The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize (int, default: 1 ) –

    Sets the Dask batch size.

Returns:

  • Iterator[T]

    An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.

Source code in taps/executor/dask.py
def map(
    self,
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1,
) -> Iterator[T]:
    """Map a function onto iterables of arguments.

    Follows [`Executor.map()`][concurrent.futures.Executor.map]: the
    iterables are consumed eagerly and truncated to the shortest one, all
    calls are submitted before the first result is requested, and
    ``timeout`` bounds the wait measured from the call to ``map()`` rather
    than being applied separately to every result.

    Args:
        function: A callable that will take as many arguments as there are
            passed iterables.
        iterables: Variable number of iterables. Any iterable (including
            generators) is accepted; they are materialized into lists
            before being handed to Dask.
        timeout: The maximum number of seconds to wait, measured from the
            call to ``map()``. If None, then there is no limit on the wait
            time.
        chunksize: Sets the Dask batch size.

    Returns:
        An iterator equivalent to: `map(func, *iterables)` but the calls \
        may be evaluated out-of-order.

    Raises:
        TimeoutError: Raised (by the underlying Dask future's ``result()``)
            while iterating if a result is not available before the
            deadline. The exact exception class comes from Dask.
    """
    import time

    # Fix the deadline now so that `timeout` limits the total wait measured
    # from this call (concurrent.futures semantics), instead of restarting
    # the clock for each individual result.
    end_time = None if timeout is None else time.monotonic() + timeout

    # Transpose twice: the inner zip() truncates to the shortest iterable
    # (like the builtin map()), the outer one regroups the values into one
    # list per argument position. Materializing lets callers pass one-shot
    # iterators such as generators, which Dask's Client.map may refuse.
    columns = [list(column) for column in zip(*zip(*iterables))]
    if not columns:
        # No iterables, or an empty one: there is nothing to submit.
        return iter(())

    # Based on the Parsl implementation.
    # https://github.com/Parsl/parsl/blob/7fba7d634ccade76618ee397d3c951c5cbf2cd49/parsl/concurrent/__init__.py#L58
    futures = self.client.map(
        function,
        *columns,  # type: ignore[arg-type,unused-ignore]
        batch_size=chunksize,
    )

    # The yield lives in a closure so every task is submitted (above)
    # before the caller requests the first value.
    def _result_iterator() -> Generator[T, None, None]:
        # Reverse once so pop() returns futures in submission order without
        # keeping references to futures that were already consumed.
        futures.reverse()
        while futures:
            if end_time is None:
                yield futures.pop().result()
            else:
                yield futures.pop().result(end_time - time.monotonic())

    return _result_iterator()

shutdown

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

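Shut down the client. The wait flag is only applied to Dask's future-callback thread pool; cancel_futures is not implemented.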

Source code in taps/executor/dask.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Shut down the executor's Dask client.

    Args:
        wait: Forwarded to the shutdown of Dask's shared future-callback
            thread pool, if one currently exists. It does not make this
            method wait for outstanding tasks.
        cancel_futures: Accepted for interface compatibility; not
            implemented.
    """
    callback_pool = DaskFuture._cb_executor
    if callback_pool is not None:
        # Dask runs future callbacks on a ThreadPoolExecutor stored as a
        # class attribute of its Future type. Closing the client cancels all
        # futures, so a callback still running at that moment could raise a
        # CancelledError if it reads its future's result. Stop the pool
        # first, then clear the attribute (presumably so Dask builds a fresh
        # pool if callbacks are registered later -- verify).
        callback_pool.shutdown(wait=wait)
        DaskFuture._cb_executor = None

    self.client.close()

DaskDistributedConfig

Bases: ExecutorConfig

DaskDistributedExecutor plugin configuration.

Parameters:

  • name (Literal[str], default: 'dask' ) –

    Executor name.

  • scheduler (str | None, default: None ) –

    Dask scheduler address. If not set, a local Dask cluster is started instead.

  • use_threads (bool, default: False ) –

    Use threads instead of processes for Dask workers (only applies when a local cluster is started).

  • workers (int | None, default: None ) –

    Number of Dask workers to start for the local cluster (ignored when scheduler is set).

  • daemon_workers (bool, default: True ) –

    Whether local-cluster workers run as daemon processes (sets Dask's distributed.worker.daemon option).

  • wait_for_workers (int | None, default: None ) –

    Wait for N workers to connect before starting. Useful when connecting to a remote scheduler.

  • wait_for_workers_timeout (float | None, default: None ) –

    Timeout (seconds) for waiting for workers to connect.

get_executor

get_executor() -> DaskDistributedExecutor

Create an executor instance from the config.

Source code in taps/executor/dask.py
def get_executor(self) -> DaskDistributedExecutor:
    """Create an executor instance from the config.

    Connects to ``scheduler`` when one is configured; otherwise starts a
    local Dask cluster sized and configured by ``workers``, ``use_threads``
    and ``daemon_workers``.

    Returns:
        A `DaskDistributedExecutor` wrapping the newly created client.

    Raises:
        Exception: Anything raised while constructing the executor (for
            example when waiting for ``wait_for_workers`` workers times
            out) is re-raised after the new client has been closed.
    """
    if self.scheduler is not None:
        client = Client(self.scheduler)
    else:
        # Called without `with`, so the option is written to Dask's global
        # configuration and stays in effect after this method returns.
        dask.config.set(
            {'distributed.worker.daemon': self.daemon_workers},
        )
        client = Client(
            n_workers=self.workers,
            processes=not self.use_threads,
            dashboard_address=None,
        )

    try:
        return DaskDistributedExecutor(
            client,
            wait_for_workers=self.wait_for_workers,
            wait_for_workers_timeout=self.wait_for_workers_timeout,
        )
    except BaseException:
        # The executor never took ownership of the client; close it so a
        # failed start (e.g. a worker-wait timeout) does not leak the client
        # or any local cluster it created.
        client.close()
        raise