🔄 Event-Driven Architecture: Designing Resilient and Scalable Systems

Hello, architects! :building_construction:

The shift toward distributed systems has put Event-Driven Architecture (EDA) at the center of architectural discussions. Implemented well, EDA lets you build systems whose services scale independently, recover from failures automatically, and evolve without breaking the services that depend on them.

:bullseye: Why Event-Driven Architecture?

The Problem with Synchronous Architectures

# Typical synchronous flow: any failing hop fails the whole request
User → API Gateway → Auth Service → Order Service → Payment Service → Inventory Service
  ↓                                       ↓                ↓                  ↓
Timeout                             If one fails,   the next fails,   everything fails

The Event-Driven Solution

# Asynchronous flow with events
User → API Gateway → Order Service → Event Bus
                           ↓             ↓
                  Immediate response     ├→ Payment Service
                                         ├→ Inventory Service
                                         ├→ Notification Service
                                         └→ Analytics Service

# Each service processes the event independently

:incoming_envelope: Fundamental EDA Patterns

1. Event Sourcing

Instead of storing the current state, we store the sequence of events that led to that state:

// ❌ Traditional state storage
const account = {
    id: "123",
    balance: 1500,
    lastUpdated: "2025-01-15"
};

// ✅ Event Sourcing
const events = [
    { type: "AccountCreated", accountId: "123", initialBalance: 0, timestamp: "2025-01-01" },
    { type: "MoneyDeposited", accountId: "123", amount: 2000, timestamp: "2025-01-10" },
    { type: "MoneyWithdrawn", accountId: "123", amount: 500, timestamp: "2025-01-15" }
];

// The current balance is computed by replaying the events
function calculateBalance(events) {
    return events.reduce((balance, event) => {
        switch(event.type) {
            case "AccountCreated": return event.initialBalance;
            case "MoneyDeposited": return balance + event.amount;
            case "MoneyWithdrawn": return balance - event.amount;
            default: return balance;
        }
    }, 0);
}
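Because the event log is the source of truth, the same replay can also answer historical questions, such as the balance on a given date. Here is a minimal sketch, assuming the timestamps are ISO-8601 strings in a single format (as in the events above) so they compare correctly as plain strings. `AccountEvent`, `applyEvent` and `balanceAt` are illustrative names, not part of any library.

```typescript
// Illustrative event type mirroring the events shown above.
type AccountEvent =
    | { type: "AccountCreated"; accountId: string; initialBalance: number; timestamp: string }
    | { type: "MoneyDeposited"; accountId: string; amount: number; timestamp: string }
    | { type: "MoneyWithdrawn"; accountId: string; amount: number; timestamp: string };

// Apply a single event to the running balance.
function applyEvent(balance: number, event: AccountEvent): number {
    switch (event.type) {
        case "AccountCreated": return event.initialBalance;
        case "MoneyDeposited": return balance + event.amount;
        case "MoneyWithdrawn": return balance - event.amount;
    }
}

// Replay only the events recorded up to (and including) `asOf`.
// Assumes same-format ISO-8601 timestamps, which sort correctly as strings.
function balanceAt(events: AccountEvent[], asOf: string): number {
    return events
        .filter(event => event.timestamp <= asOf)
        .reduce(applyEvent, 0);
}

const history: AccountEvent[] = [
    { type: "AccountCreated", accountId: "123", initialBalance: 0, timestamp: "2025-01-01" },
    { type: "MoneyDeposited", accountId: "123", amount: 2000, timestamp: "2025-01-10" },
    { type: "MoneyWithdrawn", accountId: "123", amount: 500, timestamp: "2025-01-15" }
];

console.log(balanceAt(history, "2025-01-12")); // 2000
console.log(balanceAt(history, "2025-01-15")); // 1500
```

For long event streams a full replay gets expensive; a common mitigation is to store periodic snapshots and replay only the events recorded after the latest one.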

2. CQRS (Command Query Responsibility Segregation)

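Separate read and write operations so that each side can be optimized independently:

// Command side (writes)
interface CreateOrderCommand {
    customerId: string;
    items: OrderItem[];
    paymentMethod: string;
}

class OrderCommandHandler {
    constructor(
        private eventStore: EventStore,
        private eventBus: EventBus
    ) {}

    async handle(command: CreateOrderCommand): Promise<void> {
        // Validate the command and build the aggregate
        const order = new Order(command.customerId, command.items);

        // Read the pending events once so the same list is stored and published
        const events = order.getUncommittedEvents();

        // Persist the events
        await this.eventStore.append(events);

        // Publish the events. Storing and publishing are two separate writes;
        // the Outbox Pattern section covers how to make them atomic.
        for (const event of events) {
            await this.eventBus.publish(event);
        }
    }
}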

// Query side (reads)
interface OrderProjection {
    id: string;
    customerId: string;
    status: OrderStatus;
    total: number;
    items: OrderItemProjection[];
}

class OrderQueryHandler {
    constructor(private readModel: OrderReadModel) {}
    
    async getOrder(orderId: string): Promise<OrderProjection> {
        return await this.readModel.findById(orderId);
    }
    
    async getOrdersByCustomer(customerId: string): Promise<OrderProjection[]> {
        return await this.readModel.findByCustomerId(customerId);
    }
}
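What connects the two sides is a projector: a consumer that applies the events published by the write side to the read model. The sketch below is one way to do it, under stated assumptions: the read model is an in-memory `Map` standing in for a real read database, and `OrderCreated` / `OrderPaid` are hypothetical event shapes chosen for the example.

```typescript
// Hypothetical events emitted by the write side.
type OrderEvent =
    | { type: "OrderCreated"; orderId: string; customerId: string; total: number }
    | { type: "OrderPaid"; orderId: string };

// Denormalized row served to queries.
interface OrderSummary {
    id: string;
    customerId: string;
    status: "pending" | "paid";
    total: number;
}

// In-memory stand-in for the read database (an assumption for this sketch).
class InMemoryOrderReadModel {
    private rows = new Map<string, OrderSummary>();

    // Projector: applies one event to the read model.
    apply(event: OrderEvent): void {
        switch (event.type) {
            case "OrderCreated":
                this.rows.set(event.orderId, {
                    id: event.orderId,
                    customerId: event.customerId,
                    status: "pending",
                    total: event.total
                });
                break;
            case "OrderPaid": {
                const row = this.rows.get(event.orderId);
                // Ignore events for unknown orders instead of failing the projector.
                if (row) row.status = "paid";
                break;
            }
        }
    }

    findById(orderId: string): OrderSummary | undefined {
        return this.rows.get(orderId);
    }

    findByCustomerId(customerId: string): OrderSummary[] {
        return Array.from(this.rows.values()).filter(row => row.customerId === customerId);
    }
}

const readModel = new InMemoryOrderReadModel();
readModel.apply({ type: "OrderCreated", orderId: "o-1", customerId: "c-1", total: 40 });
readModel.apply({ type: "OrderPaid", orderId: "o-1" });
console.log(readModel.findById("o-1")?.status); // "paid"
```

Because the projector runs after the write has been acknowledged, queries can briefly return stale data. That is the eventual consistency trade-off discussed later in this post.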

3. Saga Pattern

For distributed transactions that require coordination across multiple services:

// Order-processing saga
interface SagaStep {
    service: string;
    action: string;
    compensate: string;
}

class OrderProcessingSaga {
    private steps: SagaStep[] = [
        { service: 'inventory', action: 'reserve', compensate: 'release' },
        { service: 'payment', action: 'charge', compensate: 'refund' },
        { service: 'shipping', action: 'create', compensate: 'cancel' }
    ];

    constructor(private eventBus: EventBus) {}

    async execute(order: Order): Promise<void> {
        const executedSteps: SagaStep[] = [];

        try {
            for (const step of this.steps) {
                await this.executeStep(step, order);
                executedSteps.push(step);
            }
        } catch (error) {
            // Compensate the completed steps in reverse order
            for (const step of executedSteps.reverse()) {
                await this.compensateStep(step, order);
            }
            throw error;
        }
    }

    private async executeStep(step: SagaStep, order: Order): Promise<void> {
        const event = {
            type: `${step.service}.${step.action}`,
            orderId: order.id,
            data: order.toJSON()
        };

        await this.eventBus.publish(event);

        // Wait for the confirmation with a 30 s timeout
        // (waitForConfirmation is not shown here)
        await this.waitForConfirmation(event, 30000);
    }

    // Publish the compensating action for a step that already completed
    private async compensateStep(step: SagaStep, order: Order): Promise<void> {
        await this.eventBus.publish({
            type: `${step.service}.${step.compensate}`,
            orderId: order.id,
            data: order.toJSON()
        });
    }
}
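The saga depends on waiting for a confirmation with a timeout, which the listing leaves out. One possible shape for that piece is sketched below: a small registry of pending promises that the event handler resolves when the matching confirmation arrives. `ConfirmationWaiter`, `waitFor`, `confirm` and the `orderId:type` key format are all hypothetical names chosen for this example.

```typescript
interface Confirmation {
    type: string;
    orderId: string;
}

class ConfirmationWaiter {
    // key -> function that settles the pending promise for that key
    private pending = new Map<string, (confirmation: Confirmation) => void>();

    // Returns a promise that resolves when confirm(key, ...) is called,
    // or rejects once timeoutMs has elapsed.
    waitFor(key: string, timeoutMs: number): Promise<Confirmation> {
        return new Promise<Confirmation>((resolve, reject) => {
            const timer = setTimeout(() => {
                this.pending.delete(key);
                reject(new Error(`Timed out waiting for ${key}`));
            }, timeoutMs);

            this.pending.set(key, confirmation => {
                clearTimeout(timer);
                this.pending.delete(key);
                resolve(confirmation);
            });
        });
    }

    // Called by the event handler; returns false when nobody is waiting.
    confirm(key: string, confirmation: Confirmation): boolean {
        const settle = this.pending.get(key);
        if (!settle) return false;
        settle(confirmation);
        return true;
    }
}

const waiter = new ConfirmationWaiter();
const reserved = waiter.waitFor("order-1:inventory.reserved", 30000);
waiter.confirm("order-1:inventory.reserved", { type: "inventory.reserved", orderId: "order-1" });
reserved.then(confirmation => console.log(confirmation.type)); // logs "inventory.reserved"
```

Note the order of operations: the waiter is registered before the confirmation can arrive. In a saga, that means calling `waitFor` before publishing the command. Otherwise a fast reply could be dropped and the step would time out.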

:rocket: Implementation with Apache Kafka

Production-Oriented Configuration

# docker-compose.yml for a three-broker Kafka cluster
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zk-data:/var/lib/zookeeper/data
      - zk-logs:/var/lib/zookeeper/log

  kafka-1:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      # Quote booleans so Compose reads them as strings
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      # Throughput-related settings
      KAFKA_NUM_NETWORK_THREADS: 8
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824

  kafka-2:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092
      # ... same settings as kafka-1

  kafka-3:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092
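      # ... same settings as kafka-1

# Named volumes used by the zookeeper service must be declared here
volumes:
  zk-data:
  zk-logs: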
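With `KAFKA_AUTO_CREATE_TOPICS_ENABLE` disabled, every topic has to be created explicitly before producers and consumers start. The sketch below builds topic definitions in the shape that KafkaJS's `admin.createTopics({ topics })` accepts. The topic names, the partition count of 6 and the `min.insync.replicas` policy are assumptions for illustration. The `.retry` and `.dead-letter` suffixes match the ones used by the consumer later in this post.

```typescript
// Shape of the topic entries accepted by KafkaJS `admin.createTopics({ topics })`.
interface TopicSpec {
    topic: string;
    numPartitions: number;
    replicationFactor: number;
    configEntries: { name: string; value: string }[];
}

// Build one spec per base topic plus its `.retry` and `.dead-letter` companions.
function buildTopicSpecs(
    baseTopics: string[],
    numPartitions: number,
    replicationFactor: number
): TopicSpec[] {
    // Assumed policy: tolerate one unavailable replica, but never go below 1.
    const minInsyncReplicas = Math.max(1, replicationFactor - 1);
    const specs: TopicSpec[] = [];

    for (const base of baseTopics) {
        for (const suffix of ["", ".retry", ".dead-letter"]) {
            specs.push({
                topic: `${base}${suffix}`,
                numPartitions,
                replicationFactor,
                configEntries: [
                    { name: "min.insync.replicas", value: String(minInsyncReplicas) }
                ]
            });
        }
    }
    return specs;
}

const specs = buildTopicSpecs(["orders", "payments", "inventory"], 6, 3);
console.log(specs.map(spec => spec.topic));
// With a connected KafkaJS admin client: await admin.createTopics({ topics: specs });
```

Keeping the definitions in code (or in a migration-style script) makes partition counts and replication settings reviewable, instead of depending on whatever the broker defaults happen to be.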

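Optimized Producer

import * as crypto from 'node:crypto';
import { Kafka, Partitioners, Producer } from 'kafkajs';

class EventPublisher {
    private kafka: Kafka;
    private producer: Producer;

    constructor() {
        this.kafka = new Kafka({
            clientId: 'order-service',
            brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
            retry: {
                initialRetryTime: 100,
                retries: 8,
                maxRetryTime: 30000,
                factor: 2
            }
        });

        this.producer = this.kafka.producer({
            createPartitioner: Partitioners.LegacyPartitioner,
            // The KafkaJS docs pair idempotent: true with maxInFlightRequests: 1;
            // more in-flight requests can reorder batches that are retried
            maxInFlightRequests: 1,
            idempotent: true,
            transactionTimeout: 30000
        });
    }

    // Call once at startup: send() fails on a producer that is not connected
    async connect(): Promise<void> {
        await this.producer.connect();
    }

    // Assumed key strategy: partition by aggregate so that the events of one
    // aggregate stay ordered; fall back to a random key otherwise
    private generateKey(event: unknown): string {
        const aggregateId = (event as { aggregateId?: string }).aggregateId;
        return aggregateId ?? crypto.randomUUID();
    }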
    
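    async publishEvent<T extends object>(topic: string, event: T, partitionKey?: string): Promise<void> {
        const message = {
            key: partitionKey || this.generateKey(event),
            value: JSON.stringify({
                ...event,
                eventId: crypto.randomUUID(),
                timestamp: new Date().toISOString(),
                version: '1.0'
            }),
            headers: {
                'content-type': 'application/json',
                // Prefer an explicit eventType field: plain objects (for example,
                // events parsed back from JSON) all report 'Object' as class name
                'event-type': (event as { eventType?: string }).eventType ?? event.constructor.name
            }
        };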
        
        await this.producer.send({
            topic,
            messages: [message],
            timeout: 10000
        });
    }
    
    // Batch publishing for better throughput
    async publishBatch<T>(topic: string, events: T[]): Promise<void> {
        const messages = events.map(event => ({
            key: this.generateKey(event),
            value: JSON.stringify({
                ...event,
                eventId: crypto.randomUUID(),
                timestamp: new Date().toISOString()
            })
        }));
        
        await this.producer.sendBatch({
            topicMessages: [{
                topic,
                messages
            }]
        });
    }
}

Consumer with Dead Letter Queue

import { Consumer, Kafka, KafkaMessage, Producer } from 'kafkajs';

class EventConsumer {
    private consumer: Consumer;
    // One producer serves both the retry and the dead-letter topics
    private producer: Producer;

    constructor(kafka: Kafka, groupId: string) {
        this.consumer = kafka.consumer({
            groupId,
            sessionTimeout: 30000,
            heartbeatInterval: 3000,
            maxWaitTimeInMs: 5000,
            allowAutoTopicCreation: false
        });
        this.producer = kafka.producer();
    }

    async subscribe(topic: string, handler: EventHandler): Promise<void> {
        await this.producer.connect();
        await this.consumer.connect();
        await this.consumer.subscribe({ topic });

        await this.consumer.run({
            partitionsConsumedConcurrently: 3,
            eachMessage: async ({ topic, partition, message }) => {
                const retryCount = parseInt(message.headers?.['retry-count']?.toString() || '0', 10);

                try {
                    // An empty or malformed payload throws here and follows
                    // the same retry / dead-letter path as a handler failure
                    const event = JSON.parse(message.value?.toString() ?? '');
                    await handler.handle(event);

                } catch (error) {
                    if (retryCount < 3) {
                        // Retry with exponential backoff
                        await this.scheduleRetry(topic, message, retryCount + 1);
                    } else {
                        // Send to the dead letter queue
                        await this.sendToDeadLetter(topic, message, error);
                    }
                }
            }
        });
    }

    private async scheduleRetry(topic: string, message: KafkaMessage, retryCount: number): Promise<void> {
        const delay = Math.pow(2, retryCount) * 1000; // Exponential backoff: 2 s, 4 s, 8 s

        // Wait inside the handler. With a fire-and-forget setTimeout the offset
        // can be committed before the retry is published, so a crash in between
        // loses the message. The longest wait (8 s) stays below sessionTimeout.
        await new Promise(resolve => setTimeout(resolve, delay));

        await this.producer.send({
            topic: `${topic}.retry`,
            // Copy only key, value and headers; offset and timestamp belong
            // to the original record
            messages: [{
                key: message.key,
                value: message.value,
                headers: {
                    ...message.headers,
                    'retry-count': retryCount.toString(),
                    'original-topic': topic
                }
            }]
        });
    }

    private async sendToDeadLetter(topic: string, message: KafkaMessage, error: unknown): Promise<void> {
        await this.producer.send({
            topic: `${topic}.dead-letter`,
            messages: [{
                key: message.key,
                value: message.value,
                headers: {
                    ...message.headers,
                    'error-message': error instanceof Error ? error.message : String(error),
                    'error-timestamp': new Date().toISOString(),
                    'original-topic': topic
                }
            }]
        });
    }
}
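The consumer's backoff doubles on every attempt, which is fine for three retries but grows quickly if the retry limit is ever raised. A hedged sketch of a capped variant with optional jitter follows. `retryDelayMs`, the 30 s cap and the injectable `random` parameter are assumptions for this example, not part of KafkaJS.

```typescript
// Exponential backoff: baseMs * 2^retryCount, capped at maxMs.
// `random` is injectable so the jitter can be made deterministic in tests.
function retryDelayMs(
    retryCount: number,
    baseMs: number = 1000,
    maxMs: number = 30000,
    random: () => number = () => 1
): number {
    const exponential = Math.min(maxMs, baseMs * Math.pow(2, retryCount));
    // Jitter scales the delay by random(). With the default random() === 1
    // no jitter is applied; passing Math.random spreads the delays over
    // [0, exponential) so failed consumers do not all retry at once.
    return Math.floor(exponential * random());
}

console.log([1, 2, 3].map(n => retryDelayMs(n))); // [2000, 4000, 8000]
console.log(retryDelayMs(10));                    // 30000 (capped)
```

Whatever the schedule, a wait that happens inside the message handler should stay well below the consumer's `sessionTimeout`, or the group may rebalance mid-retry.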

:counterclockwise_arrows_button: Outbox Pattern for Consistency

To guarantee that events are published whenever their state change commits, and only then, store both in the same database transaction:

import * as crypto from 'node:crypto';

class OrderService {
    constructor(
        private orderRepository: OrderRepository,
        private outboxRepository: OutboxRepository,
        private transactionManager: TransactionManager
    ) {}

    async createOrder(command: CreateOrderCommand): Promise<void> {
        await this.transactionManager.executeInTransaction(async (tx) => {
            // 1. Create the order
            const order = new Order(command.customerId, command.items);
            await this.orderRepository.save(order, tx);

            // 2. Store the events in the outbox within the same transaction
            const events = order.getUncommittedEvents();
            for (const event of events) {
                await this.outboxRepository.save({
                    id: crypto.randomUUID(),
                    aggregateId: order.id,
                    eventType: event.constructor.name,
                    eventData: JSON.stringify(event),
                    createdAt: new Date(),
                    processed: false
                }, tx);
            }
        });

        // 3. A background job publishes the events from the outbox
        //    (scheduleOutboxProcessing is not shown here)
        this.scheduleOutboxProcessing();
    }
}

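// Separate processor for the outbox
class OutboxProcessor {
    constructor(
        private outboxRepository: OutboxRepository,
        private eventPublisher: EventPublisher
    ) {}

    async processOutboxEvents(): Promise<void> {
        const unprocessedEvents = await this.outboxRepository.findUnprocessed(100);

        for (const outboxEvent of unprocessedEvents) {
            try {
                await this.eventPublisher.publishEvent(
                    this.getTopicName(outboxEvent.eventType),
                    JSON.parse(outboxEvent.eventData),
                    outboxEvent.aggregateId
                );

                // If this update fails after a successful publish, the event is
                // published again on the next run. Delivery is at-least-once,
                // so consumers must be idempotent.
                await this.outboxRepository.markAsProcessed(outboxEvent.id);

            } catch (error) {
                console.error(`Failed to process outbox event ${outboxEvent.id}:`, error);
                // Stop here so later events are not published ahead of this one
                // (assuming findUnprocessed returns rows in creation order);
                // the next run retries from the same event
                break;
            }
        }
    }
}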

:bar_chart: Monitoring and Observability

Critical Metrics for EDA

import { Counter, Gauge, Histogram } from 'prom-client';

class EventMetrics {
    private eventPublishedCounter = new Counter({
        name: 'events_published_total',
        help: 'Total number of events published',
        labelNames: ['topic', 'event_type', 'status']
    });

    private eventProcessingDuration = new Histogram({
        name: 'event_processing_duration_seconds',
        help: 'Duration of event processing',
        labelNames: ['handler', 'event_type'],
        buckets: [0.1, 0.5, 1, 2, 5, 10]
    });

    private lagGauge = new Gauge({
        name: 'consumer_lag',
        help: 'Consumer lag in messages',
        labelNames: ['topic', 'partition', 'consumer_group']
    });

    recordEventPublished(topic: string, eventType: string, success: boolean): void {
        this.eventPublishedCounter
            .labels(topic, eventType, success ? 'success' : 'failure')
            .inc();
    }

    // duration is expressed in seconds, matching the metric name
    recordProcessingTime(handler: string, eventType: string, duration: number): void {
        this.eventProcessingDuration
            .labels(handler, eventType)
            .observe(duration);
    }

    recordLag(topic: string, partition: number, consumerGroup: string, lag: number): void {
        this.lagGauge
            .labels(topic, String(partition), consumerGroup)
            .set(lag);
    }
}

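Health Checks for EDA Components

class EDAHealthCheck {
    constructor(private kafka: Kafka) {}

    async checkKafkaConnectivity(): Promise<HealthStatus> {
        const admin = this.kafka.admin();
        try {
            await admin.connect();
            const metadata = await admin.fetchTopicMetadata();

            return {
                status: 'healthy',
                details: { topics: metadata.topics.length }
            };
        } catch (error) {
            return {
                status: 'unhealthy',
                error: error instanceof Error ? error.message : String(error)
            };
        } finally {
            // Release the connection whether or not the check succeeded
            await admin.disconnect().catch(() => undefined);
        }
    }

    async checkConsumerLag(): Promise<HealthStatus> {
        const groupId = 'order-processing-group';
        const topics = ['orders', 'payments', 'inventory'];
        const admin = this.kafka.admin();
        await admin.connect();

        try {
            // fetchOffsets returns committed offsets only; lag is the distance
            // from each committed offset to the partition's latest offset
            const committed = await admin.fetchOffsets({ groupId, topics });
            let maxLag = 0;

            for (const { topic, partitions } of committed) {
                const latest = await admin.fetchTopicOffsets(topic);
                for (const { partition, offset } of partitions) {
                    const end = latest.find(p => p.partition === partition);
                    if (!end) continue;
                    // An offset of -1 means the group has not committed yet;
                    // count from the low watermark in that case
                    const position = Number(offset) >= 0 ? Number(offset) : Number(end.low);
                    maxLag = Math.max(maxLag, Number(end.high) - position);
                }
            }

            return {
                status: maxLag > 1000 ? 'degraded' : 'healthy',
                details: { maxLag }
            };
        } finally {
            await admin.disconnect();
        }
    }
}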

:bullseye: Implementation Best Practices

1. Event Design

// ✅ Good event design
interface OrderCreatedEvent {
    eventId: string;
    eventType: 'OrderCreated';
    aggregateId: string;
    aggregateVersion: number;
    timestamp: string;
    data: {
        orderId: string;
        customerId: string;
        items: OrderItem[];
        totalAmount: number;
        currency: string;
    };
    metadata: {
        correlationId: string;
        causationId?: string;
        userId: string;
        source: string;
    };
}

// ❌ Poor design: too tightly coupled
interface BadOrderEvent {
    orderObject: Order; // Don't serialize whole domain objects!
    customerObject: Customer;
    database: 'postgres'; // Leaks infrastructure details
    internalId: 12345;    // Leaks internal identifiers
}

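2. Event Versioning

type Migrator = (event: any) => any;

class EventVersioning {
    // migrators[i] upgrades an event from version i + 1 to version i + 2
    private migrators = new Map<string, Migrator[]>();

    constructor() {
        // Register the version migrators, in order
        this.migrators.set('OrderCreated', [
            this.migrateOrderCreatedV1ToV2
            // Append later migrators (V2 → V3, ...) here as they are written
        ]);
    }

    deserializeEvent(serializedEvent: string): any {
        const eventData = JSON.parse(serializedEvent);
        const chain = this.migrators.get(eventData.eventType) ?? [];

        // Events without a version field are treated as version 1;
        // Number() also accepts string versions such as '1.0'
        const version = Number(eventData.version ?? 1);

        // Apply every migrator from the event's version up to the current one;
        // an event already at the current version is returned unchanged
        return chain
            .slice(version - 1)
            .reduce((event, migrate) => migrate(event), eventData);
    }

    private migrateOrderCreatedV1ToV2(event: any): any {
        // V1 → V2: add the currency field
        return {
            ...event,
            version: 2,
            data: {
                ...event.data,
                currency: 'USD' // Default value
            }
        };
    }
}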

3. Testing Event-Driven Systems

describe('Order Processing Saga', () => {
    let saga: OrderProcessingSaga;
    let mockEventBus: jest.Mocked<EventBus>;
    
    beforeEach(() => {
        mockEventBus = {
            publish: jest.fn(),
            subscribe: jest.fn()
        };
        saga = new OrderProcessingSaga(mockEventBus);
    });
    
    it('should complete successfully when all steps succeed', async () => {
        // Arrange
        const order = new Order('123', [{ id: '1', quantity: 2 }]);
        
        // Simulate successful confirmations
        mockEventBus.publish.mockImplementation(async (event) => {
            if (event.type.includes('inventory.reserve')) {
                await saga.handleEvent({ type: 'inventory.reserved', orderId: order.id });
            }
            // ... simulate the other events
        });
        
        // Act
        await saga.execute(order);
        
        // Assert
        expect(mockEventBus.publish).toHaveBeenCalledTimes(3);
        expect(mockEventBus.publish).toHaveBeenCalledWith(
            expect.objectContaining({
                type: 'inventory.reserve'
            })
        );
    });
    
    it('should compensate when step fails', async () => {
        // Arrange
        const order = new Order('123', [{ id: '1', quantity: 2 }]);

        mockEventBus.publish.mockImplementation(async (event) => {
            if (event.type === 'inventory.reserve') {
                // Confirm the first step so the saga reaches the payment step
                // instead of waiting for its 30 s timeout
                await saga.handleEvent({ type: 'inventory.reserved', orderId: order.id });
            }
            if (event.type === 'payment.charge') {
                throw new Error('Payment failed');
            }
        });

        // Act & Assert
        await expect(saga.execute(order)).rejects.toThrow('Payment failed');

        // Verify that the compensation was published
        expect(mockEventBus.publish).toHaveBeenCalledWith(
            expect.objectContaining({
                type: 'inventory.release'
            })
        );
    });
});

:police_car_light: Challenges and Considerations

1. Eventual Consistency

Event-driven systems are eventually consistent, which requires:

  • Designing a UX that handles intermediate states
  • Making every handler idempotent
  • Communicating those states clearly to users
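Idempotency is the item on this list that most directly translates into code: with at-least-once delivery, the same event can reach a handler more than once. A minimal sketch of a deduplicating wrapper follows. `makeIdempotent` is a hypothetical helper, and the in-memory `Set` is an assumption that keeps the example self-contained. A real service would need a durable store, ideally updated in the same transaction as the handler's own writes.

```typescript
interface IdentifiedEvent {
    eventId: string;
}

// Wraps a handler so that an event whose eventId was already processed is skipped.
// Returns true when the handler ran and false for a duplicate delivery.
function makeIdempotent<E extends IdentifiedEvent>(
    handler: (event: E) => void
): (event: E) => boolean {
    const processed = new Set<string>(); // in-memory only: an assumption for this sketch
    return (event: E): boolean => {
        if (processed.has(event.eventId)) return false; // duplicate delivery
        handler(event);
        processed.add(event.eventId); // mark only after the handler succeeded
        return true;
    };
}

let stock = 10;
const reserveStock = makeIdempotent((event: { eventId: string; quantity: number }) => {
    stock -= event.quantity;
});

reserveStock({ eventId: "evt-1", quantity: 2 });
reserveStock({ eventId: "evt-1", quantity: 2 }); // redelivered: ignored
console.log(stock); // 8
```

Marking the event as processed only after the handler returns means a handler that throws will be retried, which is usually the desired behavior for transient failures.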

2. Distributed Debugging

  • Correlation IDs on every event
  • Distributed tracing with tools such as Jaeger
  • Event sourcing to reconstruct historical state
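Correlation IDs only help if every service copies them forward. A small sketch of that propagation, reusing the `correlationId` / `causationId` metadata fields from the event design shown earlier: `TracedEvent` and `causedBy` are illustrative names, and event IDs are passed in explicitly here to keep the example deterministic.

```typescript
interface EventMetadata {
    correlationId: string; // shared by every event triggered by the same request
    causationId?: string;  // eventId of the event that directly caused this one
}

interface TracedEvent<T> {
    eventId: string;
    eventType: string;
    data: T;
    metadata: EventMetadata;
}

// Build an event caused by `parent`: it stays in the same correlation chain
// and records the parent as its direct cause.
function causedBy<T>(
    parent: TracedEvent<unknown>,
    eventId: string,
    eventType: string,
    data: T
): TracedEvent<T> {
    return {
        eventId,
        eventType,
        data,
        metadata: {
            correlationId: parent.metadata.correlationId,
            causationId: parent.eventId
        }
    };
}

const orderCreated: TracedEvent<{ orderId: string }> = {
    eventId: "evt-1",
    eventType: "OrderCreated",
    data: { orderId: "o-1" },
    metadata: { correlationId: "req-42" }
};

const paymentRequested = causedBy(orderCreated, "evt-2", "PaymentRequested", { amount: 40 });
console.log(paymentRequested.metadata);
// { correlationId: 'req-42', causationId: 'evt-1' }
```

Filtering logs by `correlationId` then returns the whole flow for one request, while following the `causationId` links reconstructs the order in which events triggered each other.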

3. Operational Complexity

  • Proactive monitoring of consumer lag
  • Alerts for unprocessed events
  • Runbooks for common failure scenarios

:bullseye: When to Use Event-Driven Architecture

:white_check_mark: Ideal Cases:

  • Systems with multiple bounded contexts
  • High scalability requirements
  • A need for complete auditability
  • Integration with multiple external systems

:cross_mark: When NOT to Use It:

  • Simple CRUD applications
  • Teams with no experience in distributed systems
  • Requirements for strict, immediate consistency
  • Limited resources for operations

Event-Driven Architecture is not a silver bullet, but when implemented well it provides a solid foundation for systems that need to scale, evolve, and stay highly available.

Which EDA patterns have you implemented in your architectures? What have been the main challenges you faced?

#EventDrivenArchitecture #Microservices #Apache #Kafka #CQRS #EventSourcing #DistributedSystems #Saga