Edit Issue #358
## Summary

Implement a new `storage` tool that enables workflows to:

1. Store workflow-generated files persistently (e.g., generated reports, summaries)
2. Request file uploads from users (human-in-the-loop) with presigned URLs
3. Retrieve and manage stored files with configurable retention policies

## Background

Current state:

- Artifacts: ZIP files only, 16MB limit, local filesystem, designed for code bundles
- DataShard: already has S3 configured (s3.rodmena.co.uk/datashard) for logs
- No capability for workflows to store arbitrary files or collect user uploads
- ForEach operator already supports dynamic parallel fan-out (no DSL changes needed)

Use cases:

- Workflow generates a 100-page summary document -> needs persistent storage
- KYC workflow needs the user to upload ID documents -> needs upload capability
- Both cases require flexible retention policies defined by the workflow

## Tool API Design

### Store workflow-generated files

```python
tools.storage.store(
    name="summary_report",
    data=bytes,
    content_type="application/pdf",
    retention_policy="days:90"
) -> storage_ref
```

### Retrieve stored files

```python
tools.storage.retrieve(storage_ref) -> bytes
tools.storage.get_url(storage_ref, expires_in=3600) -> url
tools.storage.delete(storage_ref)
```

### Request user upload (workflow sleeps until complete)

```python
tools.storage.request_upload(
    name="kyc_documents",
    allowed_types=["pdf", "jpg", "png"],
    max_size_mb=50,
    max_files=5,
    prompt="Please upload your ID documents",
    retention_policy="days:90",
    expires_in=86400,
    callback_url="https://bank.com/webhook",
    redirect_url="https://bank.com/next-step",
) -> {
    "upload_id": "uuid",
    "upload_token": "Xk9mP2qR",
    "upload_url": "https://highway.rodmena.app/u/Xk9mP2qR",
    "presigned_urls": [...],
    "expires_at": "2025-12-11T12:00:00Z"
}
```

## Storage Reference Format

**IMPORTANT: Use the SHA-256 of the file content for the filename (content-addressable).**

```
storage_ref = "hwy://uploads/{tenant_id}/{workflow_run_id}/{sha256}.{ext}"
```

Example: `hwy://uploads/test/abc123/e3b0c44298fc1c149afbf4c8996fb924.pdf`

Benefits:

- Deduplication (same file = same key)
- No collision possible
- Consistent with existing artifact design

Internally stored as:

```json
{
  "storage_type": "s3",
  "bucket": "highway-uploads",
  "key": "test/abc123/e3b0c44298fc1c149afbf4c8996fb924.pdf",
  "content_type": "application/pdf",
  "size_bytes": 1048576,
  "checksum_sha256": "e3b0c44298fc1c149afbf4c8996fb924...",
  "original_filename": "passport.pdf",
  "created_at": "2025-12-11T10:00:00Z",
  "retention_policy": "days:90",
  "workflow_run_id": "abc123",
  "tenant_id": "test"
}
```
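For reference, a minimal sketch of how the content-addressable key scheme above could work with boto3. This is an assumption, not the final tool implementation: the function name `store_bytes`, the extension handling, and the module layout are illustrative; the endpoint and bucket values are taken from the `[uploads]` config shown later in this issue.

```python
# Sketch (assumption) of the content-addressable key scheme: the SHA-256 of
# the bytes becomes both the S3 key and the hwy:// storage_ref.
import hashlib
import mimetypes

import boto3

s3 = boto3.client("s3", endpoint_url="https://s3.rodmena.co.uk")


def store_bytes(tenant_id: str, workflow_run_id: str, data: bytes,
                content_type: str = "application/octet-stream") -> str:
    """Upload data under a SHA-256 key and return its hwy:// storage_ref."""
    digest = hashlib.sha256(data).hexdigest()
    ext = (mimetypes.guess_extension(content_type) or ".bin").lstrip(".")
    key = f"{tenant_id}/{workflow_run_id}/{digest}.{ext}"
    s3.put_object(
        Bucket="highway-uploads",
        Key=key,
        Body=data,
        ContentType=content_type,
    )
    return f"hwy://uploads/{key}"
```

Deduplication falls out naturally: storing the same bytes twice produces the same key, so the second `put_object` simply overwrites an identical object.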
## Upload Token Design

- Opaque 8-char alphanumeric token with DB lookup
- Revocable, auditable, short URLs
- Example: https://highway.rodmena.app/u/Xk9mP2qR

## Upload Flow

### Workflow Side

1. `tools.storage.request_upload(...)` creates a pending_upload record
2. Workflow sends the upload_url to the user (email/SMS/in-app)
3. `ctx.wait_for_event("upload_{upload_id}")` - workflow sleeps

### Client Side (Two Options)

Option A - Highway Upload Page:

- User visits /u/{token}, uploads via form, auto-completes

Option B - API Integration (for banks AND testing):

- GET /api/v1/uploads/{token}/info -> constraints, presigned_urls
- PUT to presigned_url with file body (direct S3 upload)
- POST /api/v1/uploads/{token}/complete -> triggers workflow

### Edge Cases

- **Page refresh**: regenerate presigned URLs on each GET /info call
- **Concurrent uploads**: first-complete-wins, but includes ALL uploaded files
- **Expired link**: workflow handles via timeout + new request_upload()
- **Completion idempotency**: second /complete returns 409 with existing files

### Workflow Resumes with

```json
{
  "files": [
    {
      "storage_ref": "hwy://uploads/test/wf123/e3b0c44298fc1c14.pdf",
      "filename": "passport.pdf",
      "content_type": "application/pdf",
      "size_bytes": 2048576,
      "checksum_sha256": "e3b0c44298fc1c14..."
    }
  ],
  "uploaded_at": "2025-12-11T11:30:00Z"
}
```

## Configuration (config.ini)

```ini
[uploads]
storage_type = s3
s3_endpoint = https://s3.rodmena.co.uk
s3_bucket = highway-uploads
s3_region = us-east-1
default_retention_days = 30
default_expiry_hours = 24
max_file_size_mb = 0
```

## Database Schema

### stored_files table

- file_id UUID PRIMARY KEY
- tenant_id VARCHAR(255) NOT NULL
- workflow_run_id UUID
- storage_ref VARCHAR(1024) UNIQUE NOT NULL
- s3_bucket VARCHAR(255) NOT NULL
- s3_key VARCHAR(1024) NOT NULL
- name VARCHAR(255) NOT NULL
- original_filename VARCHAR(255)
- content_type VARCHAR(255)
- size_bytes BIGINT
- checksum_sha256 VARCHAR(64) NOT NULL
- retention_policy VARCHAR(100)
- expires_at TIMESTAMPTZ
- created_at TIMESTAMPTZ DEFAULT NOW()
- created_by VARCHAR(255)

### pending_uploads table

- upload_id UUID PRIMARY KEY
- upload_token VARCHAR(32) UNIQUE NOT NULL
- tenant_id VARCHAR(255) NOT NULL
- workflow_run_id UUID NOT NULL
- event_name VARCHAR(255) NOT NULL
- name VARCHAR(255) NOT NULL
- allowed_types TEXT[]
- max_size_bytes BIGINT
- max_files INTEGER DEFAULT 1
- prompt TEXT
- s3_bucket VARCHAR(255) NOT NULL
- s3_key_prefix VARCHAR(1024) NOT NULL
- status VARCHAR(50) DEFAULT 'pending'
- retention_policy VARCHAR(100)
- callback_url TEXT
- redirect_url TEXT
- created_at TIMESTAMPTZ DEFAULT NOW()
- expires_at TIMESTAMPTZ NOT NULL
- completed_at TIMESTAMPTZ

### upload_files table (junction for multi-file)

- upload_id UUID REFERENCES pending_uploads
- file_id UUID REFERENCES stored_files
- file_index INTEGER NOT NULL

## API Endpoints

- GET /api/v1/uploads/{token}/info - get constraints + fresh presigned URLs
- POST /api/v1/uploads/{token}/complete - mark complete, trigger workflow
- POST /api/v1/uploads/{token}/cancel - cancel pending upload
- GET /u/{token} - HTML upload page (built-in)
- GET /api/v1/storage/{ref}/url - get presigned download URL
- DELETE /api/v1/storage/{ref} - delete file
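The page-refresh edge case and the /info endpoint both depend on regenerating presigned URLs on every call. A minimal sketch, assuming boto3: `upload` mirrors a pending_uploads row, and the `{s3_key_prefix}/{index}` temporary key layout is an assumption (the content-addressed key is only known once the bytes arrive and can be hashed).

```python
# Sketch (assumption) of the presigned-URL regeneration behind
# GET /api/v1/uploads/{token}/info. `upload` mirrors a pending_uploads row;
# the "{s3_key_prefix}/{index}" staging key layout is an assumption.
import boto3

s3 = boto3.client("s3", endpoint_url="https://s3.rodmena.co.uk")


def regenerate_presigned_urls(upload: dict, expires_in: int = 3600) -> list[str]:
    """Return one fresh presigned PUT URL per allowed file slot."""
    return [
        s3.generate_presigned_url(
            "put_object",
            Params={
                "Bucket": upload["s3_bucket"],
                "Key": f"{upload['s3_key_prefix']}/{index}",
            },
            ExpiresIn=expires_in,
        )
        for index in range(upload["max_files"])
    ]
```

Because the URLs are regenerated on every /info call, refreshing the upload page never invalidates the upload token itself.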
## LLM Tool Enhancement (Dependency)

The LLM tool must accept storage_ref in attachments:

```python
tools.llm.call(
    model="gpt-4",
    prompt="Summarize this document",
    attachments=[
        {"type": "storage_ref", "ref": "hwy://uploads/test/wf123/abc123.pdf"}
    ]
)
```

The tool internally:

1. Resolves the storage_ref to a presigned URL via the storage service
2. Downloads the content or passes the URL to the LLM provider
3. Processes it as a normal attachment

## Security

- 8-char token = 62^8 = 218 trillion combinations
- Presigned URLs have a separate expiry (regenerated on each /info call)
- Server-side file type/size validation on /complete
- SHA-256 filename = content-addressable, no collision
- Tenant isolation on all queries
- RBAC: manage_uploads, view_uploads permissions

## ACCEPTANCE CRITERIA - MANDATORY WORKFLOW TEST

After implementation, the following workflow MUST work end-to-end:

### Article Summarization Workflow

```python
from highway_dsl import WorkflowBuilder


def get_workflow():
    builder = WorkflowBuilder(name="arxiv_summarization")

    # Step 1: Request PDF upload (3 arxiv papers)
    builder.task(
        "request_pdfs",
        "tools.storage.request_upload",
        kwargs={
            "name": "arxiv_papers",
            "allowed_types": ["pdf"],
            "max_files": 3,
            "prompt": "Upload 3 arxiv PDF papers for summarization",
            "retention_policy": "days:7",
            "expires_in": 3600
        },
        result_key="upload_request"
    )

    # Step 2: Wait for upload (test will upload programmatically)
    builder.task(
        "wait_for_pdfs",
        "tools.workflow.wait_for_event",
        kwargs={
            "event_name": "{{upload_request.event_name}}",
            "timeout_seconds": 3600
        },
        result_key="uploaded_files",
        dependencies=["request_pdfs"]
    )

    # Step 3: ForEach - summarize each PDF in parallel
    builder.foreach(
        "summarize_papers",
        items="{{uploaded_files.files}}",
        loop_body=lambda fb: fb.task(
            "summarize_pdf",
            "tools.llm.call",
            kwargs={
                "model": "gpt-4",
                "prompt": "Summarize this academic paper in 200 words. Include: title, main contribution, methodology, and key findings.",
                "attachments": [{"type": "storage_ref", "ref": "{{item.storage_ref}}"}]
            },
            result_key="summary"
        ),
        dependencies=["wait_for_pdfs"]
    )

    # Step 4: Combine all summaries into a single document
    builder.task(
        "combine_summaries",
        "tools.llm.call",
        kwargs={
            "model": "gpt-4",
            "prompt": "Combine these paper summaries into a cohesive research digest document:\n\n{{summarize_papers.results}}"
        },
        result_key="combined_summary",
        dependencies=["summarize_papers"]
    )

    # Step 5: Store the combined summary as a txt file
    builder.task(
        "store_summary",
        "tools.storage.store",
        kwargs={
            "name": "research_digest",
            "data": "{{combined_summary.content}}",
            "content_type": "text/plain",
            "retention_policy": "days:30"
        },
        result_key="stored_file",
        dependencies=["combine_summaries"]
    )

    # Step 6: Get a public download URL
    builder.task(
        "get_public_url",
        "tools.storage.get_url",
        kwargs={
            "storage_ref": "{{stored_file.storage_ref}}",
            "expires_in": 86400
        },
        result_key="public_url",
        dependencies=["store_summary"]
    )

    # Step 7: Return the final result
    builder.task(
        "finalize",
        "tools.shell.run",
        args=["echo 'Workflow complete. Download your summary at: {{public_url}}'"],
        result_key="final_result",
        dependencies=["get_public_url"]
    )

    return builder.build()
```

### Test Procedure

1. Submit the workflow via the API
2. Get the upload_token from the workflow variables
3. Download the test PDFs:
   - https://arxiv.org/pdf/2511.00079
   - https://arxiv.org/pdf/2411.19485
   - https://arxiv.org/pdf/2411.05451
4. Upload via the API (see the sketch after this list):
   - GET /api/v1/uploads/{token}/info -> presigned_urls
   - PUT each PDF to its presigned URL
   - POST /api/v1/uploads/{token}/complete
5. Wait for workflow completion
6. Verify:
   - 3 papers summarized (foreach executed 3 times)
   - Combined summary stored
   - Public URL is accessible and returns txt content
   - All storage_refs use SHA-256 naming
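A hedged sketch of step 4's programmatic upload (Option B), assuming the `requests` library; the base URL, token value, and local PDF directory are test-harness placeholders, and the response field names follow the API design above.

```python
# Sketch of the programmatic upload path (Option B) used by the test.
# BASE_URL, TOKEN, and the local pdfs/ directory are placeholders.
from pathlib import Path

import requests

BASE_URL = "https://highway.rodmena.app"  # placeholder base URL
TOKEN = "Xk9mP2qR"                        # upload_token read from workflow variables

# GET /info: constraints plus fresh presigned URLs
info = requests.get(f"{BASE_URL}/api/v1/uploads/{TOKEN}/info", timeout=30)
info.raise_for_status()
presigned_urls = info.json()["presigned_urls"]

# PUT each downloaded arxiv PDF straight to S3 via its presigned URL
for pdf, url in zip(sorted(Path("pdfs").glob("*.pdf")), presigned_urls):
    with pdf.open("rb") as fh:
        resp = requests.put(
            url, data=fh, headers={"Content-Type": "application/pdf"}, timeout=120
        )
        resp.raise_for_status()

# POST /complete: emits the upload_{upload_id} event and wakes the workflow
done = requests.post(f"{BASE_URL}/api/v1/uploads/{TOKEN}/complete", timeout=30)
done.raise_for_status()
```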
### Success Criteria

- [ ] Workflow completes with status "completed"
- [ ] ForEach spawned 3 parallel summarization tasks
- [ ] Combined summary file stored in S3
- [ ] Public URL returns valid text content
- [ ] File names are SHA-256 hashes (no collision possible)
- [ ] All operations done via API (no manual DB changes)

## Implementation Phases

- Phase 1: Core storage tool (store/retrieve/get_url/delete) + S3 provider
- Phase 2: User upload capability (request_upload, API, upload page)
- Phase 3: Retention management and cleanup job
- Phase 4: LLM tool enhancement (storage_ref support)
- Phase 5: Acceptance workflow test

## Dependencies

- boto3 (S3 client)
- Existing S3 infrastructure (s3.rodmena.co.uk)
- Vault for S3 credentials
- LLM tool enhancement (Phase 4)
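For Phase 3, a minimal sketch of the retention cleanup, assuming policies of the form `days:N` and the `expires_at`/`s3_key` columns from the stored_files schema above; the query that selects expired rows is left out, and the helper names are illustrative.

```python
# Sketch (assumption) of the Phase 3 retention cleanup. "days:N" policies map
# to an expires_at timestamp; rows already past expires_at are assumed to be
# selected elsewhere and passed in as dicts mirroring stored_files columns.
from datetime import datetime, timedelta, timezone

import boto3

s3 = boto3.client("s3", endpoint_url="https://s3.rodmena.co.uk")


def retention_to_expiry(policy: str, created_at: datetime) -> datetime | None:
    """Turn 'days:90' into an expires_at timestamp; None means keep forever."""
    if not policy:
        return None
    unit, _, value = policy.partition(":")
    if unit != "days":
        raise ValueError(f"unsupported retention policy: {policy!r}")
    return created_at + timedelta(days=int(value))


def delete_expired(rows: list[dict], now: datetime | None = None) -> int:
    """Delete S3 objects for expired stored_files rows; returns the count deleted."""
    now = now or datetime.now(timezone.utc)
    deleted = 0
    for row in rows:
        if row["expires_at"] is not None and row["expires_at"] <= now:
            s3.delete_object(Bucket=row["s3_bucket"], Key=row["s3_key"])
            deleted += 1
    return deleted
```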