
Data Pipeline Command

Design and generate ETL/ELT data pipeline architectures with source connectors, transformation logic, scheduling, error handling, and monitoring. Produces production-ready pipeline code for tools like Airflow, dbt, Prefect, or custom Node/Python scripts.

Command · Community · backend · v1.0.0 · MIT

Command

/data-pipeline

Description

Designs a complete data pipeline from source to destination, generating configuration files, transformation logic, scheduling setup, and monitoring dashboards. Supports batch ETL, streaming, and hybrid architectures.

Behavior

  1. Gather requirements: source, destination, frequency, transformations
  2. Design pipeline architecture with data flow diagram
  3. Generate pipeline code for your chosen framework
  4. Add error handling, retries, and dead letter queues
  5. Include monitoring, alerting, and data quality checks
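Step 4 above can be sketched framework-independently. The helper below is a minimal, hypothetical illustration (not part of the generated output) of retries with exponential backoff that routes permanently failing records to a dead-letter collection:

```python
import time

def run_with_retries(task, record, max_retries=3, base_delay=1.0, dead_letter=None):
    """Run task(record); retry with exponential backoff, then dead-letter on exhaustion."""
    for attempt in range(max_retries):
        try:
            return task(record)
        except Exception:
            if attempt == max_retries - 1:
                # Retries exhausted: park the record for later inspection/replay.
                if dead_letter is not None:
                    dead_letter.append(record)
                return None
            # Back off: base_delay, 2*base_delay, 4*base_delay, ...
            time.sleep(base_delay * 2 ** attempt)
```

In a real pipeline the dead-letter sink would be a queue or table rather than an in-memory list, so failed records survive the process and can be replayed.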

Pipeline Architecture Template

```
[Source] --> [Extract] --> [Transform] --> [Load] --> [Destination]
                 |              |             |
                 v              v             v
          [Error Queue]  [Quality Checks]  [Monitoring Dashboard]
```

Supported Frameworks

| Framework | Best For | Generated Files |
|-----------|----------|-----------------|
| Airflow | Complex DAGs, scheduling | `dags/*.py` |
| dbt | SQL transformations | `models/*.sql`, `schema.yml` |
| Prefect | Python-native pipelines | `flows/*.py` |
| Custom | Simple pipelines | `scripts/pipeline.ts` or `.py` |
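For the Custom row, a generated `scripts/pipeline.py` might look like the sketch below. All names here are illustrative assumptions, not the command's actual output; it shows the shape of a plain-Python extract/transform/load chain with a deduplication-and-validation transform:

```python
def extract(rows):
    # In a real pipeline this would query the source system.
    return list(rows)

def transform(records):
    """Deduplicate on user_id and drop records missing it."""
    seen, out = set(), []
    for r in records:
        uid = r.get("user_id")
        if uid is None or uid in seen:
            continue
        seen.add(uid)
        out.append(r)
    return out

def load(records, destination):
    """Append records to the destination; return the count loaded."""
    destination.extend(records)
    return len(records)

def run_pipeline(source_rows, destination):
    return load(transform(extract(source_rows)), destination)
```

Keeping each stage a pure function makes the pipeline unit-testable without any source or destination connectivity.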

Output Format

Example: Airflow DAG

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
}

with DAG(
    'user_activity_etl',
    default_args=default_args,
    description='Extract user activity, transform, load to warehouse',
    schedule_interval='0 2 * * *',  # Daily at 2 AM UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl', 'user-activity'],
) as dag:

    def extract(**context):
        """Extract user activity from source database."""
        hook = PostgresHook(postgres_conn_id='source_db')
        execution_date = context['ds']
        records = hook.get_records(
            "SELECT * FROM user_events WHERE date = %s",
            parameters=[execution_date],
        )
        return records

    def transform(**context):
        """Apply business logic transformations."""
        records = context['ti'].xcom_pull(task_ids='extract')
        # Deduplicate, validate, enrich
        pass

    def load(**context):
        """Load into data warehouse."""
        transformed = context['ti'].xcom_pull(task_ids='transform')
        pass

    def quality_check(**context):
        """Verify data quality post-load."""
        # Row counts, null checks, schema validation
        pass

    t_extract = PythonOperator(task_id='extract', python_callable=extract)
    t_transform = PythonOperator(task_id='transform', python_callable=transform)
    t_load = PythonOperator(task_id='load', python_callable=load)
    t_quality = PythonOperator(task_id='quality_check', python_callable=quality_check)

    t_extract >> t_transform >> t_load >> t_quality
```

Data Quality Checks

```yaml
quality_checks:
  - name: row_count
    type: minimum
    threshold: 1000
    on_failure: alert
  - name: null_check
    columns: [user_id, event_type, timestamp]
    max_null_percent: 0.1
    on_failure: fail
  - name: freshness
    max_delay: 4h
    on_failure: alert
```
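The three check types in the config above could be implemented as simple predicates. This is a hedged sketch, assuming records are dicts and freshness is judged against a latest-event timestamp, not the command's actual check runner:

```python
from datetime import datetime, timedelta, timezone

def check_row_count(records, threshold):
    """minimum: at least `threshold` rows were loaded."""
    return len(records) >= threshold

def check_nulls(records, columns, max_null_percent):
    """null_check: no column exceeds the allowed null percentage."""
    if not records:
        return True
    for col in columns:
        nulls = sum(1 for r in records if r.get(col) is None)
        if 100 * nulls / len(records) > max_null_percent:
            return False
    return True

def check_freshness(latest_ts, max_delay):
    """freshness: newest record is no older than `max_delay`."""
    return datetime.now(timezone.utc) - latest_ts <= max_delay
```

A check runner would map each check's `on_failure` value to either raising (fail) or paging (alert).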

Examples

```shell
# Design a pipeline interactively
/data-pipeline

# Specify source and destination
/data-pipeline --source postgres --dest bigquery --framework airflow

# Generate dbt transformation models
/data-pipeline --framework dbt --models users,orders,products
```