gRPC synchronous calls are simpler, but when handling complex tasks they cause requests to block and reduce throughput. The gRPC asynchronous interface could be used to solve this; this article instead takes another approach: the server receives each request and puts it into a request queue, an independent thread processes each queued request, and the server then calls a service on the client to deliver the processing result. In other words, the client is also a server.
The following DEMO implements functions:
- Image transmission is realized between the client and the server
- The inference service has two interfaces: Request and Response
- The server implements the Request interface and the client implements the Response interface. These two interfaces are only used to send messages.
- After the server's message processing thread handles the client's request, it calls the client's Response interface.
1.infer_session.proto
syntax = "proto3";

service Inference {
  rpc Request (InferMessage) returns (Status) {}  // Implemented by the server
  rpc Response (InferMessage) returns (Status) {} // Implemented by the client
}

message InferMessage {
  int32 frame_id = 1;     // Frame number
  int32 client_port = 2;  // Client port
  int32 shm_id = 3;       // Shared memory block id
  int32 width = 4;        // Image width
  int32 height = 5;       // Image height
  int32 channels = 6;     // Number of image channels
  string session_id = 7;  // Session uuid
}

message Status {
  int32 status = 1;         // Status code
  string error_message = 2; // Error message
}
2. Generate Python library functions
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./infer_session.proto
3.infer_session_server.py
from concurrent import futures
import logging
import threading
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import queue
import traceback
import time
from common import SharedMemory, ThreadSafeDict
import numpy as np


class InferenceServer(infer_session_pb2_grpc.InferenceServicer):
    """Inference server that decouples RPC handling from processing.

    Incoming ``Request`` RPCs are only enqueued; a dedicated worker thread
    (``Process``) handles each queued message and delivers the result by
    calling the *client's* ``Response`` RPC, so slow processing never blocks
    the gRPC request path.
    """

    def __init__(self) -> None:
        super().__init__()
        self.request_queue = None  # created in Open()
        self.black_list = set()    # session_ids whose client became unreachable

    def Request(self, request, context):
        """RPC entry point: enqueue the request and return immediately."""
        self.request_queue.put(request)
        return infer_session_pb2.Status(status=0, error_message="")

    def Open(self, port=50051):
        """Start the worker thread and the gRPC server; block until ready."""
        self.process_running = True
        self.bind_addr = "localhost:{}".format(port)
        self.client_session = ThreadSafeDict()  # session_id -> {channel, stub}
        self.request_queue = queue.Queue()
        self.process_thread = threading.Thread(target=self.Process)
        self.process_thread.start()
        # Released by Run() once the gRPC server is actually listening.
        self.service_ready_semaphore = threading.Semaphore(0)
        self.server_thread = threading.Thread(target=self.Run)
        self.server_thread.start()
        self.service_ready_semaphore.acquire()
        return True

    def Run(self):
        """Server thread body: serve RPCs until the server is stopped."""
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)
        self.server.add_insecure_port(self.bind_addr)
        self.server.start()
        print("Server started, listening on " + self.bind_addr)
        self.service_ready_semaphore.release()
        self.server.wait_for_termination()

    def Process(self):
        """Worker thread body: drain the queue, process each request in the
        client's shared memory, and reply through the client's Response RPC."""
        while self.process_running:
            if not self.request_queue.empty():
                # Non-blocking get: we already checked the queue is non-empty,
                # and only this thread consumes it.
                request = self.request_queue.get(False)
                if request.session_id in self.black_list:
                    if request.session_id in self.client_session:
                        del self.client_session[request.session_id]
                    continue
                try:
                    # Lazily open one reverse channel per client session.
                    if request.session_id not in self.client_session:
                        record = {}
                        print("connect:", request.client_port)
                        record['channel'] = grpc.insecure_channel(
                            "localhost:{}".format(request.client_port))
                        record['stub'] = infer_session_pb2_grpc.InferenceStub(
                            record['channel'])
                        grpc.channel_ready_future(record['channel']).result(timeout=5)
                        self.client_session[request.session_id] = record
                    # Map the client's shared-memory block and "infer" in place.
                    shm = SharedMemory(request.width, request.height, request.channels,
                                       request.client_port, request.shm_id)
                    data = np.ndarray((request.height, request.width, request.channels),
                                      dtype=np.uint8, buffer=shm.get())
                    data += 1  # Modify data
                    shm.close()
                    ret = self.client_session[request.session_id]['stub'].Response(
                        request, timeout=5)
                    if ret.status != 0:
                        print("Response Error:{}\n{}".format(ret.status, ret.error_message))
                except Exception:
                    # Any failure (connect, mmap, RPC) blacklists the session so
                    # later requests from it are dropped instead of retried.
                    traceback.print_exc()
                    self.black_list.add(request.session_id)
                    if request.session_id in self.client_session:
                        del self.client_session[request.session_id]
            else:
                time.sleep(0.001)

    def Stop(self):
        """Stop the gRPC server (3 s grace) and join both background threads."""
        print("Stop")
        self.server.stop(3)
        self.process_running = False
        self.process_thread.join()
        self.server_thread.join()


if __name__ == "__main__":
    logging.basicConfig()
    server = InferenceServer()
    server.Open()
    input()  # run until the operator presses Enter
    server.Stop()
4.infer_session_client.py
from __future__ import print_function from concurrent import futures import logging import grpc import infer_session_pb2 import infer_session_pb2_grpc import threading import numpy as np import os import queue from common import SharedMemory import time import argparse import uuid class InferenceClient(infer_session_pb2_grpc.InferenceServicer): def __init__(self) -> None: super().__init__() self.send_index=0 self.recv_index=None =uuid.uuid4() print() def Response(self, response, context): request=self.busy_q.get() pred_data = ((,,), dtype=np.uint8, buffer=()) golden=(pred_data.shape,dtype=np.uint8) (response.frame_id+1) result=(golden==pred_data).all() if not result: print("ID:{} ShmId:{} Pass:{}".format(response.frame_id,response.shm_id,result)) self.free_q.put(request) self.recv_index=response.frame_id return infer_session_pb2.Status(status=0,error_message="") def WaitFinish(self): while True: if self.send_index==self.recv_index: return (0.001) def Open(self,client_port,width,height,channel,qsize,remote_addr="localhost:50051"): try: self.client_port=client_port self.bind_addr="localhost:{}".format(client_port) self.free_q= (qsize*2) self.busy_q= (qsize*2) for shm_id in range(qsize): self.free_q.put(SharedMemory(width,height,channel,self.client_port,shm_id)) =grpc.insecure_channel(remote_addr) grpc.channel_ready_future().result(timeout=5) = infer_session_pb2_grpc.InferenceStub() self.server_ready=False self.service_ready_semaphore = (0) self.server_thread = (target=) self.server_thread.start() self.service_ready_semaphore.acquire() return self.server_ready except: return False def Stop(self): print("Stop") (3) self.server_thread.join() def Request(self,frame_index): request=self.free_q.get() data = ((,,), dtype=np.uint8, buffer=()) (frame_index) response = (infer_session_pb2.InferMessage(frame_id=frame_index, client_port=self.client_port, shm_id=request.shm_id, width=, height=, channels=, session_.format() )) self.busy_q.put(request) self.send_index=frame_index 
return ==0 def Run(self): try: = ((max_workers=2)) infer_session_pb2_grpc.add_InferenceServicer_to_server(self, ) .add_insecure_port(self.bind_addr) () self.server_ready=True print("Server started, listening on " + self.bind_addr) self.service_ready_semaphore.release() .wait_for_termination() except: self.service_ready_semaphore.release() if __name__ == "__main__": parser = (description="Demo of argparse") parser.add_argument('--port', type=int, default=50000) parser.add_argument('--remote_addr', type=str, default="localhost:50051") args = parser.parse_args() () client=InferenceClient() (client_port=,width=320,height=240,channel=1,qsize=10,remote_addr=args.remote_addr) while True: t0=() count=128 for i in range(count): (i) () t1=() print("{} FPS:{:.3f}".format(,count/(t1-t0))) ()
import mmap
import numpy as np
import os
import threading


# A SharedMemory class to read and write image data through a memory-mapped file.
class SharedMemory(object):
    def __init__(self, width, height, channels, port, shm_id) -> None:
        """Map (or create and size) the file backing block `shm_id` of the
        client listening on `port`."""
        self.width = width
        self.height = height
        self.channels = channels
        self.shm_id = shm_id
        # NOTE(review): the demo keeps its backing files under /sys/fs/cgroup;
        # a tmpfs such as /dev/shm would be the more conventional location.
        self.path = "/sys/fs/cgroup/{}_{}".format(port, shm_id)
        self.size = width * height * channels
        if not os.path.exists(self.path):
            self.fd = os.open(self.path, os.O_RDWR | os.O_CREAT)
            os.ftruncate(self.fd, self.size)  # grow the new file to full size
        else:
            self.fd = os.open(self.path, os.O_RDWR)
        self.mm = mmap.mmap(self.fd, self.size, access=mmap.ACCESS_WRITE)
        self.mm.seek(0)

    # Return the underlying writable buffer (usable as an ndarray buffer).
    def get(self):
        return self.mm

    # Release the mapping and the file descriptor.
    def close(self):
        self.mm.close()
        os.close(self.fd)


# A dictionary whose operations are protected by a lock for safe
# concurrent use from multiple threads.
class ThreadSafeDict:
    def __init__(self, initial_dict=None):
        # Copy the initial dict so outside mutation cannot bypass the lock.
        self._dict = {} if initial_dict is None else initial_dict.copy()
        self._lock = threading.Lock()

    # Get the value for a key.
    def __getitem__(self, key):
        with self._lock:
            return self._dict[key]

    # Set the value for a key.
    def __setitem__(self, key, value):
        with self._lock:
            self._dict[key] = value

    # Delete a key.
    def __delitem__(self, key):
        with self._lock:
            del self._dict[key]

    # Membership test (`key in d`).
    def __contains__(self, item):
        with self._lock:
            return item in self._dict

    # Get with a default when the key is missing.
    def get(self, key, default=None):
        with self._lock:
            return self._dict.get(key, default)

    # Insert `default` only if the key is absent; return the stored value.
    def setdefault(self, key, default):
        with self._lock:
            return self._dict.setdefault(key, default)

    # Merge another dict into this one.
    def update(self, other_dict):
        with self._lock:
            self._dict.update(other_dict)
6. Run
python3 infer_session_server.py & python3 infer_session_client.py --port 50001
7. Output
50001 FPS:2296.293
50001 FPS:2222.019
50001 FPS:2347.274
50001 FPS:2124.001
This concludes this article on implementing asynchronous calls with Python gRPC (without using the gRPC asynchronous interface). For more related content on Python gRPC asynchronous calls, please search my previous articles or continue browsing the related articles below.