Heartbeat Architecture
Real-time system monitoring through SSE-based telemetry streaming.
rbee uses a pull-based telemetry system where Queen subscribes to Hive SSE streams after a one-time discovery handshake. Hives stream worker telemetry to Queen, which aggregates and broadcasts to clients.
Overview
The telemetry system provides real-time visibility into the entire rbee cluster:
- Hive monitors worker processes via cgroup (Linux) or process monitoring
- Hive sends one-time discovery callback to Queen
- Queen subscribes to Hive’s SSE stream
- Hive streams telemetry (CPU, memory, GPU stats) every 1 second
- Queen broadcasts aggregated status to clients every 2.5 seconds
- Queen uses telemetry for scheduling decisions
Key principles:
- Pull-based: Queen subscribes to Hive SSE streams (not push-based POST)
- Bidirectional handshake: Either Queen or Hive can start first
- Standalone capable: Hive can run without Queen
- Process monitoring: Hive tracks workers via cgroup filesystem (Linux)
Architecture
Discovery & Telemetry Flow
┌─────────────────────────────────────────────────────┐
│ Hive (Telemetry Source) │
│ 1. POST /v1/hive/ready (one-time discovery) │
│ 2. Streams telemetry via SSE (every 1s) │
│ - GET /v1/heartbeats/stream │
│ - Includes worker process stats │
└──────────────────┬──────────────────────────────────┘
│
│ 1. Discovery callback
▼
┌─────────────────────────────────────────────────────┐
│ Queen (Subscriber + Aggregator) │
│ - Receives discovery callback │
│ - Subscribes to Hive SSE stream │
│ - Receives HiveTelemetry events │
│ - Updates TelemetryRegistry (RAM) │
│ - Broadcasts to clients (every 2.5s) │
└──────────────────┬──────────────────────────────────┘
│
│ 2. SSE subscription
▼
Hive
│
│ 3. Telemetry stream
▼
Queen
│
│ 4. Aggregated broadcast
▼
┌─────────────────────────────────────────────────────┐
│ Clients (Web UI, CLI) │
│ - Subscribe: GET /v1/heartbeats/stream (on Queen) │
│ - Receive HiveTelemetry + Queen events │
└─────────────────────────────────────────────────────┘
Queen SUBSCRIBES to Hive SSE streams (pull), rather than receiving POST requests (push). This enables efficient real-time streaming without polling.
Discovery Handshake
Bidirectional Discovery Protocol
Queen and Hive can start in any order. The system handles both scenarios:
Scenario 1: Hive starts first
- Hive starts and attempts to send POST /v1/hive/ready to Queen
- If Queen is not running: exponential backoff (0s, 2s, 4s, 8s, 16s)
- After 5 failed attempts: Hive waits for Queen to discover it
- When Queen starts: Queen can discover Hive via other means
- Once connected: Queen subscribes to Hive’s SSE stream
Scenario 2: Queen starts first
- Queen starts and waits for Hive callbacks
- When Hive starts: Hive sends POST /v1/hive/ready
- Queen receives callback and immediately subscribes to Hive SSE
- Telemetry flows continuously
Scenario 3: Standalone Hive
- Hive starts without the --queen-url flag
- Hive runs in standalone mode (no discovery attempts)
- Hive still monitors workers and serves local API
- Can be connected to Queen later
The system is designed to handle any startup order. Hive can run standalone, or Queen and Hive can start independently and discover each other.
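A minimal sketch of the Hive-side callback with this backoff schedule, assuming the reqwest crate (blocking and json features) plus serde_json; the function and wiring here are illustrative, not the actual rbee implementation:

use std::{thread, time::Duration};

// Attempts the one-time discovery callback with 0s, 2s, 4s, 8s, 16s delays.
// Returns false after five failures, at which point the Hive simply waits
// to be discovered (standalone-style).
fn announce_to_queen(queen_url: &str, hive_id: &str, hive_url: &str) -> bool {
    let client = reqwest::blocking::Client::new();
    let body = serde_json::json!({ "hive_id": hive_id, "hive_url": hive_url });

    for delay_secs in [0u64, 2, 4, 8, 16] {
        thread::sleep(Duration::from_secs(delay_secs));
        match client
            .post(format!("{queen_url}/v1/hive/ready"))
            .json(&body)
            .send()
        {
            Ok(resp) if resp.status().is_success() => return true,
            _ => continue, // Queen not reachable yet; retry after the next delay
        }
    }
    false
}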
Discovery Callback
Endpoint: POST /v1/hive/ready (on Queen)
Request:
{
"hive_id": "gpu-0",
"hive_url": "http://192.168.1.100:7835"
}
Response:
{
"status": "ok",
"message": "Subscribed to hive gpu-0 SSE stream"
}
What happens:
- Queen receives callback
- Queen validates Hive URL
- Queen spawns background task to subscribe to GET {hive_url}/v1/heartbeats/stream
- Subscription remains active until connection drops
- Queen auto-reconnects if connection fails
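A sketch of the Queen-side handler, assuming for illustration an axum/tokio service (not confirmed by this page); subscribe_to_hive_stream is a hypothetical stand-in for the background subscription task:

use axum::Json;
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct HiveReady {
    hive_id: String,
    hive_url: String,
}

#[derive(Serialize)]
struct HiveReadyAck {
    status: String,
    message: String,
}

async fn hive_ready(Json(req): Json<HiveReady>) -> Json<HiveReadyAck> {
    // Subscribe in the background; the callback itself returns immediately.
    let stream_url = format!("{}/v1/heartbeats/stream", req.hive_url);
    tokio::spawn(subscribe_to_hive_stream(req.hive_id.clone(), stream_url));

    Json(HiveReadyAck {
        status: "ok".into(),
        message: format!("Subscribed to hive {} SSE stream", req.hive_id),
    })
}

// Hypothetical placeholder: open GET {hive_url}/v1/heartbeats/stream, parse
// SSE events, update the registry, and reconnect with backoff on drops.
async fn subscribe_to_hive_stream(_hive_id: String, _stream_url: String) {}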
Worker Process Monitoring
Cgroup-Based Monitoring (Linux)
Hive monitors worker processes using the cgroup filesystem:
Cgroup Structure:
/sys/fs/cgroup/rbee.slice/
├── llm/
│ ├── 8080/ # Worker instance (port-based)
│ │ ├── cgroup.procs
│ │ ├── cpu.stat
│ │ ├── memory.current
│ │ └── io.stat
│ └── 8081/
└── vllm/
    └── 8000/
What Hive Monitors:
- CPU usage - Percentage from cpu.stat
- Memory (RSS) - Resident memory from memory.current
- I/O rates - Read/write MB/s from io.stat
- GPU utilization - Via nvidia-smi or similar
- VRAM usage - GPU memory consumption
- Process uptime - Time since worker started
- Model name - Extracted from command line args
Collection Frequency: Every 1 second
On Linux, Hive uses cgroup for accurate resource monitoring. On macOS/Windows, it falls back to process monitoring tools (ps, tasklist) with best-effort accuracy.
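As a rough illustration of what that collection involves on Linux, one worker's memory and CPU counters can be read straight from the cgroup files with std alone. This sketch assumes cgroup v2 file formats and the path layout shown above; a per-second CPU percentage would come from the delta between two samples:

use std::fs;

// Returns (memory in MB, cumulative CPU time in microseconds) for one worker.
fn sample_worker(group: &str, port: u16) -> Option<(u64, u64)> {
    let base = format!("/sys/fs/cgroup/rbee.slice/{group}/{port}");

    // memory.current holds resident memory in bytes.
    let mem_bytes: u64 = fs::read_to_string(format!("{base}/memory.current"))
        .ok()?
        .trim()
        .parse()
        .ok()?;

    // cpu.stat contains a "usage_usec <n>" line.
    let cpu_stat = fs::read_to_string(format!("{base}/cpu.stat")).ok()?;
    let usage_usec: u64 = cpu_stat
        .lines()
        .find_map(|l| l.strip_prefix("usage_usec "))?
        .trim()
        .parse()
        .ok()?;

    Some((mem_bytes / (1024 * 1024), usage_usec))
}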
Worker Lifecycle
When Hive spawns a worker:
- Create cgroup: /sys/fs/cgroup/rbee.slice/{group}/{instance}/
- Start worker process in cgroup
- Worker process begins loading model
- Hive monitors via cgroup filesystem
- Telemetry includes worker in next broadcast
When worker stops:
- Process exits (graceful or crash)
- Hive detects via cgroup (no PIDs in cgroup.procs)
- Worker removed from telemetry
- Queen detects absence in next telemetry event
- Queen stops routing requests to that worker
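The corresponding liveness check is a single file read: if cgroup.procs has no PIDs left (or the cgroup directory has been removed), the worker counts as stopped. Minimal sketch, same path assumptions as above:

use std::fs;

fn worker_is_running(group: &str, port: u16) -> bool {
    let procs = format!("/sys/fs/cgroup/rbee.slice/{group}/{port}/cgroup.procs");
    match fs::read_to_string(&procs) {
        Ok(contents) => contents.lines().any(|l| !l.trim().is_empty()),
        Err(_) => false, // cgroup already removed => worker is gone
    }
}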
Event Types
HiveTelemetry Event
Hives stream telemetry events every 1 second, including worker process stats.
{
"type": "hive_telemetry",
"hive_id": "gpu-0",
"timestamp": "2025-10-29T12:00:00Z",
"workers": [
{
"pid": 1234,
"model": "meta-llama/Llama-3.2-1B",
"device": "GPU-0",
"port": 9301,
"status": "running",
"cpu_percent": 45.2,
"memory_mb": 2048
},
{
"pid": 1235,
"model": "meta-llama/Llama-3.2-3B",
"device": "GPU-1",
"port": 9302,
"status": "running",
"cpu_percent": 67.8,
"memory_mb": 4096
}
]
}
Fields:
- hive_id - Hive identifier
- timestamp - Telemetry timestamp
- workers - Array of worker process stats (from ps monitoring)
Worker Process Stats:
- pid - Process ID
- model - Model being served
- device - GPU/CPU device
- port - HTTP port
- status - Process status (running, stopped)
- cpu_percent - CPU usage percentage
- memory_mb - Memory usage in MB
Queen Heartbeat
Queen sends its own heartbeat every 2.5 seconds with aggregated cluster status.
{
"type": "queen",
"workers_online": 2,
"workers_available": 2,
"hives_online": 1,
"hives_available": 1,
"worker_ids": ["worker-1", "worker-2"],
"hive_ids": ["gpu-0"],
"timestamp": "2025-10-29T12:00:00Z"
}
Fields:
- workers_online - Total workers from all hive telemetry
- workers_available - Workers in running status
- hives_online - Total hives streaming telemetry
- hives_available - Hives with healthy status
- worker_ids - List of all worker IDs
- hive_ids - List of all hive IDs
The system only has two event types: HiveTelemetry (from hives) and Queen (from queen). Workers are included in HiveTelemetry, not as separate events.
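Because both payloads carry a type tag, a Rust client could model the stream as a single serde enum. This is a sketch assuming serde and serde_json; field names mirror the JSON above:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum HeartbeatEvent {
    HiveTelemetry {
        hive_id: String,
        timestamp: String,
        workers: Vec<WorkerStats>,
    },
    Queen {
        workers_online: u32,
        workers_available: u32,
        hives_online: u32,
        hives_available: u32,
        worker_ids: Vec<String>,
        hive_ids: Vec<String>,
        timestamp: String,
    },
}

#[derive(Debug, Deserialize)]
struct WorkerStats {
    pid: u32,
    model: String,
    device: String,
    port: u16,
    status: String,
    cpu_percent: f64,
    memory_mb: u64,
}

Each SSE data: payload then parses with serde_json::from_str::<HeartbeatEvent>(...).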
SSE Endpoint
Subscribe to Heartbeats
curl -N http://localhost:7833/v1/heartbeats/stream
Response (SSE stream):
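For illustration only (bodies abbreviated; exact fields are listed under Event Types), the raw stream interleaves data: lines for the two event types:

data: {"type":"hive_telemetry","hive_id":"gpu-0","workers":[...]}

data: {"type":"queen","workers_online":2,"hives_online":1,...}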
The SSE stream remains open indefinitely. Clients receive updates as they happen in real-time.
Client Implementation
JavaScript (EventSource)
// Connect to heartbeat stream
const eventSource = new EventSource('http://localhost:7833/v1/heartbeats/stream');
// Handle all events
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'queen':
console.log(`Queen: ${data.workers_online} workers online`);
break;
case 'hive_telemetry':
console.log(`Hive ${data.hive_id}: ${data.workers.length} workers`);
data.workers.forEach(w => {
console.log(` - PID ${w.pid}: ${w.model} (${w.status})`);
});
break;
}
};
// Handle errors
eventSource.onerror = (error) => {
console.error('SSE error:', error);
eventSource.close();
};
// Close when done
// eventSource.close();
Python (requests)
import requests
import json
# Connect to stream
response = requests.get(
    'http://localhost:7833/v1/heartbeats/stream',
    stream=True
)
# Process events
for line in response.iter_lines():
    if line:
        line = line.decode('utf-8')
        if line.startswith('data: '):
            data = json.loads(line[6:])  # Remove 'data: ' prefix
            if data['type'] == 'queen':
                print(f"Queen: {data['workers_online']} workers online")
            elif data['type'] == 'hive_telemetry':
                print(f"Hive {data['hive_id']}: {len(data['workers'])} workers")
                for worker in data['workers']:
                    print(f"  - PID {worker['pid']}: {worker['model']}")
cURL (Testing)
# Subscribe and watch
curl -N http://localhost:7833/v1/heartbeats/stream
# With timeout
timeout 10 curl -N http://localhost:7833/v1/heartbeats/stream
Telemetry Timing
Frequencies
| Mechanism | Frequency | Description |
|---|---|---|
| Hive Telemetry Stream | Every 1 second | Hive streams worker stats to Queen |
| Queen Broadcast | Every 2.5 seconds | Queen broadcasts aggregated status to clients |
| Discovery Callback | One-time (on startup) | Hive sends POST /v1/hive/ready |
| Connection Timeout | On SSE connection drop | Hive marked offline; auto-reconnect attempted |
Connection Behavior
When SSE connection drops:
- Immediate: Hive marked as offline in TelemetryRegistry
- Auto-reconnect: Queen attempts to reconnect to Hive SSE stream
- Exponential backoff: Retry with increasing delays
- Registry update: Hive marked online when reconnected
Queen automatically:
- Detects SSE connection drops
- Marks hive as offline
- Stops routing to workers on that hive
- Attempts reconnection
- Updates registry when reconnected
Queen automatically attempts to reconnect to Hive SSE streams if connections drop. No manual intervention required.
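One way that reconnect loop could look, assuming tokio; consume_sse_until_drop and mark_hive_offline are hypothetical stand-ins for the real stream consumer and registry update:

use std::time::Duration;

async fn maintain_subscription(hive_id: String, stream_url: String) {
    let mut delay = Duration::from_secs(1);
    loop {
        if consume_sse_until_drop(&stream_url).await.is_ok() {
            // Stream ended without an error; reset the backoff.
            delay = Duration::from_secs(1);
        }
        // Connection is gone either way: mark offline, wait, reconnect.
        mark_hive_offline(&hive_id);
        tokio::time::sleep(delay).await;
        delay = (delay * 2).min(Duration::from_secs(30)); // capped exponential backoff
    }
}

// Hypothetical stubs so the sketch stands alone.
async fn consume_sse_until_drop(_url: &str) -> Result<(), ()> { Ok(()) }
fn mark_hive_offline(_hive_id: &str) {}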
Registry Updates
TelemetryRegistry
Queen maintains an in-memory telemetry registry updated from Hive SSE streams:
// Hive streams telemetry via SSE
// Queen receives HiveTelemetry event
{
"type": "hive_telemetry",
"hive_id": "gpu-0",
"workers": [
{"pid": 1234, "model": "llama-3-8b", "status": "running"}
]
}
// Queen updates TelemetryRegistry
hive_registry.update_workers(&hive_id, workers);
// Query for scheduling
let available_workers = hive_registry.list_available_workers();
let online_hives = hive_registry.list_online_hives();
Registry characteristics:
- Stored in RAM only (ephemeral)
- Lost on Queen restart
- Rebuilt from Hive SSE streams automatically
- Updated every 1 second from each Hive
- Contains both Hive and Worker information
Data flow:
- Hive monitors worker processes (ps command)
- Hive streams worker stats via SSE (every 1s)
- Queen receives HiveTelemetry events
- Queen updates TelemetryRegistry
- Queen uses registry for scheduling decisions
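A minimal in-RAM registry along these lines might look as follows. Type and method names are assumptions matching the calls shown above, not the actual rbee types; WorkerStats stands for the per-worker fields from the telemetry event:

use std::collections::HashMap;
use std::time::Instant;

struct WorkerStats; // pid, model, device, port, status, cpu_percent, memory_mb

struct HiveEntry {
    workers: Vec<WorkerStats>, // latest snapshot from the most recent telemetry event
    last_seen: Instant,        // refreshed on every event; used to detect stale hives
    online: bool,
}

#[derive(Default)]
struct TelemetryRegistry {
    hives: HashMap<String, HiveEntry>,
}

impl TelemetryRegistry {
    // Called for each HiveTelemetry event; overwrites the previous snapshot.
    fn update_workers(&mut self, hive_id: &str, workers: Vec<WorkerStats>) {
        self.hives.insert(
            hive_id.to_string(),
            HiveEntry { workers, last_seen: Instant::now(), online: true },
        );
    }

    fn list_online_hives(&self) -> Vec<&str> {
        self.hives
            .iter()
            .filter(|(_, h)| h.online)
            .map(|(id, _)| id.as_str())
            .collect()
    }
}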
Scheduling Use Case
How Queen Uses Telemetry for Scheduling
Queen maintains a TelemetryRegistry updated from Hive SSE streams. This registry is used for all scheduling decisions.
When inference request arrives:
// 1. Query TelemetryRegistry for available workers
let available_workers = telemetry_registry
.list_available_workers()
.filter(|w| w.model == requested_model)
.filter(|w| w.gpu_util_pct < 80.0) // Not overloaded
.collect();
// 2. Select best worker (lowest GPU utilization)
let worker = available_workers
.iter()
.min_by_key(|w| w.gpu_util_pct as u32)?;
// 3. Route request directly to worker
route_to_worker(worker.port, request).await;
Scheduling Criteria:
- ✅ Model match - Worker must have the requested model loaded
- ✅ Worker status - Must be in “running” state
- ✅ GPU utilization - Prefer workers with lower GPU load
- ✅ VRAM availability - Ensure worker has enough VRAM
- ✅ Hive online - Hive must be streaming telemetry
Load Balancing:
- Queen distributes requests across multiple workers for same model
- Selects worker with lowest GPU utilization
- Avoids overloaded workers (>80% GPU util)
- Considers VRAM usage for large models
Failure Handling:
- If worker stops responding: Removed from telemetry automatically
- If Hive disconnects: All workers on that Hive marked offline
- If no workers available: Queen can trigger worker spawn on Hive
Telemetry updates every 1 second enable near-real-time load balancing. Queen always routes to the least-loaded worker for optimal performance.
Use Cases
Real-Time Monitoring
Best Practices
1. Handle Reconnections
SSE connections can drop. Implement automatic reconnection:
function connectHeartbeat() {
const eventSource = new EventSource('/v1/heartbeats/stream');
eventSource.onerror = (error) => {
console.error('Connection lost, reconnecting in 5s...');
eventSource.close();
setTimeout(connectHeartbeat, 5000);
};
return eventSource;
}
const eventSource = connectHeartbeat();
2. Filter Events
Process only events you need:
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
// Only process hive telemetry events
if (data.type === 'hive_telemetry') {
handleWorkerUpdate(data);
}
};
3. Batch Updates
Don’t update UI on every event. Batch updates:
let pendingUpdates = [];
eventSource.onmessage = (event) => {
pendingUpdates.push(JSON.parse(event.data));
};
// Update UI every second
setInterval(() => {
if (pendingUpdates.length > 0) {
updateUI(pendingUpdates);
pendingUpdates = [];
}
}, 1000);