AGENTS.md - Ray Conversion Guide

Purpose

This guide helps coding agents convert Python scripts into Ray-optimized, cluster-ready code that connects to and runs on remote Ray clusters. All conversions assume a cluster-first approach - scripts will connect to an existing Ray cluster rather than starting a local Ray instance.

How to Use This Guide

  1. Start with Cluster Connection (Section 2) - Always begin by setting up cluster connection via environment variables
  2. Follow the Conversion Workflow (Section 3) - Step-by-step process to convert your script
  3. Reference Use Case Sections (Section 4) - Find patterns specific to your workload type
  4. Check Patterns & Anti-Patterns (Section 5) - Avoid common mistakes
  5. Use Quick Reference Sections (Sections 6-9) - Fast lookup for APIs, examples, and configuration

Prerequisites

  • Ray installed: pip install "ray[default]"
  • Access to a Ray cluster (cluster address)
  • RAY_ADDRESS environment variable set
  • RAY_NAMESPACE environment variable set (required)

Project Structure and Workflow

Input/Output Directories

This repository uses a simple workflow for script conversion:

  • input/: Place the original Python script(s) to be converted here
  • output/: Write the converted Ray-optimized script(s) here

Workflow:

  1. Original script is placed in input/ directory
  2. Agent reads script from input/ and converts it using this guide
  3. Converted script is written to output/ directory
  4. Converted script connects to Ray cluster via RAY_ADDRESS environment variable

Example:

  • Input: input/my_script.py (original Python script)
  • Output: output/my_script.py (Ray-optimized version)
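
The input-to-output mapping above can be sketched with a small pathlib helper (the function name and directory defaults are illustrative, matching the layout described here):

```python
from pathlib import Path

def output_path_for(input_path: str, input_dir: str = "input", output_dir: str = "output") -> Path:
    """Map a script under input/ to its converted counterpart under output/."""
    relative = Path(input_path).relative_to(input_dir)
    return Path(output_dir) / relative

print(output_path_for("input/my_script.py"))  # output/my_script.py
```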

Resource Directory Structure

This guide references documentation in the resources/ directory:

  • resources/ray-core/ - Core Ray concepts (tasks, actors, objects, scheduling)
  • resources/ray-core/patterns/ - Design patterns and anti-patterns
  • resources/ray-core/api/ - API reference documentation
  • resources/data/ - Ray Data for batch processing
  • resources/train/ - Ray Train for distributed training
  • resources/tune/ - Ray Tune for hyperparameter tuning
  • resources/serve/ - Ray Serve for model serving
  • resources/rllib.md - Ray RLlib for reinforcement learning
  • resources/cluster/ - Cluster deployment and job submission

2. Cluster Connection Setup (CRITICAL FIRST STEP)

IMPORTANT: All Ray scripts must connect to a cluster. Never assume local Ray initialization unless explicitly required.

Environment Variables for Cluster Connection

Configure these environment variables before running your script:

Required Variables

  • RAY_ADDRESS: Cluster address

    • Format: ray://head-node-ip:port (e.g., ray://192.168.1.100:10001)
    • Or: http://head-node-ip:8265 (for Ray Jobs API)
    • Or: auto (connects to existing local cluster)
    • Example: export RAY_ADDRESS="ray://cluster.example.com:10001"
  • RAY_NAMESPACE: Logical grouping of jobs and actors (REQUIRED)

    • Example: export RAY_NAMESPACE="production"
    • Must be set for all Ray connections

Optional Variables

  • RAY_RUNTIME_ENV: JSON string for runtime environment

    • Example: export RAY_RUNTIME_ENV='{"pip": ["numpy", "pandas"], "env_vars": {"MY_VAR": "value"}}'
  • RAY_JOB_CONFIG: JSON string for job configuration

    • Example: export RAY_JOB_CONFIG='{"runtime_env": {"pip": ["requests"]}}'

Other Ray Environment Variables

  • RAY_TASK_MAX_RETRIES: Maximum task retries (default: 3)
  • RAY_gcs_rpc_server_reconnect_timeout_s: GCS reconnection timeout
  • RAY_PICKLE_VERBOSE_DEBUG: Enable pickle debugging (values: 0, 1, 2)

See Environment Variables Reference for complete list.

Standard Connection Pattern

REQUIRED: All connections must use a namespace. Minimal connection pattern:

import os
import ray

# Get required cluster address and namespace from environment variables
cluster_address = os.getenv("RAY_ADDRESS", "auto")
namespace = os.getenv("RAY_NAMESPACE")

if not namespace:
    raise ValueError(
        "RAY_NAMESPACE environment variable is required. "
        "Set it to a namespace name (e.g., production, development)"
    )

# Connect to cluster with namespace
ray.init(address=cluster_address, namespace=namespace)

# Verify connection
if not ray.is_initialized():
    raise RuntimeError("Failed to connect to Ray cluster")

Error Handling for Cluster Connection

import os
import ray
import sys

def connect_to_cluster():
    cluster_address = os.getenv("RAY_ADDRESS")
    namespace = os.getenv("RAY_NAMESPACE")
    
    if not cluster_address:
        raise ValueError(
            "RAY_ADDRESS environment variable not set. "
            "Set it to your cluster address (e.g., ray://head-node:10001)"
        )
    
    if not namespace:
        raise ValueError(
            "RAY_NAMESPACE environment variable is required. "
            "Set it to a namespace name (e.g., production, development)"
        )
    
    try:
        ray.init(address=cluster_address, namespace=namespace)
        if not ray.is_initialized():
            raise RuntimeError("Ray initialization failed")
        return True
    except Exception as e:
        print(f"Failed to connect to cluster at {cluster_address} with namespace {namespace}: {e}", file=sys.stderr)
        sys.exit(1)

# Use in your script
connect_to_cluster()

Connection Verification

After connecting, verify the connection and check available resources:

import ray

# Check if connected
if not ray.is_initialized():
    raise RuntimeError("Not connected to Ray cluster")

# Get cluster information
cluster_resources = ray.cluster_resources()
available_resources = ray.available_resources()

print(f"Cluster resources: {cluster_resources}")
print(f"Available resources: {available_resources}")

# Get this node's IP address (public API; avoid the private ray._private modules)
node_ip = ray.util.get_node_ip_address()
print(f"Connected to node: {node_ip}")


3. Conversion Workflow (Step-by-Step)

Step 1: Analyze the Script

Before converting, analyze your script to identify:

  1. Desired Output Analysis

    • CRITICAL: Analyze what the script is supposed to produce
    • Identify expected return values, output files, or state changes
    • Determine success criteria (what indicates the script worked correctly?)
    • Ask the developer to confirm: "Based on the script, I understand it should produce [X]. Is this correct?"
    • Document the expected output for testing purposes
    • Example questions to ask:
      • What should the script return or output?
      • What files should be created?
      • What state changes should occur?
      • What indicates successful execution?
  2. Parallelizable Operations

    • Loops that can run independently
    • Data processing that can be split
    • Independent function calls
    • Batch operations
  3. Stateful vs Stateless Components

    • Stateless: Functions that don't modify external state → Use @ray.remote tasks
    • Stateful: Classes/objects that maintain state → Use @ray.remote actors
  4. Resource Requirements

    • CPU-intensive operations → Specify num_cpus
    • GPU operations → Specify num_gpus
    • Memory requirements → Specify memory resource
    • Custom resources needed
  5. Data Dependencies and Flow

    • Data that needs to be shared → Use ray.put() and object refs
    • Large data passed to multiple tasks → Use object refs to avoid duplication
    • Data locality requirements

Note: All operations will run on the cluster, not locally. Consider network latency and data transfer costs.

Example Analysis:

# Original script
def process_file(filename):
    data = load_file(filename)
    result = expensive_computation(data)
    return result

results = []
for filename in file_list:
    results.append(process_file(filename))  # Can be parallelized

Analysis:

  • ✅ Parallelizable: Each process_file call is independent
  • ✅ Stateless: Function doesn't maintain state
  • ✅ Resource needs: May need CPU/memory per task
  • ✅ Data: Each file is independent

Step 2: Choose Ray Primitive

Use this decision tree to choose the right Ray primitive:

Is your workload...
├─ Data processing/batch inference?
│  └─ Use Ray Data → resources/data/
├─ Distributed training?
│  └─ Use Ray Train → resources/train/
├─ Hyperparameter tuning?
│  └─ Use Ray Tune → resources/tune/
├─ Model serving?
│  └─ Use Ray Serve → resources/serve/
├─ Reinforcement learning?
│  └─ Use Ray RLlib → resources/rllib.md
└─ Generic distributed computing?
   ├─ Need to maintain state?
   │  └─ Use @ray.remote class (Actor) → resources/ray-core/actors.md
   └─ Stateless operations?
      └─ Use @ray.remote function (Task) → resources/ray-core/tasks.md
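
The same decision logic can be expressed as a small lookup helper (a sketch; the workload category names are illustrative, not a Ray API):

```python
def choose_ray_api(workload: str, stateful: bool = False) -> str:
    """Return the recommended Ray API for a workload category (mirrors the tree above)."""
    libraries = {
        "data_processing": "Ray Data",
        "distributed_training": "Ray Train",
        "hyperparameter_tuning": "Ray Tune",
        "model_serving": "Ray Serve",
        "reinforcement_learning": "Ray RLlib",
    }
    if workload in libraries:
        return libraries[workload]
    # Generic distributed computing falls back to Ray Core primitives
    return "@ray.remote class (Actor)" if stateful else "@ray.remote function (Task)"

print(choose_ray_api("data_processing"))         # Ray Data
print(choose_ray_api("generic", stateful=True))  # @ray.remote class (Actor)
```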

When to Use Tasks (@ray.remote functions)

  • Stateless operations
  • Independent function calls
  • Embarrassingly parallel workloads
  • One-time computations
import ray
import os

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

@ray.remote
def process_item(item):
    # Stateless processing
    return item * 2

items = [1, 2, 3, 4, 5]
results = ray.get([process_item.remote(i) for i in items])

When to Use Actors (@ray.remote classes)

  • Stateful operations
  • Services that maintain state
  • Shared resources (models, connections)
  • Sequential operations on shared state
import ray
import os

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0
    
    def increment(self):
        self.value += 1
        return self.value

counter = Counter.remote()
results = ray.get([counter.increment.remote() for _ in range(10)])  # [1, 2, ..., 10]

When to Use Ray Libraries

  • Ray Data: Batch data processing, ETL pipelines
  • Ray Train: Distributed ML training (PyTorch, TensorFlow, etc.)
  • Ray Tune: Hyperparameter optimization
  • Ray Serve: Model serving and APIs
  • Ray RLlib: Reinforcement learning

See Reference Sections by Use Case for details.

Step 3: Apply Ray Patterns

Apply these patterns for optimal cluster performance:

Resource Specification Patterns

Always specify resource requirements for cluster scheduling:

@ray.remote(num_cpus=2, num_gpus=1)
def gpu_task(data):
    # GPU computation
    pass

@ray.remote(num_cpus=4, memory=8 * 1024 * 1024 * 1024)  # 8GB
def memory_intensive_task(data):
    # Memory-intensive operation
    pass

Object Reference Patterns

Use ray.put() for large objects shared across tasks:

import ray
import numpy as np

# Large shared data
large_data = np.random.rand(10000, 10000)

# Put in object store once
data_ref = ray.put(large_data)

# Pass the reference to multiple tasks (avoids duplicating the array)
@ray.remote
def process(data, index):
    # Ray automatically dereferences top-level ObjectRef arguments,
    # so `data` is already the NumPy array here - no ray.get() needed
    return data[index].sum()

results = ray.get([process.remote(data_ref, i) for i in range(100)])

Scheduling and Placement Patterns

Use placement groups and scheduling strategies for cluster-aware placement:

from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Create placement group
pg = placement_group([{"CPU": 4}, {"GPU": 1}])
ray.get(pg.ready())

@ray.remote(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
)
def task_with_placement():
    pass

Fault Tolerance Patterns

Handle failures gracefully:

@ray.remote(max_retries=3)
def unreliable_task():
    # May fail, will retry up to 3 times
    pass

# Check for exceptions
try:
    result = ray.get(unreliable_task.remote())
except Exception as e:
    print(f"Task failed: {e}")
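
For failures that outlast Ray's built-in max_retries, an application-level retry around ray.get can be sketched with a plain helper (illustrative; it works with any zero-argument callable):

```python
import time

def call_with_retries(fn, attempts=3, backoff_s=0.0):
    """Call fn(), retrying up to `attempts` times with optional linear backoff."""
    last_error = None
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as e:  # in Ray code, prefer catching ray.exceptions.RayTaskError
            last_error = e
            if backoff_s:
                time.sleep(backoff_s * (attempt + 1))
    raise last_error

# Usage with Ray (sketch):
# result = call_with_retries(lambda: ray.get(unreliable_task.remote()))
```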

Step 4: Optimize for Scale

Optimize your code for cluster-scale execution:

Memory Management

  • Use ray.put() for large shared objects
  • Avoid closure capture of large objects
  • Use object refs instead of passing large values
# ❌ BAD: Large object in closure
large_data = np.random.rand(1000000)

@ray.remote
def task():
    return len(large_data)  # large_data serialized with task!

# ✅ GOOD: Pass via object ref
large_data_ref = ray.put(np.random.rand(1000000))

@ray.remote
def task(data):
    # Ray dereferences the top-level ObjectRef argument automatically
    return len(data)

result = ray.get(task.remote(large_data_ref))

Concurrency Limits

Limit concurrent tasks to prevent OOM:

# Limit running tasks using resources
@ray.remote(memory=2 * 1024 * 1024 * 1024)  # 2GB per task
def memory_intensive_task(data):
    pass

# Limit pending tasks using ray.wait
MAX_PENDING = 100
pending = []
for item in items:
    if len(pending) >= MAX_PENDING:
        ready, pending = ray.wait(pending, num_returns=1)
        ray.get(ready)
    pending.append(memory_intensive_task.remote(item))

Resource Allocation Strategies

Request appropriate cluster resources:

# CPU-bound tasks
@ray.remote(num_cpus=4)
def cpu_task():
    pass

# GPU tasks
@ray.remote(num_gpus=1)
def gpu_task():
    pass

# Custom resources
@ray.remote(resources={"custom_resource": 1})
def custom_task():
    pass

Step 5: Handle Dependencies & Runtime

Configure runtime environment for cluster workers:

Runtime Environment via Environment Variables

import os
import json
import ray

# Parse runtime environment from env var
runtime_env_json = os.getenv("RAY_RUNTIME_ENV")
if runtime_env_json:
    runtime_env = json.loads(runtime_env_json)
else:
    runtime_env = {
        "pip": ["numpy", "pandas"],
        "env_vars": {"MY_VAR": "value"}
    }

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE"),
    runtime_env=runtime_env
)

Dependency Management

Specify dependencies that cluster workers need:

runtime_env = {
    "pip": ["torch", "transformers", "datasets"],
    "conda": ["pytorch"],
    "env_vars": {
        "HF_HOME": "/tmp/huggingface",
        "TRANSFORMERS_CACHE": "/tmp/transformers"
    },
    "working_dir": "https://github.com/user/repo/archive/main.zip"
}

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE"),
    runtime_env=runtime_env
)

Step 6: Cluster Deployment & Job Submission

Using Ray Jobs API

Submit jobs to cluster using environment variables:

import os
from ray.job_submission import JobSubmissionClient

# Get cluster address from environment
cluster_address = os.getenv("RAY_ADDRESS", "http://localhost:8265")

# Parse runtime environment
import json
runtime_env = json.loads(os.getenv("RAY_RUNTIME_ENV", "{}"))

# Submit job
client = JobSubmissionClient(cluster_address)
job_id = client.submit_job(
    entrypoint="python my_script.py",
    runtime_env=runtime_env
)

print(f"Submitted job {job_id}")

Job Submission with Environment Variables

Set environment variables before submitting:

export RAY_ADDRESS="http://cluster-head:8265"
export RAY_RUNTIME_ENV='{"pip": ["numpy"], "env_vars": {"MY_VAR": "value"}}'
python submit_job.py

Step 7: Create Minimalist Tests

Purpose: Create basic tests to verify the rayified script works and produces the desired output. These are NOT production-grade tests - just simple validation that the script runs correctly.

Testing Philosophy

  • Minimalist Approach: Simple, fast tests that verify basic functionality
  • Local Execution: Tests run locally using ray.init() (no cluster needed)
  • Minimal Data: Use tiny datasets (10-100 items instead of millions)
  • Reduced Iterations: 1-2 iterations/epochs instead of full training
  • Output Validation: Verify the script produces the expected output confirmed in Step 1

Test Structure

Create a tests/ directory in output/ with:

  1. Basic Connectivity Test

    • Verify Ray initializes
    • Verify script imports without errors
    • Verify cluster connection works (if RAY_ADDRESS set)
  2. Core Functionality Test

    • Run the main script logic with minimal data
    • Verify it produces the expected output (from Step 1)
    • Check for basic errors (timeouts, exceptions, wrong outputs)
  3. Ray Primitives Test

    • Verify tasks/actors execute correctly
    • Verify data flows through Ray primitives
    • Check object references work

Test Requirements

  • Fast: Tests should complete in < 30 seconds total
  • Simple: No complex mocking or fixtures needed
  • Clear: Each test clearly shows what it's checking
  • Isolated: Tests don't depend on each other
  • Verifiable: Expected outputs are explicit and checkable

Example Test Approach

For a script that processes files and returns results:

# Test: Verify script processes minimal data and returns expected format
# Expected output: List of processed results matching input count
# Minimal data: 3 test files instead of full dataset

For a training script:

# Test: Verify training setup works with 1 epoch, minimal data
# Expected output: Model trains without errors, returns metrics
# Minimal data: 10 samples instead of full dataset
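
One way to keep such tests fast and cluster-free is to keep the business logic in plain functions and apply ray.remote separately at the edge, so tests exercise the plain functions directly (a sketch; all names are illustrative):

```python
# my_script.py: pure logic, Ray wiring applied at the edge
def process_item(item):
    return item * 2

# At runtime in the rayified script (requires ray):
# process_item_remote = ray.remote(process_item)

# tests/test_basic.py: no cluster, no ray import needed
def test_process_item():
    assert process_item(3) == 6
    assert [process_item(i) for i in [1, 2, 3]] == [2, 4, 6]

test_process_item()
```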

Verification Strategy

  1. Run Tests Locally: pytest tests/ -v should show all tests passing
  2. Check Output: Verify actual output matches expected output from Step 1
  3. No Errors: Script should run without exceptions or timeouts
  4. Clean State: Ray should clean up properly after tests

What NOT to Test

  • Don't test full-scale performance
  • Don't test with production-sized data
  • Don't create complex test infrastructure
  • Don't test edge cases extensively
  • Don't aim for high code coverage

Goal: Just verify the script works correctly with minimal data and produces the expected output.

Step 8: Containerization (Optional - for Long-Running Scripts)

Purpose: Create Dockerfile and docker-compose.yml for scripts that need to run indefinitely or for extended periods on a VM.

Containerization Approach

  1. Template-Based Generation

    • Use base templates for Dockerfile and docker-compose.yml
    • Customize based on script requirements
    • Place both files in output/ directory
  2. Dockerfile Structure

    • Base image with Python and Ray
    • Install dependencies from requirements.txt
    • Copy rayified script
    • Set entrypoint to run the script
  3. Docker Compose Configuration

    • Service definition for the script
    • Environment variables (RAY_ADDRESS, RAY_NAMESPACE, etc.)
    • Resource limits (CPU, memory)
    • Restart policy (unless-stopped or on-failure)
    • Volume mounts if script needs data persistence
  4. Environment Variables

    • Inject cluster connection via environment
    • Application-specific configuration
    • Use .env file or docker-compose environment section

Output Structure

output/
├── my_script.py          # Rayified script
├── Dockerfile            # Container definition
├── docker-compose.yml    # Compose configuration
└── requirements.txt      # Python dependencies

Agent Workflow

  1. Analyze Script Requirements

    • Identify dependencies (pip packages)
    • Determine resource needs (CPU, memory)
    • Check for file I/O or data persistence needs
  2. Generate Dockerfile

    • Start from Python base image
    • Install Ray and dependencies
    • Copy script and set entrypoint
  3. Generate docker-compose.yml

    • Configure service with resource limits
    • Set restart policy
    • Configure environment variables
    • Add volumes if needed

Key Configuration Points

Dockerfile:

  • Base image (Python version)
  • Ray installation
  • Dependencies installation
  • Script copying
  • Entrypoint command

docker-compose.yml:

  • Service name
  • Resource limits (cpus, memory)
  • Restart policy
  • Environment variables
  • Volume mounts (if needed)

Example Structure

The generated files should be minimal and focused:

  • Dockerfile: Builds container with script and dependencies
  • docker-compose.yml: Orchestrates container with proper configuration
  • Both files reference environment variables for cluster connection
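
A minimal sketch of the two files (base image tag, script name, and resource limits are illustrative placeholders to be adjusted per script):

```dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir "ray[default]" -r requirements.txt
COPY my_script.py .
CMD ["python", "my_script.py"]
```

```yaml
# docker-compose.yml
services:
  my-script:
    build: .
    restart: unless-stopped
    environment:
      RAY_ADDRESS: "${RAY_ADDRESS}"
      RAY_NAMESPACE: "${RAY_NAMESPACE}"
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 4g
```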

Note: This is for containerization only. Deployment to VMs is handled separately by copying files and running docker-compose up -d on the target VM.


4. Reference Sections by Use Case

4.1 Data Processing & Batch Inference

Use Ray Data for scalable data processing and batch inference.

Cluster Connection Example:

import os
import ray
import ray.data

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

# Create dataset
ds = ray.data.range(1000)

# Process in parallel on cluster
ds = ds.map(lambda x: x * 2)

# Batch inference
# Batch inference with a stateful callable class
# (plain class, NOT @ray.remote - Ray Data wraps it in actors itself,
# and map_batches invokes __call__ on each batch)
class Model:
    def __init__(self):
        # Load model once per actor
        pass
    
    def __call__(self, batch):
        # Run inference on a batch
        return batch

predictions = ds.map_batches(
    Model,
    compute=ray.data.ActorPoolStrategy(size=4)
)

4.2 Distributed Training

Use Ray Train for distributed ML training across cluster nodes.

Cluster Connection Example:

import os
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

def train_func():
    # Training logic
    pass

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(
        num_workers=int(os.getenv("NUM_WORKERS", "4")),
        use_gpu=os.getenv("USE_GPU", "false").lower() == "true"
    )
)

result = trainer.fit()

4.3 Hyperparameter Tuning

Use Ray Tune for distributed hyperparameter optimization.

Cluster Connection Example:

import os
import ray
from ray import tune
from ray.train import ScalingConfig

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

def trainable(config):
    # Training with hyperparameters
    return {"loss": config["lr"] ** 2}

tuner = tune.Tuner(
    trainable,
    param_space={"lr": tune.grid_search([0.1, 0.01, 0.001])},
    tune_config=tune.TuneConfig(
        num_samples=int(os.getenv("NUM_SAMPLES", "10"))
    )
)

results = tuner.fit()

4.4 Model Serving

Use Ray Serve for scalable model serving on clusters.

Cluster Connection Example:

import os
import ray
from ray import serve
from fastapi import FastAPI

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

app = FastAPI()

@serve.deployment(
    num_replicas=int(os.getenv("NUM_REPLICAS", "2")),
    ray_actor_options={"num_gpus": 1 if os.getenv("USE_GPU") == "true" else 0}
)
@serve.ingress(app)
class ModelDeployment:
    def __init__(self):
        # Load model
        pass
    
    @app.post("/predict")
    def predict(self, data):
        # Inference
        return {"result": "prediction"}

serve.run(ModelDeployment.bind())

4.5 Reinforcement Learning

Use Ray RLlib for distributed reinforcement learning.

Cluster Connection Example:

import os
import ray
from ray.rllib.algorithms.ppo import PPOConfig

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

config = (
    PPOConfig()
    .environment(env="CartPole-v1")
    .rollouts(num_rollout_workers=int(os.getenv("NUM_WORKERS", "4")))
)

algo = config.build()
algo.train()

4.6 Generic Distributed Computing

Use Ray Core primitives (tasks, actors, objects) for custom distributed workloads.

Cluster Connection Example:

import os
import ray

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

@ray.remote(num_cpus=2)
def compute_task(data):
    # Distributed computation
    return data * 2

results = ray.get([compute_task.remote(i) for i in range(100)])


5. Design Patterns & Anti-Patterns

Patterns (Best Practices)

  1. Limit Running Tasks - Use resources to control concurrency

  2. Limit Pending Tasks - Use ray.wait() for backpressure

Anti-Patterns (Common Mistakes)

  1. Pass Large Arguments By Value - Use ray.put() instead

  2. Closure Capture of Large Objects - Pass via arguments

Cluster-Specific Considerations

  • Network Latency: Minimize data transfer between nodes
  • Resource Availability: Check cluster resources before requesting
  • Fault Tolerance: Handle node failures gracefully
  • Data Locality: Consider data placement for performance
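
The resource-availability check can be factored into a small pure function, fed from ray.available_resources() at runtime (the function itself needs no cluster, so it is easy to test):

```python
def request_fits(request: dict, available: dict) -> bool:
    """True if every requested resource amount is currently available."""
    return all(available.get(name, 0.0) >= amount for name, amount in request.items())

# Usage with Ray (sketch):
# if not request_fits({"CPU": 4, "GPU": 1}, ray.available_resources()):
#     raise RuntimeError("Cluster lacks resources for this task")

print(request_fits({"CPU": 2}, {"CPU": 8.0, "GPU": 1.0}))  # True
print(request_fits({"GPU": 2}, {"CPU": 8.0, "GPU": 1.0}))  # False
```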


6. API Reference Quick Links

Core APIs

Runtime & Configuration

Advanced Features

Complete API Reference


7. Examples Index

Ray Core Examples

Ray Data Examples

Ray Train Examples

Ray Tune Examples

Ray Serve Examples

LLM Examples

Cluster Examples


8. Optimization Checklist

Pre-Conversion Analysis

  • Identify all parallelizable operations
  • Determine stateful vs stateless components
  • Map resource requirements (CPU/GPU/memory)
  • Identify data dependencies and sharing needs
  • Check cluster resource availability
  • Verify cluster connection (RAY_ADDRESS set)

Conversion Steps

  • Analyze desired output and confirm with developer
  • Set up cluster connection via RAY_ADDRESS
  • Choose appropriate Ray primitive (Task/Actor/Library)
  • Apply resource specifications
  • Use object refs for large shared data
  • Configure runtime environment
  • Add error handling and retries
  • Create minimalist tests to verify script works and produces expected output
  • Generate Dockerfile and docker-compose.yml in output/ (if script needs to run indefinitely)

Post-Conversion Optimization

  • Run minimalist tests locally - all tests should pass
  • Verify script produces expected output (from Step 1)
  • Verify cluster connection works
  • Check resource utilization
  • Optimize memory usage (use ray.put() for large objects)
  • Set concurrency limits if needed
  • Add monitoring/logging
  • Test fault tolerance
  • Profile and optimize hot paths

Performance Tuning

  • Monitor cluster resource usage
  • Adjust resource requests based on actual usage
  • Optimize data transfer (minimize object store transfers)
  • Use placement groups for data locality
  • Tune concurrency limits
  • Profile task execution times

Cluster Resource Optimization

  • Request appropriate resources (not too much, not too little)
  • Use custom resources for specialized hardware
  • Consider data locality when scheduling
  • Monitor cluster-wide resource utilization
  • Scale cluster if needed

9. Environment Variables Reference

Cluster Connection Variables

| Variable | Description | Example |
| --- | --- | --- |
| RAY_ADDRESS | Cluster address (required) | ray://head-node:10001 or http://head-node:8265 |
| RAY_NAMESPACE | Logical grouping namespace (required) | production or development |

Runtime Environment Variables

| Variable | Description | Example |
| --- | --- | --- |
| RAY_RUNTIME_ENV | JSON string for runtime environment | {"pip": ["numpy"], "env_vars": {"VAR": "value"}} |
| RAY_JOB_CONFIG | JSON string for job configuration | {"runtime_env": {"pip": ["requests"]}} |

Fault Tolerance Variables

| Variable | Description | Example |
| --- | --- | --- |
| RAY_gcs_rpc_server_reconnect_timeout_s | GCS reconnection timeout (seconds) | 60 |

Task Configuration Variables

| Variable | Description | Example |
| --- | --- | --- |
| RAY_TASK_MAX_RETRIES | Maximum task retries | 3 (default) or 0 to disable |

Debugging Variables

| Variable | Description | Example |
| --- | --- | --- |
| RAY_PICKLE_VERBOSE_DEBUG | Enable pickle debugging | 0 (off), 1, or 2 |

Other Ray Environment Variables

Ray may use additional environment variables for configuration. Check Ray documentation for the complete list.


Quick Reference: Conversion Template

Use this template as a starting point for converting scripts:

import os
import ray

# 1. Connect to cluster
cluster_address = os.getenv("RAY_ADDRESS", "auto")
namespace = os.getenv("RAY_NAMESPACE")

if not namespace:
    raise ValueError("RAY_NAMESPACE environment variable is required")

ray.init(address=cluster_address, namespace=namespace)

# 2. Verify connection
if not ray.is_initialized():
    raise RuntimeError("Failed to connect to Ray cluster")

# 3. Convert your functions to Ray tasks/actors
@ray.remote(num_cpus=1)
def your_function(arg):
    # Your logic here
    return arg  # replace with your actual result

# 4. Use Ray primitives
results = ray.get([your_function.remote(arg) for arg in args])

# 5. Cleanup
ray.shutdown()


Remember: Always connect to a cluster via RAY_ADDRESS. Never assume local Ray unless explicitly required.