Agente SRE IA MCP: Servidor y Seguridad
Tabla de contenidos
Esta es la Parte 2 de una serie de 3 partes por Eduardo. La Parte 1 explicó el protocolo MCP y la arquitectura de 4 capas. Aquí escribimos el código del servidor y los guardarrailes de seguridad. La Parte 3 cubre el despliegue en producción.
Paso 1: Configuración del proyecto
Crea el directorio del proyecto y el entorno virtual:
mkdir -p ~/k8s-mcp-servercd ~/k8s-mcp-serveruv venvsource .venv/bin/activateInstala las dependencias:
uv pip install fastmcp kubernetes pydanticPaso 2: Crear el núcleo del servidor
Crea server.py. Este archivo define el servidor MCP usando FastMCP, se conecta a tu clúster de Kubernetes mediante el cliente Python de Kubernetes, y expone las herramientas de diagnóstico.
# Servidor MCP de producción para operaciones SRE de Kubernetes
import osimport jsonimport subprocessfrom typing import Optional, Literalfrom contextlib import asynccontextmanagerfrom collections.abc import AsyncIterator
from kubernetes import client, configfrom fastmcp import FastMCP, Context
# ─────────────────────────────────────────────────────────────# Configuración y seguridad# ─────────────────────────────────────────────────────────────
# Por defecto solo lectura. Establece K8S_MCP_MODE=readwrite solo si entiendes# los riesgos y tienes compuertas de aprobación implementadas.OPERATION_MODE = os.environ.get("K8S_MCP_MODE", "readonly")ALLOWED_NAMESPACES = os.environ.get("K8S_MCP_NAMESPACES", "").split(",")ALLOWED_NAMESPACES = [ns.strip() for ns in ALLOWED_NAMESPACES if ns.strip()]
# Modo dry-run: registra lo que se ejecutaría sin ejecutarDRY_RUN = os.environ.get("K8S_MCP_DRY_RUN", "false").lower() == "true"
def _check_namespace(namespace: str) -> None: """Verifica la lista blanca de namespaces si está configurada.""" if ALLOWED_NAMESPACES and namespace not in ALLOWED_NAMESPACES: raise PermissionError( f"El namespace '{namespace}' no está en la lista permitida: {ALLOWED_NAMESPACES}" )
def _check_readwrite() -> None: """Aplica solo lectura por defecto.""" if OPERATION_MODE != "readwrite": raise PermissionError( "Esta operación requiere modo readwrite. " "Establece K8S_MCP_MODE=readwrite para habilitar, pero usa con precaución." )
def _log_action(action: str, details: dict) -> None: """Registro de auditoría para todas las operaciones.""" log_entry = {"action": action, "details": details, "dry_run": DRY_RUN} print(json.dumps(log_entry, default=str))
# ─────────────────────────────────────────────────────────────# Ciclo de vida del cliente de Kubernetes# ─────────────────────────────────────────────────────────────
@asynccontextmanagerasync def app_lifespan(server: FastMCP) -> AsyncIterator[dict]: """Gestiona el ciclo de vida del cliente de Kubernetes.""" try: config.load_incluster_config() except config.ConfigException: config.load_kube_config()
v1 = client.CoreV1Api() apps_v1 = client.AppsV1Api()
yield {"v1": v1, "apps_v1": apps_v1}
mcp = FastMCP("k8s-sre", lifespan=app_lifespan)
# ─────────────────────────────────────────────────────────────# Herramientas: Diagnóstico de solo lectura# ─────────────────────────────────────────────────────────────
@mcp.tool()async def list_pods( namespace: str = "default", ctx: Context = None,) -> str: """Lista todos los pods en un namespace con su estado y conteo de reinicios.
Args: namespace: El namespace de Kubernetes a consultar. """ _check_namespace(namespace) _log_action("list_pods", {"namespace": namespace})
if DRY_RUN: return f"[DRY RUN] Listaría pods en el namespace '{namespace}'"
v1 = ctx.request_context.lifespan_context["v1"] pods = v1.list_namespaced_pod(namespace=namespace)
lines = [f"{'NAME':<40} {'STATUS':<12} {'RESTARTS':<10} {'AGE'}"] lines.append("-" * 80) for pod in pods.items: restarts = sum(cs.restart_count for cs in (pod.status.container_statuses or [])) lines.append( f"{pod.metadata.name:<40} {pod.status.phase:<12} {restarts:<10} {pod.metadata.creation_timestamp}" )
return "\n".join(lines)
@mcp.tool()async def get_pod_logs( pod_name: str, namespace: str = "default", tail_lines: int = 100, ctx: Context = None,) -> str: """Obtiene los logs de un pod específico.
Args: pod_name: El nombre del pod. namespace: El namespace del pod. tail_lines: Número de líneas recientes a devolver (max 1000). """ _check_namespace(namespace) _log_action("get_pod_logs", {"pod_name": pod_name, "namespace": namespace, "tail_lines": tail_lines})
if DRY_RUN: return f"[DRY RUN] Obtendría logs del pod '{pod_name}' en '{namespace}'"
tail_lines = min(tail_lines, 1000) v1 = ctx.request_context.lifespan_context["v1"] logs = v1.read_namespaced_pod_log( name=pod_name, namespace=namespace, tail_lines=tail_lines, ) return logs
@mcp.tool()async def describe_pod( pod_name: str, namespace: str = "default", ctx: Context = None,) -> str: """Describe un pod, incluyendo eventos, condiciones y detalles de contenedores.
Args: pod_name: El nombre del pod. namespace: El namespace del pod. """ _check_namespace(namespace) _log_action("describe_pod", {"pod_name": pod_name, "namespace": namespace})
if DRY_RUN: return f"[DRY RUN] Describiría el pod '{pod_name}' en '{namespace}'"
v1 = ctx.request_context.lifespan_context["v1"] pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
lines = [ f"Name: {pod.metadata.name}", f"Namespace: {pod.metadata.namespace}", f"Node: {pod.spec.node_name}", f"Status: {pod.status.phase}", f"IP: {pod.status.pod_ip}", "", "Conditions:", ] for cond in pod.status.conditions or []: lines.append(f" {cond.type}: {cond.status} ({cond.reason or 'N/A'})")
lines.extend(["", "Container Statuses:"]) for cs in pod.status.container_statuses or []: state = "unknown" if cs.state.running: state = f"running (started: {cs.state.running.started_at})" elif cs.state.waiting: state = f"waiting ({cs.state.waiting.reason}: {cs.state.waiting.message})" elif cs.state.terminated: state = f"terminated (exit: {cs.state.terminated.exit_code})" lines.append(f" {cs.name}: {state} (restarts: {cs.restart_count})")
return "\n".join(lines)
@mcp.tool()async def list_deployments( namespace: str = "default", ctx: Context = None,) -> str: """Lista todos los despliegues en un namespace con estado de réplicas.
Args: namespace: El namespace de Kubernetes a consultar. """ _check_namespace(namespace) _log_action("list_deployments", {"namespace": namespace})
if DRY_RUN: return f"[DRY RUN] Listaría despliegues en el namespace '{namespace}'"
apps_v1 = ctx.request_context.lifespan_context["apps_v1"] deps = apps_v1.list_namespaced_deployment(namespace=namespace)
lines = [f"{'NAME':<40} {'READY':<10} {'UP-TO-DATE':<12} {'AVAILABLE':<12} {'AGE'}"] lines.append("-" * 90) for dep in deps.items: ready = f"{dep.status.ready_replicas or 0}/{dep.spec.replicas}" lines.append( f"{dep.metadata.name:<40} {ready:<10} " f"{dep.status.updated_replicas or 0:<12} {dep.status.available_replicas or 0:<12} " f"{dep.metadata.creation_timestamp}" )
return "\n".join(lines)
@mcp.tool()async def get_node_status(ctx: Context = None) -> str: """Lista todos los nodos del cluster con su capacidad y condiciones.""" _log_action("get_node_status", {})
if DRY_RUN: return "[DRY RUN] Listaría todos los nodos del cluster"
v1 = ctx.request_context.lifespan_context["v1"] nodes = v1.list_node()
lines = [f"{'NAME':<30} {'STATUS':<12} {'CPU':<10} {'MEMORY':<15} {'AGE'}"] lines.append("-" * 85) for node in nodes.items: status = "Ready" for cond in node.status.conditions: if cond.type == "Ready": status = "Ready" if cond.status == "True" else "NotReady"
cpu = node.status.capacity.get("cpu", "N/A") mem = node.status.capacity.get("memory", "N/A") lines.append( f"{node.metadata.name:<30} {status:<12} {cpu:<10} {mem:<15} {node.metadata.creation_timestamp}" )
return "\n".join(lines)
# ─────────────────────────────────────────────────────────────# Herramienta: Remediación (Lectura-escritura, protegida)# ─────────────────────────────────────────────────────────────
@mcp.tool()async def restart_deployment( deployment_name: str, namespace: str = "default", ctx: Context = None,) -> str: """Dispara un reinicio gradual de un despliegue actualizando una anotación.
Esto es seguro y no destructivo, pero requiere modo readwrite.
Args: deployment_name: El despliegue a reiniciar. namespace: El namespace del despliegue. """ _check_namespace(namespace) _check_readwrite() _log_action("restart_deployment", {"deployment_name": deployment_name, "namespace": namespace})
if DRY_RUN: return ( f"[DRY RUN] Reiniciaría el despliegue '{deployment_name}' " f"en el namespace '{namespace}' parcheando la marca de tiempo de despliegue" )
apps_v1 = ctx.request_context.lifespan_context["apps_v1"] now = json.dumps(json.dumps(str(__import__("datetime").datetime.utcnow())))
patch = { "spec": { "template": { "metadata": { "annotations": { "kubectl.kubernetes.io/restartedAt": str(__import__("datetime").datetime.utcnow()) } } } } }
apps_v1.patch_namespaced_deployment( name=deployment_name, namespace=namespace, body=patch, )
return f"Despliegue '{deployment_name}' en '{namespace}' reiniciado exitosamente."
if __name__ == "__main__": mcp.run(transport="stdio")Paso 3: Guardarrailes de seguridad explicados
Así funcionan los mecanismos de seguridad integrados en este servidor, basados en la arquitectura de la Parte 1.
Solo lectura por defecto: La variable OPERATION_MODE se establece por defecto en readonly. La herramienta restart_deployment llama explícitamente a _check_readwrite(), que lanza un PermissionError a menos que el modo sea readwrite. Incluso si Claude determina incorrectamente que debe reiniciar algo, el servidor se niega a menos que un operador haya habilitado explícitamente el acceso de escritura.
Lista blanca de namespaces: La variable ALLOWED_NAMESPACES restringe qué namespaces puede ver el agente. ¿Quieres que solo diagnostique tu namespace de production? Establece K8S_MCP_NAMESPACES=production. El servidor rechaza cualquier solicitud a otro namespace antes de que llegue a la API de Kubernetes.
Modo Dry-Run: Establece K8S_MCP_DRY_RUN=true y cada herramienta devuelve una descripción de lo que haría en lugar de ejecutar la acción. Imprescindible para probar nuevos flujos de trabajo y demostrar capacidades a los equipos de seguridad.
Registro de auditoría: Cada llamada a una herramienta genera una línea JSON en stdout. En producción, envío estos registros a Loki o Elasticsearch para rastrear exactamente qué hizo el agente, cuándo y por qué.
Paso 4: Conectar a Claude Desktop
Claude Desktop soporta servidores MCP mediante un archivo de configuración. En macOS es ~/Library/Application Support/Claude/claude_desktop_config.json. En Linux, la ruta es ~/.config/Claude/claude_desktop_config.json.
{ "mcpServers": { "k8s-sre": { "command": "/home/<USER>/.local/bin/uv", "args": [ "run", "--project", "/home/<USER>/k8s-mcp-server", "python", "server.py" ], "env": { "K8S_MCP_MODE": "readonly", "K8S_MCP_NAMESPACES": "default,staging", "K8S_MCP_DRY_RUN": "false" } } }}Reemplaza <USER> con tu nombre de usuario y ajusta las rutas según sea necesario. Reinicia Claude Desktop después de guardar.
Para verificar la conexión, abre Claude y escribe:
Muéstrame los pods en el namespace default.Claude reconoce la herramienta, llama a list_pods y presenta los resultados en una tabla formateada.
Paso 5: Cola de aprobación para operaciones de escritura
Para producción, nunca confío solo en la variable readwrite. Agrego una cola de aprobación externa. Aquí hay un patrón simplificado basado en archivos:
import osimport jsonimport timefrom pathlib import Path
APPROVAL_DIR = Path(os.environ.get("K8S_MCP_APPROVAL_DIR", "/var/lib/k8s-mcp/approvals"))APPROVAL_DIR.mkdir(parents=True, exist_ok=True)
def request_approval(action: str, details: dict) -> str: """Solicita aprobación humana para una operación de escritura.""" approval_id = f"{int(time.time())}-{action}" request_file = APPROVAL_DIR / f"{approval_id}.pending"
request_file.write_text(json.dumps({"action": action, "details": details}))
return ( f"Aprobación solicitada: {approval_id}\n" f"Acción: {action}\n" f"Detalles: {json.dumps(details)}\n" f"Para aprobar, renombra {approval_id}.pending a {approval_id}.approved" )
def is_approved(approval_id: str) -> bool: """Verifica si se ha otorgado aprobación.""" return (APPROVAL_DIR / f"{approval_id}.approved").exists()Integra esto en restart_deployment antes de que aplique el parche. En un sistema real, uso un bot de Slack o integración con PagerDuty en lugar de aprobación por archivos, pero el patrón se mantiene: nunca dejes que un agente de IA realice acciones destructivas sin consentimiento humano explícito.
Preguntas Frecuentes
¿Cómo pruebo el servidor sin un clúster de Kubernetes real?
Usa el modo dry-run con K8S_MCP_DRY_RUN=true. Cada herramienta devuelve una descripción de lo que haría sin ejecutar nada. Es ideal para probar flujos nuevos y demostrar capacidades a equipos de seguridad.
¿Qué hago si necesito agregar una nueva herramienta de diagnóstico?
Define una función nueva decorada con @mcp.tool() y agrega la llamada a la API de Kubernetes correspondiente. Claude descubre la herramienta automáticamente en la siguiente conexión; no necesitas reiniciar el servidor.
¿Puedo usar esto con múltiples clústeres de Kubernetes?
Sí. Cambia de contexto kubeconfig manualmente o ejecuta instancias separadas del servidor MCP, cada una con su propia lista blanca de namespaces y credenciales.
¿El modo dry-run garantiza seguridad total?
Registra lo que sucedería pero no garantiza éxito en el momento de ejecución; el estado del clúster cambia entre el dry-run y la ejecución real. Combínalo con modo solo lectura y lista blanca de namespaces.
Continúa con la Parte 3 donde desplegamos en producción con Docker, Kubernetes y RBAC, luego exploramos flujos de trabajo multi-herramienta.