Skip to content

taps.executor.dask

DaskDistributedExecutor

DaskDistributedExecutor(client: Client)

Bases: Executor

Dask task execution engine.

Parameters:

  • client (Client) –

    Dask distributed client.

Source code in taps/executor/dask.py
def __init__(self, client: Client) -> None:
    """Create an executor backed by the given Dask client.

    Args:
        client: Dask distributed client.
    """
    # Kept as a public attribute on the instance.
    self.client = client

submit()

submit(
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]

Schedule the callable to be executed.

Parameters:

  • function (Callable[P, T]) –

    Callable to execute.

  • args (P.args, default: () ) –

    Positional arguments.

  • kwargs (P.kwargs, default: {} ) –

    Keyword arguments.

Returns:

  • Future[T]

    Future-like object representing the result of the execution of the callable.

Source code in taps/executor/dask.py
def submit(
    self,
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """Hand a single call of `function` to the Dask client.

    Args:
        function: Callable to execute.
        args: Positional arguments, passed through to the client's
            `submit()` unchanged.
        kwargs: Keyword arguments, passed through to the client's
            `submit()` unchanged.

    Returns:
        The object returned by the client's `submit()`: a
        [`Future`][concurrent.futures.Future]-like handle representing
        the result of the execution of the callable.
    """
    # No wrapping or bookkeeping happens here; the client does all the work.
    pending = self.client.submit(function, *args, **kwargs)
    return pending

map()

map(
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1
) -> Iterator[T]

Map a function onto iterables of arguments.

Parameters:

  • function (Callable[P, T]) –

    A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[P.args], default: () ) –

    Variable number of iterables.

  • timeout (float | None, default: None ) –

    The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize (int, default: 1 ) –

    Sets the Dask batch size.

Returns:

  • Iterator[T]

    An iterator equivalent to: map(function, *iterables) but the calls may be evaluated out-of-order.

Source code in taps/executor/dask.py
def map(
    self,
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1,
) -> Iterator[T]:
    """Map a function onto iterables of arguments.

    The tasks are handed to the Dask client when `map()` is called; the
    returned iterator then yields each result, blocking as needed, in the
    order the client returned the futures.

    Args:
        function: A callable that will take as many arguments as there are
            passed iterables.
        iterables: Variable number of iterables.
        timeout: The maximum number of seconds to wait, measured from the
            call to `map()` and shared by all results (the same contract
            as `concurrent.futures.Executor.map`). If None, then there
            is no limit on the wait time.
        chunksize: Sets the Dask batch size.

    Returns:
        An iterator equivalent to: `map(function, *iterables)` but the
        calls may be evaluated out-of-order.
    """
    # Local import keeps this method self-contained.
    import time

    # Fix the deadline before submitting so that `timeout` bounds the total
    # wait. Passing the raw timeout to every future would let the overall
    # wait grow to len(futures) * timeout.
    deadline = None if timeout is None else time.monotonic() + timeout

    # Based on the Parsl implementation.
    # https://github.com/Parsl/parsl/blob/7fba7d634ccade76618ee397d3c951c5cbf2cd49/parsl/concurrent/__init__.py#L58
    futures = self.client.map(function, *iterables, batch_size=chunksize)

    def _result_iterator() -> Generator[T, None, None]:
        # Reverse once so that pop() from the end yields the futures in
        # their original order; popping also drops our reference to each
        # future as soon as its result has been requested.
        futures.reverse()
        while futures:
            if deadline is None:
                yield futures.pop().result(None)
            else:
                # Never pass a negative wait once the deadline has passed.
                remaining = max(0.0, deadline - time.monotonic())
                yield futures.pop().result(remaining)

    return _result_iterator()

shutdown()

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

Shut down Dask's future-callback thread pool (if one exists) and close the client.

Source code in taps/executor/dask.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Stop Dask's future-callback thread pool, then close the client.

    Args:
        wait: Passed to the callback pool's `shutdown()`. It does not
            affect how the client itself is closed.
        cancel_futures: Currently ignored.
    """
    callback_pool = DaskFuture._cb_executor
    if callback_pool is not None:
        # Dask runs future callbacks in threads of a ThreadPoolExecutor
        # that is a class attribute of Dask's future. Shutting down
        # the client causes all futures to get cancelled, which can
        # cause a currently executing callback to raise a CancelledError
        # if the callback accesses the future's result. The pool is
        # therefore stopped first, and the attribute is reset to None.
        callback_pool.shutdown(wait=wait)
        DaskFuture._cb_executor = None

    # Note: wait and cancel_futures are not implemented for the client.
    self.client.close()

DaskDistributedConfig

Bases: ExecutorConfig

DaskDistributedExecutor plugin configuration.

Attributes:

  • scheduler (Optional[str]) –

    Dask scheduler address.

  • use_threads (bool) –

    Use threads rather than processes for local clusters.

  • workers (Optional[int]) –

    Number of Dask workers for local clusters.

  • daemon_workers (bool) –

    Daemonize Dask workers.

get_executor()

get_executor() -> DaskDistributedExecutor

Create an executor instance from the config.

Source code in taps/executor/dask.py
def get_executor(self) -> DaskDistributedExecutor:
    """Build a `DaskDistributedExecutor` from this configuration.

    When `scheduler` is set, the client is created from that address.
    Otherwise the `distributed.worker.daemon` Dask option is set from
    `daemon_workers` and a client is created from `workers` and
    `use_threads`, with `dashboard_address=None`.

    Returns:
        An executor wrapping the newly created client.
    """
    address = self.scheduler
    if address is not None:
        return DaskDistributedExecutor(Client(address))

    # This changes Dask's global configuration, not just this client.
    dask.config.set(
        {'distributed.worker.daemon': self.daemon_workers},
    )
    local_client = Client(
        n_workers=self.workers,
        processes=not self.use_threads,
        dashboard_address=None,
    )
    return DaskDistributedExecutor(local_client)