#254 Internal Queue Utilization: Event Triggers, Cron Scheduler, and Workflow Templates
Description
Edit## Overview
The `highway_internal` queue exists with a dedicated worker but is completely unused. This ticket proposes utilizing it to enable self-hosted automation features using the engine's own capabilities.
## Current State
- **Internal worker**: `highway-internal-worker.service` listens on `highway_internal` queue
- **Queue status**: Empty, never used
- **Tools available**: email, http, shell, python, sleep_for, sleep_until, conditionals
## Gap Analysis
| Gap | Description | Impact |
|-----|-------------|--------|
| No trigger mechanism | Nothing auto-submits workflows | Can't react to events |
| No cron scheduler | Can't schedule recurring workflows | No automated maintenance |
| No event hooks | Can't trigger on user_created, workflow_failed, etc. | No reactive automation |
| No workflow templates | Must submit full JSON each time | Hard to reuse |
## Proposed Components
### 1. Cron Scheduler Service
A lightweight service that reads schedule definitions and submits workflows at specified times.
**Storage**: New table `scheduled_workflows`
```sql
CREATE TABLE scheduled_workflows (
schedule_id UUID PRIMARY KEY,
tenant_id VARCHAR(64) NOT NULL,
name VARCHAR(255) NOT NULL,
cron_expression VARCHAR(100) NOT NULL, -- e.g., '0 3 * * *' (daily 3am)
workflow_definition JSONB NOT NULL,
enabled BOOLEAN DEFAULT true,
last_run_at TIMESTAMPTZ,
next_run_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
**Implementation**: Background thread in internal worker or separate `hwe scheduler` command.
### 2. Event Trigger System
Hook into key system events to auto-submit workflows.
**Events to support**:
- `user_created` - Welcome email sequence
- `workflow_completed` - Post-processing, notifications
- `workflow_failed` - Alert, auto-retry logic
- `approval_pending` - Reminder emails
- `api_key_created` - Security audit workflow
**Storage**: New table `event_triggers`
```sql
CREATE TABLE event_triggers (
trigger_id UUID PRIMARY KEY,
tenant_id VARCHAR(64) NOT NULL,
event_type VARCHAR(100) NOT NULL,
filter_expression JSONB, -- e.g., {"workflow_name": "critical_*"}
workflow_definition JSONB NOT NULL,
enabled BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
**Implementation**: Emit events via NOTIFY, internal worker listens and submits matching workflows.
### 3. Workflow Template Registry
Store reusable workflow definitions that can be triggered by name with parameters.
**Storage**: New table `workflow_templates`
```sql
CREATE TABLE workflow_templates (
template_id UUID PRIMARY KEY,
tenant_id VARCHAR(64) NOT NULL,
name VARCHAR(255) UNIQUE NOT NULL,
description TEXT,
workflow_definition JSONB NOT NULL,
input_schema JSONB, -- Pydantic schema for parameters
category VARCHAR(100), -- 'maintenance', 'notifications', 'etl'
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
**API**: `POST /api/v1/templates/{name}/run` with input parameters.
## Use Cases Enabled
| Use Case | Trigger Type | Workflow |
|----------|-------------|----------|
| Welcome email + 2-day follow-up | Event: user_created | Send email → sleep 2 days → check login → conditional follow-up |
| Worker health monitoring | Cron: */5 * * * * | HTTP check workers → alert on failure |
| Website uptime | Cron: * * * * * | HTTP GET → log result → alert if down |
| Daily database backup | Cron: 0 2 * * * | Shell: pg_dump → upload to S3 |
| Weekly cleanup | Cron: 0 3 * * 0 | Run cleanup tools |
| Failed workflow alerts | Event: workflow_failed | Send Slack/email notification |
| ETL pipeline | Cron: 0 4 * * * | Extract → Transform → Load |
| Approval reminders | Event: approval_pending + Cron | Check pending > 24h → send reminder |
## Implementation Priority
1. **Phase 1**: Workflow Templates + API (simplest, immediate value)
2. **Phase 2**: Cron Scheduler (enables maintenance automation)
3. **Phase 3**: Event Triggers (enables reactive workflows)
## API Endpoints Needed
```
# Templates
POST /api/v1/templates # Create template
GET /api/v1/templates # List templates
GET /api/v1/templates/{name} # Get template
PUT /api/v1/templates/{name} # Update template
DELETE /api/v1/templates/{name} # Delete template
POST /api/v1/templates/{name}/run # Execute template with params
# Schedules
POST /api/v1/schedules # Create schedule
GET /api/v1/schedules # List schedules
PUT /api/v1/schedules/{id} # Update schedule
DELETE /api/v1/schedules/{id} # Delete schedule
POST /api/v1/schedules/{id}/pause # Pause schedule
POST /api/v1/schedules/{id}/resume # Resume schedule
# Triggers
POST /api/v1/triggers # Create trigger
GET /api/v1/triggers # List triggers
PUT /api/v1/triggers/{id} # Update trigger
DELETE /api/v1/triggers/{id} # Delete trigger
```
## Example: Welcome Email Workflow
```python
from highway_dsl import WorkflowBuilder, task
workflow = (
WorkflowBuilder("welcome_email_sequence")
.add(task("send_welcome")
.tool("tools.email.send")
.input(to="${input.email}", subject="Welcome to Highway\!", template="welcome"))
.add(task("wait_2_days")
.tool("tools.time.sleep_for")
.input(duration="2d"))
.add(task("check_login")
.tool("tools.http.request")
.input(url="${env.API_URL}/api/v1/users/${input.email}/last_login"))
.add(task("send_followup")
.tool("tools.email.send")
.input(to="${input.email}", subject="Need help getting started?", template="followup")
.when("${steps.check_login.output.has_logged_in} == false"))
.build()
)
```
**Triggered by**: Event trigger on `user_created` event.
## Notes
- All workflows submitted to `highway_internal` queue
- Internal worker already exists and is running
- No new workers needed, just scheduling/trigger logic
- Tenant isolation maintained throughout
Comments
Loading comments...
Context
Loading context...
Audit History
View AllLoading audit history...