Python
Use Subway from Python via the REST and WebSocket bridge.
New to Subway? Start with the Python quickstart — you'll be connected in 5 minutes. This guide covers the full protocol, advanced patterns, and production usage.
Subway's core is Rust, but you can use Subway from Python in two ways: the official subway-sdk package (recommended) or raw HTTP/WebSocket calls to the bridge.
Prerequisites
- Python 3.9+
- A running Subway relay with bridge (the public relay at
relay.subway.dev works)
Option 1: Official SDK (recommended)
pip install subway-sdk
The SDK provides both a stateless REST client and a persistent WebSocket agent. See the subway-py README for the full API.
Option 2: Raw HTTP/WebSocket
pip install requests websockets
REST API: fire-and-forget
The REST bridge is the fastest way to integrate. No persistent connection needed.
Send a message
import requests
RELAY = "http://localhost:9002"

# Send a direct message to a named agent.
# requests applies no timeout by default, so one is passed explicitly;
# otherwise an unreachable bridge would block this script indefinitely.
resp = requests.post(
    f"{RELAY}/v1/send",
    json={
        "to": "worker.relay",
        "message_type": "task",
        "payload": "process this document",
        "metadata": {"priority": "high"},
    },
    timeout=10,
)
# Turn HTTP 4xx/5xx responses into an exception instead of continuing.
resp.raise_for_status()
print("sent:", resp.json())
RPC call (request-response)
import requests
RELAY = "http://localhost:9002"

# Call an agent and wait for a response.
# The HTTP timeout (seconds) is set longer than the RPC's own timeout_ms so
# that, presumably, the bridge can report an RPC timeout itself before the
# HTTP client gives up. Without it, requests would wait forever on a bridge
# that never answers.
resp = requests.post(
    f"{RELAY}/v1/call",
    json={
        "to": "worker.relay",
        "method": "summarize",
        "payload": "The quick brown fox jumps over the lazy dog.",
        "timeout_ms": 5000,
    },
    timeout=10,
)
# Turn HTTP 4xx/5xx responses into an exception before parsing the body.
resp.raise_for_status()
result = resp.json()
if result["success"]:
    print("response:", result["payload"])
else:
    print("error:", result["error"])
Broadcast to a topic
import requests
RELAY = "http://localhost:9002"

# Broadcast to all subscribers on a topic.
# An explicit timeout keeps the script from hanging if the bridge is
# unreachable (requests has no default timeout).
resp = requests.post(
    f"{RELAY}/v1/broadcast",
    json={
        "topic": "metrics.cpu",
        "message_type": "metric",
        "payload": "usage: 42%",
    },
    timeout=10,
)
resp.raise_for_status()
Subscribe via Server-Sent Events
import requests
RELAY = "http://localhost:9002"

# Stream broadcast messages on a topic (SSE).
# timeout=(connect, read): fail fast if the bridge cannot be reached, but
# never time out while reading, because an SSE stream may stay idle for a
# long time between events.
resp = requests.get(
    f"{RELAY}/v1/subscribe",
    params={"topic": "metrics.*"},
    stream=True,
    timeout=(5, None),
)
# Fail loudly on an HTTP error rather than iterating over an error body.
resp.raise_for_status()
for line in resp.iter_lines():
    if line:
        text = line.decode("utf-8")
        if text.startswith("data: "):
            print("broadcast:", text[6:])
Resolve a name
import requests
RELAY = "http://localhost:9002"

# Resolve an agent name through the bridge.
# An explicit timeout avoids blocking forever on an unreachable bridge.
resp = requests.get(f"{RELAY}/v1/resolve/worker.relay", timeout=10)
resp.raise_for_status()
print(resp.json()) # {"name": "worker.relay", "peer_id": "12D3KooW..."}
WebSocket: persistent agents
The WebSocket bridge gives you a full agent identity. Your Python process becomes a first-class Subway agent that can send, receive, handle RPCs, and subscribe to topics — all over a single connection.
The examples below use ws://localhost:9002/ws for local development. For the public relay, use wss://relay.subway.dev/ws.
Connect and register
import asyncio
import json
import websockets
async def main():
    """Register as ``py-agent.relay`` and print every frame received afterwards.

    Raises:
        RuntimeError: if the first frame after registering is not of type
            ``registered``.
    """
    async with websockets.connect("ws://localhost:9002/ws") as ws:
        # Register your agent
        await ws.send(json.dumps({
            "type": "register",
            "name": "py-agent.relay",
        }))

        # Wait for confirmation. An explicit raise is used instead of
        # ``assert`` because asserts are removed when Python runs with -O.
        msg = json.loads(await ws.recv())
        if msg.get("type") != "registered":
            raise RuntimeError(f"registration failed: {msg}")
        print(f"connected as {msg['name']} ({msg['peer_id']})")

        # Keep listening for messages until the connection closes
        async for raw in ws:
            msg = json.loads(raw)
            print(f"[{msg['type']}] {msg}")
asyncio.run(main())
Send and call from WebSocket
import asyncio
import json
import uuid
import websockets
async def main():
    """Register as ``sender.relay``, send one direct message, then make one RPC call.

    Returns once the ``call_result`` frame whose ``correlation_id`` matches
    the outgoing call has been printed.
    """
    async with websockets.connect("ws://localhost:9002/ws") as ws:
        await ws.send(json.dumps({"type": "register", "name": "sender.relay"}))
        reg = json.loads(await ws.recv())
        print(f"registered: {reg['name']}")

        # Send a direct message
        await ws.send(json.dumps({
            "type": "send",
            "to": "worker.relay",
            "message_type": "task",
            "payload": "hello from python",
        }))

        # Make an RPC call. The correlation id is how the reply is matched
        # to this request among other frames on the same connection.
        cid = str(uuid.uuid4())
        await ws.send(json.dumps({
            "type": "call",
            "to": "worker.relay",
            "method": "echo",
            "payload": "ping",
            "correlation_id": cid,
        }))

        # Wait for the RPC response, skipping unrelated frames.
        # NOTE: this loop has no deadline; it ends only on a matching
        # call_result or when the connection closes.
        async for raw in ws:
            msg = json.loads(raw)
            if msg["type"] == "call_result" and msg["correlation_id"] == cid:
                print(f"rpc response: {msg['payload']}")
                break
asyncio.run(main())
Handle inbound RPCs
import asyncio
import json
import websockets
async def main():
    """Register as ``service.relay`` and serve frames until the connection closes.

    ``inbound_call`` frames are answered with a ``call_response`` carrying the
    same ``correlation_id``; ``message`` and ``broadcast_message`` frames are
    printed. Any other frame type is ignored.
    """
    async with websockets.connect("ws://localhost:9002/ws") as ws:
        await ws.send(json.dumps({"type": "register", "name": "service.relay"}))
        reg = json.loads(await ws.recv())
        print(f"service ready: {reg['name']}")

        async for raw in ws:
            msg = json.loads(raw)
            if msg["type"] == "inbound_call":
                print(f"rpc from {msg['from_name']}: {msg['method']}")
                # Process the request and respond; echoing the caller's
                # correlation_id ties this response to its request.
                result = f"processed: {msg['payload']}"
                await ws.send(json.dumps({
                    "type": "call_response",
                    "correlation_id": msg["correlation_id"],
                    "success": True,
                    "payload": result,
                }))
            elif msg["type"] == "message":
                print(f"message from {msg['from_name']}: {msg['payload']}")
            elif msg["type"] == "broadcast_message":
                print(f"[{msg['topic']}] {msg['from_name']}: {msg['payload']}")
asyncio.run(main())
Subscribe to topics
import asyncio
import json
import websockets
async def main():
    """Register as ``monitor.relay``, subscribe to ``metrics.*`` and print broadcasts."""
    async with websockets.connect("ws://localhost:9002/ws") as ws:
        await ws.send(json.dumps({"type": "register", "name": "monitor.relay"}))
        await ws.recv()  # registered

        # Subscribe with wildcard
        await ws.send(json.dumps({"type": "subscribe", "topic": "metrics.*"}))
        # The next frame is treated as the reply to the subscribe request.
        sub = json.loads(await ws.recv())
        print(f"subscribed to: {sub['topic']}")

        # Listen for broadcasts; frames of any other type are skipped
        async for raw in ws:
            msg = json.loads(raw)
            if msg["type"] == "broadcast_message":
                print(f"[{msg['topic']}] {msg['payload']}")
asyncio.run(main())
Patterns
Worker pool
Multiple Python agents can register with different names and handle RPC calls in parallel:
import asyncio
import json
import websockets
async def worker(worker_id: int):
    """Register as ``worker-<worker_id>.relay`` and answer inbound RPC calls.

    Args:
        worker_id: Number used in both the registered agent name and the
            response payload.
    """
    async with websockets.connect("ws://localhost:9002/ws") as ws:
        name = f"worker-{worker_id}.relay"
        await ws.send(json.dumps({"type": "register", "name": name}))
        await ws.recv()  # registration reply; its content is not inspected
        print(f"{name} ready")

        async for raw in ws:
            msg = json.loads(raw)
            if msg["type"] == "inbound_call":
                result = f"worker-{worker_id} handled: {msg['payload']}"
                # Echo the correlation_id so the reply matches its request.
                await ws.send(json.dumps({
                    "type": "call_response",
                    "correlation_id": msg["correlation_id"],
                    "success": True,
                    "payload": result,
                }))
async def main():
    """Run workers 1, 2 and 3 concurrently until all of them finish."""
    await asyncio.gather(
        worker(1),
        worker(2),
        worker(3),
    )
asyncio.run(main())
Event-driven pipeline
Chain agents together: one publishes, others subscribe and react:
import asyncio
import json
import websockets
async def publisher():
    """Register as ``ingest.relay`` and broadcast ten events on ``events.raw``.

    One event is sent per second; each payload is a JSON-encoded object with
    ``event_id`` and ``data`` fields.
    """
    async with websockets.connect("ws://localhost:9002/ws") as ws:
        await ws.send(json.dumps({"type": "register", "name": "ingest.relay"}))
        await ws.recv()  # registration reply; its content is not inspected

        for i in range(10):
            await ws.send(json.dumps({
                "type": "broadcast",
                "topic": "events.raw",
                "message_type": "event",
                # The payload is itself a JSON string, so consumers must
                # decode it a second time.
                "payload": json.dumps({"event_id": i, "data": f"event-{i}"}),
            }))
            await asyncio.sleep(1)
async def consumer():
    """Register as ``processor.relay``, subscribe to ``events.*`` and print event ids."""
    async with websockets.connect("ws://localhost:9002/ws") as ws:
        await ws.send(json.dumps({"type": "register", "name": "processor.relay"}))
        await ws.recv()  # registration reply; its content is not inspected

        await ws.send(json.dumps({"type": "subscribe", "topic": "events.*"}))
        await ws.recv()  # subscribe reply; its content is not inspected

        async for raw in ws:
            msg = json.loads(raw)
            if msg["type"] == "broadcast_message":
                # The payload is a JSON string and needs its own decode.
                event = json.loads(msg["payload"])
                print(f"processing event {event['event_id']}")
async def main():
    """Run the publisher and the consumer concurrently."""
    # NOTE(review): both coroutines start together, so the publisher may
    # broadcast before the consumer's subscription is in place. Whether those
    # early events are then missed depends on relay semantics; confirm.
    await asyncio.gather(publisher(), consumer())
asyncio.run(main())
When to use REST vs WebSocket
| Use REST when... | Use WebSocket when... |
|---|---|
| Sending one-off messages | Your agent needs to receive messages |
| Simple RPC calls | Handling inbound RPCs |
| Scripts and cron jobs | Long-running services |
| No agent identity needed | You need a named agent on the mesh |