SoFunction
Updated on 2025-03-05

Golang implements Redis series (VI) How to implement the redis client in pipeline mode

The complete code of this article is/hdt3213/godis/redis/client

Usually, the communication mode of the TCP client is blocking: the client sends a request -> waits for the server to respond -> sends the next request. Because you need to wait for the network to transmit data, it takes more time to complete a request loop.

Can we send the next request directly without waiting for the server to respond? The answer is yes.

As a full duplex protocol, TCP can perform uplink and downlink communication at the same time, and there is no need to worry about the conflict that the client and server send packets simultaneously.

. When two people speak at the same time, they will be in conflict and cannot hear clearly, so they can only speak in turn. This communication method is called half-duplex. The broadcast can only be sent by the radio to the radio but cannot be transmitted in reverse. This method is called simplex.

We assign a goroutine to each tcp connection to ensure that the request received is first responded to first. On the other hand, the tcp protocol will ensure the orderliness of the data flow. The request server sent first on the same tcp connection will receive first, and the response client that replys first receives first. Therefore we don't have to worry about confusing the corresponding requests for the response.

This pattern in which the client continues to send requests to the server when the server does not respond is called the Pipeline mode. Because the time to wait for network transmission is reduced, the Pipeline mode can greatly improve throughput and reduce the number of tcp links required.

The redis client in pipeline mode needs to have two background coroutines responsible for tcp communication. The caller sends instructions to the background coroutines through channel and blocks and waits until a response is received. This is a typical asynchronous programming mode.

Let's first define the client's structure:

type Client struct {
    conn         // Connect to the server tcp    pendingReqs chan *Request // Wait for a request sent    waitingReqs chan *Request // Request waiting for the server to respond    ticker      * // Timer used to trigger heartbeat packets    addr        string

    ctx        
    cancelFunc 
    writing    * // A request being processed cannot be stopped immediately. It is used to implement graceful shutdown}

type Request struct {
    id        uint64 // Request id    args      [][]byte // Uplink parameters    reply      // Received return value    heartbeat bool // Tag whether it is a heartbeat request    waiting   * // After calling the coroutine to send the request, waitgroup to wait for the request to be processed asynchronously    err       error
}

The caller sends the request to the background coroutine and waits for the asynchronous processing to complete through the wait group:

func (client *Client) Send(args [][]byte)  {
	request := &request{
		args:      args,
		heartbeat: false,
		waiting:   &{},
	}
	(1)
	(1)
	defer ()
	 <- request // Request to join the queue	timeout := (maxWait) // Wait for a response or timeout	if timeout {
		return ("server time out")
	}
	if  != nil {
		return ("request failed")
	}
	return 
}

The core part of the client is the background read and write coroutine. Start by writing coroutines:

// Write coroutine entryfunc (client *Client) handleWrite() {
	for req := range  {
		(req)
	}
}

// Send a requestfunc (client *Client) doRequest(req *request) {
	if req == nil || len() == 0 {
		return
	}
    // Serialization request	re := ()
	bytes := ()
	_, err := (bytes)
	i := 0
    // Retry failed	for err != nil && i < 3 {
		err = (err)
		if err == nil {
			_, err = (bytes)
		}
		i++
	}
	if err == nil {
        // Send successfully and wait for the server to respond		 <- req
	} else {
		 = err
		()
	}
}

Reading coroutines is a protocol parser template we are familiar with. Friends who are not familiar with can go toAnalyze the Redis Cluster principlelearn more.

// Received a response from the serverfunc (client *Client) finishRequest(reply ) {
	defer func() {
		if err := recover(); err != nil {
			()
			(err)
		}
	}()
	request := <-
	if request == nil {
		return
	}
	 = reply
	if  != nil {
		()
	}
}

// Read coroutine is a RESP protocol resolverfunc (client *Client) handleRead() error {
	ch := ()
	for payload := range ch {
		if  != nil {
			((()))
			continue
		}
		()
	}
	return nil
}

Finally, write the client constructor and code to start the asynchronous coroutine:

func MakeClient(addr string) (*Client, error) {
    conn, err := ("tcp", addr)
    if err != nil {
        return nil, err
    }
    ctx, cancel := (())
    return &Client{
        addr:        addr,
        conn:        conn,
        sendingReqs: make(chan *Request, chanSize),
        waitingReqs: make(chan *Request, chanSize),
        ctx:         ctx,
        cancelFunc:  cancel,
        writing:     &{},
    }, nil
}

func (client *Client) Start() {
     = (10 * )
    go ()
    go func() {
        err := ()
        (err)
    }()
    go ()
}

Remember to wait for the request to complete when closing the client:

func (client *Client) Close() {
    // Prevent new requests from entering the queue first    close()

    // Wait for the completion of the processed request    ()

    // Free up resources    _ = () // Close the connection with the server. After the connection is closed, read the coroutine will exit    () // Use context to close the read coroutine    close() // Close the queue}

Test it out:

func TestClient(t *) {
    client, err := MakeClient("localhost:6379")
    if err != nil {
        (err)
    }
    ()

    result = ([][]byte{
        []byte("SET"),
        []byte("a"),
        []byte("a"),
    })
    if statusRet, ok := result.(*); ok {
        if  != "OK" {
            ("`set` failed, result: " + )
        }
    }

    result = ([][]byte{
        []byte("GET"),
        []byte("a"),
    })
    if bulkRet, ok := result.(*); ok {
        if string() != "a" {
            ("`get` failed, result: " + string())
        }
    }
}

Keep working, we will find a way is Finley, welcome to join us.

This is the article about how to implement the Redis client of the Redis series of Golang (VI) to implement the pipeline mode. This is all about this article. For more information about the redis client of the Redis client of the Golang implementation of the pipeline mode, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!