Eval Pipeline
Chain agents into a sequential review pipeline: generate → evaluate → approve.
A common pattern in AI systems: one agent generates output, another evaluates it, and a third makes the final decision. Subway makes this straightforward with chained RPC calls: a trigger invokes each stage in turn and feeds one stage's response into the next.
Architecture#
A trigger drives three relay-registered agents in sequence (generate.relay → evaluate.relay → approve.relay); the approver then broadcasts the final decision on the pipeline.done topic.
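Each stage enriches the payload it receives. The shapes below are a sketch inferred from the handlers in this guide (field names match the code that follows; these are not published Subway types):

```ts
// Payload shapes at each pipeline stage (inferred from the handlers below).
interface Generation {
  prompt: string;
  output: string;
  model: string;
  generatedAt: string; // ISO timestamp
}

interface Evaluated extends Generation {
  evaluation: {
    score: number; // 0..1
    issues: string[];
    recommendation: "approve" | "reject";
    evaluatedAt: string;
  };
}

interface Decided extends Evaluated {
  decision: {
    approved: boolean;
    reason: string;
    decidedAt: string;
  };
}
```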
Generator (TypeScript)#
The first stage: produce output from a prompt. The generator connects to the relay over WebSocket to receive RPC calls.
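The handler in this section delegates to a callLLM helper that you supply. A stub for exercising the pipeline locally (illustrative only; callLLM is your code, not a Subway API):

```ts
// Illustrative stand-in for callLLM: swap in your real LLM client.
// The stub echoes the prompt so the pipeline can run end to end locally.
async function callLLM(
  prompt: string,
  context: Record<string, unknown>,
): Promise<string> {
  // Thread retry feedback (if any) into the draft so it is visible downstream.
  const hints = Object.keys(context).length
    ? ` (context: ${JSON.stringify(context)})`
    : "";
  return `[draft] response to "${prompt}"${hints}`;
}
```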
```ts
import WebSocket from "ws";

const ws = new WebSocket("ws://localhost:9002/ws");

ws.on("open", () => {
  ws.send(JSON.stringify({ type: "register", name: "generate.relay" }));
  console.log("generator ready");
});

ws.on("message", async (data) => {
  const msg = JSON.parse(data.toString());
  if (msg.type === "inbound_call" && msg.method === "generate") {
    const { prompt, context } = JSON.parse(msg.payload);
    console.log(`generating for: "${prompt}"`);
    // Your LLM call here
    const output = await callLLM(prompt, context);
    ws.send(JSON.stringify({
      type: "call_response",
      correlation_id: msg.correlation_id,
      payload: JSON.stringify({
        prompt,
        output,
        model: "gpt-4",
        generatedAt: new Date().toISOString(),
      }),
    }));
  }
});
```
Evaluator#
The second stage: score the output and flag issues.
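The handler for this stage delegates scoring to an evaluateOutput helper that you supply. A minimal heuristic sketch (illustrative only; a real evaluator would likely use an LLM judge):

```ts
// Illustrative stand-in for evaluateOutput: checks length and banned
// phrases so the pipeline can run end to end without a real judge.
interface GenerationInput {
  prompt: string;
  output: string;
}

async function evaluateOutput(gen: GenerationInput) {
  const issues: string[] = [];
  if (gen.output.length < 50) issues.push("output too short");
  if (/as an ai language model/i.test(gen.output)) issues.push("boilerplate phrasing");
  // Start from 1.0 and subtract a fixed penalty per issue, floored at 0.
  const score = Math.max(0, 1 - issues.length * 0.4);
  return { score, issues };
}
```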
```ts
import WebSocket from "ws";

const ws = new WebSocket("ws://localhost:9002/ws");

ws.on("open", () => {
  ws.send(JSON.stringify({ type: "register", name: "evaluate.relay" }));
  console.log("evaluator ready");
});

ws.on("message", async (data) => {
  const msg = JSON.parse(data.toString());
  if (msg.type === "inbound_call" && msg.method === "evaluate") {
    const generation = JSON.parse(msg.payload);
    console.log(`evaluating output for: "${generation.prompt}"`);
    // Your scoring logic here
    const evaluation = await evaluateOutput(generation);
    ws.send(JSON.stringify({
      type: "call_response",
      correlation_id: msg.correlation_id,
      payload: JSON.stringify({
        ...generation,
        evaluation: {
          score: evaluation.score,
          issues: evaluation.issues,
          recommendation: evaluation.score > 0.7 ? "approve" : "reject",
          evaluatedAt: new Date().toISOString(),
        },
      }),
    }));
  }
});
```
Approver#
The final stage: make the decision and broadcast the result.
```ts
import WebSocket from "ws";

const RELAY = "http://localhost:9002";
const ws = new WebSocket("ws://localhost:9002/ws");

ws.on("open", () => {
  ws.send(JSON.stringify({ type: "register", name: "approve.relay" }));
  console.log("approver ready");
});

ws.on("message", async (data) => {
  const msg = JSON.parse(data.toString());
  if (msg.type === "inbound_call" && msg.method === "approve") {
    const evaluated = JSON.parse(msg.payload);
    const { evaluation } = evaluated;
    console.log(`approval decision: score=${evaluation.score}`);
    const decision = {
      ...evaluated,
      decision: {
        approved: evaluation.recommendation === "approve",
        reason: evaluation.recommendation === "approve"
          ? "Quality threshold met"
          : `Rejected: ${evaluation.issues.join(", ")}`,
        decidedAt: new Date().toISOString(),
      },
    };
    // Broadcast the final result via REST
    await fetch(`${RELAY}/v1/broadcast`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        topic: "pipeline.done",
        payload: JSON.stringify(decision),
      }),
    });
    ws.send(JSON.stringify({
      type: "call_response",
      correlation_id: msg.correlation_id,
      payload: JSON.stringify(decision),
    }));
  }
});
```
Trigger (runs the pipeline)#
The trigger uses the REST bridge — no persistent connection needed:
```ts
const RELAY = "http://localhost:9002";

async function rpcCall(to: string, method: string, payload: string) {
  const res = await fetch(`${RELAY}/v1/call`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ to, method, payload }),
  });
  return res.json();
}

async function runPipeline(prompt: string) {
  console.log(`\n--- Pipeline: "${prompt}" ---`);

  // Stage 1: generate
  const generated = await rpcCall(
    "generate.relay", "generate",
    JSON.stringify({ prompt, context: {} })
  );
  console.log("✓ generated");

  // Stage 2: evaluate
  const evaluated = await rpcCall(
    "evaluate.relay", "evaluate",
    JSON.stringify(generated)
  );
  console.log("✓ evaluated");

  // Stage 3: approve
  const result = await rpcCall(
    "approve.relay", "approve",
    JSON.stringify(evaluated)
  );
  console.log("✓ decided");

  console.log(`result: ${result.decision.approved ? "✅ APPROVED" : "❌ REJECTED"}`);
  console.log(`reason: ${result.decision.reason}`);
  return result;
}

await runPipeline("Write a product description for a P2P networking library");
```
Retry on rejection#
If the evaluator rejects, loop back to the generator with feedback:
```ts
async function runPipelineWithRetry(prompt: string, maxRetries = 3) {
  let context: Record<string, unknown> = {};

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    console.log(`\nattempt ${attempt}/${maxRetries}`);

    const generated = await rpcCall(
      "generate.relay", "generate",
      JSON.stringify({ prompt, context })
    );
    const evaluated = await rpcCall(
      "evaluate.relay", "evaluate",
      JSON.stringify(generated)
    );

    if (evaluated.evaluation.recommendation === "approve") {
      return await rpcCall("approve.relay", "approve", JSON.stringify(evaluated));
    }

    console.log(`rejected (score: ${evaluated.evaluation.score}), retrying...`);
    context = {
      previousAttempt: generated.output,
      issues: evaluated.evaluation.issues,
      instruction: "Address the issues listed and try again.",
    };
  }
  throw new Error(`pipeline failed after ${maxRetries} attempts`);
}
```
Monitoring the pipeline#
Any agent can subscribe to pipeline results via SSE:
```ts
// A dashboard or logger agent
const RELAY = "http://localhost:9002";

const events = new EventSource(
  `${RELAY}/v1/subscribe?topic=pipeline.done`
);

events.onmessage = (e) => {
  const result = JSON.parse(e.data);
  console.log(
    `[pipeline] ${result.decision.approved ? "✅" : "❌"} ` +
    `"${result.prompt}" (score: ${result.evaluation.score})`
  );
};
```
When to use this pattern#
✅ LLM output that needs quality checks before use
✅ Content generation with editorial review
✅ Any process with sequential approval stages
✅ When you want to swap evaluation logic without touching generation
❌ Real-time chat (the latency of 3 sequential RPCs adds up)
❌ Simple tasks that don't need quality checks
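One practical gap in the trigger above: rpcCall has no timeout, so a crashed stage agent can stall the whole pipeline. A sketch using AbortController (rpcCallWithTimeout is a name introduced here for illustration, not part of Subway; it assumes the same /v1/call endpoint shown earlier):

```ts
// Hypothetical wrapper: abort the relay call if a stage doesn't answer in time.
const RELAY = "http://localhost:9002";

async function rpcCallWithTimeout(
  to: string,
  method: string,
  payload: string,
  timeoutMs = 30_000,
): Promise<unknown> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const res = await fetch(`${RELAY}/v1/call`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ to, method, payload }),
      signal: controller.signal,
    });
    if (!res.ok) throw new Error(`rpc ${method} failed: HTTP ${res.status}`);
    return res.json();
  } finally {
    clearTimeout(timer);
  }
}
```

Dropping this in for rpcCall turns a hung stage into a catchable error, which pairs naturally with the retry loop above.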