Distributed Training Ray Engine
Boost productivity with this distributed training orchestration skill. Includes structured workflows, validation checks, and reusable patterns for AI research.
Distributed Training with Ray Train
Overview
A comprehensive skill for distributed machine learning training using Ray Train — the scalable training library that orchestrates training across GPUs and nodes with minimal code changes. Ray Train provides a unified API for scaling PyTorch, TensorFlow, HuggingFace, and Lightning training from laptop to cluster, with built-in fault tolerance, checkpointing, and hyperparameter tuning integration.
When to Use
- Scaling PyTorch/TensorFlow training to multi-GPU or multi-node
- Need fault-tolerant training with automatic restarts
- Want unified API across ML frameworks
- Integrating training with Ray Tune for hyperparameter search
- Running training jobs on heterogeneous clusters
- Deploying training pipelines on Kubernetes
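For the cluster-oriented scenarios above (Kubernetes, heterogeneous clusters), training is usually packaged as a Ray Job and submitted to an existing cluster. Below is a minimal sketch using the Ray Jobs Python SDK; the dashboard address, script name, and dependency list are placeholders for your own setup.

```python
# Minimal sketch: submit a training script to an existing Ray cluster
# (e.g. a KubeRay cluster). Address and entrypoint are placeholders.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://<head-node>:8265")  # Ray dashboard address

job_id = client.submit_job(
    entrypoint="python train.py",            # your Ray Train script
    runtime_env={
        "working_dir": ".",                  # ship local code to the cluster
        "pip": ["torch", "ray[train]"],      # per-job dependencies
    },
)
print(f"Submitted job: {job_id}")
```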
Quick Start
```bash
# Install
pip install -U "ray[train]"

# Start Ray cluster
ray start --head --num-gpus=4

# Or use existing cluster
export RAY_ADDRESS="ray://cluster-head:10001"
```
```python
import torch

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    import ray.train.torch

    model = torch.nn.Linear(10, 1)
    model = ray.train.torch.prepare_model(model)  # DDP wrapper
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)

    dataset = torch.utils.data.TensorDataset(
        torch.randn(1000, 10), torch.randn(1000, 1)
    )
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32)
    dataloader = ray.train.torch.prepare_data_loader(dataloader)  # Adds distributed sampling

    for epoch in range(10):
        for x, y in dataloader:
            loss = torch.nn.functional.mse_loss(model(x), y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        ray.train.report({"loss": loss.item()})


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()
print(f"Final loss: {result.metrics['loss']}")
```
Core Concepts
Scaling Configuration
```python
from ray.train import ScalingConfig

# Multi-GPU single node
scaling = ScalingConfig(num_workers=4, use_gpu=True)

# Multi-node with specific resources
scaling = ScalingConfig(
    num_workers=16,
    use_gpu=True,
    resources_per_worker={"GPU": 1, "CPU": 8},
    trainer_resources={"CPU": 4},  # Resources for the driver
)
```
Checkpointing & Fault Tolerance
```python
import os
import tempfile

import torch
import ray.train
from ray.train import Checkpoint


def train_func(config):
    model = build_model()        # user-defined model factory
    optimizer = torch.optim.Adam(model.parameters())

    # Resume from checkpoint if available
    checkpoint = ray.train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as ckpt_dir:
            state = torch.load(os.path.join(ckpt_dir, "model.pt"))
            model.load_state_dict(state["model"])
            optimizer.load_state_dict(state["optimizer"])
            start_epoch = state["epoch"] + 1
    else:
        start_epoch = 0

    for epoch in range(start_epoch, 100):
        loss = train_epoch(model, optimizer)  # user-defined epoch loop

        # Save a checkpoint every 10 epochs
        if epoch % 10 == 0:
            with tempfile.TemporaryDirectory() as tmpdir:
                torch.save(
                    {
                        "model": model.state_dict(),
                        "optimizer": optimizer.state_dict(),
                        "epoch": epoch,
                    },
                    os.path.join(tmpdir, "model.pt"),
                )
                ray.train.report(
                    {"loss": loss},
                    checkpoint=Checkpoint.from_directory(tmpdir),
                )
        else:
            ray.train.report({"loss": loss})
```
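The loop above handles the resume side; to get automatic restarts, pair it with a failure configuration on the trainer. A minimal sketch, assuming the `train_func` above (the retry count is illustrative):

```python
import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# With max_failures set, Ray Train restarts failed workers and the
# training function resumes from the latest reported checkpoint.
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    run_config=ray.train.RunConfig(
        failure_config=ray.train.FailureConfig(max_failures=3),
    ),
)
result = trainer.fit()
```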
HuggingFace Transformers Integration
```python
import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.train.huggingface.transformers import (
    RayTrainReportCallback,
    prepare_trainer,
)
from transformers import Trainer, TrainingArguments


def train_func(config):
    training_args = TrainingArguments(
        output_dir="./results",
        per_device_train_batch_size=config.get("batch_size", 16),
        num_train_epochs=config.get("epochs", 3),
        learning_rate=config.get("lr", 5e-5),
        bf16=True,
    )
    trainer = Trainer(
        model=model,               # model and train_ds are built earlier in train_func
        args=training_args,
        train_dataset=train_ds,
        callbacks=[RayTrainReportCallback()],
    )
    trainer = prepare_trainer(trainer)
    trainer.train()


ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    run_config=ray.train.RunConfig(
        storage_path="s3://my-bucket/results",
        checkpoint_config=ray.train.CheckpointConfig(num_to_keep=3),
    ),
)
```
Integration with Ray Tune
```python
from ray import tune
from ray.tune import TuneConfig
from ray.tune.schedulers import ASHAScheduler

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
)

tuner = tune.Tuner(
    trainer,
    param_space={
        "train_loop_config": {
            "lr": tune.loguniform(1e-5, 1e-3),
            "batch_size": tune.choice([16, 32, 64]),
            "epochs": 10,
        }
    },
    tune_config=TuneConfig(
        metric="loss",
        mode="min",
        num_samples=20,
        scheduler=ASHAScheduler(),
    ),
)

result_grid = tuner.fit()
best = result_grid.get_best_result()
```
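The returned result exposes the winning configuration and checkpoint, which can feed a final full-scale run or inference. A brief usage sketch:

```python
# Inspect the best trial found by the search
print(best.config["train_loop_config"])  # e.g. {"lr": ..., "batch_size": ..., "epochs": 10}
print(best.metrics["loss"])              # final reported loss for that trial
best_checkpoint = best.checkpoint        # checkpoint saved by the best trial
```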
Configuration Reference
| Parameter | Default | Description |
|---|---|---|
| num_workers | 1 | Number of training workers |
| use_gpu | False | Whether workers need GPU access |
| resources_per_worker | Auto | CPU/GPU/memory per worker |
| trainer_resources | {"CPU": 1} | Resources for the driver process |
| placement_strategy | PACK | PACK or SPREAD across nodes |
| checkpoint_config.num_to_keep | None | Max checkpoints to retain |
| failure_config.max_failures | 0 | Auto-restart attempts on failure |
| storage_path | Local | S3/GCS/NFS path for results |
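Most of these parameters live on `ScalingConfig` and `RunConfig`. A sketch combining them, assuming a `train_func` like the ones above (worker counts and the bucket path are illustrative):

```python
import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=8,
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 8},
        trainer_resources={"CPU": 2},
        placement_strategy="SPREAD",            # spread workers across nodes
    ),
    run_config=ray.train.RunConfig(
        storage_path="s3://my-bucket/results",  # shared storage for multi-node runs
        checkpoint_config=ray.train.CheckpointConfig(num_to_keep=3),
    ),
)
```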
Best Practices
- Use `prepare_model` and `prepare_data_loader`: these handle DDP wrapping and distributed sampling
- Report metrics every epoch: enables monitoring and early-stopping integrations
- Set `num_to_keep` for checkpoints: prevents disk space exhaustion
- Use cloud storage for results: `storage_path="s3://..."` enables multi-node access
- Set `max_failures` for long jobs: automatic restart on transient failures
- Profile with the Ray Dashboard: monitor GPU utilization and worker status at `localhost:8265`
- Use SPREAD placement for heterogeneous clusters: distributes workers evenly across nodes
- Pin CPU workers for data loading: set an appropriate `resources_per_worker` CPU count
- Test the training function locally first: run `train_func(config)` before wrapping with Ray
- Use Ray Data for large datasets: streaming ingestion prevents OOM (see the sketch after this list)
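For the last point, a minimal sketch of streaming ingestion with Ray Data; the Parquet path and epoch/batch settings are placeholders:

```python
import ray.data
import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Placeholder source: any Ray Data reader works (Parquet, CSV, images, ...)
train_ds = ray.data.read_parquet("s3://my-bucket/train/")


def train_func(config):
    # Each worker streams its own shard instead of loading the full dataset
    shard = ray.train.get_dataset_shard("train")
    for epoch in range(config.get("epochs", 10)):
        for batch in shard.iter_torch_batches(batch_size=32):
            ...  # forward/backward pass; batch is a dict of column -> tensor


trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
```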
Troubleshooting
Workers failing silently
```python
# Enable verbose logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Check worker logs in the Ray Dashboard at http://localhost:8265
```
GPU not detected by workers
```bash
# Verify Ray sees GPUs
ray status
# Should show: GPU: X.0/Y.0 (X used, Y total)

# Check CUDA visibility
python -c "import torch; print(torch.cuda.device_count())"
```
OOM during training
```python
# Reduce per-worker batch size
# Effective batch = per_worker_batch × num_workers
scaling = ScalingConfig(num_workers=8, use_gpu=True)
# config["batch_size"] = 4  # smaller per-worker batch
```
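If the smaller per-worker batch hurts convergence, gradient accumulation keeps the effective batch size constant while lowering peak memory. A sketch that replaces the inner loop of the Quick Start `train_func` (the accumulation step count is illustrative):

```python
accumulation_steps = 4  # effective batch = batch_size × accumulation_steps × num_workers

# Assumes model, optimizer, and dataloader from the Quick Start train_func
for step, (x, y) in enumerate(dataloader):
    loss = torch.nn.functional.mse_loss(model(x), y)
    (loss / accumulation_steps).backward()  # scale so accumulated gradients average correctly
    if (step + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
```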