SoFunction
Updated on 2025-03-01

Python gRPC: implementing asynchronous calls (without using the gRPC async interface)

Synchronous gRPC calls are simpler, but when handling complex tasks they block the request and hurt throughput. The gRPC asynchronous interface can solve this, but this article takes another approach: the server receives each request and puts it into a request queue; an independent thread processes the requests one by one and then calls a service on the client to deliver the processing result. In other words, the client is also a server.

The following DEMO implements functions:

  • Image transmission is realized between the client and the server
  • The reasoning service has two interfaces: Request and Response
  • The server implements the Request interface and the client implements the Response interface. These two interfaces are only used to send messages.
  • After the server's message processing thread handles the client's request, it calls the client's Response interface.

1.infer_session.proto

syntax = "proto3";
service Inference {
  rpc Request  (InferMessage) returns (Status) {} // Implemented by the server
  rpc Response (InferMessage) returns (Status) {} // Implemented by the client
}
message InferMessage {
  int32 frame_id = 1;     // Frame number
  int32 client_port = 2;  // Client port
  int32 shm_id = 3;       // Shared memory block id
  int32 width = 4;        // Image width
  int32 height = 5;       // Image height
  int32 channels = 6;     // Number of image channels
  string session_id = 7;  // Session uuid
}
message Status {
  int32 status = 1;         // Status code
  string error_message = 2; // Error message
}

2. Generate Python library functions

python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./infer_session.proto

3.infer_session_server.py

from concurrent import futures
import logging
import threading
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import queue
import traceback
import time
from common import SharedMemory,ThreadSafeDict
import numpy as np

class InferenceServer(infer_session_pb2_grpc.InferenceServicer):
    """Inference server that answers asynchronously without the gRPC async API.

    ``Request`` RPCs are enqueued and return immediately; a dedicated worker
    thread (``Process``) handles each queued request and then calls the
    *client's* ``Response`` RPC with the result (the client is also a server).
    """

    def __init__(self) -> None:
        super().__init__()
        self.server = None       # grpc.server instance, created in Run()
        self.black_list = set()  # session_ids that failed once; later requests dropped

    def Request(self, request, context):
        """RPC entry point: enqueue the request and return success immediately."""
        self.request_queue.put(request)
        return infer_session_pb2.Status(status=0, error_message="")

    def Open(self, port=50051):
        """Start the worker thread and the serving thread.

        Blocks until the gRPC server is actually listening, then returns True.
        """
        self.process_running = True
        self.bind_addr = "localhost:{}".format(port)
        self.client_session = ThreadSafeDict()  # session_id -> {'channel', 'stub'}
        self.request_queue = queue.Queue()

        self.process_thread = threading.Thread(target=self.Process)
        self.process_thread.start()

        # Released by Run() once the server is listening, so Open() can block on it.
        self.service_ready_semaphore = threading.Semaphore(0)
        self.server_thread = threading.Thread(target=self.Run)
        self.server_thread.start()
        self.service_ready_semaphore.acquire()
        return True

    def Run(self):
        """Serving-thread body: create the gRPC server and serve until termination."""
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)
        self.server.add_insecure_port(self.bind_addr)
        self.server.start()
        print("Server started, listening on " + self.bind_addr)
        self.service_ready_semaphore.release()
        self.server.wait_for_termination()

    def Process(self):
        """Worker-thread body.

        Pops queued requests, modifies the shared-memory image (stand-in for
        real inference), then calls the client's ``Response`` RPC. Any failure
        black-lists the session so its later requests are discarded.
        """
        while self.process_running:
            if not self.request_queue.empty():
                request = self.request_queue.get(False, 2)
                if request.session_id in self.black_list:
                    if request.session_id in self.client_session:
                        del self.client_session[request.session_id]
                    continue
                try:
                    if request.session_id not in self.client_session:
                        # First request of this session: open a channel back to
                        # the client, which serves the Response interface.
                        record = {}
                        print("connect:", request.client_port)
                        record['channel'] = grpc.insecure_channel("localhost:{}".format(request.client_port))
                        record['stub'] = infer_session_pb2_grpc.InferenceStub(record['channel'])
                        grpc.channel_ready_future(record['channel']).result(timeout=5)
                        self.client_session[request.session_id] = record

                    shm = SharedMemory(request.width, request.height, request.channels,
                                       request.client_port, request.shm_id)
                    # assumes (height, width, channels) layout — must match the client's view
                    data = np.ndarray((request.height, request.width, request.channels),
                                      dtype=np.uint8, buffer=shm.get())
                    data += 1  # Modify data (placeholder for real inference work)
                    shm.close()

                    ret = self.client_session[request.session_id]['stub'].Response(request, timeout=5)
                    if ret.status != 0:
                        print("Response Error:{} {}".format(ret.status, ret.error_message))
                except Exception:
                    traceback.print_exc()
                    self.black_list.add(request.session_id)
                    if request.session_id in self.client_session:
                        del self.client_session[request.session_id]
            else:
                time.sleep(0.001)  # idle: avoid spinning at 100% CPU

    def Stop(self):
        """Stop the gRPC server (3 s grace) and join both threads."""
        print("Stop")
        self.server.stop(3)
        self.process_running = False
        self.process_thread.join()
        self.server_thread.join()

if __name__ == "__main__":

    # Start the server, then run until the user presses Enter.
    logging.basicConfig()
    server = InferenceServer()
    server.Open()
    input()
    server.Stop()

4.infer_session_client.py

from __future__ import print_function
from concurrent import futures
import logging
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import threading
import numpy as np
import os
import queue
from common import SharedMemory
import time
import argparse
import uuid

class InferenceClient(infer_session_pb2_grpc.InferenceServicer):
    """Client that is also a gRPC server.

    It sends ``Request`` RPCs (frames via shared memory) to the inference
    server, and receives the processed results through its own ``Response``
    service, which the server calls back.
    """

    def __init__(self) -> None:
        super().__init__()
        self.send_index = 0     # frame_id of the last frame sent
        self.recv_index = None  # frame_id of the last frame answered
        self.uuid = uuid.uuid4()  # session id attached to every request
        print(self.uuid)

    def Response(self, response, context):
        """RPC called by the server once a frame has been processed.

        Checks the shared-memory image against the expected value and recycles
        the buffer back into the free queue.
        """
        request = self.busy_q.get()  # SharedMemory block of the oldest in-flight frame
        pred_data = np.ndarray((request.height, request.width, request.channels),
                               dtype=np.uint8, buffer=request.get())

        # Request() filled the frame with frame_id and the server adds 1.
        golden = np.empty(pred_data.shape, dtype=np.uint8)
        golden.fill(response.frame_id + 1)

        result = (golden == pred_data).all()
        if not result:
            print("ID:{} ShmId:{} Pass:{}".format(response.frame_id, response.shm_id, result))
        self.free_q.put(request)
        self.recv_index = response.frame_id
        return infer_session_pb2.Status(status=0, error_message="")

    def WaitFinish(self):
        """Poll until the last sent frame has been answered."""
        while True:
            if self.send_index == self.recv_index:
                return
            time.sleep(0.001)

    def Open(self, client_port, width, height, channel, qsize, remote_addr="localhost:50051"):
        """Allocate shared-memory buffers, connect to the server, and start the
        local Response service. Returns True when the local server is ready."""
        try:
            self.client_port = client_port
            self.bind_addr = "localhost:{}".format(client_port)
            self.free_q = queue.Queue(qsize * 2)
            self.busy_q = queue.Queue(qsize * 2)
            for shm_id in range(qsize):
                self.free_q.put(SharedMemory(width, height, channel, self.client_port, shm_id))
            self.channel = grpc.insecure_channel(remote_addr)
            grpc.channel_ready_future(self.channel).result(timeout=5)
            self.stub = infer_session_pb2_grpc.InferenceStub(self.channel)
            self.server_ready = False
            # Released by Run() once the local server is listening (or failed).
            self.service_ready_semaphore = threading.Semaphore(0)
            self.server_thread = threading.Thread(target=self.Run)
            self.server_thread.start()
            self.service_ready_semaphore.acquire()
            return self.server_ready
        except Exception:
            return False

    def Stop(self):
        """Stop the local gRPC server (3 s grace) and join its thread."""
        print("Stop")
        self.server.stop(3)
        self.server_thread.join()

    def Request(self, frame_index):
        """Fill a free shared-memory block with frame_index and send it to the
        inference server. Returns True on a zero status."""
        request = self.free_q.get()
        data = np.ndarray((request.height, request.width, request.channels),
                          dtype=np.uint8, buffer=request.get())

        data.fill(frame_index)

        response = self.stub.Request(infer_session_pb2.InferMessage(frame_id=frame_index,
                                                                    client_port=self.client_port,
                                                                    shm_id=request.shm_id,
                                                                    width=request.width,
                                                                    height=request.height,
                                                                    channels=request.channels,
                                                                    session_id="{}".format(self.uuid)
                                                                    ))
        self.busy_q.put(request)
        self.send_index = frame_index
        return response.status == 0

    def Run(self):
        """Serving-thread body for the local Response service."""
        try:
            self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
            infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)
            self.server.add_insecure_port(self.bind_addr)
            self.server.start()
            self.server_ready = True
            print("Server started, listening on " + self.bind_addr)
            self.service_ready_semaphore.release()
            self.server.wait_for_termination()
        except Exception:
            # Still release so Open() does not block forever on failure.
            self.service_ready_semaphore.release()

if __name__ == "__main__":

    parser = argparse.ArgumentParser(description="Demo of argparse")
    parser.add_argument('--port', type=int, default=50000)
    parser.add_argument('--remote_addr', type=str, default="localhost:50051")
    args = parser.parse_args()
    logging.basicConfig()

    client = InferenceClient()
    client.Open(client_port=args.port, width=320, height=240, channel=1, qsize=10, remote_addr=args.remote_addr)

    # Send batches of frames forever and report throughput per batch.
    while True:
        t0 = time.time()
        count = 128
        for i in range(count):
            client.Request(i)
        client.WaitFinish()
        t1 = time.time()
        print("{} FPS:{:.3f}".format(args.port, count / (t1 - t0)))

    client.Stop()  # NOTE(review): unreachable after the infinite loop, kept from the original

import mmap
import numpy as np
import os
import threading

# Define a SharedMemory class to read and write data in shared memory
class SharedMemory(object):
    """File-backed shared-memory image buffer identified by (port, shm_id)."""

    def __init__(self, width, height, channels, port, shm_id) -> None:
        self.width = width
        self.height = height
        self.channels = channels
        self.shm_id = shm_id
        # NOTE(review): the original article maps a file under /sys/fs/cgroup;
        # /dev/shm is the conventional location for file-backed shared memory.
        self.file_path = "/sys/fs/cgroup/{}_{}".format(port, shm_id)
        self.size = width * height * channels
        if not os.path.exists(self.file_path):
            # Create the backing file and grow it to the required size.
            self.fd = os.open(self.file_path, os.O_RDWR | os.O_CREAT)
            os.ftruncate(self.fd, self.size)
        else:
            self.fd = os.open(self.file_path, os.O_RDWR)
        self.buffer = mmap.mmap(self.fd, self.size, access=mmap.ACCESS_WRITE)
        self.buffer.seek(0)

    # Get data in shared memory (usable as a numpy ndarray buffer)
    def get(self):
        return self.buffer

    # Close shared memory
    def close(self):
        self.buffer.close()
        os.close(self.fd)

# Define a ThreadSafeDict class to operate dictionaries safely in multithreading
class ThreadSafeDict:
    """Dictionary wrapper whose every operation is guarded by a mutex."""

    def __init__(self, initial_dict=None):
        self._dict = {} if initial_dict is None else initial_dict.copy()
        self._lock = threading.Lock()

    # Get the value in the dictionary
    def __getitem__(self, key):
        with self._lock:
            return self._dict[key]

    # Set the value in the dictionary
    def __setitem__(self, key, value):
        with self._lock:
            self._dict[key] = value

    # Delete the value in the dictionary
    def __delitem__(self, key):
        with self._lock:
            del self._dict[key]

    # Check if a key exists in the dictionary
    def __contains__(self, item):
        with self._lock:
            return item in self._dict

    # Get the value in the dictionary, returning the default if it does not exist
    def get(self, key, default=None):
        with self._lock:
            return self._dict.get(key, default)

    # Set the value only if the key is absent; return the stored value
    def setdefault(self, key, default):
        with self._lock:
            return self._dict.setdefault(key, default)

    # Update the dictionary from another mapping
    def update(self, other_dict):
        with self._lock:
            self._dict.update(other_dict)

6. Run

python3 infer_session_server.py &
python3 infer_session_client.py --port 50001

7. Output

50001 FPS:2296.293
50001 FPS:2222.019
50001 FPS:2347.274
50001 FPS:2124.001

This concludes the article on implementing asynchronous gRPC calls in Python without using the gRPC async interface. For more on Python gRPC asynchronous calls, please search my previous articles or continue browsing the related articles below. I hope you will continue to support this site!