docs/Core/Pub Sub

Pub/Sub

Topic-based broadcast with wildcard matching.

Broadcasting#

let msg = node.new_agent_message("alert", b"cpu spike on worker-3".to_vec());
node.broadcast("metrics.cpu", msg).await?;

Broadcasts go to all agents subscribed to a matching topic.

Subscribing#

node.subscribe("metrics.*", |msg: AgentMessage| {
    println!("[{}] {} says: {}",
        msg.metadata.get("broadcast_topic").unwrap_or(&"unknown".to_string()),
        msg.sender_name,
        String::from_utf8_lossy(&msg.payload)
    );
});

Wildcard matching#

PatternMatchesDoesn't match
metrics.cpumetrics.cpumetrics.mem
metrics.*metrics.cpu, metrics.mem, metrics.diskmetrics.cpu.temp
*Everything

Wildcards match a single level. metrics.* catches metrics.cpu but not metrics.cpu.temp.

Unsubscribing#

node.unsubscribe("metrics.*");

How it works#

Subway uses gossipsub for pub/sub. All agents share a single gossipsub topic (subway/agent). Application-level topics (like metrics.cpu) are embedded in the message metadata:

  1. broadcast("metrics.cpu", msg) sets metadata["broadcast_topic"] = "metrics.cpu" and publishes to the shared gossipsub topic
  2. All connected agents receive the gossipsub message
  3. Each agent's subscriber checks if its pattern matches the broadcast_topic
  4. Matching subscribers fire their handlers

This design means the gossipsub mesh forms naturally — all agents subscribe to the same underlying topic, so the mesh always has peers.

No self-delivery#

Broadcasters never receive their own messages. The dispatcher checks sender_peer_id and skips messages from the local agent.

Info

The gossipsub mesh needs a moment to form after agents connect. If you broadcast immediately after .build(), subscribers might not receive it. In practice, ~500ms is enough.