Skip to content

taps.executor.parsl

ParslLocalConfig

Bases: ExecutorConfig

Local ParslPoolExecutor plugin configuration.

Simple Parsl configuration that uses the HighThroughputExecutor on the local node.

Parameters:

  • name (Literal[str], default: 'parsl-local' ) –

    Executor name.

  • workers (int | None, default: None ) –

    Maximum number of parsl workers.

  • run_dir (str, default: 'parsl-runinfo' ) –

    Parsl run directory within the app run directory.

get_executor

get_executor() -> ParslPoolExecutor

Create an executor instance from the config.

Source code in taps/executor/parsl.py
def get_executor(self) -> ParslPoolExecutor:
    """Create an executor instance from the config."""
    workers = (
        self.workers
        if self.workers is not None
        else multiprocessing.cpu_count()
    )
    executor = HighThroughputExecutor(
        label='taps-htex-local',
        max_workers_per_node=workers,
        address=address_by_hostname(),
        cores_per_worker=1,
        provider=LocalProvider(
            channel=LocalChannel(),
            init_blocks=1,
            max_blocks=1,
        ),
    )
    config = Config(
        executors=[executor],
        run_dir=self.run_dir,
        initialize_logging=False,
    )
    return ParslPoolExecutor(config)

ParslHTExConfig

Bases: ExecutorConfig

HTEx ParslPoolExecutor plugin configuration.

These parameters are a subset of parsl.config.Config.

For simple, single-node HTEx deployments, prefer ParslLocalConfig.

Parameters:

  • name (Literal[str], default: 'parsl-htex' ) –

    Executor name.

  • htex (ForwardRef('Union[HTExConfig, Dict[str, HTExConfig]]')) –

    HTEx configuration.

  • app_cache (bool | None, default: None ) –

    Enable app caching.

  • retries (int, default: 0 ) –

    Number of task retries in case of task failure.

  • strategy (str | None, default: None ) –

    Block scaling strategy.

  • max_idletime (float | None, default: None ) –

    Idle time before strategy can shutdown unused blocks.

  • monitoring (ForwardRef('Optional[MonitoringConfig]'), default: None ) –

    Database monitoring configuration.

  • run_dir (str, default: 'parsl-runinfo' ) –

    Parsl run directory within the app run directory.

get_executor

get_executor() -> ParslPoolExecutor

Create an executor instance from the config.

Source code in taps/executor/parsl.py
def get_executor(self) -> ParslPoolExecutor:
    """Create an executor instance from the config."""
    options = self.model_dump(
        exclude={'name', 'htex', 'monitoring'},
        exclude_none=True,
    )

    if self.monitoring is not None:
        options['monitoring'] = self.monitoring.get_monitoring()

    executor_configs = (
        list(self.htex.values())
        if isinstance(self.htex, dict)
        else [self.htex]
    )
    config = Config(
        executors=[config.get_executor() for config in executor_configs],
        initialize_logging=False,
        **options,
    )
    return ParslPoolExecutor(config)

HTExConfig

Bases: BaseModel

Configuration for Parl's parsl.executors.HighThroughputExecutor.

Parameters:

  • label (str, default: 'taps-htex' ) –

    Executor label.

  • provider (ForwardRef('Optional[ProviderConfig]'), default: None ) –

    Configuration for the compute resource provider.

  • address (ForwardRef('Optional[Union[str, AddressConfig]]'), default: None ) –

    Address to connect to the main Parsl process.

  • manager_selector (ForwardRef('Optional[ManagerSelectorConfig]'), default: None ) –

    Configuration for the manager selector (available in Parsl v2024.8.5 and later).

  • worker_ports (Tuple[int, int] | None, default: None ) –

    Ports used by workers to connect to Parsl

  • worker_port_range (Tuple[int, int] | None, default: None ) –

    Range of ports to choose worker ports from.

  • interchange_port_range (Tuple[int, int] | None, default: None ) –

    Ports used by Parsl to connect to interchange.

Note

Optional attributes will default to Parsl's default values.

Note

When using multiple HTExs at the same time, the label attribute must be changed to be unique for each executor.

Note

Extra options passed to this model will be provided as keyword arguments to parsl.executors.HighThroughputExecutor.

get_executor

get_executor() -> HighThroughputExecutor

Create an executor instance from the config.

Source code in taps/executor/parsl.py
def get_executor(self) -> HighThroughputExecutor:
    """Create an executor instance from the config."""
    options = self.model_dump(exclude_none=True)
    if self.model_extra is not None:  # pragma: no branch
        options.update(self.model_extra)

    if self.provider is not None:
        options['provider'] = self.provider.get_provider()
    if self.manager_selector is not None:
        options['manager_selector'] = (
            self.manager_selector.get_manager_selector()
        )
    if self.address is not None and isinstance(
        self.address,
        AddressConfig,
    ):
        options['address'] = self.address.get_address()

    return HighThroughputExecutor(**options)

AddressConfig

Bases: BaseModel

Parsl address configuration.

Parameters:

  • kind (str) –

    Function to invoke to get address.

Example
from parsl.addresses import address_by_interface
from taps.executor.parsl import AddressConfig

config = AddressConfig(kind='address_by_interface', ifname='bond0')
assert config.get_address() == address_by_interface(ifname='bond0')

get_address

get_address() -> str

Get the address according to the configuration.

Source code in taps/executor/parsl.py
def get_address(self) -> str:
    """Get the address according to the configuration."""
    address_fn = getattr(parsl.addresses, self.kind)
    options = self.model_extra if self.model_extra is not None else {}
    return address_fn(**options)

ProviderConfig

Bases: BaseModel

Parsl execution provider configuration.

Parameters:

  • kind (str) –

    Execution provider class name

  • launcher (ForwardRef('Optional[LauncherConfig]'), default: None ) –

    Launcher configuration.

Example

Create a provider configuration and call get_provider().

from taps.executor.parsl import ProviderConfig

config = ProviderConfig(
    kind='PBSProProvider',
    account='my-account',
    cpus_per_node=32,
    init_blocks=1,
    max_blocks=1,
    min_blocks=0,
    nodes_per_block=1,
    queue='debug',
    select_options='ngpus=4',
    walltime='00:30:00',
    worker_init='module load conda',
)
config.get_provider()
The resulting provider is equivalent to creating it manually.
from parsl.providers import PBSProProvider

PBSProProvider(
    account='my-account',
    cpus_per_node=32,
    init_blocks=1,
    max_blocks=1,
    min_blocks=0,
    nodes_per_block=1,
    queue='debug',
    select_options='ngpus=4',
    walltime='00:30:00',
    worker_init='module load conda',
),

get_provider

get_provider() -> ExecutionProvider

Create a provider from the configuration.

Source code in taps/executor/parsl.py
def get_provider(self) -> ExecutionProvider:
    """Create a provider from the configuration."""
    options = self.model_extra if self.model_extra is not None else {}

    if self.launcher is not None:
        options['launcher'] = self.launcher.get_launcher()

    provider_cls = getattr(parsl.providers, self.kind)
    return provider_cls(**options)

LauncherConfig

Bases: BaseModel

Parsl launcher configuration.

Parameters:

  • kind (str) –

    Launcher class name.

Example

Create a launcher configuration and call get_launcher().

from taps.executor.parsl import LauncherConfig

config = LauncherConfig(
    kind='MpiExecLauncher',
    bind_cmd='--cpu-bind',
    overrides='--depth=64 --ppn=1,
)
config.get_launcher()
The resulting launcher is equivalent to creating it manually.
from parsl.launchers import MpiExecLauncher

MpiExecLauncher(bind_cmd='--cpu-bind', overrides='--depth=64 --ppn 1')

get_launcher

get_launcher() -> Launcher

Create a launcher from the configuration.

Source code in taps/executor/parsl.py
def get_launcher(self) -> Launcher:
    """Create a launcher from the configuration."""
    launcher_cls = getattr(parsl.launchers, self.kind)
    options = self.model_extra if self.model_extra is not None else {}
    return launcher_cls(**options)

ManagerSelectorConfig

Bases: BaseModel

Parsl HTEx manager selector configuration.

Parameters:

  • kind (str) –

    Manager selector class name.

Example

Create a manager selector configuration and call get_manager_selector().

from taps.executor.parsl import ManagerSelectorConfig

config = ManagerSelectorConfig(kind='RandomManagerSelector')
config.get_manager_selector()
The resulting manager selector is equivalent to creating it manually.
from parsl.executors.high_throughput.manager_selector import RandomManagerSelector

RandomManagerSelector()

get_manager_selector

get_manager_selector() -> ManagerSelector

Create a manager selector from the configuration.

Source code in taps/executor/parsl.py
def get_manager_selector(self) -> ManagerSelector:
    """Create a manager selector from the configuration."""
    manager_cls = getattr(
        parsl.executors.high_throughput.manager_selector,
        self.kind,
    )
    options = self.model_extra if self.model_extra is not None else {}
    return manager_cls(**options)

MonitoringConfig

Bases: BaseModel

Parsl monitoring system configuration.

Parameters:

  • hub_address (str | AddressConfig | None, default: None ) –

    Address to connect to the monitoring hub.

  • hub_port_range (Tuple[int, int] | None, default: None ) –

    Port range for a ZMQ channel from executor process.

Example

Create a monitoring configuration and call get_monitoring().

from taps.executor.parsl import MonitoringConfig

config = MonitoringConfig(
    hub_address='localhost',
    logging_endpoint='sqlite:///parsl-runinfo/monitoring.db',
    resource_monitoring_interval=1,
    hub_port=55055,
)
config.get_monitoring()
The resulting MonitoringHub is equivalent to creating it manually.
from parsl.monitoring.monitoring import MonitoringHub

MonitoringHub(
    hub_address='localhost',
    logging_endpoint='sqlite:///parsl-runinfo/monitoring.db',
    resource_monitoring_interval=1,
    hub_port=55055,
)

get_monitoring

get_monitoring() -> MonitoringHub

Create a MonitoringHub from the configuration.

Source code in taps/executor/parsl.py
def get_monitoring(self) -> MonitoringHub:
    """Create a MonitoringHub from the configuration."""
    options = self.model_dump(exclude_none=True)
    if self.model_extra is not None:  # pragma: no branch
        options.update(self.model_extra)
    if self.hub_address is not None and isinstance(
        self.hub_address,
        AddressConfig,
    ):
        options['hub_address'] = self.hub_address.get_address()

    return MonitoringHub(**options)