AI SRE Agent MCP Server Code and Safety Guide

2026.01.07
Technology
562 Words
AI SRE Agent MCP Server Code and Safety Guide

This is Part 2 of a 3-part series by Eduardo. Part 1 covered the MCP protocol and the 4-layer architecture. Here we write the server code and safety guardrails. Part 3 covers production deployment.

Step 1: Project Setup

Create the project directory and virtual environment:

Terminal window
mkdir -p ~/k8s-mcp-server
cd ~/k8s-mcp-server
uv venv
source .venv/bin/activate

Install dependencies:

Terminal window
uv pip install fastmcp kubernetes pydantic

Step 2: Create the Server Core

Create server.py. This file defines the MCP server using FastMCP, connects to your Kubernetes cluster through the Kubernetes Python client, and exposes diagnostic tools.

server.py
# Production MCP server for Kubernetes SRE operations
import os
import json
import subprocess
from typing import Optional, Literal
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from kubernetes import client, config
from fastmcp import FastMCP, Context
# ─────────────────────────────────────────────────────────────
# Configuration & Safety
# ─────────────────────────────────────────────────────────────
# Operating mode, read from K8S_MCP_MODE. Read-only is the default; only set
# K8S_MCP_MODE=readwrite if you understand the risks and have approval gates
# in place.
OPERATION_MODE = os.getenv("K8S_MCP_MODE", "readonly")

# Namespace allowlist, read from the comma-separated K8S_MCP_NAMESPACES.
# Entries are whitespace-trimmed and blank entries are dropped, so an unset or
# empty variable yields an empty list.
ALLOWED_NAMESPACES = [
    entry.strip()
    for entry in os.getenv("K8S_MCP_NAMESPACES", "").split(",")
    if entry.strip()
]

# Dry-run mode: log what would be executed without executing. Enabled only
# when K8S_MCP_DRY_RUN is "true" (case-insensitive).
DRY_RUN = os.getenv("K8S_MCP_DRY_RUN", "false").lower() == "true"
def _check_namespace(namespace: str) -> None:
    """Enforce the namespace allowlist, if one is configured.

    An empty ALLOWED_NAMESPACES list means no allowlist is configured, in
    which case every namespace passes.

    Args:
        namespace: The namespace a tool is about to operate on.

    Raises:
        PermissionError: If an allowlist is configured and ``namespace`` is
            not in it.
    """
    if not ALLOWED_NAMESPACES:
        return
    if namespace in ALLOWED_NAMESPACES:
        return
    raise PermissionError(
        f"Namespace '{namespace}' is not in the allowed list: {ALLOWED_NAMESPACES}"
    )
def _check_readwrite() -> None:
    """Enforce the read-only default.

    Raises:
        PermissionError: If OPERATION_MODE is anything other than
            ``"readwrite"``.
    """
    if OPERATION_MODE == "readwrite":
        return
    message = (
        "This operation requires readwrite mode. "
        "Set K8S_MCP_MODE=readwrite to enable, but use with caution."
    )
    raise PermissionError(message)
def _log_action(action: str, details: dict) -> None:
    """Write one JSON audit line for a tool invocation.

    The line is written to stderr rather than stdout: this server is started
    with ``transport="stdio"``, where stdout carries the MCP protocol
    messages, so any extra output on stdout would be interleaved with that
    stream.

    Args:
        action: Name of the tool being invoked.
        details: Arguments of the invocation; values that are not JSON
            serializable are stringified.
    """
    # Local import keeps this function self-contained with respect to the
    # file's top-level import list.
    import sys

    log_entry = {"action": action, "details": details, "dry_run": DRY_RUN}
    print(json.dumps(log_entry, default=str), file=sys.stderr, flush=True)
# ─────────────────────────────────────────────────────────────
# Kubernetes Client Lifecycle
# ─────────────────────────────────────────────────────────────
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[dict]:
    """Manage the Kubernetes client lifecycle.

    In-cluster configuration is tried first; if that raises
    ``ConfigException`` the local kubeconfig is loaded instead. The yielded
    dict exposes the API clients under the keys ``"v1"`` (CoreV1Api) and
    ``"apps_v1"`` (AppsV1Api).
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    api_clients = {
        "v1": client.CoreV1Api(),
        "apps_v1": client.AppsV1Api(),
    }
    yield api_clients
# The MCP server instance. app_lifespan yields the Kubernetes API clients that
# the tools below read from ctx.request_context.lifespan_context.
mcp = FastMCP("k8s-sre", lifespan=app_lifespan)
# ─────────────────────────────────────────────────────────────
# Tools: Read-Only Diagnostics
# ─────────────────────────────────────────────────────────────
@mcp.tool()
async def list_pods(
    namespace: str = "default",
    ctx: Context = None,
) -> str:
    """List all pods in a namespace with their status and restart counts.
    Args:
    namespace: The Kubernetes namespace to query.
    """
    _check_namespace(namespace)
    _log_action("list_pods", {"namespace": namespace})
    if DRY_RUN:
        return f"[DRY RUN] Would list pods in namespace '{namespace}'"

    core_api = ctx.request_context.lifespan_context["v1"]
    pod_list = core_api.list_namespaced_pod(namespace=namespace)

    def _row(pod) -> str:
        # Restart count is summed over every container in the pod; a pod with
        # no container statuses yet counts as zero.
        statuses = pod.status.container_statuses or []
        restarts = sum(cs.restart_count for cs in statuses)
        # NOTE: the column is headed AGE but holds the creation timestamp.
        return (
            f"{pod.metadata.name:<40} {pod.status.phase:<12} "
            f"{restarts:<10} {pod.metadata.creation_timestamp}"
        )

    header = f"{'NAME':<40} {'STATUS':<12} {'RESTARTS':<10} {'AGE'}"
    table = [header, "-" * 80]
    table.extend(_row(pod) for pod in pod_list.items)
    return "\n".join(table)
@mcp.tool()
async def get_pod_logs(
    pod_name: str,
    namespace: str = "default",
    tail_lines: int = 100,
    ctx: Context = None,
    container: Optional[str] = None,
) -> str:
    """Fetch the logs for a specific pod.

    Args:
        pod_name: The name of the pod.
        namespace: The pod's namespace.
        tail_lines: Number of recent log lines to return (clamped to 0-1000).
        container: Name of the container to read logs from. Needed when the
            pod runs more than one container; leave unset otherwise.
    """
    _check_namespace(namespace)

    # Clamp before logging so the audit entry records the value that is
    # actually sent, and so a negative request never reaches the API.
    tail_lines = max(0, min(tail_lines, 1000))

    details = {"pod_name": pod_name, "namespace": namespace, "tail_lines": tail_lines}
    if container is not None:
        details["container"] = container
    _log_action("get_pod_logs", details)

    if DRY_RUN:
        return f"[DRY RUN] Would fetch logs for pod '{pod_name}' in '{namespace}'"

    v1 = ctx.request_context.lifespan_context["v1"]
    request = {"name": pod_name, "namespace": namespace, "tail_lines": tail_lines}
    # Only pass the container when the caller named one, so the request for
    # single-container pods is the same as before.
    if container is not None:
        request["container"] = container
    return v1.read_namespaced_pod_log(**request)
@mcp.tool()
async def describe_pod(
    pod_name: str,
    namespace: str = "default",
    ctx: Context = None,
) -> str:
    """Describe a pod, including events, conditions, and container details.

    Args:
        pod_name: The name of the pod.
        namespace: The pod's namespace.
    """
    _check_namespace(namespace)
    _log_action("describe_pod", {"pod_name": pod_name, "namespace": namespace})
    if DRY_RUN:
        return f"[DRY RUN] Would describe pod '{pod_name}' in '{namespace}'"

    v1 = ctx.request_context.lifespan_context["v1"]
    pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
    lines = [
        f"Name: {pod.metadata.name}",
        f"Namespace: {pod.metadata.namespace}",
        f"Node: {pod.spec.node_name}",
        f"Status: {pod.status.phase}",
        f"IP: {pod.status.pod_ip}",
        "",
        "Conditions:",
    ]
    for cond in pod.status.conditions or []:
        lines.append(f" {cond.type}: {cond.status} ({cond.reason or 'N/A'})")

    lines.extend(["", "Container Statuses:"])
    for cs in pod.status.container_statuses or []:
        container_state = cs.state
        state = "unknown"
        # A status without a state object is reported as "unknown" instead of
        # raising on attribute access.
        if container_state is not None:
            if container_state.running:
                state = f"running (started: {container_state.running.started_at})"
            elif container_state.waiting:
                state = (
                    f"waiting ({container_state.waiting.reason}: "
                    f"{container_state.waiting.message})"
                )
            elif container_state.terminated:
                state = f"terminated (exit: {container_state.terminated.exit_code})"
        lines.append(f" {cs.name}: {state} (restarts: {cs.restart_count})")

    # The tool description promises events, so fetch the ones that reference
    # this pod. This is best-effort: if the API call is rejected, the rest of
    # the description is still returned with a note saying why.
    lines.extend(["", "Events:"])
    try:
        events = v1.list_namespaced_event(
            namespace=namespace,
            field_selector=f"involvedObject.kind=Pod,involvedObject.name={pod_name}",
        )
    except client.ApiException as exc:
        lines.append(f" <unavailable: {exc.status} {exc.reason}>")
    else:
        if not events.items:
            lines.append(" <none>")
        for event in events.items:
            seen_at = (
                event.last_timestamp
                or event.event_time
                or event.metadata.creation_timestamp
            )
            lines.append(f" {seen_at} {event.type} {event.reason}: {event.message}")

    return "\n".join(lines)
@mcp.tool()
async def list_deployments(
    namespace: str = "default",
    ctx: Context = None,
) -> str:
    """List all deployments in a namespace with replica status.
    Args:
    namespace: The Kubernetes namespace to query.
    """
    _check_namespace(namespace)
    _log_action("list_deployments", {"namespace": namespace})
    if DRY_RUN:
        return f"[DRY RUN] Would list deployments in namespace '{namespace}'"

    apps_api = ctx.request_context.lifespan_context["apps_v1"]
    deployment_list = apps_api.list_namespaced_deployment(namespace=namespace)

    header = f"{'NAME':<40} {'READY':<10} {'UP-TO-DATE':<12} {'AVAILABLE':<12} {'AGE'}"
    table = [header, "-" * 90]
    for dep in deployment_list.items:
        dep_status = dep.status
        # Replica counters that are unset are shown as 0.
        ready = f"{dep_status.ready_replicas or 0}/{dep.spec.replicas}"
        updated = dep_status.updated_replicas or 0
        available = dep_status.available_replicas or 0
        # NOTE: the column is headed AGE but holds the creation timestamp.
        table.append(
            f"{dep.metadata.name:<40} {ready:<10} "
            f"{updated:<12} {available:<12} "
            f"{dep.metadata.creation_timestamp}"
        )
    return "\n".join(table)
@mcp.tool()
async def get_node_status(ctx: Context = None) -> str:
    """List all cluster nodes with their capacity and conditions."""
    # Nodes are cluster-scoped, so no namespace allowlist check applies here.
    _log_action("get_node_status", {})
    if DRY_RUN:
        return "[DRY RUN] Would list all cluster nodes"

    v1 = ctx.request_context.lifespan_context["v1"]
    nodes = v1.list_node()
    lines = [f"{'NAME':<30} {'STATUS':<12} {'CPU':<10} {'MEMORY':<15} {'AGE'}"]
    lines.append("-" * 85)
    for node in nodes.items:
        # Start from "Unknown": a node that reports no Ready condition must
        # not be presented as healthy.
        status = "Unknown"
        for cond in node.status.conditions or []:
            if cond.type == "Ready":
                status = "Ready" if cond.status == "True" else "NotReady"
        # Capacity may be absent; fall back to an empty mapping so both
        # columns show N/A instead of raising.
        capacity = node.status.capacity or {}
        cpu = capacity.get("cpu", "N/A")
        mem = capacity.get("memory", "N/A")
        lines.append(
            f"{node.metadata.name:<30} {status:<12} {cpu:<10} {mem:<15} {node.metadata.creation_timestamp}"
        )
    return "\n".join(lines)
# ─────────────────────────────────────────────────────────────
# Tool: Remediation (Read-Write, Guarded)
# ─────────────────────────────────────────────────────────────
@mcp.tool()
async def restart_deployment(
    deployment_name: str,
    namespace: str = "default",
    ctx: Context = None,
) -> str:
    """Trigger a rolling restart of a deployment by updating an annotation.
    This is safe and non-destructive, but requires readwrite mode.

    Args:
        deployment_name: The deployment to restart.
        namespace: The deployment's namespace.
    """
    # Local import: the original reached datetime through __import__ and the
    # file's import list does not include it.
    from datetime import datetime, timezone

    _check_namespace(namespace)
    _check_readwrite()
    _log_action("restart_deployment", {"deployment_name": deployment_name, "namespace": namespace})
    if DRY_RUN:
        return (
            f"[DRY RUN] Would restart deployment '{deployment_name}' "
            f"in namespace '{namespace}' by patching rollout timestamp"
        )

    apps_v1 = ctx.request_context.lifespan_context["apps_v1"]

    # Changing a pod-template annotation makes the deployment roll its pods.
    # The value is a timezone-aware UTC timestamp in RFC 3339 form.
    restarted_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    patch = {
        "spec": {
            "template": {
                "metadata": {
                    "annotations": {
                        "kubectl.kubernetes.io/restartedAt": restarted_at,
                    }
                }
            }
        }
    }
    apps_v1.patch_namespaced_deployment(
        name=deployment_name,
        namespace=namespace,
        body=patch,
    )
    return f"Deployment '{deployment_name}' in '{namespace}' restarted successfully."
# Script entry point: serve MCP over the stdio transport when this file is
# executed directly.
if __name__ == "__main__":
    mcp.run(transport="stdio")

Step 3: Safety Guardrails Explained

Here is how the safety mechanisms work, building on the architecture from Part 1.

Read-Only by Default: The K8S_MCP_MODE environment variable (read into OPERATION_MODE) defaults to readonly. The restart_deployment tool explicitly calls _check_readwrite(), which raises a PermissionError unless the mode is set to readwrite. Even if Claude determines (incorrectly) that it should restart something, the server refuses unless an operator has explicitly enabled write access.

Namespace Allowlisting: The K8S_MCP_NAMESPACES environment variable (parsed into ALLOWED_NAMESPACES) restricts which namespaces the agent can see. Want the agent to only diagnose your production namespace? Set K8S_MCP_NAMESPACES=production. The server rejects any request targeting a different namespace before it reaches the Kubernetes API. If the variable is unset or empty, no allowlist is enforced and every namespace is reachable.

Dry-Run Mode: Set K8S_MCP_DRY_RUN=true and every tool returns a description of what it would do instead of executing. Invaluable for testing new workflows and demonstrating capabilities to security teams.

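Audit Logging: Every tool call logs a JSON line. Because the server speaks MCP over the stdio transport, stdout is reserved for protocol messages, so audit lines belong on stderr. In production, I ship these logs to Loki or Elasticsearch so I can trace exactly what the agent did, when, and why.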

Step 4: Connect to Claude Desktop

Claude Desktop supports MCP servers through a configuration file. On macOS, that is ~/Library/Application Support/Claude/claude_desktop_config.json. On Linux, it is ~/.config/Claude/claude_desktop_config.json.

{
"mcpServers": {
"k8s-sre": {
"command": "/home/<USER>/.local/bin/uv",
"args": [
"run",
"--project",
"/home/<USER>/k8s-mcp-server",
"python",
"server.py"
],
"env": {
"K8S_MCP_MODE": "readonly",
"K8S_MCP_NAMESPACES": "default,staging",
"K8S_MCP_DRY_RUN": "false"
}
}
}
}

Replace <USER> with your username and adjust paths as needed. Restart Claude Desktop after saving.

To verify the connection, open Claude and type:

Show me the pods in the default namespace.

Claude recognizes the tool, calls list_pods, and presents the results in a formatted table.

Step 5: Approval Queue for Write Operations

For production, I never rely solely on the readwrite environment variable. I add an external approval queue. Here is a simplified file-based pattern:

approval_queue.py
import os
import json
import time
from pathlib import Path
# Directory holding approval request files; override with K8S_MCP_APPROVAL_DIR.
APPROVAL_DIR = Path(os.getenv("K8S_MCP_APPROVAL_DIR", "/var/lib/k8s-mcp/approvals"))
# Created (with any missing parents) at import time.
APPROVAL_DIR.mkdir(exist_ok=True, parents=True)
def request_approval(action: str, details: dict) -> str:
    """Request human approval for a write operation.

    Writes ``<approval_id>.pending`` into APPROVAL_DIR containing the action
    and its details as JSON.

    Args:
        action: Name of the operation awaiting approval.
        details: Arguments of the operation; must be JSON serializable.

    Returns:
        A human-readable message containing the approval id and instructions
        for approving the request.
    """
    # Local import keeps this function self-contained with respect to the
    # file's top-level import list.
    import uuid

    # A whole-second timestamp alone is not unique: two requests for the same
    # action within one second would share an id and the later write would
    # overwrite the earlier pending file. The random suffix prevents that.
    approval_id = f"{int(time.time())}-{action}-{uuid.uuid4().hex[:8]}"
    request_file = APPROVAL_DIR / f"{approval_id}.pending"
    request_file.write_text(json.dumps({"action": action, "details": details}))
    return (
        f"Approval requested: {approval_id}\n"
        f"Action: {action}\n"
        f"Details: {json.dumps(details)}\n"
        f"To approve, rename {approval_id}.pending to {approval_id}.approved"
    )
def is_approved(approval_id: str) -> bool:
    """Check if an approval has been granted.

    Args:
        approval_id: The id issued by request_approval.

    Returns:
        True if ``<approval_id>.approved`` exists in APPROVAL_DIR.
    """
    approved_marker = APPROVAL_DIR / f"{approval_id}.approved"
    return approved_marker.exists()

Wire this into restart_deployment before it patches anything. In a real system, I use a Slack bot or PagerDuty integration instead of file-based approval, but the pattern holds: never let an AI agent perform destructive actions without explicit human consent.

FAQ

How do I test the server without a real Kubernetes cluster?

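Use dry-run mode by setting K8S_MCP_DRY_RUN=true. Every tool returns a description of what it would do without executing anything. Note that the server still loads a Kubernetes configuration at startup, so a loadable kubeconfig must be present even though no API calls are made. This is invaluable for testing new workflows and demonstrating capabilities to security teams.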

What if I need to add a new diagnostic tool?

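Define a new function decorated with @mcp.tool() and add the corresponding Kubernetes API call inside the body. Claude discovers the tool automatically on the next connection. With the stdio transport the client launches the server process itself, so restarting Claude Desktop is enough to load the new code; there is no separate server to restart.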

Can I use this with multiple Kubernetes clusters?

Yes. Either switch kubeconfig contexts manually or run separate MCP server instances, each configured with its own namespace allowlist and cluster credentials.

Does dry-run mode guarantee safety?

No. It logs what would happen but does not guarantee success at execution time, because cluster state can change between the dry run and the actual execution. Combine it with read-only mode and namespace allowlisting rather than relying on it as a standalone safety measure.

Continue to Part 3 where we deploy to production with Docker, Kubernetes, and RBAC, then explore multi-tool workflows.

# Mcp # ai-agent # sre # Kubernetes # fastmcp # agentic # DevOps