taps.executor.taskvine

TaskVineExecutor

TaskVineExecutor(
    *args: Any,
    cores_per_task: int,
    serverless: bool,
    debug: bool = False,
    **kwargs: Any
)

Bases: FuturesExecutor

TaskVine executor.

Extends TaskVine's FuturesExecutor to enable support for serverless mode.

Warning

The CCTools package is not installed with TaPS. We recommend installing both TaPS and CCTools into a Conda environment. See the CCTools installation guide for all options. ndcctools version 7.13.2 or later is required.

Parameters:

  • args (Any, default: () ) –

    Positional arguments to pass to FuturesExecutor.

  • cores_per_task (int) –

    Number of cores required by each task.

  • serverless (bool) –

    Enable serverless mode to preload libraries on workers.

  • debug (bool, default: False ) –

    Enable additional TaskVine logging.

  • kwargs (Any, default: {} ) –

    Keyword arguments to pass to FuturesExecutor.

Raises:

  • ImportError –

    If the TaskVine package (ndcctools) cannot be imported.

Source code in taps/executor/taskvine.py
def __init__(
    self,
    *args: Any,
    cores_per_task: int,
    serverless: bool,
    debug: bool = False,
    **kwargs: Any,
) -> None:
    if taskvine_import_error is not None:
        raise taskvine_import_error

    super().__init__(*args, **kwargs)
    self.lib_installed: set[str] = set()
    self.cores_per_task = cores_per_task
    self.worker_cores = kwargs['opts'].get('cores', None)
    self.serverless = serverless

    if debug:
        self.manager.tune('watch-library-logfiles', 1)

submit

submit(
    function: Callable[P, R],
    /,
    *args: P.args,
    **kwargs: P.kwargs
) -> VineFuture[R]

Schedule the callable to be executed.

Parameters:

  • function (Callable[P, R]) –

    Callable to execute.

  • args (P.args, default: () ) –

    Positional arguments.

  • kwargs (P.kwargs, default: {} ) –

    Keyword arguments.

Returns:

  • VineFuture[R]

    Future-like object representing the result of the execution of the callable.

Source code in taps/executor/taskvine.py
def submit(
    self,
    function: Callable[P, R],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> VineFuture[R]:
    """Schedule the callable to be executed.

    Args:
        function: Callable to execute.
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        [`Future`][concurrent.futures.Future]-like object representing \
        the result of the execution of the callable.
    """
    if self.serverless:
        return self._submit_serverless(function, *args, **kwargs)
    else:
        fn = self.future_task(function, *args, **kwargs)
        fn.set_cores(self.cores_per_task)
        self.task_table.append(fn)
        return super().submit(fn, *args, **kwargs)

map

map(
    function: Callable[P, R],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1
) -> Iterator[R]

Map a function onto iterables of arguments.

Parameters:

  • function (Callable[P, R]) –

    A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[P.args], default: () ) –

    Variable number of iterables.

  • timeout (float | None, default: None ) –

    The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize (int, default: 1 ) –

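    Accepted for compatibility with Executor.map() but unused by this implementation; each set of arguments is submitted as a separate task.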

Returns:

  • Iterator[R]

    An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.

Source code in taps/executor/taskvine.py
def map(
    self,
    function: Callable[P, R],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1,
) -> Iterator[R]:
    """Map a function onto iterables of arguments.

    Args:
        function: A callable that will take as many arguments as there are
            passed iterables.
        iterables: Variable number of iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: Unused; each set of arguments is submitted as a
            separate task.

    Returns:
        An iterator equivalent to: `map(func, *iterables)` but the calls \
        may be evaluated out-of-order.
    """
    # This is a modified version of Executor.map() that does not call
    # cancel() on the futures if a future raises an exception. TaskVine's
    # VineFuture.cancel() has an attribute lookup bug when using a
    # FutureFunctionCall.
    if timeout is not None:
        end_time = timeout + time.monotonic()

    futures = [
        self.submit(function, *args)  # type: ignore[call-arg]
        for args in zip(*iterables)
    ]

    def _result_iterator() -> Generator[R, None, None]:
        futures.reverse()
        while futures:
            if timeout is None:
                yield futures.pop().result(timeout)
            else:
                yield futures.pop().result(end_time - time.monotonic())

    return _result_iterator()
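
The pattern in map() above can be tried out with the standard library alone. The following is a minimal sketch, not TaPS code: it substitutes concurrent.futures.ThreadPoolExecutor for the TaskVine manager, and map_no_cancel and slow_square are hypothetical names introduced only for illustration. It is meant to show two properties of the approach: results are yielded in submission order even when tasks finish out of order, and a single deadline is shared by all results rather than restarting for each one.

```python
from __future__ import annotations

import time
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Generator, Iterable


def map_no_cancel(
    executor: Executor,
    function: Callable[..., Any],
    *iterables: Iterable[Any],
    timeout: float | None = None,
) -> Generator[Any, None, None]:
    """Hypothetical stand-in for the map() shown above (stdlib only).

    Unlike Executor.map(), pending futures are not cancelled when one
    of them raises.
    """
    # Compute one deadline up front so the timeout bounds the whole map.
    end_time = None if timeout is None else timeout + time.monotonic()

    # Submit everything eagerly; tasks may complete in any order.
    futures = [executor.submit(function, *args) for args in zip(*iterables)]

    def _result_iterator() -> Generator[Any, None, None]:
        # Reverse so that pop() returns futures in submission order.
        futures.reverse()
        while futures:
            if end_time is None:
                yield futures.pop().result()
            else:
                # Wait only for the time remaining until the deadline.
                yield futures.pop().result(end_time - time.monotonic())

    return _result_iterator()


def slow_square(x: int, delay: float) -> int:
    time.sleep(delay)
    return x * x


with ThreadPoolExecutor(max_workers=3) as pool:
    # The first task sleeps longest, yet its result is yielded first.
    results = list(
        map_no_cancel(
            pool, slow_square, [1, 2, 3], [0.2, 0.1, 0.0], timeout=5.0,
        ),
    )
```

Because the deadline is fixed before any task is submitted, a slow early result consumes the time budget available to later ones, which matches the "maximum number of seconds to wait" wording of the timeout parameter.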

TaskVineConfig

Bases: ExecutorConfig

TaskVine executor plugin configuration.

Parameters:

  • name (Literal['taskvine'], default: 'taskvine' ) –

    Executor name.

  • cores_per_task (int, default: 1 ) –

    Number of cores per task.

  • cores_per_worker (int | None, default: None ) –

    Number of cores per worker.

  • debug (bool, default: False ) –

    Enable additional TaskVine logging.

  • factory (bool, default: True ) –

    Launch workers from a factory.

  • port (int, default: 9123 ) –

    Port of TaskVine manager.

  • serverless (bool, default: False ) –

    Use TaskVine serverless mode.

  • workers (int, default: 1 ) –

    Number of TaskVine workers to launch when using a factory.

get_executor

get_executor() -> FuturesExecutor

Create an executor instance from the config.

Source code in taps/executor/taskvine.py
def get_executor(self) -> FuturesExecutor:
    """Create an executor instance from the config."""
    opts: dict[str, Any] = {
        'min_workers': self.workers,
        'max_workers': self.workers,
    }
    if self.cores_per_worker is not None:
        opts['cores'] = self.cores_per_worker

    return TaskVineExecutor(
        manager_name='taps-taskvine-manager',
        port=self.port,
        opts=opts,
        cores_per_task=self.cores_per_task,
        debug=self.debug,
        factory=self.factory,
        serverless=self.serverless,
    )
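
To make the mapping from configuration fields to factory options easier to see, here is a small stand-alone sketch. SketchConfig and factory_opts are hypothetical names, not part of TaPS; the class only mirrors the two fields that get_executor() uses to build opts, so it runs without TaPS or ndcctools installed.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class SketchConfig:
    """Hypothetical stand-in for the TaskVineConfig fields used above."""

    workers: int = 1
    cores_per_worker: int | None = None

    def factory_opts(self) -> dict[str, Any]:
        # Pin the factory to a fixed pool size: min == max == workers.
        opts: dict[str, Any] = {
            'min_workers': self.workers,
            'max_workers': self.workers,
        }
        # 'cores' is only set when it was explicitly configured.
        if self.cores_per_worker is not None:
            opts['cores'] = self.cores_per_worker
        return opts


default_opts = SketchConfig(workers=4).factory_opts()
sized_opts = SketchConfig(workers=4, cores_per_worker=8).factory_opts()

# Mirrors how TaskVineExecutor.__init__ reads the value back from opts.
default_worker_cores = default_opts.get('cores', None)
sized_worker_cores = sized_opts.get('cores', None)
```

With cores_per_worker left at None, the 'cores' key is absent and the executor's worker_cores attribute ends up as None. Note that the constructor shown earlier indexes kwargs['opts'] directly, so opts has to be passed when constructing TaskVineExecutor by hand.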