Temporal IA: Servidor Workers y Configuración

2026.03.11
Technology
741 Words
Temporal IA: Servidor Workers y Configuración

Parte 2 de 3. Lee la Parte 1 para conocer la arquitectura y prerrequisitos de Temporal, luego la Parte 3 para patrones de producción y CI/CD.

Repaso rápido desde la Parte 1: Temporal ejecuta cuatro servicios internos: Frontend, History, Matching, Worker, coordinados mediante colas de tareas. Los workers ejecutan activities y reportan resultados. Temporal persiste el estado en PostgreSQL e indexa el historial en Elasticsearch. He desplegado este stack en clusters de producción y la arquitectura aquí sobrevivió pruebas de carga reales. Visita Conoce al Ingeniero.

Paso 1: Crear Secrets y ConfigMaps

Antes de que cualquier componente de Temporal toque tu cluster, crea los recursos de Kubernetes para secrets y configuración. Los Secrets guardan credenciales de PostgreSQL, tu API key de OpenAI y una clave de encriptación para datos en reposo. Los ConfigMaps manejan visibilidad avanzada on-prem (Elasticsearch), retención de 30 días y tuning de concurrencia y persistencia. Consulta la documentación de Kubernetes para mejores prácticas de gestión de secrets.

Aplica estos recursos primero. Cada componente downstream (base de datos, servidor y worker) los necesita al arrancar.

temporal-secrets.yaml
apiVersion: v1
kind: Secret
metadata:
name: temporal-secrets
namespace: temporal
type: Opaque
stringData:
postgres-user: temporal
postgres-password: <your-secure-password>
openai-api-key: <your-openai-key>
temporal-encryption-key: <32-byte-base64-key>
---
# temporal-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: temporal-dynamic-config
namespace: temporal
data:
dynamic_config.yaml: |
system.advancedVisibilityWritingMode: "on-prem"
history.retentionInDays: 30
frontend.maxConcurrentLongRequests: 1000
worker.persistanceRateLimit: 1000
history.defaultCacheSize: 2048
matching.longPollExpirationInterval: "1m"
Terminal window
kubectl apply -f temporal-secrets.yaml
kubectl apply -f temporal-config.yaml

Paso 2: Desplegar Dependencias

Crea un namespace dedicado, luego despliega PostgreSQL para persistencia y Elasticsearch para visibilidad.

Namespace

Crea el namespace temporal con su label correspondiente. Todos los recursos de Temporal viven aquí para límites organizacionales claros y políticas más simples.

apiVersion: v1
kind: Namespace
metadata:
name: temporal
labels:
name: temporal

Deployment de PostgreSQL

PostgreSQL impulsa la persistencia de Temporal: estado, colas, historial, timers. Cada ejecución termina aquí. Para producción, configura un PVC con 50Gi. Consulta la documentación de PostgreSQL para ajustes. Establece requests en 500m CPU y 1Gi memoria, límites en 1 CPU y 2Gi. El contenedor ejecuta como usuario 999 y usa pg_isready para health checks.

apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres
namespace: temporal
spec:
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
securityContext:
runAsUser: 999
runAsGroup: 999
containers:
- name: postgres
image: postgres:16-alpine
env:
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: temporal-secrets
key: postgres-user
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: temporal-secrets
key: postgres-password
- name: POSTGRES_DB
value: temporal
- name: PGDATA
value: /var/lib/postgresql/data/pgdata
ports:
- containerPort: 5432
volumeMounts:
- name: postgres-data
mountPath: /var/lib/postgresql/data
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1
memory: 2Gi
livenessProbe:
exec:
command: ["pg_isready", "-U", "temporal"]
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
exec:
command: ["pg_isready", "-U", "temporal"]
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: postgres-data
persistentVolumeClaim:
claimName: postgres-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: postgres-pvc
namespace: temporal
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 50Gi
storageClassName: standard
---
apiVersion: v1
kind: Service
metadata:
name: postgres
namespace: temporal
spec:
selector:
app: postgres
ports:
- port: 5432
targetPort: 5432

Deployment de Elasticsearch

Elasticsearch impulsa el visibility store, el índice searchable de cada ejecución. Sin él, solo listas por ID. Con él, filtras por estado, tipo, atributos y rangos de tiempo. Despliega single-node con 100Gi y 4Gi heap JVM. Desactiva xpack.security.enabled por ahora. Revisa la guía de Elasticsearch para producción.

apiVersion: apps/v1
kind: Deployment
metadata:
name: elasticsearch
namespace: temporal
spec:
replicas: 1
selector:
matchLabels:
app: elasticsearch
template:
metadata:
labels:
app: elasticsearch
spec:
containers:
- name: elasticsearch
image: elasticsearch:8.14.0
env:
- name: discovery.type
value: single-node
- name: xpack.security.enabled
value: "false"
- name: ES_JAVA_OPTS
value: "-Xms1g -Xmx1g"
- name: cluster.name
value: temporal-visibility
ports:
- containerPort: 9200
name: http
- containerPort: 9300
name: transport
volumeMounts:
- name: es-data
mountPath: /usr/share/elasticsearch/data
resources:
requests:
cpu: 500m
memory: 2Gi
limits:
cpu: 1
memory: 4Gi
livenessProbe:
httpGet:
path: /_cluster/health
port: 9200
initialDelaySeconds: 60
periodSeconds: 10
volumes:
- name: es-data
persistentVolumeClaim:
claimName: es-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: es-pvc
namespace: temporal
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
storageClassName: standard
---
apiVersion: v1
kind: Service
metadata:
name: elasticsearch
namespace: temporal
spec:
selector:
app: elasticsearch
ports:
- port: 9200
targetPort: 9200
name: http

Paso 3: Desplegar el Servidor de Temporal

Revisa la documentación de Temporal para opciones de configuración del servidor. Revisa la documentación de Temporal para opciones de configuración del servidor. El servidor agrupa cuatro servicios internos: Frontend (7233), History (7234), Matching (7235) y Worker (7239), en un proceso. Las variables de entorno lo conectan a PostgreSQL y Elasticsearch. La configuración dinámica se carga del ConfigMap. Recursos: 1 CPU y 2Gi memoria.

apiVersion: apps/v1
kind: Deployment
metadata:
name: temporal-server
namespace: temporal
spec:
replicas: 1
selector:
matchLabels:
app: temporal-server
template:
metadata:
labels:
app: temporal-server
spec:
containers:
- name: temporal
image: temporalio/server:1.25.0
env:
- name: DB
value: postgresql
- name: DB_PORT
value: "5432"
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: temporal-secrets
key: postgres-user
- name: POSTGRES_PWD
valueFrom:
secretKeyRef:
name: temporal-secrets
key: postgres-password
- name: POSTGRES_SEEDS
value: postgres
- name: POSTGRES_DB
value: temporal
- name: VISIBILITY_STORE
value: elasticsearch
- name: ELASTICSEARCH_SEEDS
value: elasticsearch
- name: ELASTICSEARCH_PORT
value: "9200"
- name: DYNAMIC_CONFIG_FILE_PATH
value: /etc/temporal/dynamic_config.yaml
- name: TEMPORAL_ENCRYPTION_KEY
valueFrom:
secretKeyRef:
name: temporal-secrets
key: temporal-encryption-key
ports:
- containerPort: 7233
name: frontend
- containerPort: 7234
name: history
- containerPort: 7235
name: matching
- containerPort: 7239
name: worker
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1
memory: 2Gi
volumeMounts:
- name: dynamic-config
mountPath: /etc/temporal
livenessProbe:
httpGet:
path: /health
port: 7233
initialDelaySeconds: 30
periodSeconds: 10
volumes:
- name: dynamic-config
configMap:
name: temporal-dynamic-config
---
apiVersion: v1
kind: Service
metadata:
name: temporal-server
namespace: temporal
spec:
selector:
app: temporal-server
ports:
- name: frontend
port: 7233
targetPort: 7233
- name: history
port: 7234
targetPort: 7234
- name: matching
port: 7235
targetPort: 7235

Paso 4: Desplegar el Worker de Python

Los workers ejecutan tu lógica de negocio. Hacen poll a colas, ejecutan activities y reportan resultados. Aquí tienes un worker Python listo para producción con métricas de Prometheus.

Requisitos del Worker

Instala el SDK de Python de Temporal, el cliente de Kubernetes, el SDK de OpenAI y el cliente de Prometheus.

temporalio==1.7.0
kubernetes==28.1.0
openai==1.30.0
prometheus-client==0.20.0

Código del Worker

Para patrones de integración con arquitecturas event-driven, consulta Pipelines de IA Event-Driven. El worker define cuatro activities para un pipeline de contenido IA. generate_text llama a GPT-4o con el prompt del usuario. summarize_text condensa el resultado. validate_output verifica que cumpla un umbral mínimo de longitud. compensate_generate limpia datos parciales si algo falla. La clase AIProcessingWorkflow las orquesta en secuencia: generar → validar → resumir, con un bloque try-except que ejecuta el compensation handler si la summarización falla. Cada activity usa backoff exponencial comenzando en 1 segundo, duplicando hasta 10 segundos, con máximo de 3 intentos. Los contadores de Prometheus rastrean workflows completados y ejecuciones por activity.

import asyncio
import os
import logging
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio import workflow
from temporalio.common import RetryPolicy
import openai
from prometheus_client import start_http_server, Counter
WORKFLOW_COUNTER = Counter('ai_workflows_completed', 'Completed AI workflows')
ACTIVITY_COUNTER = Counter('ai_activities_executed', 'Executed activities', ['activity'])
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def generate_text(prompt: str) -> str:
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
logger.info(f"Generating text for prompt: {prompt[:50]}...")
response = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
temperature=0.7
)
result = response.choices[0].message.content
ACTIVITY_COUNTER.labels(activity="generate_text").inc()
return result
async def summarize_text(text: str) -> str:
logger.info(f"Summarizing text of length {len(text)}...")
result = f"Summary: {text[:200]}..."
ACTIVITY_COUNTER.labels(activity="summarize_text").inc()
return result
async def validate_output(text: str) -> bool:
if len(text) < 10:
raise ValueError("Generated text too short")
return True
async def compensate_generate(text: str) -> None:
logger.info(f"Compensating: Cleaning up generated text {text[:50]}...")
ACTIVITY_COUNTER.labels(activity="compensate_generate").inc()
@workflow.defn
class AIProcessingWorkflow:
@workflow.run
async def run(self, prompt: str) -> str:
retry_policy = RetryPolicy(
maximum_attempts=3,
initial_interval=1,
maximum_interval=10,
backoff_coefficient=2.0
)
generated = await workflow.execute_activity(
generate_text, prompt,
retry_policy=retry_policy,
task_queue="ai-tasks",
start_to_close_timeout=60
)
await workflow.execute_activity(
validate_output, generated,
retry_policy=retry_policy,
task_queue="ai-tasks"
)
try:
summarized = await workflow.execute_activity(
summarize_text, generated,
retry_policy=retry_policy,
task_queue="ai-tasks",
start_to_close_timeout=30
)
except Exception as e:
await workflow.execute_activity(
compensate_generate, generated,
task_queue="ai-tasks"
)
raise e
WORKFLOW_COUNTER.inc()
return summarized
async def main():
start_http_server(8000)
client = await Client.connect("temporal-server.temporal.svc.cluster.local:7233")
worker = Worker(
client,
task_queue="ai-tasks",
workflows=[AIProcessingWorkflow],
activities=[generate_text, summarize_text, validate_output, compensate_generate]
)
logger.info("Worker started on ai-tasks queue, listening on :8000 for metrics...")
await worker.run()
if __name__ == "__main__":
asyncio.run(main())

Deployment del Worker con HPA

Despliega el worker con un HorizontalPodAutoscaler que escala entre 3 y 10 réplicas. El HPA usa dos métricas: utilización de CPU al 70% como señal primaria y temporal_task_queue_depth sobre 10 por pod para picos repentinos. Este enfoque dual maneja tanto carga sostenida como spikes.

apiVersion: apps/v1
kind: Deployment
metadata:
name: temporal-ai-worker
namespace: temporal
spec:
replicas: 3
selector:
matchLabels:
app: temporal-ai-worker
template:
metadata:
labels:
app: temporal-ai-worker
spec:
containers:
- name: worker
image: your-registry/temporal-ai-worker:latest
env:
- name: TEMPORAL_HOST
value: "temporal-server.temporal.svc.cluster.local:7233"
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: temporal-secrets
key: openai-api-key
ports:
- containerPort: 8000
name: metrics
resources:
requests:
cpu: 100m
memory: 256Mi
limits:
cpu: 500m
memory: 512Mi
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: temporal-ai-worker
namespace: temporal
spec:
selector:
app: temporal-ai-worker
ports:
- port: 8000
targetPort: 8000
name: metrics
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: temporal-ai-worker-hpa
namespace: temporal
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: temporal-ai-worker
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: temporal_task_queue_depth
target:
type: AverageValue
averageValue: "10"

Preguntas Frecuentes

¿Por qué usar manifests raw en lugar de Helm? Los manifests raw dan control total. Helm abstrae configuraciones que necesitas ajustar: tamaños de volumen, contextos de seguridad, límites de recursos. Para producción, los raw son más transparentes.

¿Cómo aseguro PostgreSQL en producción? Usa contraseñas seguras, habilita TLS, restringe acceso con NetworkPolicies. Nunca expongas la base de datos fuera del cluster. Consulta Seguridad en Automatización de IA para más detalles.

¿Qué pasa si el servidor de Temporal se cae? Los workers cachean estado y se reconectan al recuperarse. Las ejecuciones se pausan y reanudan automáticamente. El estado no se pierde porque Temporal persiste todo en PostgreSQL.

Continúa a la Parte 3 para configuración de persistencia, saga patterns, hardening de seguridad, monitoreo con Prometheus, el patrón híbrido con n8n y pipelines de CI/CD.

# Temporal # workflows-ia # Kubernetes # orquestacion-workflows # python