
Data Processing Ray Studio

Battle-tested skill for scalable data processing workloads. Includes structured workflows, validation checks, and reusable patterns for AI research.


Data Processing with Ray Studio

Overview

Ray Data is the data processing library within the Ray ecosystem, designed specifically for ML and AI workloads. It provides a distributed dataset abstraction that scales from a laptop to a cluster without code changes. Unlike traditional data processing frameworks like Spark that were designed for ETL and SQL workloads, Ray Data is built from the ground up for the specific needs of machine learning: streaming execution for datasets larger than memory, native GPU support for accelerated preprocessing, and tight integration with training frameworks like PyTorch, TensorFlow, and HuggingFace.

The key architectural insight of Ray Data is streaming execution. Traditional frameworks materialize entire datasets in memory between operations, which limits them to datasets that fit in aggregate cluster memory. Ray Data processes data in a streaming fashion -- blocks of data flow through the pipeline without ever materializing the entire dataset at once. This means you can process a 10TB dataset on a cluster with 100GB of aggregate RAM, as long as each individual batch fits in memory.
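To make the idea concrete, here is a toy sketch in plain Python (an illustration of the streaming model, not Ray's actual implementation): generator stages pass one block at a time through the pipeline, so peak memory is a single block regardless of total dataset size.

```python
# Minimal sketch of streaming execution: generator stages pass one
# block at a time, so peak memory is one block, not the whole dataset.
def read_blocks(num_rows, block_size):
    """Yield the 'dataset' as fixed-size blocks (lists of ints)."""
    for start in range(0, num_rows, block_size):
        yield list(range(start, min(start + block_size, num_rows)))

def map_blocks(blocks, fn):
    """Apply fn to every row of every block, lazily."""
    for block in blocks:
        yield [fn(row) for row in block]

def filter_blocks(blocks, pred):
    """Keep only rows matching pred, lazily."""
    for block in blocks:
        yield [row for row in block if pred(row)]

# Build the pipeline lazily; nothing runs until we consume it.
pipeline = filter_blocks(
    map_blocks(read_blocks(10_000, block_size=1_000), lambda x: x * 2),
    lambda x: x % 4 == 0,
)

# Consume block by block -- the full 10k rows are never in memory at once.
total = sum(len(block) for block in pipeline)
print(total)  # -> 5000
```

The same shape scales in Ray Data because each stage only needs its current block in memory, which is why a 10TB dataset can flow through a 100GB cluster.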

Ray Data is used in production by companies like Pinterest (last-mile data processing for model training), ByteDance (scaling offline inference with multi-modal LLMs), and Spotify (ML platform for batch inference). It is part of the Ray 2.40+ ecosystem and integrates naturally with Ray Train, Ray Serve, and Ray Tune.

When to Use

  • Processing datasets larger than 100GB for ML training or inference
  • Building distributed data preprocessing pipelines across a cluster
  • Running batch inference over large datasets with GPU-accelerated models
  • Loading and transforming multi-modal data (images, audio, video, text)
  • Scaling data processing from a single laptop to a multi-node cluster without rewriting code
  • Integrating data loading with distributed training (Ray Train, PyTorch DDP)
  • Building streaming data pipelines that process data larger than available memory
  • Replacing Spark or Dask for ML-specific data processing workloads

Quick Start

```shell
# Install Ray with data processing support
pip install -U "ray[data]"

# Verify installation
python -c "import ray; print(ray.__version__)"
```
```python
import ray

# Initialize Ray (auto-detects local resources)
ray.init()

# Load data from various sources
ds = ray.data.read_parquet("s3://my-bucket/data/*.parquet")

# Transform with vectorized batch operations
def normalize(batch):
    batch["text"] = batch["text"].str.lower().str.strip()
    batch["word_count"] = batch["text"].str.split().str.len()
    return batch

# batch_format="pandas" makes the .str accessors above valid
ds = ds.map_batches(normalize, batch_size=1000, batch_format="pandas")

# Filter
ds = ds.filter(lambda row: row["word_count"] > 10)

# Write results
ds.write_parquet("s3://my-bucket/processed/")

# Check stats
print(ds.count())
print(ds.schema())
```

Core Concepts

Dataset Fundamentals

A Ray Dataset is a distributed collection of data blocks. Operations on datasets are lazy -- they build up a computation graph that is only executed when you consume the data.
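A toy stand-in (not Ray's real planner) shows what laziness means in practice: `map` and `filter` only record operations into a plan, and consuming the data triggers execution of the whole plan at once.

```python
class LazyDataset:
    """Toy illustration of lazy execution: ops are recorded, not run."""

    def __init__(self, rows, plan=None):
        self._rows = rows
        self._plan = plan or []  # the recorded computation graph

    def map(self, fn):
        return LazyDataset(self._rows, self._plan + [("map", fn)])

    def filter(self, pred):
        return LazyDataset(self._rows, self._plan + [("filter", pred)])

    def take(self, n):
        """Consuming the dataset triggers execution of the whole plan."""
        rows = self._rows
        for kind, fn in self._plan:
            if kind == "map":
                rows = [fn(r) for r in rows]
            else:
                rows = [r for r in rows if fn(r)]
        return rows[:n]

ds = LazyDataset(list(range(100)))
ds = ds.map(lambda x: x * x).filter(lambda x: x > 50)  # nothing executed yet
print(ds.take(3))  # -> [64, 81, 100]
```

Ray's planner does far more (operator fusion, streaming, distribution), but the user-facing contract is the same: transformations are cheap to declare, and work happens on consumption.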

```python
import ray

# ── Creating Datasets ─────────────────────────────────────────
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")    # Parquet (recommended)
ds = ray.data.read_csv("s3://bucket/data/*.csv")            # CSV
ds = ray.data.read_json("gs://bucket/data/*.json")          # JSON
ds = ray.data.read_images("s3://bucket/images/")            # Images
ds = ray.data.from_items([{"id": i} for i in range(1000)])  # Python objects
ds = ray.data.range(1000000)                                # Synthetic
```

Transformations

Ray Data supports both vectorized batch transformations (fast) and row-by-row transformations (flexible).
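The performance gap comes from where the loop runs. A small NumPy illustration (standing in for the same pattern inside `map_batches`, with hypothetical sample data) shows both styles producing identical results:

```python
import numpy as np

# The same normalization written two ways: row-by-row vs. vectorized.
texts = np.array(["  Hello World  ", "Ray Data  ", "  STREAMS"])

# Row-by-row: one Python-level call per element (high per-row overhead).
row_wise = np.array([t.strip().lower() for t in texts])

# Vectorized: one call over the whole batch via NumPy's string routines.
vectorized = np.char.lower(np.char.strip(texts))

print(vectorized.tolist())  # -> ['hello world', 'ray data', 'streams']
assert (row_wise == vectorized).all()
```

In Ray Data, the vectorized version maps onto `map_batches` and the row-wise version onto `map`; the results match, but the batch form amortizes the per-call overhead across thousands of rows.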

```python
import ray
import numpy as np

ds = ray.data.read_parquet("s3://bucket/training_data/*.parquet")

# ── Batch Transformations (preferred for performance) ─────────
def preprocess_batch(batch):
    """Process a batch of rows using vectorized operations."""
    batch["text_clean"] = batch["text"].str.lower().str.strip()
    batch["text_length"] = batch["text"].str.len()
    batch["label_encoded"] = batch["label"].map({"pos": 1, "neg": 0, "neutral": 2})
    return batch

# batch_format="pandas" makes the .str accessors above valid
ds = ds.map_batches(preprocess_batch, batch_size=5000, batch_format="pandas")

# ── Row Transformations ───────────────────────────────────────
def process_row(row):
    """Process a single row (slower but simpler for complex logic)."""
    row["tokens"] = row["text"].split()
    row["num_tokens"] = len(row["tokens"])
    return row

ds = ds.map(process_row)

# ── Filtering ─────────────────────────────────────────────────
# Keep only rows matching the condition
ds = ds.filter(lambda row: row["num_tokens"] > 20)
ds = ds.filter(lambda row: row["label"] != "spam")

# ── Grouping and Aggregation ──────────────────────────────────
# Count by category
grouped = ds.groupby("label").count()

# Custom aggregation (each group arrives as a pandas DataFrame;
# return a batch, i.e. a dict of column lists)
def aggregate_group(group):
    return {
        "label": [group["label"].iloc[0]],
        "count": [len(group)],
        "avg_length": [group["text_length"].mean()],
    }

stats = ds.groupby("label").map_groups(aggregate_group)
```

GPU-Accelerated Processing

One of Ray Data's killer features is native GPU support for preprocessing and inference.

```python
import ray
import torch
import numpy as np
from typing import Dict

# ── GPU Batch Inference ───────────────────────────────────────
class GPUImagePreprocessor:
    """Stateful transform that loads its state once per worker."""

    def __init__(self):
        import torchvision.transforms as T
        self.transform = T.Compose([
            T.Resize((224, 224)),
            T.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225]),
        ])
        self.device = torch.device("cuda")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        images = torch.tensor(batch["image"], dtype=torch.float32).to(self.device)
        images = images.permute(0, 3, 1, 2) / 255.0  # NHWC -> NCHW
        processed = self.transform(images)
        return {"processed_image": processed.cpu().numpy()}

ds = ray.data.read_images("s3://bucket/images/")
processed = ds.map_batches(
    GPUImagePreprocessor,
    batch_size=64,
    num_gpus=1,    # Request 1 GPU per worker
    concurrency=4, # 4 parallel GPU workers
)

# ── GPU Model Inference ───────────────────────────────────────
class ModelInference:
    """Load the model once, then run inference on batches."""

    def __init__(self):
        from transformers import AutoModelForSequenceClassification, AutoTokenizer
        self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        self.model = AutoModelForSequenceClassification.from_pretrained(
            "bert-base-uncased"
        ).cuda().eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        texts = list(batch["text"])
        inputs = self.tokenizer(
            texts, padding=True, truncation=True, max_length=512,
            return_tensors="pt",
        ).to("cuda")
        with torch.no_grad():
            outputs = self.model(**inputs)
        predictions = torch.argmax(outputs.logits, dim=-1)
        return {
            "text": batch["text"],
            "prediction": predictions.cpu().numpy(),
        }

predictions = ds.map_batches(
    ModelInference,
    batch_size=32,
    num_gpus=1,
    concurrency=2,
)
predictions.write_parquet("s3://bucket/predictions/")
```

Integration with Training Frameworks

```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    import torch
    import torch.nn as nn
    from ray.train import get_dataset_shard

    train_shard = get_dataset_shard("train")
    model = nn.Linear(768, 10).cuda()
    optimizer = torch.optim.Adam(model.parameters())

    for epoch in range(10):
        for batch in train_shard.iter_torch_batches(batch_size=64):
            outputs = model(batch["features"].cuda())
            loss = nn.CrossEntropyLoss()(outputs, batch["labels"].cuda().long())
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()

# Direct conversion to PyTorch/TensorFlow iterables
torch_ds = train_ds.to_torch(label_column="label", batch_size=32)
tf_ds = train_ds.to_tf(feature_columns=["image"], label_column="label", batch_size=32)
```

Streaming and Performance Optimization

```python
import ray

# Streaming: process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # user-defined; each batch fits in memory

# Repartition to match cluster parallelism
ds = ds.repartition(100)  # 100 blocks for ~100 cores

# Batch size tuning: large for vectorized ops, small for GPU inference
ds.map_batches(transform_fn, batch_size=10000)
ds.map_batches(model_inference, batch_size=32)

# Write outputs (Parquet recommended)
ds.write_parquet("s3://output/", num_rows_per_file=100000)
```

Configuration Reference

| Parameter | Default | Description |
|---|---|---|
| `batch_size` | 4096 | Default batch size for `map_batches` operations |
| `num_gpus` | 0 | Number of GPUs requested per worker |
| `num_cpus` | 1 | Number of CPUs requested per worker |
| `concurrency` | auto | Number of parallel workers for `map_batches` |
| `num_rows_per_file` | None | Target rows per output file |
| `ray.init(num_cpus=N)` | auto | Total CPUs available to Ray |
| `ray.init(num_gpus=N)` | auto | Total GPUs available to Ray |
| `repartition(N)` | auto | Number of data blocks (controls parallelism) |
| `prefetch_batches` | 1 | Number of batches to prefetch during iteration |
| `batch_format` | "pandas" | Batch format: "pandas", "numpy", or "pyarrow" |

Performance Benchmarks

Horizontal scaling (100GB text processing):

  • 1 node (16 cores): ~30 minutes
  • 4 nodes (64 cores): ~8 minutes
  • 16 nodes (256 cores): ~2 minutes

GPU-accelerated image preprocessing:

  • CPU only: 1,000 images/sec
  • 1x A100 GPU: 5,000 images/sec
  • 4x A100 GPUs: 18,000 images/sec

Supported data formats:

| Format | Read | Write | Best For |
|---|---|---|---|
| Parquet | Yes | Yes | ML training data (recommended) |
| CSV | Yes | Yes | Tabular data interchange |
| JSON | Yes | Yes | Semi-structured data |
| Images | Yes | No | Computer vision datasets |
| NumPy | Yes | Yes | Array data |
| Pandas | Yes | No | DataFrame conversion |
| Delta Lake | Yes | Yes | Versioned datasets |
| BigQuery | Yes | No | Cloud data warehouse |

Best Practices

  1. Use Parquet as your primary data format for ML workloads. Parquet is columnar, compressed, and supports predicate pushdown. It is 5-10x faster to read than CSV or JSON for large datasets. Convert your data to Parquet early in the pipeline.

  2. Prefer map_batches over map for performance. Row-by-row map operations have high per-row overhead. Batch operations using map_batches with vectorized Pandas or NumPy operations are typically 10-100x faster.

  3. Use stateful transforms (class-based) for GPU operations. When your transform needs to load a model, use a class with __init__ and __call__. The __init__ runs once per worker, loading the model a single time, while __call__ processes each batch. This avoids loading the model for every batch.

  4. Match repartition() to your cluster parallelism. If you have 64 cores, repartition to ~64-128 blocks. Too few blocks underutilizes the cluster. Too many blocks adds scheduling overhead.

  5. Tune batch sizes based on the operation. Use large batches (5000-10000) for simple Pandas operations. Use small batches (16-64) for GPU inference where each item is large. The optimal batch size balances memory usage against per-batch overhead.

  6. Use streaming execution for datasets that exceed cluster memory. Call iter_batches() to process data in a streaming fashion. This avoids materializing the entire dataset in memory and lets you process arbitrarily large datasets.

  7. Profile before optimizing. Use ds.stats() and ray.timeline() to understand where time is spent. Common bottlenecks include I/O (reading from cloud storage), serialization (converting between formats), and skew (uneven block sizes).

  8. Use concurrency parameter to control GPU utilization. Set concurrency to match your available GPUs. For example, if you have 4 GPUs and each transform uses 1 GPU, set concurrency=4 to keep all GPUs busy.

  9. Avoid excessive repartition() calls in the middle of a pipeline. Each repartition triggers a shuffle which is expensive. Structure your pipeline to minimize the number of repartitions needed.

  10. Test locally before deploying to a cluster. Ray Data code runs identically on a laptop and a cluster. Develop and test on a small sample locally, then scale up by simply changing ray.init() to point at your cluster.
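The stateful-transform pattern from best practice 3 can be sketched without a GPU or a real model (the lambda below is a hypothetical stand-in for model loading): setup in `__init__` runs once per worker, while `__call__` runs once per batch.

```python
class StatefulTransform:
    """Setup in __init__ runs once; __call__ runs once per batch."""

    load_count = 0  # class-level counter, only for demonstration

    def __init__(self):
        StatefulTransform.load_count += 1           # "expensive" model load
        self.model = lambda xs: [x * 2 for x in xs] # stand-in for a real model

    def __call__(self, batch):
        return {"prediction": self.model(batch["value"])}

# One worker processing many batches: the "model" is loaded exactly once.
worker = StatefulTransform()
batches = [{"value": [1, 2]}, {"value": [3]}, {"value": [4, 5, 6]}]
results = [worker(b) for b in batches]

print(StatefulTransform.load_count)  # -> 1
print(results[0])                    # -> {'prediction': [2, 4]}
```

Passing the class itself (not an instance) to `map_batches` is what lets Ray construct one instance per worker and reuse it across batches.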

Troubleshooting

Problem: OutOfMemoryError during data processing. Your batches are too large for available memory. Reduce batch_size in map_batches(), increase the repartition() count to create more, smaller blocks, or use iter_batches() for streaming execution. If using GPU transforms, reduce the batch size further, since GPU memory is typically more limited.

Problem: Processing is slow despite having many cores. Check the number of data blocks with ds.num_blocks(). If you have 64 cores but only 4 blocks, most cores are idle. Use ds.repartition(128) to create more blocks. Also check if your data source is the bottleneck -- reading from a single large file cannot be parallelized.

Problem: GPU utilization is low during model inference. You likely have too few concurrent workers or the CPU preprocessing before GPU inference is the bottleneck. Increase concurrency in map_batches() and consider splitting CPU preprocessing and GPU inference into separate map_batches calls so they can pipeline.

Problem: Data skew causes some workers to take much longer than others. Repartition with ds.repartition(N) to redistribute data evenly across blocks. If the skew comes from the source data (e.g., some Parquet files are much larger than others), increase the number of blocks to ensure even distribution.

Problem: Cloud storage reads are slow. Parquet files stored in S3/GCS have high per-request latency. Use larger Parquet files (100-500MB each) to reduce the number of requests. Enable Ray's object store spilling to SSD to avoid re-reading from cloud storage on retries.
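What repartition() buys you in the skew scenarios above can be illustrated with a toy rebalance helper (a hypothetical stand-in for Ray's shuffle, not its implementation): uneven blocks are redistributed so no single worker carries most of the data.

```python
def rebalance(blocks, num_blocks):
    """Redistribute rows from uneven blocks into num_blocks even ones.

    After the shuffle, block sizes differ by at most one row, so no
    worker is stuck with a disproportionately large block.
    """
    rows = [row for block in blocks for row in block]  # gather all rows
    size, extra = divmod(len(rows), num_blocks)
    out, start = [], 0
    for i in range(num_blocks):
        end = start + size + (1 if i < extra else 0)
        out.append(rows[start:end])
        start = end
    return out

# One giant block and three tiny ones -> four evenly sized blocks.
skewed = [list(range(97)), [97], [98], [99]]
even = rebalance(skewed, 4)
print([len(b) for b in even])  # -> [25, 25, 25, 25]
```

This is also why the gather step matters: a repartition is a full shuffle, which is exactly the cost that makes mid-pipeline repartition() calls expensive (best practice 9).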
