Skill · development · v1.0.0 · MIT

Senior ML Engineer Engine

A production-grade skill for senior ML engineers covering model training infrastructure, experiment management, model serving, MLOps pipelines, and responsible AI practices at scale.

When to Use This Skill

Choose this skill when:

  • Building end-to-end ML training and serving infrastructure
  • Setting up experiment tracking, model versioning, and artifact management
  • Deploying models with A/B testing, shadow mode, and canary rollouts
  • Implementing feature stores and feature engineering pipelines
  • Establishing ML monitoring for data drift, model degradation, and fairness

Consider alternatives when:

  • Working on computer vision specifically → use a CV engineering skill
  • Building NLP/LLM applications → use an NLP or LLM skill
  • Doing data analysis without ML → use a data science skill
  • Building data pipelines without ML → use a data engineering skill

Quick Start

```python
# End-to-end ML pipeline with MLflow
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split


class MLPipeline:
    def __init__(self, experiment_name: str):
        mlflow.set_experiment(experiment_name)
        self.client = MlflowClient()

    def evaluate(self, model, X_test, y_test) -> dict:
        proba = model.predict_proba(X_test)[:, 1]
        return {'test_auc': roc_auc_score(y_test, proba)}

    def train_and_register(self, X, y, model, model_name: str):
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        with mlflow.start_run() as run:
            # Train
            model.fit(X_train, y_train)

            # Evaluate
            metrics = self.evaluate(model, X_test, y_test)
            mlflow.log_metrics(metrics)
            mlflow.log_params(model.get_params())

            # Log model
            mlflow.sklearn.log_model(model, "model")

            # Register only if the performance threshold is met
            if metrics['test_auc'] > 0.85:
                model_uri = f"runs:/{run.info.run_id}/model"
                mlflow.register_model(model_uri, model_name)
        return metrics
```

Core Concepts

MLOps Maturity Levels

| Level | Description | Capabilities |
| --- | --- | --- |
| 0 - Manual | Jupyter notebooks, manual deploys | Ad-hoc experiments, manual model updates |
| 1 - Pipeline | Automated training pipeline | Reproducible training, version control |
| 2 - CI/CD | Automated testing and deployment | Model validation, staging, automated rollout |
| 3 - Full MLOps | Automated retraining and monitoring | Drift detection, auto-retraining, A/B testing |

Model Serving Architecture

```python
# FastAPI model server with versioning and health checks
from typing import Any

import joblib
import numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()


class ModelServer:
    def __init__(self):
        self.models: dict[str, Any] = {}
        self.active_version: str = ""

    def load_model(self, version: str, path: str):
        self.models[version] = joblib.load(path)
        self.active_version = version

    def predict(self, features: np.ndarray, version: str | None = None) -> np.ndarray:
        v = version or self.active_version
        if v not in self.models:
            raise ValueError(f"Model version {v} not loaded")
        return self.models[v].predict(features)


server = ModelServer()


class PredictionRequest(BaseModel):
    features: list[float]
    model_version: str | None = None


class PredictionResponse(BaseModel):
    prediction: float
    model_version: str
    confidence: float | None = None


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    features = np.array(request.features).reshape(1, -1)
    version = request.model_version or server.active_version
    try:
        prediction = server.predict(features, version)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc))
    return PredictionResponse(
        prediction=float(prediction[0]),
        model_version=version,
    )


@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "active_model": server.active_version,
        "loaded_models": list(server.models.keys()),
    }
```
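The same version registry supports shadow deployments (mentioned under "When to Use This Skill"). A minimal sketch in plain Python: the `ShadowServer` class below is hypothetical, and the toy callables stand in for real loaded models. A candidate model scores every request alongside the active one, but only the active prediction is ever returned:

```python
# Shadow-mode wrapper: score with a candidate model in parallel,
# return only the active model's prediction, and record disagreements.
class ShadowServer:
    def __init__(self, active_model, shadow_model, tolerance: float = 1e-6):
        self.active = active_model
        self.shadow = shadow_model
        self.tolerance = tolerance
        self.disagreements: list[dict] = []  # in practice, emit to metrics/logging

    def predict(self, features):
        active_pred = self.active(features)
        try:
            # Shadow failures must never affect live traffic
            shadow_pred = self.shadow(features)
        except Exception:
            shadow_pred = None
        if shadow_pred is not None and abs(active_pred - shadow_pred) > self.tolerance:
            self.disagreements.append(
                {"features": features, "active": active_pred, "shadow": shadow_pred}
            )
        return active_pred  # callers only ever see the active model


# Usage: two toy "models" that disagree on large inputs
server = ShadowServer(lambda x: x * 2.0, lambda x: x * 2.0 + (0.5 if x > 10 else 0.0))
server.predict(3.0)   # models agree, nothing recorded
server.predict(12.0)  # shadow disagrees; recorded, response unchanged
```

Once the disagreement rate is acceptably low, the candidate can be promoted to a canary slice and finally to the active version.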

Model Monitoring and Drift Detection

```python
# Drift detection: KS test per feature, PSI on the prediction distribution
import numpy as np
from scipy import stats


class ModelMonitor:
    def __init__(self, reference_data: np.ndarray, reference_predictions: np.ndarray):
        self.ref_data = reference_data
        self.ref_predictions = reference_predictions

    def detect_data_drift(self, current_data: np.ndarray, threshold: float = 0.05) -> dict:
        """Kolmogorov-Smirnov test for each feature."""
        drift_results = {}
        for i in range(current_data.shape[1]):
            stat, p_value = stats.ks_2samp(self.ref_data[:, i], current_data[:, i])
            drift_results[f'feature_{i}'] = {
                'ks_statistic': stat,
                'p_value': p_value,
                'drift_detected': p_value < threshold,
            }
        return drift_results

    def detect_prediction_drift(self, current_predictions: np.ndarray) -> dict:
        """Population Stability Index for the prediction distribution."""
        psi = self._calculate_psi(self.ref_predictions, current_predictions)
        return {
            'psi': psi,
            'drift_level': (
                'none' if psi < 0.1
                else 'moderate' if psi < 0.2
                else 'significant'
            ),
        }

    def _calculate_psi(self, expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
        breakpoints = np.linspace(
            min(expected.min(), actual.min()),
            max(expected.max(), actual.max()),
            bins + 1,
        )
        expected_pct = np.histogram(expected, breakpoints)[0] / len(expected) + 1e-6
        actual_pct = np.histogram(actual, breakpoints)[0] / len(actual) + 1e-6
        return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))
```
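To see the PSI behave as the thresholds above suggest, here is a standalone sketch (NumPy only; the sample distributions are synthetic, chosen purely for illustration):

```python
import numpy as np

def calculate_psi(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    # Shared bin edges spanning both samples, as in the monitor's _calculate_psi
    breakpoints = np.linspace(
        min(expected.min(), actual.min()),
        max(expected.max(), actual.max()),
        bins + 1,
    )
    expected_pct = np.histogram(expected, breakpoints)[0] / len(expected) + 1e-6
    actual_pct = np.histogram(actual, breakpoints)[0] / len(actual) + 1e-6
    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))

rng = np.random.default_rng(0)
reference = rng.normal(0.0, 1.0, 10_000)

# Same distribution -> PSI near zero ("none")
psi_same = calculate_psi(reference, rng.normal(0.0, 1.0, 10_000))

# Mean shifted by one standard deviation -> large PSI ("significant")
psi_shifted = calculate_psi(reference, rng.normal(1.0, 1.0, 10_000))
```

With 10,000 samples the unshifted comparison lands well under the 0.1 "none" threshold, while the one-sigma shift lands well above 0.2.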

Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| experimentTracker | string | 'mlflow' | Experiment tracking: mlflow, wandb, or neptune |
| modelRegistry | string | 'mlflow' | Model registry: mlflow, sagemaker, or vertex |
| servingFramework | string | 'fastapi' | Model serving: fastapi, triton, or seldon |
| driftThreshold | number | 0.05 | P-value threshold for drift detection |
| retrainingTrigger | string | 'drift' | Retrain trigger: drift, schedule, or performance |
| featureStore | string | 'feast' | Feature store: feast, tecton, or custom |
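Assuming a YAML-based configuration file (the file name and flat layout are illustrative, not prescribed by the skill), the defaults above might be expressed as:

```yaml
# ml-engine.config.yaml (hypothetical layout)
experimentTracker: mlflow    # or: wandb, neptune
modelRegistry: mlflow        # or: sagemaker, vertex
servingFramework: fastapi    # or: triton, seldon
driftThreshold: 0.05         # p-value cutoff for KS drift tests
retrainingTrigger: drift     # or: schedule, performance
featureStore: feast          # or: tecton, custom
```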

Best Practices

  1. Version everything: data, code, models, and configs — A model artifact is meaningless without the exact data, preprocessing code, and hyperparameters that produced it. Use DVC for data, Git for code, and MLflow for models. Every prediction should be traceable to its origin.

  2. Deploy models behind a serving layer with versioning — Never embed model loading directly in application code. Use a model server that supports version switching, A/B testing, and shadow deployments. This decouples model updates from application deployments.

  3. Monitor input distributions, not just model accuracy — Ground truth labels arrive late or never. Monitor feature distributions (KS test, PSI) to detect drift early. Set up alerts for distribution shifts that exceed thresholds, triggering retraining before accuracy degrades.

  4. Use feature stores to prevent training-serving skew — Feature engineering code duplicated between training notebooks and serving pipelines inevitably diverges. A feature store computes features once and serves them consistently to both training and inference.

  5. Test models like software: unit tests for preprocessing, integration tests for pipelines — Test that preprocessing handles edge cases (nulls, outliers, new categories). Test that the full pipeline produces outputs with expected shapes and ranges. Test that model performance meets minimum thresholds on a golden dataset.
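Practice 5 can be sketched as plain assertion-style tests. The `preprocess` function below and its rules are hypothetical stand-ins for your own pipeline's preprocessing:

```python
import math

def preprocess(value, known_categories=("a", "b")):
    """Hypothetical preprocessing: impute nulls, clip outliers, map categories."""
    if isinstance(value, str):
        # Unseen categories map to a reserved "other" bucket instead of crashing
        return value if value in known_categories else "other"
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return 0.0  # null imputation
    return max(-3.0, min(3.0, float(value)))  # clip outliers into [-3, 3]

# Unit tests for edge cases (runnable with pytest or as plain asserts)
def test_nulls_are_imputed():
    assert preprocess(None) == 0.0
    assert preprocess(float("nan")) == 0.0

def test_outliers_are_clipped():
    assert preprocess(1e9) == 3.0
    assert preprocess(-1e9) == -3.0

def test_new_categories_fall_back():
    assert preprocess("never-seen") == "other"

test_nulls_are_imputed()
test_outliers_are_clipped()
test_new_categories_fall_back()
```

The same pattern extends to integration tests (assert output shapes and ranges for the full pipeline) and to a golden-dataset test asserting a minimum metric threshold.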

Common Issues

Training-serving skew causes production accuracy drops — Features computed differently in training (batch, full history) versus serving (real-time, partial data) produce different predictions. Use a feature store and validate that serving features match training features on a sample of production requests.
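One way to run that validation (a sketch; `training_path` and `serving_path` stand in for your two real feature-computation code paths) is to replay a sample of raw production requests through both paths and compare the results:

```python
import numpy as np

def validate_feature_parity(training_fn, serving_fn, raw_samples, atol=1e-6):
    """Replay raw samples through both feature paths; return mismatching indices."""
    mismatches = []
    for i, raw in enumerate(raw_samples):
        train_feats = np.asarray(training_fn(raw))
        serve_feats = np.asarray(serving_fn(raw))
        if not np.allclose(train_feats, serve_feats, atol=atol):
            mismatches.append(i)
    return mismatches

# Toy example: the serving path "forgets" to normalize, a classic skew bug
training_path = lambda x: [(x - 5.0) / 2.0]
serving_path = lambda x: [x - 5.0]  # missing the division by 2
bad = validate_feature_parity(training_path, serving_path, [1.0, 5.0, 9.0])
# bad -> [0, 2]: only the sample at the mean (x = 5.0) happens to agree
```

Running this check on a daily sample of production traffic, and alerting when the mismatch rate is nonzero, catches skew before it shows up as an accuracy drop.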

Model retraining pipeline breaks silently — A scheduled retraining job fails but the old model keeps serving, gradually degrading. Monitor pipeline health separately from model health. Alert on pipeline failures, not just model metrics.

GPU memory errors during training on large models — Reduce batch size, enable gradient accumulation, use mixed precision training (torch.cuda.amp), or apply gradient checkpointing. For very large models, use model parallelism or DeepSpeed ZeRO optimization.
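Gradient accumulation, the cheapest of these fixes, can be sketched as follows (assuming PyTorch; the model, data, and `accum_steps` value are placeholders, and mixed precision is noted in comments rather than shown):

```python
# Gradient accumulation: train with an effective batch of
# accum_steps * micro-batch size while only holding one micro-batch in memory.
import torch

model = torch.nn.Linear(4, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
accum_steps = 4  # effective batch = accum_steps * micro-batch size

optimizer.zero_grad()
for step in range(accum_steps):
    x = torch.randn(8, 4)  # micro-batch that fits in memory
    y = torch.randn(8, 1)
    loss = torch.nn.functional.mse_loss(model(x), y)
    # Scale so accumulated gradients average over the full effective batch.
    # On CUDA, wrap the forward pass in torch.autocast("cuda") and scale the
    # loss with torch.cuda.amp.GradScaler for mixed precision.
    (loss / accum_steps).backward()
optimizer.step()       # one update for the whole effective batch
optimizer.zero_grad()
```

If accumulation and mixed precision are not enough, gradient checkpointing trades compute for memory, and sharded approaches such as DeepSpeed ZeRO split optimizer state across devices.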
