The complete code for this article can be found in the github.com/hdt3213/godis repository, under redis/client.
Usually a TCP client communicates in blocking mode: it sends a request, waits for the server's response, and only then sends the next request. Because each step has to wait for the data to cross the network, completing one request loop takes a relatively long time.
Can we send the next request directly without waiting for the server to respond? The answer is yes.
TCP is a full-duplex protocol: upstream and downstream traffic can flow at the same time, so there is no need to worry about conflicts when the client and the server send packets simultaneously. By contrast, when two people speak at the same time they talk over each other and cannot be heard clearly, so they must take turns; that mode of communication is called half-duplex. A radio broadcast can only travel from the station to the receiver and never in the reverse direction; that mode is called simplex.
On the server side we assign a goroutine to each TCP connection, so requests are handled and answered in the order they are received. The TCP protocol, in turn, guarantees the ordering of the byte stream: on the same connection, the request the client sends first is received first by the server, and the response the server sends first is received first by the client. Therefore we do not have to worry about matching responses to the wrong requests.
This pattern, in which the client keeps sending requests without waiting for the server to respond, is called pipeline mode. Because less time is spent waiting for network round trips, pipeline mode can greatly improve throughput and reduce the number of TCP connections required.
A Redis client in pipeline mode needs two background goroutines in charge of the TCP communication. The caller hands a command to the background goroutines through a channel and then blocks until the response arrives. This is a typical asynchronous programming pattern.
Let's first define the client's structure:
```go
// imports used in this file: "context", "net", "runtime/debug", "sync", "time",
// plus the project's own redis, reply, parser, wait and logger packages
type Client struct {
	conn        net.Conn      // tcp connection to the server
	pendingReqs chan *request // requests waiting to be sent
	waitingReqs chan *request // requests waiting for the server's response
	ticker      *time.Ticker  // timer used to trigger heartbeat packets
	addr        string

	ctx        context.Context
	cancelFunc context.CancelFunc
	writing    *sync.WaitGroup // a request being processed cannot be interrupted; used to implement graceful shutdown
}

type request struct {
	id        uint64      // request id
	args      [][]byte    // the command and its arguments to send
	reply     redis.Reply // the received response
	heartbeat bool        // marks whether this is a heartbeat request
	waiting   *wait.Wait  // after sending, the calling goroutine waits on this until the request has been handled asynchronously
	err       error
}

// chanSize and maxWait are referenced below; the values here are illustrative
const (
	chanSize = 256
	maxWait  = 3 * time.Second
)
```
The caller sends the request to the background goroutines and waits, through the wait group, for the asynchronous processing to complete:
```go
func (client *Client) Send(args [][]byte) redis.Reply {
	request := &request{
		args:      args,
		heartbeat: false,
		waiting:   &wait.Wait{},
	}
	request.waiting.Add(1)
	client.writing.Add(1)
	defer client.writing.Done()
	client.pendingReqs <- request                       // enqueue the request
	timeout := request.waiting.WaitWithTimeout(maxWait) // wait for the response or time out
	if timeout {
		return reply.MakeErrReply("server time out")
	}
	if request.err != nil {
		return reply.MakeErrReply("request failed")
	}
	return request.reply
}
```
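The waiting field is not a plain sync.WaitGroup, because Send must give up after maxWait instead of blocking forever. Below is a minimal sketch of such a WaitGroup-with-timeout helper, assuming only that it offers the Add, Done and WaitWithTimeout methods used above (see the repository's wait package for the real one):

```go
package wait

import (
	"sync"
	"time"
)

// Wait wraps sync.WaitGroup and adds waiting with a timeout.
type Wait struct {
	wg sync.WaitGroup
}

func (w *Wait) Add(delta int) { w.wg.Add(delta) }
func (w *Wait) Done()         { w.wg.Done() }

// WaitWithTimeout blocks until the counter reaches zero or the timeout
// expires; it returns true if it timed out.
func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		w.wg.Wait()
	}()
	select {
	case <-done:
		return false // finished normally
	case <-time.After(timeout):
		return true // timed out
	}
}
```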
The core of the client is the pair of background read and write goroutines. Let's start with the write goroutine:
```go
// handleWrite is the entry point of the write goroutine
func (client *Client) handleWrite() {
	for req := range client.pendingReqs {
		client.doRequest(req)
	}
}

// doRequest sends a single request to the server
func (client *Client) doRequest(req *request) {
	if req == nil || len(req.args) == 0 {
		return
	}
	re := reply.MakeMultiBulkReply(req.args) // serialize the request
	bytes := re.ToBytes()
	_, err := client.conn.Write(bytes)
	i := 0
	for err != nil && i < 3 { // retry at most 3 times on failure
		err = client.handleConnectionError(err) // tries to re-establish the connection (not listed here)
		if err == nil {
			_, err = client.conn.Write(bytes)
		}
		i++
	}
	if err == nil {
		// sent successfully; now wait for the server's response
		client.waitingReqs <- req
	} else {
		req.err = err
		req.waiting.Done()
	}
}
```
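For reference, the serialization step encodes the argument list as a RESP multi-bulk (array) message, which is what the ToBytes call above is expected to produce. The command SET a a, for example, travels on the wire as:

```
*3\r\n$3\r\nSET\r\n$1\r\na\r\n$1\r\na\r\n
```

`*3` announces an array of three elements, and each `$n` prefix gives the byte length of the bulk string that follows; the `\r\n` sequences are literal CR LF bytes.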
The read goroutine follows the protocol-parser template we are already familiar with. Readers who are not familiar with it can refer to the earlier article "Analyze the Redis Cluster principle" to learn more.
```go
// finishRequest handles one response received from the server
func (client *Client) finishRequest(reply redis.Reply) {
	defer func() {
		if err := recover(); err != nil {
			debug.PrintStack()
			logger.Error(err)
		}
	}()
	request := <-client.waitingReqs
	if request == nil {
		return
	}
	request.reply = reply
	if request.waiting != nil {
		request.waiting.Done()
	}
}

// handleRead is the read goroutine; it is a RESP protocol parser
func (client *Client) handleRead() error {
	ch := parser.ParseStream(client.conn)
	for payload := range ch {
		if payload.Err != nil {
			client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
			continue
		}
		client.finishRequest(payload.Data)
	}
	return nil
}
```
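handleRead assumes the parser exposes a streaming interface: ParseStream reads from the connection and pushes one payload per parsed reply onto a channel. The payload consumed above is assumed to carry either a parsed reply or an error, roughly:

```go
// Payload is what the parser emits for each complete reply it reads.
// (Illustrative sketch; the real definition lives in the parser package.)
type Payload struct {
	Data redis.Reply // the parsed reply
	Err  error       // non-nil if reading or parsing failed
}
```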
Finally, write the client constructor and the code that starts the background goroutines:
```go
func MakeClient(addr string) (*Client, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	return &Client{
		addr:        addr,
		conn:        conn,
		pendingReqs: make(chan *request, chanSize),
		waitingReqs: make(chan *request, chanSize),
		ctx:         ctx,
		cancelFunc:  cancel,
		writing:     &sync.WaitGroup{},
	}, nil
}

func (client *Client) Start() {
	client.ticker = time.NewTicker(10 * time.Second)
	go client.handleWrite()
	go func() {
		err := client.handleRead()
		logger.Error(err)
	}()
	go client.heartbeat()
}
```
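Start also launches client.heartbeat(), which is not listed in this article. Assuming the heartbeat simply pipelines a PING through the same request queue, driven by the ticker created above, a sketch could look like this (see the repository for the actual implementation):

```go
// heartbeat periodically sends a PING to keep the connection alive.
func (client *Client) heartbeat() {
	for range client.ticker.C {
		client.doHeartbeat()
	}
}

func (client *Client) doHeartbeat() {
	request := &request{
		args:      [][]byte{[]byte("PING")},
		heartbeat: true,
		waiting:   &wait.Wait{},
	}
	request.waiting.Add(1)
	client.writing.Add(1)
	defer client.writing.Done()
	client.pendingReqs <- request
	request.waiting.WaitWithTimeout(maxWait)
}
```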
Remember to wait for in-flight requests to complete when closing the client:
```go
func (client *Client) Close() {
	client.ticker.Stop()      // stop the heartbeat ticker
	close(client.pendingReqs) // first, prevent new requests from entering the queue
	client.writing.Wait()     // wait for requests that are already being processed to finish
	// free up resources
	_ = client.conn.Close()   // close the connection; once it is closed the read goroutine will exit
	client.cancelFunc()       // use the context to stop the read goroutine
	close(client.waitingReqs) // close the queue
}
```
Test it out:
```go
func TestClient(t *testing.T) {
	client, err := MakeClient("localhost:6379")
	if err != nil {
		t.Error(err)
	}
	client.Start()
	result := client.Send([][]byte{
		[]byte("SET"),
		[]byte("a"),
		[]byte("a"),
	})
	if statusRet, ok := result.(*reply.StatusReply); ok {
		if statusRet.Status != "OK" {
			t.Error("`set` failed, result: " + statusRet.Status)
		}
	}
	result = client.Send([][]byte{
		[]byte("GET"),
		[]byte("a"),
	})
	if bulkRet, ok := result.(*reply.BulkReply); ok {
		if string(bulkRet.Arg) != "a" {
			t.Error("`get` failed, result: " + string(bulkRet.Arg))
		}
	}
}
```
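Because Send only pushes onto a channel and then waits on its own request object, it can be called from many goroutines at once, and all of their commands are pipelined onto the single connection. A hypothetical helper, demoPipeline, shows this kind of concurrent usage (it assumes it lives in the same package as Client and imports "fmt" and "sync"):

```go
// demoPipeline issues many commands concurrently through one client,
// letting the write goroutine pipeline them onto a single connection.
func demoPipeline(client *Client) {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			key := []byte(fmt.Sprintf("key%d", i))
			client.Send([][]byte{[]byte("SET"), key, []byte("value")})
			client.Send([][]byte{[]byte("GET"), key})
		}()
	}
	wg.Wait()
}
```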
This concludes part VI of the Golang implements Redis series: implementing a Redis client in pipeline mode. For more on the topic, see the other articles in this series. I hope you will continue to support me!