taps.apps.synthetic¶
SyntheticApp ¶
SyntheticApp(
structure: WorkflowStructure,
task_count: int,
task_data_bytes: int,
task_sleep: float,
bag_max_running: int | None,
*,
warmup_task: bool = True
)
Synthetic workflow application.
Parameters:
- structure (WorkflowStructure) – Workflow structure.
- task_count (int) – Number of tasks.
- task_data_bytes (int) – Size of random input and output data of tasks.
- task_sleep (float) – Seconds to sleep for in each task.
- bag_max_running (int | None) – Maximum concurrently executing tasks in the "bag" workflow.
Source code in taps/apps/synthetic.py
close() ¶
run() ¶
Run the application.
generate_data() ¶
Get random data of specified size.
Uses random.randbytes() in Python 3.9 or newer and
os.urandom() in Python 3.8 and older.
Note
This function returns a Data object rather than a byte string directly.
The indirection is needed because some serializers skip bytes objects.
That causes problems when ProxyStore is used in this application:
a Proxy[bytes] is an instance of bytes, so it
won't get properly serialized. This is the case with Ray, for example.
Parameters:
- size (int) – Size of the byte string to return.
Returns:
- Data – Random data.
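The version-dependent behaviour described above can be sketched as follows. This is a minimal illustration, not the library's source: the `Data` dataclass and its `raw` field are hypothetical stand-ins for the wrapper type named in these docs, and only the standard-library calls `random.randbytes()` and `os.urandom()` are taken as given.

```python
import os
import random
import sys
from dataclasses import dataclass


@dataclass
class Data:
    """Hypothetical wrapper type; the real Data class may look different."""

    raw: bytes


def generate_data(size: int) -> Data:
    """Return `size` random bytes wrapped in a Data object."""
    if sys.version_info >= (3, 9):
        # random.randbytes() was added in Python 3.9.
        raw = random.randbytes(size)
    else:
        # Fallback for Python 3.8 and older.
        raw = os.urandom(size)
    # Wrapping the bytes means a serializer never sees a bare bytes object.
    return Data(raw)
```

Because the result is a `Data` instance and not `bytes`, a serializer that special-cases `bytes` would, under this assumption, still process it normally.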
noop_task() ¶
No-op sleep task.
Parameters:
- data (Data, default: ()) – Input data.
- output_size (int) – Size in bytes of the output byte string.
- sleep (float) – Minimum runtime of the task. The time required to generate the output data is subtracted from this sleep time.
- task_id (UUID | None, default: None) – Optional unique task ID to prevent engines from caching the task result.
Returns:
- Data – Byte string of length output_size.
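One way to read the sleep parameter is that the task times its own data generation and sleeps only for the remainder. The sketch below illustrates that reading under stated assumptions: the `Data` dataclass is a hypothetical stand-in, `os.urandom()` is used for data generation purely for illustration, and the signature is inferred from the parameter list above and is not copied from the source.

```python
import os
import time
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Data:
    """Hypothetical wrapper type; the real Data class may look different."""

    raw: bytes


def noop_task(
    *data: Data,
    output_size: int,
    sleep: float,
    task_id: Optional[uuid.UUID] = None,
) -> Data:
    """Sleep for roughly `sleep` seconds and return random output data."""
    start = time.perf_counter()
    # Generate the output first so its cost counts toward the runtime.
    result = Data(os.urandom(output_size))
    elapsed = time.perf_counter() - start
    # Sleep only for whatever is left of the requested minimum runtime.
    time.sleep(max(0.0, sleep - elapsed))
    # task_id is unused on purpose: a fresh value only changes the call
    # arguments so that an engine cannot reuse a cached result.
    return result
```

A caller that wants to defeat result caching could pass `task_id=uuid.uuid4()` on every invocation.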
warmup_task() ¶
run_bag_of_tasks() ¶
run_bag_of_tasks(
engine: AppEngine,
task_count: int,
task_data_bytes: int,
task_sleep: float,
max_running_tasks: int,
) -> None
Run bag of tasks workflow.
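The max_running_tasks limit suggests a pattern where a fixed number of independent tasks stay in flight and a new one is submitted each time one finishes. The sketch below shows that pattern with the standard library's `concurrent.futures` as a stand-in for AppEngine. The names `run_bag` and `sleep_task` are hypothetical, and the real function may schedule tasks differently.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Executor, ThreadPoolExecutor, wait
from typing import Tuple


def sleep_task(seconds: float) -> float:
    """Hypothetical task body: sleep, then return the time slept."""
    time.sleep(seconds)
    return seconds


def run_bag(
    executor: Executor,
    task_count: int,
    task_sleep: float,
    max_running_tasks: int,
) -> Tuple[int, int]:
    """Run independent tasks, keeping at most `max_running_tasks` in flight.

    Returns the number of completed tasks and the peak number in flight.
    """
    running = set()
    submitted = completed = peak = 0
    # Fill the bag up to the concurrency limit.
    while submitted < min(max_running_tasks, task_count):
        running.add(executor.submit(sleep_task, task_sleep))
        submitted += 1
    peak = len(running)
    while running:
        # Block until at least one task finishes.
        done, running = wait(running, return_when=FIRST_COMPLETED)
        for future in done:
            future.result()  # Re-raise any task exception.
            completed += 1
        # Top the bag back up with new tasks.
        while submitted < task_count and len(running) < max_running_tasks:
            running.add(executor.submit(sleep_task, task_sleep))
            submitted += 1
        peak = max(peak, len(running))
    return completed, peak
```

For example, `run_bag(ThreadPoolExecutor(max_workers=4), 10, 0.001, 3)` completes all ten tasks while never tracking more than three at once.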
run_diamond() ¶
Run diamond workflow.
run_reduce() ¶
Run reduce workflow.
run_sequential() ¶
run_sequential(
engine: AppEngine,
task_count: int,
task_data_bytes: int,
task_sleep: float,
) -> None
Run sequential workflow.
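A sequential workflow is commonly a chain in which each task consumes the previous task's output, so no two tasks run at the same time. The sketch below assumes that interpretation and uses `concurrent.futures` in place of AppEngine. The names `run_chain` and `step` are hypothetical, and task_sleep is omitted to keep the example short.

```python
import os
from concurrent.futures import Executor, ThreadPoolExecutor


def step(previous: bytes, output_size: int) -> bytes:
    """Hypothetical task body: accept input data, return new random data."""
    return os.urandom(output_size)


def run_chain(executor: Executor, task_count: int, task_data_bytes: int) -> bytes:
    """Run `task_count` tasks one after another, passing data along the chain."""
    data = os.urandom(task_data_bytes)
    for _ in range(task_count):
        # Waiting on each result before submitting the next task
        # enforces the strict ordering of the chain.
        data = executor.submit(step, data, task_data_bytes).result()
    return data
```

Calling `run_chain(ThreadPoolExecutor(max_workers=2), 5, 32)` would run five dependent tasks and return the final 32-byte payload.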