Preface
This article looks at how to implement a two-way copy between TCP connections in Golang, as needed by a simple TCP proxy. It is shared for reference and learning; the details follow.
The simplest implementation
Every time a server connection is accepted, a new client connection is opened. One goroutine copies from the server to the client, and another goroutine copies from the client to the server. When either side disconnects, both directions are shut down.
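    package main

    import (
        "fmt"
        "io"
        "net"
        "runtime"
    )

    func main() {
        runtime.GOMAXPROCS(1)
        listener, err := net.Listen("tcp", "127.0.0.1:8848")
        if err != nil {
            panic(err)
        }
        for {
            conn, err := listener.Accept()
            if err != nil {
                panic(err)
            }
            go handle(conn.(*net.TCPConn))
        }
    }

    // handle proxies one accepted connection to a freshly dialed backend.
    func handle(server *net.TCPConn) {
        defer server.Close()
        client, err := net.Dial("tcp", "127.0.0.1:8849")
        if err != nil {
            fmt.Println(err)
            return
        }
        defer client.Close()
        go func() {
            // Closing both sockets on exit unblocks the copy running in the
            // other direction.
            defer server.Close()
            defer client.Close()
            buf := make([]byte, 2048)
            io.CopyBuffer(server, client, buf)
        }()
        buf := make([]byte, 2048)
        io.CopyBuffer(client, server, buf)
    }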
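One thing worth noting is that the default buffer used by io.Copy is relatively large (32 KB), so passing a smaller buffer through io.CopyBuffer lets the proxy support more concurrent connections. Be aware that io.CopyBuffer does not use the supplied buffer at all when the source implements io.WriterTo or the destination implements io.ReaderFrom, which *net.TCPConn does.
When one of the two goroutines exits, the other exits as well. This is achieved by closing the server or client socket: once the socket is closed, the io.CopyBuffer call blocked on it returns.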
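The following is a minimal sketch of why closing the socket is enough to stop the other goroutine. It assumes an in-memory net.Pipe as a stand-in for a TCP connection, and the function name closeUnblocksRead is hypothetical, chosen only for this illustration.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// closeUnblocksRead parks a goroutine in Read on an idle connection, closes
// that connection from the caller, and returns the error the blocked Read
// came back with (nil if the Read never returned).
func closeUnblocksRead() error {
	conn, peer := net.Pipe() // in-memory stand-in for a TCP connection
	defer peer.Close()

	done := make(chan error, 1)
	go func() {
		buf := make([]byte, 16)
		_, err := conn.Read(buf) // blocks: peer never writes
		done <- err
	}()

	time.Sleep(50 * time.Millisecond) // give the reader time to block
	conn.Close()                      // closing the connection wakes the reader

	select {
	case err := <-done:
		return err
	case <-time.After(time.Second):
		return nil // the Read was not unblocked
	}
}

func main() {
	fmt.Println("blocked Read returned:", closeUnblocksRead())
}
```

In the proxy, io.CopyBuffer sits in exactly this kind of blocked Read, so the deferred Close calls in one goroutine are what let the copy in the other goroutine return.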
Connection pool on the client side
An obvious problem is that every incoming server connection requires a new connection to the client side to be established on the spot. The TCP handshake for that connection is therefore included in the total time spent in the proxy. If the client side uses a connection pool to reuse existing connections, the end-to-end latency can be reduced.
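    // Imports needed: fmt, io, net.

    var pool = make(chan net.Conn, 100)

    // borrow takes an idle connection from the pool, or dials a new one.
    func borrow() (net.Conn, error) {
        select {
        case conn := <-pool:
            return conn, nil
        default:
            return net.Dial("tcp", "127.0.0.1:8849")
        }
    }

    // release puts the connection back, or closes it if the pool is full.
    func release(conn net.Conn) error {
        select {
        case pool <- conn:
            // returned to pool
            return nil
        default:
            // pool is full
            return conn.Close()
        }
    }

    // Naive version: its problems are discussed right after this listing.
    func handle(server *net.TCPConn) {
        defer server.Close()
        client, err := borrow()
        if err != nil {
            fmt.Println(err)
            return
        }
        defer release(client)
        go func() {
            defer server.Close()
            defer release(client)
            buf := make([]byte, 2048)
            io.CopyBuffer(server, client, buf)
        }()
        buf := make([]byte, 2048)
        io.CopyBuffer(client, server, buf)
    }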
This version is obviously flawed, because there is no guarantee that a connection is still alive when it is returned to the pool. A more serious problem is that the client connection is no longer closed, so when the server side closes its connection, the goroutine copying from the client to the server never exits.
Therefore, there are several problems to solve:
- When one goroutine exits, how do we make the other goroutine exit?
- How do we ensure that a connection returned to the pool is valid?
- How do we keep the connections sitting in the pool valid?
Interrupt Goroutine via SetDeadline
A common view is that a goroutine cannot be interrupted: when a goroutine is in the middle of conn.Read, it is simply blocked there. In fact there are ways around this. We can interrupt the goroutine by calling conn.Close, but with a connection pool the connection must not be closed. The other option is to call SetDeadline with a timestamp in the past, which interrupts the blocking read or blocking write currently in progress.
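As a minimal sketch of this technique, the example below blocks a goroutine in Read and then wakes it by moving the deadline into the past. It assumes an in-memory net.Pipe in place of a TCP connection, and interruptBlockedRead is a hypothetical name used only here.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// interruptBlockedRead parks a goroutine in Read on an idle connection, then
// sets a deadline in the past from the caller. It reports whether the Read
// came back with a timeout error.
func interruptBlockedRead() bool {
	conn, peer := net.Pipe() // in-memory stand-in for a TCP connection
	defer conn.Close()
	defer peer.Close()

	done := make(chan error, 1)
	go func() {
		buf := make([]byte, 16)
		_, err := conn.Read(buf) // blocks: peer never writes
		done <- err
	}()

	time.Sleep(50 * time.Millisecond) // let the reader block first
	// A deadline in the past makes pending and future I/O fail with a timeout.
	conn.SetDeadline(time.Now().Add(-time.Second))

	select {
	case err := <-done:
		var netErr net.Error
		return errors.As(err, &netErr) && netErr.Timeout()
	case <-time.After(time.Second):
		return false // the Read was not interrupted
	}
}

func main() {
	fmt.Println("interrupted by deadline:", interruptBlockedRead())
}
```

Unlike Close, this leaves the connection open, which is what makes it suitable for connections that are going back into a pool.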
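    // Imports needed: fmt, io, net, sync, time.

    var pool = make(chan net.Conn, 100)

    type client struct {
        conn     net.Conn
        inUse    *sync.WaitGroup
        returned sync.Once // both users call release; conn is handed back once
    }

    func borrow() (clt *client, err error) {
        var conn net.Conn
        select {
        case conn = <-pool:
        default:
            conn, err = net.Dial("tcp", "127.0.0.1:18849")
        }
        if err != nil {
            return nil, err
        }
        clt = &client{
            conn:  conn,
            inUse: &sync.WaitGroup{},
        }
        return
    }

    func release(clt *client) (err error) {
        // A deadline in the past interrupts the other user's blocked Read or Write.
        clt.conn.SetDeadline(time.Now().Add(-time.Second))
        clt.inUse.Done()
        clt.inUse.Wait() // wait until every user has stopped touching conn
        clt.returned.Do(func() {
            clt.conn.SetDeadline(time.Time{}) // clear the deadline before reuse
            select {
            case pool <- clt.conn:
                // returned to pool
            default:
                // pool is full
                err = clt.conn.Close()
            }
        })
        return
    }

    func handle(server *net.TCPConn) {
        defer server.Close()
        clt, err := borrow()
        if err != nil {
            fmt.Println(err)
            return
        }
        clt.inUse.Add(2) // one per direction, counted before the goroutine starts
        defer release(clt)
        go func() {
            // Deferred calls run last-in, first-out: server is closed first so
            // the other direction's read on server unblocks, then release runs.
            defer release(clt)
            defer server.Close()
            buf := make([]byte, 2048)
            io.CopyBuffer(server, clt.conn, buf)
        }()
        buf := make([]byte, 2048)
        io.CopyBuffer(clt.conn, server, buf)
    }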
The goroutine is interrupted through SetDeadline, and a sync.WaitGroup then ensures that every user has exited before the connection is returned to the pool. Otherwise, when the connection is reused, a previous user might still be reading from or writing to it.
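One detail to keep in mind when pooling connections that were interrupted this way: a deadline set with SetDeadline stays in effect for all later I/O until it is changed, and the zero time.Time value clears it. The sketch below illustrates this under the assumption that an in-memory net.Pipe behaves like a pooled TCP connection in this respect; reuseAfterInterrupt is a hypothetical name.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// reuseAfterInterrupt sets a past deadline on a connection, checks that a
// Read now fails immediately, clears the deadline, and checks that the
// connection can carry data again. It returns both observations.
func reuseAfterInterrupt() (failedWhileExpired, workedAfterReset bool) {
	conn, peer := net.Pipe() // in-memory stand-in for a pooled connection
	defer conn.Close()
	defer peer.Close()

	buf := make([]byte, 4)

	// With the deadline in the past, Read fails without waiting for data.
	conn.SetDeadline(time.Now().Add(-time.Second))
	_, err := conn.Read(buf)
	failedWhileExpired = err != nil

	// The zero time means "no deadline", so the connection is usable again.
	conn.SetDeadline(time.Time{})
	go peer.Write([]byte("ping"))
	n, err := conn.Read(buf)
	workedAfterReset = err == nil && string(buf[:n]) == "ping"
	return
}

func main() {
	failed, worked := reuseAfterInterrupt()
	fmt.Println("read failed while expired:", failed)
	fmt.Println("read worked after reset:", worked)
}
```

A pool that skips the reset would hand the next borrower a connection whose every Read and Write fails with a timeout.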
Connection validity
We also need to make sure that a connection is still valid before it is returned to the pool. If an error occurs while reading from or writing to the connection, we mark the connection as broken so that it is closed on release instead of being pooled. However, SetDeadline inevitably causes a timeout error in the interrupted read or write, so timeout errors must be excluded from this check.
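    // Imports needed: fmt, io, net, sync, sync/atomic, time.

    var pool = make(chan net.Conn, 100)

    type client struct {
        conn     net.Conn
        inUse    *sync.WaitGroup
        isValid  int32
        returned sync.Once // both users call release; conn is handed back once
    }

    const maybeValid = 0
    const isValid = 1
    const isInvalid = 2

    // Read records whether the connection still works; timeouts are ignored
    // because they are caused deliberately by SetDeadline.
    func (clt *client) Read(b []byte) (n int, err error) {
        n, err = clt.conn.Read(b)
        if err != nil {
            if !isTimeoutError(err) {
                atomic.StoreInt32(&clt.isValid, isInvalid)
            }
        } else {
            atomic.StoreInt32(&clt.isValid, isValid)
        }
        return
    }

    func (clt *client) Write(b []byte) (n int, err error) {
        n, err = clt.conn.Write(b)
        if err != nil {
            if !isTimeoutError(err) {
                atomic.StoreInt32(&clt.isValid, isInvalid)
            }
        } else {
            atomic.StoreInt32(&clt.isValid, isValid)
        }
        return
    }

    type timeoutErr interface {
        Timeout() bool
    }

    func isTimeoutError(err error) bool {
        te, ok := err.(timeoutErr)
        return ok && te.Timeout()
    }

    func borrow() (clt *client, err error) {
        var conn net.Conn
        select {
        case conn = <-pool:
        default:
            conn, err = net.Dial("tcp", "127.0.0.1:18849")
        }
        if err != nil {
            return nil, err
        }
        clt = &client{
            conn:    conn,
            inUse:   &sync.WaitGroup{},
            isValid: maybeValid,
        }
        return
    }

    func release(clt *client) (err error) {
        // A deadline in the past interrupts the other user's blocked Read or Write.
        clt.conn.SetDeadline(time.Now().Add(-time.Second))
        clt.inUse.Done()
        clt.inUse.Wait() // wait until every user has stopped touching conn
        clt.returned.Do(func() {
            // A connection that hit a non-timeout error is closed, not pooled.
            if atomic.LoadInt32(&clt.isValid) == isInvalid {
                err = clt.conn.Close()
                return
            }
            clt.conn.SetDeadline(time.Time{}) // clear the deadline before reuse
            select {
            case pool <- clt.conn:
                // returned to pool
            default:
                // pool is full
                err = clt.conn.Close()
            }
        })
        return
    }

    func handle(server *net.TCPConn) {
        defer server.Close()
        clt, err := borrow()
        if err != nil {
            fmt.Println(err)
            return
        }
        clt.inUse.Add(2) // one per direction, counted before the goroutine starts
        defer release(clt)
        go func() {
            // Deferred calls run last-in, first-out: server is closed first so
            // the other direction's read on server unblocks, then release runs.
            defer release(clt)
            defer server.Close()
            buf := make([]byte, 2048)
            io.CopyBuffer(server, clt, buf)
        }()
        buf := make([]byte, 2048)
        io.CopyBuffer(clt, server, buf)
    }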
Determining whether an error is a timeout requires a type assertion.
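As an alternative to a direct type assertion, the standard library's errors.As can perform the same check while also looking through wrapped errors. The sketch below is one possible way to write it, not the article's own method; the names isTimeout and examples are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"os"
)

// isTimeout reports whether err, or any error it wraps, is a network error
// whose Timeout method returns true.
func isTimeout(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}

// examples classifies three sample errors: the deadline error itself, the
// same error wrapped with extra context, and an ordinary end-of-stream error.
func examples() (plain, wrapped, eof bool) {
	plain = isTimeout(os.ErrDeadlineExceeded)
	wrapped = isTimeout(fmt.Errorf("copy failed: %w", os.ErrDeadlineExceeded))
	eof = isTimeout(io.EOF)
	return
}

func main() {
	plain, wrapped, eof := examples()
	fmt.Println(plain, wrapped, eof)
}
```

The wrapped case is where this differs from a plain type assertion, which only inspects the outermost error value.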
Checking whether a connection sitting in the pool is still valid by pinging the backend is relatively expensive to implement, because each protocol needs its own kind of ping. The simplest approach is to find out the next time the connection is used: if it turns out to be unusable, it is discarded and a new connection is created instead, which avoids repeatedly handing out invalid connections. In this way, invalid connections are gradually weeded out.
About correctness
This article was written at Hangzhou Airport, so the correctness of its content is not guaranteed at all.
Summary
That is all for this article. I hope it is of some reference value for your study or work. If you have any questions, feel free to leave a comment.