
Heartbeat Architecture

Real-time system monitoring through SSE-based telemetry streaming.

Overview

The telemetry system provides real-time visibility into the entire rbee cluster:

  • Hive monitors worker processes via the cgroup filesystem (Linux) or generic process monitoring
  • Hive sends one-time discovery callback to Queen
  • Queen subscribes to Hive’s SSE stream
  • Hive streams telemetry (CPU, memory, GPU stats) every 1 second
  • Queen broadcasts aggregated status to clients every 2.5 seconds
  • Queen uses telemetry for scheduling decisions

Key principles:

  • Pull-based: Queen subscribes to Hive SSE streams (not push-based POST)
  • Bidirectional handshake: Either Queen or Hive can start first
  • Standalone capable: Hive can run without Queen
  • Process monitoring: Hive tracks workers via cgroup filesystem (Linux)

Architecture

Discovery & Telemetry Flow

```
┌─────────────────────────────────────────────────────┐
│ Hive (Telemetry Source)                             │
│ 1. POST /v1/hive/ready (one-time discovery)         │
│ 2. Streams telemetry via SSE (every 1s)             │
│    - GET /v1/heartbeats/stream                      │
│    - Includes worker process stats                  │
└──────────────────┬──────────────────────────────────┘
                   │ 1. Discovery callback (Hive → Queen)
                   │ 2. SSE subscription (Queen → Hive)
                   │ 3. Telemetry stream (Hive → Queen)
┌──────────────────┴──────────────────────────────────┐
│ Queen (Subscriber + Aggregator)                     │
│ - Receives discovery callback                       │
│ - Subscribes to Hive SSE stream                     │
│ - Receives HiveTelemetry events                     │
│ - Updates TelemetryRegistry (RAM)                   │
│ - Broadcasts to clients (every 2.5s)                │
└──────────────────┬──────────────────────────────────┘
                   │ 4. Aggregated broadcast
┌──────────────────┴──────────────────────────────────┐
│ Clients (Web UI, CLI)                               │
│ - Subscribe: GET /v1/heartbeats/stream (on Queen)   │
│ - Receive HiveTelemetry + Queen events              │
└─────────────────────────────────────────────────────┘
```

Discovery Handshake

Bidirectional Discovery Protocol

Queen and Hive can start in any order. The system handles both scenarios:

Scenario 1: Hive starts first

  1. Hive starts and attempts to send POST /v1/hive/ready to Queen
  2. If Queen is not running: Exponential backoff (0s, 2s, 4s, 8s, 16s)
  3. After 5 failed attempts: Hive waits for Queen to discover it
  4. When Queen starts: Queen can discover Hive via other means
  5. Once connected: Queen subscribes to Hive’s SSE stream
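
The backoff loop in steps 1–3 can be sketched as follows. This is a minimal illustration, not the actual Hive implementation; `post_ready` stands in for whatever function performs the `POST /v1/hive/ready` request.

```python
import time

def announce_to_queen(post_ready, delays=(0, 2, 4, 8, 16), sleep=time.sleep):
    """Retry the one-time discovery callback with exponential backoff.

    post_ready() performs POST /v1/hive/ready and returns True on success.
    After the final failed attempt the Hive gives up and waits passively
    for the Queen to discover it instead.
    """
    for delay in delays:
        sleep(delay)
        if post_ready():
            return True   # Queen acknowledged; it will subscribe to our SSE stream
    return False          # passive mode: wait for Queen-initiated discovery
```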

Scenario 2: Queen starts first

  1. Queen starts and waits for Hive callbacks
  2. When Hive starts: Hive sends POST /v1/hive/ready
  3. Queen receives callback and immediately subscribes to Hive SSE
  4. Telemetry flows continuously

Scenario 3: Standalone Hive

  1. Hive starts without --queen-url flag
  2. Hive runs in standalone mode (no discovery attempts)
  3. Hive still monitors workers and serves local API
  4. Can be connected to Queen later

Discovery Callback

Endpoint: POST /v1/hive/ready (on Queen)

Request:

```json
{
  "hive_id": "gpu-0",
  "hive_url": "http://192.168.1.100:7835"
}
```

Response:

```json
{
  "status": "ok",
  "message": "Subscribed to hive gpu-0 SSE stream"
}
```

What happens:

  1. Queen receives callback
  2. Queen validates Hive URL
  3. Queen spawns background task to subscribe to GET {hive_url}/v1/heartbeats/stream
  4. Subscription remains active until connection drops
  5. Queen auto-reconnects if connection fails
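
The subscribe-and-reconnect loop (steps 3–5) can be sketched roughly like this. It is an illustrative model, not Queen's actual code: `connect` stands in for opening the SSE request and yielding decoded lines, and the retry policy is simplified.

```python
import json

def run_subscription(connect, on_event, max_reconnects=3):
    """Consume a Hive SSE stream, reconnecting when the connection drops.

    connect() returns an iterator of decoded SSE lines (raising
    ConnectionError on failure); on_event receives each parsed payload.
    """
    for _ in range(max_reconnects + 1):
        try:
            for line in connect():
                if line.startswith("data: "):
                    on_event(json.loads(line[len("data: "):]))
            return  # stream ended cleanly
        except ConnectionError:
            continue  # auto-reconnect on the next loop iteration
```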

Worker Process Monitoring

Cgroup-Based Monitoring (Linux)

Hive monitors worker processes using the cgroup filesystem:

Cgroup Structure:

```
/sys/fs/cgroup/rbee.slice/
├── llm/
│   ├── 8080/          # Worker instance (port-based)
│   │   ├── cgroup.procs
│   │   ├── cpu.stat
│   │   ├── memory.current
│   │   └── io.stat
│   └── 8081/
└── vllm/
    └── 8000/
```

What Hive Monitors:

  • CPU usage - Percentage from cpu.stat
  • Memory (RSS) - Resident memory from memory.current
  • I/O rates - Read/write MB/s from io.stat
  • GPU utilization - Via nvidia-smi or similar
  • VRAM usage - GPU memory consumption
  • Process uptime - Time since worker started
  • Model name - Extracted from command line args

Collection Frequency: Every 1 second
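
Reading those files is straightforward. The sketch below uses the standard cgroup v2 file names from the structure above; it is illustrative, not Hive's actual collector. Note that `cpu.stat` reports cumulative CPU time, so a percentage requires the delta between two consecutive samples.

```python
from pathlib import Path

def read_worker_stats(cgroup_dir):
    """Read raw worker stats from a cgroup v2 directory such as
    /sys/fs/cgroup/rbee.slice/llm/8080/."""
    root = Path(cgroup_dir)
    # memory.current: resident usage in bytes
    memory_mb = int((root / "memory.current").read_text()) // (1024 * 1024)
    # cpu.stat: "key value" lines; usage_usec is cumulative CPU time,
    # so a percentage needs the delta between two 1-second samples
    cpu = dict(line.split() for line in (root / "cpu.stat").read_text().splitlines())
    # cgroup.procs: one PID per line; empty means the worker has exited
    pids = [int(p) for p in (root / "cgroup.procs").read_text().split()]
    return {"memory_mb": memory_mb, "cpu_usage_usec": int(cpu["usage_usec"]), "pids": pids}
```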

Worker Lifecycle

When Hive spawns a worker:

  1. Create cgroup: /sys/fs/cgroup/rbee.slice/{group}/{instance}/
  2. Start worker process in cgroup
  3. Worker process begins loading model
  4. Hive monitors via cgroup filesystem
  5. Telemetry includes worker in next broadcast

When worker stops:

  1. Process exits (graceful or crash)
  2. Hive detects via cgroup (no PIDs in cgroup.procs)
  3. Worker removed from telemetry
  4. Queen detects absence in next telemetry event
  5. Queen stops routing requests to that worker
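
The detection in step 2 reduces to one check, sketched here under the same assumptions as above (cgroup v2 layout; illustrative code, not Hive's):

```python
from pathlib import Path

def worker_alive(cgroup_dir):
    """A worker counts as stopped once cgroup.procs lists no PIDs."""
    try:
        return bool((Path(cgroup_dir) / "cgroup.procs").read_text().split())
    except FileNotFoundError:
        return False  # cgroup directory already removed
```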

Event Types

HiveTelemetry Event

Hives stream telemetry events every 1 second, including worker process stats.

```json
{
  "type": "hive_telemetry",
  "hive_id": "gpu-0",
  "timestamp": "2025-10-29T12:00:00Z",
  "workers": [
    {
      "pid": 1234,
      "model": "meta-llama/Llama-3.2-1B",
      "device": "GPU-0",
      "port": 9301,
      "status": "running",
      "cpu_percent": 45.2,
      "memory_mb": 2048
    },
    {
      "pid": 1235,
      "model": "meta-llama/Llama-3.2-3B",
      "device": "GPU-1",
      "port": 9302,
      "status": "running",
      "cpu_percent": 67.8,
      "memory_mb": 4096
    }
  ]
}
```

Fields:

  • hive_id - Hive identifier
  • timestamp - Telemetry timestamp
  • workers - Array of worker process stats (from ps monitoring)

Worker Process Stats:

  • pid - Process ID
  • model - Model being served
  • device - GPU/CPU device
  • port - HTTP port
  • status - Process status (running, stopped)
  • cpu_percent - CPU usage percentage
  • memory_mb - Memory usage in MB

Queen Heartbeat

Queen sends its own heartbeat every 2.5 seconds with aggregated cluster status.

```json
{
  "type": "queen",
  "workers_online": 2,
  "workers_available": 2,
  "hives_online": 1,
  "hives_available": 1,
  "worker_ids": ["worker-1", "worker-2"],
  "hive_ids": ["gpu-0"],
  "timestamp": "2025-10-29T12:00:00Z"
}
```

Fields:

  • workers_online - Total workers from all hive telemetry
  • workers_available - Workers in running status
  • hives_online - Total hives streaming telemetry
  • hives_available - Hives with healthy status
  • worker_ids - List of all worker IDs
  • hive_ids - List of all hive IDs
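
The aggregation behind these fields can be sketched as a fold over the latest HiveTelemetry event per hive. This is an illustrative model of the computation, not Queen's actual code, and it omits the availability checks behind `hives_available`:

```python
def aggregate_heartbeat(latest_telemetry):
    """Derive Queen heartbeat fields from the most recent
    HiveTelemetry event per hive (dict: hive_id -> event)."""
    workers = [w for ev in latest_telemetry.values() for w in ev["workers"]]
    return {
        "type": "queen",
        "workers_online": len(workers),
        "workers_available": sum(1 for w in workers if w["status"] == "running"),
        "hives_online": len(latest_telemetry),
        "hive_ids": sorted(latest_telemetry),
    }
```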

SSE Endpoint

Subscribe to Heartbeats

```bash
curl -N http://localhost:7833/v1/heartbeats/stream
```

Response (SSE stream):

Heartbeat Stream
```
data: {"type":"queen","workers_online":2,"workers_available":2,"hives_online":1,"timestamp":"2025-10-29T12:00:00Z"}

data: {"type":"hive_telemetry","hive_id":"gpu-0","timestamp":"2025-10-29T12:00:01Z","workers":[{"pid":1234,"model":"llama-3.2-1b","status":"running"}]}

data: {"type":"queen","workers_online":2,"workers_available":2,"hives_online":1,"timestamp":"2025-10-29T12:00:02.5Z"}

data: {"type":"hive_telemetry","hive_id":"gpu-0","timestamp":"2025-10-29T12:00:02Z","workers":[{"pid":1234,"model":"llama-3.2-1b","status":"running"}]}
```

Client Implementation

JavaScript (EventSource)

```javascript
// Connect to heartbeat stream
const eventSource = new EventSource('http://localhost:7833/v1/heartbeats/stream');

// Handle all events
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  switch (data.type) {
    case 'queen':
      console.log(`Queen: ${data.workers_online} workers online`);
      break;
    case 'hive_telemetry':
      console.log(`Hive ${data.hive_id}: ${data.workers.length} workers`);
      data.workers.forEach(w => {
        console.log(`  - PID ${w.pid}: ${w.model} (${w.status})`);
      });
      break;
  }
};

// Handle errors
eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  eventSource.close();
};

// Close when done
// eventSource.close();
```

Python (requests)

```python
import requests
import json

# Connect to stream
response = requests.get(
    'http://localhost:7833/v1/heartbeats/stream',
    stream=True
)

# Process events
for line in response.iter_lines():
    if line:
        line = line.decode('utf-8')
        if line.startswith('data: '):
            data = json.loads(line[6:])  # Remove 'data: ' prefix
            if data['type'] == 'queen':
                print(f"Queen: {data['workers_online']} workers online")
            elif data['type'] == 'hive_telemetry':
                print(f"Hive {data['hive_id']}: {len(data['workers'])} workers")
                for worker in data['workers']:
                    print(f"  - PID {worker['pid']}: {worker['model']}")
```

cURL (Testing)

```bash
# Subscribe and watch
curl -N http://localhost:7833/v1/heartbeats/stream

# With timeout
timeout 10 curl -N http://localhost:7833/v1/heartbeats/stream
```

Telemetry Timing

Frequencies

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| Hive Telemetry Stream | interval | Required | Every 1 second | Hive streams worker stats to Queen |
| Queen Broadcast | interval | Required | Every 2.5 seconds | Queen broadcasts aggregated status |
| Discovery Callback | event | Required | One-time | Hive sends POST /v1/hive/ready on startup |
| Connection Timeout | duration | Optional | | SSE connection drops: Hive marked offline, auto-reconnect attempted |

Connection Behavior

When SSE connection drops:

  1. Immediate: Hive marked as offline in TelemetryRegistry
  2. Auto-reconnect: Queen attempts to reconnect to Hive SSE stream
  3. Exponential backoff: Retry with increasing delays
  4. Registry update: Hive marked online when reconnected

Queen automatically:

  • Detects SSE connection drops
  • Marks hive as offline
  • Stops routing to workers on that hive
  • Attempts reconnection
  • Updates registry when reconnected

Registry Updates

TelemetryRegistry

Queen maintains an in-memory telemetry registry updated from Hive SSE streams:

```
// Hive streams telemetry via SSE
// Queen receives HiveTelemetry event
{
  "type": "hive_telemetry",
  "hive_id": "gpu-0",
  "workers": [
    {"pid": 1234, "model": "llama-3-8b", "status": "running"}
  ]
}

// Queen updates TelemetryRegistry
hive_registry.update_workers(&hive_id, workers);

// Query for scheduling
let available_workers = hive_registry.list_available_workers();
let online_hives = hive_registry.list_online_hives();
```

Registry characteristics:

  • Stored in RAM only (ephemeral)
  • Lost on Queen restart
  • Rebuilt from Hive SSE streams automatically
  • Updated every 1 second from each Hive
  • Contains both Hive and Worker information

Data flow:

  1. Hive monitors worker processes (ps command)
  2. Hive streams worker stats via SSE (every 1s)
  3. Queen receives HiveTelemetry events
  4. Queen updates TelemetryRegistry
  5. Queen uses registry for scheduling decisions
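
A minimal registry matching the method names above might look like the following Python sketch (the real registry is Rust; this is illustrative only):

```python
class TelemetryRegistry:
    """In-memory registry: the latest worker list per hive. Because it is
    rebuilt entirely from SSE events, losing it on restart is harmless."""

    def __init__(self):
        self._hives = {}  # hive_id -> list of worker dicts

    def update_workers(self, hive_id, workers):
        self._hives[hive_id] = workers

    def mark_offline(self, hive_id):
        # Dropping the hive also removes its workers from scheduling
        self._hives.pop(hive_id, None)

    def list_online_hives(self):
        return sorted(self._hives)

    def list_available_workers(self):
        return [w for ws in self._hives.values() for w in ws if w["status"] == "running"]
```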

Scheduling Use Case

How Queen Uses Telemetry for Scheduling

Queen maintains a TelemetryRegistry updated from Hive SSE streams. This registry is used for all scheduling decisions.

When inference request arrives:

```rust
// 1. Query TelemetryRegistry for available workers
let available_workers = telemetry_registry
    .list_available_workers()
    .filter(|w| w.model == requested_model)
    .filter(|w| w.gpu_util_pct < 80.0) // Not overloaded
    .collect();

// 2. Select best worker (lowest GPU utilization)
let worker = available_workers
    .iter()
    .min_by_key(|w| w.gpu_util_pct as u32)?;

// 3. Route request directly to worker
route_to_worker(worker.port, request).await;
```

Scheduling Criteria:

  • Model match - Worker must have the requested model loaded
  • Worker status - Must be in “running” state
  • GPU utilization - Prefer workers with lower GPU load
  • VRAM availability - Ensure worker has enough VRAM
  • Hive online - Hive must be streaming telemetry

Load Balancing:

  • Queen distributes requests across multiple workers for same model
  • Selects worker with lowest GPU utilization
  • Avoids overloaded workers (>80% GPU util)
  • Considers VRAM usage for large models

Failure Handling:

  • If worker stops responding: Removed from telemetry automatically
  • If Hive disconnects: All workers on that Hive marked offline
  • If no workers available: Queen can trigger worker spawn on Hive


Best Practices

1. Handle Reconnections

SSE connections can drop. Implement automatic reconnection:

```javascript
function connectHeartbeat() {
  const eventSource = new EventSource('/v1/heartbeats/stream');
  eventSource.onerror = (error) => {
    console.error('Connection lost, reconnecting in 5s...');
    eventSource.close();
    setTimeout(connectHeartbeat, 5000);
  };
  return eventSource;
}

const eventSource = connectHeartbeat();
```

2. Filter Events

Process only events you need:

```javascript
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  // Only process hive telemetry events
  if (data.type === 'hive_telemetry') {
    handleHiveUpdate(data);
  }
};
```

3. Batch Updates

Don’t update UI on every event. Batch updates:

```javascript
let pendingUpdates = [];

eventSource.onmessage = (event) => {
  pendingUpdates.push(JSON.parse(event.data));
};

// Update UI every second
setInterval(() => {
  if (pendingUpdates.length > 0) {
    updateUI(pendingUpdates);
    pendingUpdates = [];
  }
}, 1000);
```
