#187 Worker Affinity for Workflow Routing
## Summary
Implement worker affinity to control which workflows run on which workers. Default behavior: a workflow with no affinity can run on any worker.
## Problem Statement
Currently, any worker can claim any task from the queue. In production environments, we need:
- GPU-intensive workflows to run only on GPU-enabled workers
- Region-specific workflows to stay in specific data centers
- Tenant isolation (tenant X workflows only on tenant X workers)
- Resource-based routing (high-memory workflows on high-memory workers)
## Current Architecture Analysis
### Claim Flow (what needs modification):
1. Worker calls `absurd.claim_task(queue_name, worker_id, timeout, qty)`
2. `claim_task` selects from `r_{queue}` JOIN `t_{queue}` WHERE state IN ('pending', 'sleeping')
3. Claims first available task regardless of worker characteristics
### Existing Infrastructure (can be leveraged):
- `worker_registry.tags` JSONB - already stores custom labels like `{"gpu": true, "region": "us-east"}`
- `workflow_definition.tags` JSONB - exists but not used for routing
- `absurd.t_{queue}.headers` JSONB - task headers, workflow_run_id stored here
## Proposed Design
### 1. Schema Changes
**Migration: highway_0.0.XX_worker_affinity.sql**
```sql
-- Template: {queue} is substituted per queue (via ensure_queue_tables update).
-- Add affinity_tags to task tables.
-- NULL = no affinity (runs on any worker) - CRITICAL: this is the default
ALTER TABLE absurd.t_{queue}
    ADD COLUMN IF NOT EXISTS affinity_tags JSONB DEFAULT NULL;

-- Add index for affinity matching
CREATE INDEX IF NOT EXISTS idx_t_{queue}_affinity
    ON absurd.t_{queue} USING GIN (affinity_tags);

-- Worker labels are stored in the existing worker_registry.tags column (no new column needed)
-- Convention: labels sit at the top level of tags, e.g. tags = {"gpu": true, "region": "us-east"},
-- because claim_task compares tags directly against affinity_tags.
```
### 2. Affinity Matching Logic
**Match Rules:**
- Task with `affinity_tags = NULL` → runs on ANY worker (default, backward compatible)
- Task with `affinity_tags = {"gpu": true}` → only workers where `tags @> '{"gpu": true}'`
- Task with `affinity_tags = {"region": "us-east", "gpu": true}` → workers matching ALL tags
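The match rules above can be sketched in plain Python. This is an illustrative model only, not the project's implementation: `worker_can_claim` and `_same_json_scalar` are hypothetical names, and the sketch assumes tags are flat JSON objects with scalar values (JSONB `@>` also recurses into nested objects and arrays, which is not modelled here).

```python
from typing import Any, Mapping, Optional

Tags = Mapping[str, Any]


def _same_json_scalar(a: Any, b: Any) -> bool:
    """Compare two scalars the way JSON would (hypothetical helper).

    Python treats True == 1, but JSON true and the number 1 are different
    values, so booleans are only ever equal to booleans.
    """
    if isinstance(a, bool) or isinstance(b, bool):
        return isinstance(a, bool) and isinstance(b, bool) and a == b
    return a == b


def worker_can_claim(worker_tags: Optional[Tags], affinity_tags: Optional[Tags]) -> bool:
    """Model of: affinity_tags IS NULL OR worker_tags @> affinity_tags."""
    if affinity_tags is None:
        return True  # no affinity: any worker may claim
    if worker_tags is None:
        return False  # unknown worker cannot satisfy an affinity
    # AND logic: every affinity key must be present with an equal value.
    return all(
        key in worker_tags and _same_json_scalar(worker_tags[key], value)
        for key, value in affinity_tags.items()
    )
```

Under these assumptions, a worker tagged `{"gpu": True, "region": "us-east"}` can claim a task with `{"gpu": True}`, while a worker tagged only `{"gpu": True}` cannot claim a task that also requires `"region": "us-east"`.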
**claim_task modification:**
```sql
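-- In the candidate CTE, add the affinity filter
-- (if the CTE already has a WHERE clause, attach this with AND instead).
-- $3 stands for the claiming worker's id; use whichever placeholder
-- claim_task actually binds worker_id to.
WHERE (
    t.affinity_tags IS NULL  -- No affinity = any worker
    OR (
        -- Worker tags must contain all affinity_tags
        SELECT w.tags FROM worker_registry w WHERE w.worker_id = $3
    ) @> t.affinity_tags
)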
```
### 3. API Changes
**Workflow Definition:**
```python
workflow = Workflow(
    name="gpu_training",
    affinity_tags={"gpu": True, "memory": "high"},  # NEW FIELD
    tasks=[...],
)
```
**DSL Support:**
```python
workflow = (
    WorkflowBuilder("gpu_training")
    .with_affinity(gpu=True, memory="high")  # NEW METHOD
    .task(...)
    .build()
)
```
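One possible shape for the builder method is sketched below. The classes `WorkflowSketch` and `WorkflowBuilderSketch` are stand-ins invented for illustration; the real `Workflow` and `WorkflowBuilder` may be structured differently. The sketch assumes that repeated `.with_affinity()` calls merge their tags and that never calling it leaves the affinity unset.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class WorkflowSketch:
    """Stand-in for the real Workflow class (hypothetical)."""
    name: str
    affinity_tags: Optional[Dict[str, Any]] = None
    tasks: List[str] = field(default_factory=list)


class WorkflowBuilderSketch:
    """Stand-in for the real WorkflowBuilder (hypothetical)."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._affinity: Optional[Dict[str, Any]] = None
        self._tasks: List[str] = []

    def with_affinity(self, **tags: Any) -> "WorkflowBuilderSketch":
        # Merge with tags from earlier calls; an empty call changes nothing.
        if tags:
            self._affinity = {**(self._affinity or {}), **tags}
        return self

    def task(self, name: str) -> "WorkflowBuilderSketch":
        self._tasks.append(name)
        return self

    def build(self) -> WorkflowSketch:
        # affinity_tags stays None unless with_affinity() supplied tags.
        return WorkflowSketch(self._name, self._affinity, list(self._tasks))
```

Keeping `affinity_tags` as `None` rather than `{}` when no tags are given maps directly onto the `NULL = any worker` default, so existing workflows would not need to change.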
**spawn_task modification:**
- Accept `affinity_tags` in options
- Store in `t_{queue}.affinity_tags`
### 4. Worker Configuration
**Worker CLI:**
```bash
hwe worker --tags '{"gpu": true, "region": "us-east"}'
```
**Registration update:**
- Tags already passed to `register_worker_heartbeat`
- No changes needed if convention followed
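A minimal sketch of how the `--tags` JSON could be parsed and validated, using only the standard library. It assumes an `argparse`-style CLI; the actual `hwe` command may be built on a different framework, and `parse_tags` and `build_parser` are hypothetical names.

```python
import argparse
import json
from typing import Any, Dict


def parse_tags(raw: str) -> Dict[str, Any]:
    """Parse the --tags value and require a JSON object."""
    try:
        value = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise argparse.ArgumentTypeError(f"--tags is not valid JSON: {exc}") from exc
    if not isinstance(value, dict):
        raise argparse.ArgumentTypeError("--tags must be a JSON object")
    return value


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="hwe")
    sub = parser.add_subparsers(dest="command", required=True)
    worker = sub.add_parser("worker")
    # A string default is run through `type`, so omitting --tags yields {}.
    worker.add_argument("--tags", type=parse_tags, default="{}")
    return parser
```

Rejecting non-object JSON at the CLI keeps malformed labels out of `worker_registry.tags`, where they would silently fail every containment check.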
### 5. Implementation Phases
**Phase 1: Schema & Core Logic**
- [ ] Migration: Add `affinity_tags` column to queue tables
- [ ] Modify `absurd.ensure_queue_tables` to include affinity_tags
- [ ] Modify `absurd.spawn_task` to accept and store affinity_tags
- [ ] Modify `absurd.claim_task` to filter by affinity
**Phase 2: Client Integration**
- [ ] Update `AbsurdClient.spawn_task()` to pass affinity_tags
- [ ] Update `WorkflowBuilder` with `.with_affinity()` method
- [ ] Update Highway DSL to support affinity in workflow metadata
**Phase 3: Worker Integration**
- [ ] Update `hwe worker` CLI to accept `--tags` JSON
- [ ] Pass tags to `WorkerRegistrationService`
- [ ] Ensure tags are properly stored in `worker_registry`
**Phase 4: API & UI**
- [ ] Update workflow submission API to accept affinity_tags
- [ ] Add affinity info to workflow monitoring UI
- [ ] Add worker tags display in monitoring
**Phase 5: Testing**
- [ ] Integration tests: task claims respect affinity
- [ ] Integration tests: NULL affinity = any worker (backward compat)
- [ ] Integration tests: multi-tag matching (AND logic)
- [ ] Performance tests: claim_task with affinity filtering
## Edge Cases & Guarantees
### Backward Compatibility
- Existing workflows have `affinity_tags = NULL` → run on any worker ✓
- Existing workers have `tags = {}` → can only claim NULL-affinity tasks ✓
- No migration of existing data needed
### Starvation Prevention
- If no matching workers online, task stays pending (expected behavior)
- Monitoring alert if tasks pending > threshold with no matching workers
- Admin can update affinity or add matching workers
### Performance Considerations
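- GIN index on `affinity_tags`: PostgreSQL's jsonb GIN operator classes accelerate `column @> value`, but the claim filter has the form `worker_tags @> t.affinity_tags`, with the indexed column on the contained side. The planner is therefore unlikely to use this index for the claim filter; confirm with `EXPLAIN` before relying on it.
- claim_task query cost is not O(1): the affinity predicate is evaluated per candidate row, so cost grows with the number of pending tasks scanned before a matching one is found. Measure this in the Phase 5 performance tests.
- Worker tags cached in memory (already done via registration service)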
## Testing Strategy
```python
# Test 1: Default behavior (NULL affinity)
workflow = submit_workflow("test_default")
# Any worker should claim
# Test 2: Specific affinity
workflow = submit_workflow("test_gpu", affinity_tags={"gpu": True})
# Only gpu=true workers should claim
# Test 3: Multi-tag affinity
workflow = submit_workflow("test_multi", affinity_tags={"gpu": True, "region": "us-east"})
# Only workers with BOTH tags should claim
# Test 4: No matching workers
workflow = submit_workflow("test_nomatch", affinity_tags={"nonexistent": True})
# Task should remain pending indefinitely
```
## Monitoring & Observability
- New metric: `highway_tasks_pending_no_affinity_match` - tasks waiting with no matching workers
- Dashboard: Worker tags distribution
- Alert: Task pending > 5min with affinity but no matching workers
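The pending-with-no-match metric could be computed along these lines. This is a sketch under stated assumptions: `count_pending_without_match` is a hypothetical name, the inputs are assumed to have been fetched already (affinity tags of pending tasks and tags of workers currently online), and tags are assumed to be flat objects with scalar values.

```python
from typing import Any, Iterable, Mapping, Optional

Tags = Mapping[str, Any]


def _contains(worker_tags: Tags, affinity_tags: Tags) -> bool:
    """Flat model of `worker_tags @> affinity_tags` (bool-aware equality)."""
    for key, wanted in affinity_tags.items():
        if key not in worker_tags:
            return False
        have = worker_tags[key]
        # JSON true is not the number 1, so require matching bool-ness.
        if isinstance(have, bool) != isinstance(wanted, bool) or have != wanted:
            return False
    return True


def count_pending_without_match(
    pending_affinities: Iterable[Optional[Tags]],
    online_worker_tags: Iterable[Tags],
) -> int:
    """Count pending tasks whose affinity no online worker satisfies.

    Tasks with no affinity (None) are never counted: any worker can run them.
    """
    workers = list(online_worker_tags)
    return sum(
        1
        for affinity in pending_affinities
        if affinity is not None
        and not any(_contains(worker, affinity) for worker in workers)
    )
```

The returned count is what a gauge such as `highway_tasks_pending_no_affinity_match` would report; the 5-minute alert would additionally need each task's pending duration, which this sketch leaves out.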
## Future Enhancements (Out of Scope)
- Soft affinity (prefer but don't require)
- Anti-affinity (avoid specific workers)
- Weighted affinity (prefer workers with more matching tags)
- Dynamic affinity (change based on load)