docs/Guides/Python

Python

Use Subway from Python via the REST and WebSocket bridge.

Tip

New to Subway? Start with the Python quickstart — you'll be connected in 5 minutes. This guide covers the full protocol, advanced patterns, and production usage.

Subway's core is Rust, but you can use Subway from Python in two ways: the official subway-sdk package (recommended) or raw HTTP/WebSocket calls to the bridge.

Prerequisites#

  • Python 3.9+
  • A running Subway relay with bridge (the public relay at relay.subway.dev works)

Option 1: The subway-sdk package (recommended)

pip install subway-sdk

The SDK provides both a stateless REST client and a persistent WebSocket agent. See the subway-py README for the full API.

Option 2: Raw HTTP/WebSocket

pip install requests websockets

REST API: fire-and-forget#

The REST bridge is the fastest way to integrate. No persistent connection needed.

Send a message

import requests
 
RELAY = "http://localhost:9002"

# Send a direct message to a named agent.
# A timeout is passed explicitly: without one, requests waits indefinitely
# if the relay accepts the connection but never answers.
resp = requests.post(
    f"{RELAY}/v1/send",
    json={
        "to": "worker.relay",
        "message_type": "task",
        "payload": "process this document",
        "metadata": {"priority": "high"},
    },
    timeout=10,
)
resp.raise_for_status()  # turn 4xx/5xx replies into exceptions
print("sent:", resp.json())

RPC call (request-response)

import requests
 
RELAY = "http://localhost:9002"
CALL_TIMEOUT_MS = 5000  # RPC timeout sent to the relay in the request body

# Call an agent and wait for a response.
# The HTTP timeout is set a few seconds longer than the RPC timeout so the
# relay has a chance to report its own timeout before the client gives up;
# without any HTTP timeout the request could block forever.
resp = requests.post(
    f"{RELAY}/v1/call",
    json={
        "to": "worker.relay",
        "method": "summarize",
        "payload": "The quick brown fox jumps over the lazy dog.",
        "timeout_ms": CALL_TIMEOUT_MS,
    },
    timeout=CALL_TIMEOUT_MS / 1000 + 5,
)
resp.raise_for_status()  # turn 4xx/5xx replies into exceptions

# The reply body reports success or failure of the call itself.
result = resp.json()
if result["success"]:
    print("response:", result["payload"])
else:
    print("error:", result["error"])

Broadcast to a topic

import requests
 
RELAY = "http://localhost:9002"

# Broadcast to all subscribers on a topic.
# A timeout is passed explicitly: without one, requests waits indefinitely
# if the relay accepts the connection but never answers.
resp = requests.post(
    f"{RELAY}/v1/broadcast",
    json={
        "topic": "metrics.cpu",
        "message_type": "metric",
        "payload": "usage: 42%",
    },
    timeout=10,
)
resp.raise_for_status()  # turn 4xx/5xx replies into exceptions

Subscribe via Server-Sent Events

import requests
 
RELAY = "http://localhost:9002"
DATA_PREFIX = "data: "  # SSE field prefix carrying the message body

# Stream broadcast messages on a topic (SSE).
# - The `with` block releases the connection when the loop ends or raises;
#   a streamed response is otherwise left open.
# - timeout=(5, None) bounds only the connect phase. Reads are left
#   unbounded because the stream can be idle between broadcasts.
with requests.get(
    f"{RELAY}/v1/subscribe",
    params={"topic": "metrics.*"},
    stream=True,
    timeout=(5, None),
) as resp:
    # Fail fast on 4xx/5xx instead of trying to parse an error body as events.
    resp.raise_for_status()

    for line in resp.iter_lines():
        if not line:
            continue  # skip empty lines between events
        text = line.decode("utf-8")
        if text.startswith(DATA_PREFIX):
            print("broadcast:", text[len(DATA_PREFIX):])

Resolve a name

import requests
 
RELAY = "http://localhost:9002"

# Look up the peer behind an agent name.
# A timeout is passed explicitly: without one, requests waits indefinitely
# if the relay accepts the connection but never answers.
resp = requests.get(f"{RELAY}/v1/resolve/worker.relay", timeout=10)
resp.raise_for_status()  # turn 4xx/5xx replies into exceptions
print(resp.json())  # {"name": "worker.relay", "peer_id": "12D3KooW..."}

WebSocket: persistent agents#

The WebSocket bridge gives you a full agent identity. Your Python process becomes a first-class Subway agent that can send, receive, handle RPCs, and subscribe to topics — all over a single connection.

Tip

The examples below use ws://localhost:9002/ws for local development. For the public relay, use wss://relay.subway.dev/ws.

Connect and register

import asyncio
import json
import websockets
 
async def main():
    """Register as ``py-agent.relay`` and print every message received.

    Raises:
        RuntimeError: if the first reply from the relay is not a
            ``registered`` confirmation.
    """
    async with websockets.connect("ws://localhost:9002/ws") as ws:
        # Register your agent
        await ws.send(json.dumps({
            "type": "register",
            "name": "py-agent.relay"
        }))

        # Wait for confirmation. This is checked with an explicit raise
        # rather than `assert`, because assertions are stripped when Python
        # runs with -O and the check would silently disappear.
        msg = json.loads(await ws.recv())
        if msg.get("type") != "registered":
            raise RuntimeError(f"registration failed: {msg}")
        print(f"connected as {msg['name']} ({msg['peer_id']})")

        # Keep listening for messages until the connection closes
        async for raw in ws:
            msg = json.loads(raw)
            print(f"[{msg['type']}] {msg}")


asyncio.run(main())

Send and call from WebSocket

import asyncio
import json
import uuid
import websockets
 
async def main():
    """Register as ``sender.relay``, send one direct message, then make one RPC call.

    The coroutine returns once the ``call_result`` carrying the matching
    correlation id has been printed.
    """
    async with websockets.connect("ws://localhost:9002/ws") as conn:
        # Registration handshake: one request, one reply.
        register_frame = {"type": "register", "name": "sender.relay"}
        await conn.send(json.dumps(register_frame))
        ack = json.loads(await conn.recv())
        print(f"registered: {ack['name']}")

        # Fire-and-forget direct message.
        direct_frame = {
            "type": "send",
            "to": "worker.relay",
            "message_type": "task",
            "payload": "hello from python",
        }
        await conn.send(json.dumps(direct_frame))

        # RPC call, tagged with a fresh correlation id so the reply can be
        # told apart from any other traffic on the socket.
        cid = str(uuid.uuid4())
        call_frame = {
            "type": "call",
            "to": "worker.relay",
            "method": "echo",
            "payload": "ping",
            "correlation_id": cid,
        }
        await conn.send(json.dumps(call_frame))

        # Skip everything until the reply to our call shows up.
        async for frame in conn:
            reply = json.loads(frame)
            if reply["type"] != "call_result":
                continue
            if reply["correlation_id"] != cid:
                continue
            print(f"rpc response: {reply['payload']}")
            break


asyncio.run(main())

Handle inbound RPCs

import asyncio
import json
import websockets
 
async def main():
    """Register as ``service.relay`` and serve inbound traffic.

    Inbound RPCs are answered with a ``call_response``; direct messages and
    topic broadcasts are printed. Any other message type is ignored.
    """
    async with websockets.connect("ws://localhost:9002/ws") as conn:
        register_frame = {"type": "register", "name": "service.relay"}
        await conn.send(json.dumps(register_frame))
        ack = json.loads(await conn.recv())
        print(f"service ready: {ack['name']}")

        async for frame in conn:
            event = json.loads(frame)
            kind = event["type"]

            if kind == "message":
                print(f"message from {event['from_name']}: {event['payload']}")
                continue

            if kind == "broadcast_message":
                print(f"[{event['topic']}] {event['from_name']}: {event['payload']}")
                continue

            if kind != "inbound_call":
                continue

            print(f"rpc from {event['from_name']}: {event['method']}")

            # Process the request and reply, echoing the caller's
            # correlation id so the response can be matched to the call.
            reply = {
                "type": "call_response",
                "correlation_id": event["correlation_id"],
                "success": True,
                "payload": f"processed: {event['payload']}",
            }
            await conn.send(json.dumps(reply))


asyncio.run(main())

Subscribe to topics

import asyncio
import json
import websockets
 
async def main():
    """Register as ``monitor.relay``, subscribe to ``metrics.*`` and print broadcasts."""
    async with websockets.connect("ws://localhost:9002/ws") as conn:
        register_frame = {"type": "register", "name": "monitor.relay"}
        await conn.send(json.dumps(register_frame))
        await conn.recv()  # registration reply, contents not needed here

        # Subscribe with a wildcard topic and report what the relay replied.
        subscribe_frame = {"type": "subscribe", "topic": "metrics.*"}
        await conn.send(json.dumps(subscribe_frame))
        ack = json.loads(await conn.recv())
        print(f"subscribed to: {ack['topic']}")

        # Print broadcasts; skip every other message type.
        async for frame in conn:
            event = json.loads(frame)
            if event["type"] != "broadcast_message":
                continue
            print(f"[{event['topic']}] {event['payload']}")


asyncio.run(main())

Patterns#

Worker pool

Multiple Python agents can register with different names and handle RPC calls in parallel:

import asyncio
import json
import websockets
 
async def worker(worker_id: int):
    """Run one worker agent named ``worker-<worker_id>.relay``.

    The worker registers, then answers every inbound RPC with a payload that
    names the worker and repeats the request payload. Other message types
    are ignored.
    """
    async with websockets.connect("ws://localhost:9002/ws") as conn:
        agent_name = f"worker-{worker_id}.relay"
        register_frame = {"type": "register", "name": agent_name}
        await conn.send(json.dumps(register_frame))
        await conn.recv()  # registration reply, contents not needed here
        print(f"{agent_name} ready")

        async for frame in conn:
            request = json.loads(frame)
            if request["type"] != "inbound_call":
                continue

            # Echo the correlation id so the caller can match the reply.
            reply = {
                "type": "call_response",
                "correlation_id": request["correlation_id"],
                "success": True,
                "payload": f"worker-{worker_id} handled: {request['payload']}",
            }
            await conn.send(json.dumps(reply))
 
async def main():
    """Run workers 1, 2 and 3 concurrently on the same event loop."""
    workers = [worker(worker_id) for worker_id in (1, 2, 3)]
    await asyncio.gather(*workers)


asyncio.run(main())

Event-driven pipeline

Chain agents together: one publishes, others subscribe and react:

import asyncio
import json
import websockets
 
async def publisher():
    """Register as ``ingest.relay`` and broadcast ten events on ``events.raw``.

    Events are numbered 0-9 and sent one second apart; each payload is a
    JSON-encoded string, not a nested object.
    """
    async with websockets.connect("ws://localhost:9002/ws") as conn:
        register_frame = {"type": "register", "name": "ingest.relay"}
        await conn.send(json.dumps(register_frame))
        await conn.recv()  # registration reply, contents not needed here

        for event_id in range(10):
            body = json.dumps({"event_id": event_id, "data": f"event-{event_id}"})
            frame = {
                "type": "broadcast",
                "topic": "events.raw",
                "message_type": "event",
                "payload": body,
            }
            await conn.send(json.dumps(frame))
            await asyncio.sleep(1)
 
async def consumer():
    """Register as ``processor.relay``, subscribe to ``events.*`` and print event ids.

    Each broadcast payload is decoded as JSON and its ``event_id`` printed;
    other message types are ignored.
    """
    async with websockets.connect("ws://localhost:9002/ws") as conn:
        # Two request/reply handshakes: register first, then subscribe.
        handshake = (
            {"type": "register", "name": "processor.relay"},
            {"type": "subscribe", "topic": "events.*"},
        )
        for request in handshake:
            await conn.send(json.dumps(request))
            await conn.recv()  # reply contents not needed here

        async for frame in conn:
            envelope = json.loads(frame)
            if envelope["type"] != "broadcast_message":
                continue
            event = json.loads(envelope["payload"])
            print(f"processing event {event['event_id']}")
 
async def main():
    """Run the publisher and the consumer concurrently on the same event loop."""
    pipeline = (publisher(), consumer())
    await asyncio.gather(*pipeline)


asyncio.run(main())

When to use REST vs WebSocket#

| Use REST when...         | Use WebSocket when...                |
| ------------------------ | ------------------------------------ |
| Sending one-off messages | Your agent needs to receive messages |
| Simple RPC calls         | Handling inbound RPCs                |
| Scripts and cron jobs    | Long-running services                |
| No agent identity needed | You need a named agent on the mesh   |

Next steps#