taps.executor.dask

DaskDistributedExecutor

DaskDistributedExecutor(client: Client)

Bases: Executor

Dask task execution engine.

Parameters:

  • client (Client) –

    Dask distributed client.

Source code in taps/executor/dask.py
def __init__(self, client: Client) -> None:
    self.client = client
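
A minimal usage sketch (the local-cluster client options shown here are illustrative, not defaults of this executor):

from dask.distributed import Client

from taps.executor.dask import DaskDistributedExecutor

# Start an in-process local cluster and wrap it in the executor.
client = Client(processes=False, n_workers=1)
executor = DaskDistributedExecutor(client)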

submit()

submit(
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]

Schedule the callable to be executed.

Parameters:

  • function (Callable[P, T]) –

    Callable to execute.

  • args (P.args, default: () ) –

    Positional arguments.

  • kwargs (P.kwargs, default: {} ) –

    Keyword arguments.

Returns:

  • Future[T]

    Future-like object representing the result of the execution of the callable.

Source code in taps/executor/dask.py
def submit(
    self,
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """Schedule the callable to be executed.

    Args:
        function: Callable to execute.
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        [`Future`][concurrent.futures.Future]-like object representing \
        the result of the execution of the callable.
    """
    return self.client.submit(function, *args, **kwargs)
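
Continuing the sketch above, a submitted callable (add is a hypothetical example function) returns a Dask future whose result can be retrieved with result():

def add(x: int, y: int) -> int:
    return x + y

future = executor.submit(add, 1, 2)
print(future.result())  # 3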

map()

map(
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1
) -> Iterator[T]

Map a function onto iterables of arguments.

Parameters:

  • function (Callable[P, T]) –

    A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[P.args], default: () ) –

    Variable number of iterables.

  • timeout (float | None, default: None ) –

    The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize (int, default: 1 ) –

    Sets the Dask batch size.

Returns:

  • Iterator[T]

    An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.

Source code in taps/executor/dask.py
def map(
    self,
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1,
) -> Iterator[T]:
    """Map a function onto iterables of arguments.

    Args:
        function: A callable that will take as many arguments as there are
            passed iterables.
        iterables: Variable number of iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: Sets the Dask batch size.

    Returns:
        An iterator equivalent to: `map(func, *iterables)` but the calls \
        may be evaluated out-of-order.
    """
    # Based on the Parsl implementation.
    # https://github.com/Parsl/parsl/blob/7fba7d634ccade76618ee397d3c951c5cbf2cd49/parsl/concurrent/__init__.py#L58
    futures = self.client.map(function, *iterables, batch_size=chunksize)

    def _result_iterator() -> Generator[T, None, None]:
        futures.reverse()
        while futures:
            yield futures.pop().result(timeout)

    return _result_iterator()
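
For example (continuing the sketch above), results are yielded in argument order even though the underlying tasks may complete out-of-order:

def square(x: int) -> int:
    return x * x

for value in executor.map(square, range(4), chunksize=2):
    print(value)  # 0, 1, 4, 9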

shutdown()

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

Shutdown the client.

Source code in taps/executor/dask.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Shutdown the client."""
    if DaskFuture._cb_executor is not None:
        # Dask runs future callbacks in threads of a ThreadPoolExecutor
        # that is a class attribute of Dask's Future class. Shutting down
        # the client causes all futures to get cancelled, which can
        # cause a currently executing callback to raise a CancelledError
        # if the callback accesses the future's result.
        DaskFuture._cb_executor.shutdown(wait=wait)
        DaskFuture._cb_executor = None

    # Note: wait and cancel_futures are not implemented.
    self.client.close()
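
Continuing the sketch above, shutting down is a single call; per the note in the source, wait and cancel_futures are accepted but not forwarded to the client:

executor.shutdown()  # closes the underlying Dask client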

DaskDistributedConfig

Bases: ExecutorConfig

Dask Distributed configuration.

Attributes:

  • dask_daemon_workers (bool) –

    Configure Dask workers as daemon processes for local clusters.

  • dask_scheduler_address (Optional[str]) –

    Dask scheduler address.

  • dask_use_threads (bool) –

    Use threads rather than processes for local clusters.

  • dask_workers (Optional[int]) –

    Number of Dask workers for local clusters.

add_argument_group() classmethod

add_argument_group(
    parser: ArgumentParser,
    *,
    argv: Sequence[str] | None = None,
    required: bool = True
) -> None

Add model fields as arguments of an argument group on the parser.

Parameters:

  • parser (ArgumentParser) –

    Parser to add a new argument group to.

  • argv (Sequence[str] | None, default: None ) –

    Optional sequence of string arguments.

  • required (bool, default: True ) –

    Mark arguments without defaults as required.

Source code in taps/config.py
@classmethod
def add_argument_group(
    cls,
    parser: argparse.ArgumentParser,
    *,
    argv: Sequence[str] | None = None,
    required: bool = True,
) -> None:
    """Add model fields as arguments of an argument group on the parser.

    Args:
        parser: Parser to add a new argument group to.
        argv: Optional sequence of string arguments.
        required: Mark arguments without defaults as required.
    """
    group = parser.add_argument_group(cls.__name__)
    for field_name, field_info in cls.model_fields.items():
        arg_name = field_name.replace('_', '-').lower()
        group.add_argument(
            f'--{arg_name}',
            dest=field_name,
            # type=field_info.annotation,
            default=field_info.get_default(),
            required=field_info.is_required() and required,
            help=field_info.description,
        )
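
A sketch of how the model fields become command-line flags: underscores are replaced with dashes, so dask_workers is exposed as --dask-workers. Parsed values are strings because the type conversion is commented out above.

import argparse

from taps.executor.dask import DaskDistributedConfig

parser = argparse.ArgumentParser()
DaskDistributedConfig.add_argument_group(parser, required=False)
args = parser.parse_args(['--dask-workers', '4'])
print(args.dask_workers)  # '4'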

get_executor()

get_executor() -> DaskDistributedExecutor

Create an executor instance from the config.

Source code in taps/executor/dask.py
def get_executor(self) -> DaskDistributedExecutor:
    """Create an executor instance from the config."""
    if self.dask_scheduler_address is not None:
        client = Client(self.dask_scheduler_address)
    else:
        dask.config.set(
            {'distributed.worker.daemon': self.dask_daemon_workers},
        )
        client = Client(
            n_workers=self.dask_workers,
            processes=not self.dask_use_threads,
            dashboard_address=None,
        )
    return DaskDistributedExecutor(client)
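
For example, a config for a local threaded cluster can be turned into an executor directly (a sketch; field names follow the attributes list above, and any base-class fields are assumed to have defaults):

from taps.executor.dask import DaskDistributedConfig

config = DaskDistributedConfig(
    dask_scheduler_address=None,  # no external scheduler, so start a local cluster
    dask_use_threads=True,        # threads rather than processes
    dask_workers=2,
    dask_daemon_workers=False,    # referenced by get_executor() above
)
executor = config.get_executor()
executor.shutdown()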