Data Pipeline Command
Design and generate ETL/ELT data pipeline architectures with source connectors, transformation logic, scheduling, error handling, and monitoring. Produces production-ready pipeline code for tools like Airflow, dbt, Prefect, or custom Node/Python scripts.
Command
/data-pipeline
Description
Designs a complete data pipeline from source to destination, generating configuration files, transformation logic, scheduling setup, and monitoring dashboards. Supports batch ETL, streaming, and hybrid architectures.
Behavior
- Gather requirements: source, destination, frequency, transformations
- Design pipeline architecture with data flow diagram
- Generate pipeline code for your chosen framework
- Add error handling, retries, and dead letter queues
- Include monitoring, alerting, and data quality checks
Pipeline Architecture Template
```
[Source] --> [Extract] --> [Transform] --> [Load] --> [Destination]
                 |              |             |
                 v              v             v
          [Error Queue]     [Quality      [Monitoring
                             Checks]       Dashboard]
```
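The flow above can be sketched as a minimal custom Python pipeline in which failed records are routed to an error queue instead of aborting the whole batch. All names here (`extract`, `transform`, `run_pipeline`, the sample records) are illustrative, not part of any generated output:

```python
def extract():
    """Stand-in source: in practice this would query a database or API."""
    return [
        {"user_id": 1, "event": "login"},
        {"user_id": None, "event": "click"},  # invalid record
        {"user_id": 2, "event": "purchase"},
    ]

def transform(record):
    """Validate and enrich one record; raise on bad input."""
    if record["user_id"] is None:
        raise ValueError("missing user_id")
    return {**record, "event": record["event"].upper()}

def run_pipeline():
    destination, dead_letter = [], []
    for record in extract():
        try:
            destination.append(transform(record))
        except Exception as exc:
            # Route failures to the error queue for later inspection/replay.
            dead_letter.append({"record": record, "error": str(exc)})
    return destination, dead_letter

loaded, failed = run_pipeline()
print(len(loaded), len(failed))  # prints: 2 1
```

Isolating per-record failures this way keeps one bad row from failing the batch, which is the same idea the error-queue branch in the diagram expresses.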
Supported Frameworks
| Framework | Best For | Generated Files |
|---|---|---|
| Airflow | Complex DAGs, scheduling | dags/*.py |
| dbt | SQL transformations | models/*.sql, schema.yml |
| Prefect | Python-native pipelines | flows/*.py |
| Custom | Simple pipelines | scripts/pipeline.ts or .py |
Output Format
Example: Airflow DAG
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
}

with DAG(
    'user_activity_etl',
    default_args=default_args,
    description='Extract user activity, transform, load to warehouse',
    schedule_interval='0 2 * * *',  # Daily at 2 AM UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl', 'user-activity'],
) as dag:

    def extract(**context):
        """Extract user activity from the source database."""
        hook = PostgresHook(postgres_conn_id='source_db')
        execution_date = context['ds']
        records = hook.get_records(
            "SELECT * FROM user_events WHERE date = %s",
            parameters=[execution_date],
        )
        return records  # pushed to XCom for downstream tasks

    def transform(**context):
        """Apply business logic transformations."""
        records = context['ti'].xcom_pull(task_ids='extract')
        # Deduplicate, validate, enrich
        return records

    def load(**context):
        """Load into the data warehouse."""
        transformed = context['ti'].xcom_pull(task_ids='transform')

    def quality_check(**context):
        """Verify data quality post-load."""
        # Row counts, null checks, schema validation

    t_extract = PythonOperator(task_id='extract', python_callable=extract)
    t_transform = PythonOperator(task_id='transform', python_callable=transform)
    t_load = PythonOperator(task_id='load', python_callable=load)
    t_quality = PythonOperator(task_id='quality_check', python_callable=quality_check)

    t_extract >> t_transform >> t_load >> t_quality
```
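The DAG above delegates retries to Airflow (`retries`, `retry_delay`, `retry_exponential_backoff`). For the custom-script case in the frameworks table, the same policy can be sketched by hand; `with_retries` and `flaky_load` below are hypothetical names, not generated output:

```python
import time

def with_retries(fn, batch, dead_letter, retries=3, base_delay=0.01):
    """Call fn(batch); on failure, retry with exponential backoff,
    then route the batch to the dead-letter queue as a last resort."""
    for attempt in range(retries + 1):
        try:
            return fn(batch)
        except Exception as exc:
            if attempt == retries:
                dead_letter.append({"batch": batch, "error": str(exc)})
                return None
            time.sleep(base_delay * (2 ** attempt))  # 1x, 2x, 4x, ...

# Simulated load step that fails twice before succeeding.
calls = []
def flaky_load(batch):
    calls.append(batch)
    if len(calls) < 3:
        raise RuntimeError("connection reset")
    return f"loaded {len(batch)} rows"

dlq = []
result = with_retries(flaky_load, [1, 2, 3], dlq)  # "loaded 3 rows", dlq stays empty
```

Exhausting retries lands the batch in the dead-letter queue rather than raising, so a later job can inspect and replay it.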
Data Quality Checks
```yaml
quality_checks:
  - name: row_count
    type: minimum
    threshold: 1000
    on_failure: alert
  - name: null_check
    columns: [user_id, event_type, timestamp]
    max_null_percent: 0.1
    on_failure: fail
  - name: freshness
    max_delay: 4h
    on_failure: alert
```
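One way the checks above could be evaluated at runtime (a sketch: the field names mirror the YAML, but the runner itself is hypothetical, and `max_delay` is taken as a `timedelta` rather than parsing the `4h` string):

```python
from datetime import datetime, timedelta, timezone

def run_quality_checks(rows, newest_ts, checks):
    """Evaluate each configured check; return (name, passed) pairs."""
    results = []
    for check in checks:
        if check["name"] == "row_count":
            passed = len(rows) >= check["threshold"]
        elif check["name"] == "null_check":
            cells = [row.get(col) for row in rows for col in check["columns"]]
            null_pct = 100 * cells.count(None) / len(cells) if cells else 0
            passed = null_pct <= check["max_null_percent"]
        elif check["name"] == "freshness":
            passed = datetime.now(timezone.utc) - newest_ts <= check["max_delay"]
        else:
            passed = False  # unknown check type: fail loudly
        results.append((check["name"], passed))
    return results

checks = [
    {"name": "row_count", "threshold": 3},
    {"name": "null_check", "columns": ["user_id", "event_type"], "max_null_percent": 0.1},
    {"name": "freshness", "max_delay": timedelta(hours=4)},
]
rows = [{"user_id": i, "event_type": "click"} for i in range(5)]
results = run_quality_checks(rows, datetime.now(timezone.utc) - timedelta(hours=1), checks)
```

The `on_failure` routing (alert vs. hard-fail) would sit on top of these results, e.g. raising for any failed check marked `fail` and paging for those marked `alert`.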
Examples
```shell
# Design a pipeline interactively
/data-pipeline

# Specify source and destination
/data-pipeline --source postgres --dest bigquery --framework airflow

# Generate dbt transformation models
/data-pipeline --framework dbt --models users,orders,products
```