Pipeline IA: Validación Webhooks y Colas

2026.03.20
Technology
670 Words
Pipeline IA: Validación Webhooks y Colas

Parte 2 de 3. Parte 1 | Parte 3

Validación de Webhooks

Configuro nodos de webhook en n8n con validación estricta en cada cluster de producción que gestiono. Tres controles son innegociables. Agrego dos más para ambientes de alto compliance:

  1. Firmas HMAC: Verifica cada payload con un secreto compartido. Los atacantes pueden falsificar webhooks para disparar workflows maliciosos o exfiltrar datos.
  2. Lista de IPs Permitidas: Restringe solicitudes entrantes a IPs conocidas. Incluso con HMAC, esto añade defensa contra claves comprometidas.
  3. Prevención de Replay: Rechaza payloads con timestamps antiguos (ventana de 5 minutos). Los atacantes pueden interceptar webhooks y reproducirlos.
  4. Rate Limiting: Máximo 100 solicitudes por minuto por IP. Previene ataques DDoS.
  5. Límite de Tamaño de Payload: Rechaza eventos mayores a 10KB. Los payloads grandes incrementan costos de LLM.

Aquí está la lógica de validación que uso en un nodo Function de n8n:

const crypto = require('crypto');
const ipRangeCheck = require('ip-range-check');
// 1. Validación HMAC
const expectedSignature = crypto
.createHmac('sha256', process.env.WEBHOOK_SECRET)
.update(JSON.stringify($input.first().json))
.digest('hex');
const receivedSignature = $input.first().headers['x-webhook-signature'];
if (expectedSignature !== receivedSignature) {
$input.first().context.logger.error('Invalid webhook signature', {
expected: expectedSignature,
received: receivedSignature
});
throw new Error('Invalid webhook signature');
}
// 2. Lista de IPs Permitidas
const allowedIPs = process.env.ALLOWED_IPS.split(',');
const clientIP = $input.first().ip;
if (!allowedIPs.some(range => ipRangeCheck(clientIP, range.trim()))) {
$input.first().context.logger.error('IP not allowed', { clientIP });
throw new Error('IP not allowed');
}
// 3. Prevención de Replay
const payloadTime = $input.first().json.timestamp;
if (Date.now() - payloadTime > 300000) { // Ventana de 5 minutos
$input.first().context.logger.error('Replay attack detected', { payloadTime });
throw new Error('Replay attack detected');
}
// 4. Límite de Tamaño de Payload
const payloadSize = JSON.stringify($input.first().json).length;
if (payloadSize > 10240) { // Límite de 10KB
throw new Error('Payload too large');
}
$input.first().context.logger.info('Webhook validated successfully');

Patrones de Colas de Mensajes

Elegir la cola correcta es clave para manejar backpressure. Comparé tres opciones en clusters de producción a 10k, 50k y 100k eventos por segundo en nodos de 4 vCPU y 16GB RAM:

Sistema de ColaSoporte de BackpressureThroughputOverhead OperacionalCostoMejor Para
Redis StreamsGrupos de consumidores, pending entry list, maxlenMedio (10k eventos/s)Bajo (si ya usas Redis)BajoPipelines pequeños-medios, baja latencia
RabbitMQPrefetch counts, QoS, colas prioritariasAlto (50k eventos/s)MedioMedioEnrutamiento complejo, colas prioritarias
KafkaEscalado por particiones, retención, log compactionMuy Alto (100k+ eventos/s)AltoAltoIngesta masiva, pipelines de logs

Para el 80% de los casos, Redis Streams es la opción más equilibrada. Bajo overhead si ya ejecutas Redis (como la mayoría de los clusters), soporta grupos de consumidores y tiene backpressure integrada. He ejecutado Redis Streams a 15k eventos/s en 3 nodos sin problemas.

Aquí está el despliegue de Redis Streams que uso en Kubernetes:

redis-streams-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis-streams
namespace: ai-pipelines
labels:
app: redis-streams
spec:
replicas: 3
selector:
matchLabels:
app: redis-streams
template:
metadata:
labels:
app: redis-streams
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9121"
spec:
containers:
- name: redis
image: redis:7.2-alpine
args:
- "--appendonly"
- "yes"
- "--maxmemory"
- "4gb"
- "--maxmemory-policy"
- "allkeys-lru"
- "--requirepass"
- "$(REDIS_PASSWORD)"
ports:
- containerPort: 6379
name: redis
env:
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
key: password
volumeMounts:
- name: redis-data
mountPath: /data
resources:
requests:
cpu: 500m
memory: 4Gi
limits:
cpu: 1
memory: 4Gi
livenessProbe:
tcpSocket:
port: 6379
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
tcpSocket:
port: 6379
initialDelaySeconds: 5
periodSeconds: 5
- name: redis-exporter
image: oliver006/redis_exporter:v1.55.0
ports:
- containerPort: 9121
name: metrics
env:
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secret
key: password
volumes:
- name: redis-data
persistentVolumeClaim:
claimName: redis-streams-pvc
---
apiVersion: v1
kind: Service
metadata:
name: redis-streams
namespace: ai-pipelines
labels:
app: redis-streams
spec:
selector:
app: redis-streams
ports:
- port: 6379
targetPort: 6379
name: redis
- port: 9121
targetPort: 9121
name: metrics

Patrones de Idempotencia

Las APIs de LLM cobran por uso. Los duplicados drenan tu presupuesto rápido. He visto pipelines quemar $10k en un día por llamadas duplicadas durante un pico de tráfico. Aplico tres controles de idempotencia en cada pipeline:

  1. Claves de Deduplicación: Hash del payload + ID de fuente, almacenado en Redis con TTL de 24h.
  2. IDs de Job Únicos: IDs de workflow de Temporal vinculados a la clave de dedup.
  3. Manejo Al-Menos-Una-Vez: Procesa eventos una o más veces, pero elimina efectos secundarios duplicados.
  4. Verificación de Clave Obsoleta: Si una clave de dedup tiene más de 24h, procesa el evento de nuevo.
  5. Manejo de Colisiones: Usa hash SHA de 256 bits para minimizar riesgo de colisión.

Aquí está la lógica de idempotencia de un worker de Temporal en producción:

temporal-worker/idempotency.ts
import { WorkflowClient, Connection } from '@temporalio/client';
import Redis from 'ioredis';
import crypto from 'crypto';
const redis = new Redis(process.env.REDIS_URL!);
const connection = await Connection.connect({ address: process.env.TEMPORAL_ADDRESS });
const client = new WorkflowClient({ connection });
async function processEvent(event: any) {
const dedupKey = `dedup:${event.source}:${crypto.createHash('sha256').update(JSON.stringify(event)).digest('hex')}`;
const isDuplicate = await redis.set(dedupKey, '1', 'EX', 86400, 'NX');
if (!isDuplicate) {
console.log(`Duplicate event ${dedupKey}, skipping`);
return { status: 'duplicate', dedupKey };
}
try {
const handle = await client.start(processLLMWorkflow, {
args: [event],
workflowId: `llm-${dedupKey}`,
taskQueue: 'ai-pipelines',
retry: {
maximumAttempts: 5,
initialInterval: '1s',
backoffCoefficient: 2,
}
});
console.log(`Started workflow ${handle.workflowId}`);
return { status: 'started', workflowId: handle.workflowId };
} catch (error) {
if (error.message.includes('already exists')) {
console.log(`Workflow already exists for ${dedupKey}`);
return { status: 'duplicate', dedupKey };
}
throw error;
}
}

FAQ

¿Cómo evito ataques de replay sin sincronización exacta de reloj?

Uso una ventana de 5 minutos con la firma HMAC. El timestamp va en el payload y está firmado. Si el webhook llega más de 5 minutos después del timestamp, lo rechazo. Para sistemas con skew de reloj, extiende la ventana a 15 minutos.

¿Qué pasa si los 3 nodos de Redis fallan simultáneamente?

Redis Streams con AOF persistente recupera la cola. El deployment usa --appendonly yes para que los datos sobrevivan reinicios. Si todos los nodos mueren, pierdes eventos entre el último sync AOF y el crash. Para cero pérdidas, agrega Kafka como cola secundaria.

¿Debo usar procesamiento FIFO o no-FIFO?

La mayoría de los pipelines de IA no necesitan FIFO estricto. No-FIFO con grupos de consumidores da mejor throughput porque los workers procesan en paralelo. Si el orden importa (ej. mensajes de chat), usa un solo grupo de consumidores por partición.

¿Cómo pruebo idempotencia localmente antes de producción?

Ejecuta un servidor Temporal de desarrollo y un contenedor Redis. Envía el mismo evento dos veces. El segundo intento debe devolver ‘duplicate’. Lo pruebo en CI con GitHub Actions que levanta Temporal y Redis, envía 100 eventos duplicados y verifica exactamente 100 respuestas ‘duplicate’.


Partes in this series: ← Parte 1 | Parte 3 →

# automatizacion-ia # arquitectura-eventos # Kubernetes # Temporal # redis-streams # N8N