/highway-workflow-engine
Edit Issue #165
Update issue details
Title *
Description
## Proposal

Consider adding Kafka as a first-class tool in Highway, similar to tools.http, tools.email, and tools.llm.

## Current State

To use Kafka in workflows, users must:

1. Write custom Python functions (producer.py, consumer.py)
2. Handle connection management manually
3. Implement their own error handling and retries

There is also no circuit-breaker integration and no standardized logging.

## Proposed tools.kafka API

### tools.kafka.produce

```python
builder.task(
    "send_messages",
    "tools.kafka.produce",
    kwargs={
        "bootstrap_servers": "localhost:9092,localhost:9093",
        "topic": "my-topic",
        "messages": [
            {"key": "k1", "value": {"data": "payload1"}},
            {"key": "k2", "value": {"data": "payload2"}},
        ],
        # OR generate messages:
        "message_count": 100,
        "message_generator": "mymodule.generate_message",
        "message_interval_seconds": 0.1,
    },
    result_key="producer_result",
)
```

### tools.kafka.consume

```python
builder.activity(
    "consume_messages",
    "tools.kafka.consume",
    kwargs={
        "bootstrap_servers": "localhost:9092",
        "topic": "my-topic",
        "consumer_group": "my-group",
        "max_messages": 100,
        "max_empty_polls": 30,
        "processor": "mymodule.process_message",  # Optional per-message callback
    },
    result_key="consumer_result",
    timeout_policy=TimeoutPolicy(timeout=timedelta(hours=1)),
)
```

### tools.kafka.admin

```python
builder.task(
    "create_topic",
    "tools.kafka.admin",
    kwargs={
        "operation": "create_topic",
        "topic": "new-topic",
        "partitions": 3,
        "replication_factor": 2,
    },
)
```

## Benefits

1. **Consistency**: Same patterns as the other tools (http, email, llm)
2. **Circuit Breaker**: Built-in integration with highway_circuitbreaker
3. **Logging**: Automatic DataShard logging of produce/consume operations
4. **Error Handling**: Standardized retry policies
5. **Configuration**: Bootstrap servers from Vault secrets
6. **Monitoring**: Integration with the activity monitoring API

## Implementation Considerations

- Add kafka-python as an optional dependency
- Store Kafka credentials in Vault (tools.secrets.get_secret)
- Circuit breaker for broker connections
- Support for SASL/SSL authentication
- Schema registry integration (Avro/Protobuf)

## Context

Issue #162 implemented a Kafka ETL pipeline using custom Python functions; that pattern could be generalized into a reusable tool.
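To make the proposed `messages` vs. `message_count` + `message_generator` kwargs concrete, here is a minimal, broker-free sketch of how tools.kafka.produce might resolve its message inputs before sending. The function `resolve_messages` and the generator `generate_message` are hypothetical names invented for this sketch; nothing here exists in Highway yet.

```python
# Hypothetical sketch: how tools.kafka.produce could resolve its message
# inputs. Explicit `messages` take precedence; otherwise `message_count`
# messages are drawn from a generator callable. All names are illustrative.
from typing import Callable, Optional


def resolve_messages(
    messages: Optional[list] = None,
    message_count: int = 0,
    message_generator: Optional[Callable[[int], dict]] = None,
) -> list:
    """Return the list of messages the producer should send."""
    if messages is not None:
        return messages
    if message_generator is not None and message_count > 0:
        return [message_generator(i) for i in range(message_count)]
    raise ValueError(
        "Provide either `messages` or `message_count` + `message_generator`"
    )


# Example generator, as a workflow module might define it:
def generate_message(i: int) -> dict:
    return {"key": f"k{i}", "value": {"seq": i}}


batch = resolve_messages(message_count=3, message_generator=generate_message)
# e.g. batch[0] == {"key": "k0", "value": {"seq": 0}}
```

In the real tool, `message_generator` would be a dotted-path string (as in the kwargs above) resolved via import, and the resulting batch would be handed to a kafka-python `KafkaProducer` behind the circuit breaker.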
Priority
Low
Medium
High
Critical
Status
Open
In Progress
Closed
Due Date (YYYY-MM-DD)
Tags (comma separated)
Related Issues (IDs)
Enter IDs of issues related to this one. They will be linked as 'related'.
Update Issue
Cancel