# Job-Based Architecture
Every operation in rbee follows a job-based pattern with Server-Sent Events (SSE) streaming for real-time progress.
## The Pattern
All operations follow this flow:
- Submit job → Get a `job_id` and `sse_url`
- Connect to SSE stream → Receive real-time events
- Process events → Handle progress, results, errors
- Job completes → Stream ends with `[DONE]`
This pattern applies to ALL operations: inference, worker spawn, model download, etc.
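The whole flow fits in a few lines of Python. A sketch, assuming only the two endpoints shown in the examples below (`run_job` and `BASE_URL` are illustrative names, not part of rbee's API):

```python
import json
import requests

BASE_URL = "http://localhost:7833"  # default address used in the examples below

def run_job(payload: dict) -> None:
    """Submit a job, then follow its SSE stream until [DONE]."""
    # 1. Submit job -> get job_id and sse_url
    job = requests.post(f"{BASE_URL}/v1/jobs", json=payload).json()

    # 2. Connect to the SSE stream
    with requests.get(BASE_URL + job["sse_url"], stream=True) as stream:
        # 3. Process events as they arrive
        for line in stream.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            # 4. Stream ends with [DONE]
            if data == "[DONE]":
                break
            print(json.loads(data))
```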
## Example: Submit Job
```bash
curl -X POST http://localhost:7833/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "infer",
    "model": "llama-3-8b",
    "prompt": "Hello, how are you?",
    "max_tokens": 100,
    "temperature": 0.7,
    "top_p": 0.9,
    "stream": true
  }'
```

Response:

```json
{
  "job_id": "abc-123-def-456",
  "sse_url": "/v1/jobs/abc-123-def-456/stream"
}
```

## Example: Connect to SSE Stream
```bash
curl -N http://localhost:7833/v1/jobs/abc-123-def-456/stream
```

Stream output:

```
data: {"action":"infer_start","message":"Starting inference..."}
data: {"action":"token","message":"Hello"}
data: {"action":"token","message":"!"}
data: {"action":"token","message":" I"}
data: {"action":"token","message":"'m"}
data: {"action":"infer_complete","message":"Inference complete"}
data: [DONE]
```
## SSE Event Format
Each event is a JSON object with:

- `action` — Event type (e.g., “token”, “infer_start”, “error”)
- `message` — Human-readable message or token content
- Optional fields depending on event type
**Event format:** Events are sent as `data: {...}` lines. The stream ends with `data: [DONE]`.
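Because the wire format is just `data:` lines, a minimal parser is a short generator. A sketch (the `iter_sse_events` helper is an illustrative name, not an rbee API):

```python
import json

def iter_sse_events(lines):
    """Yield parsed event dicts from an iterable of SSE lines; stop at [DONE]."""
    for line in lines:
        if not line or not line.startswith("data: "):
            continue  # skip blank keep-alive lines and non-data fields
        data = line[len("data: "):]
        if data == "[DONE]":
            return  # end-of-stream sentinel
        yield json.loads(data)

# Usage: for event in iter_sse_events(resp.iter_lines(decode_unicode=True)): ...
```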
## Client Implementation
### Python
```python
import json
import requests

# 1. Submit job
response = requests.post(
    'http://localhost:7833/v1/jobs',
    json={
        'operation': 'infer',
        'model': 'llama-3-8b',
        'prompt': 'Hello!',
        'max_tokens': 50
    }
)
job_data = response.json()
job_id = job_data['job_id']
sse_url = job_data['sse_url']

# 2. Connect to SSE stream
stream_response = requests.get(
    'http://localhost:7833' + sse_url,
    stream=True
)

# 3. Process events
for line in stream_response.iter_lines():
    if line:
        line = line.decode('utf-8')
        if line.startswith('data: '):
            data = line[6:]  # Remove 'data: ' prefix
            if data == '[DONE]':
                break
            event = json.loads(data)
            if event['action'] == 'token':
                print(event['message'], end='', flush=True)
            elif event['action'] == 'error':
                print('Error: ' + event['message'])
                break

print()  # Final newline
```
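If you would rather not hand-parse the stream, a third-party SSE parser such as `sseclient-py` can wrap the response. A sketch (using `sseclient-py` is a suggestion, not something rbee requires):

```python
import json
import requests
import sseclient  # pip install sseclient-py

# sse_url obtained from the job submission, as above
response = requests.get(
    'http://localhost:7833' + sse_url,
    stream=True,
    headers={'Accept': 'text/event-stream'},
)
client = sseclient.SSEClient(response)
for event in client.events():
    if event.data == '[DONE]':
        break
    print(json.loads(event.data))
```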
### JavaScript

```javascript
// 1. Submit job
const response = await fetch('http://localhost:7833/v1/jobs', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    operation: 'infer',
    model: 'llama-3-8b',
    prompt: 'Hello!',
    max_tokens: 50
  })
});
const jobData = await response.json();
const jobId = jobData.job_id;
const sseUrl = jobData.sse_url;

// 2. Connect to SSE stream
const eventSource = new EventSource('http://localhost:7833' + sseUrl);

// 3. Process events
eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const data = JSON.parse(event.data);
  if (data.action === 'token') {
    process.stdout.write(data.message);
  } else if (data.action === 'error') {
    console.error('Error:', data.message);
    eventSource.close();
  }
};
eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  eventSource.close();
};
```

Note: `EventSource` is built into browsers; in Node.js you may need a polyfill such as the `eventsource` package (recent Node versions ship an experimental `EventSource`). Conversely, `process.stdout.write` is Node-only; use DOM updates in a browser.
### cURL

```bash
# 1. Submit job and capture response
RESPONSE=$(curl -s -X POST http://localhost:7833/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "infer",
    "model": "llama-3-8b",
    "prompt": "Hello!",
    "max_tokens": 50
  }')

# 2. Extract SSE URL
SSE_URL=$(echo "$RESPONSE" | jq -r '.sse_url')

# 3. Connect to stream (-N disables buffering)
curl -N "http://localhost:7833$SSE_URL"
```

## Job Lifecycle
```
┌─────────────────┐
│   Submit Job    │
│  POST /v1/jobs  │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   Job Queued    │
│   (in memory)   │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Job Processing  │
│   SSE events    │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Job Complete   │
│  data: [DONE]   │
└─────────────────┘
```
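Written down as client-visible states, the lifecycle looks like this (a sketch; the state names are illustrative, not rbee's internal representation):

```python
from enum import Enum

class JobState(Enum):
    QUEUED = "queued"          # accepted by POST /v1/jobs, held in memory
    PROCESSING = "processing"  # events are flowing on the SSE stream
    COMPLETE = "complete"      # stream ended with data: [DONE]
    FAILED = "failed"          # an error event arrived before completion
```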
## Common Event Types

### Inference Events

| Event | Description |
|---|---|
| `infer_start` | Inference has started |
| `token` | Generated token (`message` contains the token text) |
| `infer_complete` | Inference finished successfully |
| `error` | An error occurred (`message` contains error details) |
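A client can dispatch on `action` with a small handler table, which keeps per-event logic separate and ignores unknown actions for forward compatibility. A sketch (the handler names are illustrative):

```python
def on_infer_start(event):
    print(event['message'])

def on_token(event):
    print(event['message'], end='', flush=True)

def on_infer_complete(event):
    print('\n' + event['message'])

def on_error(event):
    raise RuntimeError(event['message'])

# Map each action to its handler; unknown actions fall through silently.
HANDLERS = {
    'infer_start': on_infer_start,
    'token': on_token,
    'infer_complete': on_infer_complete,
    'error': on_error,
}

def handle(event):
    handler = HANDLERS.get(event['action'])
    if handler:
        handler(event)
```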
### Worker Spawn Events

| Event | Description |
|---|---|
| `worker_spawn_start` | Worker spawn initiated |
| `worker_spawn_health_check` | Waiting for worker health check |
| `worker_spawn_complete` | Worker spawned successfully (`message` contains PID and port) |
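Spawning a worker uses the same job pattern; only the payload changes. A sketch (the `worker_spawn` operation name and its payload fields are assumptions here; check your rbee version for the exact operation names):

```python
import json
import requests

BASE_URL = 'http://localhost:7833'

# Assumed payload shape; the operation name and fields are illustrative.
job = requests.post(f'{BASE_URL}/v1/jobs', json={
    'operation': 'worker_spawn',
    'model': 'llama-3-8b',
}).json()

# Follow progress events until the stream ends.
with requests.get(BASE_URL + job['sse_url'], stream=True) as stream:
    for line in stream.iter_lines(decode_unicode=True):
        if not line or not line.startswith('data: '):
            continue
        data = line[len('data: '):]
        if data == '[DONE]':
            break
        event = json.loads(data)
        print(f"[{event['action']}] {event['message']}")
```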
## Error Handling
**Always handle errors.** Jobs can fail at submission OR during execution. Always check for error events in the stream.
### Submission Errors (HTTP 400/500)
```json
{
  "error": "Invalid operation",
  "details": "Unknown operation: invalid_op"
}
```

### Execution Errors (SSE Stream)
data: {"action":"error","message":"Worker crashed during inference"}
data: [DONE]Best Practices
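Both failure modes can be handled in one place. A sketch following the payload shapes above:

```python
import json
import requests

payload = {'operation': 'infer', 'model': 'llama-3-8b', 'prompt': 'Hello!'}
response = requests.post('http://localhost:7833/v1/jobs', json=payload)

# Submission errors arrive as HTTP 400/500 responses with a JSON body.
if not response.ok:
    body = response.json()
    raise RuntimeError(f"{body['error']}: {body.get('details', '')}")

# Execution errors arrive as error events inside the SSE stream.
job = response.json()
with requests.get('http://localhost:7833' + job['sse_url'], stream=True) as stream:
    for line in stream.iter_lines(decode_unicode=True):
        if not line or not line.startswith('data: '):
            continue
        data = line[len('data: '):]
        if data == '[DONE]':
            break
        event = json.loads(data)
        if event['action'] == 'error':
            raise RuntimeError(event['message'])
```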
## Best Practices

### 1. Always Connect to SSE Stream

Even for operations that seem “instant”, always connect to the SSE stream to get completion confirmation and error details.
### 2. Handle Disconnections
```python
import time
import requests

# Implement retry logic with exponential backoff
max_retries = 3
for attempt in range(max_retries):
    try:
        # Connect to SSE stream
        stream_response = requests.get(sse_url, stream=True, timeout=300)
        # Process events...
        break
    except requests.exceptions.RequestException:
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s
        else:
            raise
```

### 3. Set Appropriate Timeouts
```python
# Long-running operations (model download, inference)
stream_response = requests.get(sse_url, stream=True, timeout=600)

# Quick operations (status check)
stream_response = requests.get(sse_url, stream=True, timeout=30)
```