eformer.executor.patch_tpus_ray

class eformer.executor.patch_tpus_ray.ArgumentParser

Bases: object

Handles command-line argument parsing for the Ray TPU patcher.

Provides static methods to parse and validate command-line arguments for setting up and managing Ray clusters on TPU infrastructure.

The parser supports multiple argument groups:

  • General options (logging level, log file)

  • Cluster configuration (external IPs, TPU version, slice size)

  • IP configuration (config file, IP lists, head node)

  • Operation modes (stop, verify, test-ssh, self-job)

static parse_arguments()

Parse command-line arguments for Ray TPU cluster setup.

Returns

Parsed arguments containing all configuration options for cluster setup and management.

Return type

argparse.Namespace

Note

Uses argparse.ArgumentDefaultsHelpFormatter to display default values in the help text.
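A minimal sketch of how a parser with argument groups and default-aware help text can be put together. The flag names (`--log-level`, `--external`, `--slice-size`, `--stop`, `--verify`) and their defaults are assumptions for illustration; this page does not list the real option names.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser whose help text shows each option's default value."""
    parser = argparse.ArgumentParser(
        description="Ray TPU patcher (illustrative sketch)",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Hypothetical flags grouped the way the class description groups them.
    general = parser.add_argument_group("General options")
    general.add_argument("--log-level", default="INFO", help="Logging level")

    cluster = parser.add_argument_group("Cluster configuration")
    cluster.add_argument("--external", action="store_true", help="Use external IPs")
    cluster.add_argument("--slice-size", type=int, default=8, help="TPU slice size")

    modes = parser.add_argument_group("Operation modes")
    modes.add_argument("--stop", action="store_true", help="Stop the cluster")
    modes.add_argument("--verify", action="store_true", help="Verify the cluster")
    return parser


# Parse an explicit argument list instead of sys.argv so the sketch is repeatable.
args = build_parser().parse_args(["--slice-size", "16", "--verify"])
print(args.slice_size, args.verify, args.log_level)
```

With this formatter, running the parser with `--help` appends text such as `(default: INFO)` to every option that has a help string.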

class eformer.executor.patch_tpus_ray.ClusterManager(ray_manager: RayManager)

Bases: object

Manages TPU cluster setup and configuration.

Provides high-level cluster management operations including setup of multi-node Ray clusters on TPU infrastructure, handling both head and worker nodes, and managing self-job mode for individual machines.

logger

Logger instance for this class.

ray_manager

RayManager instance for low-level Ray operations.

Example

>>> ray_manager = RayManager()
>>> cluster_manager = ClusterManager(ray_manager)
>>> cluster_manager.setup_cluster(use_external=False)

setup_cluster(use_external: bool) → bool

Set up a complete Ray cluster with head and worker nodes.

Initializes a Ray cluster across multiple nodes, starting the head node on the first IP and worker nodes on remaining IPs. Stops any existing Ray processes before starting.

Parameters

use_external – If True, use external IPs; otherwise use internal IPs.

Returns

True if cluster was set up successfully, False otherwise.

Return type

bool

Note

Uses global TPU_VERSION, TPU_SLICE_SIZE, and TPU_CORES_PER_HOST variables for resource configuration.
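The ordering described above (stop existing processes, start the head on the first IP, then start workers on the remaining IPs) can be sketched as follows. `RecordingManager` and `setup_cluster_sketch` are hypothetical names, the manager is a stand-in that records calls instead of starting Ray, and stopping at the first failed node is an assumption rather than documented behavior.

```python
class RecordingManager:
    """Stand-in that records calls instead of starting Ray (hypothetical)."""

    def __init__(self):
        self.calls = []

    def stop_cluster(self, ips):
        self.calls.append(("stop", tuple(ips)))

    def start_head_node(self, head_ip, resources):
        self.calls.append(("head", head_ip))
        return True

    def start_worker_node(self, head_ip, worker_ip, resources):
        self.calls.append(("worker", worker_ip))
        return True


def setup_cluster_sketch(manager, ips, resources):
    """Start the head on the first IP and workers on the rest."""
    if not ips:
        return False
    head_ip, *worker_ips = ips
    # Clear out any Ray processes left over from a previous run.
    manager.stop_cluster(ips)
    if not manager.start_head_node(head_ip, resources):
        return False
    # all() stops at the first worker that fails to start.
    return all(manager.start_worker_node(head_ip, ip, resources) for ip in worker_ips)


manager = RecordingManager()
ok = setup_cluster_sketch(manager, ["10.0.0.1", "10.0.0.2", "10.0.0.3"], {"TPU": 4})
print(ok, manager.calls)
```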

setup_self_job_node(args) → int

Set up a single node in self-job mode.

Configures the current machine as either a head or worker node based on its IP address and provided arguments. Useful for distributed setups where each node independently joins the cluster.

Parameters

args – Namespace containing command-line arguments, including:

  • external: Whether to use external IPs

  • internal_ips: Comma-separated internal IPs

  • head_node_ip: IP of external head node (optional)

  • head_only: Run as head-only without TPU resources

  • num_slices: Number of TPU slices

  • verify: Whether to verify cluster after setup

  • stop: Stop Ray instead of starting

Returns

Exit code (0 for success, 1 for failure).

Return type

int
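One way a node could decide its own role from its IP address is sketched below. The rule used here (the head is `head_node_ip` when given, otherwise the first entry of `internal_ips`) is an assumption for illustration, and `choose_role` is a hypothetical helper, not part of the module.

```python
def choose_role(local_ip, internal_ips, head_node_ip=None):
    """Return (role, head_ip) for the current machine.

    Assumed rule: an explicit head_node_ip wins; otherwise the first
    address in the comma-separated internal_ips string is the head.
    """
    ips = [ip.strip() for ip in internal_ips.split(",") if ip.strip()]
    head_ip = head_node_ip or (ips[0] if ips else local_ip)
    role = "head" if local_ip == head_ip else "worker"
    return role, head_ip


print(choose_role("10.0.0.1", "10.0.0.1,10.0.0.2"))
print(choose_role("10.0.0.2", "10.0.0.1,10.0.0.2"))
print(choose_role("10.0.0.1", "10.0.0.1,10.0.0.2", head_node_ip="10.0.0.9"))
```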

class eformer.executor.patch_tpus_ray.ColorFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)

Bases: Formatter

Custom log formatter that adds color coding to log messages based on level.

Formats log messages with ANSI color codes to make different log levels visually distinct in terminal output. Colors are defined in the LEVEL_COLORS dictionary and include support for DEBUG, INFO, WARNING, ERROR, and CRITICAL levels.

Inherits all attributes from logging.Formatter.

Example

>>> handler = logging.StreamHandler()
>>> handler.setFormatter(ColorFormatter())
>>> logger.addHandler(handler)

format(record: LogRecord) → str

Format a log record with color coding.

Parameters

record – The log record to format.

Returns

Formatted log message with ANSI color codes.

Return type

str
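A minimal sketch of a level-based color formatter, assuming the message is wrapped in an ANSI color prefix and a reset suffix. The palette below and the name `SketchColorFormatter` are assumptions; the actual contents of `LEVEL_COLORS` are not shown on this page.

```python
import logging

# Assumed palette: standard ANSI foreground colors keyed by level name.
LEVEL_COLORS = {
    "DEBUG": "\033[36m",     # cyan
    "INFO": "\033[32m",      # green
    "WARNING": "\033[33m",   # yellow
    "ERROR": "\033[31m",     # red
    "CRITICAL": "\033[35m",  # magenta
}
RESET = "\033[0m"


class SketchColorFormatter(logging.Formatter):
    """Wrap the normally formatted message in the color for its level."""

    def format(self, record: logging.LogRecord) -> str:
        message = super().format(record)
        color = LEVEL_COLORS.get(record.levelname, "")
        return f"{color}{message}{RESET}" if color else message


# Build a record by hand so the result does not depend on handler setup.
record = logging.LogRecord(
    "demo", logging.WARNING, "demo.py", 1, "disk almost full", None, None
)
formatted = SketchColorFormatter("%(levelname)s: %(message)s").format(record)
print(repr(formatted))
```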

class eformer.executor.patch_tpus_ray.IPManager

Bases: object

Manages IP address operations and configurations.

Provides static methods for retrieving local and external IP addresses, reading IP configurations from YAML files, and testing SSH connectivity to cluster nodes.

This class is designed for managing network configurations in distributed TPU/Ray clusters where nodes need to communicate with each other.

Example

>>> local_ip = IPManager.get_local_ip()
>>> external_ip = IPManager.get_external_ip()
>>> IPManager.read_ips_from_yaml("cluster_config.yaml")
>>> IPManager.test_ssh_connectivity(["10.0.0.1", "10.0.0.2"])

static get_external_ip() → str

Get the external IP address of the machine.

Queries multiple public IP detection services to determine the machine’s external IP address.

Returns

External IP address, or an error message if detection fails.

Return type

str

Note

Tries ipify.org, ipinfo.io, and checkip.amazonaws.com in sequence, using the first successful response.
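The try-in-sequence behavior can be sketched as below. The exact endpoint URLs, the timeout, and the wording of the failure message are assumptions; the fetch function is injectable so that the demonstration runs without network access.

```python
import urllib.request

# Assumed plain-text endpoints for the three services named above.
SERVICES = (
    "https://api.ipify.org",
    "https://ipinfo.io/ip",
    "https://checkip.amazonaws.com",
)


def fetch_text(url, timeout=5.0):
    """Fetch a URL and return the response body as text."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read().decode()


def first_external_ip(fetch=fetch_text, services=SERVICES):
    """Return the first non-empty answer, or an error message if all fail."""
    for url in services:
        try:
            ip = fetch(url).strip()
        except (OSError, ValueError):
            continue  # This service failed; try the next one.
        if ip:
            return ip
    return "Could not determine external IP"


def fake_fetch(url):
    """Offline stand-in: the first service is unreachable, the rest answer."""
    if "ipify" in url:
        raise OSError("unreachable")
    return "203.0.113.7\n"


print(first_external_ip(fetch=fake_fetch))
```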

static get_local_ip() → str

Get the local IP address of the machine.

Retrieves the primary local IP address using the ‘hostname -I’ command.

Returns

Local IP address, or ‘127.0.0.1’ if detection fails.

Return type

str
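A sketch of the lookup with its fallback, assuming the first address printed by `hostname -I` is treated as the primary one. The helper names and the 5-second timeout are hypothetical.

```python
import subprocess

FALLBACK_IP = "127.0.0.1"


def parse_hostname_output(output):
    """Pick the first address from the space-separated `hostname -I` output."""
    addresses = output.split()
    return addresses[0] if addresses else FALLBACK_IP


def local_ip_sketch():
    """Return the primary local IP, or 127.0.0.1 if detection fails."""
    try:
        result = subprocess.run(
            ["hostname", "-I"],
            capture_output=True,
            text=True,
            check=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        # Missing binary, unsupported flag, non-zero exit, or timeout.
        return FALLBACK_IP
    return parse_hostname_output(result.stdout)


print(parse_hostname_output("10.0.0.5 172.17.0.1 \n"))
```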

static read_ips_from_yaml(yaml_file: str) → bool

Read IP addresses from a YAML configuration file.

Parses a YAML file containing ‘internal_ips’ and ‘external_ips’ lists and updates the global INTERNAL_IPS and EXTERNAL_IPS variables.

Parameters

yaml_file – Path to the YAML configuration file.

Returns

True if IPs were successfully read, False on error.

Return type

bool

Expected YAML format:

internal_ips:
  - 10.0.0.1
  - 10.0.0.2
external_ips:
  - 35.192.1.1
  - 35.192.1.2

static test_ssh_connectivity(ips: list[str]) → bool

Test SSH connectivity to all nodes in the cluster.

Attempts to connect to each IP address via SSH and execute a simple command to verify connectivity.

Parameters

ips – List of IP addresses to test.

Returns

True if all connections succeeded, False if any failed.

Return type

bool

Note

Uses the SSH_USER global variable for authentication. Connections use StrictHostKeyChecking=no and a 5-second timeout.
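The connectivity check can be sketched as below. It assumes the 5-second timeout maps to the OpenSSH `ConnectTimeout` option and that the "simple command" is `echo ok`; the `SSH_USER` value and the helper names are placeholders. The command runner is injectable so the logic can be exercised without real hosts.

```python
import subprocess

SSH_USER = "ubuntu"  # Placeholder; the real value comes from the module's global.


def build_ssh_command(ip, remote_command="echo ok", user=SSH_USER):
    """Build the ssh argv for one connectivity probe."""
    return [
        "ssh",
        "-o", "StrictHostKeyChecking=no",
        "-o", "ConnectTimeout=5",
        f"{user}@{ip}",
        remote_command,
    ]


def check_connectivity(ips, runner=subprocess.run):
    """Probe every IP; return True only if all probes succeed."""
    all_ok = True
    for ip in ips:
        try:
            result = runner(
                build_ssh_command(ip), capture_output=True, text=True, timeout=10
            )
            ok = result.returncode == 0
        except (OSError, subprocess.SubprocessError):
            ok = False
        # Keep going after a failure so every node gets tested.
        all_ok = all_ok and ok
    return all_ok


print(build_ssh_command("10.0.0.1"))
```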

class eformer.executor.patch_tpus_ray.LazyLogger(name: str, level: int | None = None)

Bases: object

A lazily-initialized logger that defers setup until first use.

This class provides a logger that is not initialized until the first log message is emitted, reducing startup overhead when logging may not be used. It uses ColorFormatter for colorized terminal output.

_name

Name for the logger.

_level

Logging level (defaults to the value of the LOGGING_LEVEL_ED environment variable).

_logger

The underlying logging.Logger instance, initialized on first use.
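A minimal sketch of the deferred-initialization pattern, assuming attribute access is forwarded to the real logger through `__getattr__`. `SketchLazyLogger` is a hypothetical name, it uses a plain `StreamHandler` rather than ColorFormatter, and it falls back to INFO instead of reading the environment variable.

```python
import logging


class SketchLazyLogger:
    """Create the underlying logging.Logger only when it is first needed."""

    def __init__(self, name, level=None):
        self._name = name
        self._level = level
        self._logger = None  # Nothing is configured yet.

    def _ensure_initialized(self):
        if self._logger is None:
            logger = logging.getLogger(self._name)
            logger.setLevel(self._level if self._level is not None else logging.INFO)
            if not logger.handlers:
                logger.addHandler(logging.StreamHandler())
            self._logger = logger
        return self._logger

    def __getattr__(self, attribute):
        # Only reached for names not found normally, such as info or warning.
        if attribute.startswith("_"):
            raise AttributeError(attribute)
        return getattr(self._ensure_initialized(), attribute)


lazy = SketchLazyLogger("sketch.lazy")
initialized_before = lazy._logger is not None
lazy.info("first message triggers setup")
initialized_after = lazy._logger is not None
print(initialized_before, initialized_after)
```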

Example

>>> logger = LazyLogger("my_module")
>>> logger.info("This message triggers initialization")

class eformer.executor.patch_tpus_ray.RayManager

Bases: object

Manages Ray operations including starting, stopping, and verifying clusters.

This class provides comprehensive functionality for managing Ray clusters on TPU infrastructure, including:

  • Finding and validating the Ray executable

  • Starting head and worker nodes with proper resource configuration

  • Stopping individual nodes or entire clusters

  • Verifying cluster health and TPU resource availability

  • Running commands locally or remotely via SSH

logger

Logger instance for this class.

ray_path

Path to the Ray executable.

Example

>>> manager = RayManager()
>>> manager.start_head_node("10.0.0.1", {"TPU": 4})
>>> manager.start_worker_node("10.0.0.1", "10.0.0.2", {"TPU": 4})
>>> manager.verify_cluster("10.0.0.1", expected_tpu_count=8)

is_ray_running(ip: str) → bool

Check if Ray is already running on a node.

Determines if Ray processes are running on the specified IP address by checking for ‘ray::IDLE’ processes via ps command.

Parameters

ip – IP address of the node to check.

Returns

True if Ray is running on the node, False otherwise.

Return type

bool
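The process check can be sketched as a scan of `ps` output for the `ray::IDLE` marker. The sample output below is made up, and `ps_output_shows_ray` is a hypothetical helper; how the real method collects the output from a remote node is not shown here.

```python
def ps_output_shows_ray(ps_output):
    """Return True if any process line mentions an idle Ray worker."""
    return any("ray::IDLE" in line for line in ps_output.splitlines())


# Invented sample lines in the style of `ps aux` output.
sample_with_ray = (
    "user  101  0.0  0.1  bash\n"
    "user  202  0.3  1.2  ray::IDLE\n"
)
sample_without_ray = "user  101  0.0  0.1  bash\n"

print(ps_output_shows_ray(sample_with_ray), ps_output_shows_ray(sample_without_ray))
```

Filtering in Python rather than piping through `grep` avoids matching the `grep` process itself.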

run_command(command: str | list[str], use_sudo: bool = False, check: bool = True, capture_output: bool = False, remote_ip: str | None = None) → bool | str

Run a command locally or remotely with optional output capture.

Executes shell commands either on the local machine or on a remote machine via SSH. Supports sudo elevation and output capture.

Parameters
  • command – Command to execute, as a string or list of arguments.

  • use_sudo – If True, prepend ‘sudo’ to the command.

  • check – If True, raise an exception on a non-zero exit code.

  • capture_output – If True, return stdout; otherwise return success bool.

  • remote_ip – If provided, execute command on this remote host via SSH.

Returns

If capture_output is True, returns stdout as string. Otherwise, returns True if command succeeded, False if it failed.

Return type

Union[bool, str]
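A sketch of how these parameters can map onto `subprocess.run`, under several assumptions: string commands are split with `shlex`, remote execution wraps the command in a plain `ssh user@host` call, and the `ssh_user` default is a placeholder. `build_argv` and `run_command_sketch` are hypothetical names.

```python
import shlex
import subprocess
import sys


def build_argv(command, use_sudo=False, remote_ip=None, ssh_user="ubuntu"):
    """Turn a command plus options into the argv that will be executed."""
    argv = shlex.split(command) if isinstance(command, str) else list(command)
    if use_sudo:
        argv = ["sudo", *argv]
    if remote_ip is not None:
        # The remote shell receives one quoted command string.
        argv = ["ssh", f"{ssh_user}@{remote_ip}", shlex.join(argv)]
    return argv


def run_command_sketch(command, use_sudo=False, check=True,
                       capture_output=False, remote_ip=None):
    """Run the command; return stdout if captured, else a success flag."""
    argv = build_argv(command, use_sudo=use_sudo, remote_ip=remote_ip)
    # With check=True a non-zero exit raises subprocess.CalledProcessError.
    result = subprocess.run(argv, check=check, capture_output=capture_output, text=True)
    if capture_output:
        return result.stdout.strip()
    return result.returncode == 0


# Use the current interpreter as a portable command for the demonstration.
output = run_command_sketch([sys.executable, "-c", "print('hi')"], capture_output=True)
print(output)
```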

Example

>>> manager.run_command(["ls", "-la"])
True
>>> manager.run_command("hostname", capture_output=True, remote_ip="10.0.0.1")
'worker-1'

start_head_node(head_ip: str, resources: dict[str, Any]) → bool

Start Ray head node with specified resources.

Initializes the Ray head node on the specified IP address. If Ray is already running on the node, it will be stopped first.

Parameters
  • head_ip – IP address for the head node.

  • resources – Dictionary of custom resources to advertise (e.g., {"TPU": 4}).

Returns

True if the head node started successfully, False otherwise.

Return type

bool

Note

The head node will listen on port 6379 and start the dashboard on 0.0.0.0 to allow remote access.
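The port and dashboard settings in the note correspond to standard `ray start` options. A sketch of the command line such a call might build is shown below; passing `--node-ip-address` and the `ray_path` default are assumptions, and `build_head_command` is a hypothetical helper.

```python
import json


def build_head_command(head_ip, resources, ray_path="ray"):
    """Build an argv for starting a Ray head node with custom resources."""
    return [
        ray_path,
        "start",
        "--head",
        f"--node-ip-address={head_ip}",
        "--port=6379",
        "--dashboard-host=0.0.0.0",
        # Ray expects custom resources as a JSON object string.
        f"--resources={json.dumps(resources)}",
    ]


head_command = build_head_command("10.0.0.1", {"TPU": 4})
print(" ".join(head_command))
```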

start_worker_node(head_ip: str, worker_ip: str, resources: dict[str, Any]) → bool

Start Ray worker node and connect it to the head node.

Initializes a Ray worker node on the specified IP and connects it to the existing head node. If Ray is already running on the worker, it will be stopped first.

Parameters
  • head_ip – IP address of the Ray head node to connect to.

  • worker_ip – IP address for the worker node.

  • resources – Dictionary of custom resources to advertise (e.g., {"TPU": 4}).

Returns

True if the worker node started and connected successfully, False otherwise.

Return type

bool
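A worker joins an existing cluster by pointing `ray start` at the head node's address. The sketch below assumes the head listens on port 6379 and that the worker's own IP is passed through `--node-ip-address`; `build_worker_command` is a hypothetical helper.

```python
import json


def build_worker_command(head_ip, worker_ip, resources, ray_path="ray", port=6379):
    """Build an argv for starting a Ray worker that joins the given head."""
    return [
        ray_path,
        "start",
        f"--address={head_ip}:{port}",
        f"--node-ip-address={worker_ip}",
        f"--resources={json.dumps(resources)}",
    ]


worker_command = build_worker_command("10.0.0.1", "10.0.0.2", {"TPU": 4})
print(" ".join(worker_command))
```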

stop_cluster(ips: list[str]) → None

Stop Ray cluster on all nodes.

Stops Ray processes on all nodes in the provided list.

Parameters

ips – List of IP addresses of nodes in the cluster.

stop_node(ip: str) → bool

Stop Ray on a specific node.

Stops all Ray processes on the specified IP address.

Parameters

ip – IP address of the node to stop Ray on.

Returns

True if Ray was stopped successfully, False otherwise.

Return type

bool

verify_cluster(head_ip: str, expected_tpu_count: int, allow_head_only: bool = False) → bool

Verify Ray cluster setup and TPU resource availability.

Connects to the Ray cluster and verifies that the expected number of TPU cores is registered. Allows a 10% tolerance for TPU detection.

Parameters
  • head_ip – IP address of the Ray head node.

  • expected_tpu_count – Expected number of TPU cores across all nodes.

  • allow_head_only – If True, allow verification to pass for head-only nodes with zero TPU resources.

Returns

True if verification passed, False otherwise.

Return type

bool

Note

Creates a temporary Python script to perform verification, which is cleaned up after execution.
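The tolerance check can be sketched as a simple comparison. This assumes the 10% tolerance is a lower bound only (up to 10% of the expected cores may be missing) and that `allow_head_only` accepts a count of zero; `tpu_count_ok` is a hypothetical helper.

```python
def tpu_count_ok(detected, expected, allow_head_only=False, tolerance=0.10):
    """Return True if enough TPU cores are registered with the cluster."""
    if allow_head_only and detected == 0:
        return True  # A head-only node legitimately advertises no TPUs.
    return detected >= expected * (1.0 - tolerance)


# With 32 expected cores the assumed threshold is 28.8.
print(tpu_count_ok(32, 32), tpu_count_ok(29, 32), tpu_count_ok(28, 32))
```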

eformer.executor.patch_tpus_ray.get_logger(name: str, level: int | None = None) → LazyLogger

Create a LazyLogger instance with the specified name and level.

Factory function for creating LazyLogger instances. Provides a simple interface for obtaining loggers throughout the application.

Parameters
  • name – Name for the logger, typically the module name (__name__).

  • level – Optional logging level override. If None, uses the LOGGING_LEVEL_ED environment variable or defaults to INFO.

Returns

A lazily-initialized logger instance.

Return type

LazyLogger
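The level resolution described for the `level` parameter can be sketched as follows, assuming the environment variable holds a level name such as `DEBUG` and that unrecognized names fall back to INFO. `resolve_level` is a hypothetical helper.

```python
import logging
import os


def resolve_level(level=None, env=os.environ):
    """Pick the explicit level, then LOGGING_LEVEL_ED, then INFO."""
    if level is not None:
        return level
    name = env.get("LOGGING_LEVEL_ED", "INFO").upper()
    # getLevelName maps a known level name to its number; for an
    # unknown name it returns a string, which is rejected here.
    resolved = logging.getLevelName(name)
    return resolved if isinstance(resolved, int) else logging.INFO


print(resolve_level(env={}), resolve_level(env={"LOGGING_LEVEL_ED": "debug"}))
```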

Example

>>> logger = get_logger(__name__)
>>> logger.info("Application started")

eformer.executor.patch_tpus_ray.main()

Main entry point for the Ray TPU cluster patcher.

Parses command-line arguments and performs the requested operation:

  • Set up a complete cluster across multiple nodes

  • Set up a single node in self-job mode

  • Test SSH connectivity

  • Stop an existing cluster

  • Verify cluster health

Returns

Exit code (0 for success, 1 for failure).

Return type

int