Skip to content

taps.executor.parsl

ParslConfig

Bases: ExecutorConfig

Parsl configuration.

Attributes:

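  • parsl_workers

    Number of Parsl workers; when None, the CPU count of the local machine is used.

  • parsl_use_threads

    Use a ThreadPoolExecutor instead of the HighThroughputExecutor.

  • parsl_run_dir

    Run directory passed to the Parsl Config.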

add_argument_group() classmethod

add_argument_group(
    parser: ArgumentParser,
    *,
    argv: Sequence[str] | None = None,
    required: bool = True
) -> None

Add model fields as arguments of an argument group on the parser.

Parameters:

  • parser (ArgumentParser) –

    Parser to add a new argument group to.

  • argv (Sequence[str] | None, default: None ) –

    Optional sequence of string arguments.

  • required (bool, default: True ) –

    Mark arguments without defaults as required.

Source code in taps/config.py
@classmethod
def add_argument_group(
    cls,
    parser: argparse.ArgumentParser,
    *,
    argv: Sequence[str] | None = None,
    required: bool = True,
) -> None:
    """Add model fields as arguments of an argument group on the parser.

    Args:
        parser: Parser to add a new argument group to.
        argv: Optional sequence of string arguments.
        required: Mark arguments without defaults as required.
    """
    group = parser.add_argument_group(cls.__name__)
    for field_name, field_info in cls.model_fields.items():
        arg_name = field_name.replace('_', '-').lower()
        group.add_argument(
            f'--{arg_name}',
            dest=field_name,
            # type=field_info.annotation,
            default=field_info.get_default(),
            required=field_info.is_required() and required,
            help=field_info.description,
        )

get_executor_config()

get_executor_config() -> Config

Create a Parsl config from this config.

Source code in taps/executor/parsl.py
def get_executor_config(self) -> Config:
    """Build the Parsl ``Config`` described by this configuration.

    The worker count is ``parsl_workers`` when set, otherwise the number
    of CPUs reported by ``multiprocessing.cpu_count()``. When
    ``parsl_use_threads`` is true a ``ThreadPoolExecutor`` is used;
    otherwise a ``HighThroughputExecutor`` labelled ``'htex-local'`` backed
    by a ``LocalProvider`` limited to a single block.

    Returns:
        A ``Config`` holding the single chosen executor and using
        ``parsl_run_dir`` as its run directory.
    """
    worker_count = self.parsl_workers
    if worker_count is None:
        # No explicit count configured: fall back to one worker per CPU.
        worker_count = multiprocessing.cpu_count()

    if self.parsl_use_threads:
        thread_pool = ThreadPoolExecutor(max_threads=worker_count)
        return Config(executors=[thread_pool], run_dir=self.parsl_run_dir)

    address = address_by_hostname()
    # Exactly one block is started and the provider never scales past it.
    local_provider = LocalProvider(
        channel=LocalChannel(),
        init_blocks=1,
        max_blocks=1,
    )
    htex = HighThroughputExecutor(
        label='htex-local',
        max_workers_per_node=worker_count,
        address=address,
        cores_per_worker=1,
        provider=local_provider,
    )
    return Config(executors=[htex], run_dir=self.parsl_run_dir)

get_executor()

get_executor() -> Executor

Create an executor instance from the config.

Source code in taps/executor/parsl.py
def get_executor(self) -> ParslPoolExecutor:
    """Create an executor instance from the config.

    Returns:
        A ``ParslPoolExecutor`` constructed from the Parsl ``Config``
        produced by ``get_executor_config()``.
    """
    # The previous annotation named ``globus_compute_sdk.Executor``, which
    # is not what this method constructs; the annotation now matches the
    # object actually returned.
    return ParslPoolExecutor(self.get_executor_config())