eformer.executor.ray.types#
Type definitions and error handling utilities for Ray-based distributed execution.
This module provides a comprehensive type system and error handling framework for managing distributed computation jobs using Ray. It includes:
- Error handling utilities for Ray-specific exceptions
- Serializable exception information for cross-process error propagation
- Job status tracking with success/failure/preemption states
- Reference management and communication primitives
- Performance monitoring utilities
- Multi-slice coordination data structures for TPU/GPU clusters
The module is designed to provide robust error handling and status tracking for distributed training and inference workloads, with special attention to preemption scenarios common in cloud TPU environments.
Example
Basic job status tracking:
>>> job_info = JobInfo(name="training_job", state="running", kind="training")
>>>
>>> status = JobSucceeded(job_info, result={"loss": 0.1})
>>>
>>> status = JobFailed(job_info, error=ValueError("Invalid input"))
Exception serialization:
>>> try:
...     risky_operation()
... except Exception:
...     exc_info = ExceptionInfo.ser_exc_info()
...
...     exc_info.reraise()
- eformer.executor.ray.types.DONE = <eformer.executor.ray.types.DoneSentinel object>#
Global instance of DoneSentinel for signaling completion.
This singleton instance should be used throughout the codebase to maintain consistency when checking for completion states.
Example
>>> queue.put(result)
>>> queue.put(DONE)
>>>
>>> while True:
...     item = queue.get()
...     if item is DONE:
...         break
...     process(item)
- class eformer.executor.ray.types.DoneSentinel[source]#
Bases: object
Sentinel class to indicate completion or termination state.
This class serves as a unique marker object to signal that a process, computation, or data stream has reached its end. Using a sentinel class instead of None or other values prevents ambiguity when None might be a valid result.
Example
>>> def process_items(items):
...     for item in items:
...         if item is DONE:
...             break
...         yield process_item(item)
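The reason for a dedicated sentinel becomes clearer when None is itself a valid item in the stream. The following is a minimal sketch that uses only the standard library; `_Done` is a local stand-in written for this illustration, not the eformer class, and the producer/consumer functions are hypothetical.

```python
import queue
import threading


class _Done:
    """Local stand-in for DoneSentinel (hypothetical, for illustration only)."""

    def __repr__(self) -> str:
        return "<DONE>"


DONE = _Done()  # one shared instance, always compared with `is`


def producer(q: queue.Queue) -> None:
    # None is a legitimate payload here, so it cannot double as the end marker.
    for item in [1, None, 3]:
        q.put(item)
    q.put(DONE)


def drain(q: queue.Queue) -> list:
    received = []
    while True:
        item = q.get()
        if item is DONE:  # identity check, not equality
            break
        received.append(item)
    return received


q = queue.Queue()
worker = threading.Thread(target=producer, args=(q,))
worker.start()
items = drain(q)
worker.join()
print(items)  # [1, None, 3]
```

Had the consumer stopped on `item is None`, it would have returned after the first element and silently dropped the rest.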
- class eformer.executor.ray.types.ExceptionInfo(ex: BaseException | None, tb: Traceback)[source]#
Bases: object
Serializable container for exception information across process boundaries.
This class captures exception details and tracebacks in a format that can be serialized and transmitted between Ray actors/tasks. It uses tblib to preserve traceback information, enabling proper error reporting in distributed systems.
- ex#
The original exception instance, or None if no exception was captured.
- Type
BaseException | None
- tb#
Serialized traceback information using tblib.Traceback.
- Type
tblib.Traceback
Example
Capturing and re-raising an exception in a different process:
>>> try:
...     raise ValueError("Something went wrong")
... except Exception:
...     exc_info = ExceptionInfo.ser_exc_info()
...
>>>
>>> exc_info.reraise()
- ex: BaseException | None#
- reraise() None[source]#
Re-raise the captured exception with its original traceback.
- Raises
The original exception that was captured, or a generic Exception if no specific exception was available.
Example
>>> try:
...     dangerous_operation()
... except Exception:
...     exc_info = ExceptionInfo.ser_exc_info()
>>>
>>> exc_info.reraise()
- restore() tuple[type[BaseException], BaseException, traceback.TracebackType][source]#
Restore the exception information to standard Python exc_info format.
- Returns
A tuple containing (exception_type, exception_value, traceback) compatible with sys.exc_info() format. If no exception was captured, returns a generic Exception with an appropriate message.
Example
>>> exc_info = ExceptionInfo.ser_exc_info()
>>> exc_type, exc_value, exc_tb = exc_info.restore()
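To make the `sys.exc_info()` format concrete, the sketch below captures the (type, value, traceback) triple and re-raises it later using only the standard library. It stays inside one process, so it does not show the tblib-based serialization that ExceptionInfo relies on; `capture_current` and `reraise_triple` are hypothetical helper names, not part of eformer.

```python
import sys


def capture_current():
    """Return the (type, value, traceback) triple for the exception being handled."""
    return sys.exc_info()


def reraise_triple(exc_type, exc_value, exc_tb):
    """Re-raise a captured exception with its original traceback attached."""
    if exc_value is None:
        # Mirrors the documented fallback: nothing captured, raise a generic Exception.
        raise Exception("no exception was captured")
    raise exc_value.with_traceback(exc_tb)


try:
    raise ValueError("Something went wrong")
except ValueError:
    triple = capture_current()

caught = None
try:
    reraise_triple(*triple)
except ValueError as err:
    caught = err  # keep a reference; `err` is unbound once the except block ends

print(type(caught).__name__, caught)  # ValueError Something went wrong
```

A raw traceback object cannot be pickled, which is why a cross-process version needs a picklable wrapper such as tblib.Traceback, as described above.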
- classmethod ser_exc_info(exception: BaseException | None = None) ExceptionInfo[source]#
Create an ExceptionInfo from current exception context or provided exception.
- Parameters
exception – Specific exception to serialize. If None, uses sys.exc_info() to capture the current exception being handled.
- Returns
ExceptionInfo containing the serialized exception and traceback.
Example
Capture current exception:
>>> try:
...     risky_function()
... except ValueError as e:
...     exc_info = ExceptionInfo.ser_exc_info()
Capture specific exception:
>>> try:
...     risky_function()
... except ValueError as e:
...     exc_info = ExceptionInfo.ser_exc_info(e)
- tb: Traceback#
- class eformer.executor.ray.types.HostInfo(host_id: int, slice_name: str, num_devices: int | None, healthy: bool, failed: bool, node_id: str | None = None)[source]#
Bases: object
Information about a TPU host within a slice.
- host_id#
Unique identifier for the host within its slice.
- Type
int
- slice_name#
Name of the TPU slice this host belongs to.
- Type
str
- num_devices#
Number of TPU devices available on this host.
- Type
int | None
- healthy#
Whether the host is currently healthy and operational.
- Type
bool
- failed#
Whether the host has encountered a failure.
- Type
bool
- failed: bool#
- healthy: bool#
- host_id: int#
- node_id: str | None = None#
- num_devices: int | None#
- slice_name: str#
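As an illustration of how these fields might be consumed, the sketch below counts the devices on hosts that are healthy and not failed. The dataclass is a local stand-in that mirrors the documented signature rather than the eformer class itself, `usable_devices` is a hypothetical helper, and treating a `num_devices` of None as zero is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class HostInfo:
    """Stand-in mirroring the documented fields; not the eformer class itself."""

    host_id: int
    slice_name: str
    num_devices: Optional[int]
    healthy: bool
    failed: bool
    node_id: Optional[str] = None


def usable_devices(hosts: list) -> int:
    """Sum devices over hosts that are healthy and not failed (None counts as 0)."""
    return sum(h.num_devices or 0 for h in hosts if h.healthy and not h.failed)


hosts = [
    HostInfo(0, "slice-0", 4, healthy=True, failed=False),
    HostInfo(1, "slice-0", 4, healthy=False, failed=True),
    HostInfo(2, "slice-0", None, healthy=True, failed=False),
]
print(usable_devices(hosts))  # 4
```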
- class eformer.executor.ray.types.JobError(info: JobInfo, error: Exception)[source]#
Bases: JobStatus
Indicates that the job encountered an internal or unexpected error.
This is typically reserved for unexpected exceptions, infrastructure issues, or serialization problems in the Ray runtime.
- error#
The exception or error message from the failure.
- Type
Exception
- error: Exception#
- class eformer.executor.ray.types.JobFailed(info: JobInfo, error: Exception)[source]#
Bases: JobStatus
Indicates that the job ran to completion but failed due to an expected runtime issue.
This could include errors such as invalid input, failed assertions, or handled exceptions.
- error#
The exception describing why the job failed.
- Type
Exception
- error: Exception#
- class eformer.executor.ray.types.JobInfo(name: str, state: str, kind: str)[source]#
Bases: object
Metadata describing a TPU/GPU/CPU job managed via Ray.
- name#
A human-readable identifier for the job.
- Type
str
- state#
The current state of the job (e.g., “pending”, “running”, “succeeded”, “failed”).
- Type
str
- kind#
The type or classification of the job (e.g., “training”, “inference”).
- Type
str
- kind: str#
- name: str#
- state: str#
- class eformer.executor.ray.types.JobPreempted(info: JobInfo, error: Exception)[source]#
Bases: JobStatus
Indicates that the job was interrupted or preempted, likely by external factors such as TPU quota eviction or infrastructure scaling events.
- error#
The exception raised due to preemption.
- Type
Exception
- error: Exception#
- class eformer.executor.ray.types.JobStatus(info: JobInfo)[source]#
Bases: object
Base class representing the final status of a job after a Ray call.
This class wraps job metadata and serves as a common interface for distinguishing between successful and failed executions.
- class eformer.executor.ray.types.JobSucceeded(info: JobInfo, result: object)[source]#
Bases: JobStatus
Indicates that the job completed successfully and returned a result.
- result#
The output produced by the job.
- Type
object
- result: object#
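Because every outcome is a JobStatus subclass, a caller can decide what to do next with plain isinstance checks. The sketch below uses local stand-in dataclasses that mirror the documented signatures; the retry-on-preemption policy and the `next_action` helper are hypothetical and are not taken from eformer.

```python
from dataclasses import dataclass


@dataclass
class JobInfo:
    name: str
    state: str
    kind: str


@dataclass
class JobStatus:
    info: JobInfo


@dataclass
class JobSucceeded(JobStatus):
    result: object


@dataclass
class JobPreempted(JobStatus):
    error: Exception


@dataclass
class JobFailed(JobStatus):
    error: Exception


@dataclass
class JobError(JobStatus):
    error: Exception


def next_action(status: JobStatus) -> str:
    """Map a final status to a follow-up action (illustrative policy only)."""
    if isinstance(status, JobSucceeded):
        return "done"
    if isinstance(status, JobPreempted):
        return "retry"  # preemption is external, so rerunning is reasonable
    if isinstance(status, (JobFailed, JobError)):
        return "abort"
    raise TypeError(f"unexpected status type: {type(status).__name__}")


info = JobInfo(name="training_job", state="running", kind="training")
print(next_action(JobSucceeded(info, result={"loss": 0.1})))       # done
print(next_action(JobPreempted(info, error=RuntimeError("lost"))))  # retry
print(next_action(JobFailed(info, error=ValueError("bad input"))))  # abort
```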
- class eformer.executor.ray.types.MultisliceInfo(coordinator_ip: str, slice_id: int, num_slices: int, port: int = 8081)[source]#
Bases: object
Information about a multi-slice configuration for distributed execution.
This class stores configuration data for multi-slice TPU/GPU clusters where computation is distributed across multiple slices that coordinate through a central coordinator node.
- coordinator_ip#
IP address of the coordinator node.
- Type
str
- slice_id#
Unique identifier for this slice within the multi-slice setup.
- Type
int
- num_slices#
Total number of slices in the multi-slice configuration.
- Type
int
- port#
Port number for multi-slice coordination communication.
- Type
int
Example
>>> multi_slice_config = MultisliceInfo(
...     coordinator_ip="10.0.0.1",
...     slice_id=0,
...     num_slices=4,
...     port=8081
... )
>>> print(f"Slice {multi_slice_config.slice_id} of {multi_slice_config.num_slices}")
- coordinator_ip: str#
- num_slices: int#
- port: int = 8081#
- slice_id: int#
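One plausible use of such a record is to derive the per-slice environment that each worker process needs to find the coordinator. The sketch below uses a local stand-in dataclass and a hypothetical `megascale_env` helper. The variable names follow the MEGASCALE_* convention seen in some JAX multislice launchers; whether eformer applies this exact mapping is an assumption.

```python
from dataclasses import dataclass


@dataclass
class MultisliceInfo:
    """Stand-in mirroring the documented fields; not the eformer class itself."""

    coordinator_ip: str
    slice_id: int
    num_slices: int
    port: int = 8081


def megascale_env(info: MultisliceInfo) -> dict:
    """Build string-valued environment entries for one slice (assumed mapping)."""
    return {
        "MEGASCALE_COORDINATOR_ADDRESS": f"{info.coordinator_ip}:{info.port}",
        "MEGASCALE_NUM_SLICES": str(info.num_slices),
        "MEGASCALE_SLICE_ID": str(info.slice_id),
        "MEGASCALE_PORT": str(info.port),
    }


# Every slice shares the coordinator address and differs only in its slice id.
envs = [megascale_env(MultisliceInfo("10.0.0.1", i, num_slices=4)) for i in range(4)]
print(envs[2]["MEGASCALE_COORDINATOR_ADDRESS"], envs[2]["MEGASCALE_SLICE_ID"])
# 10.0.0.1:8081 2
```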
- class eformer.executor.ray.types.RefBox(ref: ObjectRef)[source]#
Bases: object
Wrapper to prevent automatic ObjectRef dereferencing in Ray.
Ray automatically dereferences ObjectRefs when they are passed as top-level arguments to remote functions, but not when they are nested inside other objects. RefBox takes advantage of this behavior to control when dereferencing occurs, which is useful for lazy evaluation or for passing references between actors without forcing the receiver to wait for and fetch the value.
- ref#
The Ray ObjectRef to be wrapped.
- Type
ray.ObjectRef
Example
>>> result_ref = expensive_computation.remote()
>>> boxed = RefBox(result_ref)
>>> another_task.remote(boxed)
>>>
>>> actual_result = boxed.get()
See also
Ray documentation on object passing: https://docs.ray.io/en/latest/ray-core/objects.html
- get()[source]#
Dereference the wrapped ObjectRef and return its value.
- Returns
The actual value stored in the ObjectRef.
- Raises
Any exception that occurred during the computation of the ObjectRef.
Example
>>> computation_ref = expensive_task.remote()
>>> box = RefBox(computation_ref)
>>> result = box.get()
- ref: ObjectRef#
- class eformer.executor.ray.types.SliceInfo(slice_name: str, num_hosts: int, ip_address: str, num_accelerators_per_host: int, node_ids: list[str] | None = None, host_infos: list[dict] | None = None)[source]#
Bases: object
Information about a single compute slice in a distributed cluster.
This class represents the configuration and metadata for a single compute slice, which typically consists of multiple hosts with accelerators (TPUs/GPUs). Used in multi-slice configurations for large-scale distributed training.
- slice_name#
Unique name identifier for this slice.
- Type
str
- num_hosts#
Number of host machines in this slice.
- Type
int
- ip_address#
IP address of the slice head node.
- Type
str
- num_accelerators_per_host#
Number of accelerators (TPUs/GPUs) per host machine.
- Type
int
Example
>>> slice_config = SliceInfo(
...     slice_name="slice-0",
...     num_hosts=8,
...     ip_address="10.0.1.10",
...     num_accelerators_per_host=8
... )
>>> total_accelerators = slice_config.num_hosts * slice_config.num_accelerators_per_host
>>> print(f"Slice has {total_accelerators} total accelerators")
- host_infos: list[dict] | None = None#
- ip_address: str#
- node_ids: list[str] | None = None#
- num_accelerators_per_host: int#
- num_hosts: int#
- slice_name: str#
- class eformer.executor.ray.types.SnitchRecipient[source]#
Bases: object
Base class for actors that can receive and handle failure reports from child actors.
This class provides a standardized interface for parent actors to receive error notifications from their child actors or tasks. It implements a “snitch” pattern where children report their failures up the hierarchy for centralized error handling and logging.
- logger#
Logger instance for recording child failure events.
- Type
logging.Logger
Example
>>> @ray.remote
... class ParentActor(SnitchRecipient):
...     def __init__(self):
...         self.logger = logging.getLogger("ParentActor")
...
...     def spawn_child(self):
...         child = ChildActor.remote()
...
...         return child
- logger: Logger#
- eformer.executor.ray.types.current_actor_handle() ActorHandle[source]#
Get the handle of the currently executing Ray actor.
- Returns
The ActorHandle for the current actor context.
- Raises
RuntimeError – If called outside of an actor context (e.g., in a regular task or driver process).
Example
Inside a Ray actor:
>>> @ray.remote
... class MyActor:
...     def get_self_handle(self):
...         return current_actor_handle()
>>>
>>> actor = MyActor.remote()
>>> handle = ray.get(actor.get_self_handle.remote())
- eformer.executor.ray.types.handle_ray_error(job_info: JobInfo, e: RayError) JobStatus[source]#
Classify Ray errors and convert them to appropriate JobStatus objects.
This function analyzes Ray-specific exceptions and categorizes them into different types of job failures (preemption, system error, task error, etc.). It provides consistent error handling across the distributed execution system.
- Parameters
job_info – Metadata about the job that encountered the error.
e – The Ray exception that was raised during job execution.
- Returns
A JobStatus subclass indicating the type of failure:
JobPreempted: For infrastructure failures (node/actor death, worker crashes)
JobError: For system errors, task errors, or unknown exceptions
- Return type
JobStatus
Example
>>> job_info = JobInfo(name="training", state="running", kind="ml_job")
>>> try:
...     ray.get(some_remote_task.remote())
... except NodeDiedError as e:
...     status = handle_ray_error(job_info, e)
...     assert isinstance(status, JobPreempted)
- eformer.executor.ray.types.log_failures_to(parent, suppress: bool = False)[source]#
Context manager that reports exceptions to a parent actor.
This context manager wraps code execution and automatically reports any exceptions to a designated parent actor. It’s useful for implementing hierarchical error reporting in distributed Ray applications.
- Parameters
parent – Parent actor that implements the SnitchRecipient interface and has a _child_failed method.
suppress – If True, suppresses the exception after reporting it. If False, re-raises the exception after reporting.
- Yields
None
- Raises
Any exception that occurs in the wrapped code (unless suppress=True).
Example
In a child actor:
>>> @ray.remote
... class ChildActor:
...     def __init__(self, parent):
...         self.parent = parent
...
...     def risky_operation(self):
...         with log_failures_to(self.parent):
...             dangerous_computation()
Suppressing exceptions:
>>> with log_failures_to(parent_actor, suppress=True):
...     might_fail()
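The report-then-suppress-or-re-raise behaviour described above can be sketched without Ray using `contextlib`. In this illustration `report_failures_to` and `RecordingParent` are hypothetical names, the parent is a plain local object rather than an actor handle, and the single-argument `_child_failed` signature is an assumption.

```python
import contextlib


class RecordingParent:
    """Hypothetical local parent; in Ray this would be an actor handle."""

    def __init__(self):
        self.failures = []

    def _child_failed(self, exc):  # assumed signature, for illustration only
        self.failures.append(exc)


@contextlib.contextmanager
def report_failures_to(parent, suppress=False):
    """Report any exception from the wrapped block to `parent`."""
    try:
        yield
    except Exception as exc:
        parent._child_failed(exc)
        if not suppress:
            raise  # propagate after reporting


parent = RecordingParent()

# suppress=True: the failure is reported and then swallowed.
with report_failures_to(parent, suppress=True):
    raise RuntimeError("boom")

# Default: the failure is reported and still reaches the caller.
propagated = False
try:
    with report_failures_to(parent):
        raise KeyError("missing")
except KeyError:
    propagated = True

print(len(parent.failures), propagated)  # 2 True
```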
- eformer.executor.ray.types.print_remote_raise(ray_error) None[source]#
Print the traceback from a Ray remote task error.
This utility function extracts and prints the traceback from a Ray task error, which contains serialized exception information. Useful for debugging failures in distributed Ray computations.
- Parameters
ray_error – The .error attribute from a Ray task output, containing a pickled exception with tblib.Traceback.
Example
>>> future = some_remote_task.remote()
>>> try:
...     result = ray.get(future)
... except Exception as e:
...     print_remote_raise(e)