This guide helps coding agents convert Python scripts into Ray-optimized, cluster-ready code that connects to and runs on remote Ray clusters. All conversions assume a cluster-first approach - scripts will connect to an existing Ray cluster rather than starting a local Ray instance.
- Start with Cluster Connection (Section 2) - Always begin by setting up cluster connection via environment variables
- Follow the Conversion Workflow (Section 3) - Step-by-step process to convert your script
- Reference Use Case Sections (Section 4) - Find patterns specific to your workload type
- Check Patterns & Anti-Patterns (Section 5) - Avoid common mistakes
- Use Quick Reference Sections (Sections 6-9) - Fast lookup for APIs, examples, and configuration
- Ray installed: `pip install "ray[default]"`
- Access to a Ray cluster (cluster address)
- `RAY_ADDRESS` environment variable set
- `RAY_NAMESPACE` environment variable set (required)
This repository uses a simple workflow for script conversion:
- `input/`: Place the original Python script(s) to be converted here
- `output/`: Write the converted Ray-optimized script(s) here
Workflow:
- Original script is placed in the `input/` directory
- Agent reads the script from `input/` and converts it using this guide
- Converted script is written to the `output/` directory
- Converted script connects to the Ray cluster via the `RAY_ADDRESS` environment variable
Example:
- Input: `input/my_script.py` (original Python script)
- Output: `output/my_script.py` (Ray-optimized version)
This guide references documentation in the resources/ directory:
- `resources/ray-core/` - Core Ray concepts (tasks, actors, objects, scheduling)
- `resources/ray-core/patterns/` - Design patterns and anti-patterns
- `resources/ray-core/api/` - API reference documentation
- `resources/data/` - Ray Data for batch processing
- `resources/train/` - Ray Train for distributed training
- `resources/tune/` - Ray Tune for hyperparameter tuning
- `resources/serve/` - Ray Serve for model serving
- `resources/rllib.md` - Ray RLlib for reinforcement learning
- `resources/cluster/` - Cluster deployment and job submission
IMPORTANT: All Ray scripts must connect to a cluster. Never assume local Ray initialization unless explicitly required.
Configure these environment variables before running your script:
- `RAY_ADDRESS`: Cluster address
  - Format: `ray://head-node-ip:port` (e.g., `ray://192.168.1.100:10001`)
  - Or: `http://head-node-ip:8265` (for Ray Jobs API)
  - Or: `auto` (connects to existing local cluster)
  - Example: `export RAY_ADDRESS="ray://cluster.example.com:10001"`
- `RAY_NAMESPACE`: Logical grouping of jobs and actors (REQUIRED)
  - Example: `export RAY_NAMESPACE="production"`
  - Must be set for all Ray connections
- `RAY_RUNTIME_ENV`: JSON string for runtime environment
  - Example: `export RAY_RUNTIME_ENV='{"pip": ["numpy", "pandas"], "env_vars": {"MY_VAR": "value"}}'`
- `RAY_JOB_CONFIG`: JSON string for job configuration
  - Example: `export RAY_JOB_CONFIG='{"runtime_env": {"pip": ["requests"]}}'`
- `RAY_TASK_MAX_RETRIES`: Maximum task retries (default: 3)
- `RAY_gcs_rpc_server_reconnect_timeout_s`: GCS reconnection timeout
- `RAY_PICKLE_VERBOSE_DEBUG`: Enable pickle debugging (values: 0, 1, 2)
See Environment Variables Reference for complete list.
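As a sketch, the required variables can be read and validated before `ray.init()` is ever called. `read_ray_env` below is a hypothetical helper written for this guide, not part of Ray:

```python
import json
import os


def read_ray_env(environ=None):
    """Read and validate Ray connection settings from the environment."""
    environ = os.environ if environ is None else environ
    address = environ.get("RAY_ADDRESS", "auto")
    namespace = environ.get("RAY_NAMESPACE")
    if not namespace:
        raise ValueError("RAY_NAMESPACE environment variable is required")
    # RAY_RUNTIME_ENV is a JSON string; default to an empty runtime env
    runtime_env = json.loads(environ.get("RAY_RUNTIME_ENV", "{}"))
    return {"address": address, "namespace": namespace, "runtime_env": runtime_env}


cfg = read_ray_env({
    "RAY_ADDRESS": "ray://cluster.example.com:10001",
    "RAY_NAMESPACE": "production",
    "RAY_RUNTIME_ENV": '{"pip": ["numpy"]}',
})
print(cfg["runtime_env"])  # {'pip': ['numpy']}
```

Validating up front gives a clear error message instead of a confusing failure deep inside `ray.init()`.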
REQUIRED: All connections must use a namespace. Minimal connection pattern:
import os
import ray
# Get required cluster address and namespace from environment variables
cluster_address = os.getenv("RAY_ADDRESS", "auto")
namespace = os.getenv("RAY_NAMESPACE")
if not namespace:
    raise ValueError(
        "RAY_NAMESPACE environment variable is required. "
        "Set it to a namespace name (e.g., production, development)"
    )
# Connect to cluster with namespace
ray.init(address=cluster_address, namespace=namespace)
# Verify connection
if not ray.is_initialized():
    raise RuntimeError("Failed to connect to Ray cluster")

A more robust connection helper with explicit error handling:

import os
import ray
import sys
def connect_to_cluster():
    cluster_address = os.getenv("RAY_ADDRESS")
    namespace = os.getenv("RAY_NAMESPACE")
    if not cluster_address:
        raise ValueError(
            "RAY_ADDRESS environment variable not set. "
            "Set it to your cluster address (e.g., ray://head-node:10001)"
        )
    if not namespace:
        raise ValueError(
            "RAY_NAMESPACE environment variable is required. "
            "Set it to a namespace name (e.g., production, development)"
        )
    try:
        ray.init(address=cluster_address, namespace=namespace)
        if not ray.is_initialized():
            raise RuntimeError("Ray initialization failed")
        return True
    except Exception as e:
        print(f"Failed to connect to cluster at {cluster_address} with namespace {namespace}: {e}", file=sys.stderr)
        sys.exit(1)
# Use in your script
connect_to_cluster()

After connecting, verify the connection and check available resources:
import ray
# Check if connected
if not ray.is_initialized():
    raise RuntimeError("Not connected to Ray cluster")
# Get cluster information
cluster_resources = ray.cluster_resources()
available_resources = ray.available_resources()
print(f"Cluster resources: {cluster_resources}")
print(f"Available resources: {available_resources}")
# Get this node's IP address (ray.util exposes this as a public API)
node_ip = ray.util.get_node_ip_address()
print(f"Connected to node: {node_ip}")

References:
Before converting, analyze your script to identify:
- Desired Output Analysis
- CRITICAL: Analyze what the script is supposed to produce
- Identify expected return values, output files, or state changes
- Determine success criteria (what indicates the script worked correctly?)
- Ask the developer to confirm: "Based on the script, I understand it should produce [X]. Is this correct?"
- Document the expected output for testing purposes
- Example questions to ask:
- What should the script return or output?
- What files should be created?
- What state changes should occur?
- What indicates successful execution?
- Parallelizable Operations
- Loops that can run independently
- Data processing that can be split
- Independent function calls
- Batch operations
- Stateful vs Stateless Components
  - Stateless: Functions that don't modify external state → Use `@ray.remote` tasks
  - Stateful: Classes/objects that maintain state → Use `@ray.remote` actors
- Resource Requirements
  - CPU-intensive operations → Specify `num_cpus`
  - GPU operations → Specify `num_gpus`
  - Memory requirements → Specify the `memory` resource
  - Custom resources needed
- Data Dependencies and Flow
  - Data that needs to be shared → Use `ray.put()` and object refs
  - Large data passed to multiple tasks → Use object refs to avoid duplication
  - Data locality requirements
Note: All operations will run on the cluster, not locally. Consider network latency and data transfer costs.
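A rough back-of-envelope illustrates why transfer costs matter. The bandwidth figure below is an assumption for illustration, not a measurement:

```python
# Shipping a 1 GiB array to 10 tasks: by value per task vs. one shared copy
data_gib = 1.0
num_tasks = 10
link_gbps = 10.0  # assumed 10 Gbit/s cluster network

# Passing the array by value re-sends it for every task
per_task_copy_s = data_gib * 8 * num_tasks / link_gbps

# Putting it in the object store once (ray.put) sends roughly one copy per node
shared_copy_s = data_gib * 8 / link_gbps

print(per_task_copy_s, shared_copy_s)  # 8.0 0.8
```

An order-of-magnitude difference like this is why the patterns below push large shared data through the object store.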
Example Analysis:
# Original script
def process_file(filename):
    data = load_file(filename)
    result = expensive_computation(data)
    return result

results = []
for filename in file_list:
    results.append(process_file(filename))  # Can be parallelized

Analysis:
- ✅ Parallelizable: Each `process_file` call is independent
- ✅ Stateless: Function doesn't maintain state
- ✅ Resource needs: May need CPU/memory per task
- ✅ Data: Each file is independent
Use this decision tree to choose the right Ray primitive:
Is your workload...
├─ Data processing/batch inference?
│ └─ Use Ray Data → resources/data/
├─ Distributed training?
│ └─ Use Ray Train → resources/train/
├─ Hyperparameter tuning?
│ └─ Use Ray Tune → resources/tune/
├─ Model serving?
│ └─ Use Ray Serve → resources/serve/
├─ Reinforcement learning?
│ └─ Use Ray RLlib → resources/rllib.md
└─ Generic distributed computing?
├─ Need to maintain state?
│ └─ Use @ray.remote class (Actor) → resources/ray-core/actors.md
└─ Stateless operations?
└─ Use @ray.remote function (Task) → resources/ray-core/tasks.md
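The decision tree above can be sketched as a plain function. `choose_primitive` and its workload labels are illustrative, not a Ray API:

```python
def choose_primitive(workload, stateful=False):
    """Map a workload description to the suggested Ray primitive or library."""
    libraries = {
        "data processing": "Ray Data",
        "training": "Ray Train",
        "tuning": "Ray Tune",
        "serving": "Ray Serve",
        "reinforcement learning": "Ray RLlib",
    }
    if workload in libraries:
        return libraries[workload]
    # Generic distributed computing falls through to Ray Core
    return "actor (@ray.remote class)" if stateful else "task (@ray.remote function)"


print(choose_primitive("training"))  # Ray Train
print(choose_primitive("generic", stateful=True))  # actor (@ray.remote class)
```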
- Stateless operations
- Independent function calls
- Embarrassingly parallel workloads
- One-time computations
import ray
import os

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

@ray.remote
def process_item(item):
    # Stateless processing
    return item * 2

items = [1, 2, 3, 4, 5]
results = ray.get([process_item.remote(i) for i in items])

References:
- Stateful operations
- Services that maintain state
- Shared resources (models, connections)
- Sequential operations on shared state
import ray
import os

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

counter = Counter.remote()
results = [counter.increment.remote() for _ in range(10)]

References:
- Ray Data: Batch data processing, ETL pipelines
- Ray Train: Distributed ML training (PyTorch, TensorFlow, etc.)
- Ray Tune: Hyperparameter optimization
- Ray Serve: Model serving and APIs
- Ray RLlib: Reinforcement learning
See Reference Sections by Use Case for details.
Apply these patterns for optimal cluster performance:
Always specify resource requirements for cluster scheduling:
@ray.remote(num_cpus=2, num_gpus=1)
def gpu_task(data):
    # GPU computation
    pass

@ray.remote(num_cpus=4, memory=8 * 1024 * 1024 * 1024)  # 8GB
def memory_intensive_task(data):
    # Memory-intensive operation
    pass

References:
Use ray.put() for large objects shared across tasks:
import ray
import numpy as np
# Large shared data
large_data = np.random.rand(10000, 10000)
# Put in object store once
data_ref = ray.put(large_data)
# Pass reference to multiple tasks (avoids duplication)
@ray.remote
def process(data, index):
    # Ray resolves top-level ObjectRef arguments automatically,
    # so the task receives the actual array (no ray.get needed here)
    return data[index].sum()

results = [process.remote(data_ref, i) for i in range(100)]

References:
Use placement groups and scheduling strategies for cluster-aware placement:
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
# Create placement group
pg = placement_group([{"CPU": 4}, {"GPU": 1}])
ray.get(pg.ready())
@ray.remote(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
)
def task_with_placement():
    pass

References:
Handle failures gracefully:
@ray.remote(max_retries=3)
def unreliable_task():
    # May fail, will retry up to 3 times
    pass

# Check for exceptions
try:
    result = ray.get(unreliable_task.remote())
except Exception as e:
    print(f"Task failed: {e}")

References:
All Patterns:
- Pattern: Limit Running Tasks
- Pattern: Limit Pending Tasks
- Anti-Pattern: Pass Large Arg By Value
- Anti-Pattern: Closure Capture
Optimize your code for cluster-scale execution:
- Use `ray.put()` for large shared objects
- Avoid closure capture of large objects
- Use object refs instead of passing large values
# ❌ BAD: Large object in closure
large_data = np.random.rand(1000000)

@ray.remote
def task():
    return len(large_data)  # large_data serialized with task!

# ✅ GOOD: Pass via object ref
large_data_ref = ray.put(np.random.rand(1000000))

@ray.remote
def task(data):
    # Ray resolves the ObjectRef before the task runs
    return len(data)

result = ray.get(task.remote(large_data_ref))

References:
Limit concurrent tasks to prevent OOM:
# Limit running tasks using resources
@ray.remote(memory=2 * 1024 * 1024 * 1024)  # 2GB per task
def memory_intensive_task(data):
    pass

# Limit pending tasks using ray.wait
MAX_PENDING = 100
pending = []
for item in items:
    if len(pending) >= MAX_PENDING:
        ready, pending = ray.wait(pending, num_returns=1)
        ray.get(ready)
    pending.append(memory_intensive_task.remote(item))

References:
Request appropriate cluster resources:
# CPU-bound tasks
@ray.remote(num_cpus=4)
def cpu_task():
    pass

# GPU tasks
@ray.remote(num_gpus=1)
def gpu_task():
    pass

# Custom resources
@ray.remote(resources={"custom_resource": 1})
def custom_task():
    pass

References:
- All pattern files in resources/ray-core/patterns/
Configure runtime environment for cluster workers:
import os
import json
import ray
# Parse runtime environment from env var
runtime_env_json = os.getenv("RAY_RUNTIME_ENV")
if runtime_env_json:
    runtime_env = json.loads(runtime_env_json)
else:
    runtime_env = {
        "pip": ["numpy", "pandas"],
        "env_vars": {"MY_VAR": "value"}
    }

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE"),
    runtime_env=runtime_env
)

Specify dependencies that cluster workers need:
runtime_env = {
    # Note: "pip" and "conda" cannot both be set in a single runtime_env
    "pip": ["torch", "transformers", "datasets"],
    "env_vars": {
        "HF_HOME": "/tmp/huggingface",
        "TRANSFORMERS_CACHE": "/tmp/transformers"
    },
    "working_dir": "https://github.com/user/repo/archive/main.zip"
}

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE"),
    runtime_env=runtime_env
)

References:
Submit jobs to cluster using environment variables:
import os
import json
from ray.job_submission import JobSubmissionClient

# Get cluster address from environment
cluster_address = os.getenv("RAY_ADDRESS", "http://localhost:8265")

# Parse runtime environment
runtime_env = json.loads(os.getenv("RAY_RUNTIME_ENV", "{}"))

# Submit job
client = JobSubmissionClient(cluster_address)
job_id = client.submit_job(
    entrypoint="python my_script.py",
    runtime_env=runtime_env
)
print(f"Submitted job {job_id}")

Set environment variables before submitting:
export RAY_ADDRESS="http://cluster-head:8265"
export RAY_RUNTIME_ENV='{"pip": ["numpy"], "env_vars": {"MY_VAR": "value"}}'
python submit_job.py

References:
Purpose: Create basic tests to verify the rayified script works and produces the desired output. These are NOT production-grade tests - just simple validation that the script runs correctly.
- Minimalist Approach: Simple, fast tests that verify basic functionality
- Local Execution: Tests run locally using `ray.init()` (no cluster needed)
- Minimal Data: Use tiny datasets (10-100 items instead of millions)
- Reduced Iterations: 1-2 iterations/epochs instead of full training
- Output Validation: Verify the script produces the expected output confirmed in Step 1
Create a tests/ directory in output/ with:
- Basic Connectivity Test
- Verify Ray initializes
- Verify script imports without errors
- Verify cluster connection works (if RAY_ADDRESS set)
- Core Functionality Test
- Run the main script logic with minimal data
- Verify it produces the expected output (from Step 1)
- Check for basic errors (timeouts, exceptions, wrong outputs)
- Ray Primitives Test
- Verify tasks/actors execute correctly
- Verify data flows through Ray primitives
- Check object references work
- Fast: Tests should complete in < 30 seconds total
- Simple: No complex mocking or fixtures needed
- Clear: Each test clearly shows what it's checking
- Isolated: Tests don't depend on each other
- Verifiable: Expected outputs are explicit and checkable
For a script that processes files and returns results:
# Test: Verify script processes minimal data and returns expected format
# Expected output: List of processed results matching input count
# Minimal data: 3 test files instead of full dataset

For a training script:

# Test: Verify training setup works with 1 epoch, minimal data
# Expected output: Model trains without errors, returns metrics
# Minimal data: 10 samples instead of full dataset

- Run Tests Locally: `pytest tests/ -v` should show all tests passing
- Check Output: Verify actual output matches expected output from Step 1
- No Errors: Script should run without exceptions or timeouts
- Clean State: Ray should clean up properly after tests
- Don't test full-scale performance
- Don't test with production-sized data
- Don't create complex test infrastructure
- Don't test edge cases extensively
- Don't aim for high code coverage
Goal: Just verify the script works correctly with minimal data and produces the expected output.
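A minimal test in this spirit might look like the sketch below. `process_item` stands in for the converted script's pure logic; the name is hypothetical:

```python
# tests/test_core.py - minimal sketch, runnable with pytest or directly
def process_item(item):
    # Stand-in for the converted script's per-item logic
    return item * 2


def test_process_minimal_data():
    # Minimal data: 3 items instead of the full dataset
    items = [1, 2, 3]
    results = [process_item(i) for i in items]
    # Expected output: one result per input, each doubled
    assert results == [2, 4, 6]
```

Keeping the core logic importable as a plain function makes it testable without spinning up Ray at all.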
Purpose: Create Dockerfile and docker-compose.yml for scripts that need to run indefinitely or for extended periods on a VM.
- Template-Based Generation
- Use base templates for Dockerfile and docker-compose.yml
- Customize based on script requirements
- Place both files in the `output/` directory
- Dockerfile Structure
- Base image with Python and Ray
- Install dependencies from requirements.txt
- Copy rayified script
- Set entrypoint to run the script
- Docker Compose Configuration
- Service definition for the script
- Environment variables (RAY_ADDRESS, RAY_NAMESPACE, etc.)
- Resource limits (CPU, memory)
- Restart policy (unless-stopped or on-failure)
- Volume mounts if script needs data persistence
- Environment Variables
- Inject cluster connection via environment
- Application-specific configuration
- Use .env file or docker-compose environment section
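For example, a `.env` file next to `docker-compose.yml` might contain (placeholder values):

```
RAY_ADDRESS=ray://cluster.example.com:10001
RAY_NAMESPACE=production
RAY_RUNTIME_ENV={"pip": ["numpy"]}
```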
output/
├── my_script.py # Rayified script
├── Dockerfile # Container definition
├── docker-compose.yml # Compose configuration
└── requirements.txt # Python dependencies
- Analyze Script Requirements
- Identify dependencies (pip packages)
- Determine resource needs (CPU, memory)
- Check for file I/O or data persistence needs
- Generate Dockerfile
- Start from Python base image
- Install Ray and dependencies
- Copy script and set entrypoint
- Generate docker-compose.yml
- Configure service with resource limits
- Set restart policy
- Configure environment variables
- Add volumes if needed
Dockerfile:
- Base image (Python version)
- Ray installation
- Dependencies installation
- Script copying
- Entrypoint command
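A minimal Dockerfile covering these sections might look like the sketch below; the Python version and file names are assumptions to adjust per script:

```dockerfile
# Minimal sketch - adjust Python version and dependencies to the script
FROM python:3.10-slim

WORKDIR /app

# Install Ray and script dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir "ray[default]" -r requirements.txt

# Copy the rayified script
COPY my_script.py .

# RAY_ADDRESS / RAY_NAMESPACE are injected at runtime via docker-compose
CMD ["python", "my_script.py"]
```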
docker-compose.yml:
- Service name
- Resource limits (cpus, memory)
- Restart policy
- Environment variables
- Volume mounts (if needed)
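A matching `docker-compose.yml` sketch; the service name, limits, and volume paths are assumptions:

```yaml
# Minimal sketch - tune limits and volumes to the script's needs
services:
  my-script:
    build: .
    restart: unless-stopped   # keep long-running scripts alive
    cpus: 2.0                 # resource limits
    mem_limit: 4g
    environment:
      RAY_ADDRESS: ${RAY_ADDRESS}
      RAY_NAMESPACE: ${RAY_NAMESPACE}
    volumes:
      - ./data:/app/data      # only if the script persists data
```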
The generated files should be minimal and focused:
- Dockerfile: Builds container with script and dependencies
- docker-compose.yml: Orchestrates container with proper configuration
- Both files reference environment variables for cluster connection
Note: This is for containerization only. Deployment to VMs is handled separately by copying files and running docker-compose up -d on the target VM.
Use Ray Data for scalable data processing and batch inference.
Cluster Connection Example:
import os
import ray
import ray.data
ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

# Create dataset
ds = ray.data.range(1000)

# Process in parallel on cluster
ds = ds.map(lambda x: x * 2)

# Batch inference: map_batches takes a plain callable class
# (not a @ray.remote class) and manages the actor pool itself
class Model:
    def __init__(self):
        # Load model
        pass

    def __call__(self, batch):
        # Inference
        return batch

predictions = ds.map_batches(
    Model,
    compute=ray.data.ActorPoolStrategy(size=4)
)

References:
Use Ray Train for distributed ML training across cluster nodes.
Cluster Connection Example:
import os
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

def train_func():
    # Training logic
    pass

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(
        num_workers=int(os.getenv("NUM_WORKERS", "4")),
        use_gpu=os.getenv("USE_GPU", "false").lower() == "true"
    )
)
result = trainer.fit()

References:
Use Ray Tune for distributed hyperparameter optimization.
Cluster Connection Example:
import os
import ray
from ray import tune
ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

def trainable(config):
    # Training with hyperparameters
    return {"loss": config["lr"] ** 2}

tuner = tune.Tuner(
    trainable,
    param_space={"lr": tune.grid_search([0.1, 0.01, 0.001])},
    tune_config=tune.TuneConfig(
        num_samples=int(os.getenv("NUM_SAMPLES", "10"))
    )
)
results = tuner.fit()

References:
- Tune Guide
- Tune Examples
- PBT Guide
- Integration Examples (W&B, MLflow, etc.)
Use Ray Serve for scalable model serving on clusters.
Cluster Connection Example:
import os
import ray
from ray import serve
from fastapi import FastAPI

ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

app = FastAPI()

@serve.deployment(
    num_replicas=int(os.getenv("NUM_REPLICAS", "2")),
    ray_actor_options={"num_gpus": 1 if os.getenv("USE_GPU") == "true" else 0}
)
@serve.ingress(app)
class ModelDeployment:
    def __init__(self):
        # Load model
        pass

    @app.post("/predict")
    def predict(self, data):
        # Inference
        return {"result": "prediction"}

serve.run(ModelDeployment.bind())

References:
Use Ray RLlib for distributed reinforcement learning.
Cluster Connection Example:
import os
import ray
from ray.rllib.algorithms.ppo import PPOConfig
ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

config = (
    PPOConfig()
    .environment(env="CartPole-v1")
    .rollouts(num_rollout_workers=int(os.getenv("NUM_WORKERS", "4")))
)
algo = config.build()
algo.train()

References:
Use Ray Core primitives (tasks, actors, objects) for custom distributed workloads.
Cluster Connection Example:
import os
import ray
ray.init(
    address=os.getenv("RAY_ADDRESS"),
    namespace=os.getenv("RAY_NAMESPACE")
)

@ray.remote(num_cpus=2)
def compute_task(data):
    # Distributed computation (placeholder)
    return data * 2

results = ray.get([compute_task.remote(i) for i in range(100)])

References:
- Limit Running Tasks - Use resources to control concurrency
  - Pattern: Limit Running Tasks
  - Use when: Tasks are memory-intensive or cause interference
- Limit Pending Tasks - Use `ray.wait()` for backpressure
  - Pattern: Limit Pending Tasks
  - Use when: Submitting tasks faster than they complete
- Pass Large Arguments By Value - Use `ray.put()` instead
  - Anti-Pattern: Pass Large Arg By Value
  - Problem: Duplicates large data in object store
  - Solution: Use `ray.put()` and pass object refs
- Closure Capture of Large Objects - Pass via arguments
  - Anti-Pattern: Closure Capture
  - Problem: Large objects serialized with task definition
  - Solution: Pass large objects via `ray.put()` and arguments
- Network Latency: Minimize data transfer between nodes
- Resource Availability: Check cluster resources before requesting
- Fault Tolerance: Handle node failures gracefully
- Data Locality: Consider data placement for performance
All Pattern Files:
- Core APIs - `ray.init()`, `ray.get()`, `ray.put()`, `ray.wait()`
- Actor APIs - Actor management
- Utility APIs - Utility functions
- Runtime Environment APIs - Runtime environment configuration
- Runtime Context APIs - Runtime context access
- Job Config APIs - Job configuration
- Logging APIs - Logging configuration
- Placement Group APIs - Resource placement
- Cross-Language APIs - Java/other languages
- Compiled Graph API - Ray DAG compilation
- Ray DAG - Dynamic task graphs
- API Reference - Complete API documentation
- Data Examples Overview
- Batch Inference - Object Detection
- PyTorch ResNet Batch Prediction
- HuggingFace ViT Batch Prediction
- Train Examples Overview
- PyTorch Examples
- Lightning Examples
- Transformers Examples
- DeepSpeed Examples
- Horovod Examples
- TensorFlow Examples
- Tune Examples Overview
- PBT Guide
- Integration Examples (W&B, MLflow, Comet, Aim)
- Framework Examples (PyTorch, Keras, XGBoost)
- Identify all parallelizable operations
- Determine stateful vs stateless components
- Map resource requirements (CPU/GPU/memory)
- Identify data dependencies and sharing needs
- Check cluster resource availability
- Verify cluster connection (RAY_ADDRESS set)
- Analyze desired output and confirm with developer
- Set up cluster connection via `RAY_ADDRESS`
- Choose appropriate Ray primitive (Task/Actor/Library)
- Apply resource specifications
- Use object refs for large shared data
- Configure runtime environment
- Add error handling and retries
- Create minimalist tests to verify script works and produces expected output
- Generate Dockerfile and docker-compose.yml in output/ (if script needs to run indefinitely)
- Run minimalist tests locally - all tests should pass
- Verify script produces expected output (from Step 1)
- Verify cluster connection works
- Check resource utilization
- Optimize memory usage (use `ray.put()` for large objects)
- Set concurrency limits if needed
- Add monitoring/logging
- Test fault tolerance
- Profile and optimize hot paths
- Monitor cluster resource usage
- Adjust resource requests based on actual usage
- Optimize data transfer (minimize object store transfers)
- Use placement groups for data locality
- Tune concurrency limits
- Profile task execution times
- Request appropriate resources (not too much, not too little)
- Use custom resources for specialized hardware
- Consider data locality when scheduling
- Monitor cluster-wide resource utilization
- Scale cluster if needed
| Variable | Description | Example |
|---|---|---|
| `RAY_ADDRESS` | Cluster address (required) | `ray://head-node:10001` or `http://head-node:8265` |
| `RAY_NAMESPACE` | Logical grouping namespace (required) | `production` or `development` |
| Variable | Description | Example |
|---|---|---|
| `RAY_RUNTIME_ENV` | JSON string for runtime environment | `{"pip": ["numpy"], "env_vars": {"VAR": "value"}}` |
| `RAY_JOB_CONFIG` | JSON string for job configuration | `{"runtime_env": {"pip": ["requests"]}}` |
| Variable | Description | Example |
|---|---|---|
| `RAY_gcs_rpc_server_reconnect_timeout_s` | GCS reconnection timeout (seconds) | `60` |
| Variable | Description | Example |
|---|---|---|
| `RAY_TASK_MAX_RETRIES` | Maximum task retries | `3` (default) or `0` to disable |
| Variable | Description | Example |
|---|---|---|
| `RAY_PICKLE_VERBOSE_DEBUG` | Enable pickle debugging | `0` (off), `1`, or `2` |
Ray may use additional environment variables for configuration. Check Ray documentation for the complete list.
References:
Use this template as a starting point for converting scripts:
import os
import ray

# 1. Connect to cluster
cluster_address = os.getenv("RAY_ADDRESS", "auto")
namespace = os.getenv("RAY_NAMESPACE")
if not namespace:
    raise ValueError("RAY_NAMESPACE environment variable is required")

ray.init(address=cluster_address, namespace=namespace)

# 2. Verify connection
if not ray.is_initialized():
    raise RuntimeError("Failed to connect to Ray cluster")

# 3. Convert your functions to Ray tasks/actors
@ray.remote(num_cpus=1)
def your_function(arg):
    # Your logic here
    return result

# 4. Use Ray primitives
results = ray.get([your_function.remote(arg) for arg in args])

# 5. Cleanup
ray.shutdown()

Remember: Always connect to a cluster via RAY_ADDRESS. Never assume local Ray unless explicitly required.