Monitoring Dashboard
Agents report health metrics over pub/sub. A dashboard agent aggregates and alerts.
Use Subway's pub/sub to build lightweight monitoring across your agent network. Every agent broadcasts metrics on a topic. A dashboard agent subscribes, aggregates, and alerts when thresholds are crossed.
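Architecture
Every agent broadcasts system metrics on metrics.system (every 5 seconds over its WebSocket connection) and task metrics on metrics.tasks (through the REST broadcast endpoint). The dashboard agent subscribes to both topics over SSE, keeps the latest snapshot per agent, and publishes alerts on alerts.agent for other systems to consume.
agents → metrics.system, metrics.tasks → dashboard agent (SSE) → alerts.agent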
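Both metrics topics carry a JSON string as their payload, and the dashboard parses whatever arrives. A minimal sketch of that contract as TypeScript types is shown below, together with a hypothetical `parseMetricsPayload` guard that a dashboard could run on each SSE message instead of calling `JSON.parse` directly. The type and function names are illustrative assumptions, not part of Subway's API; the field names mirror the `metrics.system` and `metrics.tasks` payloads used on this page.

```typescript
// Hypothetical types mirroring the payloads broadcast on metrics.system / metrics.tasks.
interface SystemMetrics {
  cpuUsage: number; // 1-minute load average, not a percentage
  memoryUsedMb: number;
  memoryTotalMb: number;
  uptimeSeconds?: number;
}

interface TaskMetrics {
  completed: number;
  failed: number;
  avgDurationMs: number;
  queueDepth: number;
}

interface MetricsPayload {
  agent: string;
  timestamp: number;
  system?: SystemMetrics;
  tasks?: TaskMetrics;
}

// True if `value` is an object whose listed keys are all finite numbers.
function isNumberRecord(value: unknown, keys: string[]): boolean {
  if (typeof value !== "object" || value === null) return false;
  const record = value as Record<string, unknown>;
  return keys.every((k) => typeof record[k] === "number" && Number.isFinite(record[k]));
}

// Returns the parsed payload, or null if `raw` is not valid JSON
// or does not match the expected shape.
function parseMetricsPayload(raw: string): MetricsPayload | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof data !== "object" || data === null) return null;
  const d = data as Record<string, unknown>;
  if (typeof d.agent !== "string" || typeof d.timestamp !== "number") return null;
  if (d.system !== undefined && !isNumberRecord(d.system, ["cpuUsage", "memoryUsedMb", "memoryTotalMb"])) {
    return null;
  }
  if (d.tasks !== undefined && !isNumberRecord(d.tasks, ["completed", "failed", "avgDurationMs", "queueDepth"])) {
    return null;
  }
  return d as unknown as MetricsPayload;
}
```

Rejecting malformed payloads at the edge keeps one misbehaving agent from throwing inside the dashboard's message handlers or skewing its alert math.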
Agent-side: report metrics (TypeScript)
Add a metrics reporter to any WebSocket agent:
```typescript
import WebSocket from "ws";
import os from "node:os";

const ws = new WebSocket("ws://localhost:9002/ws");
const AGENT_NAME = "worker-1.relay";

ws.on("open", () => {
  ws.send(JSON.stringify({ type: "register", name: AGENT_NAME }));

  // Report system metrics every 5 seconds
  const timer = setInterval(() => {
    if (ws.readyState !== WebSocket.OPEN) return; // skip ticks while the socket is closing
    ws.send(JSON.stringify({
      type: "broadcast",
      topic: "metrics.system",
      payload: JSON.stringify({
        agent: AGENT_NAME,
        timestamp: Date.now(),
        system: {
          // 1-minute load average, not a CPU percentage (always 0 on Windows)
          cpuUsage: os.loadavg()[0],
          memoryUsedMb: Math.round((os.totalmem() - os.freemem()) / 1024 / 1024),
          memoryTotalMb: Math.round(os.totalmem() / 1024 / 1024),
          uptimeSeconds: Math.round(process.uptime()),
        },
      }),
    }));
  }, 5000);

  // Stop reporting once the connection closes
  ws.on("close", () => clearInterval(timer));
});
```

To report application-level metrics, broadcast task-specific data too:
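```typescript
// After processing a task, broadcast via REST.
// taskCount (successful tasks), failCount, avgDuration, and queue
// stand in for your agent's own state.
const res = await fetch("http://localhost:9002/v1/broadcast", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    topic: "metrics.tasks",
    payload: JSON.stringify({
      agent: AGENT_NAME,
      timestamp: Date.now(),
      tasks: {
        completed: taskCount,
        failed: failCount,
        avgDurationMs: avgDuration,
        queueDepth: queue.length,
      },
    }),
  }),
});
if (!res.ok) console.error(`metrics.tasks broadcast failed: HTTP ${res.status}`);
```

Dashboard agent (TypeScript)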
Aggregates metrics via SSE subscription and fires alerts:
```typescript
// Node.js: EventSource comes from the `eventsource` npm package (v3+).
// In a browser, EventSource is global and this import can be dropped.
import { EventSource } from "eventsource";

const RELAY = "http://localhost:9002";
const ALERT_COOLDOWN_MS = 60_000; // repeat the same alert at most once a minute

interface AgentMetrics {
  lastSeen: number;
  system?: { cpuUsage: number; memoryUsedMb: number; memoryTotalMb: number };
  tasks?: { completed: number; failed: number; avgDurationMs: number; queueDepth: number };
}

const agents = new Map<string, AgentMetrics>();
const lastAlertAt = new Map<string, number>(); // alert key -> when it last fired

// Subscribe to both metrics topics via SSE
const systemEvents = new EventSource(`${RELAY}/v1/subscribe?topic=metrics.system`);
const taskEvents = new EventSource(`${RELAY}/v1/subscribe?topic=metrics.tasks`);

systemEvents.onmessage = (e) => {
  const data = JSON.parse(e.data);
  const existing = agents.get(data.agent) ?? { lastSeen: 0 };
  agents.set(data.agent, { ...existing, lastSeen: Date.now(), system: data.system });
  checkAlerts(data.agent);
};

taskEvents.onmessage = (e) => {
  const data = JSON.parse(e.data);
  const existing = agents.get(data.agent) ?? { lastSeen: 0 };
  agents.set(data.agent, { ...existing, lastSeen: Date.now(), tasks: data.tasks });
  checkAlerts(data.agent);
};

// Alert rules
function checkAlerts(agentName: string) {
  const metrics = agents.get(agentName);
  if (!metrics) return;

  if (metrics.system) {
    // cpuUsage is the agent's 1-minute load average, not a percentage
    if (metrics.system.cpuUsage > 4.0) {
      sendAlert(`${agentName}:cpu`, `🔴 ${agentName}: CPU load ${metrics.system.cpuUsage.toFixed(1)}`);
    }
    const memPct = metrics.system.memoryUsedMb / metrics.system.memoryTotalMb;
    if (memPct > 0.9) {
      sendAlert(`${agentName}:memory`, `🔴 ${agentName}: memory at ${(memPct * 100).toFixed(0)}%`);
    }
  }

  if (metrics.tasks) {
    // Guard on the total, not on `completed`, so an agent whose tasks all fail still alerts
    const total = metrics.tasks.completed + metrics.tasks.failed;
    const failRate = total > 0 ? metrics.tasks.failed / total : 0;
    if (failRate > 0.1) {
      sendAlert(`${agentName}:tasks`, `🟡 ${agentName}: ${(failRate * 100).toFixed(0)}% task failure rate`);
    }
  }
}

function sendAlert(key: string, message: string) {
  // Metrics arrive every few seconds; without a cooldown, a persistent
  // condition would re-alert on every report
  const now = Date.now();
  if (now - (lastAlertAt.get(key) ?? 0) < ALERT_COOLDOWN_MS) return;
  lastAlertAt.set(key, now);

  console.error(`[ALERT] ${message}`);
  // Broadcast alerts for other systems to pick up; log instead of crashing if the relay is unreachable
  fetch(`${RELAY}/v1/broadcast`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      topic: "alerts.agent",
      payload: JSON.stringify({ message, timestamp: now }),
    }),
  }).catch((err) => console.error(`[ALERT] broadcast failed: ${err}`));
}

// Detect agents that stopped reporting
setInterval(() => {
  const staleThreshold = 30_000;
  const now = Date.now();
  for (const [name, metrics] of agents) {
    if (now - metrics.lastSeen > staleThreshold) {
      sendAlert(`${name}:stale`, `⚪ ${name}: no metrics for ${Math.round((now - metrics.lastSeen) / 1000)}s`);
    }
  }
}, 10_000);

// Periodic summary
setInterval(() => {
  console.log(`\n--- Agent Health (${agents.size} agents) ---`);
  for (const [name, m] of agents) {
    const ago = Math.round((Date.now() - m.lastSeen) / 1000);
    const cpu = m.system ? `cpu:${m.system.cpuUsage.toFixed(1)}` : "cpu:?";
    const mem = m.system ? `mem:${Math.round(m.system.memoryUsedMb)}MB` : "mem:?";
    const tasks = m.tasks ? `tasks:${m.tasks.completed} fail:${m.tasks.failed}` : "";
    console.log(`  ${name} (${ago}s ago) ${cpu} ${mem} ${tasks}`);
  }
}, 15_000);

console.log("dashboard monitoring started");
```

RPC health checks
For on-demand health checks instead of continuous reporting, have each agent answer a health RPC over its WebSocket connection and call it from the dashboard through the REST bridge:
```typescript
// Any WebSocket agent: respond to health check calls
ws.on("message", (data) => {
  const msg = JSON.parse(data.toString());
  if (msg.type === "inbound_call" && msg.method === "health") {
    ws.send(JSON.stringify({
      type: "call_response",
      correlation_id: msg.correlation_id, // echo the ID so the reply is matched to the call
      payload: JSON.stringify({
        status: "ok",
        uptime: process.uptime(),
        memory: process.memoryUsage(),
        version: "1.2.0",
      }),
    }));
  }
});

// Dashboard: check a specific agent via REST
const res = await fetch(`${RELAY}/v1/call`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    to: "worker-1.relay",
    method: "health",
    payload: "",
  }),
  signal: AbortSignal.timeout(5_000), // give up if there is no answer within 5 s
});
if (!res.ok) throw new Error(`health check failed: HTTP ${res.status}`);
const health = await res.json();
console.log(health);
```

When to use this pattern
✅ Monitoring distributed agent systems without centralized infrastructure
✅ Lightweight health tracking (not a Prometheus replacement)
✅ Agent liveness detection across networks
✅ Custom alerting logic close to the agents

❌ High-cardinality metrics (use Prometheus/Grafana)
❌ Long-term metric storage and querying (use a time-series database)
❌ Compliance/audit logging (use a durable log system)