Skip to content

taps.engine

EngineConfig

Bases: BaseModel

App engine configuration.

Parameters:

  • executor (ExecutorConfig, default: ProcessPoolConfig(name='process-pool', max_processes=4, context=None) ) –

    Executor configuration.

  • filter (FilterConfig | None, default: None ) –

    Filter configuration.

  • transformer (TransformerConfig | None, default: None ) –

    Transformer configuration.

  • task_record_file_name (str | None, default: 'tasks.jsonl' ) –

    Name of the line-delimited JSON file to log task records to.

get_engine

get_engine() -> Engine

Create an engine from the configuration.

Source code in taps/engine/_config.py
def get_engine(self) -> Engine:
    """Build an engine from this configuration.

    The executor is always created from the executor config. The filter
    and transformer are only created when their configs are set, and a
    JSON record logger is only created when a task record file name is
    set; otherwise `None` is passed to the engine for that component.

    Returns:
        Engine constructed from the configured components.
    """
    # The record logger is built first, then the remaining components in
    # the order they are handed to the engine.
    record_logger = None
    if self.task_record_file_name is not None:
        record_logger = JSONRecordLogger(self.task_record_file_name)

    executor = self.executor.get_executor()
    filter_ = None if self.filter is None else self.filter.get_filter()
    transformer = (
        None
        if self.transformer is None
        else self.transformer.get_transformer()
    )

    return Engine(
        executor=executor,
        filter_=filter_,
        transformer=transformer,
        record_logger=record_logger,
    )

Engine

Engine(
    executor: Executor,
    *,
    filter_: Filter | None = None,
    transformer: Transformer[Any] | None = None,
    record_logger: RecordLogger | None = None
)

Application execution engine.

Parameters:

  • executor (Executor) –

    Task compute executor.

  • filter_ (Filter | None, default: None ) –

    Data filter.

  • transformer (Transformer[Any] | None, default: None ) –

    Data transformer.

  • record_logger (RecordLogger | None, default: None ) –

    Task record logger.

Source code in taps/engine/_engine.py
def __init__(
    self,
    executor: Executor,
    *,
    filter_: Filter | None = None,
    transformer: Transformer[Any] | None = None,
    record_logger: RecordLogger | None = None,
) -> None:
    self.executor = executor

    # The data transformer and filter are wrapped together in a single
    # task transformer which is what the rest of the engine uses.
    self.transformer: TaskTransformer[Any] = TaskTransformer(
        transformer,
        filter_,
    )

    # Fall back to a null logger so callers never need to check for None.
    if record_logger is None:
        record_logger = NullRecordLogger()
    self.record_logger = record_logger

    # Cache of user provided functions -> Task objects so that a function
    # which was not decorated with @task is only wrapped a single time.
    # Typing this precisely is awkward, hence the use of Any.
    self._registered_tasks: dict[Callable[[Any], Any], Task[Any, Any]] = {}

    # Internal bookkeeping: futures currently tracked by the engine and a
    # running count of submitted tasks.
    self._total_tasks = 0
    self._running_tasks: dict[FutureProtocol[Any], TaskFuture[Any]] = {}

tasks_executed property

tasks_executed: int

Total number of tasks submitted for execution.

submit

submit(
    function: Callable[P, R], /, *args: Any, **kwargs: Any
) -> TaskFuture[R]

Schedule the callable to be executed.

This function can also accept TaskFuture objects as input to denote dependencies between a parent and this child task.

Parameters:

  • function (Callable[P, R]) –

    Task to execute or a function to turn into a Task.

  • args (Any, default: () ) –

    Positional arguments for the task.

  • kwargs (Any, default: {} ) –

    Keyword arguments for the task.

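Returns:

  • TaskFuture[R]

    TaskFuture object representing the result of the execution of the callable, accessible via TaskFuture.result().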

Source code in taps/engine/_engine.py
def submit(
    self,
    function: Callable[P, R],
    /,
    *args: Any,
    **kwargs: Any,
) -> TaskFuture[R]:
    """Submit a callable to the executor as a task.

    Any [`TaskFuture`][taps.engine.TaskFuture] passed directly as a
    positional or keyword argument is recorded as a parent of the new
    task and is replaced by its underlying executor future before the
    arguments are transformed and handed to the executor.

    Args:
        function: [`Task`][taps.engine.task.Task] to execute or a function
            to turn into a [`Task`][taps.engine.task.Task].
        args: Positional arguments for the task.
        kwargs: Keyword arguments for the task.

    Returns:
        [`TaskFuture`][taps.engine.TaskFuture] object representing the \
        result of the execution of the callable accessible via \
        [`TaskFuture.result()`][taps.engine.TaskFuture.result].
    """
    task_id = uuid.uuid4()
    task = self._get_task(function)

    # Parent discovery must happen before the TaskFutures are unwrapped.
    parent_ids = []
    for value in (*args, *kwargs.values()):
        if isinstance(value, TaskFuture):
            parent_ids.append(str(value.info.task_id))

    info = TaskInfo(
        task_id=str(task_id),
        name=task.name,
        parent_task_ids=parent_ids,
        submit_time=time.time(),
    )

    def _unwrap(value: Any) -> Any:
        # Executors operate on their own futures, not on TaskFuture.
        return value.future if isinstance(value, TaskFuture) else value

    unwrapped_args = tuple(_unwrap(value) for value in args)
    unwrapped_kwargs = {
        key: _unwrap(value) for key, value in kwargs.items()
    }

    task_args = self.transformer.transform_iterable(unwrapped_args)
    task_kwargs = self.transformer.transform_mapping(unwrapped_kwargs)

    future = self.executor.submit(
        task,
        *task_args,
        **task_kwargs,
        _transformer=self.transformer,
    )
    message = (
        f'Submitted task to executor (id={task_id}, name={info.name}, '
        f'parents=[{", ".join(info.parent_task_ids)}])'
    )
    logger.log(TRACE_LOG_LEVEL, message)

    self._total_tasks += 1

    # Track the future until the done callback fires.
    task_future = TaskFuture(future, info, self.transformer)
    self._running_tasks[future] = task_future
    future.add_done_callback(self._task_done_callback)

    return task_future

map

map(
    function: Callable[P, R],
    *iterables: Iterable[args],
    timeout: float | None = None,
    chunksize: int = 1
) -> Iterator[R]

Map a function onto iterables of arguments.

Parameters:

  • function (Callable[P, R]) –

    A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[args], default: () ) –

    Variable number of iterables.

  • timeout (float | None, default: None ) –

    The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize (int, default: 1 ) –

    Currently not supported. If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the executor. If set to one, the items in the list will be sent one at a time.

Returns:

  • Iterator[R]

    An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.

Source code in taps/engine/_engine.py
def map(
    self,
    function: Callable[P, R],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1,
) -> Iterator[R]:
    """Apply a function across iterables of arguments.

    All tasks are submitted before this method returns; results are only
    waited on as the returned iterator is consumed.

    Args:
        function: A callable that will take as many arguments as there are
            passed iterables.
        iterables: Variable number of iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: Currently not supported. If greater than one, the
            iterables will be chopped into chunks of size chunksize
            and submitted to the executor. If set to one, the items in the
            list will be sent one at a time.

    Returns:
        An iterator equivalent to: `map(func, *iterables)` but the calls \
        may be evaluated out-of-order.
    """
    # Source: https://github.com/python/cpython/blob/ec1398e117fb142cc830495503dbdbb1ddafe941/Lib/concurrent/futures/_base.py#L583-L625
    # The deadline is fixed before submission so that time spent
    # submitting counts against the timeout.
    deadline = None if timeout is None else timeout + time.monotonic()

    pending = []
    for call_args in zip(*iterables):
        pending.append(self.submit(function, *call_args))

    # The generator lives in a closure so that the submissions above are
    # performed eagerly rather than on the first request for a value.
    def _iter_results() -> Generator[R, None, None]:
        # Reversed so that pop() hands tasks back in submission order.
        pending.reverse()
        while pending:
            # pop() is called inline so that no reference to the popped
            # task is held while waiting on it.
            if deadline is None:
                yield _result_or_cancel(pending.pop())
            else:
                yield _result_or_cancel(
                    pending.pop(),
                    deadline - time.monotonic(),
                )

    return _iter_results()

shutdown

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

Shutdown the executor.

Parameters:

  • wait (bool, default: True ) –

    Wait on all pending futures to complete.

  • cancel_futures (bool, default: False ) –

    Cancel all pending futures that the executor has not started running. Only used in Python 3.9 and later.

Source code in taps/engine/_engine.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Shutdown the executor and release engine resources.

    The transformer and record logger are closed even if shutting down
    the executor (or closing the transformer) raises, so that their
    resources are not leaked on a failed shutdown. Any such exception
    still propagates to the caller.

    Args:
        wait: Wait on all pending futures to complete.
        cancel_futures: Cancel all pending futures that the executor
            has not started running. Only used in Python 3.9 and later.
    """
    try:
        if sys.version_info >= (3, 9):  # pragma: >=3.9 cover
            self.executor.shutdown(
                wait=wait,
                cancel_futures=cancel_futures,
            )
        else:  # pragma: <3.9 cover
            self.executor.shutdown(wait=wait)
    finally:
        # Nested so a failure closing the transformer cannot prevent the
        # record logger from being closed.
        try:
            self.transformer.close()
        finally:
            self.record_logger.close()
    logger.debug('Engine shutdown')

TaskFuture

TaskFuture(
    future: FutureProtocol[TaskResult[R]],
    info: TaskInfo,
    transformer: TaskTransformer[Any],
)

Bases: Generic[R]

Task future.

Note

This class should not be instantiated by clients.

Parameters:

Source code in taps/engine/_engine.py
def __init__(
    self,
    future: FutureProtocol[TaskResult[R]],
    info: TaskInfo,
    transformer: TaskTransformer[Any],
) -> None:
    # Executor future that this object wraps.
    self.future = future
    # Metadata describing the task.
    self.info = info
    # Task transformer stored for use when the result is retrieved.
    self.transformer = transformer

cancel

cancel() -> bool

Attempt to cancel the task.

If the call is currently being executed or finished running and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Source code in taps/engine/_engine.py
def cancel(self) -> bool:
    """Try to cancel the task.

    Delegates to the wrapped executor future.

    Returns:
        `True` if the call was cancelled. `False` if the call is \
        currently being executed or has finished running and therefore \
        cannot be cancelled.
    """
    was_cancelled = self.future.cancel()
    return was_cancelled

done

done() -> bool

Return True if the call was successfully cancelled or finished.

Source code in taps/engine/_engine.py
def done(self) -> bool:
    """Return `True` if the call was successfully cancelled or finished."""
    is_done = self.future.done()
    return is_done

exception

exception() -> BaseException | None

Get the exception raised by the task or None if successful.

Source code in taps/engine/_engine.py
def exception(self) -> BaseException | None:
    """Get the exception raised by the task or `None` if successful."""
    return self.future.exception()

result

result(timeout: float | None = None) -> R

Get the result of the task.

Parameters:

  • timeout (float | None, default: None ) –

    If the task has not finished, wait up to timeout seconds.

Returns:

  • R

    Task result if the task completed successfully.

Raises:

  • TimeoutError

    If timeout is specified and the task does not complete within timeout seconds.

Source code in taps/engine/_engine.py
def result(self, timeout: float | None = None) -> R:
    """Return the result of the task.

    Waits on the wrapped future and then resolves the value of the
    returned task result through the task transformer.

    Args:
        timeout: If the task has not finished, wait up to `timeout`
            seconds.

    Returns:
        Task result if the task completed successfully.

    Raises:
        TimeoutError: If `timeout` is specified and the task does not
            complete within `timeout` seconds.
    """
    value = self.future.result(timeout=timeout).value
    return self.transformer.resolve(value)

as_completed

as_completed(
    tasks: Sequence[TaskFuture[R]],
    timeout: float | None = None,
) -> Generator[TaskFuture[R], None, None]

Return an iterator which yields tasks as they complete.

Parameters:

  • tasks (Sequence[TaskFuture[R]]) –

    Sequence of tasks.

  • timeout (float | None, default: None ) –

    Seconds to wait for a task to complete. If no task completes in that time, a TimeoutError is raised. If timeout is None, there is no limit to the wait time.

Returns:

  • Generator[TaskFuture[R], None, None]

    Iterator which yields futures as they complete (finished or cancelled futures).

Source code in taps/engine/_engine.py
def as_completed(
    tasks: Sequence[TaskFuture[R]],
    timeout: float | None = None,
) -> Generator[TaskFuture[R], None, None]:
    """Return an iterator which yields tasks as they complete.

    The backend-specific `as_completed` implementation is selected from
    the type of the first task's underlying future. Because this is a
    generator, no work (including raising errors) happens until the
    returned iterator is consumed.

    Args:
        tasks: Sequence of tasks.
        timeout: Seconds to wait for a task to complete. If no task completes
            in that time, a `TimeoutError` is raised. If timeout is `None`,
            there is no limit to the wait time.

    Returns:
        Iterator which yields futures as they complete (finished or cancelled \
        futures).

    Raises:
        ValueError: If the type of the underlying future of the first task
            is not supported.
    """
    if len(tasks) == 0:
        return

    # Reverse lookup from executor future back to the wrapping task.
    futures = {task.future: task for task in tasks}
    kwargs = {'timeout': timeout}

    # The first future decides which backend implementation is used.
    first_future = tasks[0].future

    # as_completed is tricky to type here.
    _as_completed: Any
    if isinstance(first_future, Future):
        if taskvine_available and isinstance(
            first_future,
            VineFuture,
        ):  # pragma: no cover
            _as_completed = as_completed_taskvine
        else:
            _as_completed = as_completed_python
    elif isinstance(first_future, DaskFuture):
        _as_completed = as_completed_dask
        if sys.version_info < (3, 9):  # pragma: <3.9 cover
            # The timeout is not forwarded on this code path.
            kwargs = {}
    else:  # pragma: no cover
        # Report the type of the inner future; the wrapper is always a
        # TaskFuture and says nothing about what was unsupported.
        raise ValueError(f'Unsupported future type {type(first_future)}.')

    for completed in _as_completed(futures.keys(), **kwargs):
        yield futures[completed]

wait

wait(
    tasks: Sequence[TaskFuture[R]],
    timeout: float | None = None,
    return_when: str = "ALL_COMPLETED",
) -> tuple[set[TaskFuture[R]], set[TaskFuture[R]]]

Wait for tasks to finish.

Parameters:

  • tasks (Sequence[TaskFuture[R]]) –

    Sequence of tasks to wait on.

  • timeout (float | None, default: None ) –

    Maximum number of seconds to wait on tasks. Can be None to wait indefinitely.

  • return_when (str, default: 'ALL_COMPLETED' ) –

    Either "ALL_COMPLETED" or "FIRST_COMPLETED".

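Returns:

  • tuple[set[TaskFuture[R]], set[TaskFuture[R]]]

    Tuple containing the set of completed tasks and the set of not completed tasks.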

Source code in taps/engine/_engine.py
def wait(
    tasks: Sequence[TaskFuture[R]],
    timeout: float | None = None,
    return_when: str = 'ALL_COMPLETED',
) -> tuple[set[TaskFuture[R]], set[TaskFuture[R]]]:
    """Wait for tasks to finish.

    The backend-specific `wait` implementation is selected from the type
    of the first task's underlying future.

    Args:
        tasks: Sequence of tasks to wait on.
        timeout: Maximum number of seconds to wait on tasks. Can be `None` to
            wait indefinitely.
        return_when: Either `"ALL_COMPLETED"` or `"FIRST_COMPLETED"`.

    Returns:
        Tuple containing the set of completed tasks and the set of not \
        completed tasks.

    Raises:
        ValueError: If the type of the underlying future of the first task
            is not supported.
    """
    result = namedtuple('result', ['done', 'not_done'])

    if len(tasks) == 0:
        return result(set(), set())

    # Reverse lookup from executor future back to the wrapping task.
    futures = {task.future: task for task in tasks}

    # The first future decides which backend implementation is used.
    first_future = tasks[0].future

    if isinstance(first_future, Future):
        if taskvine_available and isinstance(
            first_future,
            VineFuture,
        ):  # pragma: no cover
            _wait = wait_taskvine
        else:
            _wait = wait_python
    elif isinstance(first_future, DaskFuture):
        _wait = wait_dask
    else:  # pragma: no cover
        # Report the type of the inner future; the wrapper is always a
        # TaskFuture and says nothing about what was unsupported.
        raise ValueError(f'Unsupported future type {type(first_future)}.')

    results = _wait(
        list(futures.keys()),
        timeout=timeout,
        return_when=return_when,
    )

    try:
        completed_futures, not_completed_futures = results
    except TypeError:  # pragma: no cover
        # This handles the return type of TaskVine's wait().
        completed_futures, not_completed_futures = (
            results.done,
            results.not_done,
        )

    completed_tasks = {futures[f] for f in completed_futures}
    not_completed_tasks = {futures[f] for f in not_completed_futures}

    return result(completed_tasks, not_completed_tasks)