Los sistemas modernos demandan más que simples APIs REST. Exploremos patrones event-driven que transforman aplicaciones monolíticas en sistemas distribuidos resilientes y escalables.
Event Sourcing: Más Allá del CRUD Tradicional
El Problema con Estado Mutable
Los sistemas CRUD tradicionales almacenan solo el estado actual, perdiendo el contexto histórico y dificultando auditorías, debugging y análisis temporal.
Event Sourcing en Acción
// ❌ Traditional approach: mutable state
/**
 * Account that stores only its current balance; every withdrawal overwrites
 * the previous value, so no history is kept.
 */
class UserAccount {
  /**
   * @param id      Account identifier.
   * @param balance Starting balance (the only state that is stored).
   */
  constructor(id, balance) {
    Object.assign(this, { id, balance });
  }

  /**
   * Subtracts `amount` from the balance in place.
   * @returns The balance after the subtraction.
   */
  withdraw(amount) {
    const remaining = this.balance - amount;
    this.balance = remaining; // the previous balance is gone after this line
    return remaining;
  }
}
// ✅ Event Sourcing: immutable by design
/**
 * Event-sourced account: state is never stored directly; it is derived from
 * the list of events recorded on the instance.
 */
class UserAccountES {
  /** @param id Aggregate identifier copied into every event this account emits. */
  constructor(id) {
    this.id = id;
    this.events = [];
    this.version = 0;
  }

  /**
   * Command: records a `WithdrawalAttempted` event for `amount`.
   *
   * @param amount    Amount stored in the event's `data`.
   * @param timestamp Event timestamp; defaults to `Date.now()`.
   * @returns The event that was created and applied.
   */
  withdraw(amount, timestamp = Date.now()) {
    const withdrawal = {
      id: crypto.randomUUID(),
      type: 'WithdrawalAttempted',
      aggregateId: this.id,
      version: this.version + 1,
      timestamp,
      data: { amount }
    };
    this.applyEvent(withdrawal);
    return withdrawal;
  }

  /**
   * Appends `event` to the history and adopts its version number.
   * No per-type state change is implemented here; the balance is derived on
   * demand by getCurrentBalance().
   */
  applyEvent(event) {
    this.events.push(event);
    this.version = event.version;
    if (event.type === 'WithdrawalAttempted') {
      // Event-application logic would go here
    }
  }

  /**
   * Current balance: the sum of all `DepositMade` amounts minus the sum of all
   * `WithdrawalAttempted` amounts, computed from the recorded events.
   */
  getCurrentBalance() {
    let deposited = 0;
    let withdrawn = 0;
    for (const recorded of this.events) {
      if (recorded.type === 'DepositMade') {
        deposited = deposited + recorded.data.amount;
      } else if (recorded.type === 'WithdrawalAttempted') {
        withdrawn = withdrawn + recorded.data.amount;
      }
    }
    return deposited - withdrawn;
  }
}
Event Store Optimizado
# Implementación de Event Store con PostgreSQL
import asyncpg
import json
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class Event:
    """A single event record as written to and read from the event store."""

    # Identifier of the aggregate this event belongs to.
    aggregate_id: str
    # Event type name; stored in the `event_type` column.
    event_type: str
    # Version number of the event; events are read back ordered by it.
    version: int
    # Event timestamp (presumably epoch seconds, given the float type - confirm).
    timestamp: float
    # Event payload; serialized with json.dumps when persisted.
    data: dict
    # Optional metadata; None is persisted as an empty JSON object.
    metadata: Optional[dict] = None
class EventStore:
    """Event store that reads and writes the `events` table through a connection pool."""

    def __init__(self, connection_pool):
        # Pool must provide `acquire()` as an async context manager yielding a connection.
        self.pool = connection_pool

    async def append_events(self, aggregate_id: str, events: List["Event"],
                            expected_version: Optional[int] = None):
        """Insert `events` inside one transaction.

        Args:
            aggregate_id: Aggregate whose stored version is checked.
            events: Events to insert; each row uses the event's own fields.
            expected_version: When not None, the highest stored version for
                `aggregate_id` (0 if there are no rows) must equal this value.

        Raises:
            ConcurrencyError: the stored version differs from `expected_version`.
                NOTE(review): ConcurrencyError is not defined in the visible
                source - confirm it is declared elsewhere.
        """
        async with self.pool.acquire() as conn, conn.transaction():
            # Optimistic concurrency control
            if expected_version is not None:
                stored_version = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
                    aggregate_id
                )
                if stored_version != expected_version:
                    raise ConcurrencyError(f"Expected version {expected_version}, got {stored_version}")

            # Batch insert for performance
            rows = []
            for e in events:
                rows.append((
                    e.aggregate_id, e.event_type, e.version, e.timestamp,
                    json.dumps(e.data), json.dumps(e.metadata or {}),
                ))
            await conn.executemany(
                "INSERT INTO events (aggregate_id, event_type, version, timestamp, data, metadata)\n"
                "VALUES ($1, $2, $3, $4, $5, $6)",
                rows
            )

    async def get_events(self, aggregate_id: str, from_version: int = 0) -> List["Event"]:
        """Return the events of `aggregate_id` with version greater than `from_version`, ordered by version."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT event_type, version, timestamp, data, metadata\n"
                "FROM events\n"
                "WHERE aggregate_id = $1 AND version > $2\n"
                "ORDER BY version",
                aggregate_id, from_version
            )

            loaded = []
            for row in rows:
                loaded.append(Event(
                    aggregate_id=aggregate_id,
                    event_type=row['event_type'],
                    version=row['version'],
                    timestamp=row['timestamp'],
                    data=json.loads(row['data']),
                    metadata=json.loads(row['metadata'])
                ))
            return loaded
CQRS: Separación de Comandos y Consultas
Read Models Optimizados
// Command side: write model optimised for consistency
/** Handles one command type and resolves to the events it produced. */
interface CommandHandler<T> {
  handle(command: T): Promise<Event[]>;
}

/**
 * Executes a `WithdrawMoney` command against the account aggregate and
 * persists the resulting events with optimistic concurrency control.
 */
class WithdrawMoneyHandler implements CommandHandler<WithdrawMoney> {
  constructor(
    private readonly eventStore: EventStore,
    private readonly accountRepo: AccountRepository
  ) {}

  /**
   * @param command Withdrawal request (`accountId`, `amount`).
   * @returns The events produced by the aggregate and handed to the event store.
   */
  async handle(command: WithdrawMoney): Promise<Event[]> {
    // Load the aggregate
    const account = await this.accountRepo.getById(command.accountId);

    // The store compares the expected version with what is already persisted,
    // so it must be read BEFORE the command runs: applying the new events may
    // advance `account.version`, which would make the check fail every time.
    const expectedVersion = account.version;

    // Business logic
    const events = account.withdraw(command.amount);

    // Persist the events
    await this.eventStore.appendEvents(
      command.accountId,
      events,
      expectedVersion
    );
    return events;
  }
}
// Query side: denormalised read model optimised for performance
/** Row shape of the account summary read model. */
interface AccountSummaryProjection {
  accountId: string;
  currentBalance: number;
  lastTransactionDate: Date;
  transactionCount: number;
  riskScore: number; // Computed from the history
}

/** Keeps the `account_summary` read model up to date from withdrawal events. */
class AccountProjectionHandler {
  constructor(private readDB: ReadDatabase) {}

  /**
   * Applies a `WithdrawalAttempted` event to the read model: subtracts the
   * amount, stamps the last transaction date, increments the counter, then
   * triggers a risk-score update for the same account.
   *
   * NOTE(review): `updateRiskScore` is not declared in this class as shown -
   * confirm where it is defined.
   */
  async on(event: WithdrawalAttempted) {
    const { data, timestamp, aggregateId } = event;

    // Update the read model optimised for queries
    const updateSummarySql = [
      '',
      'UPDATE account_summary',
      'SET current_balance = current_balance - $1,',
      'last_transaction_date = $2,',
      'transaction_count = transaction_count + 1',
      'WHERE account_id = $3',
      '',
    ].join('\n');
    await this.readDB.query(updateSummarySql, [data.amount, timestamp, aggregateId]);

    // Compute the risk score in real time
    await this.updateRiskScore(aggregateId);
  }
}
Saga Pattern: Orquestación de Transacciones Distribuidas
Process Manager con Compensación
// Saga for the complete checkout process.
//
// OrderSaga carries the saga's own ID, the order it belongs to, its current
// State (set to SagaCompleted once "OrderShipped" is handled), the forward
// Steps, and the Compensations that compensate() searches from last to first.
type OrderSaga struct {
	ID string
	OrderID string
	State SagaState
	Steps []SagaStep
	Compensations []CompensationStep
}
// SagaStep describes one forward step of a saga: its name, the command it
// carries, whether it has completed, and which event types call for
// compensating it.
type SagaStep struct {
	Name string
	Command interface{}
	Completed bool
	CompensateOn []string // Event types that require compensation
}
func (s *OrderSaga) Handle(event Event) error {
switch event.Type {
case "OrderCreated":
return s.processStep("ReserveInventory", ReserveInventoryCommand{
OrderID: event.Data["orderId"].(string),
Items: event.Data["items"],
})
case "InventoryReserved":
return s.processStep("ProcessPayment", ProcessPaymentCommand{
OrderID: event.Data["orderId"].(string),
Amount: event.Data["totalAmount"].(float64),
})
case "PaymentFailed":
return s.compensate("ReserveInventory")
case "PaymentProcessed":
return s.processStep("ShipOrder", ShipOrderCommand{
OrderID: event.Data["orderId"].(string),
})
case "OrderShipped":
s.State = SagaCompleted
return s.publishEvent("OrderCompleted", event.Data)
}
return nil
}
// compensate scans s.Compensations from the last entry to the first and runs
// the first one whose ForStep equals stepName, returning that call's result.
// It returns nil when no compensation is registered for the step.
func (s *OrderSaga) compensate(stepName string) error {
	idx := len(s.Compensations)
	for idx > 0 {
		idx--
		// Search in reverse order
		if candidate := s.Compensations[idx]; candidate.ForStep == stepName {
			return s.executeCompensation(candidate)
		}
	}
	return nil
}
Event Bus de Alta Performance
Implementación con Kafka y Outbox Pattern
# Outbox pattern: events are stored with the business data and relayed to Kafka
# afterwards. Delivery is at-least-once (a row is marked published only after
# the send), so consumers must tolerate duplicates.
class OutboxEventPublisher:
    """Stores events in the `outbox_events` table and relays them to Kafka."""

    def __init__(self, db_pool, kafka_producer):
        self.db_pool = db_pool
        self.kafka_producer = kafka_producer
        # Re-entrancy guard for process_outbox on this instance.
        self.is_publishing = False

    async def publish_events_transactionally(self, events: List["Event"],
                                             transaction=None):
        """Store `events` in the outbox.

        Args:
            events: Events to store; each must expose id, aggregate_id,
                event_type, data and timestamp.
            transaction: Optional connection that already has a transaction
                open. When given, the rows are written through it and no pool
                connection is acquired. Taking a second connection while the
                caller holds one is wasted work and can deadlock a small pool.
                When omitted, a connection is acquired and a new transaction
                is opened around the insert.
        """
        if transaction is not None:
            # Use the caller's existing transaction
            await self._store_in_outbox(transaction, events)
            return

        # New transaction
        async with self.db_pool.acquire() as conn:
            async with conn.transaction():
                await self._store_in_outbox(conn, events)

    async def _store_in_outbox(self, conn, events):
        """Insert one unpublished outbox row per event using `conn`."""
        await conn.executemany(
            "INSERT INTO outbox_events\n"
            "(event_id, aggregate_id, event_type, payload, created_at, published)\n"
            "VALUES ($1, $2, $3, $4, $5, false)",
            [(e.id, e.aggregate_id, e.event_type,
              json.dumps(e.data), e.timestamp) for e in events]
        )

    async def process_outbox(self):
        """Background step: send up to 100 pending events to Kafka, oldest first.

        Returns immediately if a run is already in progress on this instance.
        Each row is marked published only after its send has been awaited; an
        exception stops the run and leaves the remaining rows unpublished.
        """
        if self.is_publishing:
            return
        self.is_publishing = True
        try:
            async with self.db_pool.acquire() as conn:
                # Fetch unpublished events
                events = await conn.fetch(
                    "SELECT event_id, aggregate_id, event_type, payload\n"
                    "FROM outbox_events\n"
                    "WHERE published = false\n"
                    "ORDER BY created_at\n"
                    "LIMIT 100"
                )
                for event_row in events:
                    # Publish to Kafka
                    await self.kafka_producer.send(
                        topic=f"events.{event_row['event_type']}",
                        key=event_row['aggregate_id'],
                        value=event_row['payload']
                    )
                    # Mark as published
                    await conn.execute(
                        "UPDATE outbox_events SET published = true WHERE event_id = $1",
                        event_row['event_id']
                    )
        finally:
            self.is_publishing = False
Consumer Resiliente con Dead Letter Queue
// Event consumer with retry backoff and a dead-letter queue (DLQ)
/**
 * Dispatches each message to the handler registered for its event type,
 * retrying failures with the configured delays and dead-lettering messages
 * that cannot be processed.
 */
class ResilientEventConsumer {
  /**
   * @param kafkaConsumer Consumer instance (stored; not used by this class).
   * @param eventHandlers Map from event type to an object with `handle(event)`.
   * @param dlqProducer   Producer whose `send()` receives dead-lettered messages.
   */
  constructor(kafkaConsumer, eventHandlers, dlqProducer) {
    this.consumer = kafkaConsumer;
    this.handlers = eventHandlers;
    this.dlq = dlqProducer;
    // Wait before retry 1, 2, 3, 4. One initial attempt plus one retry per delay.
    this.retryDelays = [1000, 5000, 15000, 30000]; // ms
  }

  /**
   * Processes one message.
   *
   * - A value that is not a JSON object is sent straight to the DLQ, because
   *   retrying cannot fix it and throwing would stall the consumer on it.
   * - An event type with no handler is logged and skipped.
   * - Otherwise the handler gets one initial attempt plus one retry per entry
   *   of `retryDelays`; after the last failure the event goes to the DLQ.
   */
  async processMessage(message) {
    let event;
    try {
      event = this.parseEvent(message.value);
    } catch (error) {
      await this.sendToDeadLetter(message, {
        originalEvent: null,
        rawValue: String(message.value),
        error: this.describeError(error),
        attempts: 0
      });
      console.error('Unparseable message sent to dead-letter queue:', error);
      return;
    }

    const handler = this.handlers[event.type];
    if (!handler) {
      console.warn(`No handler for event type: ${event.type}`);
      return;
    }

    // Every configured delay precedes a retry, so the total number of attempts
    // is one more than the number of delays (and at least one).
    const maxAttempts = this.retryDelays.length + 1;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await handler.handle(event);
        return; // Success
      } catch (error) {
        if (attempt === maxAttempts) {
          // Send to Dead Letter Queue
          await this.sendToDeadLetter(message, {
            originalEvent: event,
            error: this.describeError(error),
            attempts: attempt
          });
          console.error(`Event processing failed after ${attempt} attempts:`, error);
          return;
        }
        // Back off before the next attempt
        await this.delay(this.retryDelays[attempt - 1]);
      }
    }
  }

  /** Parses a message value and rejects anything that is not a JSON object. */
  parseEvent(rawValue) {
    const parsed = JSON.parse(rawValue);
    if (parsed === null || typeof parsed !== 'object') {
      throw new Error('Event payload is not a JSON object');
    }
    return parsed;
  }

  /** Message text for thrown values, which need not be Error instances. */
  describeError(error) {
    return error instanceof Error ? error.message : String(error);
  }

  /** Publishes `details` (plus a timestamp) to the dead-letter topic under the original key. */
  sendToDeadLetter(message, details) {
    return this.dlq.send({
      topic: 'events.dead-letter',
      key: message.key,
      value: JSON.stringify({ ...details, timestamp: Date.now() })
    });
  }

  /** Resolves after `ms` milliseconds. */
  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
Monitoring y Observabilidad
Event Tracking y Métricas
// Métricas de eventos con Prometheus
use prometheus::{Counter, Histogram, Registry};
/// Prometheus metrics for event publishing and processing.
///
/// `events_published` and `processing_duration` are labelled by `event_type`,
/// so they are vector metrics; `events_processed` and `errors` are plain
/// counters. All four are registered with the registry passed to [`EventMetrics::new`].
pub struct EventMetrics {
    events_published: prometheus::CounterVec,
    events_processed: Counter,
    processing_duration: prometheus::HistogramVec,
    errors: Counter,
}

impl EventMetrics {
    /// Creates the metrics and registers every one of them with `registry`.
    ///
    /// # Panics
    ///
    /// Panics if a metric cannot be created or if registration fails (for
    /// example when a metric with the same name is already registered).
    pub fn new(registry: &Registry) -> Self {
        // `with_label_values` exists only on the *Vec metric types, so the
        // labelled metrics must be declared with their label names up front.
        let events_published = prometheus::CounterVec::new(
            prometheus::Opts::new("events_published_total", "Total events published"),
            &["event_type"],
        )
        .expect("metric creation");

        // Help strings must be non-empty or metric creation fails.
        let events_processed = Counter::new("events_processed_total", "Total events processed")
            .expect("metric creation");

        let processing_duration = prometheus::HistogramVec::new(
            prometheus::HistogramOpts::new(
                "event_processing_duration_seconds",
                "Event processing duration",
            )
            .buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]),
            &["event_type"],
        )
        .expect("metric creation");

        let errors = Counter::new(
            "event_processing_errors_total",
            "Total event processing errors",
        )
        .expect("metric creation");

        // A metric that is not registered is never exported.
        registry.register(Box::new(events_published.clone())).unwrap();
        registry.register(Box::new(events_processed.clone())).unwrap();
        registry.register(Box::new(processing_duration.clone())).unwrap();
        registry.register(Box::new(errors.clone())).unwrap();

        Self {
            events_published,
            events_processed,
            processing_duration,
            errors,
        }
    }

    /// Increments the published-events counter for `event_type`.
    pub fn record_event_published(&self, event_type: &str) {
        self.events_published.with_label_values(&[event_type]).inc();
    }

    /// Records `duration` (seconds, per the metric name) for `event_type`.
    pub fn record_processing_time(&self, duration: f64, event_type: &str) {
        let histogram: Histogram = self.processing_duration.with_label_values(&[event_type]);
        histogram.observe(duration);
    }
}
Patrones de Consistencia Eventual
Técnicas de Reconciliación
// Periodic reconciliation to detect inconsistencies
/**
 * Compares the state rebuilt from an aggregate's events with the state held
 * by each read model and asks for a repair of every read model that differs.
 *
 * @param eventStore source of the aggregate's events
 * @param readModels read-model repositories keyed by read-model name
 */
class EventualConsistencyReconciler(
  eventStore: EventStore,
  readModels: Map[String, ReadModelRepository]
) {
  /**
   * Reconciles one aggregate.
   *
   * @param aggregateId aggregate whose events and read-model states are compared
   * @return the number of discrepancies found together with the state rebuilt
   *         from the events
   */
  def reconcile(aggregateId: String): Future[ReconciliationResult] = {
    for {
      // Rebuild the state from the events, folding from EmptyState
      events <- eventStore.getEvents(aggregateId)
      actualState = events.foldLeft(EmptyState)(_.apply(_))
      // Fetch every read model's state, paired with the read model's name
      readModelStates <- Future.traverse(readModels) { case (name, repo) =>
        repo.getState(aggregateId).map(name -> _)
      }
      // Keep only the read models whose state differs from the rebuilt one
      discrepancies = readModelStates.collect {
        case (name, readState) if readState != actualState =>
          Discrepancy(name, readState, actualState)
      }
      // Repair the inconsistencies
      _ <- Future.traverse(discrepancies)(repair)
    } yield ReconciliationResult(discrepancies.size, actualState)
  }
  /**
   * Chooses a repair strategy from the discrepancy's type.
   *
   * NOTE(review): this reads `type`, `aggregateId` and `readModelName` from the
   * discrepancy, while reconcile builds it from (name, readState, actualState);
   * Discrepancy is not visible here - confirm these members exist.
   */
  private def repair(discrepancy: Discrepancy): Future[Unit] = {
    // Repair strategy depends on the kind of discrepancy
    discrepancy.`type` match {
      case StaleData => republishEvents(discrepancy.aggregateId)
      case MissingData => rebuildReadModel(discrepancy.readModelName)
      case CorruptedData => resetAndRebuild(discrepancy.readModelName)
    }
  }
}
Best Practices para Producción
Event Versioning y Backward Compatibility
// Schema evolution con versionado
{
"eventType": "UserRegistered",
"version": 2,
"data": {
"userId": "123",
"email": "user@example.com",
"profile": {
"name": "John Doe",
"preferences": {
"notifications": true
}
}
},
"metadata": {
"schemaVersion": "2.1.0",
"migrationRules": [
{
"fromVersion": "1.0.0",
"rule": "add_profile_field",
"defaultValue": {"name": "Unknown"}
}
]
}
}
Testing de Sistemas Event-Driven
# Test framework for event-driven systems
class EventDrivenTestFramework:
    """Given/when/then helper for testing command handlers against event histories.

    NOTE(review): `apply_event` and `get_handler` are called below but are not
    defined in this class as shown - confirm they are supplied elsewhere
    (for example by a subclass).
    """

    def __init__(self):
        # Events produced by commands run through when_command_executed.
        self.published_events = []
        self.event_handlers = {}

    def given_events(self, events: List["Event"]):
        """Setup: replay events that already happened."""
        for past_event in events:
            self.apply_event(past_event)

    def when_command_executed(self, command):
        """Action: run the command through its handler.

        Every event the handler returns is recorded as published and applied.
        Returns the handler's events.
        """
        produced = self.get_handler(command).handle(command)
        for fresh_event in produced:
            self.published_events.append(fresh_event)
            self.apply_event(fresh_event)
        return produced

    def then_events_published(self, expected_events):
        """Assertion: published events match `expected_events` in count, order, type and data."""
        assert len(self.published_events) == len(expected_events)
        for position, wanted in enumerate(expected_events):
            got = self.published_events[position]
            assert got.type == wanted.type
            assert got.data == wanted.data
# Using the framework
def test_account_withdrawal():
    """Withdrawing 500 from account "123" publishes one WithdrawalProcessed event."""
    framework = EventDrivenTestFramework()

    # Given: an account with an initial balance
    history = [
        Event("AccountCreated", {"accountId": "123", "initialBalance": 1000})
    ]
    framework.given_events(history)

    # When: a withdrawal command is executed
    withdrawal = WithdrawMoney(accountId="123", amount=500)
    events = framework.when_command_executed(withdrawal)

    # Then: the withdrawal event was published
    expected = [
        Event("WithdrawalProcessed", {"accountId": "123", "amount": 500})
    ]
    framework.then_events_published(expected)
Migración Gradual desde Monolitos
Estrategia de Strangler Fig Pattern
# API gateway configuration for a gradual migration: each route splits traffic
# by weight between the new service and the legacy monolith.
# NOTE(review): no `hosts` list appears under `spec`; the Istio VirtualService
# reference lists it as required - confirm and add the intended hosts.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: gradual-migration
spec:
  http:
  - match:
    - uri:
        prefix: "/api/users"
    route:
    - destination:
        host: new-user-service
      weight: 20 # 20% to the new event-driven service
    - destination:
        host: legacy-monolith
      weight: 80 # 80% to the existing monolith
  - match:
    - uri:
        prefix: "/api/orders"
    route:
    - destination:
        host: new-order-service
      weight: 50 # More aggressive migration for orders
    - destination:
        host: legacy-monolith
      weight: 50
Los sistemas event-driven no son solo una tendencia arquitectónica: son una necesidad para aplicaciones que requieren escalabilidad, auditabilidad y resiliencia. La migración debe ser gradual, con métricas claras y fallbacks sólidos.
¿Qué patrones event-driven han implementado en sus sistemas? ¿Cómo manejan la consistencia eventual en sus arquitecturas distribuidas? Compartamos experiencias sobre las complejidades y beneficios de estos patrones avanzados.
#EventDriven #Microservices #DistributedSystems #Architecture #CQRS #EventSourcing #SystemDesign