Master Senior Data Engineer

Streamline your workflow with this world-class data engineering skill. Includes structured workflows, validation checks, and reusable patterns for development.


A production-grade skill for senior data engineers covering pipeline orchestration, data warehousing, streaming architectures, data quality frameworks, and scalable ETL/ELT patterns with modern data stack tools.

When to Use This Skill

Choose this skill when:

  • Designing and building data pipelines with Airflow, Dagster, or Prefect
  • Implementing data warehouse schemas (star, snowflake, data vault)
  • Setting up streaming data processing with Kafka, Flink, or Spark Streaming
  • Building data quality checks and monitoring for production pipelines
  • Migrating from batch ETL to real-time ELT architectures

Consider alternatives when:

  • Building ML model training pipelines → use an ML engineering skill
  • Working on database administration → use a DBA skill
  • Analyzing data with SQL/Python → use a data analysis skill
  • Setting up BI dashboards → use a BI/analytics skill

Quick Start

```python
# Dagster pipeline example — modern data orchestration
from dagster import asset, MaterializeResult

import pandas as pd


@asset(group_name="ingestion")
def raw_orders(context) -> pd.DataFrame:
    """Extract orders from source database."""
    # `source_conn` is assumed to be a connection provided elsewhere
    # (e.g. via a Dagster resource).
    df = pd.read_sql("SELECT * FROM orders WHERE date > CURRENT_DATE - 1", source_conn)
    context.log.info(f"Extracted {len(df)} orders")
    return df


@asset(group_name="transform")
def clean_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Clean and validate order data."""
    # Taking `raw_orders` as a parameter already declares the upstream
    # dependency, so no separate `deps=` is needed.
    df = raw_orders.dropna(subset=["customer_id", "amount"])
    df = df[df["amount"] > 0]
    df["order_date"] = pd.to_datetime(df["order_date"])
    return df


@asset(group_name="warehouse")
def dim_customers(clean_orders: pd.DataFrame) -> MaterializeResult:
    """Build customer dimension table with SCD Type 2."""
    # Merge with existing dimension, handle slowly changing attributes.
    # `load_scd2` is assumed to be a project helper.
    load_scd2(clean_orders, target_table="dim_customers", key="customer_id")
    return MaterializeResult(metadata={"row_count": len(clean_orders)})
```

Core Concepts

Data Architecture Patterns

| Pattern | Latency | Complexity | Use Case |
| --- | --- | --- | --- |
| Batch ETL | Hours | Low | Daily reports, data warehouse loads |
| Micro-batch | Minutes | Medium | Near-real-time dashboards |
| Stream Processing | Seconds | High | Real-time alerts, event processing |
| Lambda | Mixed | Very High | Batch accuracy + stream speed |
| Kappa | Seconds | High | Stream-only, replay from log |
| ELT | Minutes | Low-Medium | Cloud warehouse transformations |
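
To make the latency trade-offs concrete, here is a minimal sketch of a tumbling-window aggregator, the core primitive behind the seconds-latency stream-processing row above. The window size and the `(event_time, key)` event shape are illustrative assumptions, not part of any specific framework.

```python
from collections import defaultdict

def tumbling_counts(events, window_s=60):
    """Count events per key per tumbling window.

    events: iterable of (event_time_seconds, key) pairs.
    Returns {(window_start, key): count}.
    """
    counts = defaultdict(int)
    for ts, key in events:
        # Align each event to the start of its window.
        window_start = ts - (ts % window_s)
        counts[(window_start, key)] += 1
    return dict(counts)
```

Frameworks like Flink or Spark Structured Streaming provide the same primitive with fault tolerance and event-time semantics built in.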

Data Quality Framework

```python
from great_expectations.core import ExpectationSuite, ExpectationConfiguration


def build_order_expectations() -> ExpectationSuite:
    suite = ExpectationSuite("orders_quality")
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"},
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={"column": "amount", "min_value": 0, "max_value": 1_000_000},
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"},
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={"min_value": 1000, "max_value": 500_000},
    ))
    return suite
```

Streaming Pipeline Architecture

```python
# Kafka consumer with effectively exactly-once processing
# (at-least-once delivery plus idempotent handling)
from confluent_kafka import Consumer, KafkaError
import json


class StreamProcessor:
    def __init__(self, config: dict):
        self.consumer = Consumer({
            'bootstrap.servers': config['brokers'],
            'group.id': config['consumer_group'],
            'enable.auto.commit': False,
            'auto.offset.reset': 'earliest',
        })
        self.consumer.subscribe(config['topics'])

    def process(self):
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise Exception(msg.error())
            event = json.loads(msg.value())
            self.handle_event(event)
            self.consumer.commit(msg)  # commit after successful processing

    def handle_event(self, event: dict):
        # Idempotent processing — check if event already processed.
        # `is_duplicate`, `transform_and_load`, and `mark_processed` are
        # assumed to be implemented against a durable dedup store.
        if self.is_duplicate(event['event_id']):
            return
        self.transform_and_load(event)
        self.mark_processed(event['event_id'])
```
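
The `is_duplicate` / `mark_processed` helpers above can be backed by any keyed store. A minimal in-memory sketch follows; a real deployment would use something durable such as Redis or a warehouse table, since in-process state is lost on restart.

```python
class DedupStore:
    """In-memory stand-in for a durable dedup store (e.g. Redis SET NX)."""

    def __init__(self):
        self._seen = set()

    def is_duplicate(self, event_id: str) -> bool:
        return event_id in self._seen

    def mark_processed(self, event_id: str) -> None:
        self._seen.add(event_id)
```

Processed-event IDs should also be expired or compacted over time so the store does not grow without bound.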

Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| orchestrator | string | 'dagster' | Pipeline orchestrator: dagster, airflow, or prefect |
| warehouseType | string | 'snowflake' | Target warehouse: snowflake, bigquery, or redshift |
| schemaPattern | string | 'star' | Schema design: star, snowflake, or data vault |
| qualityFramework | string | 'great_expectations' | Data quality tool |
| streamingBackend | string | 'kafka' | Streaming platform: kafka, kinesis, or pulsar |
| partitionStrategy | string | 'daily' | Table partitioning: daily, hourly, or monthly |
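
The parameters above could be expressed as a typed config object; the dataclass below is one illustrative way to do so (names and defaults mirror the table, the class itself is hypothetical):

```python
from dataclasses import dataclass

@dataclass
class SkillConfig:
    orchestrator: str = "dagster"                 # dagster, airflow, or prefect
    warehouseType: str = "snowflake"              # snowflake, bigquery, or redshift
    schemaPattern: str = "star"                   # star, snowflake, or data vault
    qualityFramework: str = "great_expectations"  # data quality tool
    streamingBackend: str = "kafka"               # kafka, kinesis, or pulsar
    partitionStrategy: str = "daily"              # daily, hourly, or monthly
```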

Best Practices

  1. Implement data contracts between producers and consumers — Define explicit schemas (Avro, Protobuf, JSON Schema) for data exchanged between teams. Schema registries enforce compatibility and prevent breaking changes from propagating through pipelines.

  2. Build idempotent pipelines that can safely re-run — Every pipeline stage should produce the same output regardless of how many times it runs. Use MERGE/UPSERT operations, deduplication keys, and tombstone markers rather than DELETE+INSERT patterns.

  3. Test pipelines with synthetic data before production deployment — Generate test datasets that cover edge cases: null values, extreme ranges, duplicate keys, late-arriving data. Run pipeline tests in CI with deterministic test data.

  4. Partition and cluster warehouse tables by query patterns — Partition by the most common filter column (usually date). Cluster by frequently joined or filtered columns. Proper partitioning can reduce query costs and latency by 10-100x.

  5. Monitor data freshness and completeness, not just pipeline success — A pipeline can succeed while producing incomplete data (source had fewer records than expected). Track row counts, null ratios, and data freshness SLAs alongside pipeline execution status.
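
The idempotent re-run practice above boils down to using an upsert instead of plain inserts. A minimal sketch, with SQLite standing in for the warehouse and illustrative table and column names:

```python
import sqlite3

def load_orders(conn: sqlite3.Connection, rows: list) -> None:
    """Idempotent load: re-running with the same rows leaves the table unchanged."""
    conn.executemany(
        """
        INSERT INTO orders (order_id, amount)
        VALUES (?, ?)
        ON CONFLICT(order_id) DO UPDATE SET amount = excluded.amount
        """,
        rows,
    )
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, amount REAL)")
load_orders(conn, [("o1", 10.0), ("o2", 20.0)])
load_orders(conn, [("o1", 10.0), ("o2", 20.0)])  # safe re-run: no duplicates
```

The same pattern maps to `MERGE` in Snowflake, BigQuery, or Redshift, keyed on the deduplication key.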

Common Issues

Late-arriving data breaks aggregation accuracy — Events arrive after the processing window closes. Use watermarking in stream processing to define how late data is acceptable. For batch, maintain a reconciliation job that corrects aggregations with late data.
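
The watermarking idea can be sketched in a few lines: track the maximum event time seen so far, and flag events older than the allowed lateness. This is a simplified stand-in for what Flink or Spark Structured Streaming do internally; the threshold value is illustrative.

```python
class Watermark:
    """Flags events that arrive later than the allowed lateness bound."""

    def __init__(self, allowed_lateness_s: int):
        self.allowed = allowed_lateness_s
        self.max_event_time = 0

    def admit(self, event_time: int) -> bool:
        """Return True if the event is within the lateness bound."""
        self.max_event_time = max(self.max_event_time, event_time)
        return event_time >= self.max_event_time - self.allowed
```

Events that fail `admit` would be dropped or routed to the batch reconciliation job mentioned above.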

Pipeline succeeds but warehouse queries are slow — Data is loaded but not optimized for query patterns. Add appropriate clustering keys, materialized views for common joins, and ensure statistics are updated after large loads. Monitor query plans for full table scans.

Schema evolution breaks downstream consumers — Adding a required column or changing a type breaks existing consumers. Use a schema registry with backward compatibility checks. Add new fields as optional (nullable) and deprecate old fields with a migration timeline.
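
A schema registry's backward-compatibility check can be approximated as follows. The schema representation here (field name mapped to a spec dict) is a hypothetical simplification; real registries operate on Avro, Protobuf, or JSON Schema definitions.

```python
def breaking_changes(old: dict, new: dict) -> list:
    """Detect changes that break existing consumers.

    Schemas are {field_name: {"type": ..., "required": bool}} (illustrative).
    """
    issues = []
    # A new required field breaks producers/consumers on the old schema.
    for field, spec in new.items():
        if field not in old and spec["required"]:
            issues.append(f"new required field: {field}")
    # Removing a field breaks consumers that still read it.
    for field in old:
        if field not in new:
            issues.append(f"removed field: {field}")
    return issues
```

Adding optional (nullable) fields produces no issues, which matches the deprecate-with-a-migration-timeline guidance above.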
