#165 Consider adding tools.kafka for built-in Kafka producer/consumer support

closed medium Created 2025-11-29 04:36 · Updated 2025-11-30 05:38

Description

## Proposal

Consider adding Kafka as a first-class tool in Highway, alongside `tools.http`, `tools.email`, and `tools.llm`.

## Current State

Using Kafka in workflows today means:

1. Writing custom Python functions (`producer.py`, `consumer.py`)
2. Managing connections manually
3. Implementing their own error handling and retries
4. No circuit breaker integration
5. No standardized logging

## Proposed tools.kafka API

### tools.kafka.produce

```python
builder.task(
    "send_messages",
    "tools.kafka.produce",
    kwargs={
        "bootstrap_servers": "localhost:9092,localhost:9093",
        "topic": "my-topic",
        "messages": [
            {"key": "k1", "value": {"data": "payload1"}},
            {"key": "k2", "value": {"data": "payload2"}},
        ],
        # OR, instead of "messages", generate them:
        "message_count": 100,
        "message_generator": "mymodule.generate_message",
        "message_interval_seconds": 0.1,
    },
    result_key="producer_result",
)
```

### tools.kafka.consume

```python
from datetime import timedelta

builder.activity(
    "consume_messages",
    "tools.kafka.consume",
    kwargs={
        "bootstrap_servers": "localhost:9092",
        "topic": "my-topic",
        "consumer_group": "my-group",
        "max_messages": 100,
        "max_empty_polls": 30,
        "processor": "mymodule.process_message",  # Optional per-message callback
    },
    result_key="consumer_result",
    timeout_policy=TimeoutPolicy(timeout=timedelta(hours=1)),
)
```

### tools.kafka.admin

```python
builder.task(
    "create_topic",
    "tools.kafka.admin",
    kwargs={
        "operation": "create_topic",
        "topic": "new-topic",
        "partitions": 3,
        "replication_factor": 2,
    },
)
```

## Benefits

1. **Consistency**: Same patterns as the other tools (http, email, llm)
2. **Circuit Breaker**: Built-in integration with `highway_circuitbreaker`
3. **Logging**: Automatic DataShard logging of produce/consume operations
4. **Error Handling**: Standardized retry policies
5. **Configuration**: Bootstrap servers loaded from Vault secrets
6. **Monitoring**: Integration with the activity monitoring API

## Implementation Considerations

- Add `kafka-python` as an optional dependency
- Store Kafka credentials in Vault (`tools.secrets.get_secret`)
- Circuit breaker for broker connections
- Support for SASL/SSL authentication
- Schema registry integration (Avro/Protobuf)

## Context

Issue #162 implemented a Kafka ETL pipeline using custom Python functions; that pattern could be generalized into a reusable tool.
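For `tools.kafka.consume`, the two stop conditions (`max_messages` and `max_empty_polls`) could work roughly as in the minimal sketch below, again assuming `kafka-python`. The names are hypothetical, and so are several behaviors: passing each record's deserialized value to `processor`, JSON decoding, committing offsets after each batch, and treating `max_empty_polls` as a count of *consecutive* empty polls.

```python
# Hypothetical sketch of a tools.kafka.consume handler; not Highway's actual code.
import importlib
import json
from typing import Any, Callable, Dict, List


def _kafka_python_consumer(servers: List[str], topic: str, group_id: str) -> Any:
    # Lazy import so kafka-python stays an optional dependency.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        group_id=group_id,
        enable_auto_commit=False,  # offsets are committed explicitly below
        value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    )


def consume(
    kwargs: Dict[str, Any],
    consumer_factory: Callable[[List[str], str, str], Any] = _kafka_python_consumer,
    poll_timeout_ms: int = 1000,
) -> Dict[str, Any]:
    """Hypothetical tools.kafka.consume handler with two stop conditions."""
    servers = [s.strip() for s in kwargs["bootstrap_servers"].split(",") if s.strip()]
    processor = kwargs.get("processor")
    if isinstance(processor, str):  # dotted path such as "mymodule.process_message"
        module_name, _, attr = processor.rpartition(".")
        processor = getattr(importlib.import_module(module_name), attr)
    max_messages = kwargs.get("max_messages", 100)
    max_empty_polls = kwargs.get("max_empty_polls", 30)

    consumer = consumer_factory(servers, kwargs["topic"], kwargs["consumer_group"])
    consumed = empty_polls = 0
    try:
        while consumed < max_messages and empty_polls < max_empty_polls:
            # poll() returns {TopicPartition: [ConsumerRecord, ...]}
            batch = consumer.poll(timeout_ms=poll_timeout_ms, max_records=max_messages - consumed)
            records = [record for partition in batch.values() for record in partition]
            if not records:
                empty_polls += 1
                continue
            empty_polls = 0  # assumption: only consecutive empty polls count
            for record in records:
                if processor is not None:
                    processor(record.value)
                consumed += 1
            consumer.commit()  # commit only after the whole batch was processed
    finally:
        consumer.close()
    stopped_by = "max_messages" if consumed >= max_messages else "max_empty_polls"
    return {"messages_consumed": consumed, "stopped_by": stopped_by}
```

Committing only after `processor` has run gives at-least-once delivery: if the activity hits its timeout mid-batch, the uncommitted records are redelivered to the consumer group on the next run.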
