#187 Worker Affinity for Workflow Routing

closed high absurd-queue architecture backend feature Created 2025-11-30 01:17 · Updated 2025-11-30 03:02

Description

## Summary

Implement worker affinity to control which workflows run on which workers. Default behavior: all workflows run on all workers.

## Problem Statement

Currently, any worker can claim any task from the queue. In production environments, we need:

- GPU-intensive workflows to run only on GPU-enabled workers
- Region-specific workflows to stay in specific data centers
- Tenant isolation (tenant X workflows only on tenant X workers)
- Resource-based routing (high-memory workflows on high-memory workers)

## Current Architecture Analysis

### Claim Flow (what needs modification):

1. Worker calls `absurd.claim_task(queue_name, worker_id, timeout, qty)`
2. claim_task selects from `r_{queue}` JOIN `t_{queue}` WHERE state IN ('pending', 'sleeping')
3. Claims first available task regardless of worker characteristics

### Existing Infrastructure (can be leveraged):

- `worker_registry.tags` JSONB - already stores custom labels like `{"gpu": true, "region": "us-east"}`
- `workflow_definition.tags` JSONB - exists but not used for routing
- `absurd.t_{queue}.headers` JSONB - task headers, workflow_run_id stored here

## Proposed Design

### 1. Schema Changes

**Migration: highway_0.0.XX_worker_affinity.sql**

```sql
-- Add affinity_tags to task tables (via ensure_queue_tables update)
-- New column: affinity_tags JSONB DEFAULT NULL
-- NULL = no affinity (runs on any worker) - CRITICAL: this is the default

-- Add index for affinity matching
CREATE INDEX idx_t_{queue}_affinity ON absurd.t_{queue} USING GIN (affinity_tags);

-- Worker affinity stored in existing tags column (no new column needed)
-- Convention: worker_registry.tags = {"gpu": true, "region": "us-east"}
```

### 2. Affinity Matching Logic

**Match Rules:**

- Task with `affinity_tags = NULL` → runs on ANY worker (default, backward compatible)
- Task with `affinity_tags = {"gpu": true}` → only workers where `tags @> '{"gpu": true}'`
- Task with `affinity_tags = {"region": "us-east", "gpu": true}` → workers matching ALL tags

**claim_task modification:**

```sql
-- In candidate CTE, add affinity filter:
WHERE (
    t.affinity_tags IS NULL  -- No affinity = any worker
    OR (
        -- Worker tags must contain all affinity_tags
        SELECT tags FROM worker_registry WHERE worker_id = $3
    ) @> t.affinity_tags
)
```

### 3. API Changes

**Workflow Definition:**

```python
workflow = Workflow(
    name="gpu_training",
    affinity_tags={"gpu": True, "memory": "high"},  # NEW FIELD
    tasks=[...]
)
```

**DSL Support:**

```python
workflow = (
    WorkflowBuilder("gpu_training")
    .with_affinity(gpu=True, memory="high")  # NEW METHOD
    .task(...)
    .build()
)
```

**spawn_task modification:**

- Accept `affinity_tags` in options
- Store in `t_{queue}.affinity_tags`

### 4. Worker Configuration

**Worker CLI:**

```bash
hwe worker --tags '{"gpu": true, "region": "us-east"}'
```

**Registration update:**

- Tags already passed to `register_worker_heartbeat`
- No changes needed if convention followed

### 5. Implementation Phases

**Phase 1: Schema & Core Logic**

- [ ] Migration: Add `affinity_tags` column to queue tables
- [ ] Modify `absurd.ensure_queue_tables` to include affinity_tags
- [ ] Modify `absurd.spawn_task` to accept and store affinity_tags
- [ ] Modify `absurd.claim_task` to filter by affinity

**Phase 2: Client Integration**

- [ ] Update `AbsurdClient.spawn_task()` to pass affinity_tags
- [ ] Update `WorkflowBuilder` with `.with_affinity()` method
- [ ] Update Highway DSL to support affinity in workflow metadata

**Phase 3: Worker Integration**

- [ ] Update `hwe worker` CLI to accept `--tags` JSON
- [ ] Pass tags to `WorkerRegistrationService`
- [ ] Ensure tags are properly stored in `worker_registry`

**Phase 4: API & UI**

- [ ] Update workflow submission API to accept affinity_tags
- [ ] Add affinity info to workflow monitoring UI
- [ ] Add worker tags display in monitoring

**Phase 5: Testing**

- [ ] Integration tests: task claims respect affinity
- [ ] Integration tests: NULL affinity = any worker (backward compat)
- [ ] Integration tests: multi-tag matching (AND logic)
- [ ] Performance tests: claim_task with affinity filtering

## Edge Cases & Guarantees

### Backward Compatibility

- Existing workflows have `affinity_tags = NULL` → run on any worker ✓
- Existing workers have `tags = {}` → can only claim NULL-affinity tasks ✓
- No migration of existing data needed

### Starvation Prevention

- If no matching workers are online, the task stays pending (expected behavior)
- Monitoring alert if tasks pending > threshold with no matching workers
- Admin can update affinity or add matching workers

### Performance Considerations

- GIN index on `affinity_tags`, intended for efficient containment queries. Caveat: the claim filter puts `affinity_tags` on the right-hand side of `@>`, and PostgreSQL's jsonb GIN operator classes accelerate `@>` only when the indexed column is the left operand, so confirm with `EXPLAIN` that the index is actually used
- The affinity check should add negligible overhead per `claim_task` call; verify in the Phase 5 performance tests
- Worker tags cached in memory (already done via registration service)

## Testing Strategy

```python
# Test 1: Default behavior (NULL affinity)
workflow = submit_workflow("test_default")
# Any worker should claim

# Test 2: Specific affinity
workflow = submit_workflow("test_gpu", affinity_tags={"gpu": True})
# Only gpu=true workers should claim

# Test 3: Multi-tag affinity
workflow = submit_workflow("test_multi", affinity_tags={"gpu": True, "region": "us-east"})
# Only workers with BOTH tags should claim

# Test 4: No matching workers
workflow = submit_workflow("test_nomatch", affinity_tags={"nonexistent": True})
# Task should remain pending indefinitely
```

## Monitoring & Observability

- New metric: `highway_tasks_pending_no_affinity_match` - tasks waiting with no matching workers
- Dashboard: Worker tags distribution
- Alert: Task pending > 5min with affinity but no matching workers

## Future Enhancements (Out of Scope)

- Soft affinity (prefer but don't require)
- Anti-affinity (avoid specific workers)
- Weighted affinity (prefer workers with more matching tags)
- Dynamic affinity (change based on load)
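
## Appendix: Match Rule Sketch

The match rules under "Affinity Matching Logic" can be modelled in plain Python, which may be useful for unit-testing expectations before the SQL change lands. This is a minimal sketch, not the implementation: `worker_can_claim` and `contains` are hypothetical helper names, tags are assumed to be flat objects with scalar values, and `contains` only approximates PostgreSQL's jsonb `@>` (arrays, nested edge cases, and numeric equality such as `1` vs `1.0` are not modelled).

```python
from typing import Any, Optional

Tags = dict[str, Any]


def contains(container: Any, contained: Any) -> bool:
    """Approximate jsonb `container @> contained` for objects and scalars."""
    if isinstance(contained, dict):
        if not isinstance(container, dict):
            return False
        # Every key/value pair of `contained` must be present in `container`.
        return all(
            key in container and contains(container[key], value)
            for key, value in contained.items()
        )
    # Compare types too, so that True does not match 1 (as in JSON).
    return type(container) is type(contained) and container == contained


def worker_can_claim(worker_tags: Tags, affinity_tags: Optional[Tags]) -> bool:
    """Hypothetical helper mirroring the proposed claim_task filter."""
    if affinity_tags is None:
        # NULL affinity: any worker may claim (backward-compatible default).
        return True
    # Otherwise the worker's tags must contain ALL affinity tags.
    return contains(worker_tags, affinity_tags)


if __name__ == "__main__":
    gpu_east = {"gpu": True, "region": "us-east"}
    print(worker_can_claim({}, None))                     # untagged worker, NULL affinity
    print(worker_can_claim(gpu_east, {"gpu": True}))      # single-tag match
    print(worker_can_claim({"gpu": True}, gpu_east))      # worker lacks "region"
    print(worker_can_claim({}, {"nonexistent": True}))    # no matching worker
```

Under this model an empty affinity object (`{}`) behaves like NULL, because every tag set contains the empty object. Whether that is desirable is a design choice the issue does not settle.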
