
Eval Pipeline

Chain agents into a sequential review pipeline: generate → evaluate → approve.

A common pattern in AI systems: one agent generates output, another evaluates it, and a third makes the final decision. Subway makes this easy with RPC chains: in this recipe a trigger script calls each stage in turn and passes each stage's result on as the next stage's input.

Architecture

trigger.relay --RPC--> generate.relay --RPC--> evaluate.relay --RPC--> approve.relay --broadcast--> pipeline.done

Generator (TypeScript)

The first stage: produce output from a prompt. Connects via WebSocket to receive RPC calls.

import WebSocket from "ws";
 
const ws = new WebSocket("ws://localhost:9002/ws");
 
ws.on("open", () => {
  ws.send(JSON.stringify({ type: "register", name: "generate.relay" }));
  console.log("generator ready");
});
 
ws.on("message", async (data) => {
  const msg = JSON.parse(data.toString());
 
  if (msg.type === "inbound_call" && msg.method === "generate") {
    const { prompt, context } = JSON.parse(msg.payload);
    console.log(`generating for: "${prompt}"`);
 
    // Your LLM call here
    const output = await callLLM(prompt, context);
 
    ws.send(JSON.stringify({
      type: "call_response",
      correlation_id: msg.correlation_id,
      payload: JSON.stringify({
        prompt,
        output,
        model: "gpt-4",
        generatedAt: new Date().toISOString(),
      }),
    }));
  }
});
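
The message shapes used above can be written down as types. The following is a minimal sketch inferred only from the fields this page uses (`type`, `method`, `payload`, `correlation_id`); the type names and the `buildCallResponse` helper are illustrative assumptions, not part of the Subway API, and the relay may well send additional fields.

```typescript
// Shapes inferred from the fields used on this page; treat them as assumptions.
interface InboundCall {
  type: "inbound_call";
  method: string;
  payload: string;        // JSON-encoded string, as in the examples above
  correlation_id: string; // assumed to be a string; echoed back unchanged
}

interface CallResponse {
  type: "call_response";
  correlation_id: string;
  payload: string;
}

// Hypothetical helper: wrap a handler result in a reply to the given call.
function buildCallResponse(call: InboundCall, result: unknown): CallResponse {
  return {
    type: "call_response",
    correlation_id: call.correlation_id,
    payload: JSON.stringify(result),
  };
}

// Usage with a hand-written call instead of a live relay message.
const sampleCall: InboundCall = {
  type: "inbound_call",
  method: "generate",
  payload: JSON.stringify({ prompt: "hello", context: {} }),
  correlation_id: "abc-123",
};
const sampleReply = buildCallResponse(sampleCall, { prompt: "hello", output: "hi" });
console.log(JSON.stringify(sampleReply));
```

Keeping the envelope in one helper means each stage only has to produce its result object, and the correlation ID cannot be forgotten.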

Evaluator

The second stage: score the output and flag issues.

import WebSocket from "ws";
 
const ws = new WebSocket("ws://localhost:9002/ws");
 
ws.on("open", () => {
  ws.send(JSON.stringify({ type: "register", name: "evaluate.relay" }));
  console.log("evaluator ready");
});
 
ws.on("message", async (data) => {
  const msg = JSON.parse(data.toString());
 
  if (msg.type === "inbound_call" && msg.method === "evaluate") {
    const generation = JSON.parse(msg.payload);
    console.log(`evaluating output for: "${generation.prompt}"`);
 
    const evaluation = await evaluateOutput(generation);
 
    ws.send(JSON.stringify({
      type: "call_response",
      correlation_id: msg.correlation_id,
      payload: JSON.stringify({
        ...generation,
        evaluation: {
          score: evaluation.score,
          issues: evaluation.issues,
          recommendation: evaluation.score > 0.7 ? "approve" : "reject",
          evaluatedAt: new Date().toISOString(),
        },
      }),
    }));
  }
});
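
`evaluateOutput` is left undefined above. As a stand-in for local testing, here is a hypothetical rule-based version; the specific checks, the penalty of 0.4 per issue, and the `Generation` and `Evaluation` type names are assumptions for illustration. A real evaluator would more likely call a model or run a test suite. It is written synchronously for simplicity; `await` on its result still works.

```typescript
interface Generation {
  prompt: string;
  output: string;
}

interface Evaluation {
  score: number;    // 0..1, compared against the 0.7 threshold above
  issues: string[];
}

// Hypothetical heuristic evaluator: each failed check adds an issue and
// subtracts a fixed penalty from a starting score of 1.
function evaluateOutput(generation: Generation): Evaluation {
  const issues: string[] = [];
  const text = generation.output.trim();

  if (text.length === 0) {
    issues.push("output is empty");
  } else if (text.length < 40) {
    issues.push("output is too short");
  }
  if (/\b(TODO|lorem ipsum)\b/i.test(text)) {
    issues.push("contains placeholder text");
  }

  const score = Math.max(0, 1 - 0.4 * issues.length);
  return { score, issues };
}

const good = evaluateOutput({
  prompt: "p",
  output: "A peer-to-peer networking library with a small, well documented API.",
});
const bad = evaluateOutput({ prompt: "p", output: "TODO" });
console.log(good, bad);
```

With these weights a single issue is enough to drop the score below 0.7, so any flagged output is rejected.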

Approver

The final stage: make the decision and broadcast the result.

import WebSocket from "ws";
 
const RELAY = "http://localhost:9002";
const ws = new WebSocket("ws://localhost:9002/ws");
 
ws.on("open", () => {
  ws.send(JSON.stringify({ type: "register", name: "approve.relay" }));
  console.log("approver ready");
});
 
ws.on("message", async (data) => {
  const msg = JSON.parse(data.toString());
 
  if (msg.type === "inbound_call" && msg.method === "approve") {
    const evaluated = JSON.parse(msg.payload);
    const { evaluation } = evaluated;
 
    console.log(`approval decision: score=${evaluation.score}`);
 
    const decision = {
      ...evaluated,
      decision: {
        approved: evaluation.recommendation === "approve",
        reason: evaluation.recommendation === "approve"
          ? "Quality threshold met"
          : `Rejected: ${evaluation.issues.join(", ")}`,
        decidedAt: new Date().toISOString(),
      },
    };
 
    // Broadcast the final result via REST
    await fetch(`${RELAY}/v1/broadcast`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        topic: "pipeline.done",
        payload: JSON.stringify(decision),
      }),
    });
 
    ws.send(JSON.stringify({
      type: "call_response",
      correlation_id: msg.correlation_id,
      payload: JSON.stringify(decision),
    }));
  }
});
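
The approver above destructures `evaluation` from whatever JSON arrives. One way to fail fast on malformed input is a type guard. This sketch assumes the payload shape produced by the evaluator on this page; `EvaluatedPayload` and `isEvaluated` are hypothetical names.

```typescript
interface EvaluatedPayload {
  prompt: string;
  output: string;
  evaluation: {
    score: number;
    issues: string[];
    recommendation: "approve" | "reject";
  };
}

// Hypothetical guard: verifies the fields declared in EvaluatedPayload.
function isEvaluated(value: unknown): value is EvaluatedPayload {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  if (typeof v.prompt !== "string" || typeof v.output !== "string") return false;

  const ev = v.evaluation;
  if (typeof ev !== "object" || ev === null) return false;
  const e = ev as Record<string, unknown>;

  return (
    typeof e.score === "number" &&
    Array.isArray(e.issues) &&
    e.issues.every((issue) => typeof issue === "string") &&
    (e.recommendation === "approve" || e.recommendation === "reject")
  );
}

const wellFormed: unknown = JSON.parse(
  '{"prompt":"p","output":"o","evaluation":{"score":0.9,"issues":[],"recommendation":"approve"}}'
);
const malformed: unknown = { prompt: "p", output: "o", evaluation: { score: "high" } };

console.log(isEvaluated(wellFormed)); // true
console.log(isEvaluated(malformed));  // false
```

In the handler, a failed check could be answered with an error payload instead of a decision; how the relay expects errors to be reported is not covered on this page.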

Trigger (runs the pipeline)

The trigger uses the REST bridge — no persistent connection needed:

const RELAY = "http://localhost:9002";
 
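async function rpcCall(to: string, method: string, payload: string) {
  const res = await fetch(`${RELAY}/v1/call`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ to, method, payload }),
  });
  // Surface HTTP-level failures instead of passing an error body downstream
  if (!res.ok) {
    throw new Error(`${method} call to ${to} failed: HTTP ${res.status}`);
  }
  return res.json();
}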
 
async function runPipeline(prompt: string) {
  console.log(`\n--- Pipeline: "${prompt}" ---`);
 
  // Stage 1: generate
  const generated = await rpcCall(
    "generate.relay", "generate",
    JSON.stringify({ prompt, context: {} })
  );
  console.log("✓ generated");
 
  // Stage 2: evaluate
  const evaluated = await rpcCall(
    "evaluate.relay", "evaluate",
    JSON.stringify(generated)
  );
  console.log("✓ evaluated");
 
  // Stage 3: approve
  const result = await rpcCall(
    "approve.relay", "approve",
    JSON.stringify(evaluated)
  );
  console.log("✓ decided");
 
  console.log(`result: ${result.decision.approved ? "✅ APPROVED" : "❌ REJECTED"}`);
  console.log(`reason: ${result.decision.reason}`);
 
  return result;
}
 
await runPipeline("Write a product description for a P2P networking library");
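
The three stages follow the same shape: call a target, then pass its result on. A hedged sketch of that chain as a loop follows. The `Stage` type, `runStages`, and the injected `call` parameter are illustrative names, not Subway APIs; injecting the call function lets the chain be exercised with a fake instead of a live relay.

```typescript
type RpcCall = (to: string, method: string, payload: string) => Promise<unknown>;

interface Stage {
  to: string;
  method: string;
}

// Run stages in order, feeding each stage's result to the next as JSON.
async function runStages(
  call: RpcCall,
  stages: Stage[],
  input: unknown
): Promise<unknown> {
  let current = input;
  for (const stage of stages) {
    current = await call(stage.to, stage.method, JSON.stringify(current));
  }
  return current;
}

const pipelineStages: Stage[] = [
  { to: "generate.relay", method: "generate" },
  { to: "evaluate.relay", method: "evaluate" },
  { to: "approve.relay", method: "approve" },
];

// Fake call for local testing: appends the method name to a `trail` array
// carried in the payload, so the order of calls can be inspected.
const fakeCall: RpcCall = async (_to, method, payload) => {
  const data = JSON.parse(payload) as { trail?: string[] };
  return { ...data, trail: [...(data.trail ?? []), method] };
};

runStages(fakeCall, pipelineStages, { prompt: "demo" }).then((result) => {
  console.log(result);
});
```

Passing the `rpcCall` function defined above as `call` would run the same chain against the relay. Adding or reordering stages then becomes a change to the `pipelineStages` array.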

Retry on rejection

If the evaluator rejects, loop back to the generator with feedback:

async function runPipelineWithRetry(prompt: string, maxRetries = 3) {
  let context: Record<string, unknown> = {};
 
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    console.log(`\nattempt ${attempt}/${maxRetries}`);
 
    const generated = await rpcCall(
      "generate.relay", "generate",
      JSON.stringify({ prompt, context })
    );
 
    const evaluated = await rpcCall(
      "evaluate.relay", "evaluate",
      JSON.stringify(generated)
    );
 
    if (evaluated.evaluation.recommendation === "approve") {
      return await rpcCall("approve.relay", "approve", JSON.stringify(evaluated));
    }
 
    console.log(`rejected (score: ${evaluated.evaluation.score}), retrying...`);
    context = {
      previousAttempt: generated.output,
      issues: evaluated.evaluation.issues,
      instruction: "Address the issues listed and try again.",
    };
  }
 
  throw new Error(`pipeline failed after ${maxRetries} attempts`);
}
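
The retry loop only helps if the generator actually uses `context`. How that happens is up to the generator; one hypothetical approach is to fold the feedback into the prompt text before calling the model. `RetryContext`, `buildPrompt`, and the wording of the template are assumptions for illustration.

```typescript
interface RetryContext {
  previousAttempt?: string;
  issues?: string[];
  instruction?: string;
}

// Hypothetical helper: append reviewer feedback to the original prompt.
// With an empty context (first attempt) the prompt is returned unchanged.
function buildPrompt(prompt: string, context: RetryContext): string {
  const hasIssues = context.issues !== undefined && context.issues.length > 0;
  if (!context.previousAttempt && !hasIssues) return prompt;

  const parts = [prompt];
  if (context.previousAttempt) {
    parts.push(`Previous attempt:\n${context.previousAttempt}`);
  }
  if (hasIssues) {
    const bullets = (context.issues ?? []).map((issue) => `- ${issue}`);
    parts.push(`Issues found:\n${bullets.join("\n")}`);
  }
  if (context.instruction) parts.push(context.instruction);
  return parts.join("\n\n");
}

const firstTry = buildPrompt("Write a tagline", {});
const secondTry = buildPrompt("Write a tagline", {
  previousAttempt: "Fast.",
  issues: ["output is too short"],
  instruction: "Address the issues listed and try again.",
});
console.log(secondTry);
```

In the generator, `callLLM(buildPrompt(prompt, context))` would be one place to use it, assuming `callLLM` accepts a single prompt string.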

Monitoring the pipeline

Any agent can subscribe to pipeline results via SSE:

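// A dashboard or logger agent
// EventSource is built into browsers; Node.js may need a polyfill
const RELAY = "http://localhost:9002";
const events = new EventSource(
  `${RELAY}/v1/subscribe?topic=pipeline.done`
);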
 
events.onmessage = (e) => {
  const result = JSON.parse(e.data);
  console.log(
    `[pipeline] ${result.decision.approved ? "✅" : "❌"} ` +
    `"${result.prompt}" (score: ${result.evaluation.score})`
  );
};
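
A dashboard usually wants aggregates rather than individual log lines. As a sketch, the handler could push each parsed result into an array and summarize it periodically. `PipelineResult` mirrors only the fields read above, and `summarize` is a hypothetical helper.

```typescript
interface PipelineResult {
  prompt: string;
  evaluation: { score: number };
  decision: { approved: boolean };
}

interface Summary {
  total: number;
  approved: number;
  approvalRate: number; // 0 when no results have been seen
  meanScore: number;    // 0 when no results have been seen
}

// Aggregate the results collected so far into simple counters and averages.
function summarize(results: PipelineResult[]): Summary {
  const total = results.length;
  const approved = results.filter((r) => r.decision.approved).length;
  const scoreSum = results.reduce((sum, r) => sum + r.evaluation.score, 0);
  return {
    total,
    approved,
    approvalRate: total === 0 ? 0 : approved / total,
    meanScore: total === 0 ? 0 : scoreSum / total,
  };
}

// Hand-written sample data in place of live SSE events.
const seen: PipelineResult[] = [
  { prompt: "a", evaluation: { score: 1 }, decision: { approved: true } },
  { prompt: "b", evaluation: { score: 0.5 }, decision: { approved: false } },
];
const stats = summarize(seen);
console.log(stats);
```

Note that with the retry variant above, rejected attempts never reach the approver, so only results that pass through `approve.relay` appear on `pipeline.done`.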

When to use this pattern

✅ LLM output that needs quality checks before use
✅ Content generation with editorial review
✅ Any process with sequential approval stages
✅ When you want to swap evaluation logic without touching generation

❌ Real-time chat (latency of 3 sequential RPCs adds up)
❌ Simple tasks that don't need quality checks