AI Pipeline Webhook Validation and Queue Setup

2026.03.20

Part 2 of 3: Part 1 | Part 3

Webhook Validation

I configure n8n webhook nodes with strict validation on every production cluster I manage. Three controls are non-negotiable. I add two more for high-compliance environments:

  1. HMAC Signatures: Verify every webhook payload using a shared secret. Unverified payloads let attackers forge requests to trigger malicious workflows or exfiltrate data.
  2. IP Allowlisting: Restrict incoming requests to known service IPs. Even with HMAC, this adds defense against compromised keys.
  3. Replay Attack Prevention: Reject payloads with stale timestamps (I use a 5-minute window). Attackers can intercept valid webhooks and replay them to trigger duplicate actions.
  4. Rate Limiting: Max 100 requests per minute per IP. This keeps a single abusive source from overwhelming your ingress; a distributed flood still needs protection upstream of n8n.
  5. Payload Size Limit: Reject events larger than 10 KB. Large payloads increase LLM costs and processing time.

Here’s the validation logic I use in an n8n Function node, with logging and error reporting:

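const crypto = require('crypto');
const ipRangeCheck = require('ip-range-check');

// Assumes the previous node is the n8n Webhook node, which exposes the request
// as json.headers and json.body.
const { headers, body } = $input.first().json;

// 1. HMAC Validation
// The signature covers the re-serialized body, so the sender must serialize identically.
const expectedSignature = crypto
  .createHmac('sha256', process.env.WEBHOOK_SECRET)
  .update(JSON.stringify(body))
  .digest('hex');
const receivedSignature = String(headers['x-webhook-signature'] || '');
const expectedBuf = Buffer.from(expectedSignature, 'hex');
const receivedBuf = Buffer.from(receivedSignature, 'hex');
// Constant-time comparison; timingSafeEqual throws on unequal lengths, so check length first.
if (expectedBuf.length !== receivedBuf.length || !crypto.timingSafeEqual(expectedBuf, receivedBuf)) {
  console.error('Invalid webhook signature'); // never log the expected signature
  throw new Error('Invalid webhook signature');
}

// 2. IP Allowlisting
// Assumption: a trusted ingress overwrites x-forwarded-for, so its first entry is the client IP.
const allowedIPs = (process.env.ALLOWED_IPS || '').split(',').map(range => range.trim()).filter(Boolean);
const clientIP = String(headers['x-forwarded-for'] || '').split(',')[0].trim();
if (!clientIP || !allowedIPs.some(range => ipRangeCheck(clientIP, range))) {
  console.error('IP not allowed', { clientIP });
  throw new Error('IP not allowed');
}

// 3. Replay Attack Prevention (timestamp expected in epoch milliseconds)
const payloadTime = Number(body.timestamp);
// Reject missing, stale, and far-future timestamps alike.
if (!Number.isFinite(payloadTime) || Math.abs(Date.now() - payloadTime) > 300000) { // 5-minute window
  console.error('Replay attack detected', { payloadTime });
  throw new Error('Replay attack detected');
}

// 4. Rate Limiting is not implemented in this node; it has to be enforced elsewhere (for example at the ingress).

// 5. Payload Size Limit (measured in bytes, not UTF-16 code units)
const payloadSize = Buffer.byteLength(JSON.stringify(body), 'utf8');
if (payloadSize > 10240) { // 10 KB limit
  console.error('Payload too large', { payloadSize });
  throw new Error('Payload too large');
}

console.info('Webhook validated successfully');
return $input.all(); // pass the validated items on to the next node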
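
The list above includes rate limiting at 100 requests per minute per IP, which the validation node does not cover. A minimal sketch of a fixed-window limiter follows. It is an illustration, not the setup used in this series: the class name `FixedWindowRateLimiter` is hypothetical, and the counters live in process memory, so it only fits a single instance. A multi-instance deployment would need a shared store.

```typescript
// Fixed-window rate limiter: at most `limit` requests per `windowMs` per key.
// Hypothetical helper for illustration; state is in-memory and never pruned,
// so a long-running process would also need to expire idle keys.
interface WindowState {
  windowStart: number; // epoch ms when the current window opened
  count: number; // requests seen in the current window
}

class FixedWindowRateLimiter {
  private readonly windows = new Map<string, WindowState>();
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number = 100, windowMs: number = 60_000) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Returns true if the request is allowed, false if the key is over its limit.
  allow(key: string, now: number = Date.now()): boolean {
    const state = this.windows.get(key);
    if (state === undefined || now - state.windowStart >= this.windowMs) {
      // First request from this key, or the previous window has expired.
      this.windows.set(key, { windowStart: now, count: 1 });
      return true;
    }
    if (state.count >= this.limit) {
      return false;
    }
    state.count += 1;
    return true;
  }
}
```

Calling `allow(clientIP)` before the other checks and rejecting on `false` would apply the limit per source address. A fixed window allows short bursts at window boundaries; a sliding window or token bucket smooths that out at the cost of more state.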

Message Queue Patterns

Choosing the right queue is critical for handling backpressure. I benchmarked three options across production clusters at 10k, 50k, and 100k events per second. All tests ran on 4 vCPU, 16GB RAM nodes.

| Queue System | Backpressure Support | Throughput | Operational Overhead | Cost | Best For |
|---|---|---|---|---|---|
| Redis Streams | Consumer groups, pending entry list, maxlen | Medium (10k events/sec) | Low (if using existing Redis) | Low | Small-medium pipelines, low latency, existing Redis users |
| RabbitMQ | Prefetch counts, QoS, priority queues | High (50k events/sec) | Medium | Medium | Complex routing, priority queues, enterprise environments |
| Kafka | Partition-based scaling, retention, log compaction | Very High (100k+ events/sec) | High | High | Large-scale event ingestion, log pipelines, data lakes |

For 80% of use cases, Redis Streams hits the sweet spot. It’s low overhead if you already run Redis (most clusters do), supports consumer groups for load balancing, and has built-in backpressure via pending entry lists. I’ve run Redis Streams at 15k events/sec on a 3-node cluster with no issues.
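
The pending entry list only helps with backpressure if a producer acts on it. Here is a minimal sketch of one way to do that: read the `XPENDING` summary before each `XADD` and refuse to enqueue when too many entries are unacknowledged. The `StreamClient` interface is an assumption shaped after the ioredis methods of the same names, and `publishWithBackpressure`, the thresholds, and the `payload` field name are hypothetical.

```typescript
// Narrow client interface so the sketch can be exercised without a Redis server.
// Assumption: an ioredis instance satisfies it via its xpending/xadd methods.
interface StreamClient {
  // XPENDING key group (summary form): the first element is the pending count.
  xpending(key: string, group: string): Promise<unknown[]>;
  // XADD key [MAXLEN ~ n] * field value
  xadd(key: string, ...args: (string | number)[]): Promise<string | null>;
}

interface PublishResult {
  accepted: boolean;
  id: string | null; // stream entry ID when accepted
  pending: number; // unacknowledged entries seen before the decision
}

async function publishWithBackpressure(
  client: StreamClient,
  stream: string,
  group: string,
  payload: string,
  maxPending: number = 10_000,
  maxLen: number = 100_000,
): Promise<PublishResult> {
  // Note: XPENDING fails if the consumer group does not exist yet.
  const summary = await client.xpending(stream, group);
  const pending = Number(summary[0] ?? 0);
  if (pending >= maxPending) {
    // Consumers are behind: tell the caller to slow down or retry later.
    return { accepted: false, id: null, pending };
  }
  // MAXLEN ~ caps stream growth approximately, which is cheaper than an exact trim.
  const id = await client.xadd(stream, "MAXLEN", "~", maxLen, "*", "payload", payload);
  return { accepted: id !== null, id, pending };
}
```

The pending count covers entries that were delivered but not yet acknowledged. Entries that no consumer has read yet are not included, so a stricter check could also look at stream length.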

Here’s the production Redis Streams deployment I use on Kubernetes, with persistence, memory limits, monitoring, and liveness probes:

redis-streams-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis-streams
  namespace: ai-pipelines
  labels:
    app: redis-streams
spec:
  # Note: a Deployment does not configure replication, so each replica runs as an
  # independent Redis instance, and a ReadWriteOnce PVC cannot be shared across nodes.
  # Replicated setups typically use a StatefulSet with per-pod volumes.
  replicas: 3
  selector:
    matchLabels:
      app: redis-streams
  template:
    metadata:
      labels:
        app: redis-streams
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9121"
    spec:
      containers:
        - name: redis
          image: redis:7.2-alpine
          args:
            - "--appendonly"
            - "yes"
            # Keep maxmemory below the container limit so Redis is not OOM-killed
            - "--maxmemory"
            - "3gb"
            # noeviction rejects writes when full instead of evicting stream keys
            - "--maxmemory-policy"
            - "noeviction"
            - "--requirepass"
            - "$(REDIS_PASSWORD)"
          ports:
            - containerPort: 6379
              name: redis
          env:
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-secret
                  key: password
          volumeMounts:
            - name: redis-data
              mountPath: /data
          resources:
            requests:
              cpu: 500m
              memory: 4Gi
            limits:
              cpu: 1
              memory: 4Gi
          livenessProbe:
            tcpSocket:
              port: 6379
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: 6379
            initialDelaySeconds: 5
            periodSeconds: 5
        - name: redis-exporter
          image: oliver006/redis_exporter:v1.55.0
          ports:
            - containerPort: 9121
              name: metrics
          env:
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-secret
                  key: password
      volumes:
        - name: redis-data
          persistentVolumeClaim:
            claimName: redis-streams-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: redis-streams
  namespace: ai-pipelines
  labels:
    app: redis-streams
spec:
  selector:
    app: redis-streams
  ports:
    - port: 6379
      targetPort: 6379
      name: redis
    - port: 9121
      targetPort: 9121
      name: metrics

Idempotency Patterns

LLM APIs are metered. Duplicates drain your budget fast. I’ve seen pipelines burn $10k in a day from duplicate LLM calls during a traffic spike. I enforce three idempotency controls on every pipeline, with two more for high-volume systems:

  1. Deduplication Keys: Hash of event payload + source ID, stored in Redis with 24h TTL. This prevents processing the same event twice within a day.
  2. Unique Job IDs: Temporal workflow IDs tied to the dedup key. This prevents duplicate Temporal workflows from starting.
  3. At-Least-Once Handling: Process events once or more, but eliminate duplicate side effects. If an LLM call succeeds but the save fails, retry the save, not the LLM call.
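  4. Stale Key Check: Dedup keys expire after 24h, so an event that reappears after that window is processed again. This covers late retries of events that failed the first time.
  5. Dedup Key Collision Handling: Use SHA-256 (a 256-bit hash) so that accidental collisions between different events are negligible.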
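
The at-least-once rule in item 3 can be sketched as follows: checkpoint the LLM result under the dedup key before saving, so a failed save retries only the save. This is an illustration under assumptions, not the production worker. `CheckpointStore` and `processOnce` are hypothetical names, and the store could be Redis or a database table. In Temporal, the same effect usually comes from running the LLM call and the save as separate activities.

```typescript
// Minimal key-value checkpoint store (hypothetical interface).
interface CheckpointStore {
  get(key: string): Promise<string | undefined>;
  set(key: string, value: string): Promise<void>;
}

async function processOnce(
  dedupKey: string,
  callLLM: () => Promise<string>,
  save: (result: string) => Promise<void>,
  checkpoints: CheckpointStore,
  maxSaveAttempts: number = 3,
): Promise<string> {
  // Reuse a previously checkpointed LLM result instead of paying for a new call.
  let result = await checkpoints.get(dedupKey);
  if (result === undefined) {
    result = await callLLM();
    await checkpoints.set(dedupKey, result);
  }

  // Retry only the save. The save itself should be idempotent,
  // for example an upsert keyed by dedupKey.
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxSaveAttempts; attempt++) {
    try {
      await save(result);
      return result;
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}
```

If every save attempt fails, the checkpoint stays in place, so a later redelivery of the same event skips the LLM call and goes straight to the save.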

Here’s the idempotency logic from a Temporal worker I run in production:

temporal-worker/idempotency.ts
import { WorkflowClient, Connection, WorkflowExecutionAlreadyStartedError } from '@temporalio/client';
import Redis from 'ioredis';
import crypto from 'crypto';
// Assumption: the workflow lives in a local module; adjust the path to your project.
import { processLLMWorkflow } from './workflows';

const redis = new Redis(process.env.REDIS_URL!);
const connection = await Connection.connect({ address: process.env.TEMPORAL_ADDRESS });
const client = new WorkflowClient({ connection });

async function processEvent(event: any) {
  // Generate dedup key: hash of event payload + source ID.
  // JSON.stringify depends on key order, so producers must serialize consistently.
  const dedupKey = `dedup:${event.source}:${crypto.createHash('sha256').update(JSON.stringify(event)).digest('hex')}`;

  // SET ... NX returns 'OK' only if the key did not exist yet (24h TTL)
  const acquired = await redis.set(dedupKey, '1', 'EX', 86400, 'NX');
  if (acquired !== 'OK') {
    console.log(`Duplicate event ${dedupKey}, skipping`);
    return { status: 'duplicate', dedupKey };
  }

  // Start Temporal workflow with unique ID
  try {
    const handle = await client.start(processLLMWorkflow, {
      args: [event],
      workflowId: `llm-${dedupKey}`,
      taskQueue: 'ai-pipelines',
      retry: {
        maximumAttempts: 5,
        initialInterval: '1s',
        backoffCoefficient: 2,
      },
    });
    console.log(`Started workflow ${handle.workflowId}`);
    return { status: 'started', workflowId: handle.workflowId };
  } catch (error) {
    // If workflow already exists (race condition), log and continue
    if (error instanceof WorkflowExecutionAlreadyStartedError) {
      console.log(`Workflow already exists for ${dedupKey}`);
      return { status: 'duplicate', dedupKey };
    }
    // Release the dedup key so a redelivery of this event is not silently dropped
    await redis.del(dedupKey);
    throw error;
  }
}
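
Hashing `JSON.stringify(event)` gives different keys for the same event if two producers emit fields in a different order. A minimal sketch of an order-independent key follows, assuming events are plain JSON-compatible values. The helper names `canonicalize` and `stableDedupKey` are hypothetical and not part of the worker above.

```typescript
import { createHash } from "node:crypto";

// Serialize a JSON-compatible value with object keys sorted recursively,
// so logically equal objects always produce the same string.
function canonicalize(value: unknown): string {
  if (Array.isArray(value)) {
    // Array order is meaningful and is preserved.
    return `[${value.map((item) => canonicalize(item)).join(",")}]`;
  }
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const entries = Object.keys(obj)
      .sort()
      .filter((key) => obj[key] !== undefined) // JSON drops undefined fields
      .map((key) => `${JSON.stringify(key)}:${canonicalize(obj[key])}`);
    return `{${entries.join(",")}}`;
  }
  // Primitives; undefined inside arrays serializes as null, as in JSON.
  return JSON.stringify(value) ?? "null";
}

// Dedup key in the same `dedup:<source>:<sha256 hex>` shape as the worker.
function stableDedupKey(source: string, event: unknown): string {
  const digest = createHash("sha256").update(canonicalize(event)).digest("hex");
  return `dedup:${source}:${digest}`;
}
```

With this, `{ a: 1, b: 2 }` and `{ b: 2, a: 1 }` map to the same key, while any change in a value still changes the hash.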

FAQ

How do I prevent replay attacks on webhooks without tight clock synchronization?

I use a 5-minute timestamp window with the HMAC signature. The timestamp is embedded in the payload and signed. If the webhook arrives more than 5 minutes after the timestamp, I reject it. For systems with clock skew, extend the window to 15 minutes and log the skew for monitoring.
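
A minimal sketch of that scheme, under assumptions: the sender signs the string `<timestamp>.<raw body>` with the shared secret, and the receiver checks the window in both directions before comparing signatures in constant time. The function names and the signed-string layout are hypothetical; the 5-minute default mirrors the window above.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

const DEFAULT_TOLERANCE_MS = 5 * 60 * 1000; // 5-minute window

// Sender side: sign the timestamp together with the raw body so neither can be altered alone.
function signWebhook(secret: string, timestampMs: number, rawBody: string): string {
  return createHmac("sha256", secret).update(`${timestampMs}.${rawBody}`).digest("hex");
}

// Receiver side: reject timestamps outside the window, then compare in constant time.
function verifyWebhook(
  secret: string,
  timestampMs: number,
  rawBody: string,
  signature: string,
  nowMs: number = Date.now(),
  toleranceMs: number = DEFAULT_TOLERANCE_MS,
): boolean {
  // Math.abs also rejects timestamps too far in the future (sender clock ahead).
  if (!Number.isFinite(timestampMs) || Math.abs(nowMs - timestampMs) > toleranceMs) {
    return false;
  }
  const expected = Buffer.from(signWebhook(secret, timestampMs, rawBody), "hex");
  const received = Buffer.from(signature, "hex");
  // timingSafeEqual throws on unequal lengths, so check length first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

Passing `15 * 60 * 1000` as `toleranceMs` gives the wider window for systems with clock skew. A timestamp window alone still allows a replay inside the window; the dedup key described in the idempotency section covers that gap.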

What happens when all 3 Redis nodes in a cluster fail simultaneously?

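Redis Streams with AOF persistence recovers the queue. The deployment above uses --appendonly yes so data survives restarts as long as the volume does. If all nodes die, you lose events in the window between the last AOF sync and the crash; with the default appendfsync everysec setting, that is about one second of writes. For zero-loss requirements, also write each event to a durable secondary queue such as Kafka. Note that Kafka MirrorMaker replicates between Kafka clusters, so it can protect that secondary queue but does not copy data out of Redis.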

Should I use FIFO or non-FIFO processing for AI pipelines?

Most AI pipelines don’t need strict FIFO. Non-FIFO with consumer groups gives better throughput because workers can process events in parallel. If order matters (e.g., processing chat messages in sequence), keep related events on the same stream and read that stream with a single consumer, processing entries in stream ID order. Redis Streams has no built-in partitions, so any partitioning by key is done by the producer choosing the stream.
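
Since Redis Streams has no partitions of its own, one way to keep per-key ordering while still scaling out is to shard by an ordering key across a fixed number of streams. The sketch below assumes that approach: the function name `streamForKey`, the `events` prefix, and the shard count are hypothetical.

```typescript
import { createHash } from "node:crypto";

// Map an ordering key (for example a conversation ID) to one of `shardCount` streams.
// Events with the same key always land on the same stream, so a single consumer
// per stream sees them in the order they were added.
function streamForKey(orderingKey: string, shardCount: number, prefix: string = "events"): string {
  if (!Number.isInteger(shardCount) || shardCount <= 0) {
    throw new RangeError("shardCount must be a positive integer");
  }
  // The first 4 bytes of the digest are enough for an even spread across shards.
  const digest = createHash("sha256").update(orderingKey).digest();
  const shard = digest.readUInt32BE(0) % shardCount;
  return `${prefix}:${shard}`;
}
```

A producer would call `streamForKey(conversationId, 8)` and add the event to the returned stream name. Changing the shard count later remaps keys to different streams, so in-flight events should be drained before resizing.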

How do I test idempotency locally before deploying to production?

Run a local Temporal dev server and a Redis container. Send the same event twice. The second attempt should return ‘duplicate’. I test this in CI with a GitHub Actions workflow that spins up Temporal and Redis, sends one event followed by 100 copies of it, and asserts exactly one ‘started’ response and 100 ‘duplicate’ responses.
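
The shape of that assertion can be sketched without any services running. The harness below is an illustration only: `InMemoryDedupStore` is a hypothetical stand-in for Redis `SET ... NX` with a TTL, and `sendWithCopies` sends an event once plus a number of copies to whatever handler is passed in, then tallies the statuses.

```typescript
type Status = "started" | "duplicate";

// In-memory stand-in for `SET key value PX ttl NX` (hypothetical test double).
class InMemoryDedupStore {
  private readonly expiresAt = new Map<string, number>();

  // Returns true only if the key was absent or already expired.
  setIfAbsent(key: string, ttlMs: number, now: number = Date.now()): boolean {
    const current = this.expiresAt.get(key);
    if (current !== undefined && current > now) {
      return false;
    }
    this.expiresAt.set(key, now + ttlMs);
    return true;
  }
}

// Send the event once plus `copies` more times, sequentially, and count each status.
async function sendWithCopies<E>(
  handler: (event: E) => Promise<Status>,
  event: E,
  copies: number,
): Promise<Record<Status, number>> {
  const tally: Record<Status, number> = { started: 0, duplicate: 0 };
  for (let i = 0; i < copies + 1; i++) {
    const status = await handler(event);
    tally[status] += 1;
  }
  return tally;
}
```

Against a real worker, the handler would wrap the event-processing function and map its result to a status. The expected tally for one event and 100 copies is one `started` and 100 `duplicate`. Sending the copies concurrently instead of sequentially would additionally exercise the race-condition path.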


