#358 Implement storage tool with S3 backend and user upload capability
Description
## Summary
Implement a new `storage` tool that enables workflows to:
1. Store workflow-generated files persistently (e.g., generated reports, summaries)
2. Request file uploads from users (human-in-the-loop) with presigned URLs
3. Retrieve and manage stored files with configurable retention policies
## Background
Current state:
- Artifacts: ZIP files only, 16MB limit, local filesystem, designed for code bundles
- DataShard: Already has S3 configured (s3.rodmena.co.uk/datashard) for logs
- No capability for workflows to store arbitrary files or collect user uploads
- ForEach operator already supports dynamic parallel fan-out (no DSL changes needed)
Use cases:
- Workflow generates 100-page summary document -> needs persistent storage
- KYC workflow needs user to upload ID documents -> needs upload capability
- Both cases require flexible retention policies defined by workflow
## Tool API Design
### Store workflow-generated files
```python
tools.storage.store(
name="summary_report",
data=bytes,
content_type="application/pdf",
retention_policy="days:90"
) -> storage_ref
```
### Retrieve stored files
```python
tools.storage.retrieve(storage_ref) -> bytes
tools.storage.get_url(storage_ref, expires_in=3600) -> url
tools.storage.delete(storage_ref)
```
### Request user upload (workflow sleeps until complete)
```python
tools.storage.request_upload(
name="kyc_documents",
allowed_types=["pdf", "jpg", "png"],
max_size_mb=50,
max_files=5,
prompt="Please upload your ID documents",
retention_policy="days:90",
expires_in=86400,
callback_url="https://bank.com/webhook",
redirect_url="https://bank.com/next-step",
) -> {
"upload_id": "uuid",
"upload_token": "Xk9mP2qR",
"upload_url": "https://highway.rodmena.app/u/Xk9mP2qR",
"presigned_urls": [...],
"expires_at": "2025-12-11T12:00:00Z"
}
```
## Storage Reference Format
**IMPORTANT: Use SHA-256 of file content for filename (content-addressable)**
```
storage_ref = "hwy://uploads/{tenant_id}/{workflow_run_id}/{sha256}.{ext}"
```
Example: `hwy://uploads/test/abc123/e3b0c44298fc1c149afbf4c8996fb924.pdf`
Benefits:
- Deduplication (same file = same key)
- No collision possible
- Consistent with existing artifact design
Internally stored as:
```json
{
"storage_type": "s3",
"bucket": "highway-uploads",
"key": "test/abc123/e3b0c44298fc1c149afbf4c8996fb924.pdf",
"content_type": "application/pdf",
"size_bytes": 1048576,
"checksum_sha256": "e3b0c44298fc1c149afbf4c8996fb924...",
"original_filename": "passport.pdf",
"created_at": "2025-12-11T10:00:00Z",
"retention_policy": "days:90",
"workflow_run_id": "abc123",
"tenant_id": "test"
}
```
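Deriving the content-addressable key is straightforward; below is a minimal sketch using only the standard library (`build_storage_ref` is a hypothetical helper, not an existing function in the codebase):
```python
import hashlib
import mimetypes


def build_storage_ref(tenant_id: str, workflow_run_id: str,
                      data: bytes, content_type: str) -> tuple[str, str]:
    """Derive the S3 key and hwy:// reference from the file content.

    Illustrative only; the real storage tool may expose this differently.
    """
    digest = hashlib.sha256(data).hexdigest()
    # Map the declared content type to an extension (e.g. application/pdf -> pdf)
    ext = (mimetypes.guess_extension(content_type) or ".bin").lstrip(".")
    key = f"{tenant_id}/{workflow_run_id}/{digest}.{ext}"
    return key, f"hwy://uploads/{key}"
```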
## Upload Token Design
- Opaque 8-char alphanumeric token with DB lookup
- Revocable, auditable, short URLs
- Example: https://highway.rodmena.app/u/Xk9mP2qR
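A possible token generator, assuming the 62-character alphanumeric alphabet implied by the Security section (`generate_upload_token` is illustrative; uniqueness would still be enforced by the `UNIQUE` constraint on `pending_uploads.upload_token`, with a retry on collision):
```python
import secrets
import string

TOKEN_ALPHABET = string.ascii_letters + string.digits  # 62 characters


def generate_upload_token(length: int = 8) -> str:
    """Generate an unguessable short token for the /u/{token} URL."""
    return "".join(secrets.choice(TOKEN_ALPHABET) for _ in range(length))
```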
## Upload Flow
### Workflow Side:
1. `tools.storage.request_upload(...)` creates a `pending_uploads` record
2. Workflow sends `upload_url` to the user (email/SMS/in-app)
3. `ctx.wait_for_event("upload_{upload_id}")` - workflow sleeps
### Client Side (Two Options):
Option A - Highway Upload Page:
- User visits /u/{token}, uploads via form, auto-completes
Option B - API Integration (for banks AND testing):
- GET /api/v1/uploads/{token}/info -> constraints, presigned_urls
- PUT to presigned_url with file body (direct S3 upload)
- POST /api/v1/uploads/{token}/complete -> triggers workflow
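A minimal client-side sketch of Option B using `requests` (endpoint paths follow the list above; the `presigned_urls` field name is taken from the `request_upload` response example, and the base URL is assumed from the upload URL examples):
```python
import requests

BASE = "https://highway.rodmena.app"  # assumed host, taken from the examples above


def upload_files(token: str, paths: list[str]) -> dict:
    # 1. Fetch constraints and fresh presigned URLs
    info = requests.get(f"{BASE}/api/v1/uploads/{token}/info", timeout=30)
    info.raise_for_status()
    presigned = info.json()["presigned_urls"]  # assumed response field

    # 2. PUT each file body directly to S3
    for url, path in zip(presigned, paths):
        with open(path, "rb") as fh:
            requests.put(url, data=fh, timeout=300).raise_for_status()

    # 3. Mark the upload complete, which resumes the sleeping workflow
    done = requests.post(f"{BASE}/api/v1/uploads/{token}/complete", timeout=30)
    done.raise_for_status()
    return done.json()
```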
### Edge Cases:
- **Page refresh**: Regenerate presigned URLs on each GET /info call
- **Concurrent uploads**: First-complete-wins, but includes ALL uploaded files
- **Expired link**: Workflow handles via timeout + new request_upload()
- **Completion idempotency**: Second /complete returns 409 with existing files
### Workflow Resumes with:
```json
{
"files": [
{
"storage_ref": "hwy://uploads/test/wf123/e3b0c44298fc1c14.pdf",
"filename": "passport.pdf",
"content_type": "application/pdf",
"size_bytes": 2048576,
"checksum_sha256": "e3b0c44298fc1c14..."
}
],
"uploaded_at": "2025-12-11T11:30:00Z"
}
```
## Configuration (config.ini)
```ini
[uploads]
storage_type = s3
s3_endpoint = https://s3.rodmena.co.uk
s3_bucket = highway-uploads
s3_region = us-east-1
default_retention_days = 30
default_expiry_hours = 24
max_file_size_mb = 0
```
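A sketch of reading this section with the standard library `configparser`; treating `max_file_size_mb = 0` as "no limit" is an assumption, not something the issue states:
```python
from configparser import ConfigParser

cfg = ConfigParser()
cfg.read("config.ini")
uploads = cfg["uploads"]

s3_endpoint = uploads.get("s3_endpoint")
s3_bucket = uploads.get("s3_bucket")
default_retention_days = uploads.getint("default_retention_days", fallback=30)
default_expiry_hours = uploads.getint("default_expiry_hours", fallback=24)
# Assumption: 0 means "no per-file size limit"
max_file_size_mb = uploads.getint("max_file_size_mb", fallback=0) or None
```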
## Database Schema
### stored_files table
- file_id UUID PRIMARY KEY
- tenant_id VARCHAR(255) NOT NULL
- workflow_run_id UUID
- storage_ref VARCHAR(1024) UNIQUE NOT NULL
- s3_bucket VARCHAR(255) NOT NULL
- s3_key VARCHAR(1024) NOT NULL
- name VARCHAR(255) NOT NULL
- original_filename VARCHAR(255)
- content_type VARCHAR(255)
- size_bytes BIGINT
- checksum_sha256 VARCHAR(64) NOT NULL
- retention_policy VARCHAR(100)
- expires_at TIMESTAMPTZ
- created_at TIMESTAMPTZ DEFAULT NOW()
- created_by VARCHAR(255)
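For reference, the columns above map roughly to the following DDL, shown here as a migration string (constraints and indexes beyond the listed columns are assumptions):
```python
# Assumed migration snippet; the issue lists only columns, not indexes.
STORED_FILES_DDL = """
CREATE TABLE IF NOT EXISTS stored_files (
    file_id            UUID PRIMARY KEY,
    tenant_id          VARCHAR(255) NOT NULL,
    workflow_run_id    UUID,
    storage_ref        VARCHAR(1024) UNIQUE NOT NULL,
    s3_bucket          VARCHAR(255) NOT NULL,
    s3_key             VARCHAR(1024) NOT NULL,
    name               VARCHAR(255) NOT NULL,
    original_filename  VARCHAR(255),
    content_type       VARCHAR(255),
    size_bytes         BIGINT,
    checksum_sha256    VARCHAR(64) NOT NULL,
    retention_policy   VARCHAR(100),
    expires_at         TIMESTAMPTZ,
    created_at         TIMESTAMPTZ DEFAULT NOW(),
    created_by         VARCHAR(255)
);
"""
```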
### pending_uploads table
- upload_id UUID PRIMARY KEY
- upload_token VARCHAR(32) UNIQUE NOT NULL
- tenant_id VARCHAR(255) NOT NULL
- workflow_run_id UUID NOT NULL
- event_name VARCHAR(255) NOT NULL
- name VARCHAR(255) NOT NULL
- allowed_types TEXT[]
- max_size_bytes BIGINT
- max_files INTEGER DEFAULT 1
- prompt TEXT
- s3_bucket VARCHAR(255) NOT NULL
- s3_key_prefix VARCHAR(1024) NOT NULL
- status VARCHAR(50) DEFAULT 'pending'
- retention_policy VARCHAR(100)
- callback_url TEXT
- redirect_url TEXT
- created_at TIMESTAMPTZ DEFAULT NOW()
- expires_at TIMESTAMPTZ NOT NULL
- completed_at TIMESTAMPTZ
### upload_files table (junction for multi-file)
- upload_id UUID REFERENCES pending_uploads
- file_id UUID REFERENCES stored_files
- file_index INTEGER NOT NULL
## API Endpoints
- GET /api/v1/uploads/{token}/info - Get constraints + fresh presigned URLs
- POST /api/v1/uploads/{token}/complete - Mark complete, trigger workflow
- POST /api/v1/uploads/{token}/cancel - Cancel pending upload
- GET /u/{token} - HTML upload page (built-in)
- GET /api/v1/storage/{ref}/url - Get presigned download URL
- DELETE /api/v1/storage/{ref} - Delete file
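Example use of the two storage endpoints from Python (assuming the `hwy://` reference is URL-encoded into the path and that `expires_in` is accepted as a query parameter; neither detail is pinned down in this issue):
```python
import requests
from urllib.parse import quote

BASE = "https://highway.rodmena.app"  # assumed host
ref = "hwy://uploads/test/abc123/e3b0c44298fc1c149afbf4c8996fb924.pdf"
encoded = quote(ref, safe="")

# Presigned download URL for a stored file
resp = requests.get(f"{BASE}/api/v1/storage/{encoded}/url",
                    params={"expires_in": 3600}, timeout=30)
resp.raise_for_status()
download_url = resp.json()["url"]  # assumed response field

# Delete the file once it is no longer needed
requests.delete(f"{BASE}/api/v1/storage/{encoded}", timeout=30).raise_for_status()
```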
## LLM Tool Enhancement (Dependency)
The LLM tool must accept storage_ref in attachments:
```python
tools.llm.call(
model="gpt-4",
prompt="Summarize this document",
attachments=[
{"type": "storage_ref", "ref": "hwy://uploads/test/wf123/abc123.pdf"}
]
)
```
Tool internally:
1. Resolves storage_ref to presigned URL via storage service
2. Downloads content or passes URL to LLM provider
3. Processes as normal attachment
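A minimal sketch of that resolution step (`resolve_attachment` and `storage_service.get_url` are placeholder names for whatever the LLM tool and storage service actually expose):
```python
def resolve_attachment(attachment: dict, storage_service) -> dict:
    """Turn a storage_ref attachment into something the LLM provider accepts."""
    if attachment.get("type") != "storage_ref":
        return attachment  # pass through ordinary attachments unchanged

    # 1. Resolve the hwy:// reference to a short-lived presigned URL
    url = storage_service.get_url(attachment["ref"], expires_in=3600)

    # 2. Either hand the URL to the provider or download the bytes here;
    #    which one applies depends on the provider's attachment support.
    return {"type": "url", "url": url}
```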
## Security
- 8-char token = 62^8 = 218 trillion combinations
- Presigned URL separate expiry (regenerated on each /info call)
- Server-side file type/size validation on /complete
- SHA-256 filename = content-addressable, no collision
- Tenant isolation on all queries
- RBAC: manage_uploads, view_uploads permissions
## ACCEPTANCE CRITERIA - MANDATORY WORKFLOW TEST
After implementation, the following workflow MUST work end-to-end:
### Article Summarization Workflow
```python
from highway_dsl import WorkflowBuilder


def get_workflow():
    builder = WorkflowBuilder(name="arxiv_summarization")

    # Step 1: Request PDF upload (3 arxiv papers)
    builder.task(
        "request_pdfs",
        "tools.storage.request_upload",
        kwargs={
            "name": "arxiv_papers",
            "allowed_types": ["pdf"],
            "max_files": 3,
            "prompt": "Upload 3 arxiv PDF papers for summarization",
            "retention_policy": "days:7",
            "expires_in": 3600,
        },
        result_key="upload_request",
    )

    # Step 2: Wait for upload (test will upload programmatically)
    builder.task(
        "wait_for_pdfs",
        "tools.workflow.wait_for_event",
        kwargs={
            "event_name": "{{upload_request.event_name}}",
            "timeout_seconds": 3600,
        },
        result_key="uploaded_files",
        dependencies=["request_pdfs"],
    )

    # Step 3: ForEach - Summarize each PDF in parallel
    builder.foreach(
        "summarize_papers",
        items="{{uploaded_files.files}}",
        loop_body=lambda fb: fb.task(
            "summarize_pdf",
            "tools.llm.call",
            kwargs={
                "model": "gpt-4",
                "prompt": "Summarize this academic paper in 200 words. Include: title, main contribution, methodology, and key findings.",
                "attachments": [{"type": "storage_ref", "ref": "{{item.storage_ref}}"}],
            },
            result_key="summary",
        ),
        dependencies=["wait_for_pdfs"],
    )

    # Step 4: Combine all summaries into single document
    builder.task(
        "combine_summaries",
        "tools.llm.call",
        kwargs={
            "model": "gpt-4",
            "prompt": "Combine these paper summaries into a cohesive research digest document:\n\n{{summarize_papers.results}}",
        },
        result_key="combined_summary",
        dependencies=["summarize_papers"],
    )

    # Step 5: Store combined summary as txt file
    builder.task(
        "store_summary",
        "tools.storage.store",
        kwargs={
            "name": "research_digest",
            "data": "{{combined_summary.content}}",
            "content_type": "text/plain",
            "retention_policy": "days:30",
        },
        result_key="stored_file",
        dependencies=["combine_summaries"],
    )

    # Step 6: Get public download URL
    builder.task(
        "get_public_url",
        "tools.storage.get_url",
        kwargs={
            "storage_ref": "{{stored_file.storage_ref}}",
            "expires_in": 86400,
        },
        result_key="public_url",
        dependencies=["store_summary"],
    )

    # Step 7: Return final result
    builder.task(
        "finalize",
        "tools.shell.run",
        args=["echo 'Workflow complete. Download your summary at: {{public_url}}'"],
        result_key="final_result",
        dependencies=["get_public_url"],
    )

    return builder.build()
```
### Test Procedure
1. Submit workflow via API
2. Get upload_token from workflow variables
3. Download test PDFs:
- https://arxiv.org/pdf/2511.00079
- https://arxiv.org/pdf/2411.19485
- https://arxiv.org/pdf/2411.05451
4. Upload via API:
- GET /api/v1/uploads/{token}/info -> presigned_urls
- PUT each PDF to presigned URL
- POST /api/v1/uploads/{token}/complete
5. Wait for workflow completion
6. Verify:
- 3 papers summarized (foreach executed 3 times)
- Combined summary stored
- Public URL is accessible and returns txt content
- All storage_refs use SHA-256 naming
### Success Criteria
- [ ] Workflow completes with status "completed"
- [ ] ForEach spawned 3 parallel summarization tasks
- [ ] Combined summary file stored in S3
- [ ] Public URL returns valid text content
- [ ] File names are SHA-256 hashes (no collision possible)
- [ ] All operations done via API (no manual DB changes)
## Implementation Phases
- Phase 1: Core storage tool (store/retrieve/get_url/delete) + S3 provider
- Phase 2: User upload capability (request_upload, API, upload page)
- Phase 3: Retention management and cleanup job
- Phase 4: LLM tool enhancement (storage_ref support)
- Phase 5: Acceptance workflow test
## Dependencies
- boto3 (S3 client)
- Existing S3 infrastructure (s3.rodmena.co.uk)
- Vault for S3 credentials
- LLM tool enhancement (Phase 4)