#254 Internal Queue Utilization: Event Triggers, Cron Scheduler, and Workflow Templates

open medium Created 2025-12-04 11:55 · Updated 2025-12-04 11:55

Description

## Overview

The `highway_internal` queue exists with a dedicated worker but is completely unused. This ticket proposes utilizing it to enable self-hosted automation features using the engine's own capabilities.

## Current State

- **Internal worker**: `highway-internal-worker.service` listens on `highway_internal` queue
- **Queue status**: Empty, never used
- **Tools available**: email, http, shell, python, sleep_for, sleep_until, conditionals

## Gap Analysis

| Gap | Description | Impact |
|-----|-------------|--------|
| No trigger mechanism | Nothing auto-submits workflows | Can't react to events |
| No cron scheduler | Can't schedule recurring workflows | No automated maintenance |
| No event hooks | Can't trigger on user_created, workflow_failed, etc. | No reactive automation |
| No workflow templates | Must submit full JSON each time | Hard to reuse |

## Proposed Components

### 1. Cron Scheduler Service

A lightweight service that reads schedule definitions and submits workflows at specified times.

**Storage**: New table `scheduled_workflows`

```sql
CREATE TABLE scheduled_workflows (
    schedule_id UUID PRIMARY KEY,
    tenant_id VARCHAR(64) NOT NULL,
    name VARCHAR(255) NOT NULL,
    cron_expression VARCHAR(100) NOT NULL,  -- e.g., '0 3 * * *' (daily 3am)
    workflow_definition JSONB NOT NULL,
    enabled BOOLEAN DEFAULT true,
    last_run_at TIMESTAMPTZ,
    next_run_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
```

**Implementation**: Background thread in internal worker or separate `hwe scheduler` command.

### 2. Event Trigger System

Hook into key system events to auto-submit workflows.

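As a rough illustration of this hook, the sketch below matches an emitted event against stored trigger rows. It assumes each trigger carries an `event_type` plus an optional `filter_expression` whose values are shell-style globs (in the spirit of `{"workflow_name": "critical_*"}`). The `Trigger` class, the `matching_triggers` function, and the event payload shape are hypothetical and not existing engine code.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import Any, Optional


@dataclass
class Trigger:
    """Hypothetical in-memory view of one event_triggers row."""
    trigger_id: str
    tenant_id: str
    event_type: str
    filter_expression: Optional[dict] = None  # JSONB column, may be NULL
    enabled: bool = True


def matching_triggers(triggers: list, tenant_id: str, event_type: str,
                      payload: dict) -> list:
    """Return enabled triggers of this tenant whose type and filters match."""
    matched = []
    for trig in triggers:
        if not trig.enabled or trig.tenant_id != tenant_id:
            continue  # tenant isolation: never match another tenant's trigger
        if trig.event_type != event_type:
            continue
        filters = trig.filter_expression or {}
        # Every filter key must be present in the payload and match its glob.
        if all(
            key in payload and fnmatchcase(str(payload[key]), pattern)
            for key, pattern in filters.items()
        ):
            matched.append(trig)
    return matched


triggers = [
    Trigger("t1", "acme", "workflow_failed", {"workflow_name": "critical_*"}),
    Trigger("t2", "acme", "workflow_failed"),  # no filter: matches every failure
    Trigger("t3", "other", "workflow_failed"),
]
hits = matching_triggers(
    triggers, "acme", "workflow_failed", {"workflow_name": "critical_billing"}
)
print([t.trigger_id for t in hits])
```

Here `t1` and `t2` match, while `t3` is skipped because it belongs to a different tenant. Each matched trigger's `workflow_definition` would then be submitted to the `highway_internal` queue.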
**Events to support**:

- `user_created` - Welcome email sequence
- `workflow_completed` - Post-processing, notifications
- `workflow_failed` - Alert, auto-retry logic
- `approval_pending` - Reminder emails
- `api_key_created` - Security audit workflow

**Storage**: New table `event_triggers`

```sql
CREATE TABLE event_triggers (
    trigger_id UUID PRIMARY KEY,
    tenant_id VARCHAR(64) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    filter_expression JSONB,  -- e.g., {"workflow_name": "critical_*"}
    workflow_definition JSONB NOT NULL,
    enabled BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
```

**Implementation**: Emit events via NOTIFY, internal worker listens and submits matching workflows.

### 3. Workflow Template Registry

Store reusable workflow definitions that can be triggered by name with parameters.

**Storage**: New table `workflow_templates`

```sql
CREATE TABLE workflow_templates (
    template_id UUID PRIMARY KEY,
    tenant_id VARCHAR(64) NOT NULL,
    name VARCHAR(255) UNIQUE NOT NULL,
    description TEXT,
    workflow_definition JSONB NOT NULL,
    input_schema JSONB,  -- Pydantic schema for parameters
    category VARCHAR(100),  -- 'maintenance', 'notifications', 'etl'
    created_at TIMESTAMPTZ DEFAULT NOW()
);
```

**API**: `POST /api/v1/templates/{name}/run` with input parameters.

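To make the run endpoint more concrete, here is a minimal sketch of what handling `POST /api/v1/templates/{name}/run` might involve: look up the template, check the supplied parameters against `input_schema`, and build a submission for the `highway_internal` queue. It assumes `input_schema` is a JSON-Schema-like dict with a `required` list and a `properties` map. `TEMPLATES`, `prepare_run`, and the shape of the returned submission are hypothetical.

```python
from typing import Any

# Hypothetical in-memory stand-in for the workflow_templates table,
# keyed by (tenant_id, name).
TEMPLATES: dict = {
    ("acme", "welcome_email_sequence"): {
        "workflow_definition": {"name": "welcome_email_sequence", "tasks": []},
        "input_schema": {
            "required": ["email"],
            "properties": {"email": {"type": "string"}},
        },
    }
}


def prepare_run(tenant_id: str, name: str, params: dict) -> dict:
    """Validate params against the template's schema and build a submission."""
    template = TEMPLATES.get((tenant_id, name))
    if template is None:
        raise KeyError(f"template {name!r} not found for tenant {tenant_id!r}")

    schema = template.get("input_schema") or {}
    allowed = schema.get("properties")  # None means "no declared parameters"
    missing = [key for key in schema.get("required", []) if key not in params]
    unknown = [key for key in params if allowed is not None and key not in allowed]
    if missing or unknown:
        raise ValueError(f"missing={missing} unknown={unknown}")

    # Assumed submission shape; the real queue payload is not specified here.
    return {
        "queue": "highway_internal",
        "tenant_id": tenant_id,
        "workflow_definition": template["workflow_definition"],
        "input": params,
    }


submission = prepare_run(
    "acme", "welcome_email_sequence", {"email": "new.user@example.com"}
)
print(submission["queue"], submission["input"])
```

Rejecting missing or unknown parameters before submission keeps malformed runs out of the queue. Keying the lookup by tenant as well as name is one way to preserve tenant isolation.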
## Use Cases Enabled

| Use Case | Trigger Type | Workflow |
|----------|-------------|----------|
| Welcome email + 2-day follow-up | Event: `user_created` | Send email → sleep 2 days → check login → conditional follow-up |
| Worker health monitoring | Cron: `*/5 * * * *` | HTTP check workers → alert on failure |
| Website uptime | Cron: `* * * * *` | HTTP GET → log result → alert if down |
| Daily database backup | Cron: `0 2 * * *` | Shell: pg_dump → upload to S3 |
| Weekly cleanup | Cron: `0 3 * * 0` | Run cleanup tools |
| Failed workflow alerts | Event: `workflow_failed` | Send Slack/email notification |
| ETL pipeline | Cron: `0 4 * * *` | Extract → Transform → Load |
| Approval reminders | Event: `approval_pending` + Cron | Check pending > 24h → send reminder |

## Implementation Priority

1. **Phase 1**: Workflow Templates + API (simplest, immediate value)
2. **Phase 2**: Cron Scheduler (enables maintenance automation)
3. **Phase 3**: Event Triggers (enables reactive workflows)

## API Endpoints Needed

```
# Templates
POST   /api/v1/templates              # Create template
GET    /api/v1/templates              # List templates
GET    /api/v1/templates/{name}       # Get template
PUT    /api/v1/templates/{name}       # Update template
DELETE /api/v1/templates/{name}       # Delete template
POST   /api/v1/templates/{name}/run   # Execute template with params

# Schedules
POST   /api/v1/schedules              # Create schedule
GET    /api/v1/schedules              # List schedules
PUT    /api/v1/schedules/{id}         # Update schedule
DELETE /api/v1/schedules/{id}         # Delete schedule
POST   /api/v1/schedules/{id}/pause   # Pause schedule
POST   /api/v1/schedules/{id}/resume  # Resume schedule

# Triggers
POST   /api/v1/triggers               # Create trigger
GET    /api/v1/triggers               # List triggers
PUT    /api/v1/triggers/{id}          # Update trigger
DELETE /api/v1/triggers/{id}          # Delete trigger
```

## Example: Welcome Email Workflow

```python
from highway_dsl import WorkflowBuilder, task

workflow = (
    WorkflowBuilder("welcome_email_sequence")
    # Step 1: send the welcome email immediately.
    .add(task("send_welcome")
         .tool("tools.email.send")
         .input(to="${input.email}", subject="Welcome to Highway!", template="welcome"))
    # Step 2: pause the workflow for two days.
    .add(task("wait_2_days")
         .tool("tools.time.sleep_for")
         .input(duration="2d"))
    # Step 3: ask the API whether the user has logged in since signing up.
    .add(task("check_login")
         .tool("tools.http.request")
         .input(url="${env.API_URL}/api/v1/users/${input.email}/last_login"))
    # Step 4: send the follow-up only if the user has not logged in.
    .add(task("send_followup")
         .tool("tools.email.send")
         .input(to="${input.email}", subject="Need help getting started?", template="followup")
         .when("${steps.check_login.output.has_logged_in} == false"))
    .build()
)
```

**Triggered by**: Event trigger on `user_created` event.

## Notes

- All workflows submitted to `highway_internal` queue
- Internal worker already exists and is running
- No new workers needed, just scheduling/trigger logic
- Tenant isolation maintained throughout
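
For the Cron Scheduler component, the core check (is a schedule due at this minute?) could look roughly like the sketch below. It handles only `*`, `*/n`, single numbers, ranges, and comma lists in five-field expressions. It also requires day-of-month and day-of-week to both match, which differs from classic cron when both fields are restricted. `cron_matches` and `_field_matches` are hypothetical names; a production scheduler would more likely rely on an established cron-parsing library.

```python
from datetime import datetime


def _field_matches(spec: str, value: int, minimum: int) -> bool:
    """Check one cron field: '*', '*/n', 'a', 'a-b', 'a-b/n', or comma lists."""
    for part in spec.split(","):
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if rng == "*":
            if (value - minimum) % step == 0:
                return True
            continue
        low_text, _, high_text = rng.partition("-")
        low = int(low_text)
        high = int(high_text) if high_text else low
        if low <= value <= high and (value - low) % step == 0:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if a five-field cron expression fires at `moment`."""
    minute, hour, day, month, weekday = expression.split()
    return (
        _field_matches(minute, moment.minute, 0)
        and _field_matches(hour, moment.hour, 0)
        and _field_matches(day, moment.day, 1)
        and _field_matches(month, moment.month, 1)
        # cron uses 0 = Sunday; isoweekday() is 1 = Monday .. 7 = Sunday
        and _field_matches(weekday, moment.isoweekday() % 7, 0)
    )


sunday_3am = datetime(2025, 12, 7, 3, 0)
print(cron_matches("0 3 * * 0", sunday_3am))    # weekly cleanup schedule
print(cron_matches("*/5 * * * *", sunday_3am))  # worker health check schedule
print(cron_matches("0 2 * * *", sunday_3am))    # daily backup schedule (02:00)
```

At 03:00 on a Sunday the first two calls print `True` and the third prints `False`. A scheduler loop could run this check once per minute over enabled `scheduled_workflows` rows and submit each matching `workflow_definition` to the `highway_internal` queue.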
