AI Pipeline Webhook Validation and Queue Setup
Webhook Validation
I configure n8n webhook nodes with strict validation on every production cluster I manage. The first three controls below are non-negotiable; I add the last two for high-compliance environments:
- HMAC Signatures: Verify every webhook payload using a shared secret. Unverified payloads let attackers forge requests to trigger malicious workflows or exfiltrate data.
- IP Allowlisting: Restrict incoming requests to known service IPs. Even with HMAC in place, this limits the damage if the shared secret is compromised.
- Replay Attack Prevention: Reject payloads with stale timestamps (I use a 5-minute window). Attackers can intercept valid webhooks and replay them to trigger duplicate actions.
- Rate Limiting: Max 100 requests per minute per IP. This stops a single noisy or abusive client from overwhelming your ingress; on its own it won't stop a distributed attack spread across many IPs.
- Payload Size Limit: Reject events larger than 10KB. Large payloads increase LLM costs and processing time.
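Rate limiting usually belongs in the ingress layer, before a request ever reaches a workflow. As a sketch of the 100-requests-per-minute rule, here is a per-IP sliding-window limiter in TypeScript. The class name and the idea of running it in a small gateway service in front of n8n are assumptions for illustration; because state lives in process memory, it limits per replica only.

```typescript
// Hypothetical sliding-window-log limiter: at most `limit` requests per
// `windowMs` per client IP. In-memory only, so each process limits on its own;
// a multi-replica ingress would need a shared store instead.
class SlidingWindowLimiter {
  private readonly hits = new Map<string, number[]>();
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  /** Returns true if the request may proceed, false if it should get HTTP 429. */
  allow(ip: string, nowMs: number = Date.now()): boolean {
    const cutoff = nowMs - this.windowMs;
    // Keep only timestamps that are still inside the window.
    const recent = (this.hits.get(ip) ?? []).filter((t) => t > cutoff);
    if (recent.length >= this.limit) {
      this.hits.set(ip, recent);
      return false;
    }
    recent.push(nowMs);
    this.hits.set(ip, recent);
    return true;
  }
}

// 100 requests per minute per IP, matching the control above.
const limiter = new SlidingWindowLimiter(100, 60_000);
```

A sliding log is exact but stores one timestamp per request; at high volume a fixed-window counter is cheaper at the cost of allowing bursts at window boundaries. This sketch also never evicts entries for IPs that go quiet.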
Here's the validation logic I use in an n8n Code node (the successor to the Function node), with logging and error reporting:
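```javascript
// n8n Code node, "Run Once for All Items" mode. The instance needs
// NODE_FUNCTION_ALLOW_BUILTIN=crypto and NODE_FUNCTION_ALLOW_EXTERNAL=ip-range-check.
const crypto = require('crypto');
const ipRangeCheck = require('ip-range-check');

// Webhook node output item: { headers, params, query, body }
const { headers, body } = $input.first().json;

// 1. HMAC validation over the request body (not the whole item, which also holds headers).
// Re-serialising parsed JSON only matches if the sender serialised identically;
// sign and verify the raw request bytes where your setup allows.
const expectedBuf = crypto
  .createHmac('sha256', $env.WEBHOOK_SECRET)
  .update(JSON.stringify(body))
  .digest();
const receivedSignature = String(headers['x-webhook-signature'] || '');
const receivedBuf = Buffer.from(receivedSignature, 'hex');
// Constant-time comparison; never log the expected signature.
if (receivedBuf.length !== expectedBuf.length || !crypto.timingSafeEqual(receivedBuf, expectedBuf)) {
  console.error('Invalid webhook signature', { received: receivedSignature });
  throw new Error('Invalid webhook signature');
}

// 2. IP allowlisting. Assumes a trusted reverse proxy sets X-Forwarded-For.
const allowedRanges = String($env.ALLOWED_IPS || '').split(',').map((r) => r.trim()).filter(Boolean);
const clientIP = String(headers['x-forwarded-for'] || '').split(',')[0].trim();
if (!allowedRanges.some((range) => ipRangeCheck(clientIP, range))) {
  console.error('IP not allowed', { clientIP });
  throw new Error('IP not allowed');
}

// 3. Replay attack prevention: `timestamp` is epoch milliseconds inside the signed body.
// A missing timestamp must fail, and future-dated ones are rejected too.
const payloadTime = Number(body.timestamp);
if (!Number.isFinite(payloadTime) || Math.abs(Date.now() - payloadTime) > 5 * 60 * 1000) {
  console.error('Replay attack detected', { payloadTime });
  throw new Error('Replay attack detected');
}

// 4. Payload size limit: 10 KB, measured in bytes rather than characters.
const payloadSize = Buffer.byteLength(JSON.stringify(body), 'utf8');
if (payloadSize > 10240) {
  throw new Error('Payload too large');
}

console.log('Webhook validated successfully');
return $input.all(); // pass the validated item(s) downstream
```

Message Queue Patterns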
Choosing the right queue is critical for handling backpressure. I benchmarked three options across production clusters at 10k, 50k, and 100k events per second. All tests ran on 4 vCPU, 16GB RAM nodes.
| Queue System | Backpressure Support | Throughput | Operational Overhead | Cost | Best For |
|---|---|---|---|---|---|
| Redis Streams | Consumer groups, pending entry list, maxlen | Medium (10k events/sec) | Low (if using existing Redis) | Low | Small-medium pipelines, low latency, existing Redis users |
| RabbitMQ | Prefetch counts, QoS, priority queues | High (50k events/sec) | Medium | Medium | Complex routing, priority queues, enterprise environments |
| Kafka | Partition-based scaling, retention, log compaction | Very High (100k+ events/sec) | High | High | Large-scale event ingestion, log pipelines, data lakes |
In my experience, Redis Streams hits the sweet spot for about 80% of use cases. It's low overhead if you already run Redis (most clusters do), supports consumer groups for load balancing, and has built-in backpressure via pending entry lists. I've run Redis Streams at 15k events/sec on a 3-node cluster with no issues.
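To make the pending-entry-list mechanics concrete, here is a toy in-memory model of a consumer group. It is not a Redis client: real code would issue `XADD`, `XREADGROUP`, `XACK`, and `XAUTOCLAIM` through a client library. The model shows that delivered entries stay pending until acknowledged, that stalled entries can be reclaimed by another consumer, and how a worker can use its own pending count to throttle reads. That per-consumer in-flight cap is an assumed worker policy, not a Redis feature.

```typescript
// Toy model of a Redis Streams consumer group (NOT a Redis client).
interface Entry { id: number; payload: string; }
interface Pending { consumer: string; deliveredAt: number; }

class ToyConsumerGroup {
  private readonly stream: Entry[] = [];
  private readonly pel = new Map<number, Pending>(); // pending entry list
  private lastDelivered = -1;                        // like the group's last-delivered ID
  private nextId = 0;

  add(payload: string): number {                     // ~ XADD
    const id = this.nextId++;
    this.stream.push({ id, payload });
    return id;
  }

  pendingFor(consumer: string): number {
    let n = 0;
    this.pel.forEach((p) => { if (p.consumer === consumer) n++; });
    return n;
  }

  // ~ XREADGROUP ... STREAMS key ">": only never-delivered entries, capped by
  // the consumer's remaining in-flight budget (the backpressure policy).
  read(consumer: string, maxInFlight: number, now: number): Entry[] {
    const budget = Math.max(0, maxInFlight - this.pendingFor(consumer));
    const batch = this.stream.filter((e) => e.id > this.lastDelivered).slice(0, budget);
    for (const e of batch) {
      this.pel.set(e.id, { consumer, deliveredAt: now });
      this.lastDelivered = e.id;
    }
    return batch;
  }

  ack(id: number): void {                            // ~ XACK
    this.pel.delete(id);
  }

  // ~ XAUTOCLAIM: take over entries idle for at least minIdleMs (e.g. a crashed worker).
  claimIdle(consumer: string, minIdleMs: number, now: number): number[] {
    const claimed: number[] = [];
    this.pel.forEach((p, id) => {
      if (now - p.deliveredAt >= minIdleMs) {
        p.consumer = consumer;
        p.deliveredAt = now;
        claimed.push(id);
      }
    });
    return claimed;
  }
}
```

A worker that has hit its in-flight cap simply reads nothing until it acknowledges work, so unprocessed events accumulate in the stream rather than in worker memory.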
Here's the production Redis Streams deployment I use on Kubernetes, with persistence, memory limits, monitoring, and liveness and readiness probes:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis-streams
  namespace: ai-pipelines
  labels:
    app: redis-streams
spec:
  # Single primary: independent Redis pods behind one Service would each hold a
  # different slice of the stream, and they can't share one PVC/data directory.
  # Scale out with Redis replication (Sentinel or Redis Cluster) instead.
  replicas: 1
  strategy:
    type: Recreate  # old pod releases the PVC before the new one starts
  selector:
    matchLabels:
      app: redis-streams
  template:
    metadata:
      labels:
        app: redis-streams
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9121"
    spec:
      containers:
        - name: redis
          image: redis:7.2-alpine
          args:
            - "--appendonly"
            - "yes"
            - "--maxmemory"
            - "3gb"  # headroom below the 4Gi limit for AOF-rewrite forks
            - "--maxmemory-policy"
            - "noeviction"  # reject writes when full; LRU would silently evict stream entries
            - "--requirepass"
            - "$(REDIS_PASSWORD)"  # expanded by Kubernetes from the env var below
          ports:
            - containerPort: 6379
              name: redis
          env:
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-secret
                  key: password
          volumeMounts:
            - name: redis-data
              mountPath: /data
          resources:
            requests:
              cpu: 500m
              memory: 4Gi
            limits:
              cpu: 1
              memory: 4Gi
          livenessProbe:
            tcpSocket:
              port: 6379
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: 6379
            initialDelaySeconds: 5
            periodSeconds: 5
        - name: redis-exporter
          image: oliver006/redis_exporter:v1.55.0
          ports:
            - containerPort: 9121
              name: metrics
          env:
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-secret
                  key: password
      volumes:
        - name: redis-data
          persistentVolumeClaim:
            claimName: redis-streams-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: redis-streams
  namespace: ai-pipelines
  labels:
    app: redis-streams
spec:
  selector:
    app: redis-streams
  ports:
    - port: 6379
      targetPort: 6379
      name: redis
    - port: 9121
      targetPort: 9121
      name: metrics
```

Idempotency Patterns
LLM APIs are metered. Duplicates drain your budget fast. I've seen pipelines burn $10k in a day from duplicate LLM calls during a traffic spike. I enforce three idempotency controls on every pipeline, with two more for high-volume systems:
- Deduplication Keys: Hash of event payload + source ID, stored in Redis with 24h TTL. This prevents processing the same event twice within a day.
- Unique Job IDs: Temporal workflow IDs tied to the dedup key. This prevents duplicate Temporal workflows from starting.
- At-Least-Once Handling: Assume every event can be delivered more than once, and make the side effects idempotent. If an LLM call succeeds but the save fails, retry the save, not the LLM call.
- Stale Key Expiry: Once a dedup key's 24h TTL lapses, a redelivered event is processed again, so an event whose earlier attempt failed isn't blocked forever.
- Dedup Key Collision Handling: Use SHA-256 for the dedup hash; with a 256-bit digest, accidental collisions are negligible.
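Two of these controls are easy to get subtly wrong. `JSON.stringify` output follows key insertion order, so the same payload serialized by different producers can hash to different keys; and "retry the save, not the LLM call" only works if the LLM result is stored before the save runs. Here is a minimal TypeScript sketch of both. The canonical-JSON helper, the `processOnce` name, and the `Map` standing in for Redis are assumptions for illustration.

```typescript
import { createHash } from "node:crypto";

// Canonical JSON: sort object keys recursively so {a,b} and {b,a} hash the same.
// Assumes plain JSON payloads (no Dates, Maps, or undefined values).
function canonicalize(value: unknown): string {
  if (Array.isArray(value)) return `[${value.map(canonicalize).join(",")}]`;
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const fields = Object.keys(obj).sort().map((k) => `${JSON.stringify(k)}:${canonicalize(obj[k])}`);
    return `{${fields.join(",")}}`;
  }
  return JSON.stringify(value);
}

// Dedup key = SHA-256 over the source ID plus the canonical payload.
function dedupKey(source: string, payload: unknown): string {
  const digest = createHash("sha256").update(`${source}\n${canonicalize(payload)}`).digest("hex");
  return `dedup:${source}:${digest}`;
}

// Hypothetical step cache: the LLM result is stored under the dedup key before
// the save runs, so a retry after a failed save reuses it instead of paying again.
async function processOnce(
  key: string,
  cache: Map<string, string>,
  callLLM: () => Promise<string>,
  save: (result: string) => Promise<void>,
): Promise<string> {
  let result = cache.get(key);
  if (result === undefined) {
    result = await callLLM();
    cache.set(key, result);
  }
  await save(result); // may throw; the caller retries processOnce with the same key
  return result;
}
```

Inside a Temporal workflow, putting the LLM call and the save in separate activities has the same effect: a completed activity's result is recorded in workflow history, so retrying the save activity doesn't re-run the LLM activity.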
Here's the idempotency logic from the production service that starts my Temporal workflows:
```typescript
import { Connection, WorkflowClient, WorkflowExecutionAlreadyStartedError } from '@temporalio/client';
import Redis from 'ioredis';
import { createHash } from 'node:crypto';

// Top-level await requires an ES module build.
const redis = new Redis(process.env.REDIS_URL!);
const connection = await Connection.connect({ address: process.env.TEMPORAL_ADDRESS });
const client = new WorkflowClient({ connection });

const DEDUP_TTL_SECONDS = 86400; // 24h

export async function processEvent(event: { source: string } & Record<string, unknown>) {
  // Dedup key: SHA-256 of the event payload, namespaced by source ID
  const hash = createHash('sha256').update(JSON.stringify(event)).digest('hex');
  const dedupKey = `dedup:${event.source}:${hash}`;

  // SET ... NX returns 'OK' only if the key was absent, so this is an atomic claim
  const claimed = await redis.set(dedupKey, '1', 'EX', DEDUP_TTL_SECONDS, 'NX');
  if (claimed !== 'OK') {
    console.log(`Duplicate event ${dedupKey}, skipping`);
    return { status: 'duplicate', dedupKey };
  }

  try {
    // Workflow ID derived from the dedup key: Temporal rejects a second start
    // while a workflow with this ID is still running.
    const handle = await client.start('processLLMWorkflow', {
      args: [event],
      workflowId: `llm-${dedupKey}`,
      taskQueue: 'ai-pipelines',
      retry: { maximumAttempts: 5, initialInterval: '1s', backoffCoefficient: 2 },
    });
    console.log(`Started workflow ${handle.workflowId}`);
    return { status: 'started', workflowId: handle.workflowId };
  } catch (error) {
    // Race condition: another caller already started this workflow
    if (error instanceof WorkflowExecutionAlreadyStartedError) {
      console.log(`Workflow already exists for ${dedupKey}`);
      return { status: 'duplicate', dedupKey };
    }
    // Any other failure: release the claim so a retry isn't skipped for 24h
    await redis.del(dedupKey);
    throw error;
  }
}
```

FAQ
How do I prevent replay attacks on webhooks without tight clock synchronization?
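I use a 5-minute timestamp window with the HMAC signature. The timestamp is embedded in the payload and signed, so an attacker can't alter it without invalidating the signature. If the webhook arrives more than 5 minutes after the timestamp (or the timestamp is more than 5 minutes in the future), I reject it. For systems with clock skew, extend the window to 15 minutes and log the skew for monitoring. The window alone doesn't stop a replay that lands inside it; the dedup keys from the idempotency section catch those.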
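A minimal sketch of that check in TypeScript, assuming (not taken from any specific provider) that the sender signs the raw body bytes with HMAC-SHA256, sends the hex digest in a header, and puts an epoch-millisecond `timestamp` field in the body. The `verifyWebhook` name and its result shape are hypothetical.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

type Verdict = { ok: true; body: Record<string, unknown> } | { ok: false; reason: string };

// Verify signature first, then the signed timestamp against a symmetric window.
function verifyWebhook(
  rawBody: string,
  signatureHex: string,
  secret: string,
  nowMs: number,
  windowMs = 5 * 60_000, // widen (e.g. to 15 minutes) for known clock skew
): Verdict {
  const expected = createHmac("sha256", secret).update(rawBody, "utf8").digest();
  const received = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  if (received.length !== expected.length || !timingSafeEqual(received, expected)) {
    return { ok: false, reason: "bad signature" };
  }
  const body = JSON.parse(rawBody) as Record<string, unknown>;
  const ts = body.timestamp;
  if (typeof ts !== "number" || !Number.isFinite(ts)) {
    return { ok: false, reason: "missing timestamp" };
  }
  // Reject stale and far-future timestamps alike.
  if (Math.abs(nowMs - ts) > windowMs) {
    return { ok: false, reason: "timestamp outside window" };
  }
  return { ok: true, body };
}
```

Checking the signature before parsing means unauthenticated bodies are never handed to `JSON.parse` or trusted for their timestamp.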
What happens when all 3 Redis nodes in a cluster fail simultaneously?
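Redis Streams with AOF persistence recovers the queue. The deployment above uses --appendonly yes so data survives restarts. If all nodes die, you lose events written between the last AOF fsync and the crash; with Redis's default appendfsync everysec, that is up to about one second of writes, and appendfsync always narrows it at a throughput cost. For zero-loss requirements, also publish events to Kafka with replicated topics and acks=all, and use Kafka MirrorMaker to copy those topics to a second Kafka cluster. MirrorMaker replicates between Kafka clusters; it can't read from Redis.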
Should I use FIFO or non-FIFO processing for AI pipelines?
Most AI pipelines don't need strict FIFO. Non-FIFO with consumer groups gives better throughput because workers can process events in parallel. If order matters (e.g., processing chat messages in sequence), partition events into one stream per ordering key or key range, give each stream exactly one consumer, and process entries in stream-ID order. Several consumers in the same group can finish entries out of order.
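A small sketch of the routing half of that setup: hash the ordering key to pick a stream, so every message in a conversation lands on the same stream and therefore the same single consumer. The `chat-events:<n>` stream names, the conversation-ID key, and the partition count are illustrative assumptions.

```typescript
import { createHash } from "node:crypto";

// Hypothetical router: same ordering key -> same stream, on every process and restart.
function streamFor(orderingKey: string, partitions: number): string {
  // First 4 bytes of SHA-256 as an unsigned 32-bit int, reduced to a bucket.
  const bucket = createHash("sha256").update(orderingKey).digest().readUInt32BE(0) % partitions;
  return `chat-events:${bucket}`;
}
```

Changing `partitions` remaps most keys, so drain the existing streams (or migrate per key) before resizing, otherwise messages from one conversation can briefly be split across two streams.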
How do I test idempotency locally before deploying to production?
Run a local Temporal dev server and a Redis container. Send the same event twice; the second attempt should return "duplicate". I test this in CI with a GitHub Actions workflow that spins up Temporal and Redis, sends each of 100 events twice, and asserts exactly 100 "duplicate" responses.
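The counting logic of that CI check can be exercised even without Temporal or Redis, using an in-memory stand-in for Redis `SET key 1 EX ttl NX`. The `InMemoryDedupStore` class and `runDuplicateTest` helper below are hypothetical test scaffolding, not an ioredis API; the real CI run should still go through the actual services.

```typescript
// Hypothetical stand-in for Redis `SET key 1 EX ttl NX`:
// returns "OK" when the key was newly set, null when it already exists.
class InMemoryDedupStore {
  private readonly expiries = new Map<string, number>();

  setNx(key: string, ttlSeconds: number, nowMs: number): "OK" | null {
    const expiresAt = this.expiries.get(key);
    if (expiresAt !== undefined && expiresAt > nowMs) return null;
    this.expiries.set(key, nowMs + ttlSeconds * 1000);
    return "OK";
  }
}

type Outcome = "started" | "duplicate";

// Same decision as processEvent, minus Temporal: claim the key or report a duplicate.
function handle(store: InMemoryDedupStore, key: string, nowMs: number): Outcome {
  return store.setNx(key, 86_400, nowMs) === "OK" ? "started" : "duplicate";
}

// Send each of 100 distinct events twice and tally the outcomes.
function runDuplicateTest(nowMs: number): Record<Outcome, number> {
  const store = new InMemoryDedupStore();
  const tally: Record<Outcome, number> = { started: 0, duplicate: 0 };
  for (let i = 0; i < 100; i++) {
    for (let attempt = 0; attempt < 2; attempt++) {
      tally[handle(store, `dedup:test:${i}`, nowMs)]++;
    }
  }
  return tally;
}

const result = runDuplicateTest(Date.now());
console.log(result); // { started: 100, duplicate: 100 }
```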
Parts in this series: ← Part 1 | Part 3 →