taps.apps.synthetic¶
SyntheticApp ¶
SyntheticApp(
    structure: WorkflowStructure,
    task_count: int,
    task_data_bytes: int,
    task_sleep: float,
    bag_max_running: int | None,
    *,
    warmup_task: bool = True
)
Synthetic workflow application.
Parameters:
- structure (WorkflowStructure) – Workflow structure.
- task_count (int) – Number of tasks.
- task_data_bytes (int) – Size of random input and output data of tasks.
- task_sleep (float) – Seconds to sleep for in each task.
- bag_max_running (int | None) – Maximum concurrently executing tasks in the "bag" workflow.
Source code in taps/apps/synthetic.py
close() ¶
run() ¶
Run the application.
generate_data() ¶
Get random data of specified size.
Uses random.randbytes() in Python 3.9 or newer and os.urandom() in Python 3.8 and older.
Note
This function returns a Data object rather than a bytestring directly. The indirection is needed because some serializers skip bytes, which causes problems if ProxyStore is used in this application: a Proxy[bytes] is an instance of bytes and so won't get properly serialized. This is the case with Ray, for example.
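The note above can be illustrated with a small sketch. Everything here is hypothetical: `LazyBytesProxy` is a stand-in for a transparent proxy (it is not ProxyStore's `Proxy`), `Data` is a guess at a minimal wrapper, and `naive_serialize` is a toy serializer that passes `bytes` through untouched. It only shows why an `isinstance(obj, bytes)` fast path would skip a proxy but not a wrapper object.

```python
from dataclasses import dataclass


class LazyBytesProxy:
    """Hypothetical stand-in for a transparent proxy (not ProxyStore's Proxy)."""

    def __init__(self, factory):
        self._factory = factory

    @property
    def __class__(self):
        # Report the target's class, as transparent proxies typically do.
        return type(self._factory())


@dataclass
class Data:
    """Hypothetical wrapper; the real Data class may differ."""

    raw: bytes


def naive_serialize(obj):
    """Toy serializer that skips anything that looks like bytes."""
    if isinstance(obj, bytes):
        return 'skipped'
    return 'pickled'


proxy = LazyBytesProxy(lambda: b'payload')
print(naive_serialize(proxy))        # skipped
print(naive_serialize(Data(proxy)))  # pickled
```

Wrapping the payload means the serializer sees a `Data` instance, so it takes the normal serialization path.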
Parameters:
- size (int) – Size of byte string to return.
Returns:
- Data – Random data.
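A minimal sketch of the version-dependent behavior described for generate_data(), assuming a hypothetical `Data` wrapper with a single `raw` field. The real class and function body may differ.

```python
import os
import random
import sys
from dataclasses import dataclass


@dataclass
class Data:
    """Hypothetical wrapper; the real Data class may differ."""

    raw: bytes


def generate_data_sketch(size: int) -> Data:
    """Return `size` random bytes wrapped in a Data object."""
    if sys.version_info >= (3, 9):
        # random.randbytes() was added in Python 3.9.
        raw = random.randbytes(size)
    else:
        raw = os.urandom(size)
    return Data(raw)


data = generate_data_sketch(1024)
print(len(data.raw))  # 1024
```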
noop_task() ¶
No-op sleep task.
Parameters:
- data (Data, default: ()) – Input data.
- output_size (int) – Size in bytes of output byte-string.
- sleep (float) – Minimum runtime of the task. Time required to generate the output data will be subtracted from this sleep time.
- task_id (UUID | None, default: None) – Optional unique task ID to prevent engines from caching the task result.
Returns:
- Data – Byte-string of length output_size.
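The sleep parameter is a minimum runtime, so the time spent generating output is deducted from the sleep. A rough sketch of that accounting, using a hypothetical `noop_task_sketch` that returns plain bytes rather than a Data object:

```python
import os
import time


def noop_task_sketch(output_size: int, sleep: float) -> bytes:
    """Generate output, then sleep for whatever remains of `sleep`."""
    start = time.perf_counter()
    output = os.urandom(output_size)
    elapsed = time.perf_counter() - start
    # Never pass a negative value to time.sleep().
    time.sleep(max(0.0, sleep - elapsed))
    return output


begin = time.perf_counter()
result = noop_task_sketch(output_size=64, sleep=0.05)
runtime = time.perf_counter() - begin
```

If generating the output takes longer than `sleep`, the task in this sketch returns immediately without sleeping.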
warmup_task() ¶
run_bag_of_tasks() ¶
run_bag_of_tasks(
    engine: AppEngine,
    task_count: int,
    task_data_bytes: int,
    task_sleep: float,
    max_running_tasks: int,
) -> None
Run bag of tasks workflow.
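One way to keep at most max_running_tasks tasks in flight is to refill the bag each time a task completes. The sketch below uses the standard library's concurrent.futures in place of AppEngine. The names `run_bag_sketch` and `tracked_task` are hypothetical, and the library's actual scheduling logic may differ.

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_bag_sketch(executor, task, task_count, max_running_tasks):
    """Run task_count tasks, keeping at most max_running_tasks in flight."""
    submitted = 0
    completed = 0
    running = set()

    # Fill the bag up to the concurrency limit.
    while submitted < min(task_count, max_running_tasks):
        running.add(executor.submit(task))
        submitted += 1

    # Each time a task finishes, replace it until task_count is reached.
    while running:
        done, running = wait(running, return_when=FIRST_COMPLETED)
        for future in done:
            future.result()  # Re-raise any task exception.
            completed += 1
            if submitted < task_count:
                running.add(executor.submit(task))
                submitted += 1
    return completed


lock = threading.Lock()
stats = {'active': 0, 'peak': 0}


def tracked_task():
    """Sleep briefly while recording how many tasks run at once."""
    with lock:
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
    time.sleep(0.01)
    with lock:
        stats['active'] -= 1


with ThreadPoolExecutor(max_workers=8) as pool:
    completed = run_bag_sketch(pool, tracked_task, task_count=10, max_running_tasks=3)
```

The pool has more workers than the limit on purpose, so the bound on concurrency comes from the submission loop rather than from the executor.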
run_diamond() ¶
Run diamond workflow.
run_reduce() ¶
Run reduce workflow.
run_sequential() ¶
run_sequential(
    engine: AppEngine,
    task_count: int,
    task_data_bytes: int,
    task_sleep: float,
) -> None
Run sequential workflow.
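A sketch of what a sequential workflow could look like, assuming (this page does not say so) that each task consumes the output of the previous one, forming a chain. `chain_task` and `run_sequential_sketch` are hypothetical names, and concurrent.futures stands in for AppEngine.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def chain_task(data: bytes, output_size: int) -> bytes:
    """Consume the previous output and produce new random bytes."""
    assert isinstance(data, bytes)
    return os.urandom(output_size)


def run_sequential_sketch(executor, task_count, task_data_bytes):
    """Run task_count tasks one after another, chaining their data."""
    data = os.urandom(task_data_bytes)
    for _ in range(task_count):
        future = executor.submit(chain_task, data, task_data_bytes)
        # Waiting on the result before submitting again enforces the ordering.
        data = future.result()
    return data


with ThreadPoolExecutor(max_workers=2) as pool:
    final = run_sequential_sketch(pool, task_count=5, task_data_bytes=32)
```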