Skip to content

taps.apps.failures.app

FailureInjectionApp

FailureInjectionApp(
    base_config: AppConfig,
    failure_rate: float,
    failure_type: FailureType,
)

Failure injection application.

Parameters:

  • base_config (AppConfig) –

    Configuration for the base application to inject failures into.

  • failure_type (FailureType) –

    The type of failure to inject.

  • failure_rate (float) –

    The probability of injecting a failure into any given task.

Source code in taps/apps/failures/app.py
def __init__(
    self,
    base_config: AppConfig,
    failure_rate: float,
    failure_type: FailureType,
) -> None:
    """Initialize the failure injection application.

    Args:
        base_config: Configuration for the base application to inject
            failures into.
        failure_rate: The probability of injecting a failure into any
            given task.
        failure_type: The type of failure to inject.
    """
    # Instantiate the base application from its configuration before
    # recording any other state on this instance.
    wrapped_app = base_config.get_app()

    self.base = wrapped_app
    self.base_config = base_config
    self.failure_rate = failure_rate
    self.failure_type = failure_type

close()

close() -> None

Close the application.

Source code in taps/apps/failures/app.py
def close(self) -> None:
    """Close the application.

    Closing is delegated to the base application held in ``self.base``.
    """
    base_app = self.base
    base_app.close()

run()

run(engine: Engine, run_dir: Path) -> None

Run the application.

Parameters:

  • engine (Engine) –

    Application execution engine.

  • run_dir (Path) –

    Run directory.

Source code in taps/apps/failures/app.py
def run(self, engine: Engine, run_dir: pathlib.Path) -> None:
    """Run the base application on a failure-injecting engine.

    Logs the configured failure type and rate, wraps ``engine`` in a
    ``_FailureInjectionEngine``, and runs the base application with the
    wrapped engine in place of the original one.

    Args:
        engine: Application execution engine.
        run_dir: Run directory.
    """
    message = (
        f'Injecting failures into the {self.base_config.name} app '
        f'(type={self.failure_type.value}, rate={self.failure_rate})'
    )
    logger.log(APP_LOG_LEVEL, message)

    injecting_engine = _FailureInjectionEngine(
        engine,
        failure_rate=self.failure_rate,
        failure_type=self.failure_type,
    )
    # The wrapper is used as a context manager; the base app only ever
    # sees the engine yielded by it, never the raw ``engine``.
    with injecting_engine as failure_engine:
        self.base.run(engine=failure_engine, run_dir=run_dir)