# Building Custom Workers
Build your own worker types for specialized AI tasks beyond LLM inference.
## Worker Contract

All workers must implement the worker contract HTTP API.

### Required Endpoints
- **Health check:** `GET /health`
- **Worker information:** `GET /info`
- **Inference:** `POST /v1/infer`
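On the wire, the contract looks like this; the curl calls below are a sketch that assumes a worker listening on localhost port 8080:

```bash
# Probe a running worker (assumes it listens on localhost:8080)
curl http://localhost:8080/health          # -> "OK"
curl http://localhost:8080/info            # -> JSON worker metadata
curl -X POST http://localhost:8080/v1/infer \
    -H "Content-Type: application/json" \
    -d '{"prompt": "Hello"}'               # -> JSON inference result
```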
## Implementation Guide

### 1. Create Worker Binary

Rust example:
```rust
use axum::{routing::{get, post}, Json, Router};
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct InferRequest {
    prompt: String,
    max_tokens: Option<usize>,
}

#[derive(Serialize)]
struct InferResponse {
    output: String,
}

async fn health() -> &'static str {
    "OK"
}

async fn info() -> Json<serde_json::Value> {
    Json(serde_json::json!({
        "name": "my-custom-worker",
        "version": "0.1.0",
        "capabilities": ["text-generation"]
    }))
}

async fn infer(Json(req): Json<InferRequest>) -> Json<InferResponse> {
    // Your inference logic here
    let output = format!("Processed: {}", req.prompt);
    Json(InferResponse { output })
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/health", get(health))
        .route("/info", get(info))
        .route("/v1/infer", post(infer));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
        .await
        .unwrap();
    axum::serve(listener, app).await.unwrap();
}
```
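This example needs axum, tokio, serde, and serde_json. One way to pull them in (crate names come from the imports above; exact versions are up to you):

```bash
cargo add axum serde_json
cargo add tokio --features macros,rt-multi-thread
cargo add serde --features derive
```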
### 2. Implement Health Check

```rust
use axum::http::StatusCode;

async fn health() -> Result<&'static str, StatusCode> {
    // Return "OK" when the worker is ready to serve requests.
    // Return Err(StatusCode::SERVICE_UNAVAILABLE) instead while the
    // model is still loading or the worker is otherwise not ready.
    Ok("OK")
}
```
### 3. Implement Info Endpoint

```rust
#[derive(serde::Serialize)]
struct WorkerInfo {
    name: String,
    version: String,
    capabilities: Vec<String>,
    model_loaded: Option<String>,
}

async fn info() -> Json<WorkerInfo> {
    Json(WorkerInfo {
        name: "my-worker".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        capabilities: vec!["text-generation".to_string()],
        model_loaded: Some("my-model-v1".to_string()),
    })
}
```
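With this handler in place, `GET /info` returns the serialized struct. Assuming the worker is running on port 8080 and the crate version is 0.1.0, the exchange would look roughly like:

```bash
curl -s http://localhost:8080/info
# {"name":"my-worker","version":"0.1.0","capabilities":["text-generation"],"model_loaded":"my-model-v1"}
```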
### 4. Implement Inference

This version extends `InferResponse` with a `tokens_generated` field and returns proper HTTP status codes on failure:

```rust
use axum::http::StatusCode;

async fn infer(
    Json(req): Json<InferRequest>,
) -> Result<Json<InferResponse>, StatusCode> {
    // Validate request
    if req.prompt.is_empty() {
        return Err(StatusCode::BAD_REQUEST);
    }

    // Run inference; map any model error to a 500
    let output = run_model(&req.prompt)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Count tokens before moving `output` into the response
    let tokens_generated = output.split_whitespace().count();
    Ok(Json(InferResponse { output, tokens_generated }))
}
```
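The validation path is easy to check by hand; this sketch assumes the worker is running locally on port 8080:

```bash
# An empty prompt should be rejected with 400 Bad Request
curl -i -X POST http://localhost:8080/v1/infer \
    -H "Content-Type: application/json" \
    -d '{"prompt": ""}'
```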
## Worker Types

### Text Generation Worker

**Use case:** LLM inference, chat, completion

**Examples:** `llm-worker-cpu`, `llm-worker-cuda`

**Endpoints:**

- `POST /v1/infer` - Generate text
- `POST /v1/chat/completions` - Chat API (OpenAI compatible)
### Image Generation Worker

**Use case:** Stable Diffusion, DALL-E style generation

**Example:** `sd-worker`

**Endpoints:**

- `POST /v1/infer` - Generate image
- `POST /v1/txt2img` - Text to image
- `POST /v1/img2img` - Image to image
### Custom Task Worker

**Use case:** Specialized AI tasks

**Examples:**

- Code generation
- Translation
- Summarization
- Classification
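As an illustration, a classification worker could reuse the same contract with a task-specific request and response shape. The sketch below is hypothetical: `ClassifyRequest`, `ClassifyResponse`, and `classify_text` are illustrative names, not part of the worker contract:

```rust
use axum::Json;
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct ClassifyRequest {
    text: String,
}

#[derive(Serialize)]
struct ClassifyResponse {
    label: String,
    confidence: f32,
}

// Hypothetical handler served behind the same POST /v1/infer route.
async fn classify(Json(req): Json<ClassifyRequest>) -> Json<ClassifyResponse> {
    // classify_text() is a placeholder for your own model call.
    let (label, confidence) = classify_text(&req.text);
    Json(ClassifyResponse { label, confidence })
}
```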
## Testing

### Unit Tests
```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_health() {
        let response = health().await;
        assert_eq!(response, "OK");
    }

    #[tokio::test]
    async fn test_infer() {
        let req = InferRequest {
            prompt: "Hello".to_string(),
            max_tokens: Some(10),
        };
        let response = infer(Json(req)).await.unwrap();
        assert!(!response.0.output.is_empty());
    }
}
```
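The fallible handler from step 4 also deserves a negative test; a sketch, to be added inside the same `tests` module:

```rust
use axum::http::StatusCode;

// The empty prompt must be rejected with 400 Bad Request.
#[tokio::test]
async fn test_infer_rejects_empty_prompt() {
    let req = InferRequest {
        prompt: String::new(),
        max_tokens: None,
    };
    let result = infer(Json(req)).await;
    assert!(matches!(result, Err(StatusCode::BAD_REQUEST)));
}
```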
### Integration Tests

```bash
# Start worker
cargo run --bin my-worker -- --port 8080 &

# Test health
curl http://localhost:8080/health

# Test info
curl http://localhost:8080/info

# Test inference
curl -X POST http://localhost:8080/v1/infer \
    -H "Content-Type: application/json" \
    -d '{"prompt": "Hello, world!"}'
```
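When you are done, stop the background worker started above:

```bash
# Stop the background worker (job %1 from the `&` above)
kill %1
```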
## Packaging

### Build Release Binary

```bash
# Build optimized binary
cargo build --release --bin my-worker

# Binary location
ls -lh target/release/my-worker
```
### Create Distribution

```bash
# Create tarball
tar -czf my-worker-v0.1.0-linux-x86_64.tar.gz \
    -C target/release my-worker

# Or create a .deb package
cargo install cargo-deb
cargo deb --bin my-worker
```
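Optionally, publish a checksum next to the tarball so consumers can verify the download:

```bash
sha256sum my-worker-v0.1.0-linux-x86_64.tar.gz \
    > my-worker-v0.1.0-linux-x86_64.tar.gz.sha256
```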
## Registration

### Register with Worker Catalog

```rust
// In hive startup
let worker_catalog = WorkerCatalog::new()?;

worker_catalog.register_worker(WorkerInfo {
    id: "my-worker",
    name: "My Custom Worker",
    version: "0.1.0",
    binary_path: "/usr/local/bin/my-worker",
    worker_type: WorkerType::Custom,
    capabilities: vec!["custom-task"],
})?;
```
### Manual Registration

```bash
# Copy binary to system path
sudo cp target/release/my-worker /usr/local/bin/

# Make executable
sudo chmod +x /usr/local/bin/my-worker

# Test
my-worker --version
```
## Best Practices

### Error Handling

```rust
use axum::http::StatusCode;

async fn infer(
    Json(req): Json<InferRequest>,
) -> Result<Json<InferResponse>, StatusCode> {
    // Validate input
    if req.prompt.len() > 10000 {
        return Err(StatusCode::PAYLOAD_TOO_LARGE);
    }

    // Handle errors gracefully
    let output = match run_inference(&req.prompt) {
        Ok(result) => result,
        Err(e) => {
            eprintln!("Inference error: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    Ok(Json(InferResponse { output }))
}
```
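For clients, a JSON error body is friendlier than a bare status code. One way to do that with axum's `IntoResponse`; `WorkerError` here is an illustrative type, not part of the contract:

```rust
use axum::{
    http::StatusCode,
    response::{IntoResponse, Response},
    Json,
};

// Illustrative error type carrying a status and a message.
struct WorkerError {
    status: StatusCode,
    message: String,
}

impl IntoResponse for WorkerError {
    fn into_response(self) -> Response {
        // Serialize the message into a small JSON body
        let body = Json(serde_json::json!({ "error": self.message }));
        (self.status, body).into_response()
    }
}
```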
### Resource Management

```rust
// Limit concurrent requests with a global semaphore
use tokio::sync::Semaphore;

static SEMAPHORE: Semaphore = Semaphore::const_new(4);

// anyhow::Result is used so `?` can convert the semaphore's AcquireError
async fn infer(req: InferRequest) -> anyhow::Result<InferResponse> {
    // Acquire permit (max 4 concurrent)
    let _permit = SEMAPHORE.acquire().await?;

    // Run inference while holding the permit
    run_model(&req.prompt).await
}
```
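A per-request timeout pairs well with the semaphore, so a stuck inference cannot hold a permit forever. A sketch assuming the tower-http crate with its timeout feature enabled:

```rust
use std::time::Duration;
use axum::{routing::post, Router};
use tower_http::timeout::TimeoutLayer;

// Inside main(), when building the router: abort requests that run
// longer than 30 seconds with a 408 Request Timeout response.
let app: Router = Router::new()
    .route("/v1/infer", post(infer))
    .layer(TimeoutLayer::new(Duration::from_secs(30)));
```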
### Logging

```rust
use observability_narration_core::n;

async fn infer(req: InferRequest) -> anyhow::Result<InferResponse> {
    n!("infer_start", "Processing request: {} bytes", req.prompt.len());

    let output = run_model(&req.prompt).await?;

    n!("infer_complete", "Generated {} bytes", output.len());
    Ok(InferResponse { output })
}
```
## Related Documentation

- **Worker Types Guide** - Available worker types
- **Hive Configuration** - Worker management
- **Catalog System** - Worker catalog
Completed by: TEAM-427
Based on: Worker contract specification and existing worker implementations