eformer.executor.ray.pool_manager#
Resource pool management for distributed Ray actors.
This module provides comprehensive abstractions for managing pools of Ray actors, with specialized focus on TPU/GPU slice management for distributed computing. It includes health monitoring, automatic scaling, resource lifecycle management, and placement group coordination for optimal resource allocation.
- Key Components:
ActorPoolMember: Wrapper for actor handles with metadata
ResourcePoolManager: Abstract base for managing actor pools
SlicePoolManager: Specialized manager for TPU/GPU slices with placement groups
SliceActor: Ray actor for managing individual compute slices
DeviceHostActor: Ray actor for managing individual TPU hosts within slices
- Resource Management Features:
Placement group coordination with STRICT_SPREAD strategy
Automatic resource request handling through Ray autoscaler
Health monitoring with graceful shutdown sequences
Robust error handling with actor restart capabilities
Slot-based actor allocation for deterministic placement
- Environment Variables:
EFORMER_SCALE_POLL_S: Scaling operation polling interval (default: “30”)
EFORMER_SCALE_ADD_TIMEOUT_S: Timeout for adding new actors (default: “604800”)
Example
Managing a multi-slice TPU configuration with placement groups:
>>> from eformer.executor.ray import SlicePoolManager
>>>
>>>
>>> manager = SlicePoolManager(tpu_type="v4-8")
>>> manager.scale_multislice(num_slices=4)
>>> actors = manager.get_all_actors_in_pool()
>>>
>>>
>>> manager.prepare_all_slices()
>>> manager.drain_actor_pool()
- class eformer.executor.ray.pool_manager.ActorPoolMember(actor: ActorHandle, actor_info: ActorInfoT)[source]#
Bases:
Generic[ActorInfoT]Container for an actor handle and its associated metadata.
- actor#
Ray actor handle for remote execution.
- Type
ray.actor.ActorHandle
- actor_info#
Metadata about the actor (type depends on ActorInfoT).
- Type
eformer.executor.ray.pool_manager.ActorInfoT
- actor: ActorHandle#
- actor_info: ActorInfoT#
- exception eformer.executor.ray.pool_manager.InsufficientSlicesError[source]#
Bases:
RuntimeErrorRaised when the requested number of TPU slices cannot be allocated.
This exception is raised by SlicePoolManager.scale_multislice when none of the requested slice counts can be satisfied, typically due to: - Insufficient TPU resources in the cluster - Preemption of TPU nodes during scaling - Ray autoscaler unable to provision required nodes
The exception message includes details about requested vs available slices.
Example
>>> manager = SlicePoolManager(tpu_type="v4-32") >>> try: ... manager.scale_multislice([4, 8]) ... except InsufficientSlicesError as e: ... print(f"Could not allocate TPU slices: {e}") ...
- class eformer.executor.ray.pool_manager.ResourcePoolManager[source]#
Bases:
Generic[ActorInfoT]Abstract base class for managing pools of Ray actors.
Provides common functionality for scaling, health monitoring, and lifecycle management of actor pools. Subclasses should implement create_actor() to define how actors are created.
- _actor_pool#
List of active actor pool members.
- create_actor() ActorHandle[source]#
Create a new actor instance.
Must be implemented by subclasses to define actor creation logic.
- Returns
Ray actor handle for the newly created actor.
- Raises
NotImplementedError – If not overridden by subclass.
- drain_actor_pool() None[source]#
Shut down and remove all actors from the pool.
Attempts graceful shutdown first, then forcefully kills actors. Clears the actor pool after draining.
- get_actor_name_from_actor_info(actor_info: ActorInfoT) str[source]#
Generate a human-readable name from actor info.
- Parameters
actor_info – Metadata about the actor.
- Returns
String representation of the actor for logging.
- get_actor_pool_name() str[source]#
Get a human-readable name for this actor pool.
- Returns
String identifier for the pool, defaults to class name.
- get_all_actors_in_pool() list[ray.actor.ActorHandle][source]#
Get all actor handles in the pool.
- Returns
List of Ray actor handles.
- get_all_pool_members() list[eformer.executor.ray.pool_manager.ActorPoolMember[ActorInfoT]][source]#
Get a copy of all pool members with their metadata.
- Returns
List of ActorPoolMember objects containing actors and their info.
- class eformer.executor.ray.pool_manager.SlicePoolManager(tpu_type: str | None)[source]#
Bases:
ResourcePoolManager[SliceInfo]Manager for multiple TPU slices in multi-slice configurations.
Coordinates multiple SliceActors to manage multi-slice TPU configurations. Handles scaling, health monitoring, and distributed task execution across multiple TPU slices. This is the top-level manager used by RayExecutor for multi-slice workloads.
- _tpu_type#
Type of TPU (e.g., “v4-8”, “v5e-16”).
- _last_scale_ts#
Timestamp of last scaling operation for rate limiting.
- _last_scale_check_ts#
Timestamp of last scale check.
- _actor_pool#
List of SliceActor pool members.
- Hierarchy:
SlicePoolManager -> SliceActors -> DeviceHostActors -> Tasks
- Resource Requirements:
Each SliceActor requires a TPU-{type}-head resource.
Each slice requires placement group bundles for host distribution.
Automatically requests resources from Ray autoscaler.
- create_actor() ActorHandle[source]#
Create a new SliceActor to manage a TPU slice.
Creates a SliceActor with appropriate resource requirements based on the TPU type. The actor will manage all hosts within its assigned slice.
- Returns
Ray actor handle for the newly created SliceActor.
- drain_actor_pool() None[source]#
Shut down and remove all actors from the pool.
Attempts graceful shutdown first, then forcefully kills actors. Clears the actor pool after draining.
- execute_on_each_host(fn, *args, env: dict | None = None, **kwargs)[source]#
Execute a function on each host across all slices.
Prepares all slices and runs the function on every host actor in parallel across all slices.
- Parameters
fn – Function to execute on each host.
*args – Positional arguments for fn.
env – Optional environment variables.
**kwargs – Keyword arguments for fn.
- Returns
Nested list of results (outer list for slices, inner for hosts).
- execute_on_each_host_flat(remote_fn, env: dict | None = None, runtime_env: dict | None = None)[source]#
Execute a function on all hosts, returning a flat list.
Similar to execute_on_each_slice but flattens the nested result structure into a single list of ObjectRefs.
- Parameters
remote_fn – Ray remote function or callable to execute.
env – Optional environment variables to set.
runtime_env – Optional Ray runtime environment configuration.
- Returns
Flat list of ObjectRefs from all hosts across all slices.
- execute_on_each_slice(remote_fn, env: dict | None = None, runtime_env: dict | None = None)[source]#
Execute a function on all hosts across all slices.
Prepares all slices and runs the function on every host in parallel. Returns results grouped by slice.
- Parameters
remote_fn – Ray remote function or callable to execute.
env – Optional environment variables to set.
runtime_env – Optional Ray runtime environment configuration.
- Returns
List of lists where outer list represents slices and inner lists contain ObjectRefs for each host in that slice.
- get_actor_name_from_actor_info(actor_info: SliceInfo) str[source]#
Generate a human-readable name from actor info.
- Parameters
actor_info – Metadata about the actor.
- Returns
String representation of the actor for logging.
- get_actor_pool_name() str[source]#
Get a human-readable name for this actor pool.
- Returns
String identifier for the pool, defaults to class name.
- prepare_all_slices() None[source]#
Prepare all slices by ensuring host placement groups.
Pre-requests resources for all slices and prepares their host placement groups for distributed execution. This ensures that all nodes are ready before task execution begins.
This method: 1. Fetches slice information from all SliceActors. 2. Requests host resources for each slice from autoscaler. 3. Creates placement groups with STRICT_SPREAD strategy. 4. Ensures all hosts are discovered and ready.
Note
Called automatically by execute_multislice before running tasks. Essential for proper multi-host coordination within each slice.
- scale_multislice(num_slices: int | collections.abc.Sequence[int]) None[source]#
Scale the pool to the desired number of slices.
Supports flexible scaling with multiple valid sizes. Will scale to the largest feasible size from the provided options. This method is typically called by RayExecutor.execute_multislice to set up the required number of slices.
- Parameters
num_slices – Target number of slices or list of valid sizes. If int: exact number of slices required. If sequence: will try largest first, falling back to smaller.
- Raises
ValueError – If target is invalid or empty list provided.
InsufficientSlicesError – If none of the requested sizes can be achieved.
Example
>>> manager.scale_multislice(4) >>> manager.scale_multislice([2, 4, 8])
Note
Requests TPU head resources from Ray autoscaler.
Removes unhealthy actors before scaling.
Falls back to smaller sizes if larger ones unavailable.
- schedule_on_each_host(remote_fn, env: dict | None = None, runtime_env: dict | None = None)[source]#
Schedule a function on all hosts without waiting for results.
Ensures all slices and hosts are ready, then schedules the function execution on all hosts. Returns immediately with ObjectRefs.
- Parameters
remote_fn – Ray remote function or callable to execute.
env – Optional environment variables to set.
runtime_env – Optional Ray runtime environment configuration.
- Returns
Flat list of ObjectRefs that can be waited on later.