Skip to content

taps.executor.python

ProcessPoolConfig

Bases: ExecutorConfig

Process pool executor configuration.

Attributes:

  • max_processes (int) –

    Maximum number of processes.

add_argument_group() classmethod

add_argument_group(
    parser: ArgumentParser,
    *,
    argv: Sequence[str] | None = None,
    required: bool = True
) -> None

Add model fields as arguments of an argument group on the parser.

Parameters:

  • parser (ArgumentParser) –

    Parser to add a new argument group to.

  • argv (Sequence[str] | None, default: None ) –

    Optional sequence of string arguments.

  • required (bool, default: True ) –

    Mark arguments without defaults as required.

Source code in taps/config.py
@classmethod
def add_argument_group(
    cls,
    parser: argparse.ArgumentParser,
    *,
    argv: Sequence[str] | None = None,
    required: bool = True,
) -> None:
    """Add model fields as arguments of an argument group on the parser.

    Args:
        parser: Parser to add a new argument group to.
        argv: Optional sequence of string arguments.
        required: Mark arguments without defaults as required.
    """
    group = parser.add_argument_group(cls.__name__)
    for field_name, field_info in cls.model_fields.items():
        arg_name = field_name.replace('_', '-').lower()
        group.add_argument(
            f'--{arg_name}',
            dest=field_name,
            # type=field_info.annotation,
            default=field_info.get_default(),
            required=field_info.is_required() and required,
            help=field_info.description,
        )

get_executor()

get_executor() -> DAGExecutor

Create an executor instance from the config.

Source code in taps/executor/python.py
def get_executor(self) -> DAGExecutor:
    """Create an executor instance from the config."""
    context = multiprocessing.get_context(self.multiprocessing_context)
    return DAGExecutor(
        ProcessPoolExecutor(self.max_processes, mp_context=context),
    )

ThreadPoolConfig

Bases: ExecutorConfig

Thread pool executor configuration.

Attributes:

  • max_threads (int) –

    Maximum number of threads.

add_argument_group() classmethod

add_argument_group(
    parser: ArgumentParser,
    *,
    argv: Sequence[str] | None = None,
    required: bool = True
) -> None

Add model fields as arguments of an argument group on the parser.

Parameters:

  • parser (ArgumentParser) –

    Parser to add a new argument group to.

  • argv (Sequence[str] | None, default: None ) –

    Optional sequence of string arguments.

  • required (bool, default: True ) –

    Mark arguments without defaults as required.

Source code in taps/config.py
@classmethod
def add_argument_group(
    cls,
    parser: argparse.ArgumentParser,
    *,
    argv: Sequence[str] | None = None,
    required: bool = True,
) -> None:
    """Add model fields as arguments of an argument group on the parser.

    Args:
        parser: Parser to add a new argument group to.
        argv: Optional sequence of string arguments.
        required: Mark arguments without defaults as required.
    """
    group = parser.add_argument_group(cls.__name__)
    for field_name, field_info in cls.model_fields.items():
        arg_name = field_name.replace('_', '-').lower()
        group.add_argument(
            f'--{arg_name}',
            dest=field_name,
            # type=field_info.annotation,
            default=field_info.get_default(),
            required=field_info.is_required() and required,
            help=field_info.description,
        )

get_executor()

get_executor() -> DAGExecutor

Create an executor instance from the config.

Source code in taps/executor/python.py
def get_executor(self) -> DAGExecutor:
    """Create an executor instance from the config."""
    return DAGExecutor(ThreadPoolExecutor(self.max_threads))