Temporal IA: Servidor Workers y Configuración
Tabla de contenidos
Parte 2 de 3. Lee la Parte 1 para conocer la arquitectura y prerrequisitos de Temporal, luego la Parte 3 para patrones de producción y CI/CD.
Repaso rápido desde la Parte 1: Temporal ejecuta cuatro servicios internos: Frontend, History, Matching, Worker, coordinados mediante colas de tareas. Los workers ejecutan activities y reportan resultados. Temporal persiste el estado en PostgreSQL e indexa el historial en Elasticsearch. He desplegado este stack en clusters de producción y la arquitectura aquí sobrevivió pruebas de carga reales. Visita Conoce al Ingeniero.
Paso 1: Crear Secrets y ConfigMaps
Antes de que cualquier componente de Temporal toque tu cluster, crea los recursos de Kubernetes para secrets y configuración. Los Secrets guardan credenciales de PostgreSQL, tu API key de OpenAI y una clave de encriptación para datos en reposo. Los ConfigMaps manejan visibilidad avanzada on-prem (Elasticsearch), retención de 30 días y tuning de concurrencia y persistencia. Consulta la documentación de Kubernetes para mejores prácticas de gestión de secrets.
Aplica estos recursos primero. Cada componente downstream (base de datos, servidor y worker) los necesita al arrancar.
apiVersion: v1kind: Secretmetadata: name: temporal-secrets namespace: temporaltype: OpaquestringData: postgres-user: temporal postgres-password: <your-secure-password> openai-api-key: <your-openai-key> temporal-encryption-key: <32-byte-base64-key>---# temporal-config.yamlapiVersion: v1kind: ConfigMapmetadata: name: temporal-dynamic-config namespace: temporaldata: dynamic_config.yaml: | system.advancedVisibilityWritingMode: "on-prem" history.retentionInDays: 30 frontend.maxConcurrentLongRequests: 1000 worker.persistanceRateLimit: 1000 history.defaultCacheSize: 2048 matching.longPollExpirationInterval: "1m"kubectl apply -f temporal-secrets.yamlkubectl apply -f temporal-config.yamlPaso 2: Desplegar Dependencias
Crea un namespace dedicado, luego despliega PostgreSQL para persistencia y Elasticsearch para visibilidad.
Namespace
Crea el namespace temporal con su label correspondiente. Todos los recursos de Temporal viven aquí para límites organizacionales claros y políticas más simples.
apiVersion: v1kind: Namespacemetadata: name: temporal labels: name: temporalDeployment de PostgreSQL
PostgreSQL impulsa la persistencia de Temporal: estado, colas, historial, timers. Cada ejecución termina aquí. Para producción, configura un PVC con 50Gi. Consulta la documentación de PostgreSQL para ajustes. Establece requests en 500m CPU y 1Gi memoria, límites en 1 CPU y 2Gi. El contenedor ejecuta como usuario 999 y usa pg_isready para health checks.
apiVersion: apps/v1kind: Deploymentmetadata: name: postgres namespace: temporalspec: replicas: 1 selector: matchLabels: app: postgres template: metadata: labels: app: postgres spec: securityContext: runAsUser: 999 runAsGroup: 999 containers: - name: postgres image: postgres:16-alpine env: - name: POSTGRES_USER valueFrom: secretKeyRef: name: temporal-secrets key: postgres-user - name: POSTGRES_PASSWORD valueFrom: secretKeyRef: name: temporal-secrets key: postgres-password - name: POSTGRES_DB value: temporal - name: PGDATA value: /var/lib/postgresql/data/pgdata ports: - containerPort: 5432 volumeMounts: - name: postgres-data mountPath: /var/lib/postgresql/data resources: requests: cpu: 500m memory: 1Gi limits: cpu: 1 memory: 2Gi livenessProbe: exec: command: ["pg_isready", "-U", "temporal"] initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: exec: command: ["pg_isready", "-U", "temporal"] initialDelaySeconds: 5 periodSeconds: 5 volumes: - name: postgres-data persistentVolumeClaim: claimName: postgres-pvc---apiVersion: v1kind: PersistentVolumeClaimmetadata: name: postgres-pvc namespace: temporalspec: accessModes: - ReadWriteOnce resources: requests: storage: 50Gi storageClassName: standard---apiVersion: v1kind: Servicemetadata: name: postgres namespace: temporalspec: selector: app: postgres ports: - port: 5432 targetPort: 5432Deployment de Elasticsearch
Elasticsearch impulsa el visibility store, el índice searchable de cada ejecución. Sin él, solo listas por ID. Con él, filtras por estado, tipo, atributos y rangos de tiempo. Despliega single-node con 100Gi y 4Gi heap JVM. Desactiva xpack.security.enabled por ahora. Revisa la guía de Elasticsearch para producción.
apiVersion: apps/v1kind: Deploymentmetadata: name: elasticsearch namespace: temporalspec: replicas: 1 selector: matchLabels: app: elasticsearch template: metadata: labels: app: elasticsearch spec: containers: - name: elasticsearch image: elasticsearch:8.14.0 env: - name: discovery.type value: single-node - name: xpack.security.enabled value: "false" - name: ES_JAVA_OPTS value: "-Xms1g -Xmx1g" - name: cluster.name value: temporal-visibility ports: - containerPort: 9200 name: http - containerPort: 9300 name: transport volumeMounts: - name: es-data mountPath: /usr/share/elasticsearch/data resources: requests: cpu: 500m memory: 2Gi limits: cpu: 1 memory: 4Gi livenessProbe: httpGet: path: /_cluster/health port: 9200 initialDelaySeconds: 60 periodSeconds: 10 volumes: - name: es-data persistentVolumeClaim: claimName: es-pvc---apiVersion: v1kind: PersistentVolumeClaimmetadata: name: es-pvc namespace: temporalspec: accessModes: - ReadWriteOnce resources: requests: storage: 100Gi storageClassName: standard---apiVersion: v1kind: Servicemetadata: name: elasticsearch namespace: temporalspec: selector: app: elasticsearch ports: - port: 9200 targetPort: 9200 name: httpPaso 3: Desplegar el Servidor de Temporal
Revisa la documentación de Temporal para opciones de configuración del servidor. Revisa la documentación de Temporal para opciones de configuración del servidor. El servidor agrupa cuatro servicios internos: Frontend (7233), History (7234), Matching (7235) y Worker (7239), en un proceso. Las variables de entorno lo conectan a PostgreSQL y Elasticsearch. La configuración dinámica se carga del ConfigMap. Recursos: 1 CPU y 2Gi memoria.
apiVersion: apps/v1kind: Deploymentmetadata: name: temporal-server namespace: temporalspec: replicas: 1 selector: matchLabels: app: temporal-server template: metadata: labels: app: temporal-server spec: containers: - name: temporal image: temporalio/server:1.25.0 env: - name: DB value: postgresql - name: DB_PORT value: "5432" - name: POSTGRES_USER valueFrom: secretKeyRef: name: temporal-secrets key: postgres-user - name: POSTGRES_PWD valueFrom: secretKeyRef: name: temporal-secrets key: postgres-password - name: POSTGRES_SEEDS value: postgres - name: POSTGRES_DB value: temporal - name: VISIBILITY_STORE value: elasticsearch - name: ELASTICSEARCH_SEEDS value: elasticsearch - name: ELASTICSEARCH_PORT value: "9200" - name: DYNAMIC_CONFIG_FILE_PATH value: /etc/temporal/dynamic_config.yaml - name: TEMPORAL_ENCRYPTION_KEY valueFrom: secretKeyRef: name: temporal-secrets key: temporal-encryption-key ports: - containerPort: 7233 name: frontend - containerPort: 7234 name: history - containerPort: 7235 name: matching - containerPort: 7239 name: worker resources: requests: cpu: 500m memory: 1Gi limits: cpu: 1 memory: 2Gi volumeMounts: - name: dynamic-config mountPath: /etc/temporal livenessProbe: httpGet: path: /health port: 7233 initialDelaySeconds: 30 periodSeconds: 10 volumes: - name: dynamic-config configMap: name: temporal-dynamic-config---apiVersion: v1kind: Servicemetadata: name: temporal-server namespace: temporalspec: selector: app: temporal-server ports: - name: frontend port: 7233 targetPort: 7233 - name: history port: 7234 targetPort: 7234 - name: matching port: 7235 targetPort: 7235Paso 4: Desplegar el Worker de Python
Los workers ejecutan tu lógica de negocio. Hacen poll a colas, ejecutan activities y reportan resultados. Aquí tienes un worker Python listo para producción con métricas de Prometheus.
Requisitos del Worker
Instala el SDK de Python de Temporal, el cliente de Kubernetes, el SDK de OpenAI y el cliente de Prometheus.
temporalio==1.7.0kubernetes==28.1.0openai==1.30.0prometheus-client==0.20.0Código del Worker
Para patrones de integración con arquitecturas event-driven, consulta Pipelines de IA Event-Driven. El worker define cuatro activities para un pipeline de contenido IA. generate_text llama a GPT-4o con el prompt del usuario. summarize_text condensa el resultado. validate_output verifica que cumpla un umbral mínimo de longitud. compensate_generate limpia datos parciales si algo falla. La clase AIProcessingWorkflow las orquesta en secuencia: generar → validar → resumir, con un bloque try-except que ejecuta el compensation handler si la summarización falla. Cada activity usa backoff exponencial comenzando en 1 segundo, duplicando hasta 10 segundos, con máximo de 3 intentos. Los contadores de Prometheus rastrean workflows completados y ejecuciones por activity.
import asyncioimport osimport loggingfrom temporalio.client import Clientfrom temporalio.worker import Workerfrom temporalio import workflowfrom temporalio.common import RetryPolicyimport openaifrom prometheus_client import start_http_server, Counter
WORKFLOW_COUNTER = Counter('ai_workflows_completed', 'Completed AI workflows')ACTIVITY_COUNTER = Counter('ai_activities_executed', 'Executed activities', ['activity'])
logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)
async def generate_text(prompt: str) -> str: client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) logger.info(f"Generating text for prompt: {prompt[:50]}...") response = await client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], temperature=0.7 ) result = response.choices[0].message.content ACTIVITY_COUNTER.labels(activity="generate_text").inc() return result
async def summarize_text(text: str) -> str: logger.info(f"Summarizing text of length {len(text)}...") result = f"Summary: {text[:200]}..." ACTIVITY_COUNTER.labels(activity="summarize_text").inc() return result
async def validate_output(text: str) -> bool: if len(text) < 10: raise ValueError("Generated text too short") return True
async def compensate_generate(text: str) -> None: logger.info(f"Compensating: Cleaning up generated text {text[:50]}...") ACTIVITY_COUNTER.labels(activity="compensate_generate").inc()
@workflow.defnclass AIProcessingWorkflow: @workflow.run async def run(self, prompt: str) -> str: retry_policy = RetryPolicy( maximum_attempts=3, initial_interval=1, maximum_interval=10, backoff_coefficient=2.0 ) generated = await workflow.execute_activity( generate_text, prompt, retry_policy=retry_policy, task_queue="ai-tasks", start_to_close_timeout=60 ) await workflow.execute_activity( validate_output, generated, retry_policy=retry_policy, task_queue="ai-tasks" ) try: summarized = await workflow.execute_activity( summarize_text, generated, retry_policy=retry_policy, task_queue="ai-tasks", start_to_close_timeout=30 ) except Exception as e: await workflow.execute_activity( compensate_generate, generated, task_queue="ai-tasks" ) raise e WORKFLOW_COUNTER.inc() return summarized
async def main(): start_http_server(8000) client = await Client.connect("temporal-server.temporal.svc.cluster.local:7233") worker = Worker( client, task_queue="ai-tasks", workflows=[AIProcessingWorkflow], activities=[generate_text, summarize_text, validate_output, compensate_generate] ) logger.info("Worker started on ai-tasks queue, listening on :8000 for metrics...") await worker.run()
if __name__ == "__main__": asyncio.run(main())Deployment del Worker con HPA
Despliega el worker con un HorizontalPodAutoscaler que escala entre 3 y 10 réplicas. El HPA usa dos métricas: utilización de CPU al 70% como señal primaria y temporal_task_queue_depth sobre 10 por pod para picos repentinos. Este enfoque dual maneja tanto carga sostenida como spikes.
apiVersion: apps/v1kind: Deploymentmetadata: name: temporal-ai-worker namespace: temporalspec: replicas: 3 selector: matchLabels: app: temporal-ai-worker template: metadata: labels: app: temporal-ai-worker spec: containers: - name: worker image: your-registry/temporal-ai-worker:latest env: - name: TEMPORAL_HOST value: "temporal-server.temporal.svc.cluster.local:7233" - name: OPENAI_API_KEY valueFrom: secretKeyRef: name: temporal-secrets key: openai-api-key ports: - containerPort: 8000 name: metrics resources: requests: cpu: 100m memory: 256Mi limits: cpu: 500m memory: 512Mi livenessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 30 periodSeconds: 10---apiVersion: v1kind: Servicemetadata: name: temporal-ai-worker namespace: temporalspec: selector: app: temporal-ai-worker ports: - port: 8000 targetPort: 8000 name: metrics---apiVersion: autoscaling/v2kind: HorizontalPodAutoscalermetadata: name: temporal-ai-worker-hpa namespace: temporalspec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: temporal-ai-worker minReplicas: 3 maxReplicas: 10 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 70 - type: Pods pods: metric: name: temporal_task_queue_depth target: type: AverageValue averageValue: "10"Preguntas Frecuentes
¿Por qué usar manifests raw en lugar de Helm? Los manifests raw dan control total. Helm abstrae configuraciones que necesitas ajustar: tamaños de volumen, contextos de seguridad, límites de recursos. Para producción, los raw son más transparentes.
¿Cómo aseguro PostgreSQL en producción? Usa contraseñas seguras, habilita TLS, restringe acceso con NetworkPolicies. Nunca expongas la base de datos fuera del cluster. Consulta Seguridad en Automatización de IA para más detalles.
¿Qué pasa si el servidor de Temporal se cae? Los workers cachean estado y se reconectan al recuperarse. Las ejecuciones se pausan y reanudan automáticamente. El estado no se pierde porque Temporal persiste todo en PostgreSQL.
Continúa a la Parte 3 para configuración de persistencia, saga patterns, hardening de seguridad, monitoreo con Prometheus, el patrón híbrido con n8n y pipelines de CI/CD.