eformer.executor.ray.types

Type definitions and error handling utilities for Ray-based distributed execution.

This module provides the type definitions and error-handling utilities used to manage distributed computation jobs with Ray. It includes:

  1. Error handling utilities for Ray-specific exceptions

  2. Serializable exception information for cross-process error propagation

  3. Job status tracking with success/failure/preemption states

  4. Reference management and communication primitives

  5. Performance monitoring utilities

  6. Multi-slice coordination data structures for TPU/GPU clusters

The module is designed to provide robust error handling and status tracking for distributed training and inference workloads, with special attention to preemption scenarios common in cloud TPU environments.

Example

Basic job status tracking:

>>> job_info = JobInfo(name="training_job", state="running", kind="training")
>>> # A job that finished and produced a result
>>> status = JobSucceeded(job_info, result={"loss": 0.1})
>>> # A job that failed with an expected runtime error
>>> status = JobFailed(job_info, error=ValueError("Invalid input"))

Exception serialization:

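>>> try:
...     risky_operation()
... except Exception:
...     # Capture the exception in a form that can cross process boundaries
...     exc_info = ExceptionInfo.ser_exc_info()
...
>>> # Later, possibly in another process, re-raise it with its original traceback
>>> exc_info.reraise()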
eformer.executor.ray.types.DONE = <eformer.executor.ray.types.DoneSentinel object>

Global instance of DoneSentinel for signaling completion.

Use this single instance everywhere and test for it with the is operator (item is DONE), so that completion checks stay consistent across the codebase.

Example

>>> queue.put(result)
>>> queue.put(DONE)
>>>
>>> while True:
...     item = queue.get()
...     if item is DONE:
...         break
...     process(item)
class eformer.executor.ray.types.DoneSentinel

Bases: object

Sentinel class to indicate completion or termination state.

This class serves as a unique marker object to signal that a process, computation, or data stream has reached its end. Using a sentinel class instead of None or other values prevents ambiguity when None might be a valid result.

Example

>>> def process_items(items):
...     for item in items:
...         if item is DONE:
...             break
...         yield process_item(item)
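The reason for a dedicated sentinel shows up clearly in a small producer/consumer sketch. It uses a local DoneSentinel stand-in (so it runs without eformer) together with the standard library's queue and threading modules; None and 0 flow through as ordinary items because the consumer stops only on an identity match with DONE.

```python
import queue
import threading


class DoneSentinel:
    """Local stand-in for eformer's DoneSentinel; only object identity matters."""


DONE = DoneSentinel()


def produce(q: queue.Queue, items: list) -> None:
    for item in items:
        q.put(item)
    q.put(DONE)  # signal that no more items will follow


def consume(q: queue.Queue) -> list:
    received = []
    while True:
        item = q.get()
        if item is DONE:  # identity check, so None and 0 are treated as data
            break
        received.append(item)
    return received


q: queue.Queue = queue.Queue()
producer = threading.Thread(target=produce, args=(q, [1, None, 0, "x"]))
producer.start()
results = consume(q)
producer.join()
print(results)  # [1, None, 0, 'x']
```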
class eformer.executor.ray.types.ExceptionInfo(ex: BaseException | None, tb: Traceback)

Bases: object

Serializable container for exception information across process boundaries.

This class captures exception details and tracebacks in a format that can be serialized and transmitted between Ray actors/tasks. It uses tblib to preserve traceback information, enabling proper error reporting in distributed systems.

ex

The original exception instance, or None if no exception was captured.

Type

BaseException | None

tb

Serialized traceback information using tblib.Traceback.

Type

tblib.Traceback

Example

Capturing and re-raising an exception in a different process:

>>> try:
...     raise ValueError("Something went wrong")
... except Exception:
...     exc_info = ExceptionInfo.ser_exc_info()
...
>>> # exc_info can be pickled, sent to another process, and re-raised there
>>> exc_info.reraise()
ex: BaseException | None
reraise() → None

Re-raise the captured exception with its original traceback.

Raises

The original exception that was captured, or a generic Exception if no specific exception was available.

Example

>>> try:
...     dangerous_operation()
... except Exception:
...     exc_info = ExceptionInfo.ser_exc_info()
...
>>> exc_info.reraise()
restore() → tuple[type[BaseException], BaseException, traceback.TracebackType]

Restore the exception information to standard Python exc_info format.

Returns

A tuple containing (exception_type, exception_value, traceback) compatible with sys.exc_info() format. If no exception was captured, returns a generic Exception with an appropriate message.

Example

>>> try:
...     risky_function()
... except Exception:
...     exc_info = ExceptionInfo.ser_exc_info()
...
>>> exc_type, exc_value, exc_tb = exc_info.restore()
classmethod ser_exc_info(exception: BaseException | None = None) → ExceptionInfo

Create an ExceptionInfo from current exception context or provided exception.

Parameters

exception – Specific exception to serialize. If None, uses sys.exc_info() to capture the current exception being handled.

Returns

ExceptionInfo containing the serialized exception and traceback.

Example

Capture current exception:

>>> try:
...     risky_function()
... except ValueError as e:
...     exc_info = ExceptionInfo.ser_exc_info()

Capture specific exception:

>>> try:
...     risky_function()
... except ValueError as e:
...     exc_info = ExceptionInfo.ser_exc_info(e)
tb: Traceback
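tblib is what makes this class work: a plain Python traceback object cannot be pickled, so it cannot travel between Ray processes. The sketch below is a hypothetical, tblib-free variant (TextExceptionInfo and RemoteTraceback are illustrative names, not eformer's API) that keeps the traceback as formatted text and re-raises the original exception with that text attached as its __cause__, the same approach concurrent.futures uses for worker-process errors.

```python
from __future__ import annotations

import pickle
import sys
import traceback
from dataclasses import dataclass


class RemoteTraceback(Exception):
    """Carries a formatted traceback from another process as plain text."""

    def __init__(self, tb_text: str) -> None:
        super().__init__(tb_text)
        self.tb_text = tb_text

    def __str__(self) -> str:
        return self.tb_text


@dataclass
class TextExceptionInfo:
    """Hypothetical stand-in for ExceptionInfo that stores the traceback as text."""

    ex: BaseException | None
    tb_text: str

    @classmethod
    def capture(cls, exception: BaseException | None = None) -> TextExceptionInfo:
        # Like ser_exc_info(): fall back to the exception currently being handled.
        if exception is None:
            exception = sys.exc_info()[1]
        if exception is None:
            return cls(None, "")
        lines = traceback.format_exception(type(exception), exception, exception.__traceback__)
        return cls(exception, "".join(lines))

    def reraise(self) -> None:
        if self.ex is None:
            raise Exception("no exception was captured")
        # Re-raise the original exception type; the remote traceback rides along as __cause__.
        raise self.ex from RemoteTraceback(self.tb_text)


try:
    raise ValueError("Something went wrong")
except ValueError:
    info = TextExceptionInfo.capture()

# Pickling drops the live traceback object but keeps the text copy.
restored = pickle.loads(pickle.dumps(info))
```

The documented class stores a tblib.Traceback instead, so the re-raised exception carries a reconstructed traceback rather than a text dump.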
class eformer.executor.ray.types.HostInfo(host_id: int, slice_name: str, num_devices: int | None, healthy: bool, failed: bool, node_id: str | None = None)

Bases: object

Information about a TPU host within a slice.

host_id

Unique identifier for the host within its slice.

Type

int

slice_name

Name of the TPU slice this host belongs to.

Type

str

num_devices

Number of TPU devices available on this host.

Type

int | None

healthy

Whether the host is currently healthy and operational.

Type

bool

failed

Whether the host has encountered a failure.

Type

bool

failed: bool
healthy: bool
host_id: int
node_id: str | None = None
num_devices: int | None
slice_name: str
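HostInfo tracks health and failure separately and allows num_devices to be None. A minimal sketch of how a scheduler might pick usable hosts, assuming HostInfo is a plain dataclass (mirrored locally so the snippet runs without eformer); usable_hosts and known_device_count are hypothetical helpers, not part of eformer.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class HostInfo:
    """Local mirror of the documented HostInfo fields."""

    host_id: int
    slice_name: str
    num_devices: int | None
    healthy: bool
    failed: bool
    node_id: str | None = None


def usable_hosts(hosts: list[HostInfo]) -> list[HostInfo]:
    """Hypothetical helper: keep hosts that are healthy and have not failed."""
    return [h for h in hosts if h.healthy and not h.failed]


def known_device_count(hosts: list[HostInfo]) -> int:
    """Sum num_devices, skipping hosts whose device count is unknown (None)."""
    return sum(h.num_devices for h in hosts if h.num_devices is not None)


hosts = [
    HostInfo(0, "slice-0", 4, healthy=True, failed=False),
    HostInfo(1, "slice-0", 4, healthy=False, failed=True),
    HostInfo(2, "slice-0", None, healthy=True, failed=False),
]
ok = usable_hosts(hosts)
print([h.host_id for h in ok], known_device_count(ok))  # [0, 2] 4
```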
class eformer.executor.ray.types.JobError(info: JobInfo, error: Exception)

Bases: JobStatus

Indicates that the job encountered an internal or unexpected error.

This is typically reserved for unexpected exceptions, infrastructure issues, or serialization problems in the Ray runtime.

error

The exception or error message from the failure.

Type

Exception

error: Exception
class eformer.executor.ray.types.JobFailed(info: JobInfo, error: Exception)

Bases: JobStatus

Indicates that the job ran but failed because of an expected runtime issue, as opposed to an unexpected or infrastructure error (JobError) or a preemption (JobPreempted).

This could include errors such as invalid input, failed assertions, or handled exceptions.

error

The exception describing why the job failed.

Type

Exception

error: Exception
class eformer.executor.ray.types.JobInfo(name: str, state: str, kind: str)

Bases: object

Metadata describing a TPU/GPU/CPU job managed via Ray.

name

A human-readable identifier for the job.

Type

str

state

The current state of the job (e.g., “pending”, “running”, “succeeded”, “failed”).

Type

str

kind

The type or classification of the job (e.g., “training”, “inference”).

Type

str

kind: str
name: str
state: str
class eformer.executor.ray.types.JobPreempted(info: JobInfo, error: Exception)

Bases: JobStatus

Indicates that the job was interrupted or preempted, likely by external factors such as TPU quota eviction or infrastructure scaling events.

error

The exception raised due to preemption.

Type

Exception

error: Exception
class eformer.executor.ray.types.JobStatus(info: JobInfo)

Bases: object

Base class representing the final status of a job after a Ray call.

This class wraps job metadata and serves as a common interface for distinguishing between successful and failed executions.

info

Metadata about the job.

Type

JobInfo

info: JobInfo
class eformer.executor.ray.types.JobSucceeded(info: JobInfo, result: object)

Bases: JobStatus

Indicates that the job completed successfully and returned a result.

result

The output produced by the job.

Type

object

result: object
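Because every outcome is a JobStatus subclass, callers can dispatch on the type. The sketch below mirrors the classes as plain dataclasses (an assumption about their shape, made so the snippet runs without eformer) and encodes one hypothetical policy: retry preempted jobs, and treat failures and errors as final. should_retry is illustrative, not an eformer function.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class JobInfo:
    name: str
    state: str
    kind: str


@dataclass
class JobStatus:
    info: JobInfo


@dataclass
class JobSucceeded(JobStatus):
    result: object


@dataclass
class JobFailed(JobStatus):
    error: Exception


@dataclass
class JobPreempted(JobStatus):
    error: Exception


@dataclass
class JobError(JobStatus):
    error: Exception


def should_retry(status: JobStatus) -> bool:
    """Hypothetical policy: only preemptions are worth retrying."""
    if isinstance(status, JobSucceeded):
        return False
    if isinstance(status, JobPreempted):
        return True
    if isinstance(status, (JobFailed, JobError)):
        return False
    raise TypeError(f"unknown status type: {type(status).__name__}")


info = JobInfo(name="training_job", state="running", kind="training")
print(should_retry(JobPreempted(info, error=RuntimeError("TPU evicted"))))  # True
print(should_retry(JobFailed(info, error=ValueError("Invalid input"))))  # False
```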
class eformer.executor.ray.types.MultisliceInfo(coordinator_ip: str, slice_id: int, num_slices: int, port: int = 8081)

Bases: object

Information about a multi-slice configuration for distributed execution.

This class stores configuration data for multi-slice TPU/GPU clusters where computation is distributed across multiple slices that coordinate through a central coordinator node.

coordinator_ip

IP address of the coordinator node.

Type

str

slice_id

Unique identifier for this slice within the multi-slice setup.

Type

int

num_slices

Total number of slices in the multi-slice configuration.

Type

int

port

Port number for multi-slice coordination communication.

Type

int

Example

>>> multi_slice_config = MultisliceInfo(
...     coordinator_ip="10.0.0.1",
...     slice_id=0,
...     num_slices=4,
...     port=8081
... )
>>> print(f"Slice {multi_slice_config.slice_id} of {multi_slice_config.num_slices}")
Slice 0 of 4
coordinator_ip: str
num_slices: int
port: int = 8081
slice_id: int
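MultisliceInfo carries what each slice needs to reach the coordinator. As a sketch only: JAX multislice jobs on Cloud TPU are often configured through MEGASCALE_* environment variables, and megascale_env below shows how these fields could map onto them. The helper and the exact variable mapping are assumptions for illustration, not eformer's documented behavior.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class MultisliceInfo:
    """Local mirror of the documented MultisliceInfo fields."""

    coordinator_ip: str
    slice_id: int
    num_slices: int
    port: int = 8081


def megascale_env(info: MultisliceInfo) -> dict[str, str]:
    """Hypothetical helper: render MultisliceInfo as MEGASCALE_* environment variables."""
    return {
        "MEGASCALE_COORDINATOR_ADDRESS": f"{info.coordinator_ip}:{info.port}",
        "MEGASCALE_NUM_SLICES": str(info.num_slices),
        "MEGASCALE_SLICE_ID": str(info.slice_id),
        "MEGASCALE_PORT": str(info.port),
    }


env = megascale_env(MultisliceInfo(coordinator_ip="10.0.0.1", slice_id=0, num_slices=4))
print(env["MEGASCALE_COORDINATOR_ADDRESS"])  # 10.0.0.1:8081
```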
class eformer.executor.ray.types.RefBox(ref: ObjectRef)

Bases: object

Wrapper to prevent automatic ObjectRef dereferencing in Ray.

Ray automatically dereferences ObjectRefs passed as top-level arguments to remote functions, but not ObjectRefs nested inside other objects. RefBox exploits this rule to control when dereferencing happens: the receiving task gets the reference itself rather than the value, so it can defer fetching the value, or hand the reference on to another actor, instead of waiting for the value before it starts.

ref

The Ray ObjectRef to be wrapped.

Type

ray.ObjectRef

Example

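>>> # Launch a computation and wrap its ObjectRef
>>> result_ref = expensive_computation.remote()
>>> boxed = RefBox(result_ref)
>>> # another_task receives the RefBox, not the computed value
>>> another_task.remote(boxed)
>>> # Dereference explicitly when the value is needed
>>> actual_result = boxed.get()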

See also

Ray documentation on object passing: https://docs.ray.io/en/latest/ray-core/objects.html

get()

Dereference the wrapped ObjectRef and return its value.

Returns

The actual value stored in the ObjectRef.

Raises

Any exception that occurred during the computation of the ObjectRef.

Example

>>> computation_ref = expensive_task.remote()
>>> box = RefBox(computation_ref)
>>> result = box.get()
ref: ObjectRef
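The dereferencing rule RefBox relies on can be modeled without Ray. In this toy sketch, FakeRef, fake_get, and call_remote are hypothetical stand-ins for ray.ObjectRef, ray.get, and task submission; call_remote resolves only top-level reference arguments, so a reference wrapped in RefBox reaches the function unchanged.

```python
from dataclasses import dataclass


class FakeRef:
    """Toy stand-in for ray.ObjectRef: a handle to a value stored elsewhere."""

    def __init__(self, value: object) -> None:
        self._value = value


def fake_get(ref: FakeRef) -> object:
    """Toy stand-in for ray.get."""
    return ref._value


def call_remote(fn, *args):
    """Toy model of Ray's rule: only top-level reference arguments are resolved."""
    resolved = [fake_get(a) if isinstance(a, FakeRef) else a for a in args]
    return fn(*resolved)


@dataclass
class RefBox:
    ref: FakeRef

    def get(self) -> object:
        return fake_get(self.ref)


def describe(arg: object) -> str:
    return type(arg).__name__


ref = FakeRef([1, 2, 3])
print(call_remote(describe, ref))          # list (dereferenced automatically)
print(call_remote(describe, RefBox(ref)))  # RefBox (reference left intact)
```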
class eformer.executor.ray.types.SliceInfo(slice_name: str, num_hosts: int, ip_address: str, num_accelerators_per_host: int, node_ids: list[str] | None = None, host_infos: list[dict] | None = None)

Bases: object

Information about a single compute slice in a distributed cluster.

This class represents the configuration and metadata for a single compute slice, which typically consists of multiple hosts with accelerators (TPUs/GPUs). Used in multi-slice configurations for large-scale distributed training.

slice_name

Unique name identifier for this slice.

Type

str

num_hosts

Number of host machines in this slice.

Type

int

ip_address

IP address of the slice head node.

Type

str

num_accelerators_per_host

Number of accelerators (TPUs/GPUs) per host machine.

Type

int

Example

>>> slice_config = SliceInfo(
...     slice_name="slice-0",
...     num_hosts=8,
...     ip_address="10.0.1.10",
...     num_accelerators_per_host=8
... )
>>> total_accelerators = slice_config.num_hosts * slice_config.num_accelerators_per_host
>>> print(f"Slice has {total_accelerators} total accelerators")
Slice has 64 total accelerators
host_infos: list[dict] | None = None
ip_address: str
node_ids: list[str] | None = None
num_accelerators_per_host: int
num_hosts: int
slice_name: str
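SliceInfo and MultisliceInfo fit together when launching a multi-slice job: each slice gets its own slice_id, and all slices share one coordinator. A minimal sketch, assuming slice 0's head node acts as the coordinator (an assumption, not documented eformer behavior); plan_multislice is a hypothetical helper, and both classes are local mirrors of the documented fields.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SliceInfo:
    """Local mirror of the required SliceInfo fields."""

    slice_name: str
    num_hosts: int
    ip_address: str
    num_accelerators_per_host: int


@dataclass
class MultisliceInfo:
    coordinator_ip: str
    slice_id: int
    num_slices: int
    port: int = 8081


def plan_multislice(slices: list[SliceInfo], port: int = 8081) -> list[MultisliceInfo]:
    """Hypothetical helper: give each slice an ID and point all at slice 0's head node."""
    if not slices:
        raise ValueError("need at least one slice")
    coordinator = slices[0].ip_address
    return [
        MultisliceInfo(coordinator, slice_id=i, num_slices=len(slices), port=port)
        for i in range(len(slices))
    ]


slices = [
    SliceInfo("slice-0", num_hosts=8, ip_address="10.0.1.10", num_accelerators_per_host=8),
    SliceInfo("slice-1", num_hosts=8, ip_address="10.0.1.20", num_accelerators_per_host=8),
]
plan = plan_multislice(slices)
total = sum(s.num_hosts * s.num_accelerators_per_host for s in slices)
print(total, [(p.coordinator_ip, p.slice_id) for p in plan])
# 128 [('10.0.1.10', 0), ('10.0.1.10', 1)]
```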
class eformer.executor.ray.types.SnitchRecipient

Bases: object

Base class for actors that can receive and handle failure reports from child actors.

This class provides a standardized interface for parent actors to receive error notifications from their child actors or tasks. It implements a “snitch” pattern where children report their failures up the hierarchy for centralized error handling and logging.

logger

Logger instance for recording child failure events.

Type

logging.Logger

Example

>>> @ray.remote
... class ParentActor(SnitchRecipient):
...     def __init__(self):
...         self.logger = logging.getLogger("ParentActor")
...
...     def spawn_child(self):
...         # Pass this actor's own handle so the child can report failures back to it
...         child = ChildActor.remote(current_actor_handle())
...         return child
logger: Logger
eformer.executor.ray.types.current_actor_handle() → ActorHandle

Get the handle of the currently executing Ray actor.

Returns

The ActorHandle for the current actor context.

Raises

RuntimeError – If called outside of an actor context (e.g., in a regular task or driver process).

Example

Inside a Ray actor:

>>> @ray.remote
... class MyActor:
...     def get_self_handle(self):
...         return current_actor_handle()
>>>
>>> actor = MyActor.remote()
>>> handle = ray.get(actor.get_self_handle.remote())
eformer.executor.ray.types.handle_ray_error(job_info: JobInfo, e: RayError) → JobStatus

Classify Ray errors and convert them to appropriate JobStatus objects.

This function analyzes Ray-specific exceptions and categorizes them into different types of job failures (preemption, system error, task error, etc.). It provides consistent error handling across the distributed execution system.

Parameters
  • job_info – Metadata about the job that encountered the error.

  • e – The Ray exception that was raised during job execution.

Returns

A JobStatus subclass indicating the type of failure:

  • JobPreempted: for infrastructure failures (node or actor death, worker crashes)

  • JobError: for system errors, task errors, or unknown exceptions

Return type

JobStatus

Example

>>> job_info = JobInfo(name="training", state="running", kind="ml_job")
>>> try:
...     ray.get(some_remote_task.remote())
... except NodeDiedError as e:
...     status = handle_ray_error(job_info, e)
...     assert isinstance(status, JobPreempted)
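The classification rule above can be sketched as a type check. To keep the snippet runnable without Ray, the exception classes are local stand-ins that borrow names from ray.exceptions (the real hierarchy is more detailed), and classify_ray_error is a hypothetical mirror of the documented behavior of handle_ray_error, not its actual source.

```python
from __future__ import annotations

from dataclasses import dataclass


# Local stand-ins named after classes in ray.exceptions (flattened for brevity).
class RayError(Exception): pass
class NodeDiedError(RayError): pass
class ActorDiedError(RayError): pass
class WorkerCrashedError(RayError): pass
class RaySystemError(RayError): pass
class RayTaskError(RayError): pass


@dataclass
class JobInfo:
    name: str
    state: str
    kind: str


@dataclass
class JobPreempted:
    info: JobInfo
    error: Exception


@dataclass
class JobError:
    info: JobInfo
    error: Exception


# Infrastructure failures are treated as preemptions; everything else is an error.
PREEMPTION_ERRORS = (NodeDiedError, ActorDiedError, WorkerCrashedError)


def classify_ray_error(job_info: JobInfo, e: RayError) -> JobPreempted | JobError:
    if isinstance(e, PREEMPTION_ERRORS):
        return JobPreempted(job_info, e)
    return JobError(job_info, e)


info = JobInfo(name="training", state="running", kind="ml_job")
print(type(classify_ray_error(info, NodeDiedError("node lost"))).__name__)  # JobPreempted
print(type(classify_ray_error(info, RayTaskError("bad input"))).__name__)  # JobError
```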
eformer.executor.ray.types.log_failures_to(parent, suppress: bool = False)

Context manager that reports exceptions to a parent actor.

This context manager wraps code execution and automatically reports any exceptions to a designated parent actor. It’s useful for implementing hierarchical error reporting in distributed Ray applications.

Parameters
  • parent – Parent actor that implements the SnitchRecipient interface and has a _child_failed method.

  • suppress – If True, suppresses the exception after reporting it. If False, re-raises the exception after reporting.

Yields

None

Raises

Any exception that occurs in the wrapped code (unless suppress=True).

Example

In a child actor:

>>> @ray.remote
... class ChildActor:
...     def __init__(self, parent):
...         self.parent = parent
...
...     def risky_operation(self):
...         with log_failures_to(self.parent):
...             # Any exception raised here is reported to self.parent, then re-raised
...             dangerous_computation()

Suppressing exceptions:

>>> with log_failures_to(parent_actor, suppress=True):
...     might_fail()
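The report-then-reraise behavior can be sketched with contextlib.contextmanager. This local version calls the parent directly; with a real Ray actor handle the call would go through .remote(). RecordingParent, log_failures_to_local, and the one-argument _child_failed(error) signature are assumptions for illustration, and the sketch catches Exception rather than BaseException.

```python
from __future__ import annotations

import contextlib
import logging


class RecordingParent:
    """Hypothetical parent exposing a _child_failed hook (a local object, not a Ray actor)."""

    def __init__(self) -> None:
        self.logger = logging.getLogger("RecordingParent")
        self.failures: list[BaseException] = []

    def _child_failed(self, error: BaseException) -> None:
        self.logger.error("child failed: %r", error)
        self.failures.append(error)


@contextlib.contextmanager
def log_failures_to_local(parent: RecordingParent, suppress: bool = False):
    """Report any exception to parent, then re-raise it unless suppress is True."""
    try:
        yield
    except Exception as e:
        parent._child_failed(e)
        if not suppress:
            raise


parent = RecordingParent()
with log_failures_to_local(parent, suppress=True):
    raise ValueError("boom")  # reported to the parent, then swallowed
print(len(parent.failures))  # 1
```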
eformer.executor.ray.types.print_remote_raise(ray_error) → None

Print the traceback from a Ray remote task error.

This utility function extracts and prints the traceback from a Ray task error, which contains serialized exception information. Useful for debugging failures in distributed Ray computations.

Parameters

ray_error – The .error attribute from a Ray task output, containing a pickled exception with tblib.Traceback.

Example

>>> future = some_remote_task.remote()
>>> try:
...     result = ray.get(future)
... except Exception as e:
...     print_remote_raise(e)