#165 Consider adding tools.kafka for built-in Kafka producer/consumer support
Description
Edit## Proposal
Consider adding Kafka as a first-class tool in Highway, similar to tools.http, tools.email, tools.llm.
## Current State
To use Kafka in workflows, users must:
1. Write custom Python functions (producer.py, consumer.py)
2. Handle connection management manually
3. Implement their own error handling and retries
4. No circuit breaker integration
5. No standardized logging
## Proposed tools.kafka API
### tools.kafka.produce
```python
builder.task(
"send_messages",
"tools.kafka.produce",
kwargs={
"bootstrap_servers": "localhost:9092,localhost:9093",
"topic": "my-topic",
"messages": [
{"key": "k1", "value": {"data": "payload1"}},
{"key": "k2", "value": {"data": "payload2"}},
],
# OR generate messages
"message_count": 100,
"message_generator": "mymodule.generate_message",
"message_interval_seconds": 0.1,
},
result_key="producer_result",
)
```
### tools.kafka.consume
```python
builder.activity(
"consume_messages",
"tools.kafka.consume",
kwargs={
"bootstrap_servers": "localhost:9092",
"topic": "my-topic",
"consumer_group": "my-group",
"max_messages": 100,
"max_empty_polls": 30,
"processor": "mymodule.process_message", # Optional per-message callback
},
result_key="consumer_result",
timeout_policy=TimeoutPolicy(timeout=timedelta(hours=1)),
)
```
### tools.kafka.admin
```python
builder.task(
"create_topic",
"tools.kafka.admin",
kwargs={
"operation": "create_topic",
"topic": "new-topic",
"partitions": 3,
"replication_factor": 2,
},
)
```
## Benefits
1. **Consistency**: Same patterns as other tools (http, email, llm)
2. **Circuit Breaker**: Built-in integration with highway_circuitbreaker
3. **Logging**: Automatic DataShard logging of produce/consume operations
4. **Error Handling**: Standardized retry policies
5. **Configuration**: Bootstrap servers from Vault secrets
6. **Monitoring**: Integration with activity monitoring API
## Implementation Considerations
- Add kafka-python as optional dependency
- Store Kafka credentials in Vault (tools.secrets.get_secret)
- Circuit breaker for broker connections
- Support for SASL/SSL authentication
- Schema registry integration (Avro/Protobuf)
## Context
Implemented Kafka ETL pipeline (Issue #162) using custom Python functions. Pattern could be generalized into reusable tool.
Comments
Loading comments...
Context
Loading context...
Audit History
View AllLoading audit history...