Pub/Sub
Topic-based broadcast with wildcard matching.
Broadcasting

```rust
let msg = node.new_agent_message("alert", b"cpu spike on worker-3".to_vec());
node.broadcast("metrics.cpu", msg).await?;
```

Broadcasts go to all agents subscribed to a matching topic.
Subscribing

```rust
node.subscribe("metrics.*", |msg: AgentMessage| {
    println!("[{}] {} says: {}",
        msg.metadata.get("broadcast_topic").unwrap_or(&"unknown".to_string()),
        msg.sender_name,
        String::from_utf8_lossy(&msg.payload)
    );
});
```

Wildcard matching
| Pattern | Matches | Doesn't match |
|---|---|---|
| metrics.cpu | metrics.cpu | metrics.mem |
| metrics.* | metrics.cpu, metrics.mem, metrics.disk | metrics.cpu.temp |
| * | Everything | (none) |
Wildcards match a single level. metrics.* catches metrics.cpu but not metrics.cpu.temp.
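
The matching rules in the table can be sketched as a small standalone function. This is an illustration only, not Subway's actual matcher: `topic_matches` is a hypothetical name, and the sketch assumes that topics are split on `.`, that a bare `*` is special-cased to match every topic (as the table's last row suggests), and that a `*` may appear at any level.

```rust
/// Hypothetical helper illustrating the rules in the table above;
/// not part of Subway's API.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    // Per the table, a bare `*` matches every topic.
    if pattern == "*" {
        return true;
    }
    let pattern_levels: Vec<&str> = pattern.split('.').collect();
    let topic_levels: Vec<&str> = topic.split('.').collect();
    // A wildcard covers exactly one level, so the level counts must agree.
    pattern_levels.len() == topic_levels.len()
        && pattern_levels
            .iter()
            .zip(topic_levels.iter())
            .all(|(p, t)| *p == "*" || p == t)
}
```

Under these assumptions, `topic_matches("metrics.*", "metrics.cpu")` is true, while `topic_matches("metrics.*", "metrics.cpu.temp")` is false because the topic has one level more than the pattern.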
Unsubscribing

```rust
node.unsubscribe("metrics.*");
```

How it works
Subway uses gossipsub for pub/sub. All agents share a single gossipsub topic (subway/agent). Application-level topics (like metrics.cpu) are embedded in the message metadata:
- broadcast("metrics.cpu", msg) sets metadata["broadcast_topic"] = "metrics.cpu" and publishes to the shared gossipsub topic
- All connected agents receive the gossipsub message
- Each agent's subscriber checks if its pattern matches the broadcast_topic
- Matching subscribers fire their handlers
Because all agents subscribe to the same underlying topic, the gossipsub mesh always has peers, regardless of which application-level topics are in use.
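
The flow described above can be modeled without any networking. The following is a rough sketch, not Subway's implementation: `Message`, `prepare_broadcast`, `pattern_matches`, and `matching_subscriptions` are hypothetical names, and the metadata is assumed to be a string-to-string map. Only the `broadcast_topic` key and the `subway/agent` topic name come from the text.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for AgentMessage; only the field used here.
struct Message {
    metadata: HashMap<String, String>,
}

// The single gossipsub topic shared by all agents.
const SHARED_TOPIC: &str = "subway/agent";

// Sender side: record the application-level topic in the metadata and
// return the gossipsub topic the message would be published on.
fn prepare_broadcast(topic: &str, msg: &mut Message) -> &'static str {
    msg.metadata
        .insert("broadcast_topic".to_string(), topic.to_string());
    SHARED_TOPIC
}

// Single-level wildcard match; a bare "*" is assumed to match everything.
fn pattern_matches(pattern: &str, topic: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    let p: Vec<&str> = pattern.split('.').collect();
    let t: Vec<&str> = topic.split('.').collect();
    p.len() == t.len() && p.iter().zip(&t).all(|(a, b)| *a == "*" || a == b)
}

// Receiver side: return the subscription patterns whose handlers would fire.
fn matching_subscriptions<'a>(patterns: &'a [&'a str], msg: &Message) -> Vec<&'a str> {
    match msg.metadata.get("broadcast_topic") {
        Some(topic) => patterns
            .iter()
            .copied()
            .filter(|p| pattern_matches(p, topic))
            .collect(),
        // No broadcast_topic in the metadata: nothing to dispatch.
        None => Vec::new(),
    }
}
```

In this model every receiver sees every broadcast and filters locally, which is the trade-off of sharing one gossipsub topic: the mesh stays well connected, at the cost of delivering messages to agents that end up ignoring them.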
No self-delivery
Broadcasters never receive their own messages. The dispatcher checks sender_peer_id and skips messages from the local agent.
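
A minimal sketch of that check, assuming the peer ID can be compared for equality. `PeerId`, `Incoming`, and `drop_own_messages` are hypothetical stand-ins for illustration; only the `sender_peer_id` field name comes from the text, and Subway's real types will differ.

```rust
// Hypothetical minimal types; Subway's real message and peer ID types will differ.
#[derive(Debug, Clone, PartialEq)]
struct PeerId(String);

struct Incoming {
    sender_peer_id: PeerId,
    payload: Vec<u8>,
}

// Keep only messages that did not originate from the local agent.
fn drop_own_messages(local: &PeerId, inbox: Vec<Incoming>) -> Vec<Incoming> {
    inbox
        .into_iter()
        .filter(|msg| msg.sender_peer_id != *local)
        .collect()
}
```

Filtering on the sender's peer ID on the receiving side means the broadcaster needs no special handling when it publishes.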
The gossipsub mesh needs a moment to form after agents connect. If you broadcast immediately after .build(), subscribers might not receive the message. In practice, waiting about 500 ms before the first broadcast is enough.