
Monitoring Dashboard

Agents report health metrics over pub/sub. A dashboard agent aggregates and alerts.

Use Subway's pub/sub to build lightweight monitoring across your agent network. Every agent broadcasts metrics on a topic. A dashboard agent subscribes, aggregates, and alerts when thresholds are crossed.

Architecture

worker-1.relay ┐
worker-2.relay ┼── broadcast on metrics.* ──▶ pub/sub ──▶ dashboard.relay (subscribes)
worker-3.relay ┘

Agent-side: report metrics (TypeScript)

Add a metrics reporter to any WebSocket agent:

import WebSocket from "ws";
import os from "node:os";
 
const ws = new WebSocket("ws://localhost:9002/ws");
const AGENT_NAME = "worker-1.relay";
 
ws.on("open", () => {
  ws.send(JSON.stringify({ type: "register", name: AGENT_NAME }));
 
  // Report system metrics every 5 seconds
  setInterval(() => {
    ws.send(JSON.stringify({
      type: "broadcast",
      topic: "metrics.system",
      payload: JSON.stringify({
        agent: AGENT_NAME,
        timestamp: Date.now(),
        system: {
          cpuUsage: os.loadavg()[0], // 1-minute load average, not a CPU percentage
          memoryUsedMb: Math.round((os.totalmem() - os.freemem()) / 1024 / 1024),
          memoryTotalMb: Math.round(os.totalmem() / 1024 / 1024),
          uptimeSeconds: Math.round(process.uptime()),
        },
      }),
    }));
  }, 5000);
});
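The reporter above assumes the socket stays up for the agent's lifetime; if the relay restarts, the connection drops and metrics silently stop. A minimal reconnect sketch with capped exponential backoff (the `connectWithRetry` wrapper and the specific delays are illustrative, not part of Subway's API):

```typescript
// Capped exponential backoff for reconnects: 1s, 2s, 4s, ... up to 30s.
function nextBackoffMs(attempt: number): number {
  return Math.min(1000 * 2 ** attempt, 30_000);
}

// Wrap socket creation so a dropped connection retries instead of silently
// ending the metrics stream. `makeSocket` would return the `new WebSocket(...)`
// from the snippet above.
function connectWithRetry(
  makeSocket: () => { on(ev: string, cb: () => void): void },
  attempt = 0,
) {
  const ws = makeSocket();
  ws.on("open", () => {
    attempt = 0; // reset the backoff once we are connected again
    // ... re-register and restart the setInterval reporter here
  });
  ws.on("close", () => {
    setTimeout(() => connectWithRetry(makeSocket, attempt + 1), nextBackoffMs(attempt));
  });
}
```

Resetting `attempt` on open keeps a flaky-but-recovering relay from being punished with ever-longer delays.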

Application-level metrics: report task-specific data too. This example broadcasts via the REST endpoint instead of the WebSocket:

// After processing a task, broadcast via REST
await fetch("http://localhost:9002/v1/broadcast", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    topic: "metrics.tasks",
    payload: JSON.stringify({
      agent: AGENT_NAME,
      timestamp: Date.now(),
      tasks: {
        completed: taskCount,
        failed: failCount,
        avgDurationMs: avgDuration,
        queueDepth: queue.length,
      },
    }),
  }),
});
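The `taskCount`, `failCount`, and `avgDuration` values above are assumed to come from your own task loop. One way to track them is a small stats accumulator that produces exactly the `tasks` object the broadcast sends (the class name and shape here are illustrative):

```typescript
// Accumulates task outcomes and emits the payload shape used in the
// metrics.tasks broadcast above.
class TaskStats {
  private completed = 0;
  private failed = 0;
  private totalDurationMs = 0;

  // Call once per finished task.
  record(durationMs: number, ok: boolean) {
    if (ok) {
      this.completed++;
      this.totalDurationMs += durationMs;
    } else {
      this.failed++;
    }
  }

  // Produces the `tasks` object; queue depth comes from the caller.
  snapshot(queueDepth: number) {
    return {
      completed: this.completed,
      failed: this.failed,
      avgDurationMs: this.completed ? this.totalDurationMs / this.completed : 0,
      queueDepth,
    };
  }
}
```

A broadcast then just sends `stats.snapshot(queue.length)` as the `tasks` field.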

Dashboard agent (TypeScript)

Aggregates metrics via SSE subscription and fires alerts:

const RELAY = "http://localhost:9002";
 
interface AgentMetrics {
  lastSeen: number;
  system?: { cpuUsage: number; memoryUsedMb: number; memoryTotalMb: number };
  tasks?: { completed: number; failed: number; avgDurationMs: number; queueDepth: number };
}
 
const agents = new Map<string, AgentMetrics>();
 
// Subscribe to the metrics topics via SSE. EventSource is available as a
// global in recent Node.js releases; on older versions, use the
// `eventsource` npm package.
const systemEvents = new EventSource(`${RELAY}/v1/subscribe?topic=metrics.system`);
const taskEvents = new EventSource(`${RELAY}/v1/subscribe?topic=metrics.tasks`);
 
systemEvents.onmessage = (e) => {
  const data = JSON.parse(e.data);
  const existing = agents.get(data.agent) || { lastSeen: 0 };
  agents.set(data.agent, { ...existing, lastSeen: Date.now(), system: data.system });
  checkAlerts(data.agent);
};
 
taskEvents.onmessage = (e) => {
  const data = JSON.parse(e.data);
  const existing = agents.get(data.agent) || { lastSeen: 0 };
  agents.set(data.agent, { ...existing, lastSeen: Date.now(), tasks: data.tasks });
  checkAlerts(data.agent);
};
 
// Alert rules
function checkAlerts(agentName: string) {
  const metrics = agents.get(agentName);
  if (!metrics) return;
 
  if (metrics.system && metrics.system.cpuUsage > 4.0) {
    sendAlert(`🔴 ${agentName}: CPU load ${metrics.system.cpuUsage.toFixed(1)}`);
  }
 
  if (metrics.system) {
    const memPct = metrics.system.memoryUsedMb / metrics.system.memoryTotalMb;
    if (memPct > 0.9) {
      sendAlert(`🔴 ${agentName}: memory at ${(memPct * 100).toFixed(0)}%`);
    }
  }
 
  if (metrics.tasks && metrics.tasks.completed > 0) {
    const failRate = metrics.tasks.failed / (metrics.tasks.completed + metrics.tasks.failed);
    if (failRate > 0.1) {
      sendAlert(`🟡 ${agentName}: ${(failRate * 100).toFixed(0)}% task failure rate`);
    }
  }
}
 
function sendAlert(message: string) {
  console.error(`[ALERT] ${message}`);
  // Broadcast alerts for other systems to pick up
  fetch(`${RELAY}/v1/broadcast`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      topic: "alerts.agent",
      payload: JSON.stringify({ message, timestamp: Date.now() }),
    }),
  });
}
 
// Detect agents that stopped reporting
setInterval(() => {
  const staleThreshold = 30_000;
  const now = Date.now();
  for (const [name, metrics] of agents) {
    if (now - metrics.lastSeen > staleThreshold) {
      sendAlert(`⚪ ${name}: no metrics for ${Math.round((now - metrics.lastSeen) / 1000)}s`);
    }
  }
}, 10_000);
 
// Periodic summary
setInterval(() => {
  console.log(`\n--- Agent Health (${agents.size} agents) ---`);
  for (const [name, m] of agents) {
    const ago = Math.round((Date.now() - m.lastSeen) / 1000);
    const cpu = m.system ? `cpu:${m.system.cpuUsage.toFixed(1)}` : "cpu:?";
    const mem = m.system ? `mem:${Math.round(m.system.memoryUsedMb)}MB` : "mem:?";
    const tasks = m.tasks ? `tasks:${m.tasks.completed} fail:${m.tasks.failed}` : "";
    console.log(`  ${name} (${ago}s ago) ${cpu} ${mem} ${tasks}`);
  }
}, 15_000);
 
console.log("dashboard monitoring started");
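As written, `checkAlerts` runs on every incoming metrics event, so a sustained CPU spike re-alerts every five seconds. A simple fix is to suppress repeats of the same alert key within a cooldown window. A sketch (the 5-minute window is an arbitrary choice, and `shouldAlert` is a hypothetical helper, not part of the code above):

```typescript
const lastAlerted = new Map<string, number>();
const COOLDOWN_MS = 5 * 60_000;

// Returns true if this alert key should fire now, false if it is still
// inside its cooldown window. `now` is passed explicitly for testability.
function shouldAlert(key: string, now: number): boolean {
  const last = lastAlerted.get(key) ?? -Infinity;
  if (now - last < COOLDOWN_MS) return false;
  lastAlerted.set(key, now);
  return true;
}

// Usage inside checkAlerts, keyed per agent and per condition:
//   if (cpuHigh && shouldAlert(`${agentName}:cpu`, Date.now())) sendAlert(...);
```

Keying on agent plus condition means a memory alert for `worker-1.relay` does not suppress a CPU alert for the same agent.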

RPC health checks

For on-demand health checks (instead of continuous reporting), use the REST bridge:

// Any WebSocket agent: respond to health check calls
ws.on("message", (data) => {
  const msg = JSON.parse(data.toString());
  if (msg.type === "inbound_call" && msg.method === "health") {
    ws.send(JSON.stringify({
      type: "call_response",
      correlation_id: msg.correlation_id,
      payload: JSON.stringify({
        status: "ok",
        uptime: process.uptime(),
        memory: process.memoryUsage(),
        version: "1.2.0",
      }),
    }));
  }
});
 
// Dashboard: check a specific agent via REST
const health = await fetch(`${RELAY}/v1/call`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    to: "worker-1.relay",
    method: "health",
    payload: "",
  }),
}).then((r) => r.json());
 
console.log(health);
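To sweep several agents at once, fan the `/v1/call` requests out concurrently and treat a rejected call as the agent being down. In this sketch the `callHealth` function is injected so the sweep logic is independent of the transport; in practice it would wrap the `fetch` shown above:

```typescript
type HealthResult = { agent: string; ok: boolean; detail?: unknown };

// Checks each agent concurrently; a rejected or failed call marks the
// agent unhealthy instead of failing the whole sweep.
async function sweepHealth(
  agents: string[],
  callHealth: (agent: string) => Promise<unknown>,
): Promise<HealthResult[]> {
  return Promise.all(
    agents.map(async (agent) => {
      try {
        return { agent, ok: true, detail: await callHealth(agent) };
      } catch {
        return { agent, ok: false };
      }
    }),
  );
}

// e.g. sweepHealth(["worker-1.relay", "worker-2.relay"], (to) =>
//   fetch(`${RELAY}/v1/call`, { ... }).then((r) => r.json()));
```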

When to use this pattern

✅ Monitoring distributed agent systems without centralized infrastructure
✅ Lightweight health tracking (not a Prometheus replacement)
✅ Agent liveness detection across networks
✅ Custom alerting logic close to the agents

❌ High-cardinality metrics (use Prometheus/Grafana)
❌ Long-term metric storage and querying (use a time-series database)
❌ Compliance/audit logging (use a durable log system)