Temporal AI Workflows: Server Workers and Setup
Part 2 of 3. Read Part 1 for Temporal’s architecture and prerequisites, then Part 3 for production patterns and CI/CD.
Architecture refresher from Part 1: Temporal runs four internal services (Frontend, History, Matching, and Worker) coordinated via task queues. Workers poll queues, execute activities, and report results. Temporal persists state in PostgreSQL and indexes workflow visibility data in Elasticsearch. I have deployed this stack across production clusters, and the architecture below survived real-world load testing. See Meet the Engineer.
Step 1: Create Secrets and ConfigMaps
Before any Temporal component touches your cluster, create the Kubernetes resources for secrets and configuration. Secrets hold everything you would never commit to git: PostgreSQL credentials, your OpenAI API key, and a base64-encoded encryption key for workflow data at rest. ConfigMaps handle environment-specific settings: advanced visibility in on-prem mode (Elasticsearch), 30-day history retention, and tuning for concurrent requests, persistence rate limiting, history cache sizing, and poll expiration. See the Kubernetes documentation for secret management best practices.
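Apply these resources first. Every downstream component (database, server, and worker) needs them at boot time. Both manifests target the temporal namespace, so that namespace must exist before you run kubectl apply; its manifest appears under Step 2.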
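The Secret expects a 32-byte, base64-encoded value for temporal-encryption-key. One way to produce such a value is sketched below; the helper name generate_encryption_key is made up for this example, and any cryptographically secure generator would do equally well.

```python
import base64
import secrets


def generate_encryption_key(num_bytes: int = 32) -> str:
    """Return num_bytes of cryptographically secure randomness, base64-encoded."""
    raw = secrets.token_bytes(num_bytes)
    return base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    # Paste the printed value into the temporal-encryption-key field of the Secret.
    print(generate_encryption_key())
```

A 32-byte key always encodes to 44 base64 characters, which gives a quick sanity check before the value goes into the manifest.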
```yaml
# temporal-secrets.yaml
apiVersion: v1
kind: Secret
metadata:
  name: temporal-secrets
  namespace: temporal
type: Opaque
stringData:
  postgres-user: temporal
  postgres-password: <your-secure-password>
  openai-api-key: <your-openai-key>
  temporal-encryption-key: <32-byte-base64-key>
```

```yaml
# temporal-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: temporal-dynamic-config
  namespace: temporal
data:
  dynamic_config.yaml: |
    system.advancedVisibilityWritingMode: "on-prem"
    history.retentionInDays: 30
    frontend.maxConcurrentLongRequests: 1000
    worker.persistanceRateLimit: 1000
    history.defaultCacheSize: 2048
    matching.longPollExpirationInterval: "1m"
```

```bash
kubectl apply -f temporal-secrets.yaml
kubectl apply -f temporal-config.yaml
```

Step 2: Deploy Dependencies
Create a dedicated namespace, then deploy PostgreSQL for persistence and Elasticsearch for visibility.
Namespace
Create the temporal namespace with a matching label. All Temporal resources live here for clean organizational boundaries and simpler policy enforcement.
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: temporal
  labels:
    name: temporal
```

PostgreSQL Deployment
PostgreSQL powers Temporal’s persistence layer: workflow state, task queues, event history, timer data. Every execution, signal, and activity write lands here. For production, configure a persistent volume claim with 50Gi so data survives pod restarts. Refer to the PostgreSQL documentation for production tuning. Set resource requests at 500m CPU and 1Gi memory, limits at 1 CPU and 2Gi memory. The container runs as user 999 and uses pg_isready for liveness and readiness checks.
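Before pointing the Temporal server at the database, it can help to confirm that the PostgreSQL Service accepts connections. The sketch below is a rough stand-in for that check from Python: it only tests that a TCP connection succeeds, which is weaker than pg_isready. The function name wait_for_port and its connect parameter (there so the check can be exercised without a network) are made up for this example.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=1.0,
                  connect=socket.create_connection):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect is enough; close the socket straight away.
            connect((host, port), timeout=interval).close()
            return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    # The Service defined in this step is named "postgres" in the "temporal" namespace.
    ready = wait_for_port("postgres.temporal.svc.cluster.local", 5432)
    print("PostgreSQL reachable" if ready else "PostgreSQL not reachable yet")
```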
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
  namespace: temporal
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      securityContext:
        runAsUser: 999
        runAsGroup: 999
        # Lets UID 999 write to the mounted volume.
        fsGroup: 999
      containers:
        - name: postgres
          # Debian-based image: its postgres user is UID/GID 999.
          # The alpine variant uses 70, which would not match runAsUser above.
          image: postgres:16
          env:
            - name: POSTGRES_USER
              valueFrom:
                secretKeyRef:
                  name: temporal-secrets
                  key: postgres-user
            - name: POSTGRES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: temporal-secrets
                  key: postgres-password
            - name: POSTGRES_DB
              value: temporal
            - name: PGDATA
              value: /var/lib/postgresql/data/pgdata
          ports:
            - containerPort: 5432
          volumeMounts:
            - name: postgres-data
              mountPath: /var/lib/postgresql/data
          resources:
            requests:
              cpu: 500m
              memory: 1Gi
            limits:
              cpu: 1
              memory: 2Gi
          livenessProbe:
            exec:
              command: ["pg_isready", "-U", "temporal"]
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            exec:
              command: ["pg_isready", "-U", "temporal"]
            initialDelaySeconds: 5
            periodSeconds: 5
      volumes:
        - name: postgres-data
          persistentVolumeClaim:
            claimName: postgres-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: temporal
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 50Gi
  storageClassName: standard
---
apiVersion: v1
kind: Service
metadata:
  name: postgres
  namespace: temporal
spec:
  selector:
    app: postgres
  ports:
    - port: 5432
      targetPort: 5432
```

Elasticsearch Deployment
Elasticsearch drives Temporal’s visibility store: the searchable index of workflow executions. It is what lets you filter executions by status, workflow type, custom search attributes, and time ranges rather than looking them up one ID at a time. Deploy a single-node instance with 100Gi of persistent storage and a 4Gi container memory limit; the JVM heap itself is pinned at 1g through ES_JAVA_OPTS. xpack.security.enabled is disabled for now; enable it with TLS in production. Consult the Elasticsearch guide for production configuration.
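To make the filtering concrete, here is a minimal sketch that assembles a visibility list filter from a workflow type, a status, and an optional start time. The helper build_list_filter is made up for this example; WorkflowType, ExecutionStatus, and StartTime are built-in Temporal search attributes. In the Python SDK, the resulting string would typically be passed as the query to the client's workflow listing call, but check the exact call against your SDK version.

```python
from typing import Optional


def _quote(value: str) -> str:
    """Wrap a value in single quotes, refusing values that contain a quote."""
    if "'" in value or '"' in value:
        raise ValueError(f"unsupported character in filter value: {value!r}")
    return f"'{value}'"


def build_list_filter(workflow_type: str, status: str,
                      started_after: Optional[str] = None) -> str:
    """Build a list filter such as: WorkflowType = 'X' AND ExecutionStatus = 'Running'."""
    clauses = [
        f"WorkflowType = {_quote(workflow_type)}",
        f"ExecutionStatus = {_quote(status)}",
    ]
    if started_after is not None:
        # Timestamps are expected in RFC 3339 form, e.g. 2024-01-01T00:00:00Z.
        clauses.append(f"StartTime > {_quote(started_after)}")
    return " AND ".join(clauses)


if __name__ == "__main__":
    print(build_list_filter("AIProcessingWorkflow", "Running"))
```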
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elasticsearch
  namespace: temporal
spec:
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
        - name: elasticsearch
          image: elasticsearch:8.14.0
          env:
            - name: discovery.type
              value: single-node
            - name: xpack.security.enabled
              value: "false"
            - name: ES_JAVA_OPTS
              value: "-Xms1g -Xmx1g"
            - name: cluster.name
              value: temporal-visibility
          ports:
            - containerPort: 9200
              name: http
            - containerPort: 9300
              name: transport
          volumeMounts:
            - name: es-data
              mountPath: /usr/share/elasticsearch/data
          resources:
            requests:
              cpu: 500m
              memory: 2Gi
            limits:
              cpu: 1
              memory: 4Gi
          livenessProbe:
            httpGet:
              path: /_cluster/health
              port: 9200
            initialDelaySeconds: 60
            periodSeconds: 10
      volumes:
        - name: es-data
          persistentVolumeClaim:
            claimName: es-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: es-pvc
  namespace: temporal
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
  storageClassName: standard
---
apiVersion: v1
kind: Service
metadata:
  name: elasticsearch
  namespace: temporal
spec:
  selector:
    app: elasticsearch
  ports:
    - port: 9200
      targetPort: 9200
      name: http
```

Step 3: Deploy Temporal Server
The Temporal server is the system’s brain. It bundles four internal services into one process: Frontend (gRPC on 7233), History (state machine on 7234), Matching (task routing on 7235), and the internal Worker (7239). Environment variables connect it to PostgreSQL and Elasticsearch. Dynamic configuration loads from the ConfigMap mounted at /etc/temporal/dynamic_config.yaml. Resource limits sit at 1 CPU and 2Gi memory, with a liveness probe against the Frontend port. Refer to the Temporal documentation for server configuration options.
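Clients and workers reach the Frontend through the Service's cluster DNS name. As a small sketch of how that address is composed, assuming the default cluster.local domain, the hypothetical helpers below build it from the Service name, namespace, and port, and let a TEMPORAL_HOST environment variable override it.

```python
import os
from typing import Mapping


def service_address(service: str, namespace: str, port: int,
                    cluster_domain: str = "cluster.local") -> str:
    """Compose the in-cluster DNS address <service>.<namespace>.svc.<domain>:<port>."""
    return f"{service}.{namespace}.svc.{cluster_domain}:{port}"


def frontend_address(env: Mapping[str, str] = os.environ) -> str:
    """Prefer TEMPORAL_HOST when set; otherwise use the Frontend Service on 7233."""
    return env.get("TEMPORAL_HOST") or service_address("temporal-server", "temporal", 7233)


if __name__ == "__main__":
    print(frontend_address())
```

Reading the address from the environment keeps the same worker image usable against a local server during development.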
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-server
  namespace: temporal
spec:
  replicas: 1
  selector:
    matchLabels:
      app: temporal-server
  template:
    metadata:
      labels:
        app: temporal-server
    spec:
      containers:
        - name: temporal
          image: temporalio/server:1.25.0
          env:
            - name: DB
              value: postgresql
            - name: DB_PORT
              value: "5432"
            - name: POSTGRES_USER
              valueFrom:
                secretKeyRef:
                  name: temporal-secrets
                  key: postgres-user
            - name: POSTGRES_PWD
              valueFrom:
                secretKeyRef:
                  name: temporal-secrets
                  key: postgres-password
            - name: POSTGRES_SEEDS
              value: postgres
            - name: POSTGRES_DB
              value: temporal
            - name: VISIBILITY_STORE
              value: elasticsearch
            - name: ELASTICSEARCH_SEEDS
              value: elasticsearch
            - name: ELASTICSEARCH_PORT
              value: "9200"
            - name: DYNAMIC_CONFIG_FILE_PATH
              value: /etc/temporal/dynamic_config.yaml
            - name: TEMPORAL_ENCRYPTION_KEY
              valueFrom:
                secretKeyRef:
                  name: temporal-secrets
                  key: temporal-encryption-key
          ports:
            - containerPort: 7233
              name: frontend
            - containerPort: 7234
              name: history
            - containerPort: 7235
              name: matching
            - containerPort: 7239
              name: worker
          resources:
            requests:
              cpu: 500m
              memory: 1Gi
            limits:
              cpu: 1
              memory: 2Gi
          volumeMounts:
            - name: dynamic-config
              mountPath: /etc/temporal
          livenessProbe:
            # Port 7233 speaks gRPC, so an HTTP GET on /health would fail.
            # A TCP check on the Frontend port is used instead.
            tcpSocket:
              port: 7233
            initialDelaySeconds: 30
            periodSeconds: 10
      volumes:
        - name: dynamic-config
          configMap:
            name: temporal-dynamic-config
---
apiVersion: v1
kind: Service
metadata:
  name: temporal-server
  namespace: temporal
spec:
  selector:
    app: temporal-server
  ports:
    - name: frontend
      port: 7233
      targetPort: 7233
    - name: history
      port: 7234
      targetPort: 7234
    - name: matching
      port: 7235
      targetPort: 7235
```

Step 4: Deploy Python Worker
Workers run your business logic. They poll Temporal’s task queues, pull activities, execute them, and report results. Below is a Python worker for AI pipelines with retries, a compensation step, logging, and Prometheus metrics.
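The poll, execute, report cycle can be illustrated without Temporal at all. The sketch below is only an in-memory analogy built on asyncio.Queue: a real worker long-polls the Temporal server over gRPC through the SDK, and none of the names here (poll_and_execute, run_demo, shout) belong to the Temporal API.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

Activity = Callable[[str], Awaitable[str]]


async def poll_and_execute(queue: asyncio.Queue,
                           activities: Dict[str, Activity],
                           results: List[Tuple[str, str]]) -> None:
    """Pull (activity name, input) tasks off the queue until a None sentinel arrives."""
    while True:
        task = await queue.get()          # poll: wait for the next task
        if task is None:                  # sentinel: stop the worker
            queue.task_done()
            return
        name, arg = task
        try:
            output = await activities[name](arg)        # execute the activity
            results.append((name, output))              # report success
        except Exception as exc:
            results.append((name, f"failed: {exc}"))    # report failure
        finally:
            queue.task_done()


async def run_demo() -> List[Tuple[str, str]]:
    async def shout(text: str) -> str:
        return text.upper()

    queue: asyncio.Queue = asyncio.Queue()
    results: List[Tuple[str, str]] = []
    for item in [("shout", "hello"), ("shout", "temporal"), None]:
        queue.put_nowait(item)
    await poll_and_execute(queue, {"shout": shout}, results)
    return results


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```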
Worker Requirements
Install the Temporal Python SDK, Kubernetes client, OpenAI SDK, and Prometheus client.
```text
temporalio==1.7.0
kubernetes==28.1.0
openai==1.30.0
prometheus-client==0.20.0
```

Worker Code
For patterns on integrating Temporal with event-driven architectures, see Event-Driven AI Pipelines. The worker defines four activities for an AI content pipeline. generate_text calls OpenAI’s GPT-4o with the user prompt. summarize_text condenses the output. validate_output checks a minimum length threshold. compensate_generate handles cleanup, such as deleting partial data, if anything goes wrong. The AIProcessingWorkflow class orchestrates them in sequence (generate → validate → summarize), with a try-except block that fires the compensation handler on summarization failure. The generate, validate, and summarize activities retry with exponential backoff: the first retry waits 1 second and each later wait doubles, capped at 10 seconds, for a maximum of 3 attempts. Prometheus counters track completed workflows and per-activity executions.
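To see what this retry policy means in wall-clock terms, the sketch below computes the wait before each retry from the initial interval, backoff coefficient, interval cap, and attempt limit. It assumes each wait is the initial interval multiplied by the coefficient once per earlier retry, then capped; the helper retry_delays is made up for this example and is not an SDK function.

```python
from typing import List


def retry_delays(initial: float, coefficient: float,
                 maximum: float, max_attempts: int) -> List[float]:
    """Return the wait in seconds before attempt 2, 3, ..., max_attempts."""
    delays = []
    for retry in range(max_attempts - 1):
        # The wait grows geometrically and is capped at the maximum interval.
        delays.append(min(initial * coefficient ** retry, maximum))
    return delays


if __name__ == "__main__":
    # The policy described above: 1 s initial, x2 backoff, 10 s cap, 3 attempts.
    print(retry_delays(1, 2.0, 10, 3))
    # With more attempts, the 10 s cap comes into play.
    print(retry_delays(1, 2.0, 10, 6))
```

With only 3 attempts there are two waits, of 1 and 2 seconds, so the 10-second cap is never reached; it starts to matter from the fifth retry onward.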
```python
import asyncio
import logging
import os
from datetime import timedelta

import openai
from prometheus_client import Counter, start_http_server
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker

WORKFLOW_COUNTER = Counter('ai_workflows_completed', 'Completed AI workflows')
ACTIVITY_COUNTER = Counter('ai_activities_executed', 'Executed activities', ['activity'])

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@activity.defn
async def generate_text(prompt: str) -> str:
    # AsyncOpenAI is required here: the synchronous client's create() cannot be awaited.
    client = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    logger.info(f"Generating text for prompt: {prompt[:50]}...")
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7,
    )
    # content may be None; fall back to an empty string so validation can reject it.
    result = response.choices[0].message.content or ""
    ACTIVITY_COUNTER.labels(activity="generate_text").inc()
    return result


@activity.defn
async def summarize_text(text: str) -> str:
    logger.info(f"Summarizing text of length {len(text)}...")
    # Placeholder summary: the first 200 characters of the generated text.
    result = f"Summary: {text[:200]}..."
    ACTIVITY_COUNTER.labels(activity="summarize_text").inc()
    return result


@activity.defn
async def validate_output(text: str) -> bool:
    if len(text) < 10:
        raise ValueError("Generated text too short")
    return True


@activity.defn
async def compensate_generate(text: str) -> None:
    logger.info(f"Compensating: Cleaning up generated text {text[:50]}...")
    ACTIVITY_COUNTER.labels(activity="compensate_generate").inc()


# The sandbox is disabled because this single-file worker shares module-level
# Prometheus counters with the workflow. In a larger codebase, keep workflows
# in their own module and leave the sandbox on.
@workflow.defn(sandboxed=False)
class AIProcessingWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # Intervals and timeouts must be timedelta values, not bare numbers.
        retry_policy = RetryPolicy(
            maximum_attempts=3,
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(seconds=10),
            backoff_coefficient=2.0,
        )
        generated = await workflow.execute_activity(
            generate_text,
            prompt,
            retry_policy=retry_policy,
            task_queue="ai-tasks",
            start_to_close_timeout=timedelta(seconds=60),
        )
        # Every activity call needs a timeout; the 10 s here is an assumed value.
        await workflow.execute_activity(
            validate_output,
            generated,
            retry_policy=retry_policy,
            task_queue="ai-tasks",
            start_to_close_timeout=timedelta(seconds=10),
        )
        try:
            summarized = await workflow.execute_activity(
                summarize_text,
                generated,
                retry_policy=retry_policy,
                task_queue="ai-tasks",
                start_to_close_timeout=timedelta(seconds=30),
            )
        except Exception:
            # Summarization failed after retries: run the compensation step, then re-raise.
            await workflow.execute_activity(
                compensate_generate,
                generated,
                task_queue="ai-tasks",
                start_to_close_timeout=timedelta(seconds=30),  # assumed value
            )
            raise
        # Skip the increment during replay so a completion is counted only once.
        if not workflow.unsafe.is_replaying():
            WORKFLOW_COUNTER.inc()
        return summarized


async def main():
    start_http_server(8000)
    # TEMPORAL_HOST is set in the worker Deployment; fall back to the in-cluster address.
    client = await Client.connect(
        os.getenv("TEMPORAL_HOST", "temporal-server.temporal.svc.cluster.local:7233")
    )
    worker = Worker(
        client,
        task_queue="ai-tasks",
        workflows=[AIProcessingWorkflow],
        activities=[generate_text, summarize_text, validate_output, compensate_generate],
    )
    logger.info("Worker started on ai-tasks queue, listening on :8000 for metrics...")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```

Worker Deployment with HPA
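Deploy the worker with a HorizontalPodAutoscaler that scales between 3 and 10 replicas. The HPA uses dual metrics: CPU utilization at 70% as the primary signal, and an average temporal_task_queue_depth above 10 per pod for burst scaling. The second is a Pods-type custom metric, so the cluster needs a custom metrics adapter (for example, Prometheus Adapter) that exposes it. Together these handle both sustained load and sudden spikes.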
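The way the two metrics combine follows the scaling rule in the Kubernetes documentation: each metric proposes ceil(currentReplicas × currentValue / targetValue), and the largest proposal wins, clamped to the replica bounds. The sketch below reproduces that arithmetic with made-up readings. It leaves out the controller's tolerance band and stabilization windows, and desired_replicas is a name invented for this example.

```python
import math
from typing import Dict, Tuple


def desired_replicas(current: int,
                     metrics: Dict[str, Tuple[float, float]],
                     min_replicas: int = 3,
                     max_replicas: int = 10) -> int:
    """metrics maps a metric name to (current average value, target value)."""
    proposals = [
        math.ceil(current * value / target)
        for value, target in metrics.values()
    ]
    # The HPA follows the metric that asks for the most replicas, within bounds.
    return max(min_replicas, min(max_replicas, max(proposals)))


if __name__ == "__main__":
    # Hypothetical readings: 90% CPU against a 70% target, queue depth 25 against 10.
    print(desired_replicas(3, {"cpu": (90, 70), "queue_depth": (25, 10)}))
```

In this hypothetical case CPU alone would ask for 4 replicas, while the queue depth asks for 8, so the burst metric drives the scale-up.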
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-ai-worker
  namespace: temporal
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-ai-worker
  template:
    metadata:
      labels:
        app: temporal-ai-worker
    spec:
      containers:
        - name: worker
          image: your-registry/temporal-ai-worker:latest
          env:
            - name: TEMPORAL_HOST
              value: "temporal-server.temporal.svc.cluster.local:7233"
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: temporal-secrets
                  key: openai-api-key
          ports:
            - containerPort: 8000
              name: metrics
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: temporal-ai-worker
  namespace: temporal
spec:
  selector:
    app: temporal-ai-worker
  ports:
    - port: 8000
      targetPort: 8000
      name: metrics
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: temporal-ai-worker-hpa
  namespace: temporal
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: temporal-ai-worker
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Pods
      pods:
        metric:
          name: temporal_task_queue_depth
        target:
          type: AverageValue
          averageValue: "10"
```

Frequently Asked Questions
Why use raw manifests instead of Helm? Raw manifests give full control over resource definitions. Helm charts often abstract configurations you need to tune: PV sizes, security contexts, resource limits. For production, raw manifests are more transparent.
How do I secure PostgreSQL in production? Use strong passwords, enable TLS, restrict network access with NetworkPolicies. Never expose the database outside the cluster. See Securing AI Automation for details.
What happens if the Temporal server goes down? Workers keep retrying their polls and reconnect once the server is back. Running workflow executions make no progress while it is down, then resume automatically. State is not lost because Temporal persists everything to PostgreSQL.
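Worker startup is a separate case: if the server is unreachable when the worker process boots, the initial connection attempt can fail outright. One way to cover it is to wrap the connect call in a retry loop, as in the sketch below. The helper connect_with_backoff is made up for this example, and the exception types to retry on are an assumption to adjust for your SDK version.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def connect_with_backoff(
    connect: Callable[[], Awaitable[T]],
    attempts: int = 5,
    initial: float = 1.0,
    maximum: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call connect() until it succeeds, doubling the wait after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    delay = initial
    attempt = 1
    while True:
        try:
            return await connect()
        except retry_on:
            if attempt == attempts:
                raise                      # out of attempts: surface the last error
            await sleep(delay)
            delay = min(delay * 2, maximum)
            attempt += 1
```

In the worker's main function, the connect argument would be a small coroutine function that wraps the client connect call.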
Continue to Part 3 for persistence configuration, saga patterns, security hardening, monitoring with Prometheus, the n8n hybrid pattern, and CI/CD pipelines.