#240 MCP Server Integration for Highway DSL
# MCP (Model Context Protocol) Server Integration for Highway DSL
## Overview
Implement MCP server support for the Highway Workflow Engine so that workflows can connect to, and invoke tools from, any MCP-compliant server. This makes the existing MCP ecosystem (thousands of servers) available to Highway workflows.
## Background
MCP (Model Context Protocol) is Anthropic's open standard for connecting AI systems to external tools and data sources. Key features:
- JSON-RPC 2.0 protocol over stdio or HTTP/SSE
- Tool discovery and invocation
- Resource access (files, APIs, data)
- Prompt templates
- Adopted by OpenAI, Google, and major AI frameworks
## Design Goals
1. **Seamless DSL Integration**: Use MCP tools as naturally as built-in tools
2. **Multi-Server Support**: Connect to multiple MCP servers per workflow
3. **Durability**: Full checkpoint support for MCP calls
4. **Tenant Isolation**: Per-tenant MCP server configurations
5. **Discovery**: Dynamic tool discovery from connected servers
6. **Security**: Secure credential handling via Vault
---
## Architecture
### Component 1: MCP Server Registry (Tenant-Scoped)
Location: engine/mcp/registry.py
Store MCP server configurations per tenant:
```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class MCPServerConfig:
    server_id: str  # Unique identifier
    name: str  # Display name
    transport: Literal["stdio", "http"]
    command: str | None  # For stdio
    args: list[str] | None
    url: str | None  # For HTTP
    env: dict[str, str]  # Secrets from Vault
    tenant_id: str
    is_active: bool
```
### Component 2: MCP Client Wrapper (Durable)
Location: engine/mcp/client.py
```python
from __future__ import annotations  # return types below are placeholders not defined in this issue


class DurableMCPClient:
    async def connect(self, ctx, server_id) -> MCPSession: ...
    async def list_tools(self, ctx, server_id) -> list[MCPTool]: ...
    async def invoke_tool(self, ctx, server_id, tool_name, arguments, timeout) -> MCPToolResult: ...
    async def list_resources(self, ctx, server_id) -> list[MCPResource]: ...
    async def read_resource(self, ctx, server_id, uri) -> bytes: ...
```
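The durability goal implies that an MCP call which completed before a crash should not be re-invoked on replay. The sketch below shows one way that could work, assuming the workflow context exposes a checkpoint store keyed by step id. `FakeContext`, `durable_invoke`, and `fake_tool_call` are hypothetical; Highway's actual checkpoint API is not shown in this issue.

```python
import asyncio
from typing import Any, Awaitable, Callable


class FakeContext:
    """Hypothetical stand-in for the workflow context; keeps checkpoints in memory."""

    def __init__(self) -> None:
        self.checkpoints: dict[str, Any] = {}


async def durable_invoke(
    ctx: FakeContext,
    step_id: str,
    call: Callable[[], Awaitable[Any]],
    timeout: float = 30.0,
) -> Any:
    # Replay path: a result recorded before a restart is returned as-is.
    if step_id in ctx.checkpoints:
        return ctx.checkpoints[step_id]
    # First execution: bound the call with a timeout, then checkpoint the result.
    result = await asyncio.wait_for(call(), timeout=timeout)
    ctx.checkpoints[step_id] = result
    return result


calls = 0


async def fake_tool_call() -> dict:
    global calls
    calls += 1
    return {"repos": ["highway"]}


async def main() -> tuple[dict, dict]:
    ctx = FakeContext()
    first = await durable_invoke(ctx, "list_repos", fake_tool_call)
    second = await durable_invoke(ctx, "list_repos", fake_tool_call)  # served from checkpoint
    return first, second


first, second = asyncio.run(main())
```

Note that this only avoids re-invocation after a result has been recorded; a crash between the tool call and the checkpoint write would still repeat the call, so non-idempotent tools need extra care.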
### Component 3: Highway Tool Bridge
Location: engine/tools/mcp_tools.py
```python
# Tool: tools.mcp.invoke
def mcp_invoke_tool(ctx, server_id, tool_name, arguments, timeout) -> dict: ...

# Tool: tools.mcp.list_tools
def mcp_list_tools(ctx, server_id) -> list[dict]: ...

# Tool: tools.mcp.read_resource
def mcp_read_resource(ctx, server_id, uri) -> dict: ...
```
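The bridge functions are synchronous while the client wrapper is async, so something has to drive the coroutine to completion. A minimal sketch of that bridging, assuming the tool runs outside any event loop, is shown here. `StubMCPClient` and the `{"ok": ..., "content": ...}` result shape are assumptions made for illustration.

```python
import asyncio


class StubMCPClient:
    """Hypothetical stand-in for DurableMCPClient."""

    async def invoke_tool(self, ctx, server_id, tool_name, arguments, timeout):
        return {"server": server_id, "tool": tool_name, "echo": arguments}


_client = StubMCPClient()


def mcp_invoke_tool(ctx, server_id, tool_name, arguments=None, timeout=30.0) -> dict:
    """Sync entry point that runs the async client call to completion."""
    try:
        content = asyncio.run(
            _client.invoke_tool(ctx, server_id, tool_name, arguments or {}, timeout)
        )
        return {"ok": True, "content": content}
    except Exception as exc:  # surface failures as data for the workflow engine
        return {"ok": False, "error": str(exc)}


result = mcp_invoke_tool(None, "github", "list_repositories", {"org": "anthropics"})
```

`asyncio.run` raises `RuntimeError` when called from a thread that already has a running event loop, so a worker that is itself async would need a different bridging strategy.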
### Component 4: DSL Builder Extensions
Location: highway_dsl/workflow_dsl.py
```python
class WorkflowBuilder:
    def mcp_tool(self, task_id, server, tool, arguments=None, result_key=None, **kwargs):
        """Invoke an MCP tool."""
```
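One plausible shape for `mcp_tool()` is a thin wrapper that records a task pointing at the `tools.mcp.invoke` bridge. The sketch below assumes a simplified builder that stores tasks as plain dicts; the task dict layout and the `tasks` attribute are hypothetical and may not match the real `WorkflowBuilder`.

```python
class WorkflowBuilder:
    """Hypothetical minimal builder, for illustration only."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: list[dict] = []

    def mcp_tool(self, task_id, server, tool, arguments=None, result_key=None, **kwargs):
        """Invoke an MCP tool by delegating to the tools.mcp.invoke bridge."""
        dependencies = kwargs.pop("dependencies", [])
        task = {
            "task_id": task_id,
            "function": "tools.mcp.invoke",
            "kwargs": {
                "server_id": server,
                "tool_name": tool,
                "arguments": arguments or {},
            },
            "result_key": result_key,
            "dependencies": dependencies,
            **kwargs,  # any remaining task options pass through unchanged
        }
        self.tasks.append(task)
        return self  # allow chained calls


builder = WorkflowBuilder("demo").mcp_tool(
    "list_repos",
    server="github",
    tool="list_repositories",
    arguments={"org": "anthropics"},
    result_key="repos",
)
```

Routing through the existing tool mechanism would let MCP tasks reuse whatever retry, timeout, and checkpoint handling built-in tools already get.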
### Component 5: API Endpoints
Location: api/blueprints/v1/mcp.py
```
POST /api/v1/mcp/servers # Register MCP server
GET /api/v1/mcp/servers # List registered servers
GET /api/v1/mcp/servers/{id} # Get server details
DELETE /api/v1/mcp/servers/{id} # Remove server
PUT /api/v1/mcp/servers/{id} # Update config
GET /api/v1/mcp/servers/{id}/tools # List tools
POST /api/v1/mcp/servers/{id}/invoke # Invoke tool
```
---
## DSL Usage Examples
### Simple MCP Tool Call
```python
builder.mcp_tool(
    "list_repos",
    server="github",
    tool="list_repositories",
    arguments={"org": "anthropics"},
    result_key="repos",
)
```
### Multi-Server Workflow
```python
builder.mcp_tool("get_prs", server="github",
                 tool="list_pull_requests", result_key="github_prs")

builder.mcp_tool("create_issues", server="linear",
                 tool="create_issues",
                 arguments={"prs": "{{github_prs}}"},
                 dependencies=["get_prs"])

builder.mcp_tool("notify", server="slack",
                 tool="send_message",
                 arguments={"channel": "#releases", "message": "Done"},
                 dependencies=["create_issues"])
```
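The `"{{github_prs}}"` argument above relies on the engine substituting an earlier task's result before the MCP call is made. A minimal sketch of such a substitution step follows, assuming whole-value placeholders only; `resolve_arguments` is a hypothetical helper, and Highway's real template handling may differ.

```python
import re
from typing import Any

# Matches a value that is exactly one placeholder, e.g. "{{github_prs}}".
_TEMPLATE = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


def resolve_arguments(arguments: dict[str, Any], results: dict[str, Any]) -> dict[str, Any]:
    """Replace values of the form "{{key}}" with results[key]; leave others untouched."""
    resolved = {}
    for name, value in arguments.items():
        match = _TEMPLATE.match(value) if isinstance(value, str) else None
        resolved[name] = results[match.group(1)] if match else value
    return resolved


results = {"github_prs": [{"number": 1, "title": "Fix bug"}]}
resolved = resolve_arguments(
    {"prs": "{{github_prs}}", "channel": "#releases"},
    results,
)
```

Because the placeholder is replaced by the stored object rather than its string form, structured results such as lists reach the downstream tool intact. A missing key raises `KeyError` here, which surfaces a bad dependency early.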
---
## Implementation Phases
### Phase 1: Core Infrastructure
- Database schema for mcp_server_config
- MCP Python SDK integration (mcp[cli] package)
- DurableMCPClient with stdio transport
- Basic tool registration
### Phase 2: API & Registry
- MCPServerRegistry with tenant isolation
- REST API endpoints
- Vault credential handling
### Phase 3: DSL Integration
- WorkflowBuilder.mcp_tool() method
- OpenAPI documentation
- Workflow examples
### Phase 4: HTTP Transport & Polish
- HTTP/SSE transport support
- Connection pooling
- Circuit breaker integration
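For the circuit breaker item, the following is a generic sketch of the pattern rather than Highway's existing breaker: it opens after a number of consecutive failures and permits a trial call once a cool-down has elapsed. The class name, thresholds, and injectable clock are assumptions made so the example is deterministic.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Opens after consecutive failures; permits a trial call after a cool-down."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at: float | None = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True  # closed: calls flow normally
        # Open: reject until the cool-down has elapsed, then allow a trial call.
        return self.clock() - self.opened_at >= self.reset_after

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()


now = [0.0]  # fake clock so the example does not depend on wall time
breaker = CircuitBreaker(failure_threshold=2, reset_after=10.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
blocked = not breaker.allow()    # breaker is open
now[0] = 10.0
trial_allowed = breaker.allow()  # cool-down elapsed
```

One breaker per `(tenant_id, server_id)` pair would keep a failing server for one tenant from affecting others.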
---
## Database Schema
```sql
CREATE TABLE mcp_server_config (
    id UUID PRIMARY KEY,
    tenant_id VARCHAR(255) NOT NULL,
    server_id VARCHAR(255) NOT NULL,
    name VARCHAR(255) NOT NULL,
    transport VARCHAR(50) NOT NULL,
    command TEXT,
    args JSONB DEFAULT '[]',
    url TEXT,
    env_secrets JSONB DEFAULT '{}',
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(tenant_id, server_id)
);
```
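The `UNIQUE(tenant_id, server_id)` constraint is what lets two tenants each register a server called `github` while preventing duplicates within one tenant. The sketch below demonstrates that behaviour with an in-memory SQLite table; it is a reduced adaptation of the schema (SQLite types, fewer columns), not the PostgreSQL migration itself.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE mcp_server_config (
        id TEXT PRIMARY KEY,
        tenant_id TEXT NOT NULL,
        server_id TEXT NOT NULL,
        name TEXT NOT NULL,
        transport TEXT NOT NULL,
        UNIQUE (tenant_id, server_id)
    )
    """
)

insert = "INSERT INTO mcp_server_config VALUES (?, ?, ?, ?, ?)"
conn.execute(insert, ("1", "tenant-a", "github", "GitHub", "stdio"))
# Same server_id under a different tenant is allowed.
conn.execute(insert, ("2", "tenant-b", "github", "GitHub", "stdio"))

try:
    # Same (tenant_id, server_id) pair violates the unique constraint.
    conn.execute(insert, ("3", "tenant-a", "github", "Duplicate", "stdio"))
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

# Tenant-scoped reads always filter on tenant_id.
rows = conn.execute(
    "SELECT server_id FROM mcp_server_config WHERE tenant_id = ?", ("tenant-a",)
).fetchall()
```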
---
## Files to Create
- engine/mcp/__init__.py
- engine/mcp/client.py
- engine/mcp/registry.py
- engine/mcp/transports.py
- engine/tools/mcp_tools.py
- api/blueprints/v1/mcp.py
- api/openapi/models/mcp.py
- engine/migrations/sql/028_mcp_servers.sql
- tests/integration/test_mcp_integration.py
- api/dsl_templates/mcp_github_workflow.py
## Files to Modify
- highway_dsl/workflow_dsl.py - Add mcp_tool() method
- engine/tools/registry.py - Register MCP tools
- requirements.txt - Add mcp[cli]>=1.0.0
---
## Security Considerations
1. Credential Storage in Vault
2. Tenant Isolation
3. Command Injection Prevention
4. Rate Limiting
5. Timeout Enforcement
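For command injection prevention with the stdio transport, one common approach is to allowlist the executable and always launch it from an argv list rather than through a shell. The sketch below illustrates this; `ALLOWED_COMMANDS`, `build_argv`, and the package name are hypothetical, and the actual allowlist would be a deployment decision.

```python
ALLOWED_COMMANDS = {"npx", "uvx", "python"}  # hypothetical allowlist
SHELL_METACHARACTERS = set(";|&$`<>\n")


def build_argv(command: str, args: list[str]) -> list[str]:
    """Validate a stdio server command and return an argv list for shell-free exec."""
    if command not in ALLOWED_COMMANDS:
        raise ValueError(f"command {command!r} is not allowlisted")
    for arg in args:
        # Defence in depth: no shell is involved, but suspicious input is refused anyway.
        if SHELL_METACHARACTERS & set(arg):
            raise ValueError(f"argument {arg!r} contains shell metacharacters")
    return [command, *args]


argv = build_argv("npx", ["-y", "example-mcp-server"])  # hypothetical package name

try:
    build_argv("npx", ["-y", "pkg; rm -rf /"])
    injection_rejected = False
except ValueError:
    injection_rejected = True

try:
    build_argv("bash", ["-c", "echo hi"])
    unknown_command_rejected = False
except ValueError:
    unknown_command_rejected = True
```

The resulting list can be passed to `subprocess.Popen(argv)` or `asyncio.create_subprocess_exec(*argv)`, neither of which invokes a shell. The metacharacter check is deliberately strict and may need loosening for servers that take arguments containing those characters.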
## References
- https://modelcontextprotocol.io/specification
- https://github.com/modelcontextprotocol/python-sdk
- https://www.anthropic.com/news/model-context-protocol