Ultimate Dispatching Framework
Comprehensive skill designed for facing, independent, tasks, worked. Includes structured workflows, validation checks, and reusable patterns for ai research.
Ultimate Dispatching Framework
Overview
A comprehensive skill for designing and implementing event dispatching, message routing, and task distribution systems in modern applications. Covers event-driven architectures, message queues, pub/sub patterns, command dispatching, and workflow orchestration — enabling scalable, loosely-coupled systems that handle complex routing logic with reliability.
When to Use
- Building event-driven microservices architectures
- Implementing CQRS (Command Query Responsibility Segregation)
- Designing pub/sub messaging systems
- Creating task queues with priority routing
- Building workflow orchestration engines
- Implementing the mediator pattern for decoupled components
- Routing requests across distributed services
Quick Start
# Event dispatcher with TypeScript npm init -y && npm install eventemitter3 bullmq ioredis # Python event-driven system pip install celery redis kombu
// Minimal event dispatcher import { EventEmitter } from 'eventemitter3'; class EventDispatcher { private emitter = new EventEmitter(); private middleware: ((event: string, data: any) => any)[] = []; use(fn: (event: string, data: any) => any) { this.middleware.push(fn); } on(event: string, handler: (...args: any[]) => void) { this.emitter.on(event, handler); } dispatch(event: string, data: any) { let processed = data; for (const mw of this.middleware) { processed = mw(event, processed) ?? processed; } this.emitter.emit(event, processed); } } const dispatcher = new EventDispatcher(); dispatcher.use((event, data) => ({ ...data, timestamp: Date.now() })); dispatcher.on('user.created', (data) => console.log('New user:', data)); dispatcher.dispatch('user.created', { name: 'Alice', email: '[email protected]' });
Core Patterns
1. Command Dispatcher (CQRS)
interface Command { type: string; payload: Record<string, any>; metadata: { userId: string; timestamp: number; correlationId: string }; } interface CommandHandler<T extends Command = Command> { execute(command: T): Promise<any>; } class CommandBus { private handlers = new Map<string, CommandHandler>(); private middleware: ((cmd: Command, next: () => Promise<any>) => Promise<any>)[] = []; register(type: string, handler: CommandHandler) { this.handlers.set(type, handler); } use(mw: (cmd: Command, next: () => Promise<any>) => Promise<any>) { this.middleware.push(mw); } async dispatch(command: Command): Promise<any> { const handler = this.handlers.get(command.type); if (!handler) throw new Error(`No handler for command: ${command.type}`); // Build middleware chain let index = 0; const next = async (): Promise<any> => { if (index < this.middleware.length) { return this.middleware[index++](command, next); } return handler.execute(command); }; return next(); } } // Usage const bus = new CommandBus(); // Logging middleware bus.use(async (cmd, next) => { console.log(`[CMD] ${cmd.type}`, cmd.payload); const start = Date.now(); const result = await next(); console.log(`[CMD] ${cmd.type} completed in ${Date.now() - start}ms`); return result; }); // Validation middleware bus.use(async (cmd, next) => { if (!cmd.metadata.userId) throw new Error('userId required'); return next(); }); bus.register('CreateOrder', { async execute(cmd) { return { orderId: 'ord_123', ...cmd.payload }; }, });
2. Event Sourcing Dispatcher
interface DomainEvent { eventId: string; aggregateId: string; type: string; data: Record<string, any>; version: number; timestamp: Date; } class EventStore { private events: DomainEvent[] = []; private subscribers = new Map<string, ((event: DomainEvent) => Promise<void>)[]>(); async append(event: DomainEvent): Promise<void> { // Optimistic concurrency check const existing = this.events.filter(e => e.aggregateId === event.aggregateId); if (existing.length > 0 && existing[existing.length - 1].version >= event.version) { throw new Error('Concurrency conflict'); } this.events.push(event); // Dispatch to subscribers const handlers = this.subscribers.get(event.type) || []; await Promise.all(handlers.map(h => h(event))); } subscribe(eventType: string, handler: (event: DomainEvent) => Promise<void>) { const handlers = this.subscribers.get(eventType) || []; handlers.push(handler); this.subscribers.set(eventType, handlers); } getEvents(aggregateId: string): DomainEvent[] { return this.events.filter(e => e.aggregateId === aggregateId); } }
3. Priority Task Queue
import { Queue, Worker, Job } from 'bullmq'; import IORedis from 'ioredis'; const connection = new IORedis({ maxRetriesPerRequest: null }); // Create priority queues const highPriority = new Queue('tasks:high', { connection }); const normalPriority = new Queue('tasks:normal', { connection }); const lowPriority = new Queue('tasks:low', { connection }); class TaskDispatcher { async dispatch(task: { type: string; data: any; priority: 'high' | 'normal' | 'low' }) { const queue = { high: highPriority, normal: normalPriority, low: lowPriority }[task.priority]; await queue.add(task.type, task.data, { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, removeOnComplete: 100, removeOnFail: 50, }); } } // Worker processes tasks by priority const worker = new Worker('tasks:high', async (job: Job) => { console.log(`Processing ${job.name}:`, job.data); }, { connection, concurrency: 5 }); worker.on('completed', (job) => console.log(`Done: ${job.id}`)); worker.on('failed', (job, err) => console.error(`Failed: ${job?.id}`, err));
4. Pub/Sub with Topic Routing
class TopicRouter { private subscriptions = new Map<string, Set<(data: any) => void>>(); subscribe(pattern: string, handler: (data: any) => void) { if (!this.subscriptions.has(pattern)) { this.subscriptions.set(pattern, new Set()); } this.subscriptions.get(pattern)!.add(handler); return () => this.subscriptions.get(pattern)?.delete(handler); } publish(topic: string, data: any) { for (const [pattern, handlers] of this.subscriptions) { if (this.matches(topic, pattern)) { handlers.forEach(h => h({ topic, data, timestamp: Date.now() })); } } } private matches(topic: string, pattern: string): boolean { const regex = pattern .replace(/\./g, '\\.') .replace(/\*/g, '[^.]+') .replace(/#/g, '.*'); return new RegExp(`^${regex}$`).test(topic); } } // Usage const router = new TopicRouter(); router.subscribe('orders.*', (msg) => console.log('Order event:', msg)); router.subscribe('orders.created', (msg) => sendConfirmationEmail(msg.data)); router.subscribe('#.error', (msg) => alertOps(msg)); router.publish('orders.created', { orderId: '123', total: 99.99 }); router.publish('payments.error', { reason: 'Card declined' });
Dispatch Strategy Comparison
| Pattern | Use Case | Ordering | Durability | Latency |
|---|---|---|---|---|
| In-Process Events | Same-service decoupling | Guaranteed | None | <1ms |
| Command Bus | CQRS, request/response | Sequential | Optional | <5ms |
| Message Queue | Async task processing | FIFO per queue | Persistent | 10-100ms |
| Pub/Sub | Fan-out notifications | Per-topic | Configurable | 5-50ms |
| Event Sourcing | Audit trail, replay | Per-aggregate | Full history | 10-50ms |
| Saga Orchestration | Multi-step transactions | Workflow-defined | Persistent | Varies |
Configuration Reference
| Setting | Default | Description |
|---|---|---|
concurrency | 1 | Parallel task processing per worker |
maxRetries | 3 | Retry attempts on failure |
backoffType | exponential | Retry delay strategy |
backoffDelay | 1000ms | Initial retry delay |
timeout | 30000ms | Task execution timeout |
removeOnComplete | 100 | Keep last N completed jobs |
deadLetterQueue | null | Queue for permanently failed tasks |
rateLimiter.max | unlimited | Max jobs per time window |
rateLimiter.duration | 1000ms | Rate limit window |
Best Practices
- Use idempotent handlers — Messages may be delivered more than once; design handlers to safely re-process
- Include correlation IDs — Thread a unique ID through all events in a workflow for tracing
- Separate command and query paths — Commands change state, queries read it — don't mix
- Dead letter queue everything — Failed messages should go somewhere recoverable, not disappear
- Schema version your events — Add a version field so consumers can handle format changes
- Monitor queue depth — Set alerts when queues grow beyond expected thresholds
- Limit payload size — Store large data externally, pass references in messages
- Use circuit breakers — Protect downstream services from cascading failures
- Log dispatch decisions — Record why messages were routed where for debugging
- Test with chaos — Simulate failures, delays, and out-of-order delivery in tests
Troubleshooting
Messages processing out of order
// Add sequence numbers and reorder buffer class OrderedProcessor { private buffer = new Map<number, any>(); private nextExpected = 0; process(seq: number, data: any) { this.buffer.set(seq, data); while (this.buffer.has(this.nextExpected)) { this.handle(this.buffer.get(this.nextExpected)); this.buffer.delete(this.nextExpected); this.nextExpected++; } } }
Memory leaks from unremoved listeners
// Always return unsubscribe functions and call them const unsub = dispatcher.on('event', handler); // When done: unsub(); // Or use AbortController const controller = new AbortController(); dispatcher.on('event', handler, { signal: controller.signal }); controller.abort(); // Removes listener
Queue backpressure causing OOM
// Set max queue size and apply backpressure const queue = new Queue('tasks', { defaultJobOptions: { removeOnComplete: 50, removeOnFail: 20, }, }); // Check queue size before adding const waiting = await queue.getWaitingCount(); if (waiting > 10000) { throw new Error('Queue at capacity — try again later'); }
Reviews
No reviews yet. Be the first to review this template!
Similar Templates
Full-Stack Code Reviewer
Comprehensive code review skill that checks for security vulnerabilities, performance issues, accessibility, and best practices across frontend and backend code.
Test Suite Generator
Generates comprehensive test suites with unit tests, integration tests, and edge cases. Supports Jest, Vitest, Pytest, and Go testing.
Pro Architecture Workspace
Battle-tested skill for architectural, decision, making, framework. Includes structured workflows, validation checks, and reusable patterns for development.