Task Executors¶
The Engine, in TaPS, wraps a task executor.
Task executors are responsible for managing the asynchronous execution of functions.
Task executors implement Python's concurrent.futures.Executor model, and TaPS supports an extensible plugin system for configuring executor parameters and adding new executor types.
The rest of this guide describes creating a new executor within the TaPS framework.
Creating an Executor¶
Here, we will create a SyncExecutor which simply executes a function directly.
This is not a very useful executor in practice as it does not enable any concurrency, but it will suffice for explaining the steps.
Note
This step can be skipped if you already have a class that implements the concurrent.futures.Executor model.
This step is for when (a) you are implementing an executor from scratch or (b) you need to wrap an existing executor with a concurrent.futures.Executor-compliant interface.
The code below in taps/executor/sync.py implements the required submit(), map(), and shutdown() methods of SyncExecutor.
Note that concurrent.futures.Executor provides a default implementation of map(), which can be suitable if the executor does not have a special mechanism for handling mapped tasks.
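As a rough illustration of the steps above, a synchronous executor could be sketched as follows. This is a minimal sketch and not the TaPS source: the sleep parameter is an assumption inferred from the --engine.executor.sleep option used later in this guide, and the shutdown bookkeeping is purely illustrative.

```python
from __future__ import annotations

import time
from concurrent.futures import Executor, Future
from typing import Any, Callable


class SyncExecutor(Executor):
    """Run each submitted function immediately in the calling thread."""

    def __init__(self, sleep: float = 0.0) -> None:
        # Assumption: an optional delay applied before each task runs.
        self.sleep = sleep
        self._shutdown = False

    def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future:
        if self._shutdown:
            raise RuntimeError('cannot submit tasks after shutdown')
        future: Future = Future()
        # Mark the future as running so it can no longer be cancelled.
        future.set_running_or_notify_cancel()
        time.sleep(self.sleep)
        try:
            result = fn(*args, **kwargs)
        except Exception as e:
            # Store the error on the future instead of raising it here.
            future.set_exception(e)
        else:
            future.set_result(result)
        return future

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        # Nothing runs in the background, so there is nothing to wait for.
        self._shutdown = True


executor = SyncExecutor()
done = executor.submit(pow, 2, 5)        # already complete when returned
mapped = list(executor.map(abs, [-1, 2]))  # map() is inherited from Executor
failed = executor.submit(int, 'x')       # the error is captured on the future
```

Because map() is inherited from concurrent.futures.Executor, it simply calls submit() once per set of arguments, which is sufficient when the executor has no batch-submission mechanism.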
Creating a Config¶
Config classes are how plugins are registered within TaPS.
For executors, every config must inherit from ExecutorConfig, an abstract base class with an abstract method get_executor().
The @register('executor') decorator registers the config as a new executor plugin.
Registering the plugin makes our SyncExecutor available as an option with the CLI and enables input validation on fields of our executor.
taps/executor/sync.py
The changes necessary to add the config to taps/executor/sync.py are highlighted.
The name field of SyncExecutorConfig defines the name by which this executor can be selected from the run CLI.
Warning
The Engine requires that task executors support implicit data flow dependencies between tasks with futures.
In other words, it must be possible to pass the future from one task as a positional or keyword argument to another task.
Many executors already support this (e.g., Dask or Parsl), but many do not (e.g., Python's ThreadPoolExecutor and ProcessPoolExecutor).
TaPS provides the FutureDependencyExecutor which can wrap another Executor instance to enable implicit data flow dependencies.
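To make the requirement concrete, the following sketch shows what happens when a future is passed directly to a plain ThreadPoolExecutor task. The add function is a hypothetical example task; the executor hands the Future object itself to the function rather than its result.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=2) as pool:
    first = pool.submit(add, 1, 2)

    # The Future object is passed through unchanged, so add() tries to
    # compute `Future + int` and the task fails with a TypeError.
    second = pool.submit(add, first, 10)
    error = second.exception()

    # Without implicit dependencies, the caller must resolve the future
    # explicitly before submitting the dependent task.
    third = pool.submit(add, first.result(), 10)
    value = third.result()
```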
Since SyncExecutor.submit() does not support accepting a Future in place of a positional or keyword argument, we must wrap the SyncExecutor in a FutureDependencyExecutor in SyncExecutorConfig.get_executor().
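One way such a wrapper could work is sketched below. DependencyResolvingExecutor is a hypothetical class written for this example and is not the TaPS FutureDependencyExecutor; it only illustrates the general idea of delaying a task until the futures among its arguments have completed, and it only looks at top-level arguments (futures nested inside lists or dicts are not resolved).

```python
from __future__ import annotations

import threading
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable


def _resolve(value: Any) -> Any:
    """Replace a completed future with its result."""
    return value.result() if isinstance(value, Future) else value


class DependencyResolvingExecutor(Executor):
    """Hypothetical wrapper that resolves futures passed as arguments."""

    def __init__(self, inner: Executor) -> None:
        self.inner = inner

    def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future:
        deps = [v for v in (*args, *kwargs.values()) if isinstance(v, Future)]
        outer: Future = Future()  # returned to the caller right away
        lock = threading.Lock()
        pending = len(deps)

        def copy_result(inner_future: Future) -> None:
            # Forward the wrapped task's outcome to the caller's future.
            try:
                outer.set_result(inner_future.result())
            except BaseException as e:
                outer.set_exception(e)

        def launch() -> None:
            # All dependencies are done; a failed one re-raises here.
            try:
                real_args = [_resolve(a) for a in args]
                real_kwargs = {k: _resolve(v) for k, v in kwargs.items()}
                task = self.inner.submit(fn, *real_args, **real_kwargs)
                task.add_done_callback(copy_result)
            except BaseException as e:
                outer.set_exception(e)

        def on_dependency_done(_: Future) -> None:
            nonlocal pending
            with lock:
                pending -= 1
                ready = pending == 0
            if ready:
                launch()

        if not deps:
            launch()
        for dep in deps:
            dep.add_done_callback(on_dependency_done)
        return outer

    def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
        self.inner.shutdown(wait=wait, cancel_futures=cancel_futures)


def add(x, y):
    return x + y


with DependencyResolvingExecutor(ThreadPoolExecutor(max_workers=1)) as executor:
    first = executor.submit(add, 1, 2)
    second = executor.submit(add, first, y=10)  # future passed as an argument
    chained = executor.submit(add, first, y=second)
    results = (second.result(), chained.result())
```

Using completion callbacks rather than blocking inside a worker means a dependent task never occupies a worker while it waits, so the sketch works even with a single-worker pool.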
The last step is to import the SyncExecutor and SyncExecutorConfig inside of taps/executor/__init__.py.
This ensures that the @register decorators get executed.
...
from taps.executor.python import ThreadPoolConfig
from taps.executor.sync import SyncExecutor
from taps.executor.sync import SyncExecutorConfig
from taps.executor.ray import RayConfig
...
Using the Executor¶
Now that we have created our SyncExecutor and registered the corresponding SyncExecutorConfig, we can use the executor to run a benchmark.
python -m taps.run --app cholesky --app.matrix-size 100 --app.block-size 25 \
--engine.executor sync --engine.executor.sleep 0.1
Here, --engine.executor selects our executor by its registered name, sync, and the sleep field is available as an optional CLI parameter since we gave it a default value.
[2024-07-10 13:36:08.774] RUN (taps.run) :: Starting app (name=cholesky)
[2024-07-10 13:36:08.774] RUN (taps.run) :: Configuration:
app:
  name: 'cholesky'
  block_size: 25
  matrix_size: 100
engine:
  executor:
    name: 'sync'
    sleep: 0.1
  filter:
    name: 'null'
  task_record_file_name: 'tasks.jsonl'
  transformer:
    name: 'null'
logging:
  file_level: 'INFO'
  file_name: 'log.txt'
  level: 'INFO'
run:
  dir_format: 'runs/{name}_{executor}_{timestamp}'
[2024-07-10 13:36:08.774] RUN (taps.run) :: Runtime directory: runs/cholesky_sync_2024-07-10-13-36-08
[2024-07-10 13:36:08.774] APP (taps.apps.cholesky) :: Generated input matrix: (100, 100)
[2024-07-10 13:36:08.775] APP (taps.apps.cholesky) :: Block size: 25
[2024-07-10 13:36:11.953] APP (taps.apps.cholesky) :: Output matrix: (100, 100)
[2024-07-10 13:36:11.953] RUN (taps.run) :: Finished app (name=cholesky, runtime=3.18s, tasks=30)
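As a rough sanity check on the log above, the reported runtime can be compared against the configured sleep. This assumes, which the text does not state, that the sleep value is a delay applied once per task; the timestamps and task count are copied from the log.

```python
from datetime import datetime

FORMAT = '%Y-%m-%d %H:%M:%S.%f'

# Timestamps of the "Block size" and "Output matrix" log lines.
start = datetime.strptime('2024-07-10 13:36:08.775', FORMAT)
end = datetime.strptime('2024-07-10 13:36:11.953', FORMAT)
elapsed = (end - start).total_seconds()

tasks = 30   # from "tasks=30" in the final log line
sleep = 0.1  # from "--engine.executor.sleep 0.1"

# Assumption: one sleep per task, executed back to back with no overlap.
sleep_total = tasks * sleep
remainder = elapsed - sleep_total

print(f'elapsed={elapsed:.3f}s sleep_total={sleep_total:.1f}s remainder={remainder:.3f}s')
```

Under that assumption, roughly 3.0 s of the 3.18 s runtime would be spent sleeping, which is consistent with tasks running strictly one after another.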