
taps.transformer

Transformer

Bases: Protocol[IdentifierT]

Data transformer protocol.

A data transformer is used by the Engine to transform task parameters and results into alternative formats that are more suitable for communication.

An object can be transformed using transform() which returns an identifier. The identifier can then be provided to resolve(), the inverse of transform(), which returns the original object.

Data transformer implementations can implement object identifiers in any manner, provided identifiers are serializable. For example, a simple identifier could be a UUID corresponding to a database entry containing the serialized object.
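The protocol is structural, so any class with matching `close()`, `is_identifier()`, `transform()`, and `resolve()` methods satisfies it. The sketch below assumes a stand-in protocol named `TransformerLike` and a hypothetical `InMemoryTransformer` whose UUID identifiers key into a dict. Neither is part of TaPS. Because the dict lives in one process, this only illustrates the round-trip contract and would not work across workers.

```python
import uuid
from typing import Any, Dict, Protocol, TypeVar, runtime_checkable

T = TypeVar('T')
IdentifierT = TypeVar('IdentifierT')


@runtime_checkable
class TransformerLike(Protocol[IdentifierT]):
    """Hypothetical stand-in mirroring the documented Transformer methods."""

    def close(self) -> None: ...
    def is_identifier(self, obj: Any) -> bool: ...
    def transform(self, obj: T) -> IdentifierT: ...
    def resolve(self, identifier: IdentifierT) -> Any: ...


class InMemoryTransformer:
    """Hypothetical transformer: identifiers are UUIDs keyed into a dict.

    It does not inherit from TransformerLike; having the right methods
    is enough to satisfy the protocol.
    """

    def __init__(self) -> None:
        self._objects: Dict[uuid.UUID, Any] = {}

    def close(self) -> None:
        # Drop all stored objects; identifiers can no longer be resolved.
        self._objects.clear()

    def is_identifier(self, obj: Any) -> bool:
        return isinstance(obj, uuid.UUID)

    def transform(self, obj: Any) -> uuid.UUID:
        # A UUID is small and serializable, as the protocol requires.
        identifier = uuid.uuid4()
        self._objects[identifier] = obj
        return identifier

    def resolve(self, identifier: uuid.UUID) -> Any:
        # Inverse of transform(): return the original object.
        return self._objects[identifier]
```

Calling `transform({'x': 1})` returns a UUID, and passing that UUID to `resolve()` returns the original dict.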

close

close() -> None

Close the transformer.

The transformer is only closed by the client once the application has finished executing (or raised an exception).

Source code in taps/transformer/_protocol.py
def close(self) -> None:
    """Close the transformer.

    The transformer is only closed by the client once the application
    has finished executing (or raised an exception).
    """
    ...
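One way to guarantee that `close()` runs whether the application finishes or raises is `contextlib.closing`, which calls `close()` on exit from the `with` block. The sketch below is a hypothetical illustration, not the TaPS client code. `RecordingTransformer` implements only the methods needed here, and `run_application` is an invented name.

```python
import contextlib
from typing import Any


class RecordingTransformer:
    """Hypothetical partial transformer that records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def transform(self, obj: Any) -> Any:
        return obj  # identity transform, for illustration only

    def close(self) -> None:
        self.closed = True


def run_application(transformer: RecordingTransformer, fail: bool) -> None:
    # closing() calls transformer.close() when the block exits, including
    # when the body raises, matching "finished executing (or raised)".
    with contextlib.closing(transformer):
        transformer.transform('data')
        if fail:
            raise RuntimeError('application failed')
```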

is_identifier

is_identifier(obj: T) -> bool

Check if the object is an identifier instance.

Source code in taps/transformer/_protocol.py
def is_identifier(self, obj: T) -> bool:
    """Check if the object is an identifier instance."""
    ...
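`is_identifier()` lets a caller tell identifiers apart from ordinary values, for example when only some task arguments were transformed. The sketch below assumes a hypothetical `UUIDTransformer` and a hypothetical `resolve_args` helper. It shows one plausible use of the method and is not how the TaPS Engine is actually implemented.

```python
import uuid
from typing import Any, Dict, Tuple


class UUIDTransformer:
    """Hypothetical in-memory transformer whose identifiers are UUIDs."""

    def __init__(self) -> None:
        self._store: Dict[uuid.UUID, Any] = {}

    def is_identifier(self, obj: Any) -> bool:
        return isinstance(obj, uuid.UUID)

    def transform(self, obj: Any) -> uuid.UUID:
        key = uuid.uuid4()
        self._store[key] = obj
        return key

    def resolve(self, identifier: uuid.UUID) -> Any:
        return self._store[identifier]


def resolve_args(
    transformer: UUIDTransformer,
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """Resolve only the arguments the transformer recognizes as identifiers."""

    def maybe_resolve(value: Any) -> Any:
        # Plain values pass through unchanged.
        if transformer.is_identifier(value):
            return transformer.resolve(value)
        return value

    new_args = tuple(maybe_resolve(a) for a in args)
    new_kwargs = {k: maybe_resolve(v) for k, v in kwargs.items()}
    return new_args, new_kwargs
```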

transform

transform(obj: T) -> IdentifierT

Transform the object into an identifier.

Parameters:

  • obj (T) –

    Object to transform.

Returns:

  • IdentifierT

    Identifier object that can be used to resolve obj.

Source code in taps/transformer/_protocol.py
def transform(self, obj: T) -> IdentifierT:
    """Transform the object into an identifier.

    Args:
        obj: Object to transform.

    Returns:
        Identifier object that can be used to resolve `obj`.
    """
    ...

resolve

resolve(identifier: IdentifierT) -> Any

Resolve an object from an identifier.

Parameters:

  • identifier (IdentifierT) –

    Identifier to an object.

Returns:

  • Any

    The resolved object.

Source code in taps/transformer/_protocol.py
def resolve(self, identifier: IdentifierT) -> Any:
    """Resolve an object from an identifier.

    Args:
        identifier: Identifier to an object.

    Returns:
        The resolved object.
    """
    ...

TransformerConfig

Bases: BaseModel, ABC

Abstract Transformer plugin configuration.

Parameters:

  • name (str) –

    Transformer name.

get_transformer abstractmethod

get_transformer() -> Transformer[Any]

Create a transformer from the configuration.

Source code in taps/transformer/_protocol.py
@abc.abstractmethod
def get_transformer(self) -> Transformer[Any]:
    """Create a transformer from the configuration."""
    ...

PickleFileTransformer

PickleFileTransformer(cache_dir: Path | str)

Pickle file object transformer.

Transforms objects by pickling the object and writing the pickled object to a file.

Parameters:

  • cache_dir (Path | str) –

    Directory to store pickled objects in.

Source code in taps/transformer/_file.py
def __init__(
    self,
    cache_dir: pathlib.Path | str,
) -> None:
    self.cache_dir = pathlib.Path(cache_dir).resolve()

close

close() -> None

Close the transformer.

Source code in taps/transformer/_file.py
def close(self) -> None:
    """Close the transformer."""
    shutil.rmtree(self.cache_dir, ignore_errors=True)

is_identifier

is_identifier(obj: Any) -> bool

Check if the object is an identifier instance.

Source code in taps/transformer/_file.py
def is_identifier(self, obj: Any) -> bool:
    """Check if the object is an identifier instance."""
    return isinstance(obj, PickleFileIdentifier)

transform

transform(obj: T) -> PickleFileIdentifier

Transform the object into an identifier.

Parameters:

  • obj (T) –

    Object to transform.

Returns:

  • PickleFileIdentifier

    Identifier object that can be used to resolve obj.

Source code in taps/transformer/_file.py
def transform(self, obj: T) -> PickleFileIdentifier:
    """Transform the object into an identifier.

    Args:
        obj: Object to transform.

    Returns:
        Identifier object that can be used to resolve `obj`.
    """
    identifier = PickleFileIdentifier(self.cache_dir, uuid.uuid4())
    filepath = identifier.path()
    filepath.parent.mkdir(parents=True, exist_ok=True)

    with open(filepath, 'wb', buffering=0) as f:
        pickle.dump(obj, f)

    return identifier

resolve

resolve(identifier: PickleFileIdentifier) -> Any

Resolve an object from an identifier.

Parameters:

  • identifier (PickleFileIdentifier) –

    Identifier to an object.

Returns:

  • Any

    The resolved object.

Source code in taps/transformer/_file.py
def resolve(self, identifier: PickleFileIdentifier) -> Any:
    """Resolve an object from an identifier.

    Args:
        identifier: Identifier to an object.

    Returns:
        The resolved object.
    """
    filepath = identifier.path()
    with open(filepath, 'rb') as f:
        obj = pickle.load(f)
    return obj
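The following condensed restatement of the PickleFileTransformer methods above shows a full transform/resolve/close round trip. `FileTransformer` and `FileIdentifier` are hypothetical names used so the block runs without TaPS installed, and the temporary directory is an assumption. Because the identifier stores an absolute path, resolving it in another process presumably requires that process to see the same path, for example on a shared filesystem.

```python
import pathlib
import pickle
import shutil
import tempfile
import uuid
from typing import Any, NamedTuple, Union


class FileIdentifier(NamedTuple):
    """Mirror of PickleFileIdentifier: a directory plus a UUID."""

    cache_dir: pathlib.Path
    obj_id: uuid.UUID

    def path(self) -> pathlib.Path:
        return self.cache_dir / str(self.obj_id)


class FileTransformer:
    """Condensed restatement of PickleFileTransformer for illustration."""

    def __init__(self, cache_dir: Union[pathlib.Path, str]) -> None:
        self.cache_dir = pathlib.Path(cache_dir).resolve()

    def transform(self, obj: Any) -> FileIdentifier:
        identifier = FileIdentifier(self.cache_dir, uuid.uuid4())
        filepath = identifier.path()
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(filepath, 'wb') as f:
            pickle.dump(obj, f)  # one file per transformed object
        return identifier

    def resolve(self, identifier: FileIdentifier) -> Any:
        with open(identifier.path(), 'rb') as f:
            return pickle.load(f)

    def close(self) -> None:
        # Deletes every pickled object, so identifiers become unresolvable.
        shutil.rmtree(self.cache_dir, ignore_errors=True)


cache_dir = pathlib.Path(tempfile.mkdtemp()) / 'taps-cache'
transformer = FileTransformer(cache_dir)
ident = transformer.transform({'weights': [0.1, 0.2]})
```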

PickleFileTransformerConfig

Bases: TransformerConfig

PickleFileTransformer plugin configuration.

Parameters:

  • name (Literal['file'], default: 'file' ) –

    Transformer name.

  • file_dir (str) –

    Object file directory.

get_transformer

get_transformer() -> PickleFileTransformer

Create a transformer from the configuration.

Source code in taps/transformer/_file.py
def get_transformer(self) -> PickleFileTransformer:
    """Create a transformer from the configuration."""
    return PickleFileTransformer(self.file_dir)

PickleFileIdentifier

Bases: NamedTuple

Identifier type for the PickleFileTransformer.

Parameters:

  • cache_dir (Path) –
  • obj_id (UUID) –

Attributes:

  • cache_dir

    Object directory.

  • obj_id

    Object ID.

path

path() -> Path

Get path to the object.

Source code in taps/transformer/_file.py
def path(self) -> pathlib.Path:
    """Get path to the object."""
    return self.cache_dir / str(self.obj_id)
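Because the identifier is a NamedTuple of picklable fields, it is itself serializable, which is what the protocol requires of identifiers. The sketch below uses a hypothetical mirror class `FileIdentifier` and a fixed, made-up UUID and directory. It checks that a pickled and unpickled identifier compares equal and maps to the same file path.

```python
import pathlib
import pickle
import uuid
from typing import NamedTuple


class FileIdentifier(NamedTuple):
    """Hypothetical mirror of PickleFileIdentifier."""

    cache_dir: pathlib.Path
    obj_id: uuid.UUID

    def path(self) -> pathlib.Path:
        # Same layout as PickleFileIdentifier.path(): <cache_dir>/<uuid>
        return self.cache_dir / str(self.obj_id)


obj_id = uuid.UUID('12345678-1234-5678-1234-567812345678')
ident = FileIdentifier(pathlib.Path('/tmp/cache'), obj_id)

# NamedTuples compare field by field, so a serialized and restored
# identifier is equal to the original.
restored = pickle.loads(pickle.dumps(ident))
```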

ProxyTransformer

ProxyTransformer(
    store: Store[Any],
    *,
    async_resolve: bool = False,
    extract_target: bool = False,
    metrics_dir: str | None = None
)

Proxy object transformer.

Transforms objects into proxies which act as the identifier.

Parameters:

  • store (Store[Any]) –

    Store instance to use for proxying objects.

  • async_resolve (bool, default: False ) –

    Begin asynchronously resolving proxies when the transformer resolves a proxy (which is otherwise a no-op unless extract_target=True). Not compatible with extract_target=True.

  • extract_target (bool, default: False ) –

    When True, resolving an identifier (i.e., a proxy) will return the target object. Otherwise, the proxy is returned since a proxy can act as the target object. Not compatible with async_resolve=True.

  • metrics_dir (str | None, default: None ) –

    If metrics recording is enabled on the store, write the recorded metrics to this directory when this transformer is closed. Typically, close() is only called on the transformer instance in the main TaPS process (i.e., close() is not called in worker processes), so only metrics from the main process are recorded.

Source code in taps/transformer/_proxy.py
def __init__(
    self,
    store: Store[Any],
    *,
    async_resolve: bool = False,
    extract_target: bool = False,
    metrics_dir: str | None = None,
) -> None:
    if async_resolve and extract_target:
        raise ValueError(
            'Options async_resolve and extract_target cannot be '
            'enabled at the same time.',
        )

    self.store = store
    self.async_resolve = async_resolve
    self.extract_target = extract_target
    self.metrics_dir = (
        pathlib.Path(metrics_dir).resolve()
        if metrics_dir is not None
        else None
    )

close

close() -> None

Close the transformer.

Source code in taps/transformer/_proxy.py
def close(self) -> None:
    """Close the transformer."""
    self.store.close()

    if self.metrics_dir is not None:
        _write_metrics(
            self.store,
            self.metrics_dir / _PROXYSTORE_AGGREGATE_FILE,
            self.metrics_dir / _PROXYSTORE_STATS_FILE,
        )

is_identifier

is_identifier(obj: Any) -> bool

Check if the object is an identifier instance.

Source code in taps/transformer/_proxy.py
def is_identifier(self, obj: Any) -> bool:
    """Check if the object is an identifier instance."""
    return isinstance(obj, Proxy)

transform

transform(obj: T) -> Proxy[T]

Transform the object into an identifier.

Parameters:

  • obj (T) –

    Object to transform.

Returns:

  • Proxy[T]

    Identifier object that can be used to resolve obj.

Source code in taps/transformer/_proxy.py
def transform(self, obj: T) -> Proxy[T]:
    """Transform the object into an identifier.

    Args:
        obj: Object to transform.

    Returns:
        Identifier object that can be used to resolve `obj`.
    """
    return self.store.proxy(obj)

resolve

resolve(identifier: Proxy[T]) -> T | Proxy[T]

Resolve an object from an identifier.

Parameters:

  • identifier (Proxy[T]) –

    Identifier to an object.

Returns:

  • T | Proxy[T]

    The resolved object or a proxy of the resolved object depending on the setting of extract_target.

Source code in taps/transformer/_proxy.py
def resolve(self, identifier: Proxy[T]) -> T | Proxy[T]:
    """Resolve an object from an identifier.

    Args:
        identifier: Identifier to an object.

    Returns:
        The resolved object or a proxy of the resolved object depending \
        on the setting of `extract_target`.
    """
    if self.extract_target:
        return extract(identifier)
    if self.async_resolve:
        resolve_async(identifier)
    return identifier
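The three outcomes of `resolve()` are: return the real target (`extract_target=True`), return the proxy after starting a background fetch (`async_resolve=True`), or return the proxy untouched. The toy below mirrors that branch structure. `ToyProxy` and `toy_resolve` are hypothetical and are not ProxyStore's `Proxy`, `extract`, or `resolve_async`. Unlike a real proxy, `ToyProxy` does not forward attribute access to its target, so callers must call `target()` explicitly. The mutual-exclusion check is done at call time here for brevity, whereas ProxyTransformer performs it in `__init__`.

```python
import threading
from typing import Any, Callable


class ToyProxy:
    """Hypothetical lazy reference, standing in for a ProxyStore proxy."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._lock = threading.Lock()
        self._resolved = False
        self._target: Any = None

    def target(self) -> Any:
        # Run the factory at most once; later calls return the cached target.
        with self._lock:
            if not self._resolved:
                self._target = self._factory()
                self._resolved = True
        return self._target


def toy_resolve(proxy: ToyProxy, *, extract_target: bool, async_resolve: bool) -> Any:
    """Mirror of the branch structure in ProxyTransformer.resolve()."""
    if async_resolve and extract_target:
        raise ValueError('async_resolve and extract_target are mutually exclusive')
    if extract_target:
        return proxy.target()  # hand back the real object
    if async_resolve:
        # Start fetching in the background; the caller still gets the proxy.
        threading.Thread(target=proxy.target, daemon=True).start()
    return proxy
```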

ProxyTransformerConfig

Bases: TransformerConfig

ProxyTransformer plugin configuration.

Parameters:

  • name (Literal['proxystore'], default: 'proxystore' ) –

    Transformer name.

  • connector (ConnectorConfig) –

    Connector configuration.

  • async_resolve (bool, default: False ) –

    Asynchronously resolve proxies. Not compatible with extract_target=True.

  • cache_size (int, default: 16 ) –

    Size of the Store's cache, as a number of objects.

  • extract_target (bool, default: False ) –

    Extract the target from the proxy when resolving the identifier. Not compatible with async_resolve=True.

  • metrics (bool, default: False ) –

    Enable recording operation metrics.

  • populate_target (bool, default: True ) –

    Populate target objects of newly created proxies.

Note

Extra arguments provided to this config will be passed as parameters to the Store.

get_transformer

get_transformer() -> ProxyTransformer

Create a transformer from the configuration.

Source code in taps/transformer/_proxy.py
def get_transformer(self) -> ProxyTransformer:
    """Create a transformer from the configuration."""
    connector = self.connector.get_connector()

    # Want register=True to be the default unless the user config
    # has explicitly disabled it.
    extra: dict[str, Any] = {'register': True}
    # Guaranteed when config.extra is set to "allow"
    assert self.model_extra is not None
    extra.update(self.model_extra)

    return ProxyTransformer(
        store=Store(
            'proxy-transformer',
            connector=connector,
            cache_size=self.cache_size,
            metrics=self.metrics,
            populate_target=self.populate_target,
            **extra,
        ),
        async_resolve=self.async_resolve,
        extract_target=self.extract_target,
        metrics_dir=_PROXYSTORE_DIR if self.metrics else None,
    )
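In `get_transformer()`, the extra config arguments are merged after the `register=True` default, so a user-supplied key, including `register` itself, always wins. The hypothetical helper `build_store_kwargs` below isolates just that merge step. The key `some_option` is made up for illustration and is not a real Store parameter.

```python
from typing import Any, Dict


def build_store_kwargs(model_extra: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper mirroring the extras merge in get_transformer()."""
    extra: Dict[str, Any] = {'register': True}  # default unless overridden
    extra.update(model_extra)  # user-provided extras take precedence
    return extra
```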