
Distributed Training with Ray Train

Overview

A comprehensive skill for distributed machine learning training using Ray Train — the scalable training library that orchestrates training across GPUs and nodes with minimal code changes. Ray Train provides a unified API for scaling PyTorch, TensorFlow, HuggingFace, and Lightning training from laptop to cluster, with built-in fault tolerance, checkpointing, and hyperparameter tuning integration.

When to Use

  • Scaling PyTorch/TensorFlow training to multi-GPU or multi-node
  • Need fault-tolerant training with automatic restarts
  • Want unified API across ML frameworks
  • Integrating training with Ray Tune for hyperparameter search
  • Running training jobs on heterogeneous clusters
  • Deploying training pipelines on Kubernetes

Quick Start

# Install
pip install -U "ray[train]"

# Start Ray cluster
ray start --head --num-gpus=4

# Or use existing cluster
export RAY_ADDRESS="ray://cluster-head:10001"
import ray.train
import torch
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    import ray.train.torch

    model = torch.nn.Linear(10, 1)
    model = ray.train.torch.prepare_model(model)  # DDP wrapper, moves to device
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)

    dataset = torch.utils.data.TensorDataset(
        torch.randn(1000, 10), torch.randn(1000, 1)
    )
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=32)
    dataloader = ray.train.torch.prepare_data_loader(dataloader)  # adds DistributedSampler

    for epoch in range(10):
        for x, y in dataloader:
            loss = torch.nn.functional.mse_loss(model(x), y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        ray.train.report({"loss": loss.item()})  # report once per epoch


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()
print(f"Final loss: {result.metrics['loss']}")

Core Concepts

Scaling Configuration

from ray.train import ScalingConfig

# Multi-GPU, single node
scaling = ScalingConfig(num_workers=4, use_gpu=True)

# Multi-node with specific resources
scaling = ScalingConfig(
    num_workers=16,
    use_gpu=True,
    resources_per_worker={"GPU": 1, "CPU": 8},
    trainer_resources={"CPU": 4},  # Resources for the driver
)

Checkpointing & Fault Tolerance

import os
import tempfile

import ray.train
import torch
from ray.train import Checkpoint


def train_func(config):
    model = build_model()
    optimizer = torch.optim.Adam(model.parameters())

    # Resume from checkpoint if available
    checkpoint = ray.train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as ckpt_dir:
            state = torch.load(os.path.join(ckpt_dir, "model.pt"))
            model.load_state_dict(state["model"])
            optimizer.load_state_dict(state["optimizer"])
            start_epoch = state["epoch"] + 1
    else:
        start_epoch = 0

    for epoch in range(start_epoch, 100):
        loss = train_epoch(model, optimizer)

        # Save a checkpoint every 10 epochs
        if epoch % 10 == 0:
            with tempfile.TemporaryDirectory() as tmpdir:
                torch.save(
                    {
                        "model": model.state_dict(),
                        "optimizer": optimizer.state_dict(),
                        "epoch": epoch,
                    },
                    os.path.join(tmpdir, "model.pt"),
                )
                ray.train.report(
                    {"loss": loss},
                    checkpoint=Checkpoint.from_directory(tmpdir),
                )
        else:
            ray.train.report({"loss": loss})
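
Checkpointing pays off when paired with automatic restarts. A minimal sketch of enabling them through FailureConfig (the retry count here is an arbitrary example):

from ray.train import FailureConfig, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,  # the checkpoint-aware function above
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    run_config=RunConfig(
        failure_config=FailureConfig(max_failures=3),  # retry up to 3 times
    ),
)
result = trainer.fit()  # on worker failure, restarts and resumes from the latest checkpoint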

HuggingFace Transformers Integration

import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.train.huggingface.transformers import (
    RayTrainReportCallback,
    prepare_trainer,
)
from transformers import Trainer, TrainingArguments


def train_func(config):
    model = ...     # your HF model, e.g. loaded with from_pretrained(...)
    train_ds = ...  # your tokenized training dataset

    training_args = TrainingArguments(
        output_dir="./results",
        per_device_train_batch_size=config.get("batch_size", 16),
        num_train_epochs=config.get("epochs", 3),
        learning_rate=config.get("lr", 5e-5),
        bf16=True,
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_ds,
        callbacks=[RayTrainReportCallback()],  # reports HF metrics and checkpoints to Ray
    )
    trainer = prepare_trainer(trainer)  # adapts the HF Trainer to Ray's process group
    trainer.train()


ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    run_config=ray.train.RunConfig(
        storage_path="s3://my-bucket/results",
        checkpoint_config=ray.train.CheckpointConfig(num_to_keep=3),
    ),
)

Integration with Ray Tune

from ray import tune
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune import TuneConfig
from ray.tune.schedulers import ASHAScheduler

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
)

tuner = tune.Tuner(
    trainer,
    param_space={
        "train_loop_config": {
            "lr": tune.loguniform(1e-5, 1e-3),
            "batch_size": tune.choice([16, 32, 64]),
            "epochs": 10,
        }
    },
    tune_config=TuneConfig(
        metric="loss",
        mode="min",
        num_samples=20,
        scheduler=ASHAScheduler(),
    ),
)
result_grid = tuner.fit()
best = result_grid.get_best_result()
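
After tuning completes, the winning trial's configuration, metrics, and checkpoint are available on the returned result; a quick sketch, assuming the tuner code above has run:

# Inspect the best trial
print(best.config["train_loop_config"])  # winning hyperparameters
print(best.metrics["loss"])              # its best reported loss
ckpt = best.checkpoint                   # Checkpoint object for the best trial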

Configuration Reference

| Parameter | Default | Description |
|---|---|---|
| num_workers | 1 | Number of training workers |
| use_gpu | False | Whether workers need GPU access |
| resources_per_worker | Auto | CPU/GPU/memory per worker |
| trainer_resources | {"CPU": 1} | Resources for the driver process |
| placement_strategy | PACK | PACK or SPREAD across nodes |
| checkpoint_config.num_to_keep | None | Max checkpoints to retain |
| failure_config.max_failures | 0 | Auto-restart attempts on failure |
| storage_path | Local | S3/GCS/NFS path for results |
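
A sketch showing how these knobs compose on a single trainer; the bucket path and counts are placeholders:

import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=8,
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 4},
        placement_strategy="SPREAD",  # spread workers across nodes
    ),
    run_config=ray.train.RunConfig(
        storage_path="s3://my-bucket/ray-results",  # placeholder bucket
        checkpoint_config=ray.train.CheckpointConfig(num_to_keep=3),
        failure_config=ray.train.FailureConfig(max_failures=2),
    ),
)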

Best Practices

  1. Use prepare_model and prepare_data_loader — These handle DDP wrapping and distributed sampling
  2. Report metrics every epoch — Enables monitoring and early stopping integrations
  3. Set num_to_keep for checkpoints — Prevents disk space exhaustion
  4. Use cloud storage for results — storage_path="s3://..." enables multi-node access
  5. Set max_failures for long jobs — Automatic restart on transient failures
  6. Profile with Ray Dashboard — Monitor GPU utilization and worker status at localhost:8265
  7. Use SPREAD placement for heterogeneous clusters — Distributes workers across nodes evenly
  8. Pin CPU workers for data loading — Set appropriate resources_per_worker CPU count
  9. Test training function locally first — Run train_func(config) before wrapping with Ray
  10. Use Ray Data for large datasets — Streaming ingestion prevents OOM (see the sketch after this list)
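
For item 10, a hedged sketch of streaming a dataset through Ray Data into each worker; the Parquet path and column access are placeholders:

import ray
import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

train_ds = ray.data.read_parquet("s3://my-bucket/train/")  # placeholder path


def train_func(config):
    # Each worker receives an automatically sharded slice of the dataset
    shard = ray.train.get_dataset_shard("train")
    for epoch in range(config.get("epochs", 10)):
        for batch in shard.iter_torch_batches(batch_size=32):
            ...  # forward/backward pass over the batch's column tensors


trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},  # streamed per worker, not materialized
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)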

Troubleshooting

Workers failing silently

# Enable verbose logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Check worker logs in the Ray Dashboard at http://localhost:8265

GPU not detected by workers

# Verify Ray sees GPUs
ray status
# Should show: GPU: X.0/Y.0 (X used, Y total)

# Check CUDA visibility
python -c "import torch; print(torch.cuda.device_count())"
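
It can also help to probe from inside the training function itself; a small sketch using ray.train.torch.get_device():

# Minimal sketch: confirm each worker was assigned a CUDA device
def train_func(config):
    import ray.train.torch
    import torch

    device = ray.train.torch.get_device()  # e.g. cuda:0 for this worker
    print(f"worker device: {device}, cuda available: {torch.cuda.is_available()}")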

OOM during training

# Reduce per-worker batch size
# Effective batch = per_worker_batch × num_workers
scaling = ScalingConfig(num_workers=8, use_gpu=True)
# config["batch_size"] = 4  # smaller per-worker batch
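
One way to apply the smaller batch size without editing train_func is train_loop_config, which arrives as the function's config argument; a sketch:

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    train_loop_config={"batch_size": 4},  # read as config["batch_size"] in train_func
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
)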