Skip to content

taps.apps.failures.types

FailureType

Bases: Enum

Failure types.

random classmethod

random() -> Self

Select a random failure type (excluding RANDOM).

Source code in taps/apps/failures/types.py
@classmethod
def random(cls) -> Self:
    """Pick one concrete failure type at random.

    The ``RANDOM`` member is a placeholder meaning "choose for me", so it
    is never returned from this method.

    Returns:
        A member of this enum other than ``RANDOM``.
    """
    # NOTE(review): ``random`` below is expected to resolve to the stdlib
    # module imported at file level, not to this method (class-body names
    # are not visible from inside method bodies).
    excluded = cls.RANDOM.value
    candidates = [
        member.value for member in cls if member.value != excluded
    ]
    picked = random.choice(candidates)
    return cls(picked)

ParentDependencyError

Bases: Exception

Exception raised in parent tasks when simulating dependency errors.

exception_failure

exception_failure() -> None

Raise an exception.

Source code in taps/apps/failures/types.py
def exception_failure() -> None:
    """Fail immediately by raising a plain ``Exception``.

    Raises:
        Exception: Always, with the message ``'Failure injection error.'``.
    """
    message = 'Failure injection error.'
    raise Exception(message)

import_failure

import_failure() -> None

Simulate an import error due to a bad environment.

Source code in taps/apps/failures/types.py
def import_failure() -> None:
    """Mimic a broken environment by raising ``ImportError``.

    Raises:
        ImportError: Always, with the message
            ``'Failure injection error.'``.
    """
    message = 'Failure injection error.'
    raise ImportError(message)

manager_killed_failure

manager_killed_failure() -> None

Kill the parent process (i.e., the manager).

Source code in taps/apps/failures/types.py
def manager_killed_failure() -> None:  # pragma: no cover
    """Kill the parent process (i.e., the manager).

    Sends ``SIGTERM`` to the parent of the current process. If the current
    process has no parent, a warning is logged and nothing is signalled.
    Failures to deliver the signal (parent already gone, or insufficient
    permission) are logged with a traceback instead of being raised.
    """
    current_pid = os.getpid()
    parent_process = psutil.Process(current_pid).parent()

    if parent_process is None:
        logger.warning(
            f'Task process (pid={current_pid}) has no parent process',
        )
        return

    parent_pid = parent_process.pid
    logger.info(f'Killing manager parent process (pid={parent_pid})')
    try:
        os.kill(parent_pid, signal.SIGTERM)
    # os.kill() reports failures with the builtin OSError subclasses, not
    # with psutil's exception types; the psutil types are still accepted
    # so existing behaviour is a strict subset of what is handled here.
    except (ProcessLookupError, psutil.NoSuchProcess):
        logger.exception('Parent process does not exist')
    except (PermissionError, psutil.AccessDenied):
        logger.exception(
            'Insufficient permission to terminate parent process',
        )
    else:
        logger.info(f'Parent process terminated (pid={parent_pid})')

memory_failure

memory_failure() -> None

Force an out of memory error.

Source code in taps/apps/failures/types.py
def memory_failure() -> None:  # pragma: no cover
    """Allocate memory without bound until the process runs out.

    Each iteration builds a new string of ``1024**3`` characters and keeps
    a reference to it so that nothing can be reclaimed. The loop has no
    exit condition; it only ends when an allocation fails or the process
    is killed.
    """
    chunk_length = 1024**3
    hoard = []
    while True:
        chunk = 'x' * chunk_length
        hoard.append(chunk)

node_killed_failure

node_killed_failure() -> None

Kill other processes in the node to simulate a node failure.

Warning

This is a very dangerous function. It attempts to terminate every other process on the node that the current user is permitted to signal. Do not run this function in a process with sudo privileges.

Source code in taps/apps/failures/types.py
def node_killed_failure() -> None:  # pragma: no cover
    """Kill other processes in the node to simulate a node failure.

    Every process visible to psutil, except the current one, is asked to
    terminate. Processes that cannot be signalled (already gone, zombie,
    or permission denied) are logged and skipped. The function then waits
    up to three seconds for the signalled processes to exit.

    Warning:
        This is a very dangerous function. It attempts to terminate every
        other process on the node that the current user is permitted to
        signal. Do not run this function in a process with sudo
        privileges.
    """
    current_pid = os.getpid()
    signalled = []

    for proc in psutil.process_iter():
        pid = proc.pid
        if pid == current_pid:
            continue
        try:
            # process_iter() already yields Process objects, so there is
            # no need to look the pid up a second time.
            proc.terminate()
        except (
            psutil.NoSuchProcess,
            psutil.AccessDenied,
            psutil.ZombieProcess,
        ):
            logger.exception(f'Exception when killing process (pid={pid})')
        else:
            signalled.append(proc)

    # Wait only on processes that were actually signalled. Waiting on the
    # full process list would include this process itself, which can never
    # exit while we are waiting, so the timeout would always elapse.
    psutil.wait_procs(signalled, timeout=3, callback=None)

worker_killed_failure

worker_killed_failure() -> None

Kill the current process.

Source code in taps/apps/failures/types.py
def worker_killed_failure() -> None:  # pragma: no cover
    """Terminate the process that is running this function.

    If psutil cannot terminate the process (it no longer exists, access is
    denied, or it is a zombie), the error is logged with a traceback and
    the function returns normally.
    """
    own_pid = os.getpid()
    expected_errors = (
        psutil.NoSuchProcess,
        psutil.AccessDenied,
        psutil.ZombieProcess,
    )
    try:
        this_process = psutil.Process(own_pid)
        this_process.terminate()
    except expected_errors:
        logger.exception(f'Failed to kill current process (pid={own_pid})')

timeout_failure

timeout_failure() -> None

Sleep forever to force walltime or timeout error.

Source code in taps/apps/failures/types.py
def timeout_failure() -> None:  # pragma: no cover
    """Block indefinitely so that an external walltime or timeout fires.

    The function never returns on its own; it sleeps in 60-second
    intervals until the process is interrupted or killed.
    """
    from time import sleep

    nap_seconds = 60
    while True:
        sleep(nap_seconds)

ulimit_failure

ulimit_failure() -> None

Open 1M files to simulate ulimit exceeded error.

Source code in taps/apps/failures/types.py
def ulimit_failure() -> None:  # pragma: no cover
    """Try to hold one million files open at once.

    Files are created inside a temporary directory and kept open so that
    the open-file limit is hit. Whatever happens, every handle that was
    opened is closed again before the temporary directory is removed.
    """
    open_files = []
    max_files = 1_000_000

    with tempfile.TemporaryDirectory() as scratch_dir:
        try:
            for index in range(max_files):
                path = os.path.join(scratch_dir, f'{index}.txt')
                # Deliberately not a ``with`` block: the handle must stay
                # open to count against the limit.
                opened = open(path, 'w')  # noqa: SIM115
                open_files.append(opened)
        finally:
            for opened in open_files:
                opened.close()

zero_division_failure

zero_division_failure() -> None

Raise divide by zero error.

Source code in taps/apps/failures/types.py
def zero_division_failure() -> None:
    """Fail immediately with a divide-by-zero error.

    Raises:
        ZeroDivisionError: Always, with the message
            ``'Failure injection error.'``.
    """
    message = 'Failure injection error.'
    raise ZeroDivisionError(message)