
Building Custom Workers

Build your own worker types for specialized AI tasks beyond LLM inference.

Worker Contract

All workers must implement the worker contract HTTP API.

Required Endpoints

Health Check:

GET /health

Worker Information:

GET /info

Inference:

POST /v1/infer

Implementation Guide

1. Create Worker Binary

Rust example:

use axum::{routing::{get, post}, Router, Json};
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct InferRequest {
    prompt: String,
    max_tokens: Option<usize>,
}

#[derive(Serialize)]
struct InferResponse {
    output: String,
}

async fn health() -> &'static str {
    "OK"
}

async fn info() -> Json<serde_json::Value> {
    Json(serde_json::json!({
        "name": "my-custom-worker",
        "version": "0.1.0",
        "capabilities": ["text-generation"]
    }))
}

async fn infer(Json(req): Json<InferRequest>) -> Json<InferResponse> {
    // Your inference logic here
    let output = format!("Processed: {}", req.prompt);
    Json(InferResponse { output })
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/health", get(health))
        .route("/info", get(info))
        .route("/v1/infer", post(infer));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
        .await
        .unwrap();
    axum::serve(listener, app).await.unwrap();
}

2. Implement Health Check

async fn health() -> &'static str {
    // Return "OK" if the worker is healthy.
    // Return an error status if it is not ready.
    "OK"
}

3. Implement Info Endpoint

async fn info() -> Json<WorkerInfo> {
    Json(WorkerInfo {
        name: "my-worker".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        capabilities: vec!["text-generation".to_string()],
        model_loaded: Some("my-model-v1".to_string()),
    })
}
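WorkerInfo is not defined elsewhere in this guide. A minimal sketch that matches the fields used in the handler above might look like this (the field names are assumptions, not a fixed schema):

use serde::Serialize;

// Hypothetical info payload; fields mirror the handler above.
#[derive(Serialize)]
struct WorkerInfo {
    name: String,
    version: String,
    capabilities: Vec<String>,
    model_loaded: Option<String>,
}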

4. Implement Inference

async fn infer(
    Json(req): Json<InferRequest>,
) -> Result<Json<InferResponse>, StatusCode> {
    // Validate request
    if req.prompt.is_empty() {
        return Err(StatusCode::BAD_REQUEST);
    }

    // Run inference, mapping backend errors to an HTTP status
    let output = run_model(&req.prompt)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Return response (count tokens before `output` is moved into the struct)
    let tokens_generated = output.split_whitespace().count();
    Ok(Json(InferResponse { output, tokens_generated }))
}
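run_model is a stand-in for whatever inference backend the worker wraps and is not defined in this guide. A minimal synchronous placeholder matching this handler's usage might look like the following (the Best Practices snippets later call an async variant):

// Hypothetical placeholder for the real inference backend.
// The signature is an assumption chosen to match the handler above.
fn run_model(prompt: &str) -> Result<String, String> {
    // Call into your model runtime here.
    Ok(format!("Processed: {}", prompt))
}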

Worker Types

Text Generation Worker

Use case: LLM inference, chat, completion

Examples: llm-worker-cpu, llm-worker-cuda

Endpoints:

  • POST /v1/infer - Generate text
  • POST /v1/chat/completions - Chat API (OpenAI compatible)
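A minimal sketch of an OpenAI-compatible chat handler is shown below. It models only a small subset of the chat completions request/response fields (messages in, one choice out); the exact fields your clients rely on may differ, and the type and handler names are illustrative:

use axum::Json;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct ChatMessage {
    role: String,
    content: String,
}

#[derive(Deserialize)]
struct ChatRequest {
    model: String,
    messages: Vec<ChatMessage>,
}

#[derive(Serialize)]
struct ChatChoice {
    index: usize,
    message: ChatMessage,
}

#[derive(Serialize)]
struct ChatResponse {
    model: String,
    choices: Vec<ChatChoice>,
}

// Sketch of POST /v1/chat/completions; only a subset of the
// OpenAI-compatible fields is modeled here.
async fn chat_completions(Json(req): Json<ChatRequest>) -> Json<ChatResponse> {
    // Use the last message as the prompt (simplified).
    let prompt = req
        .messages
        .last()
        .map(|m| m.content.clone())
        .unwrap_or_default();

    let output = format!("Processed: {}", prompt);

    Json(ChatResponse {
        model: req.model,
        choices: vec![ChatChoice {
            index: 0,
            message: ChatMessage {
                role: "assistant".to_string(),
                content: output,
            },
        }],
    })
}

Wire it into the router alongside the other routes with .route("/v1/chat/completions", post(chat_completions)).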

Image Generation Worker

Use case: Stable Diffusion, DALL-E style generation

Example: sd-worker

Endpoints:

  • POST /v1/infer - Generate image
  • POST /v1/txt2img - Text to image
  • POST /v1/img2img - Image to image
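An image worker can reuse the same axum skeleton with extra routes. The sketch below is a bare skeleton under assumed request and response shapes (a text prompt in, a base64-encoded image out); real image workers may instead stream raw bytes or return URLs, and /v1/img2img is omitted since it also needs an input image:

use axum::{routing::post, Json, Router};
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct Txt2ImgRequest {
    prompt: String,
    width: Option<u32>,
    height: Option<u32>,
}

#[derive(Serialize)]
struct ImageResponse {
    // Base64-encoded image data; an assumed encoding for this sketch.
    image_base64: String,
}

// The request is ignored in this placeholder; a real handler would
// run the diffusion pipeline with the prompt and dimensions.
async fn txt2img(Json(_req): Json<Txt2ImgRequest>) -> Json<ImageResponse> {
    Json(ImageResponse { image_base64: String::new() })
}

fn image_routes() -> Router {
    Router::new()
        .route("/v1/infer", post(txt2img))
        .route("/v1/txt2img", post(txt2img))
}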

Custom Task Worker

Use case: Specialized AI tasks

Examples:

  • Code generation
  • Translation
  • Summarization
  • Classification
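A custom task worker follows the same contract and only changes the payloads. As an illustrative sketch, a classification worker's /v1/infer handler might accept text and return a label with a confidence score (the field names and the trivial rule are placeholders):

use axum::Json;
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct ClassifyRequest {
    text: String,
}

#[derive(Serialize)]
struct ClassifyResponse {
    label: String,
    confidence: f32,
}

// POST /v1/infer for a classification worker; the model call is stubbed out.
async fn classify(Json(req): Json<ClassifyRequest>) -> Json<ClassifyResponse> {
    // Replace this trivial rule with a real classifier.
    let label = if req.text.contains('?') { "question" } else { "statement" };
    Json(ClassifyResponse {
        label: label.to_string(),
        confidence: 0.5,
    })
}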

Testing

Unit Tests

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_health() {
        let response = health().await;
        assert_eq!(response, "OK");
    }

    #[tokio::test]
    async fn test_infer() {
        let req = InferRequest {
            prompt: "Hello".to_string(),
            max_tokens: Some(10),
        };
        let response = infer(Json(req)).await.unwrap();
        assert!(!response.0.output.is_empty());
    }
}

Integration Tests

# Start worker
cargo run --bin my-worker -- --port 8080 &

# Test health
curl http://localhost:8080/health

# Test info
curl http://localhost:8080/info

# Test inference
curl -X POST http://localhost:8080/v1/infer \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Hello, world!"}'

Packaging

Build Release Binary

# Build optimized binary
cargo build --release --bin my-worker

# Binary location
ls -lh target/release/my-worker

Create Distribution

# Create tarball
tar -czf my-worker-v0.1.0-linux-x86_64.tar.gz \
  -C target/release my-worker

# Or create .deb package
cargo install cargo-deb
cargo deb --bin my-worker

Registration

Register with Worker Catalog

// In hive startup
let worker_catalog = WorkerCatalog::new()?;

worker_catalog.register_worker(WorkerInfo {
    id: "my-worker",
    name: "My Custom Worker",
    version: "0.1.0",
    binary_path: "/usr/local/bin/my-worker",
    worker_type: WorkerType::Custom,
    capabilities: vec!["custom-task"],
})?;

Manual Registration

# Copy binary to system path
sudo cp target/release/my-worker /usr/local/bin/

# Make executable
sudo chmod +x /usr/local/bin/my-worker

# Test
my-worker --version

Best Practices

Error Handling

use axum::http::StatusCode;

async fn infer(
    Json(req): Json<InferRequest>,
) -> Result<Json<InferResponse>, StatusCode> {
    // Validate input
    if req.prompt.len() > 10000 {
        return Err(StatusCode::PAYLOAD_TOO_LARGE);
    }

    // Handle errors gracefully
    let output = match run_inference(&req.prompt) {
        Ok(result) => result,
        Err(e) => {
            eprintln!("Inference error: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    Ok(Json(InferResponse { output }))
}

Resource Management

// Limit concurrent requests
use tokio::sync::Semaphore;

static SEMAPHORE: Semaphore = Semaphore::const_new(4);

async fn infer(req: InferRequest) -> Result<InferResponse> {
    // Acquire permit (max 4 concurrent)
    let _permit = SEMAPHORE.acquire().await?;

    // Run inference
    run_model(&req.prompt).await
}

Logging

use observability_narration_core::n;

async fn infer(req: InferRequest) -> Result<InferResponse> {
    // String::len() counts bytes, not tokens, so log it as such.
    n!("infer_start", "Processing prompt of {} bytes", req.prompt.len());

    let output = run_model(&req.prompt).await?;

    n!("infer_complete", "Generated {} bytes of output", output.len());
    Ok(InferResponse { output })
}

Completed by: TEAM-427
Based on: Worker contract specification and existing worker implementations
