Skip to content

taps.executor.ray

RayExecutor

RayExecutor(
    address: str | None = "local",
    num_cpus: int | None = None,
)

Bases: Executor

Ray execution engine.

Note

Ray will raise a serialization error if a Proxy[bytes] is passed to or returned by a function. This is because Ray skips serializing bytes instances. Ray works with all other types of proxies, so if you need to send bytes data, wrap the data in another type.

Parameters:

  • address (str | None, default: 'local' ) –

    Address to pass to ray.init().

  • num_cpus (int | None, default: None ) –

    Number of CPUs to use.

Source code in taps/executor/ray.py
def __init__(
    self,
    address: str | None = 'local',
    num_cpus: int | None = None,
) -> None:
    """Initialize Ray and set up the per-executor remote function cache.

    Args:
        address: Address to pass to `ray.init()`.
        num_cpus: Number of CPUs to use; forwarded to `ray.init()`.

    Raises:
        Exception: Re-raises the object stored in `RAY_IMPORT_ERROR` when
            it is not `None` (presumably a failed `ray` import; confirm
            against the module's import guard).
    """
    if RAY_IMPORT_ERROR is not None:  # pragma: no cover
        raise RAY_IMPORT_ERROR

    self._address, self._num_cpus = address, num_cpus

    # Ray's own logging configuration is explicitly disabled here.
    init_options = {
        'address': address,
        'configure_logging': False,
        'num_cpus': num_cpus,
    }
    ray.init(**init_options)

    # Cache of Ray RemoteFunction objects keyed by the original callable
    self._remote: dict[Any, Any] = {}

submit

submit(
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]

Schedule the callable to be executed.

Parameters:

  • function (Callable[P, T]) –

    Callable to execute.

  • args (P.args, default: () ) –

    Positional arguments.

  • kwargs (P.kwargs, default: {} ) –

    Keyword arguments.

Returns:

  • Future[T]

    Future-like object representing the result of the execution of the callable.

Source code in taps/executor/ray.py
def submit(
    self,
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """Submit a callable to Ray and return a future for its result.

    The positional and keyword arguments are first passed through
    `_parse_args` and `_parse_kwargs`. Each distinct callable is wrapped
    with `_wrap_function` and registered with `ray.remote` only once; the
    resulting remote function is cached on the executor and reused by
    later submissions of the same callable.

    Args:
        function: Callable to execute.
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        [`Future`][concurrent.futures.Future]-like object obtained from
        the Ray object reference of the submitted task.
    """
    parsed_args = cast(P.args, _parse_args(args))
    parsed_kwargs = cast(P.kwargs, _parse_kwargs(kwargs))

    # Reuse the cached remote function; create and cache it on first use.
    remote = self._remote.get(function)
    if remote is None:
        remote = ray.remote(_wrap_function(function))
        self._remote[function] = remote

    return remote.remote(*parsed_args, **parsed_kwargs).future()

shutdown

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

Shut down Ray (calls ray.shutdown()). The wait and cancel_futures arguments are currently ignored.

Source code in taps/executor/ray.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Shut down Ray by calling `ray.shutdown()`.

    Args:
        wait: Accepted but not used by this implementation (presumably
            kept for compatibility with the `Executor` interface).
        cancel_futures: Accepted but not used by this implementation.
    """
    # Neither argument influences the call; Ray is shut down unconditionally.
    ray.shutdown()

RayConfig

Bases: ExecutorConfig

RayExecutor plugin configuration.

Parameters:

  • name (Literal['ray'], default: 'ray' ) –

    Executor name.

  • address (str | None, default: 'local' ) –

    Ray scheduler address (default spawns local cluster).

  • num_cpus (int | None, default: None ) –

    Maximum number of CPUs that ray will use.

get_executor

get_executor() -> RayExecutor

Create an executor instance from the config.

Source code in taps/executor/ray.py
def get_executor(self) -> RayExecutor:
    """Build a `RayExecutor` from this config's `address` and `num_cpus`."""
    executor = RayExecutor(
        address=self.address,
        num_cpus=self.num_cpus,
    )
    return executor