I have been trying to build a Flask app that has Kafka as the only interface. For this reason, I want have a Kafka consumer that is triggered when there is new message in the stream of the concerned topic and respond by pushing messages back to the Kafka stream.
I have been looking for something like the Spring implementation:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {System.out.println("Received Messasge in group mygroup: " + message);
}
I have looked at:
- kafka-python
- pykafka
- confluent-kafka
But I couldn't find anything related to event-driven style of implementation in Python.
Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.
from kafka import KafkaConsumer
import threadingBOOTSTRAP_SERVERS = ['localhost:9092']def register_kafka_listener(topic, listener):
# Poll kafkadef poll():# Initialize consumer Instanceconsumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)print("About to start polling for topic:", topic)consumer.poll(timeout_ms=6000)print("Started Polling for topic:", topic)for msg in consumer:print("Entered the loop\nKey: ",msg.key," Value:", msg.value)kafka_listener(msg)print("About to register listener to topic:", topic)t1 = threading.Thread(target=poll)t1.start()print("started a background thread")def kafka_listener(data):print("Image Ratings:\n", data.value.decode("utf-8"))register_kafka_listener('topic1', kafka_listener)
The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.