Hello, architects!
The shift toward distributed systems has put Event-Driven Architecture (EDA) at the center of architectural discussions. Implemented correctly, EDA lets you build systems whose services scale independently, recover from failures automatically, and evolve without breaking the services that depend on them.
Why Event-Driven Architecture?
The Problem with Synchronous Architectures
# Typical synchronous flow: every service in the chain is a single point of failure
User → API Gateway → Auth Service → Order Service → Payment Service → Inventory Service
                          ↓               ↓                ↓                  ↓
                       Timeout: if one call fails, everything after it fails too
The Event-Driven Solution
# Asynchronous flow with events
User → API Gateway → Order Service → Event Bus
            ↓                            ↓
   Immediate response                    ├→ Payment Service
                                         ├→ Inventory Service
                                         ├→ Notification Service
                                         └→ Analytics Service
# Each service processes the event independently
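The fan-out in the diagram can be sketched with a tiny in-memory bus. This is only an illustration under assumptions, not how a broker such as Kafka works internally: `InMemoryEventBus`, `OrderCreated` and the subscriber labels are hypothetical names. The point it shows is that one failing subscriber does not prevent the others from receiving the event.

```typescript
// Minimal in-memory event bus (hypothetical; for illustration only).
type Handler<E> = (event: E) => void;

class InMemoryEventBus<E extends { type: string }> {
  private handlers = new Map<string, Handler<E>[]>();

  subscribe(type: string, handler: Handler<E>): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  // Deliver to every subscriber. A failing handler does not stop the others;
  // its error is collected and returned so the caller can log or retry it.
  publish(event: E): Error[] {
    const errors: Error[] = [];
    for (const handler of this.handlers.get(event.type) ?? []) {
      try {
        handler(event);
      } catch (err) {
        errors.push(err instanceof Error ? err : new Error(String(err)));
      }
    }
    return errors;
  }
}

interface OrderCreated {
  type: "OrderCreated";
  orderId: string;
}

const bus = new InMemoryEventBus<OrderCreated>();
const log: string[] = [];
bus.subscribe("OrderCreated", (e) => { log.push(`payment:${e.orderId}`); });
bus.subscribe("OrderCreated", () => { throw new Error("inventory down"); });
bus.subscribe("OrderCreated", (e) => { log.push(`notification:${e.orderId}`); });

// Payment and notification still run even though inventory throws.
const failures = bus.publish({ type: "OrderCreated", orderId: "o-1" });
```

In a real deployment the bus is a durable broker and each subscriber runs in its own process, so isolation comes from the infrastructure rather than from a try/catch.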
Fundamental EDA Patterns
1. Event Sourcing
Instead of storing the current state, we store the sequence of events that led to it:
// ❌ Traditional state
const account = {
  id: "123",
  balance: 1500,
  lastUpdated: "2025-01-15"
};
// ✅ Event Sourcing
const events = [
  { type: "AccountCreated", accountId: "123", initialBalance: 0, timestamp: "2025-01-01" },
  { type: "MoneyDeposited", accountId: "123", amount: 2000, timestamp: "2025-01-10" },
  { type: "MoneyWithdrawn", accountId: "123", amount: 500, timestamp: "2025-01-15" }
];
// The current balance is computed by replaying the events
function calculateBalance(events) {
  return events.reduce((balance, event) => {
    switch (event.type) {
      case "AccountCreated": return event.initialBalance;
      case "MoneyDeposited": return balance + event.amount;
      case "MoneyWithdrawn": return balance - event.amount;
      default: return balance;
    }
  }, 0);
}
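One thing replaying buys you is the state at any past moment. The sketch below reuses the same three events and adds a hypothetical `balanceAt` helper (not part of the original example) that replays only the events up to a given date. It assumes timestamps are ISO dates, which compare correctly as strings.

```typescript
type AccountEvent =
  | { type: "AccountCreated"; accountId: string; initialBalance: number; timestamp: string }
  | { type: "MoneyDeposited"; accountId: string; amount: number; timestamp: string }
  | { type: "MoneyWithdrawn"; accountId: string; amount: number; timestamp: string };

// Apply a single event to the running balance.
function applyEvent(balance: number, event: AccountEvent): number {
  switch (event.type) {
    case "AccountCreated": return event.initialBalance;
    case "MoneyDeposited": return balance + event.amount;
    case "MoneyWithdrawn": return balance - event.amount;
    default: return balance;
  }
}

// Hypothetical helper: balance as of `asOf` (inclusive).
// Assumes ISO-8601 timestamps, so string comparison matches chronological order.
function balanceAt(history: AccountEvent[], asOf: string): number {
  return history
    .filter((e) => e.timestamp <= asOf)
    .reduce((balance, e) => applyEvent(balance, e), 0);
}

const accountEvents: AccountEvent[] = [
  { type: "AccountCreated", accountId: "123", initialBalance: 0, timestamp: "2025-01-01" },
  { type: "MoneyDeposited", accountId: "123", amount: 2000, timestamp: "2025-01-10" },
  { type: "MoneyWithdrawn", accountId: "123", amount: 500, timestamp: "2025-01-15" },
];

const afterDeposit = balanceAt(accountEvents, "2025-01-10"); // 2000
const today = balanceAt(accountEvents, "2025-01-15");        // 1500
```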
2. CQRS (Command Query Responsibility Segregation)
Separate read and write operations so that each side can be optimized independently:
// Command side (writes)
interface CreateOrderCommand {
customerId: string;
items: OrderItem[];
paymentMethod: string;
}
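class OrderCommandHandler {
  // eventStore and eventBus are injected here; their types are assumed
  constructor(
    private eventStore: EventStore,
    private eventBus: EventBus
  ) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    // Validate the command and build the aggregate
    const order = new Order(command.customerId, command.items);
    const events = order.getUncommittedEvents();
    // Persist the events first
    await this.eventStore.append(events);
    // Then publish them
    for (const event of events) {
      await this.eventBus.publish(event);
    }
  }
}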
// Query side (reads)
interface OrderProjection {
id: string;
customerId: string;
status: OrderStatus;
total: number;
items: OrderItemProjection[];
}
class OrderQueryHandler {
constructor(private readModel: OrderReadModel) {}
async getOrder(orderId: string): Promise<OrderProjection> {
return await this.readModel.findById(orderId);
}
async getOrdersByCustomer(customerId: string): Promise<OrderProjection[]> {
return await this.readModel.findByCustomerId(customerId);
}
}
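What connects the two sides is a projector: a consumer that applies each published event to the read model. A minimal in-memory sketch follows. The event shapes, `OrderView` and `InMemoryOrderReadModel` are assumptions made for the example; a real read model would typically live in a database optimized for the queries.

```typescript
interface OrderCreated { type: "OrderCreated"; orderId: string; customerId: string; total: number; }
interface OrderPaid { type: "OrderPaid"; orderId: string; }
type OrderEvent = OrderCreated | OrderPaid;

interface OrderView {
  id: string;
  customerId: string;
  status: "PENDING" | "PAID";
  total: number;
}

// Hypothetical in-memory read model kept up to date by applying events.
class InMemoryOrderReadModel {
  private byId = new Map<string, OrderView>();

  // Projector: fold one event into the denormalized view.
  apply(event: OrderEvent): void {
    switch (event.type) {
      case "OrderCreated":
        this.byId.set(event.orderId, {
          id: event.orderId,
          customerId: event.customerId,
          status: "PENDING",
          total: event.total,
        });
        break;
      case "OrderPaid": {
        const view = this.byId.get(event.orderId);
        if (view) view.status = "PAID";
        break;
      }
    }
  }

  findById(id: string): OrderView | undefined {
    return this.byId.get(id);
  }

  findByCustomerId(customerId: string): OrderView[] {
    return Array.from(this.byId.values()).filter((v) => v.customerId === customerId);
  }
}

const readModel = new InMemoryOrderReadModel();
readModel.apply({ type: "OrderCreated", orderId: "o-1", customerId: "c-1", total: 40 });
readModel.apply({ type: "OrderCreated", orderId: "o-2", customerId: "c-1", total: 15 });
readModel.apply({ type: "OrderPaid", orderId: "o-1" });
```

Because the view is derived entirely from events, it can be dropped and rebuilt by replaying the event stream from the start.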
3. Saga Pattern
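For distributed transactions that require coordination across multiple services:
// Order-processing saga
// Shape of one saga step (assumed; SagaStep is used below)
interface SagaStep {
  service: string;
  action: string;
  compensate: string;
}

class OrderProcessingSaga {
  private steps: SagaStep[] = [
    { service: 'inventory', action: 'reserve', compensate: 'release' },
    { service: 'payment', action: 'charge', compensate: 'refund' },
    { service: 'shipping', action: 'create', compensate: 'cancel' }
  ];

  constructor(private eventBus: EventBus) {}

  async execute(order: Order): Promise<void> {
    // Keep the whole step so the matching compensation is known later
    const executedSteps: SagaStep[] = [];
    try {
      for (const step of this.steps) {
        await this.executeStep(step, order);
        executedSteps.push(step);
      }
    } catch (error) {
      // Compensate the completed steps in reverse order
      for (const step of executedSteps.reverse()) {
        await this.compensateStep(step, order);
      }
      throw error;
    }
  }

  private async executeStep(step: SagaStep, order: Order): Promise<void> {
    const event = {
      type: `${step.service}.${step.action}`,
      orderId: order.id,
      data: order.toJSON()
    };
    await this.eventBus.publish(event);
    // Wait for confirmation, with a timeout
    await this.waitForConfirmation(event, 30000);
  }

  // Publish the compensating action, e.g. inventory.release
  private async compensateStep(step: SagaStep, order: Order): Promise<void> {
    await this.eventBus.publish({
      type: `${step.service}.${step.compensate}`,
      orderId: order.id,
      data: order.toJSON()
    });
  }
}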
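The saga relies on `waitForConfirmation`, which is not shown. One way it could work, sketched here under assumptions, is a registry of pending promises keyed by a correlation string: the saga waits on a key, and whatever consumes the reply events calls `confirm` with the same key. `ConfirmationWaiter` and the key format are hypothetical.

```typescript
// Hypothetical helper: lets one piece of code wait for a reply that
// another piece of code (an event consumer) will deliver later.
class ConfirmationWaiter {
  private pending = new Map<
    string,
    { resolve: () => void; timer: ReturnType<typeof setTimeout> }
  >();

  // Resolves when confirm(key) is called, rejects after timeoutMs.
  waitFor(key: string, timeoutMs: number): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(key);
        reject(new Error(`Timed out waiting for confirmation of ${key}`));
      }, timeoutMs);
      this.pending.set(key, { resolve, timer });
    });
  }

  // Called by the reply consumer. Returns false if nobody was waiting.
  confirm(key: string): boolean {
    const entry = this.pending.get(key);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(key);
    entry.resolve();
    return true;
  }

  pendingCount(): number {
    return this.pending.size;
  }
}

const waiter = new ConfirmationWaiter();
const confirmation = waiter.waitFor("order-1:inventory.reserve", 30000);
// Simulate the reply arriving from the inventory service.
const matched = waiter.confirm("order-1:inventory.reserve");
confirmation.then(() => console.log("inventory reserved"));
```

An in-memory registry like this is lost on restart, so a long-running saga would usually persist its state instead. Treat this as a sketch of the waiting mechanism only.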
Implementation with Apache Kafka
Production-Tuned Configuration
# docker-compose.yml for a Kafka cluster
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zk-data:/var/lib/zookeeper/data
      - zk-logs:/var/lib/zookeeper/log
  kafka-1:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      # Booleans are quoted so YAML does not convert them
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      # High-throughput settings
      KAFKA_NUM_NETWORK_THREADS: 8
      KAFKA_NUM_IO_THREADS: 8
      KAFKA_SOCKET_SEND_BUFFER_BYTES: 102400
      KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 102400
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
  kafka-2:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092
      # ... same configuration
  kafka-3:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092
      # ... same configuration
# Named volumes used by a service must be declared at the top level
volumes:
  zk-data:
  zk-logs:
Optimized Producer
import { Kafka, Partitioners, Producer } from 'kafkajs';
class EventPublisher {
private kafka: Kafka;
private producer: Producer;
constructor() {
this.kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
retry: {
initialRetryTime: 100,
retries: 8,
maxRetryTime: 30000,
factor: 2
}
});
this.producer = this.kafka.producer({
createPartitioner: Partitioners.LegacyPartitioner,
// High-throughput settings
maxInFlightRequests: 5,
idempotent: true,
transactionTimeout: 30000
});
}
// A KafkaJS producer must be connected before the first send()
async connect(): Promise<void> {
await this.producer.connect();
}
async publishEvent<T>(topic: string, event: T, partitionKey?: string): Promise<void> {
const message = {
key: partitionKey || this.generateKey(event),
value: JSON.stringify({
...event,
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
version: '1.0'
}),
headers: {
'content-type': 'application/json',
'event-type': event.constructor.name
}
};
await this.producer.send({
topic,
messages: [message],
timeout: 10000
});
}
// Batch publishing for better throughput
async publishBatch<T>(topic: string, events: T[]): Promise<void> {
const messages = events.map(event => ({
key: this.generateKey(event),
value: JSON.stringify({
...event,
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString()
})
}));
await this.producer.sendBatch({
topicMessages: [{
topic,
messages
}]
});
}
}
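Consumer with Dead Letter Queue
class EventConsumer {
private consumer: Consumer;
private producer: Producer; // publishes to the retry topic
private deadLetterProducer: Producer;
// The Kafka client is injected so that this.kafka is defined
constructor(private kafka: Kafka, groupId: string) {
this.consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxWaitTimeInMs: 5000,
allowAutoTopicCreation: false
});
this.producer = this.kafka.producer();
this.deadLetterProducer = this.kafka.producer();
}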
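async subscribe(topic: string, handler: EventHandler): Promise<void> {
// KafkaJS clients must be connected before they are used
await Promise.all([
this.consumer.connect(),
this.producer.connect(),
this.deadLetterProducer.connect()
]);
await this.consumer.subscribe({ topic });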
await this.consumer.run({
partitionsConsumedConcurrently: 3,
eachMessage: async ({ topic, partition, message }) => {
const retryCount = parseInt(message.headers?.['retry-count']?.toString() || '0', 10);
// Skip messages with no payload (value is null)
if (!message.value) return;
try {
const event = JSON.parse(message.value.toString());
await handler.handle(event);
} catch (error) {
if (retryCount < 3) {
// Retry with exponential backoff
await this.scheduleRetry(topic, message, retryCount + 1);
} else {
// Send to the Dead Letter Queue
await this.sendToDeadLetter(topic, message, error as Error);
}
}
}
});
}
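private async scheduleRetry(topic: string, message: KafkaMessage, retryCount: number): Promise<void> {
const delay = Math.pow(2, retryCount) * 1000; // Exponential backoff: 2 s, 4 s, 8 s
// Await the delay instead of using a fire-and-forget setTimeout, so that a failed
// publish surfaces here and the message is not considered handled before its
// retry copy has been written. This pauses the partition for up to 8 s.
await new Promise(resolve => setTimeout(resolve, delay));
await this.producer.send({
topic: `${topic}.retry`,
messages: [{
...message,
headers: {
...message.headers,
'retry-count': retryCount.toString(),
'original-topic': topic
}
}]
});
}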
private async sendToDeadLetter(topic: string, message: KafkaMessage, error: Error): Promise<void> {
await this.deadLetterProducer.send({
topic: `${topic}.dead-letter`,
messages: [{
...message,
headers: {
...message.headers,
'error-message': error.message,
'error-timestamp': new Date().toISOString(),
'original-topic': topic
}
}]
});
}
}
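The routing decision in the consumer (retry up to three times with a doubling delay, then dead-letter) can be isolated as a pure function, which makes it easy to unit test without Kafka. The sketch below mirrors the numbers used above; `routeFailure` and `backoffDelayMs` are hypothetical names.

```typescript
const MAX_RETRIES = 3;

// Same formula as scheduleRetry: 2^n seconds for the n-th retry.
function backoffDelayMs(retryCount: number): number {
  return Math.pow(2, retryCount) * 1000;
}

interface FailureRoute {
  topic: string;      // where the failed message goes next
  retryCount: number; // value for the 'retry-count' header
  delayMs: number;    // how long to wait before republishing
}

// Decide where a message goes after its handler failed.
// `retryCount` is the number of retries already attempted.
function routeFailure(topic: string, retryCount: number): FailureRoute {
  if (retryCount < MAX_RETRIES) {
    const next = retryCount + 1;
    return { topic: `${topic}.retry`, retryCount: next, delayMs: backoffDelayMs(next) };
  }
  return { topic: `${topic}.dead-letter`, retryCount, delayMs: 0 };
}

const firstFailure = routeFailure("orders", 0);  // orders.retry after 2000 ms
const thirdFailure = routeFailure("orders", 2);  // orders.retry after 8000 ms
const exhausted = routeFailure("orders", 3);     // orders.dead-letter
```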
Outbox Pattern for Consistency
To guarantee that events are published whenever the corresponding state change is committed:
class OrderService {
constructor(
private orderRepository: OrderRepository,
private outboxRepository: OutboxRepository,
private transactionManager: TransactionManager
) {}
async createOrder(command: CreateOrderCommand): Promise<void> {
await this.transactionManager.executeInTransaction(async (tx) => {
// 1. Create the order
const order = new Order(command.customerId, command.items);
await this.orderRepository.save(order, tx);
// 2. Save the events to the outbox in the same transaction
const events = order.getUncommittedEvents();
for (const event of events) {
await this.outboxRepository.save({
id: crypto.randomUUID(),
aggregateId: order.id,
eventType: event.constructor.name,
eventData: JSON.stringify(event),
createdAt: new Date(),
processed: false
}, tx);
}
});
// 3. A background job publishes the events from the outbox
this.scheduleOutboxProcessing();
}
}
// Separate processor for the outbox
class OutboxProcessor {
  constructor(
    private outboxRepository: OutboxRepository,
    private eventPublisher: EventPublisher
  ) {}

  async processOutboxEvents(): Promise<void> {
    const unprocessedEvents = await this.outboxRepository.findUnprocessed(100);
    for (const outboxEvent of unprocessedEvents) {
      try {
        await this.eventPublisher.publishEvent(
          this.getTopicName(outboxEvent.eventType),
          JSON.parse(outboxEvent.eventData),
          outboxEvent.aggregateId
        );
        // If this update fails after a successful publish, the event is published
        // again on the next run: delivery is at-least-once, so consumers must be idempotent
        await this.outboxRepository.markAsProcessed(outboxEvent.id);
      } catch (error) {
        // Log the error; implement retry logic
        console.error(`Failed to process outbox event ${outboxEvent.id}:`, error);
      }
    }
  }
}
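The key property of the processor is that a row is marked as processed only after the publish succeeded, so a failure leaves it in place for the next run. The in-memory sketch below demonstrates that behavior. `OutboxRow` and `drainOutbox` are hypothetical, and unlike the processor above, this version stops at the first failure to preserve ordering, which is a design choice rather than a requirement of the pattern.

```typescript
interface OutboxRow {
  id: string;
  topic: string;
  payload: string;
  processed: boolean;
}

// Publish pending rows in order. A row is marked processed only after
// publish() returned without throwing. Returns how many rows were published.
function drainOutbox(
  rows: OutboxRow[],
  publish: (topic: string, payload: string) => void
): number {
  let published = 0;
  for (const row of rows) {
    if (row.processed) continue;
    try {
      publish(row.topic, row.payload);
      row.processed = true;
      published++;
    } catch {
      // Stop here so later rows are not published ahead of the failed one.
      break;
    }
  }
  return published;
}

const rows: OutboxRow[] = [
  { id: "1", topic: "orders", payload: "a", processed: false },
  { id: "2", topic: "orders", payload: "b", processed: false },
  { id: "3", topic: "orders", payload: "c", processed: false },
];

const sent: string[] = [];
let failOnce = true;
const flakyPublish = (_topic: string, payload: string): void => {
  if (payload === "b" && failOnce) {
    failOnce = false;
    throw new Error("broker unavailable");
  }
  sent.push(payload);
};

const firstRun = drainOutbox(rows, flakyPublish);  // publishes "a", fails on "b"
const secondRun = drainOutbox(rows, flakyPublish); // publishes "b" and "c"
```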
Monitoring and Observability
Critical Metrics for EDA
import { Counter, Gauge, Histogram } from 'prom-client';
class EventMetrics {
private eventPublishedCounter = new Counter({
name: 'events_published_total',
help: 'Total number of events published',
labelNames: ['topic', 'event_type', 'status']
});
private eventProcessingDuration = new Histogram({
name: 'event_processing_duration_seconds',
help: 'Duration of event processing',
labelNames: ['handler', 'event_type'],
buckets: [0.1, 0.5, 1, 2, 5, 10]
});
private lagGauge = new Gauge({
name: 'consumer_lag',
help: 'Consumer lag in messages',
labelNames: ['topic', 'partition', 'consumer_group']
});
recordEventPublished(topic: string, eventType: string, success: boolean): void {
this.eventPublishedCounter
.labels(topic, eventType, success ? 'success' : 'failure')
.inc();
}
recordProcessingTime(handler: string, eventType: string, duration: number): void {
this.eventProcessingDuration
.labels(handler, eventType)
.observe(duration);
}
}
Health Checks for EDA Components
class EDAHealthCheck {
async checkKafkaConnectivity(): Promise<HealthStatus> {
try {
const admin = this.kafka.admin();
await admin.connect();
const metadata = await admin.fetchTopicMetadata();
await admin.disconnect();
return {
status: 'healthy',
details: { topics: metadata.topics.length }
};
} catch (error) {
return {
status: 'unhealthy',
error: error.message
};
}
}
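async checkConsumerLag(): Promise<HealthStatus> {
const admin = this.kafka.admin();
await admin.connect();
try {
// fetchOffsets returns committed offsets only, with no lag field, so lag is
// derived from each partition's latest offset (fetchTopicOffsets)
const committed = await admin.fetchOffsets({
groupId: 'order-processing-group',
topics: ['orders', 'payments', 'inventory']
});
let maxLag = 0;
for (const { topic, partitions } of committed) {
const latest = await admin.fetchTopicOffsets(topic);
for (const { partition, offset } of partitions) {
const end = latest.find(p => p.partition === partition);
// A negative offset means the group has not committed on this partition yet
if (!end || Number(offset) < 0) continue;
maxLag = Math.max(maxLag, Number(end.offset) - Number(offset));
}
}
return {
status: maxLag > 1000 ? 'degraded' : 'healthy',
details: { maxLag }
};
} finally {
await admin.disconnect();
}
}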
}
Implementation Best Practices
1. Event Design
// ✅ Good event design
interface OrderCreatedEvent {
eventId: string;
eventType: 'OrderCreated';
aggregateId: string;
aggregateVersion: number;
timestamp: string;
data: {
orderId: string;
customerId: string;
items: OrderItem[];
totalAmount: number;
currency: string;
};
metadata: {
correlationId: string;
causationId?: string;
userId: string;
source: string;
};
}
// ❌ Bad design - too tightly coupled
interface BadOrderEvent {
orderObject: Order; // Don't serialize whole objects!
customerObject: Customer;
database: 'postgres';
internalId: 12345;
}
2. Event Versioning
class EventVersioning {
private migrators = new Map<string, Function[]>();
constructor() {
// Register the version migrators
this.migrators.set('OrderCreated', [
this.migrateOrderCreatedV1ToV2,
this.migrateOrderCreatedV2ToV3
]);
}
deserializeEvent(serializedEvent: string): any {
const eventData = JSON.parse(serializedEvent);
const currentVersion = this.getCurrentVersion(eventData.eventType);
if (eventData.version < currentVersion) {
return this.migrateToCurrentVersion(eventData);
}
return eventData;
}
private migrateOrderCreatedV1ToV2(event: any): any {
// V1 → V2: add the currency field
return {
...event,
version: 2,
data: {
...event.data,
currency: 'USD' // Default value
}
}
};
}
}
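The class above leaves `migrateToCurrentVersion` undefined. One possible shape for it, sketched under assumptions, is a chain where the migrator at index `i` upgrades version `i + 1` to `i + 2`, and an old event runs through every migrator from its own version onward. Only the V1 to V2 step from the example is included. `VersionedEvent`, `Migrator` and `upgrade` are hypothetical names.

```typescript
interface VersionedEvent {
  eventType: string;
  version: number;
  data: Record<string, unknown>;
}

type Migrator = (event: VersionedEvent) => VersionedEvent;

// migrators[type][i] upgrades an event from version i + 1 to version i + 2.
const migrators: Record<string, Migrator[]> = {
  OrderCreated: [
    // V1 -> V2: add the currency field with a default value
    (e) => ({ ...e, version: 2, data: { ...e.data, currency: "USD" } }),
  ],
};

// Run every migrator the event has not been through yet, in order.
function upgrade(event: VersionedEvent): VersionedEvent {
  const chain = migrators[event.eventType] ?? [];
  const remaining = chain.slice(Math.max(event.version - 1, 0));
  return remaining.reduce((current, migrate) => migrate(current), event);
}

const v1Event: VersionedEvent = {
  eventType: "OrderCreated",
  version: 1,
  data: { orderId: "o-1" },
};

const upgraded = upgrade(v1Event); // version 2, with data.currency set to "USD"
```

Events that are already current, or whose type has no registered migrators, pass through unchanged.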
3. Testing Event-Driven Systems
describe('Order Processing Saga', () => {
let saga: OrderProcessingSaga;
let mockEventBus: jest.Mocked<EventBus>;
beforeEach(() => {
mockEventBus = {
publish: jest.fn(),
subscribe: jest.fn()
};
saga = new OrderProcessingSaga(mockEventBus);
});
it('should complete successfully when all steps succeed', async () => {
// Arrange
const order = new Order('123', [{ id: '1', quantity: 2 }]);
// Simulate successful responses
mockEventBus.publish.mockImplementation(async (event) => {
if (event.type.includes('inventory.reserve')) {
await saga.handleEvent({ type: 'inventory.reserved', orderId: order.id });
}
// ... simulate the other events
});
// Act
await saga.execute(order);
// Assert
expect(mockEventBus.publish).toHaveBeenCalledTimes(3);
expect(mockEventBus.publish).toHaveBeenCalledWith(
expect.objectContaining({
type: 'inventory.reserve'
})
);
});
it('should compensate when step fails', async () => {
// Arrange
const order = new Order('123', [{ id: '1', quantity: 2 }]);
mockEventBus.publish.mockImplementation(async (event) => {
if (event.type.includes('payment.charge')) {
throw new Error('Payment failed');
}
});
// Act & Assert
await expect(saga.execute(order)).rejects.toThrow('Payment failed');
// Verify that the compensation ran
expect(mockEventBus.publish).toHaveBeenCalledWith(
expect.objectContaining({
type: 'inventory.release'
})
);
});
});
Challenges and Considerations
1. Eventual Consistency
Event-driven systems are eventually consistent, which requires:
- Designing a UX that handles intermediate states
- Making every handler idempotent
- Communicating states clearly to users
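Idempotency matters because retries and the outbox both deliver events at least once. A common approach is to deduplicate on `eventId`, sketched below. `IdempotentHandler` and `StockReserved` are hypothetical, and the in-memory `Set` stands in for what would normally be a durable store updated in the same transaction as the side effect.

```typescript
// Wraps a handler so that an event with an already-seen eventId is skipped.
class IdempotentHandler<E extends { eventId: string }> {
  private seen = new Set<string>();

  constructor(private readonly inner: (event: E) => void) {}

  // Returns true if the event was processed, false if it was a duplicate.
  handle(event: E): boolean {
    if (this.seen.has(event.eventId)) return false;
    this.inner(event);
    // Recorded only after success, so a failed attempt can be retried.
    this.seen.add(event.eventId);
    return true;
  }
}

interface StockReserved {
  eventId: string;
  sku: string;
  quantity: number;
}

let stock = 10;
const reserveStock = new IdempotentHandler<StockReserved>((e) => {
  stock -= e.quantity;
});

const reservation: StockReserved = { eventId: "evt-1", sku: "sku-1", quantity: 2 };
const firstDelivery = reserveStock.handle(reservation);  // processed
const secondDelivery = reserveStock.handle(reservation); // duplicate, skipped
```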
2. Distributed Debugging
- Correlation IDs on every event
- Distributed tracing with tools such as Jaeger
- Event sourcing to reconstruct historical state
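Correlation IDs only help if every derived event carries them forward. A small sketch of that propagation rule, using the `correlationId` and `causationId` fields from the event design section: the correlation ID is copied unchanged through the whole flow, while the causation ID points at the event that directly triggered the new one. `Envelope` and `causedBy` are hypothetical names.

```typescript
interface EventMetadata {
  correlationId: string; // same for every event in one business flow
  causationId?: string;  // eventId of the event that directly caused this one
}

interface Envelope<T> {
  eventId: string;
  eventType: string;
  data: T;
  metadata: EventMetadata;
}

// Build a follow-up event that inherits the trace context of its cause.
function causedBy<P, T>(
  cause: Envelope<P>,
  eventId: string,
  eventType: string,
  data: T
): Envelope<T> {
  return {
    eventId,
    eventType,
    data,
    metadata: {
      correlationId: cause.metadata.correlationId,
      causationId: cause.eventId,
    },
  };
}

const orderCreated: Envelope<{ orderId: string }> = {
  eventId: "evt-1",
  eventType: "OrderCreated",
  data: { orderId: "o-1" },
  metadata: { correlationId: "req-42" },
};

const paymentRequested = causedBy(orderCreated, "evt-2", "PaymentRequested", { amount: 40 });
const paymentCharged = causedBy(paymentRequested, "evt-3", "PaymentCharged", { amount: 40 });
```

Filtering logs by `correlationId` then returns the whole flow, and following `causationId` links reconstructs the order in which events triggered each other.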
3. Operational Complexity
- Proactive monitoring of consumer lag
- Alerts for unprocessed events
- Runbooks for common failure scenarios
When to Use Event-Driven Architecture
Ideal cases:
- Systems with multiple bounded contexts
- High scalability requirements
- A need for full auditability
- Integration with multiple external systems
When NOT to use it:
- Simple CRUD applications
- Teams without experience in distributed systems
- Requirements for strict, immediate consistency
- Limited resources for operations
Event-Driven Architecture is not a silver bullet, but implemented correctly it provides a solid foundation for systems that need to scale, evolve, and stay highly available.
Which EDA patterns have you implemented in your architectures? What have been the main challenges you have faced?
#EventDrivenArchitecture #Microservices #Apache #Kafka #CQRS #EventSourcing #DistributedSystems #Saga