taps.executor.utils

FutureDependencyExecutor

FutureDependencyExecutor(executor: Executor)

Bases: Executor

Executor wrapper that adds DAG-like features.

An Executor implementation that wraps another executor and delays submitting a task until every Future instance passed in the task's args or kwargs has completed. In other words, a child task is not scheduled until the results of all of its parent tasks are available.

Parameters:

  • executor (Executor) –

    Executor to wrap.

Source code in taps/executor/utils.py
def __init__(self, executor: Executor) -> None:
    self.executor = executor
    self._tasks: dict[Future[Any], _Task[Any, Any]] = {}
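
The delayed-submission behavior described above can be illustrated with a minimal, self-contained sketch. This is not the taps implementation: `SketchDependencyExecutor`, `_resolve`, and `_copy_outcome` are hypothetical names, and the sketch assumes that a failed parent should fail the child without running it.

```python
from __future__ import annotations

import threading
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable


def _resolve(value: Any) -> Any:
    # Replace a completed Future with its result; pass other values through.
    return value.result() if isinstance(value, Future) else value


def _copy_outcome(source: Future, target: Future) -> None:
    # Mirror the inner future's result or exception onto the client future.
    error = source.exception()
    if error is not None:
        target.set_exception(error)
    else:
        target.set_result(source.result())


class SketchDependencyExecutor(Executor):
    """Hypothetical stand-in for illustration; not the taps class."""

    def __init__(self, executor: Executor) -> None:
        self.executor = executor

    def submit(
        self, function: Callable[..., Any], /, *args: Any, **kwargs: Any
    ) -> Future:
        client_future: Future = Future()
        # Every Future found among the arguments is treated as a parent task.
        parents = [v for v in (*args, *kwargs.values()) if isinstance(v, Future)]
        remaining = len(parents)
        lock = threading.Lock()

        def launch() -> None:
            try:
                inner = self.executor.submit(
                    function,
                    *(_resolve(a) for a in args),
                    **{k: _resolve(v) for k, v in kwargs.items()},
                )
            except Exception as error:  # a parent failed or submit was refused
                client_future.set_exception(error)
            else:
                inner.add_done_callback(lambda f: _copy_outcome(f, client_future))

        def parent_done(_: Future) -> None:
            nonlocal remaining
            with lock:
                remaining -= 1
                ready = remaining == 0
            if ready:  # the last parent finished, so the child can be submitted
                launch()

        if not parents:
            launch()
        for parent in parents:
            parent.add_done_callback(parent_done)
        return client_future


with ThreadPoolExecutor(max_workers=2) as pool:
    dag = SketchDependencyExecutor(pool)
    parent = dag.submit(pow, 2, 3)
    # The child receives the parent's result (8) in place of each Future.
    child = dag.submit(lambda x, y: x + y, parent, y=parent)
    total = child.result(timeout=5)  # 16
```

The caller never blocks on the parent: the child is handed to the wrapped executor from a done-callback once its last parent completes.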

submit

submit(
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]

Schedule the callable to be executed.

Parameters:

  • function (Callable[P, T]) –

    Callable to execute.

  • args (P.args, default: () ) –

    Positional arguments.

  • kwargs (P.kwargs, default: {} ) –

    Keyword arguments.

Returns:

  • Future[T]

    Future object representing the result of the execution of the callable.

Source code in taps/executor/utils.py
def submit(
    self,
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """Schedule the callable to be executed.

    Args:
        function: Callable to execute.
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        [`Future`][concurrent.futures.Future] object representing the \
        result of the execution of the callable.
    """
    client_future: Future[T] = Future()
    task = _Task(self.executor, function, args, kwargs, client_future)
    self._tasks[client_future] = task
    client_future.add_done_callback(self._task_future_callback)
    return client_future
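
The source above registers each client future in `self._tasks` and attaches `self._task_future_callback` as a done-callback. The body of that callback is not shown here, so the following is only a sketch of one plausible bookkeeping pattern, assuming the callback removes the finished entry. `PendingRegistry`, `track`, and `_on_done` are hypothetical names.

```python
from __future__ import annotations

from concurrent.futures import Future


class PendingRegistry:
    """Hypothetical registry that forgets a future once it completes."""

    def __init__(self) -> None:
        self._tasks: dict[Future, str] = {}

    def track(self, label: str) -> Future:
        # Hand the caller a fresh future immediately and remember it.
        future: Future = Future()
        self._tasks[future] = label
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, future: Future) -> None:
        # Assumed cleanup step: drop the entry when the future finishes.
        self._tasks.pop(future, None)

    def pending(self) -> int:
        return len(self._tasks)


registry = PendingRegistry()
tracked = registry.track("example")
before = registry.pending()  # 1
tracked.set_result("done")   # triggers _on_done
after = registry.pending()   # 0
```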

map

map(
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1
) -> Iterator[T]

Map a function onto iterables of arguments.

Parameters:

  • function (Callable[P, T]) –

    A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[P.args], default: () ) –

    Variable number of iterables.

  • timeout (float | None, default: None ) –

    The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize (int, default: 1 ) –

    If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the executor. If set to one, the items in the list will be sent one at a time.

Returns:

  • Iterator[T]

    An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.

Raises:

  • ValueError –

    If chunksize is less than one.

Source code in taps/executor/utils.py
def map(
    self,
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1,
) -> Iterator[T]:
    """Map a function onto iterables of arguments.

    Args:
        function: A callable that will take as many arguments as there are
            passed iterables.
        iterables: Variable number of iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: If greater than one, the iterables will be chopped into
            chunks of size chunksize and submitted to the executor. If set
            to one, the items in the list will be sent one at a time.

    Returns:
        An iterator equivalent to: `map(func, *iterables)` but the calls \
        may be evaluated out-of-order.

    Raises:
        ValueError: if chunksize is less than one.
    """
    # Based on concurrent.futures.ProcessPoolExecutor.map()
    # https://github.com/python/cpython/blob/37959e25cbbe1d207c660b5bc9583b9bd1403f1a/Lib/concurrent/futures/process.py
    if chunksize < 1:
        raise ValueError('chunksize must be >= 1.')

    results = super().map(
        functools.partial(_process_chunk, function),
        _get_chunks(*iterables, chunksize=chunksize),
        timeout=timeout,
    )

    def _result_iterator(
        iterable: Iterator[list[T]],
    ) -> Generator[T, None, None]:
        for element in iterable:
            element.reverse()
            while element:
                yield element.pop()

    return _result_iterator(results)
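
The helpers `_get_chunks` and `_process_chunk` used above are not shown on this page. The sketch below assumes they behave like the CPython helpers of the same names that the source comment points to; the names `get_chunks`, `process_chunk`, and `flatten` are illustrative stand-ins.

```python
import itertools
from typing import Any, Callable, Iterable, Iterator, List, Tuple


def get_chunks(
    *iterables: Iterable[Any], chunksize: int
) -> Iterator[Tuple[Tuple[Any, ...], ...]]:
    # Zip the iterables into argument tuples, then group them into chunks.
    zipped = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(zipped, chunksize))
        if not chunk:
            return
        yield chunk


def process_chunk(
    function: Callable[..., Any], chunk: Tuple[Tuple[Any, ...], ...]
) -> List[Any]:
    # Run the function once per argument tuple; one chunk is one task.
    return [function(*args) for args in chunk]


def flatten(results: Iterable[List[Any]]) -> Iterator[Any]:
    # Yield the per-chunk lists back as a single stream, in order.
    for element in results:
        yield from element


chunks = list(get_chunks([1, 2, 3, 4, 5], [10, 20, 30, 40, 50], chunksize=2))
per_chunk = [process_chunk(lambda a, b: a + b, c) for c in chunks]
flat = list(flatten(per_chunk))  # [11, 22, 33, 44, 55]
```

With `chunksize=2`, five argument pairs become three tasks instead of five, which reduces per-task submission overhead at the cost of coarser scheduling.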

shutdown

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

Shutdown the executor.

Parameters:

  • wait (bool, default: True ) –

    Wait on all pending futures to complete.

  • cancel_futures (bool, default: False ) –

    Cancel all pending futures that the executor has not started running. Only honored on Python 3.9 and later; ignored on earlier versions.

Source code in taps/executor/utils.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Shutdown the executor.

    Args:
        wait: Wait on all pending futures to complete.
        cancel_futures: Cancel all pending futures that the executor
            has not started running. Only used in Python 3.9 and later.
    """
    if sys.version_info >= (3, 9):  # pragma: >=3.9 cover
        self.executor.shutdown(wait=wait, cancel_futures=cancel_futures)
    else:  # pragma: <3.9 cover
        self.executor.shutdown(wait=wait)
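
The effect of `cancel_futures` can be seen with a standard-library `ThreadPoolExecutor`. This is a sketch, not taps code: `shutdown_compat` is a hypothetical free function that mirrors the version check above, and `blocker` exists only to keep the single worker busy.

```python
import sys
import threading
from concurrent.futures import Executor, ThreadPoolExecutor


def shutdown_compat(
    executor: Executor, wait: bool = True, *, cancel_futures: bool = False
) -> None:
    # cancel_futures was added to Executor.shutdown() in Python 3.9.
    if sys.version_info >= (3, 9):
        executor.shutdown(wait=wait, cancel_futures=cancel_futures)
    else:
        executor.shutdown(wait=wait)


started = threading.Event()
gate = threading.Event()


def blocker() -> str:
    started.set()         # signal that the worker has picked this task up
    gate.wait(timeout=5)  # hold the only worker until released
    return "finished"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocker)
queued = pool.submit(str, "never runs")  # waits behind the blocker
started.wait(timeout=5)
shutdown_compat(pool, wait=False, cancel_futures=True)
gate.set()
```

On Python 3.9 and later, `queued` is cancelled because it had not started, while `running` is allowed to finish.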