Skip to content

taps.engine

EngineConfig

Bases: BaseModel

App engine configuration.

Attributes:

  • executor (ExecutorConfig) –

    Executor configuration.

  • filter (FilterConfig) –

    Filter configuration.

  • transformer (TransformerConfig) –

    Transformer configuration.

  • task_record_file_name (Optional[str]) –

    Name of line-delimited JSON file that task records are logged to.

get_engine()

get_engine() -> Engine

Create an engine from the configuration.

Source code in taps/engine/_config.py
def get_engine(self) -> Engine:
    """Build an `Engine` instance from this configuration.

    A `JSONRecordLogger` writing to `task_record_file_name` is created only
    when that field is set; otherwise `None` is passed as the record logger.
    The executor, filter, and transformer are obtained from their
    respective sub-configurations.
    """
    record_logger = None
    if self.task_record_file_name is not None:
        record_logger = JSONRecordLogger(self.task_record_file_name)

    # Same construction order as the keyword arguments below.
    executor = self.executor.get_executor()
    data_filter = self.filter.get_filter()
    data_transformer = self.transformer.get_transformer()

    return Engine(
        executor=executor,
        data_filter=data_filter,
        data_transformer=data_transformer,
        record_logger=record_logger,
    )

Engine

Engine(
    executor: Executor,
    *,
    data_filter: Filter | None = None,
    data_transformer: Transformer[Any] | None = None,
    record_logger: RecordLogger | None = None
)

Application execution engine.

Parameters:

  • executor (Executor) –

    Task compute executor.

  • data_filter (Filter | None, default: None ) –

    Data filter.

  • data_transformer (Transformer[Any] | None, default: None ) –

    Data transformer.

  • record_logger (RecordLogger | None, default: None ) –

    Task record logger.

Source code in taps/engine/_engine.py
def __init__(
    self,
    executor: Executor,
    *,
    data_filter: Filter | None = None,
    data_transformer: Transformer[Any] | None = None,
    record_logger: RecordLogger | None = None,
) -> None:
    """Initialize the engine.

    Any optional component left as `None` is replaced with a no-op
    implementation (`NullTransformer`, `NullFilter`, `NullRecordLogger`).
    """
    self.executor = executor

    # Fall back to no-op components when the caller supplies none.
    transformer = (
        data_transformer if data_transformer is not None
        else NullTransformer()
    )
    filter_ = data_filter if data_filter is not None else NullFilter()
    self.data_transformer: TaskTransformer[Any] = TaskTransformer(
        transformer,
        filter_,
    )
    self.record_logger = (
        NullRecordLogger() if record_logger is None else record_logger
    )

    # User-supplied callable -> its _TaskWrapper. The precise type is
    # awkward to express, so Any is used for both sides.
    self._registered_tasks: dict[
        Callable[[Any], Any],
        _TaskWrapper[Any, Any],
    ] = {}

    # Executor future -> TaskFuture, populated by submit().
    self._running_tasks: dict[Future[Any], TaskFuture[Any]] = {}
    # Number of tasks handed to the executor (incremented in submit()).
    self._total_tasks = 0

tasks_executed property

tasks_executed: int

Total number of tasks submitted for execution.

submit()

submit(
    function: Callable[P, T], /, *args: Any, **kwargs: Any
) -> TaskFuture[T]

Schedule the callable to be executed.

This function can also accept TaskFuture objects as input to denote dependencies between a parent and this child task.

Parameters:

  • function (Callable[P, T]) –

    Callable to execute.

  • args (Any, default: () ) –

    Positional arguments.

  • kwargs (Any, default: {} ) –

    Keyword arguments.

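Returns:

  • TaskFuture[T] –

    TaskFuture object representing the result of the execution of the callable accessible via TaskFuture.result().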

Source code in taps/engine/_engine.py
def submit(
    self,
    function: Callable[P, T],
    /,
    *args: Any,
    **kwargs: Any,
) -> TaskFuture[T]:
    """Schedule the callable to be executed.

    Any [`TaskFuture`][taps.engine.TaskFuture] passed in `args` or
    `kwargs` is recorded as a parent of the new task, expressing a
    dependency between that parent and this child task.

    Args:
        function: Callable to execute.
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        [`TaskFuture`][taps.engine.TaskFuture] object representing the \
        result of the execution of the callable accessible via \
        [`TaskFuture.result()`][taps.engine.TaskFuture.result].
    """
    task_id = uuid.uuid4()

    # Wrap each distinct user callable once and reuse that wrapper for
    # every later submission of the same callable.
    # NOTE(review): the cached wrapper keeps the task_id of the *first*
    # submission of `function`; confirm _TaskWrapper does not rely on it
    # per task.
    wrapper = self._registered_tasks.get(function)
    if wrapper is None:
        wrapper = _TaskWrapper(
            function,
            task_id=task_id,
            data_transformer=self.data_transformer,
        )
        self._registered_tasks[function] = wrapper
    task = cast(Callable[P, _TaskResult[T]], wrapper)

    # Every TaskFuture argument marks a dependency on its producing task.
    parent_ids = [
        str(value.info.task_id)
        for value in (*args, *kwargs.values())
        if isinstance(value, TaskFuture)
    ]
    info = TaskInfo(
        task_id=str(task_id),
        function_name=function.__name__,
        parent_task_ids=parent_ids,
        submit_time=time.time(),
    )

    # The executor receives the raw futures held inside TaskFuture objects.
    def _unwrap(value: Any) -> Any:
        return value._future if isinstance(value, TaskFuture) else value

    raw_args = tuple(_unwrap(value) for value in args)
    raw_kwargs = {key: _unwrap(value) for key, value in kwargs.items()}

    # Let the data transformer swap eligible values for identifiers.
    call_args = self.data_transformer.transform_iterable(raw_args)
    call_kwargs = self.data_transformer.transform_mapping(raw_kwargs)

    future = self.executor.submit(task, *call_args, **call_kwargs)
    self._total_tasks += 1

    task_future = TaskFuture(future, info, self.data_transformer)
    # Register before attaching the callback. A future that is already
    # finished invokes the callback immediately, and the callback
    # presumably looks the task up in _running_tasks.
    self._running_tasks[future] = task_future
    future.add_done_callback(self._task_done_callback)
    return task_future

map()

map(
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1
) -> Iterator[T]

Map a function onto iterables of arguments.

Parameters:

  • function (Callable[P, T]) –

    A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[P.args], default: () ) –

    Variable number of iterables.

  • timeout (float | None, default: None ) –

    The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize (int, default: 1 ) –

    Currently not supported. If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the executor. If set to one, the items in the list will be sent one at a time.

Returns:

  • Iterator[T]

    An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.

Source code in taps/engine/_engine.py
def map(
    self,
    function: Callable[P, T],
    *iterables: Iterable[P.args],
    timeout: float | None = None,
    chunksize: int = 1,
) -> Iterator[T]:
    """Map a function onto iterables of arguments.

    All tasks are submitted eagerly, before the first result is requested.
    If the returned iterator is closed early, or a task raises or times
    out, the tasks whose results were not yet consumed are cancelled.

    Args:
        function: A callable that will take as many arguments as there are
            passed iterables.
        iterables: Variable number of iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: Currently not supported; the value is ignored. (In
            `concurrent.futures.Executor.map`, values greater than one chop
            the iterables into chunks of that size before submission.)

    Returns:
        An iterator equivalent to: `map(func, *iterables)` but the calls \
        may be evaluated out-of-order.
    """
    # Source: https://github.com/python/cpython/blob/ec1398e117fb142cc830495503dbdbb1ddafe941/Lib/concurrent/futures/_base.py#L583-L625
    if timeout is not None:
        end_time = timeout + time.monotonic()

    tasks = [self.submit(function, *args) for args in zip(*iterables)]

    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def _result_iterator() -> Generator[T, None, None]:
        try:
            # reverse to keep finishing order
            tasks.reverse()
            while tasks:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield _result_or_cancel(tasks.pop())
                else:
                    yield _result_or_cancel(
                        tasks.pop(),
                        end_time - time.monotonic(),
                    )
        finally:
            # Runs when the consumer stops early (break, close(), garbage
            # collection) or a result raises. In that case, cancel the
            # outstanding tasks so they are not left running unobserved,
            # as the upstream CPython implementation does.
            for task in tasks:
                task.cancel()

    return _result_iterator()

shutdown()

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

Shutdown the executor.

Parameters:

  • wait (bool, default: True ) –

    Wait on all pending futures to complete.

  • cancel_futures (bool, default: False ) –

    Cancel all pending futures that the executor has not started running. Only used in Python 3.9 and later.

Source code in taps/engine/_engine.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Shut down the executor and release engine resources.

    Args:
        wait: Wait on all pending futures to complete.
        cancel_futures: Cancel all pending futures that the executor
            has not started running. Only used in Python 3.9 and later.
    """
    options = {'wait': wait}
    if sys.version_info >= (3, 9):  # pragma: >=3.9 cover
        # `cancel_futures` is only forwarded on Python 3.9+.
        options['cancel_futures'] = cancel_futures
    self.executor.shutdown(**options)

    # Close the data transformer, then the record logger.
    self.data_transformer.close()
    self.record_logger.close()

ExceptionInfo dataclass

ExceptionInfo(type: str, message: str, traceback: str)

Task exception information.

ExecutionInfo dataclass

ExecutionInfo(
    hostname: str,
    execution_start_time: float,
    execution_end_time: float,
    task_start_time: float,
    task_end_time: float,
    input_transform_start_time: float,
    input_transform_end_time: float,
    result_transform_start_time: float,
    result_transform_end_time: float,
)

Task execution information.

TaskFuture

TaskFuture(
    future: Future[_TaskResult[T]],
    info: TaskInfo,
    data_transformer: TaskTransformer[Any],
)

Bases: Generic[T]

Task future.

Note

This class should not be instantiated by clients.

Attributes:

  • info

    Task information and metadata.

Parameters:

  • future (Future[_TaskResult[T]]) –

    Underlying future returned by the compute executor.

  • info (TaskInfo) –

    Task information and metadata.

  • data_transformer (TaskTransformer[Any]) –

    Data transformer used to resolve the task result.

Source code in taps/engine/_engine.py
def __init__(
    self,
    future: Future[_TaskResult[T]],
    info: TaskInfo,
    data_transformer: TaskTransformer[Any],
) -> None:
    # Private handles: the executor's future for this task and the
    # transformer used by result() to resolve the task's return value.
    self._data_transformer = data_transformer
    self._future = future
    # Public task metadata.
    self.info = info

cancel()

cancel() -> bool

Attempt to cancel the task.

If the call is currently being executed or finished running and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Source code in taps/engine/_engine.py
def cancel(self) -> bool:
    """Attempt to cancel the task.

    Delegates to the underlying executor future. Returns `False` when the
    call is already running or has finished and so cannot be cancelled;
    otherwise the call is cancelled and `True` is returned.
    """
    underlying = self._future
    return underlying.cancel()

done()

done() -> bool

Return True if the call was successfully cancelled or finished.

Source code in taps/engine/_engine.py
def done(self) -> bool:
    """Return `True` if the task was successfully cancelled or finished.

    Delegates to `done()` on the underlying executor future.
    """
    return self._future.done()

exception()

exception() -> BaseException | None

Get the exception raised by the task or None if successful.

Source code in taps/engine/_engine.py
def exception(self, timeout: float | None = None) -> BaseException | None:
    """Get the exception raised by the task or `None` if successful.

    Args:
        timeout: If the task has not finished, wait up to `timeout`
            seconds. If `None` (the default), wait without limit, which
            matches the previous behavior.

    Returns:
        The exception raised by the task, or `None` if it completed
        without raising.

    Raises:
        TimeoutError: If `timeout` is specified and the task does not
            complete within `timeout` seconds.
    """
    # Forward the timeout, mirroring result(); the default keeps the
    # original unbounded wait.
    return self._future.exception(timeout=timeout)

result()

result(timeout: float | None = None) -> T

Get the result of the task.

Parameters:

  • timeout (float | None, default: None ) –

    If the task has not finished, wait up to timeout seconds.

Returns:

  • T

    Task result if the task completed successfully.

Raises:

  • TimeoutError

    If timeout is specified and the task does not complete within timeout seconds.

Source code in taps/engine/_engine.py
def result(self, timeout: float | None = None) -> T:
    """Get the result of the task.

    The raw task result is passed through the data transformer, so any
    identifier produced by a transformer is resolved back to its object.

    Args:
        timeout: If the task has not finished, wait up to `timeout`
            seconds.

    Returns:
        Task result if the task completed successfully.

    Raises:
        TimeoutError: If `timeout` is specified and the task does not
            complete within `timeout` seconds.
    """
    raw = self._future.result(timeout=timeout).result
    return self._data_transformer.resolve(raw)

TaskInfo dataclass

TaskInfo(
    task_id: str,
    function_name: str,
    parent_task_ids: list[str],
    submit_time: float,
    received_time: float | None = None,
    success: bool | None = None,
    exception: ExceptionInfo | None = None,
    execution: ExecutionInfo | None = None,
)

Task information.

TaskTransformer

TaskTransformer(
    transformer: Transformer[IdentifierT], filter_: Filter
)

Bases: Generic[IdentifierT]

Task data transformer.

This class combines a simple object Transformer and a Filter into useful methods for transforming the positional arguments, keyword arguments, and results of tasks.

Parameters:

  • transformer (Transformer[IdentifierT]) –

    Object transformer.

  • filter_ (Filter) –

    A filter which when called on an object returns True if the object should be transformed.

Source code in taps/engine/_transform.py
def __init__(
    self,
    transformer: Transformer[IdentifierT],
    filter_: Filter,
) -> None:
    # Predicate deciding which objects get transformed.
    self.filter_ = filter_
    # Object transformer that produces and resolves identifiers.
    self.transformer = transformer

close()

close() -> None

Close the transformer.

Source code in taps/engine/_transform.py
def close(self) -> None:
    """Close the wrapped object transformer."""
    inner = self.transformer
    inner.close()

transform()

transform(obj: T) -> T | IdentifierT

Transform an object.

Transforms obj into an identifier if it passes the filter check. The identifier can later be used to resolve the object.

Source code in taps/engine/_transform.py
def transform(self, obj: T) -> T | IdentifierT:
    """Transform an object.

    Transforms `obj` into an identifier if it passes the filter check.
    The identifier can later be used to resolve the object.

    `Future` instances (for example, the executor futures that
    `Engine.submit` passes through for task dependencies) are always
    returned unchanged. The filter is not consulted for them.
    """
    # Do the cheap type check first. Futures are never transformed, so
    # there is no point running the user filter on them; a size- or
    # pickle-based filter could be expensive or fail on a future.
    if isinstance(obj, Future):
        return obj
    if self.filter_(obj):
        return self.transformer.transform(obj)
    return obj

transform_iterable()

transform_iterable(
    iterable: Iterable[T],
) -> tuple[T | IdentifierT, ...]

Transform each object in an iterable.

Source code in taps/engine/_transform.py
def transform_iterable(
    self,
    iterable: Iterable[T],
) -> tuple[T | IdentifierT, ...]:
    """Apply `transform` to each item and return the results as a tuple.

    Item order is preserved.
    """
    return tuple(map(self.transform, iterable))

transform_mapping()

transform_mapping(
    mapping: Mapping[K, T]
) -> dict[K, Any]

Transform each value in a mapping.

Source code in taps/engine/_transform.py
def transform_mapping(self, mapping: Mapping[K, T]) -> dict[K, Any]:
    """Return a new dict with `transform` applied to every value.

    Keys are kept as-is.
    """
    transformed = {}
    for key, value in mapping.items():
        transformed[key] = self.transform(value)
    return transformed

resolve()

resolve(obj: Any) -> Any

Resolve an object.

Resolves the object if it is an identifier, otherwise returns the passed object.

Source code in taps/engine/_transform.py
def resolve(self, obj: Any) -> Any:
    """Resolve an object.

    If the transformer recognizes `obj` as one of its identifiers, return
    the resolved object. Otherwise return `obj` unchanged.
    """
    transformer = self.transformer
    return transformer.resolve(obj) if transformer.is_identifier(obj) else obj

resolve_iterable()

resolve_iterable(
    iterable: Iterable[Any],
) -> tuple[Any, ...]

Resolve each object in an iterable.

Source code in taps/engine/_transform.py
def resolve_iterable(self, iterable: Iterable[Any]) -> tuple[Any, ...]:
    """Apply `resolve` to each item and return the results as a tuple."""
    return tuple(map(self.resolve, iterable))

resolve_mapping()

resolve_mapping(mapping: Mapping[K, Any]) -> dict[K, Any]

Resolve each value in a mapping.

Source code in taps/engine/_transform.py
def resolve_mapping(self, mapping: Mapping[K, Any]) -> dict[K, Any]:
    """Return a new dict with `resolve` applied to every value."""
    resolved = {}
    for key, value in mapping.items():
        resolved[key] = self.resolve(value)
    return resolved

as_completed()

as_completed(
    tasks: Sequence[TaskFuture[T]],
    timeout: float | None = None,
) -> Generator[TaskFuture[T], None, None]

Return an iterator which yields tasks as they complete.

Parameters:

  • tasks (Sequence[TaskFuture[T]]) –

    Sequence of tasks.

  • timeout (float | None, default: None ) –

    Seconds to wait for a task to complete. If no task completes in that time, a TimeoutError is raised. If timeout is None, there is no limit to the wait time.

Returns:

  • Generator[TaskFuture[T], None, None]

    Iterator which yields futures as they complete (finished or cancelled futures).

Source code in taps/engine/_engine.py
def as_completed(
    tasks: Sequence[TaskFuture[T]],
    timeout: float | None = None,
) -> Generator[TaskFuture[T], None, None]:
    """Return an iterator which yields tasks as they complete.

    The backend is chosen from the type of the first task's underlying
    future. All tasks are assumed to come from the same kind of executor.

    Args:
        tasks: Sequence of tasks.
        timeout: Seconds to wait for a task to complete. If no task completes
            in that time, a `TimeoutError` is raised. If timeout is `None`,
            there is no limit to the wait time.

    Returns:
        Iterator which yields futures as they complete (finished or cancelled \
        futures).

    Raises:
        ValueError: If the underlying future type is not supported. Because
            this is a generator, the error surfaces on first iteration.
    """
    # Map each executor future back to its task so completed executor
    # futures can be translated back into tasks.
    futures = {task._future: task for task in tasks}

    kwargs = {'timeout': timeout}
    if len(tasks) == 0 or isinstance(tasks[0]._future, Future):
        _as_completed = as_completed_python
    elif isinstance(tasks[0]._future, DaskFuture):
        _as_completed = as_completed_dask
        # NOTE(review): on Python < 3.9 the timeout is not forwarded to
        # Dask's as_completed; the reason is not visible here.
        if sys.version_info < (3, 9):  # pragma: <3.9 cover
            kwargs = {}
    else:  # pragma: no cover
        # Report the unsupported *underlying* future type; tasks[0] itself
        # is always a TaskFuture and would make the message useless.
        raise ValueError(
            f'Unsupported future type {type(tasks[0]._future)}.',
        )

    for completed in _as_completed(futures.keys(), **kwargs):
        yield futures[completed]

wait()

wait(
    tasks: Sequence[TaskFuture[T]],
    timeout: float | None = None,
    return_when: str = "ALL_COMPLETED",
) -> tuple[set[TaskFuture[T]], set[TaskFuture[T]]]

Wait for tasks to finish.

Parameters:

  • tasks (Sequence[TaskFuture[T]]) –

    Sequence of tasks to wait on.

  • timeout (float | None, default: None ) –

    Maximum number of seconds to wait on tasks. Can be None to wait indefinitely.

  • return_when (str, default: 'ALL_COMPLETED' ) –

    Either "ALL_COMPLETED" or "FIRST_COMPLETED".

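Returns:

  • tuple[set[TaskFuture[T]], set[TaskFuture[T]]] –

    Tuple containing the set of completed tasks and the set of not completed tasks.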

Source code in taps/engine/_engine.py
def wait(
    tasks: Sequence[TaskFuture[T]],
    timeout: float | None = None,
    return_when: str = 'ALL_COMPLETED',
) -> tuple[set[TaskFuture[T]], set[TaskFuture[T]]]:
    """Wait for tasks to finish.

    The backend is chosen from the type of the first task's underlying
    future. All tasks are assumed to come from the same kind of executor.

    Args:
        tasks: Sequence of tasks to wait on.
        timeout: Maximum number of seconds to wait on tasks. Can be `None` to
            wait indefinitely.
        return_when: Either `"ALL_COMPLETED"` or `"FIRST_COMPLETED"`.

    Returns:
        Tuple containing the set of completed tasks and the set of not \
        completed tasks.

    Raises:
        ValueError: If the underlying future type is not supported.
    """
    # Map each executor future back to its task so the backend's result
    # sets can be translated back into task sets.
    futures = {task._future: task for task in tasks}

    if len(tasks) == 0 or isinstance(tasks[0]._future, Future):
        _wait = wait_python
    elif isinstance(tasks[0]._future, DaskFuture):
        _wait = wait_dask
    else:  # pragma: no cover
        # Report the unsupported *underlying* future type; tasks[0] itself
        # is always a TaskFuture and would make the message useless.
        raise ValueError(
            f'Unsupported future type {type(tasks[0]._future)}.',
        )

    completed_futures, not_completed_futures = _wait(
        list(futures.keys()),
        timeout=timeout,
        return_when=return_when,
    )

    completed_tasks = {futures[f] for f in completed_futures}
    not_completed_tasks = {futures[f] for f in not_completed_futures}

    return (completed_tasks, not_completed_tasks)