taps.engine
EngineConfig
Bases: BaseModel
App engine configuration.
Parameters:
- executor (ExecutorConfig, default: ProcessPoolConfig(name='process-pool', max_processes=4, context=None)) – Executor configuration.
- filter (FilterConfig | None, default: None) – Filter configuration.
- transformer (TransformerConfig | None, default: None) – Transformer configuration.
- task_record_file_name (str | None, default: 'tasks.jsonl') – Name of the line-delimited JSON file to log task records to.
get_engine
get_engine() -> Engine
Create an engine from the configuration.
Source code in taps/engine/_config.py
Engine
Engine(
executor: Executor,
*,
filter_: Filter | None = None,
transformer: Transformer[Any] | None = None,
record_logger: RecordLogger | None = None
)
Application execution engine.
Parameters:
- executor (Executor) – Task compute executor.
- filter_ (Filter | None, default: None) – Data filter.
- transformer (Transformer[Any] | None, default: None) – Data transformer.
- record_logger (RecordLogger | None, default: None) – Task record logger.
Source code in taps/engine/_engine.py
submit
submit(
function: Callable[P, R], /, *args: Any, **kwargs: Any
) -> TaskFuture[R]
Schedule the callable to be executed.
This function can also accept TaskFuture objects as input to denote dependencies between a parent and this child task.
Parameters:
- function (Callable[P, R]) – Callable to execute.
- args (Any, default: ()) – Positional arguments for the task.
- kwargs (Any, default: {}) – Keyword arguments for the task.
Returns:
- TaskFuture[R] – TaskFuture object representing the result of the execution of the callable, accessible via TaskFuture.result().
Source code in taps/engine/_engine.py
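To illustrate what passing a future as an argument can mean, here is a minimal sketch built on the standard library's concurrent.futures rather than on taps itself. The helper `submit_with_deps` is hypothetical and not part of the taps API. It shows one way a child task can consume a parent's result, assuming the engine resolves future arguments before invoking the callable.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable


def _resolve(value: Any) -> Any:
    # Replace a future with its result; pass other values through unchanged.
    return value.result() if isinstance(value, Future) else value


def submit_with_deps(
    executor: Executor,
    function: Callable[..., Any],
    /,
    *args: Any,
    **kwargs: Any,
) -> Future:
    # Hypothetical helper (an assumption, not the taps implementation):
    # resolve future arguments inside the worker, then call the function.
    def _run() -> Any:
        resolved_args = [_resolve(a) for a in args]
        resolved_kwargs = {k: _resolve(v) for k, v in kwargs.items()}
        return function(*resolved_args, **resolved_kwargs)

    return executor.submit(_run)


def add(x: int, y: int) -> int:
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    parent = submit_with_deps(pool, add, 1, 2)
    # The child receives the parent future and sees its result (3).
    child = submit_with_deps(pool, add, parent, 10)
    child_result = child.result()
```

Here the parent is always submitted before the child, so the thread pool picks it up first and the child never waits on work that has not been scheduled.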
map
map(
function: Callable[P, R],
*iterables: Iterable[args],
timeout: float | None = None,
chunksize: int = 1
) -> Iterator[R]
Map a function onto iterables of arguments.
Parameters:
- function (Callable[P, R]) – A callable that will take as many arguments as there are passed iterables.
- iterables (Iterable[args], default: ()) – Variable number of iterables.
- timeout (float | None, default: None) – The maximum number of seconds to wait. If None, then there is no limit on the wait time.
- chunksize (int, default: 1) – Currently not supported. If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the executor. If set to one, the items in the list will be sent one at a time.
Returns:
- Iterator[R] – An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.
Source code in taps/engine/_engine.py
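The signature above resembles `concurrent.futures.Executor.map`. The sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in to show the "one argument per iterable" calling convention. It assumes, without confirming, that the engine's `map` behaves analogously.

```python
from concurrent.futures import ThreadPoolExecutor


def power(base: int, exponent: int) -> int:
    # Takes two arguments, so map() is given two iterables.
    return base**exponent


bases = [2, 3, 4]
exponents = [3, 2, 1]

with ThreadPoolExecutor(max_workers=2) as pool:
    # Calls may run concurrently; the stdlib iterator yields results
    # in the order of the inputs.
    parallel = list(pool.map(power, bases, exponents, timeout=10))

# The built-in map gives the same values serially.
serial = list(map(power, bases, exponents))
```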
shutdown
Shutdown the executor.
Parameters:
- wait (bool, default: True) – Wait on all pending futures to complete.
- cancel_futures (bool, default: False) – Cancel all pending futures that the executor has not started running. Only used in Python 3.9 and later.
Source code in taps/engine/_engine.py
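The `wait` and `cancel_futures` parameters match those of `concurrent.futures.Executor.shutdown`. As a rough illustration of their effect, the following sketch uses a standard library `ThreadPoolExecutor` (Python 3.9 or later), not the taps engine; whether the engine forwards these flags unchanged to its executor is an assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker() -> str:
    # Signal that the task is running, then block until released.
    started.set()
    release.wait(timeout=5)
    return "done"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocker)
started.wait(timeout=5)  # make sure the first task has started

# These tasks stay queued because the single worker is busy.
pending = [pool.submit(pow, 2, n) for n in range(3)]

# Do not block; cancel everything that has not started running.
pool.shutdown(wait=False, cancel_futures=True)

release.set()
running_result = running.result(timeout=5)
cancelled = [future.cancelled() for future in pending]
```

The task that was already running completes normally, while the queued tasks are cancelled.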
TaskFuture
TaskFuture(
future: FutureProtocol[TaskResult[R]],
info: TaskInfo,
transformer: TaskTransformer[Any],
)
Bases: Generic[R]
Task future.
Note
This class should not be instantiated by clients.
Parameters:
- future (FutureProtocol[TaskResult[R]]) – Underlying future returned by the compute executor.
- info (TaskInfo) – Task information and metadata.
- transformer (TaskTransformer[Any]) – Transformer used to resolve the task result.
Source code in taps/engine/_engine.py
cancel
cancel() -> bool
Attempt to cancel the task.
If the call is currently being executed or has finished running and cannot be cancelled, the method returns False. Otherwise, the call is cancelled and the method returns True.
Source code in taps/engine/_engine.py
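The return values described above follow the same rules as `concurrent.futures.Future.cancel`. A small sketch with standard library futures (a stand-in, assuming the task future delegates to an underlying future with these semantics) shows the three cases:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker() -> str:
    started.set()
    release.wait(timeout=5)
    return "finished"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocker)
    started.wait(timeout=5)  # the first task is now executing
    queued = pool.submit(blocker)  # waits behind the running task

    cancel_running = running.cancel()  # already executing
    cancel_queued = queued.cancel()  # not started yet

    release.set()
    running.result(timeout=5)
    cancel_finished = running.cancel()  # already finished
```

Only the queued task can be cancelled; the running and finished cases both return False.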
exception
exception() -> BaseException | None
result
result(timeout: float | None = None) -> R
Get the result of the task.
Parameters:
- timeout (float | None, default: None) – If the task has not finished, wait up to timeout seconds.
Returns:
- R – Task result if the task completed successfully.
Raises:
- TimeoutError – If timeout is specified and the task does not complete within timeout seconds.
Source code in taps/engine/_engine.py
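The timeout behaviour can be sketched with a standard library future, used here as a stand-in for a task future. The exception caught is `concurrent.futures.TimeoutError`, which is an alias of the built-in `TimeoutError` on Python 3.11 and later; which exception class taps raises on older versions is not stated in this reference.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError

release = threading.Event()


def slow() -> int:
    # Block until released so the first result() call must time out.
    release.wait(timeout=5)
    return 42


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow)
    try:
        future.result(timeout=0.05)
        timed_out = False
    except FutureTimeoutError:
        timed_out = True

    # A timeout does not cancel the task; the result is still available later.
    release.set()
    value = future.result(timeout=5)
```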
as_completed
as_completed(
tasks: Sequence[TaskFuture[R]],
timeout: float | None = None,
) -> Generator[TaskFuture[R], None, None]
Return an iterator which yields tasks as they complete.
Parameters:
- tasks (Sequence[TaskFuture[R]]) – Sequence of tasks.
- timeout (float | None, default: None) – Seconds to wait for a task to complete. If no task completes in that time, a TimeoutError is raised. If timeout is None, there is no limit to the wait time.
Returns:
- Generator[TaskFuture[R], None, None] – Iterator which yields futures as they complete (finished or cancelled futures).
Source code in taps/engine/_engine.py
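The usual consumption pattern is a loop that handles each task as soon as it finishes. The sketch below uses `concurrent.futures.as_completed` with standard library futures as a stand-in; it assumes the function documented here is used the same way with task futures.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def sleep_then_return(delay: float) -> float:
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=3) as pool:
    tasks = [pool.submit(sleep_then_return, d) for d in (0.3, 0.1, 0.2)]
    finished = []
    # Futures are yielded in completion order, not submission order.
    for task in as_completed(tasks, timeout=10):
        finished.append(task.result())
```

With these delays the shortest sleep typically completes first, but the exact order depends on scheduling.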
wait
wait(
tasks: Sequence[TaskFuture[R]],
timeout: float | None = None,
return_when: str = "ALL_COMPLETED",
) -> tuple[set[TaskFuture[R]], set[TaskFuture[R]]]
Wait for tasks to finish.
Parameters:
- tasks (Sequence[TaskFuture[R]]) – Sequence of tasks to wait on.
- timeout (float | None, default: None) – Maximum number of seconds to wait on tasks. Can be None to wait indefinitely.
- return_when (str, default: 'ALL_COMPLETED') – Either "ALL_COMPLETED" or "FIRST_COMPLETED".
Returns:
- tuple[set[TaskFuture[R]], set[TaskFuture[R]]] – Tuple containing the set of completed tasks and the set of not completed tasks.
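The difference between the two `return_when` values can be sketched with `concurrent.futures.wait`, which accepts the same string constants. Standard library futures stand in for task futures here, on the assumption that the function documented above partitions tasks in the same way.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

release = threading.Event()


def fast() -> str:
    return "fast"


def slow() -> str:
    # Block until released so this task is still pending at the first wait().
    release.wait(timeout=5)
    return "slow"


with ThreadPoolExecutor(max_workers=2) as pool:
    tasks = [pool.submit(fast), pool.submit(slow)]

    # Returns as soon as at least one task has completed.
    done, not_done = wait(tasks, timeout=5, return_when="FIRST_COMPLETED")
    first_counts = (len(done), len(not_done))

    release.set()
    # Returns once every task has completed (or the timeout expires).
    done_all, not_done_all = wait(tasks, timeout=5, return_when="ALL_COMPLETED")
    final_counts = (len(done_all), len(not_done_all))
```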