U

Ultimate Dispatching Framework

Comprehensive skill designed for facing, independent, tasks, worked. Includes structured workflows, validation checks, and reusable patterns for ai research.

SkillClipticsai researchv1.0.0MIT
0 views0 copies

Ultimate Dispatching Framework

Overview

A comprehensive skill for designing and implementing event dispatching, message routing, and task distribution systems in modern applications. Covers event-driven architectures, message queues, pub/sub patterns, command dispatching, and workflow orchestration — enabling scalable, loosely-coupled systems that handle complex routing logic with reliability.

When to Use

  • Building event-driven microservices architectures
  • Implementing CQRS (Command Query Responsibility Segregation)
  • Designing pub/sub messaging systems
  • Creating task queues with priority routing
  • Building workflow orchestration engines
  • Implementing the mediator pattern for decoupled components
  • Routing requests across distributed services

Quick Start

# Event dispatcher with TypeScript npm init -y && npm install eventemitter3 bullmq ioredis # Python event-driven system pip install celery redis kombu
// Minimal event dispatcher import { EventEmitter } from 'eventemitter3'; class EventDispatcher { private emitter = new EventEmitter(); private middleware: ((event: string, data: any) => any)[] = []; use(fn: (event: string, data: any) => any) { this.middleware.push(fn); } on(event: string, handler: (...args: any[]) => void) { this.emitter.on(event, handler); } dispatch(event: string, data: any) { let processed = data; for (const mw of this.middleware) { processed = mw(event, processed) ?? processed; } this.emitter.emit(event, processed); } } const dispatcher = new EventDispatcher(); dispatcher.use((event, data) => ({ ...data, timestamp: Date.now() })); dispatcher.on('user.created', (data) => console.log('New user:', data)); dispatcher.dispatch('user.created', { name: 'Alice', email: '[email protected]' });

Core Patterns

1. Command Dispatcher (CQRS)

interface Command { type: string; payload: Record<string, any>; metadata: { userId: string; timestamp: number; correlationId: string }; } interface CommandHandler<T extends Command = Command> { execute(command: T): Promise<any>; } class CommandBus { private handlers = new Map<string, CommandHandler>(); private middleware: ((cmd: Command, next: () => Promise<any>) => Promise<any>)[] = []; register(type: string, handler: CommandHandler) { this.handlers.set(type, handler); } use(mw: (cmd: Command, next: () => Promise<any>) => Promise<any>) { this.middleware.push(mw); } async dispatch(command: Command): Promise<any> { const handler = this.handlers.get(command.type); if (!handler) throw new Error(`No handler for command: ${command.type}`); // Build middleware chain let index = 0; const next = async (): Promise<any> => { if (index < this.middleware.length) { return this.middleware[index++](command, next); } return handler.execute(command); }; return next(); } } // Usage const bus = new CommandBus(); // Logging middleware bus.use(async (cmd, next) => { console.log(`[CMD] ${cmd.type}`, cmd.payload); const start = Date.now(); const result = await next(); console.log(`[CMD] ${cmd.type} completed in ${Date.now() - start}ms`); return result; }); // Validation middleware bus.use(async (cmd, next) => { if (!cmd.metadata.userId) throw new Error('userId required'); return next(); }); bus.register('CreateOrder', { async execute(cmd) { return { orderId: 'ord_123', ...cmd.payload }; }, });

2. Event Sourcing Dispatcher

interface DomainEvent { eventId: string; aggregateId: string; type: string; data: Record<string, any>; version: number; timestamp: Date; } class EventStore { private events: DomainEvent[] = []; private subscribers = new Map<string, ((event: DomainEvent) => Promise<void>)[]>(); async append(event: DomainEvent): Promise<void> { // Optimistic concurrency check const existing = this.events.filter(e => e.aggregateId === event.aggregateId); if (existing.length > 0 && existing[existing.length - 1].version >= event.version) { throw new Error('Concurrency conflict'); } this.events.push(event); // Dispatch to subscribers const handlers = this.subscribers.get(event.type) || []; await Promise.all(handlers.map(h => h(event))); } subscribe(eventType: string, handler: (event: DomainEvent) => Promise<void>) { const handlers = this.subscribers.get(eventType) || []; handlers.push(handler); this.subscribers.set(eventType, handlers); } getEvents(aggregateId: string): DomainEvent[] { return this.events.filter(e => e.aggregateId === aggregateId); } }

3. Priority Task Queue

import { Queue, Worker, Job } from 'bullmq'; import IORedis from 'ioredis'; const connection = new IORedis({ maxRetriesPerRequest: null }); // Create priority queues const highPriority = new Queue('tasks:high', { connection }); const normalPriority = new Queue('tasks:normal', { connection }); const lowPriority = new Queue('tasks:low', { connection }); class TaskDispatcher { async dispatch(task: { type: string; data: any; priority: 'high' | 'normal' | 'low' }) { const queue = { high: highPriority, normal: normalPriority, low: lowPriority }[task.priority]; await queue.add(task.type, task.data, { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, removeOnComplete: 100, removeOnFail: 50, }); } } // Worker processes tasks by priority const worker = new Worker('tasks:high', async (job: Job) => { console.log(`Processing ${job.name}:`, job.data); }, { connection, concurrency: 5 }); worker.on('completed', (job) => console.log(`Done: ${job.id}`)); worker.on('failed', (job, err) => console.error(`Failed: ${job?.id}`, err));

4. Pub/Sub with Topic Routing

class TopicRouter { private subscriptions = new Map<string, Set<(data: any) => void>>(); subscribe(pattern: string, handler: (data: any) => void) { if (!this.subscriptions.has(pattern)) { this.subscriptions.set(pattern, new Set()); } this.subscriptions.get(pattern)!.add(handler); return () => this.subscriptions.get(pattern)?.delete(handler); } publish(topic: string, data: any) { for (const [pattern, handlers] of this.subscriptions) { if (this.matches(topic, pattern)) { handlers.forEach(h => h({ topic, data, timestamp: Date.now() })); } } } private matches(topic: string, pattern: string): boolean { const regex = pattern .replace(/\./g, '\\.') .replace(/\*/g, '[^.]+') .replace(/#/g, '.*'); return new RegExp(`^${regex}$`).test(topic); } } // Usage const router = new TopicRouter(); router.subscribe('orders.*', (msg) => console.log('Order event:', msg)); router.subscribe('orders.created', (msg) => sendConfirmationEmail(msg.data)); router.subscribe('#.error', (msg) => alertOps(msg)); router.publish('orders.created', { orderId: '123', total: 99.99 }); router.publish('payments.error', { reason: 'Card declined' });

Dispatch Strategy Comparison

PatternUse CaseOrderingDurabilityLatency
In-Process EventsSame-service decouplingGuaranteedNone<1ms
Command BusCQRS, request/responseSequentialOptional<5ms
Message QueueAsync task processingFIFO per queuePersistent10-100ms
Pub/SubFan-out notificationsPer-topicConfigurable5-50ms
Event SourcingAudit trail, replayPer-aggregateFull history10-50ms
Saga OrchestrationMulti-step transactionsWorkflow-definedPersistentVaries

Configuration Reference

SettingDefaultDescription
concurrency1Parallel task processing per worker
maxRetries3Retry attempts on failure
backoffTypeexponentialRetry delay strategy
backoffDelay1000msInitial retry delay
timeout30000msTask execution timeout
removeOnComplete100Keep last N completed jobs
deadLetterQueuenullQueue for permanently failed tasks
rateLimiter.maxunlimitedMax jobs per time window
rateLimiter.duration1000msRate limit window

Best Practices

  1. Use idempotent handlers — Messages may be delivered more than once; design handlers to safely re-process
  2. Include correlation IDs — Thread a unique ID through all events in a workflow for tracing
  3. Separate command and query paths — Commands change state, queries read it — don't mix
  4. Dead letter queue everything — Failed messages should go somewhere recoverable, not disappear
  5. Schema version your events — Add a version field so consumers can handle format changes
  6. Monitor queue depth — Set alerts when queues grow beyond expected thresholds
  7. Limit payload size — Store large data externally, pass references in messages
  8. Use circuit breakers — Protect downstream services from cascading failures
  9. Log dispatch decisions — Record why messages were routed where for debugging
  10. Test with chaos — Simulate failures, delays, and out-of-order delivery in tests

Troubleshooting

Messages processing out of order

// Add sequence numbers and reorder buffer class OrderedProcessor { private buffer = new Map<number, any>(); private nextExpected = 0; process(seq: number, data: any) { this.buffer.set(seq, data); while (this.buffer.has(this.nextExpected)) { this.handle(this.buffer.get(this.nextExpected)); this.buffer.delete(this.nextExpected); this.nextExpected++; } } }

Memory leaks from unremoved listeners

// Always return unsubscribe functions and call them const unsub = dispatcher.on('event', handler); // When done: unsub(); // Or use AbortController const controller = new AbortController(); dispatcher.on('event', handler, { signal: controller.signal }); controller.abort(); // Removes listener

Queue backpressure causing OOM

// Set max queue size and apply backpressure const queue = new Queue('tasks', { defaultJobOptions: { removeOnComplete: 50, removeOnFail: 20, }, }); // Check queue size before adding const waiting = await queue.getWaitingCount(); if (waiting > 10000) { throw new Error('Queue at capacity — try again later'); }
Community

Reviews

Write a review

No reviews yet. Be the first to review this template!

Similar Templates