Pipeline IA: Validación Webhooks y Colas
Tabla de contenidos
Parte 2 de 3. Parte 1 | Parte 3
Validación de Webhooks
Configuro nodos de webhook en n8n con validación estricta en cada cluster de producción que gestiono. Tres controles son innegociables. Agrego dos más para ambientes de alto compliance:
- Firmas HMAC: Verifica cada payload con un secreto compartido. Los atacantes pueden falsificar webhooks para disparar workflows maliciosos o exfiltrar datos.
- Lista de IPs Permitidas: Restringe solicitudes entrantes a IPs conocidas. Incluso con HMAC, esto añade defensa contra claves comprometidas.
- Prevención de Replay: Rechaza payloads con timestamps antiguos (ventana de 5 minutos). Los atacantes pueden interceptar webhooks y reproducirlos.
- Rate Limiting: Máximo 100 solicitudes por minuto por IP. Previene ataques DDoS.
- Límite de Tamaño de Payload: Rechaza eventos mayores a 10KB. Los payloads grandes incrementan costos de LLM.
Aquí está la lógica de validación que uso en un nodo Function de n8n:
const crypto = require('crypto');const ipRangeCheck = require('ip-range-check');
// 1. Validación HMACconst expectedSignature = crypto .createHmac('sha256', process.env.WEBHOOK_SECRET) .update(JSON.stringify($input.first().json)) .digest('hex');const receivedSignature = $input.first().headers['x-webhook-signature'];
if (expectedSignature !== receivedSignature) { $input.first().context.logger.error('Invalid webhook signature', { expected: expectedSignature, received: receivedSignature }); throw new Error('Invalid webhook signature');}
// 2. Lista de IPs Permitidasconst allowedIPs = process.env.ALLOWED_IPS.split(',');const clientIP = $input.first().ip;if (!allowedIPs.some(range => ipRangeCheck(clientIP, range.trim()))) { $input.first().context.logger.error('IP not allowed', { clientIP }); throw new Error('IP not allowed');}
// 3. Prevención de Replayconst payloadTime = $input.first().json.timestamp;if (Date.now() - payloadTime > 300000) { // Ventana de 5 minutos $input.first().context.logger.error('Replay attack detected', { payloadTime }); throw new Error('Replay attack detected');}
// 4. Límite de Tamaño de Payloadconst payloadSize = JSON.stringify($input.first().json).length;if (payloadSize > 10240) { // Límite de 10KB throw new Error('Payload too large');}
$input.first().context.logger.info('Webhook validated successfully');Patrones de Colas de Mensajes
Elegir la cola correcta es clave para manejar backpressure. Comparé tres opciones en clusters de producción a 10k, 50k y 100k eventos por segundo en nodos de 4 vCPU y 16GB RAM:
| Sistema de Cola | Soporte de Backpressure | Throughput | Overhead Operacional | Costo | Mejor Para |
|---|---|---|---|---|---|
| Redis Streams | Grupos de consumidores, pending entry list, maxlen | Medio (10k eventos/s) | Bajo (si ya usas Redis) | Bajo | Pipelines pequeños-medios, baja latencia |
| RabbitMQ | Prefetch counts, QoS, colas prioritarias | Alto (50k eventos/s) | Medio | Medio | Enrutamiento complejo, colas prioritarias |
| Kafka | Escalado por particiones, retención, log compaction | Muy Alto (100k+ eventos/s) | Alto | Alto | Ingesta masiva, pipelines de logs |
Para el 80% de los casos, Redis Streams es la opción más equilibrada. Bajo overhead si ya ejecutas Redis (como la mayoría de los clusters), soporta grupos de consumidores y tiene backpressure integrada. He ejecutado Redis Streams a 15k eventos/s en 3 nodos sin problemas.
Aquí está el despliegue de Redis Streams que uso en Kubernetes:
apiVersion: apps/v1kind: Deploymentmetadata: name: redis-streams namespace: ai-pipelines labels: app: redis-streamsspec: replicas: 3 selector: matchLabels: app: redis-streams template: metadata: labels: app: redis-streams annotations: prometheus.io/scrape: "true" prometheus.io/port: "9121" spec: containers: - name: redis image: redis:7.2-alpine args: - "--appendonly" - "yes" - "--maxmemory" - "4gb" - "--maxmemory-policy" - "allkeys-lru" - "--requirepass" - "$(REDIS_PASSWORD)" ports: - containerPort: 6379 name: redis env: - name: REDIS_PASSWORD valueFrom: secretKeyRef: name: redis-secret key: password volumeMounts: - name: redis-data mountPath: /data resources: requests: cpu: 500m memory: 4Gi limits: cpu: 1 memory: 4Gi livenessProbe: tcpSocket: port: 6379 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: tcpSocket: port: 6379 initialDelaySeconds: 5 periodSeconds: 5 - name: redis-exporter image: oliver006/redis_exporter:v1.55.0 ports: - containerPort: 9121 name: metrics env: - name: REDIS_PASSWORD valueFrom: secretKeyRef: name: redis-secret key: password volumes: - name: redis-data persistentVolumeClaim: claimName: redis-streams-pvc---apiVersion: v1kind: Servicemetadata: name: redis-streams namespace: ai-pipelines labels: app: redis-streamsspec: selector: app: redis-streams ports: - port: 6379 targetPort: 6379 name: redis - port: 9121 targetPort: 9121 name: metricsPatrones de Idempotencia
Las APIs de LLM cobran por uso. Los duplicados drenan tu presupuesto rápido. He visto pipelines quemar $10k en un día por llamadas duplicadas durante un pico de tráfico. Aplico tres controles de idempotencia en cada pipeline:
- Claves de Deduplicación: Hash del payload + ID de fuente, almacenado en Redis con TTL de 24h.
- IDs de Job Únicos: IDs de workflow de Temporal vinculados a la clave de dedup.
- Manejo Al-Menos-Una-Vez: Procesa eventos una o más veces, pero elimina efectos secundarios duplicados.
- Verificación de Clave Obsoleta: Si una clave de dedup tiene más de 24h, procesa el evento de nuevo.
- Manejo de Colisiones: Usa hash SHA de 256 bits para minimizar riesgo de colisión.
Aquí está la lógica de idempotencia de un worker de Temporal en producción:
import { WorkflowClient, Connection } from '@temporalio/client';import Redis from 'ioredis';import crypto from 'crypto';
const redis = new Redis(process.env.REDIS_URL!);const connection = await Connection.connect({ address: process.env.TEMPORAL_ADDRESS });const client = new WorkflowClient({ connection });
async function processEvent(event: any) { const dedupKey = `dedup:${event.source}:${crypto.createHash('sha256').update(JSON.stringify(event)).digest('hex')}`;
const isDuplicate = await redis.set(dedupKey, '1', 'EX', 86400, 'NX');
if (!isDuplicate) { console.log(`Duplicate event ${dedupKey}, skipping`); return { status: 'duplicate', dedupKey }; }
try { const handle = await client.start(processLLMWorkflow, { args: [event], workflowId: `llm-${dedupKey}`, taskQueue: 'ai-pipelines', retry: { maximumAttempts: 5, initialInterval: '1s', backoffCoefficient: 2, } }); console.log(`Started workflow ${handle.workflowId}`); return { status: 'started', workflowId: handle.workflowId }; } catch (error) { if (error.message.includes('already exists')) { console.log(`Workflow already exists for ${dedupKey}`); return { status: 'duplicate', dedupKey }; } throw error; }}FAQ
¿Cómo evito ataques de replay sin sincronización exacta de reloj?
Uso una ventana de 5 minutos con la firma HMAC. El timestamp va en el payload y está firmado. Si el webhook llega más de 5 minutos después del timestamp, lo rechazo. Para sistemas con skew de reloj, extiende la ventana a 15 minutos.
¿Qué pasa si los 3 nodos de Redis fallan simultáneamente?
Redis Streams con AOF persistente recupera la cola. El deployment usa --appendonly yes para que los datos sobrevivan reinicios. Si todos los nodos mueren, pierdes eventos entre el último sync AOF y el crash. Para cero pérdidas, agrega Kafka como cola secundaria.
¿Debo usar procesamiento FIFO o no-FIFO?
La mayoría de los pipelines de IA no necesitan FIFO estricto. No-FIFO con grupos de consumidores da mejor throughput porque los workers procesan en paralelo. Si el orden importa (ej. mensajes de chat), usa un solo grupo de consumidores por partición.
¿Cómo pruebo idempotencia localmente antes de producción?
Ejecuta un servidor Temporal de desarrollo y un contenedor Redis. Envía el mismo evento dos veces. El segundo intento debe devolver ‘duplicate’. Lo pruebo en CI con GitHub Actions que levanta Temporal y Redis, envía 100 eventos duplicados y verifica exactamente 100 respuestas ‘duplicate’.