SoFunction
Updated on 2025-03-03

How Go turns multiplexed asynchronous I/O into blocking I/O

Preface

This article explains how Go turns multiplexed asynchronous I/O into what looks like blocking I/O. Let's start with a simple TCP server:

package main

import (
	"net"
)

func handleConnection(c net.Conn) {
	defer c.Close()
	// Read and write data.
	buffer := make([]byte, 1024)
	c.Read(buffer)
	c.Write([]byte("Hello from server"))
}

func main() {
	l, err := net.Listen("tcp", "host:port")
	if err != nil {
		return
	}
	defer l.Close()
	for {
		c, err := l.Accept()
		if err != nil {
			return
		}
		go handleConnection(c)
	}
}

Most of us would write a server like the one above. It is very simple, because Go's net package hides a great deal from us. We don't have to call the underlying socket functions as we would in C++, nor write complex I/O multiplexing logic around epoll. But do the calls to Accept and Read really block the way they appear to?

// Multiple goroutines may invoke methods on a Conn simultaneously.
//
// Note on the official comment above: it says that a Conn may be used by
// several goroutines at the same time. My own guess is that this relates to
// the so-called thundering-herd effect, where several goroutines waiting on
// the same connection and the same event are all woken up. This is only a
// guess and remains to be verified.
type Conn interface {
	Read(b []byte) (n int, err error)
	Write(b []byte) (n int, err error)
	Close() error
	LocalAddr() Addr
	RemoteAddr() Addr
	SetDeadline(t time.Time) error
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}

type conn struct {
 fd *netFD
}

Above is the Conn interface. The conn struct that follows it implements this interface and has a single member, a *netFD.

// Network file descriptor.
type netFD struct {
 // locking/lifetime of sysfd + serialize access to Read and Write methods
 fdmu fdMutex

 // immutable until Close
 sysfd  int
 family  int
 sotype  int
 isConnected bool
 net   string
 laddr  Addr
 raddr  Addr

 // wait server
 pd pollDesc
}
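One way to check that the descriptor stored in sysfd really is non-blocking is to read its file status flags. The sketch below assumes a Linux (or similar Unix) system and a Go version that provides SyscallConn; the helper names isNonblocking and checkLoopback are made up for this example.

```go
package main

import (
	"fmt"
	"net"
	"syscall"
)

// isNonblocking reports whether the descriptor behind a TCP connection has
// O_NONBLOCK set. Assumes Linux or a similar Unix system.
func isNonblocking(c *net.TCPConn) (bool, error) {
	raw, err := c.SyscallConn()
	if err != nil {
		return false, err
	}
	var flags uintptr
	var errno syscall.Errno
	// Control invokes the callback with the raw descriptor.
	err = raw.Control(func(fd uintptr) {
		flags, _, errno = syscall.Syscall(syscall.SYS_FCNTL, fd, syscall.F_GETFL, 0)
	})
	if err != nil {
		return false, err
	}
	if errno != 0 {
		return false, errno
	}
	return flags&syscall.O_NONBLOCK != 0, nil
}

// checkLoopback dials a throwaway loopback listener and inspects the socket.
func checkLoopback() (bool, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	defer l.Close()
	c, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		return false, err
	}
	defer c.Close()
	return isNonblocking(c.(*net.TCPConn))
}

func main() {
	fmt.Println(checkLoopback())
}
```

If the flag is set, a raw read or accept on that descriptor returns EAGAIN instead of blocking the thread, which is exactly the case the accept code has to handle.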

func (fd *netFD) accept() (netfd *netFD, err error) {
	//................
	for {
		s, rsa, err = accept(fd.sysfd)
		if err != nil {
			nerr, ok := err.(*os.SyscallError)
			if !ok {
				return nil, err
			}
			switch nerr.Err {
			// If the error is EAGAIN, nothing is ready on the socket yet
			// (for accept, no pending connection), so call WaitRead to
			// wait until the fd becomes readable and then retry.
			case syscall.EAGAIN:
				if err = fd.pd.WaitRead(); err == nil {
					continue
				}
			case syscall.ECONNABORTED:
				continue
			}
			return nil, err
		}
		break
	}
	//.........
	// The rest is too long to list here. If you are interested, see
	// fd_unix.go in the net package of the Go source.
	return netfd, nil
}

The snippet above is the accept path. Note that when accept returns an error, the code checks whether the error is syscall.EAGAIN. If it is, WaitRead is called, which suspends the goroutine that is currently reading this fd until a read event occurs on it. When something new arrives on the socket, WaitRead returns and the for loop continues, so from the caller's point of view the call on netFD behaves like a synchronous, "blocking" one. If you are interested, look at the Read and Write methods of netFD: both are implemented the same way.
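The retry-on-EAGAIN pattern can be reproduced in user code with a non-blocking pipe. This is only a sketch under stated assumptions: it targets Linux or a similar Unix system, readBlocking and demoRead are hypothetical names, and the waitRead callback is a stand-in that makes data available itself instead of parking the goroutine as the real WaitRead does.

```go
package main

import (
	"fmt"
	"syscall"
)

// readBlocking mirrors the shape of the loop in netFD.accept: try the
// non-blocking call, and on EAGAIN call waitRead before retrying.
// waitRead is a hypothetical stand-in for pollDesc's WaitRead.
func readBlocking(fd int, buf []byte, waitRead func() error) (int, error) {
	for {
		n, err := syscall.Read(fd, buf)
		if err == syscall.EAGAIN {
			if err = waitRead(); err != nil {
				return 0, err
			}
			continue
		}
		return n, err
	}
}

// demoRead reads from an empty non-blocking pipe and returns the data read
// and the number of times the loop had to wait.
func demoRead() (string, int, error) {
	var p [2]int
	if err := syscall.Pipe(p[:]); err != nil {
		return "", 0, err
	}
	defer syscall.Close(p[0])
	defer syscall.Close(p[1])
	if err := syscall.SetNonblock(p[0], true); err != nil {
		return "", 0, err
	}

	waits := 0
	waitRead := func() error {
		waits++
		// In the real runtime the goroutine would be parked here until
		// epoll reports the fd readable. This sketch simply writes the
		// data that a peer would normally send.
		_, err := syscall.Write(p[1], []byte("ready"))
		return err
	}

	buf := make([]byte, 16)
	n, err := readBlocking(p[0], buf, waitRead)
	if err != nil {
		return "", waits, err
	}
	return string(buf[:n]), waits, nil
}

func main() {
	fmt.Println(demoRead())
}
```

The first Read fails with EAGAIN because the pipe is empty, the loop waits once, and the second Read succeeds. The caller of readBlocking never sees the EAGAIN.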

Everything now hinges on pollDesc. What exactly is it?

const (
 pdReady uintptr = 1
 pdWait uintptr = 2
)

// Network poller descriptor.
type pollDesc struct {
	link    *pollDesc // in pollcache, protected by pollcache.lock
	lock    mutex     // protects the following fields
	fd      uintptr
	closing bool
	seq     uintptr // protects from stale timers and ready notifications
	rg      uintptr // pdReady, pdWait, G waiting for read or nil
	rt      timer   // read deadline timer (set if rt.f != nil)
	rd      int64   // read deadline
	wg      uintptr // pdReady, pdWait, G waiting for write or nil
	wt      timer   // write deadline timer
	wd      int64   // write deadline
	user    uint32  // user settable cookie
}

type pollCache struct {
 lock mutex
 first *pollDesc
}

pollDesc is the network poller descriptor that the Go runtime keeps for each socket file descriptor. "Polling" here does not mean poll in the usual sense. Instead, the runtime calls epoll_wait to collect all socket file descriptors with pending I/O events when it schedules goroutines, after a GC completes, or at regular intervals.

Before the runtime can poll, the socket file descriptor and the information about the current goroutine must be added to the data structure maintained by epoll, and the current goroutine is suspended. When the I/O is ready, the runtime resumes that goroutine using the file descriptor returned by epoll and the goroutine information attached to it.

pollDesc contains two fields, rg and wg, which we can think of as semaphores. Each of them can be in one of several states:

  • pdReady: I/O is ready.
  • pdWait: the current goroutine is about to park on the semaphore but has not parked yet.
  • G pointer: once the value is changed to a pointer to the current goroutine, that goroutine is parked.
  • nil (0): none of the above; no goroutine is waiting and no notification is pending.

Back to the WaitRead call above: what exactly does Go do there to suspend the current goroutine?

func net_runtime_pollWait(pd *pollDesc, mode int) int {
	err := netpollcheckerr(pd, int32(mode))
	if err != 0 {
		return err
	}
	// As for now only Solaris uses level-triggered IO.
	if GOOS == "solaris" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		err = netpollcheckerr(pd, int32(mode))
		if err != 0 {
			return err
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout has been reset.
		// Pretend it has not happened and retry.
	}
	return 0
}


// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	// Pick the read or write semaphore of the pollDesc according to mode.
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		// Already ready: return true right away.
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("netpollblock: double wait")
		}
		// Set gpp to pdWait.
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
	}

	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("netpollblock: corrupted state")
	}
	return old == pdReady
}

A call to WaitRead ends up in the net_runtime_pollWait function shown above. That function calls netpollblock in a loop. netpollblock returns true when I/O is ready and false on a timeout or close. Inside netpollblock, gopark is called, and gopark in turn calls mcall, which is implemented in assembly. Its job is to suspend the current goroutine and switch to running other runnable goroutines. At this point the goroutine has been fully parked. How is it notified when the fd becomes readable? That is epoll's job.

func netpoll(block bool) *g {
	if epfd == -1 {
		return nil
	}
	waitms := int32(-1)
	if !block {
		waitms = 0
	}
	var events [128]epollevent
retry:
	// Fetch at most 128 events per call.
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		if n != -_EINTR {
			println("runtime: epollwait on fd", epfd, "failed with", -n)
			throw("epollwait failed")
		}
		goto retry
	}
	var gp guintptr
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
		var mode int32
		// Read events.
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		// Write events.
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			// Convert the data stored in the epoll event back to a pollDesc.
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			netpollready(&gp, pd, mode)
		}
	}
	if block && gp == 0 {
		goto retry
	}
	return gp.ptr()
}

This code should look familiar: it is the usual way of using epoll. The key line is pd := *(**pollDesc)(unsafe.Pointer(&ev.data)), which recovers the pollDesc associated with the event that has just become ready. As described above, a goroutine is parked when the read or write semaphore of its pollDesc holds a G pointer. netpollready clears that G pointer, sets the semaphore to pdReady, and collects the goroutine so that it can be put on the runnable queue, which is how the goroutine is woken up.
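For readers who have not used epoll directly, here is a small Linux-only sketch of the same readiness check using the syscall package. The helper names epollWaitRetry and pollPipe are hypothetical. Unlike the runtime, which stores a pollDesc pointer in the event data, this sketch stores the file descriptor itself.

```go
package main

import (
	"fmt"
	"syscall"
)

// epollWaitRetry wraps syscall.EpollWait and retries on EINTR.
func epollWaitRetry(epfd int, events []syscall.EpollEvent, msec int) (int, error) {
	for {
		n, err := syscall.EpollWait(epfd, events, msec)
		if err == syscall.EINTR {
			continue
		}
		return n, err
	}
}

// pollPipe registers the read end of a pipe with epoll and returns the number
// of ready events before and after a write, plus whether the reported
// descriptor is the read end of the pipe.
func pollPipe() (int, int, bool, error) {
	var p [2]int
	if err := syscall.Pipe(p[:]); err != nil {
		return 0, 0, false, err
	}
	defer syscall.Close(p[0])
	defer syscall.Close(p[1])

	epfd, err := syscall.EpollCreate1(0)
	if err != nil {
		return 0, 0, false, err
	}
	defer syscall.Close(epfd)

	// Ask epoll to report when the read end becomes readable.
	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(p[0])}
	if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, p[0], &ev); err != nil {
		return 0, 0, false, err
	}

	events := make([]syscall.EpollEvent, 128)
	before, err := epollWaitRetry(epfd, events, 0) // nothing written yet
	if err != nil {
		return 0, 0, false, err
	}
	if _, err := syscall.Write(p[1], []byte("x")); err != nil {
		return 0, 0, false, err
	}
	after, err := epollWaitRetry(epfd, events, 1000)
	if err != nil {
		return 0, 0, false, err
	}
	matched := after == 1 && int(events[0].Fd) == p[0]
	return before, after, matched, nil
}

func main() {
	fmt.Println(pollPipe())
}
```

The value stored when the descriptor is registered comes back with the event. The runtime uses that slot to find the pollDesc, and through it the goroutine to wake.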

As we can see, although the TCP server we write looks like it uses a blocking network model, underneath it is built on asynchronous I/O multiplexing. Go wraps that mechanism in a programming model that resembles blocking I/O, so we do not have to deal with asynchronous I/O, multiplexing, or tangled callback functions.
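To see the resulting programming model at work, the sketch below serves many clients at once with nothing but blocking-style calls and one goroutine per connection. The helper names serve and runClients, the loopback address, and the client count are assumptions for this illustration.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
)

// serve accepts connections until the listener is closed and handles each
// one in its own goroutine, written in plain blocking style.
func serve(l net.Listener) {
	for {
		c, err := l.Accept()
		if err != nil {
			return
		}
		go func(c net.Conn) {
			defer c.Close()
			buf := make([]byte, 1024)
			if _, err := c.Read(buf); err != nil {
				return
			}
			c.Write([]byte("Hello from server"))
		}(c)
	}
}

// runClients connects n clients concurrently and counts the expected replies.
func runClients(n int) (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	go serve(l)

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		replies int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c, err := net.Dial("tcp", l.Addr().String())
			if err != nil {
				return
			}
			defer c.Close()
			if _, err := c.Write([]byte("hi")); err != nil {
				return
			}
			reply, err := io.ReadAll(c) // reads until the server closes
			if err == nil && string(reply) == "Hello from server" {
				mu.Lock()
				replies++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return replies, nil
}

func main() {
	fmt.Println(runClients(50))
}
```

No callbacks or event loops appear in this code. Each goroutine reads and writes as if it owned a blocking socket.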
