
Job-Based Architecture

Every operation in rbee follows a job-based pattern with Server-Sent Events (SSE) streaming for real-time progress.


The Pattern

All operations follow this flow:

  1. Submit job → Get job_id and sse_url
  2. Connect to SSE stream → Receive real-time events
  3. Process events → Handle progress, results, errors
  4. Job completes → Stream ends with [DONE]

Example: Submit Job

curl -X POST http://localhost:7833/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "infer",
    "model": "llama-3-8b",
    "prompt": "Hello, how are you?",
    "max_tokens": 100,
    "temperature": 0.7,
    "top_p": 0.9,
    "stream": true
  }'

Response:

{
  "job_id": "abc-123-def-456",
  "sse_url": "/v1/jobs/abc-123-def-456/stream"
}

Example: Connect to SSE Stream

curl http://localhost:7833/v1/jobs/abc-123-def-456/stream

Stream Output:

data: {"action":"infer_start","message":"Starting inference..."}
data: {"action":"token","message":"Hello"}
data: {"action":"token","message":"!"}
data: {"action":"token","message":" I"}
data: {"action":"token","message":"'m"}
data: {"action":"infer_complete","message":"Inference complete"}
data: [DONE]

SSE Event Format

Each event is a JSON object with:

  • action - Event type (e.g., “token”, “infer_start”, “error”)
  • message - Human-readable message or token content
  • Optional fields depending on event type
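Decoding one line of this format takes only a few lines of Python. The following is a minimal sketch (the helper name `parse_sse_line` is ours, not part of the rbee API):

```python
import json

def parse_sse_line(line):
    """Decode one SSE line. Returns the event dict, the string
    '[DONE]' for the stream terminator, or None for non-data lines."""
    if not line.startswith("data: "):
        return None
    payload = line[len("data: "):]
    if payload == "[DONE]":
        return "[DONE]"
    return json.loads(payload)
```

Non-data lines (comments, blank keep-alives) are skipped by returning None, so the caller can feed the raw stream through unchanged.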

Client Implementation

Python

import requests
import json

# 1. Submit job
response = requests.post(
    'http://localhost:7833/v1/jobs',
    json={
        'operation': 'infer',
        'model': 'llama-3-8b',
        'prompt': 'Hello!',
        'max_tokens': 50
    }
)
job_data = response.json()
job_id = job_data['job_id']
sse_url = job_data['sse_url']

# 2. Connect to SSE stream
stream_response = requests.get(
    'http://localhost:7833' + sse_url,
    stream=True
)

# 3. Process events
for line in stream_response.iter_lines():
    if line:
        line = line.decode('utf-8')
        if line.startswith('data: '):
            data = line[6:]  # Remove 'data: ' prefix
            if data == '[DONE]':
                break
            event = json.loads(data)
            if event['action'] == 'token':
                print(event['message'], end='', flush=True)
            elif event['action'] == 'error':
                print("Error: " + event['message'])
                break

print()  # Final newline

JavaScript

// 1. Submit job
const response = await fetch('http://localhost:7833/v1/jobs', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    operation: 'infer',
    model: 'llama-3-8b',
    prompt: 'Hello!',
    max_tokens: 50
  })
});
const jobData = await response.json();
const jobId = jobData.job_id;
const sseUrl = jobData.sse_url;

// 2. Connect to SSE stream
const eventSource = new EventSource('http://localhost:7833' + sseUrl);

// 3. Process events
eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const data = JSON.parse(event.data);
  if (data.action === 'token') {
    process.stdout.write(data.message);
  } else if (data.action === 'error') {
    console.error('Error:', data.message);
    eventSource.close();
  }
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  eventSource.close();
};

cURL

# 1. Submit job and capture response
RESPONSE=$(curl -s -X POST http://localhost:7833/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "infer",
    "model": "llama-3-8b",
    "prompt": "Hello!",
    "max_tokens": 50
  }')

# 2. Extract SSE URL
SSE_URL=$(echo "$RESPONSE" | jq -r '.sse_url')

# 3. Connect to stream (-N disables output buffering)
curl -N "http://localhost:7833$SSE_URL"

Job Lifecycle

┌─────────────────┐
│   Submit Job    │
│  POST /v1/jobs  │
└────────┬────────┘
         ▼
┌─────────────────┐
│   Job Queued    │
│   (in memory)   │
└────────┬────────┘
         ▼
┌─────────────────┐
│ Job Processing  │
│   SSE events    │
└────────┬────────┘
         ▼
┌─────────────────┐
│  Job Complete   │
│  data: [DONE]   │
└─────────────────┘

Common Event Types

Inference Events

  • infer_start - Inference has started
  • token - Generated token (message contains the token text)
  • infer_complete - Inference finished successfully
  • error - An error occurred (message contains error details); emitted only on failure
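A client reacting to these events typically only needs to accumulate token text and surface errors. A minimal sketch (the function name is ours; event shapes follow the events listed above):

```python
def handle_inference_event(event, tokens):
    """Append token text to `tokens`; raise on an error event.
    infer_start / infer_complete carry no token payload."""
    action = event.get("action")
    if action == "token":
        tokens.append(event["message"])
    elif action == "error":
        raise RuntimeError(event.get("message", "unknown error"))
    return tokens
```

Raising on the error event lets the caller's normal exception handling take over instead of silently ending the stream.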

Worker Spawn Events

  • worker_spawn_start - Worker spawn initiated
  • worker_spawn_health_check - Waiting for worker health check
  • worker_spawn_complete - Worker spawned successfully (message contains PID and port)
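A client waiting on a spawn can scan the event stream until the completion event arrives. A sketch (the function name and the sample message text are ours; the event names follow the list above):

```python
def wait_for_worker(events):
    """Return the worker_spawn_complete message (PID and port),
    or raise if the stream errors or ends early."""
    for event in events:
        if event["action"] == "worker_spawn_complete":
            return event["message"]
        if event["action"] == "error":
            raise RuntimeError(event["message"])
    raise RuntimeError("stream ended before worker spawned")
```

Treating a stream that ends without `worker_spawn_complete` as an error avoids hanging on a worker that never came up.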

Error Handling

Submission Errors (HTTP 400/500)

{
  "error": "Invalid operation",
  "details": "Unknown operation: invalid_op"
}

Execution Errors (SSE Stream)

data: {"action":"error","message":"Worker crashed during inference"}
data: [DONE]
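Putting the two failure modes together: submission errors surface as HTTP status codes on the POST, while execution errors arrive as in-stream error events followed by [DONE]. A sketch of the stream side (the helper name `drain_stream` is ours):

```python
import json

def drain_stream(lines):
    """Collect token text from decoded SSE lines until [DONE];
    raise on an in-stream error event."""
    tokens = []
    for line in lines:
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        event = json.loads(payload)
        if event.get("action") == "error":
            raise RuntimeError(event["message"])
        if event.get("action") == "token":
            tokens.append(event["message"])
    return tokens
```

Because the error event precedes [DONE], raising as soon as it appears ensures the failure is not masked by the normal end-of-stream marker.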

Best Practices

1. Always Connect to SSE Stream

Even for operations that seem “instant”, always connect to the SSE stream to get completion confirmation and error details.

2. Handle Disconnections

import time
import requests

# Implement retry logic with exponential backoff
max_retries = 3
for attempt in range(max_retries):
    try:
        # Connect to SSE stream
        stream_response = requests.get(sse_url, stream=True, timeout=300)
        # Process events...
        break
    except requests.exceptions.RequestException:
        if attempt < max_retries - 1:
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s
        else:
            raise

3. Set Appropriate Timeouts

# Long-running operations (model download, inference)
stream_response = requests.get(sse_url, stream=True, timeout=600)

# Quick operations (status check)
stream_response = requests.get(sse_url, stream=True, timeout=30)

