🔧 Event-Driven Architectures: Advanced Patterns for Resilient Systems

Modern systems demand more than simple REST APIs. Let's explore event-driven patterns that transform monolithic applications into resilient, scalable distributed systems.

🎯 Event Sourcing: Beyond Traditional CRUD

The Problem with Mutable State

Traditional CRUD systems store only the current state, losing historical context and making audits, debugging, and temporal analysis difficult.

Event Sourcing in Action

// ❌ Traditional approach: mutable state
class UserAccount {
  constructor(id, balance) {
    this.id = id;
    this.balance = balance; // Only the current state
  }
  
  withdraw(amount) {
    this.balance -= amount; // The history is lost
    return this.balance;
  }
}

// ✅ Event Sourcing: immutable by design
class UserAccountES {
  constructor(id) {
    this.id = id;
    this.events = [];
    this.version = 0;
  }
  
  // Commands generate events
  withdraw(amount, timestamp = Date.now()) {
    const event = {
      id: crypto.randomUUID(),
      type: 'WithdrawalAttempted',
      aggregateId: this.id,
      version: this.version + 1,
      timestamp,
      data: { amount }
    };
    
    this.applyEvent(event);
    return event;
  }
  
  // State reconstruction from events
  applyEvent(event) {
    this.events.push(event);
    this.version = event.version;
    
    switch(event.type) {
      case 'WithdrawalAttempted':
        // Event-application logic goes here
        break;
    }
  }
  
  // Snapshot of the current state (folds the full history)
  getCurrentBalance() {
    return this.events
      .filter(e => e.type === 'DepositMade')
      .reduce((sum, e) => sum + e.data.amount, 0) -
      this.events
      .filter(e => e.type === 'WithdrawalAttempted') 
      .reduce((sum, e) => sum + e.data.amount, 0);
  }
}

An Optimized Event Store

# Event Store implementation backed by PostgreSQL
import asyncpg
import json
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Event:
    aggregate_id: str
    event_type: str
    version: int
    timestamp: float
    data: dict
    metadata: Optional[dict] = None

class ConcurrencyError(Exception):
    """Raised when the expected aggregate version no longer matches the store."""

class EventStore:
    def __init__(self, connection_pool):
        self.pool = connection_pool
    
    async def append_events(self, aggregate_id: str, events: List[Event], 
                          expected_version: Optional[int] = None):
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Optimistic concurrency control
                if expected_version is not None:
                    current_version = await conn.fetchval(
                        "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1",
                        aggregate_id
                    )
                    if current_version != expected_version:
                        raise ConcurrencyError(f"Expected version {expected_version}, got {current_version}")
                
                # Batch insert for performance
                await conn.executemany(
                    """INSERT INTO events (aggregate_id, event_type, version, timestamp, data, metadata)
                       VALUES ($1, $2, $3, $4, $5, $6)""",
                    [(e.aggregate_id, e.event_type, e.version, e.timestamp, 
                      json.dumps(e.data), json.dumps(e.metadata or {})) for e in events]
                )
    
    async def get_events(self, aggregate_id: str, from_version: int = 0) -> List[Event]:
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """SELECT event_type, version, timestamp, data, metadata 
                   FROM events 
                   WHERE aggregate_id = $1 AND version > $2 
                   ORDER BY version""",
                aggregate_id, from_version
            )
            
            return [Event(
                aggregate_id=aggregate_id,
                event_type=row['event_type'],
                version=row['version'],
                timestamp=row['timestamp'],
                data=json.loads(row['data']),
                metadata=json.loads(row['metadata'])
            ) for row in rows]
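
The store above assumes an events table where the pair (aggregate_id, version) is unique; that constraint is what keeps the optimistic concurrency check safe even when two writers race past the SELECT. A minimal schema sketch (column types are assumptions to adapt to your model):

# Hypothetical DDL for the events table used by EventStore above
import asyncpg

async def create_events_table(pool: asyncpg.Pool):
    async with pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id           BIGSERIAL PRIMARY KEY,
                aggregate_id TEXT             NOT NULL,
                event_type   TEXT             NOT NULL,
                version      INTEGER          NOT NULL,
                timestamp    DOUBLE PRECISION NOT NULL,
                data         JSONB            NOT NULL,
                metadata     JSONB            NOT NULL DEFAULT '{}',
                -- the unique pair enforces optimistic concurrency at the DB level
                UNIQUE (aggregate_id, version)
            )
        """)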

🚀 CQRS: Separating Commands from Queries

Optimized Read Models

// Command side: write model optimized for consistency
interface CommandHandler<T> {
  handle(command: T): Promise<Event[]>;
}

class WithdrawMoneyHandler implements CommandHandler<WithdrawMoney> {
  constructor(
    private eventStore: EventStore,
    private accountRepo: AccountRepository
  ) {}
  
  async handle(command: WithdrawMoney): Promise<Event[]> {
    // Load the aggregate from its events
    const account = await this.accountRepo.getById(command.accountId);
    
    // Business logic
    const events = account.withdraw(command.amount);
    
    // Persist the events atomically
    await this.eventStore.appendEvents(
      command.accountId, 
      events,
      account.version
    );
    
    return events;
  }
}

// Query side: read model denormalized for performance
interface AccountSummaryProjection {
  accountId: string;
  currentBalance: number;
  lastTransactionDate: Date;
  transactionCount: number;
  riskScore: number; // Derived from the event history
}

class AccountProjectionHandler {
  constructor(private readDB: ReadDatabase) {}
  
  async on(event: WithdrawalAttempted) {
    // Update the read model optimized for queries
    await this.readDB.query(`
      UPDATE account_summary 
      SET current_balance = current_balance - $1,
          last_transaction_date = $2,
          transaction_count = transaction_count + 1
      WHERE account_id = $3
    `, [event.data.amount, event.timestamp, event.aggregateId]);
    
    // Recompute the risk score in near real time
    await this.updateRiskScore(event.aggregateId);
  }
}
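
Because the event stream is the source of truth, any read model can be rebuilt by truncating it and replaying the full stream through the same projection handlers. A minimal replay sketch in Python, reusing the EventStore from above; the handlers mapping and read_db interface are assumed names:

# Rebuild a projection from scratch by replaying the event stream.
# `handlers` maps an event type to an async projection function
# (hypothetical wiring); `read_db` follows the asyncpg interface.
async def rebuild_projection(event_store, read_db, aggregate_ids, handlers):
    await read_db.execute("TRUNCATE account_summary")  # drop derived state
    for aggregate_id in aggregate_ids:
        for event in await event_store.get_events(aggregate_id, from_version=0):
            handler = handlers.get(event.event_type)
            if handler:
                await handler(event)  # same code path as live projections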

📡 Saga Pattern: Orchestrating Distributed Transactions

A Process Manager with Compensation

// Saga for the full checkout process
type OrderSaga struct {
    ID          string
    OrderID     string
    State       SagaState
    Steps       []SagaStep
    Compensations []CompensationStep
}

type SagaStep struct {
    Name        string
    Command     interface{}
    Completed   bool
    CompensateOn []string // Event types that trigger compensation
}

func (s *OrderSaga) Handle(event Event) error {
    switch event.Type {
    case "OrderCreated":
        return s.processStep("ReserveInventory", ReserveInventoryCommand{
            OrderID: event.Data["orderId"].(string),
            Items:   event.Data["items"],
        })
        
    case "InventoryReserved":
        return s.processStep("ProcessPayment", ProcessPaymentCommand{
            OrderID: event.Data["orderId"].(string),
            Amount:  event.Data["totalAmount"].(float64),
        })
        
    case "PaymentFailed":
        return s.compensate("ReserveInventory")
        
    case "PaymentProcessed":
        return s.processStep("ShipOrder", ShipOrderCommand{
            OrderID: event.Data["orderId"].(string),
        })
        
    case "OrderShipped":
        s.State = SagaCompleted
        return s.publishEvent("OrderCompleted", event.Data)
    }
    
    return nil
}

func (s *OrderSaga) compensate(stepName string) error {
    // Run compensations in reverse order
    for i := len(s.Compensations) - 1; i >= 0; i-- {
        comp := s.Compensations[i]
        if comp.ForStep == stepName {
            return s.executeCompensation(comp)
        }
    }
    return nil
}

🔄 A High-Performance Event Bus

Implementation with Kafka and the Outbox Pattern

# Outbox pattern: atomic persistence with at-least-once delivery
# (exactly-once *processing* additionally requires idempotent consumers)
class OutboxEventPublisher:
    def __init__(self, db_pool, kafka_producer):
        self.db_pool = db_pool
        self.kafka_producer = kafka_producer
        self.is_publishing = False
    
    async def publish_events_transactionally(self, events: List[Event],
                                             connection=None):
        """Store events in the outbox within the caller's transaction."""
        if connection is not None:
            # Reuse the caller's connection (and its already-open transaction)
            await self._store_in_outbox(connection, events)
        else:
            # Otherwise open a fresh connection and transaction
            async with self.db_pool.acquire() as conn:
                async with conn.transaction():
                    await self._store_in_outbox(conn, events)
    
    async def _store_in_outbox(self, conn, events):
        await conn.executemany(
            """INSERT INTO outbox_events 
               (event_id, aggregate_id, event_type, payload, created_at, published)
               VALUES ($1, $2, $3, $4, $5, false)""",
            [(e.id, e.aggregate_id, e.event_type, 
              json.dumps(e.data), e.timestamp) for e in events]
        )
    
    async def process_outbox(self):
        """Background process que envía eventos pendientes"""
        if self.is_publishing:
            return
            
        self.is_publishing = True
        try:
            async with self.db_pool.acquire() as conn:
                # Fetch events not yet published
                events = await conn.fetch(
                    """SELECT event_id, aggregate_id, event_type, payload 
                       FROM outbox_events 
                       WHERE published = false 
                       ORDER BY created_at 
                       LIMIT 100"""
                )
                
                for event_row in events:
                    # Publish to Kafka
                    await self.kafka_producer.send(
                        topic=f"events.{event_row['event_type']}",
                        key=event_row['aggregate_id'],
                        value=event_row['payload']
                    )
                    
                    # Mark it as published
                    await conn.execute(
                        "UPDATE outbox_events SET published = true WHERE event_id = $1",
                        event_row['event_id']
                    )
        finally:
            self.is_publishing = False
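
In production this relay runs as a background task that polls the outbox on a fixed interval; because unpublished rows stay in the table, a failed tick is simply retried on the next one, preserving the at-least-once guarantee. A minimal asyncio driver sketch (the interval and error handling here are illustrative assumptions):

import asyncio

async def run_outbox_relay(publisher: OutboxEventPublisher,
                           interval_seconds: float = 0.5):
    # Poll forever; each tick publishes at most one batch of 100 events
    while True:
        try:
            await publisher.process_outbox()
        except Exception as exc:
            print(f"Outbox relay tick failed, retrying next tick: {exc}")
        await asyncio.sleep(interval_seconds)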

A Resilient Consumer with a Dead Letter Queue

// Event consumer with a retry backoff schedule and a DLQ
class ResilientEventConsumer {
  constructor(kafkaConsumer, eventHandlers, dlqProducer) {
    this.consumer = kafkaConsumer;
    this.handlers = eventHandlers;
    this.dlq = dlqProducer;
    this.retryDelays = [1000, 5000, 15000, 30000]; // ms
  }
  
  async processMessage(message) {
    const event = JSON.parse(message.value);
    const handler = this.handlers[event.type];
    
    if (!handler) {
      console.warn(`No handler for event type: ${event.type}`);
      return;
    }
    
    let attempt = 0;
    while (attempt < this.retryDelays.length) {
      try {
        await handler.handle(event);
        return; // Success
      } catch (error) {
        attempt++;
        
        if (attempt >= this.retryDelays.length) {
          // Send to Dead Letter Queue
          await this.dlq.send({
            topic: 'events.dead-letter',
            key: message.key,
            value: JSON.stringify({
              originalEvent: event,
              error: error.message,
              attempts: attempt,
              timestamp: Date.now()
            })
          });
          
          console.error(`Event processing failed after ${attempt} attempts:`, error);
          return;
        }
        
        // Wait out the backoff schedule before the next attempt
        await this.delay(this.retryDelays[attempt - 1]);
      }
    }
  }
  
  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
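
Dead-lettered events still need a path back into the pipeline once the underlying defect is fixed. A sketch of a manual replay tool, in Python for consistency with the other snippets; the consumer and producer objects are assumed to follow an aiokafka-style interface:

import json

async def replay_dead_letters(dlq_consumer, producer):
    # Unwrap each dead-letter record and republish the original event
    # to its original topic so the (now fixed) handler can retry it
    async for message in dlq_consumer:
        record = json.loads(message.value)
        original = record["originalEvent"]
        await producer.send(
            topic=f"events.{original['type']}",
            key=message.key,
            value=json.dumps(original).encode("utf-8"),
        )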

📊 Monitoring and Observability

Event Tracking and Metrics

// Event metrics with Prometheus
use prometheus::{CounterVec, HistogramVec, Opts, Registry};

pub struct EventMetrics {
    events_published: CounterVec,
    events_processed: CounterVec,
    processing_duration: HistogramVec,
    errors: CounterVec,
}

impl EventMetrics {
    pub fn new(registry: &Registry) -> Self {
        // The *Vec variants carry labels, which with_label_values below requires
        let events_published = CounterVec::new(
            Opts::new("events_published_total", "Total events published"),
            &["event_type"],
        ).expect("metric creation");

        let events_processed = CounterVec::new(
            Opts::new("events_processed_total", "Total events processed"),
            &["event_type"],
        ).expect("metric creation");

        let errors = CounterVec::new(
            Opts::new("event_processing_errors_total", "Total processing errors"),
            &["event_type"],
        ).expect("metric creation");

        let processing_duration = HistogramVec::new(
            prometheus::HistogramOpts::new(
                "event_processing_duration_seconds",
                "Event processing duration",
            ).buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0]),
            &["event_type"],
        ).expect("metric creation");

        registry.register(Box::new(events_published.clone())).unwrap();
        registry.register(Box::new(events_processed.clone())).unwrap();
        registry.register(Box::new(errors.clone())).unwrap();
        registry.register(Box::new(processing_duration.clone())).unwrap();

        Self { events_published, events_processed, processing_duration, errors }
    }

    pub fn record_event_published(&self, event_type: &str) {
        self.events_published.with_label_values(&[event_type]).inc();
    }

    pub fn record_processing_time(&self, duration: f64, event_type: &str) {
        self.processing_duration
            .with_label_values(&[event_type])
            .observe(duration);
    }
}

🎯 Eventual Consistency Patterns

Reconciliation Techniques

// Periodic reconciliation to detect inconsistencies
class EventualConsistencyReconciler(
  eventStore: EventStore,
  readModels: Map[String, ReadModelRepository]
) {
  
  def reconcile(aggregateId: String): Future[ReconciliationResult] = {
    for {
      // Rebuild the state from the events
      events <- eventStore.getEvents(aggregateId)
      actualState = events.foldLeft(EmptyState)(_.apply(_))
      
      // Compare against each read model
      readModelStates <- Future.traverse(readModels) { case (name, repo) =>
        repo.getState(aggregateId).map(name -> _)
      }
      
      discrepancies = readModelStates.collect {
        case (name, readState) if readState != actualState =>
          Discrepancy(name, readState, actualState)
      }
      
      // Repair any inconsistencies
      _ <- Future.traverse(discrepancies)(repair)
      
    } yield ReconciliationResult(discrepancies.size, actualState)
  }
  
  private def repair(discrepancy: Discrepancy): Future[Unit] = {
    // The repair strategy depends on the discrepancy type
    discrepancy.`type` match {
      case StaleData => republishEvents(discrepancy.aggregateId)
      case MissingData => rebuildReadModel(discrepancy.readModelName)
      case CorruptedData => resetAndRebuild(discrepancy.readModelName)
    }
  }
}

💡 Best Practices for Production

Event Versioning and Backward Compatibility

// Schema evolution with versioning
{
  "eventType": "UserRegistered",
  "version": 2,
  "data": {
    "userId": "123",
    "email": "user@example.com",
    "profile": {
      "name": "John Doe",
      "preferences": {
        "notifications": true
      }
    }
  },
  "metadata": {
    "schemaVersion": "2.1.0",
    "migrationRules": [
      {
        "fromVersion": "1.0.0",
        "rule": "add_profile_field",
        "defaultValue": {"name": "Unknown"}
      }
    ]
  }
}
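
On the read side, older events are typically upgraded ("upcasted") to the newest schema before reaching any handler, so consumer code only ever sees one shape. A minimal Python sketch driven by the migrationRules metadata above; only the add_profile_field rule from the example is implemented, and anything else is treated as a gap:

# Upcast a stored event by applying its migration rules in order.
# Unknown rule names raise immediately so schema gaps surface early.
def upcast(event: dict) -> dict:
    data = dict(event["data"])
    for rule in event.get("metadata", {}).get("migrationRules", []):
        if rule["rule"] == "add_profile_field":
            data.setdefault("profile", rule["defaultValue"])
        else:
            raise ValueError(f"Unknown migration rule: {rule['rule']}")
    return {**event, "data": data}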

Testing Event-Driven Systems

# A test framework for event-driven systems
class EventDrivenTestFramework:
    def __init__(self):
        self.published_events = []
        self.event_handlers = {}
        # apply_event / get_handler dispatch is assumed to be wired elsewhere
    
    def given_events(self, events: List[Event]):
        """Setup: eventos que ya ocurrieron"""
        for event in events:
            self.apply_event(event)
    
    def when_command_executed(self, command):
        """Action: ejecutar comando"""
        handler = self.get_handler(command)
        new_events = handler.handle(command)
        
        for event in new_events:
            self.published_events.append(event)
            self.apply_event(event)
        
        return new_events
    
    def then_events_published(self, expected_events):
        """Assertion: verificar eventos publicados"""
        assert len(self.published_events) == len(expected_events)
        
        for i, expected in enumerate(expected_events):
            actual = self.published_events[i]
            assert actual.type == expected.type
            assert actual.data == expected.data

# Using the framework
def test_account_withdrawal():
    framework = EventDrivenTestFramework()
    
    # Given: an account with an initial balance
    framework.given_events([
        Event("AccountCreated", {"accountId": "123", "initialBalance": 1000})
    ])
    
    # When: a withdrawal command is executed
    events = framework.when_command_executed(
        WithdrawMoney(accountId="123", amount=500)
    )
    
    # Then: a withdrawal event is published
    framework.then_events_published([
        Event("WithdrawalProcessed", {"accountId": "123", "amount": 500})
    ])

🚀 Gradual Migration from Monoliths

A Strangler Fig Migration Strategy

# API gateway configuration for a gradual migration
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: gradual-migration
spec:
  http:
  - match:
    - uri:
        prefix: "/api/users"
    route:
    - destination:
        host: new-user-service
      weight: 20  # 20% to the new event-driven service
    - destination:
        host: legacy-monolith
      weight: 80  # 80% to the existing monolith
  - match:
    - uri:
        prefix: "/api/orders"
    route:
    - destination:
        host: new-order-service
      weight: 50  # More aggressive migration for orders
    - destination:
        host: legacy-monolith
      weight: 50

Event-driven systems are not just an architectural trend; they are a necessity for applications that demand scalability, auditability, and resilience. Migrate gradually, with clear metrics and solid fallbacks.

Which event-driven patterns have you implemented in your systems? How do you handle eventual consistency in your distributed architectures? Let's share experiences on the complexities and payoffs of these advanced patterns.

#EventDriven #Microservices #DistributedSystems #Architecture #CQRS #EventSourcing #SystemDesign