Skip to content

taps.executor.ray

RayExecutor

RayExecutor(
    address: str | None = "local",
    num_cpus: int | None = None,
)

Bases: Executor

Ray execution engine.

Note

Ray will raise a serialization error if a Proxy[bytes] is passed to or returned by a function. This is because Ray skips serializing bytes instances. Ray works with all other types of proxies, so if you need to send bytes data, wrap the data in another type.

Parameters:

  • address (str | None, default: 'local' ) –

    Address to pass to ray.init().

  • num_cpus (int | None, default: None ) –

    Number of CPUs to use.

Source code in taps/executor/ray.py
def __init__(
    self,
    address: str | None = 'local',
    num_cpus: int | None = None,
) -> None:
    """Initialize Ray and set up the remote function cache.

    Args:
        address: Address to pass to `ray.init()`.
        num_cpus: Number of CPUs to pass to `ray.init()`.

    Raises:
        Exception: Re-raises `RAY_IMPORT_ERROR` when it is not `None`.
    """
    # NOTE(review): RAY_IMPORT_ERROR is presumably the exception captured
    # when importing ray failed at module load -- confirm at its definition.
    import_failure = RAY_IMPORT_ERROR
    if import_failure is not None:  # pragma: no cover
        raise import_failure

    init_options = {
        'address': address,
        'configure_logging': False,
        'num_cpus': num_cpus,
    }
    ray.init(**init_options)

    # Cache of Ray RemoteFunction objects keyed by the original Python
    # callable, filled in lazily by submit().
    self._remote: dict[Any, Any] = {}

submit()

submit(
    function: Callable[P, T],
    /,
    *args: args,
    **kwargs: kwargs,
) -> Future[T]

Schedule the callable to be executed.

Parameters:

  • function (Callable[P, T]) –

    Callable to execute.

  • args (args, default: () ) –

    Positional arguments.

  • kwargs (kwargs, default: {} ) –

    Keyword arguments.

Returns:

  • Future[T]

    Future-like object representing the result of the execution of the callable.

Source code in taps/executor/ray.py
def submit(
    self,
    function: Callable[P, T],
    /,
    *args: P.args,
    **kwargs: P.kwargs,
) -> Future[T]:
    """Submit a callable to Ray and return a future for its result.

    The callable is wrapped and turned into a Ray remote function the
    first time it is submitted. That remote function is cached on the
    executor and reused when the same callable is submitted again.

    Args:
        function: Callable to execute.
        args: Positional arguments.
        kwargs: Keyword arguments.

    Returns:
        [`Future`][concurrent.futures.Future]-like object representing
        the result of the execution of the callable.
    """
    # Arguments pass through the module's parsing helpers before they
    # are handed to Ray.
    parsed_args = cast(P.args, _parse_args(args))
    parsed_kwargs = cast(P.kwargs, _parse_kwargs(kwargs))

    remote = self._remote.get(function)
    if remote is None:
        # First submission of this callable: build and cache its
        # Ray remote function.
        remote = ray.remote(_wrap_function(function))
        self._remote[function] = remote

    return remote.remote(*parsed_args, **parsed_kwargs).future()

shutdown()

shutdown(
    wait: bool = True, *, cancel_futures: bool = False
) -> None

Shut down the executor by calling ray.shutdown().

Source code in taps/executor/ray.py
def shutdown(
    self,
    wait: bool = True,
    *,
    cancel_futures: bool = False,
) -> None:
    """Shut down the executor by shutting down Ray.

    Args:
        wait: Not used by this implementation.
        cancel_futures: Not used by this implementation.
    """
    # NOTE(review): the signature presumably mirrors
    # concurrent.futures.Executor.shutdown(); neither flag is forwarded
    # because the body only calls ray.shutdown().
    ray.shutdown()

RayConfig

Bases: ExecutorConfig

RayExecutor plugin configuration.

Attributes:

  • address (Optional[str]) –

    Address of the Ray cluster to run on.

  • num_cpus (Optional[int]) –

    Number of CPUs to use; passed to ray.init() as num_cpus when the executor is created. When unset (None), Ray chooses the value itself, which defaults to the number of cores in the Ray cluster, or the number of cores on this machine.

get_executor()

get_executor() -> RayExecutor

Create an executor instance from the config.

Source code in taps/executor/ray.py
def get_executor(self) -> RayExecutor:
    """Build a `RayExecutor` from this config's settings.

    Returns:
        Executor constructed with this config's `address` and `num_cpus`.
    """
    options = {'address': self.address, 'num_cpus': self.num_cpus}
    return RayExecutor(**options)