Is there a Python API for an event-driven Kafka consumer?

2024/11/16 7:52:07

I have been trying to build a Flask app that has Kafka as its only interface. For this reason, I want to have a Kafka consumer that is triggered when there is a new message in the stream of the relevant topic and that responds by pushing messages back to the Kafka stream.

I have been looking for something like the Spring implementation:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Message in group mygroup: " + message);
}

I have looked at:

  1. kafka-python
  2. pykafka
  3. confluent-kafka

But I couldn't find anything related to an event-driven style of implementation in Python.

Answer

Here is an implementation of the idea given by @MickaelMaison's answer. I used kafka-python.

from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
    # Poll Kafka in a background thread
    def poll():
        # Initialize the consumer instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        # Initial poll; its return value is ignored here
        consumer.poll(timeout_ms=6000)
        print("Started Polling for topic:", topic)
        for msg in consumer:
            print("Entered the loop\nKey: ", msg.key, " Value:", msg.value)
            # Call the listener that was passed in, not a hard-coded function
            listener(msg)

    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)

The polling is done in a different thread. Once a message is received, the listener is called by passing the data retrieved from Kafka.
