Handling async streaming requests in gRPC Python

2024/10/9 4:18:47

I am trying to understand how to handle a gRPC API with bidirectional streaming (using the Python API).

Say I have the following simple server definition:

syntax = "proto3";

package simple;

service TestService {
  rpc Translate(stream Msg) returns (stream Msg) {}
}

message Msg {
  string msg = 1;
}

Say that the messages sent from the client arrive asynchronously (as a consequence of the user selecting some UI elements).

The generated Python stub for the client will contain a method Translate that accepts a generator function and returns an iterator.

What is not clear to me is how I would write the generator function so that it returns messages as they are created by the user. Sleeping on the thread while waiting for messages doesn't sound like the best solution.

Answer

This is a bit clunky right now, but you can accomplish your use case as follows:

#!/usr/bin/env python
from __future__ import print_function

import collections
import threading
from concurrent import futures

import grpc

from translate_pb2 import Msg
from translate_pb2_grpc import TestServiceStub
from translate_pb2_grpc import TestServiceServicer
from translate_pb2_grpc import add_TestServiceServicer_to_server


def translate_next(msg):
    return ''.join(reversed(msg))


class Translator(TestServiceServicer):
    def Translate(self, request_iterator, context):
        for req in request_iterator:
            print("Translating message: {}".format(req.msg))
            yield Msg(msg=translate_next(req.msg))


class TranslatorClient(object):
    def __init__(self):
        self._stop_event = threading.Event()
        self._request_condition = threading.Condition()
        self._response_condition = threading.Condition()
        self._requests = collections.deque()
        self._last_request = None
        self._expected_responses = collections.deque()
        self._responses = {}

    def _next(self):
        # Block until a request is queued or close() has been called.
        with self._request_condition:
            while not self._requests and not self._stop_event.is_set():
                self._request_condition.wait()
            if len(self._requests) > 0:
                return self._requests.popleft()
            else:
                raise StopIteration()

    def next(self):
        # Python 2 iterator protocol.
        return self._next()

    def __next__(self):
        # Python 3 iterator protocol.
        return self._next()

    def add_response(self, response):
        # Responses are matched to requests by arrival order.
        with self._response_condition:
            request = self._expected_responses.popleft()
            self._responses[request] = response
            self._response_condition.notify_all()

    def add_request(self, request):
        with self._request_condition:
            self._requests.append(request)
            with self._response_condition:
                self._expected_responses.append(request.msg)
            self._request_condition.notify()

    def close(self):
        self._stop_event.set()
        with self._request_condition:
            self._request_condition.notify()

    def translate(self, to_translate):
        self.add_request(to_translate)
        with self._response_condition:
            # Check before waiting so that a response which has already
            # arrived is not missed.
            while to_translate.msg not in self._responses:
                self._response_condition.wait()
            return self._responses[to_translate.msg]


def _run_client(address, translator_client):
    with grpc.insecure_channel(address) as channel:
        stub = TestServiceStub(channel)
        responses = stub.Translate(translator_client)
        for resp in responses:
            translator_client.add_response(resp)


def main():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_TestServiceServicer_to_server(Translator(), server)
    server.add_insecure_port('[::]:50054')
    server.start()

    translator_client = TranslatorClient()
    client_thread = threading.Thread(
        target=_run_client, args=('localhost:50054', translator_client))
    client_thread.start()

    def _translate(to_translate):
        return translator_client.translate(Msg(msg=to_translate)).msg

    translator_pool = futures.ThreadPoolExecutor(max_workers=4)
    to_translate = ("hello", "goodbye", "I", "don't", "know", "why",)
    translations = translator_pool.map(_translate, to_translate)
    # list() is needed on Python 3, where zip() returns a lazy iterator.
    print("Translations: {}".format(list(zip(to_translate, translations))))

    translator_client.close()
    client_thread.join()
    server.stop(None)


if __name__ == "__main__":
    main()

The basic idea is to have an object called TranslatorClient running on a separate thread, correlating requests and responses. It expects that responses will return in the order that requests were sent out. It also implements the iterator interface so that you can pass it directly to an invocation of the Translate method on your stub.

We spin up a thread running _run_client which pulls responses out of TranslatorClient and feeds them back in the other end with add_response.
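For comparison, here is a minimal sketch of the same idea built on the standard library's queue.Queue, which blocks without busy-waiting. It does not use gRPC at all: fake_translate is a hypothetical stand-in for stub.Translate, and request_stream, collect and _SENTINEL are illustrative names that do not appear in the answer's code. With a real stub you would presumably pass request_stream(pending) to Translate instead.

```python
import queue
import threading

_SENTINEL = object()  # hypothetical marker meaning "no more requests"


def request_stream(pending):
    """Yield queued requests until the sentinel arrives.

    queue.Queue.get() blocks without busy-waiting, so no sleep loop is needed.
    """
    while True:
        item = pending.get()
        if item is _SENTINEL:
            return
        yield item


def fake_translate(request_iterator):
    """Stand-in for stub.Translate: reverses each message it receives."""
    for msg in request_iterator:
        yield msg[::-1]


def collect(responses, out):
    # Plays the role of _run_client: drain the responses on a separate thread.
    for resp in responses:
        out.append(resp)


pending = queue.Queue()
received = []
worker = threading.Thread(
    target=collect, args=(fake_translate(request_stream(pending)), received))
worker.start()

# UI callbacks would call pending.put(...) whenever the user produces a message.
for text in ("hello", "goodbye"):
    pending.put(text)
pending.put(_SENTINEL)  # signal the end of the stream
worker.join()
print(received)
```

The queue replaces the request-side condition variable and deque; correlating responses with callers still needs something like the response-side bookkeeping in TranslatorClient.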

The main function I included here is really just a strawman since I don't have the particulars of your UI code. I'm running _translate in a ThreadPoolExecutor to demonstrate that, even though translator_client.translate is synchronous, it releases its lock while it waits for a response, allowing you to have multiple in-flight requests at once.
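The blocking behaviour described here can be sketched in isolation, without gRPC. ResponseBox below is a hypothetical helper, not part of the answer's code: several callers block on one threading.Condition at the same time, and each wakes up only when its own key has a response. Checking the predicate before calling wait() avoids missing a response that arrived early.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ResponseBox(object):
    """Hypothetical helper: callers block until their key has a response."""

    def __init__(self):
        self._cond = threading.Condition()
        self._responses = {}

    def put(self, key, value):
        with self._cond:
            self._responses[key] = value
            self._cond.notify_all()

    def wait_for(self, key):
        with self._cond:
            # Check the predicate before waiting so that a response which
            # arrived early is not missed.
            while key not in self._responses:
                self._cond.wait()  # releases the lock while blocked
            return self._responses[key]


box = ResponseBox()
keys = ("hello", "goodbye", "why")

with ThreadPoolExecutor(max_workers=3) as pool:
    # Three callers block at the same time, one per worker thread.
    waiting = [pool.submit(box.wait_for, k) for k in keys]
    # Deliver the responses out of order; each caller wakes for its own key.
    for k in reversed(keys):
        box.put(k, k[::-1])
    results = [f.result() for f in waiting]

print(results)
```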

We recognize that this is a lot of code to write for such a simple use case. Ultimately, the answer will be asyncio support. We have plans for this in the not-too-distant future. But for the moment, this sort of solution should keep you going whether you're running Python 2 or Python 3.
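To give a rough idea of what the asyncio shape could look like, here is a sketch under stated assumptions. It contains no gRPC calls: fake_translate is a hypothetical stand-in for an async bidirectional stub method, and request_stream and _DONE are illustrative names. Later grpcio releases ship an asyncio API as grpc.aio; whether its stub accepts an async iterator in exactly this way for your installed version is an assumption you should check against its documentation.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream marker


async def request_stream(pending):
    """Async generator: yields messages as they are put on the queue."""
    while True:
        item = await pending.get()
        if item is _DONE:
            return
        yield item


async def fake_translate(requests):
    """Stand-in for an async bidirectional stub call; reverses each message."""
    async for msg in requests:
        yield msg[::-1]


async def run():
    pending = asyncio.Queue()
    # UI event handlers would call pending.put_nowait(...) from the event loop.
    for text in ("hello", "goodbye"):
        pending.put_nowait(text)
    pending.put_nowait(_DONE)
    return [resp async for resp in fake_translate(request_stream(pending))]


results = asyncio.run(run())
print(results)
```

Awaiting the queue suspends only the coroutine, so no extra threads or condition variables are needed in this version. It requires Python 3.7 or later for asyncio.run.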
