SoFunction
Updated on 2025-03-05

Source code analysis The reason why using cgo in Go language leads to thread growth

TDengine Go Connector/taosdata/driver-goUsing the API in cgo call, we found that the number of threads is growing continuously during use. This article starts with a cgo call to parse the Go source code and analyzes the reasons for thread growth.

Convert cgo code

Convert driver-go/wrapper/

go tool cgo

Generate after execution_objFolders

go code analysis

bytaosc.middleTaosResetCurrentDBAs an example, analyze it.

// TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);
func TaosResetCurrentDB(taosConnect ) {
    func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }()
}
//go:linkname _cgoCheckPointer 
func _cgoCheckPointer(interface{}, interface{})
//go:cgo_unsafe_args
func _Cfunc_taos_reset_current_db(p0 ) (r1 _Ctype_void) {
    _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr((&p0)))
    if _Cgo_always_false {
        _Cgo_use(p0)
    }
    return
}
//go:linkname _cgo_runtime_cgocall 
func _cgo_runtime_cgocall(, uintptr) int32
//go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
//go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte
var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = (&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)

TaosResetCurrentDBCall first_cgoCheckPointerCheck whether the incoming parameters arenil

//go:linkname _cgoCheckPointer expresscgoCheckPointerThe method implementation is, if the incoming parameter isnilThe program willpanic

Then call_Cfunc_taos_reset_current_db

Cfunc_taos_reset_current_dbIn the method_Cgo_always_falseIt will be false at runtime, so only the first sentence is analyzed_cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr((&p0)))

  • _cgo_runtime_cgocallRealization isThis will focus on analysis.
  • _cgo_453a0cad50ef_Cfunc_taos_reset_current_dbFrom the last code block above, it can be seen thattaos_reset_current_dbMethod pointer.
  • uintptr((&p0))Denotes the pointer address of p0.
  • From the above, we can see that this sentence means calling, the parameter is the pointer address of the method pointer and parameter.

analyze

based ongolang 1.20.4Analyze this method

func cgocall(fn, arg ) int32 {
    if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {
        throw("cgocall unavailable")
    }
    if fn == nil {
        throw("cgocall nil")
    }
    if raceenabled {
        racereleasemerge((&racecgosync))
    }
    mp := getg().m // Get the current goroutine's M    ++  // Total cgo count +1    ++      // Current cgo count +1    [0] = 0 // Reset tracking    entersyscall() // Enter the system call, save the context, mark the current goroutine exclusive m, skip garbage collection    osPreemptExtEnter(mp) // Mark asynchronous preemption, invalidating the asynchronous preemption logic     = true // Modify status    errno := asmcgocall(fn, arg) // Where to actually make method calls     = false // Modify status    -- // The current cgo call -1    osPreemptExtExit(mp) // Recover asynchronous preemption    exitsyscall() // Exit the system call and restore scheduler control    if raceenabled {
        raceacquire((&racecgosync))
    }
    // Avoid premature recycling of GC    KeepAlive(fn)
    KeepAlive(arg)
    KeepAlive(mp)
    return errno
}

Two of the main methodsentersyscallandasmcgocall, Next, these two methods are focused on analysis.

Analysis entersyscall

func entersyscall() {
    reentersyscall(getcallerpc(), getcallersp())
}

entersyscallDirectly calledreentersyscall, followreentersyscallA paragraph in the comment:

// If the syscall does not block, that is it, we do not emit any other events.
// If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;

ifsyscallIf the call is not blocked, no event will be triggered.retakerWill triggertraceGoSysBlock, then you need to understand how long it takes to be considered blocked, follow it firstretakermethod.

func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    for i := 0; i < len(allp); i++ {
        pp := allp[i]
        if pp == nil {
            continue
        }
        pd := &
        s := 
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            t := int64()
            if int64() != t {
                 = uint32(t)
                 = now
            } else if +forcePreemptNS <= now {
                preemptone(pp)
                sysretake = true
            }
        }
        // Preempt P from system calls        if s == _Psyscall {
            // If a system monitored tick (20us) has been exceeded, preempt P from the system call            t := int64()
            if !sysretake && int64() != t {
                 = uint32(t)
                 = now
                continue
            }
            if runqempty(pp) && ()+() > 0 && +10*1000*1000 > now {
                continue
            }
            unlock(&allpLock)
            incidlelocked(-1)
            if (&, s, _Pidle) {
                if  {
                    traceGoSysBlock(pp)
                    traceProcStop(pp)
                }
                n++
                ++
                handoffp(pp)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

From the above, we can see that the system call blocking will be preempted for more than 20 microseconds. P, cgo is forcedhandoffp, next analysishandoffpmethod

func handoffp(pp *p) {
    // ...
    // A new M needs to be started without tasks and without spins and idle M.    if ()+() == 0 && (0, 1) {
        (0)
        startm(pp, true)
        return
    }
    // ...
}

handoffpThe method will be calledstartmTo start a new M and followstartmmethod.

func startm(pp *p, spinning bool) {
    // ...
    nmp := mget()
    if nmp == nil {
        // No M is available, call newm        id := mReserveID()
        unlock(&)
        var fn func()
        if spinning {
            fn = mspinning
        }
        newm(fn, pp, id)
        releasem(mp)
        return
    }
    // ...
}

If there is no M at this timestartmWill callnewmCreate a new M, and then analyze itnewmmethod.

func newm(fn func(), pp *p, id int64) {
    acquirem()
    mp := allocm(pp, fn, id)
    (pp)
     = initSigmask
    if gp := getg(); gp != nil &&  != nil && ( != 0 || ) && GOOS != "plan9" {
        lock(&)
        if  == 0 {
            throw("on a locked thread with no template thread")
        }
         = 
        (mp)
        if  {
             = false
            notewakeup(&)
        }
        unlock(&)
        releasem(getg().m)
        return
    }
    newm1(mp)
    releasem(getg().m)
}
func newm1(mp *m) {
    if iscgo {
        var ts cgothreadstart
        if _cgo_thread_start == nil {
            throw("_cgo_thread_start missing")
        }
        (mp.g0)
         = (*uint64)((&[0]))
         = (abi.FuncPCABI0(mstart))
        if msanenabled {
            msanwrite((&ts), (ts))
        }
        if asanenabled {
            asanwrite((&ts), (ts))
        }
        ()
        // Create a new thread        asmcgocall(_cgo_thread_start, (&ts))
        ()
        return
    }
    ()
    newosproc(mp)
    ()
}

fromnewmIt is seen that if the threads are all blocking, it will be callednewm1newm1Call_cgo_thread_startCreate a new thread.

From the above analysis, it is concluded that when cgo is called with high concurrent and the execution time exceeds 20 microseconds, a new thread will be created.

Analysis asmcgocall

Analyze only amd64

asm_amd64.s

TEXT ·asmcgocall(SB),NOSPLIT,$0-20
    MOVQ    fn+0(FP), AX
    MOVQ    arg+8(FP), BX
    MOVQ    SP, DX
    // Consider whether you need to switch to m.g0 stack    // Also used to call to create new OS threads, which are already in the m.g0 stack    get_tls(CX)
    MOVQ    g(CX), DI
    CMPQ    DI, $0
    JEQ nosave
    MOVQ    g_m(DI), R8
    MOVQ    m_gsignal(R8), SI
    CMPQ    DI, SI
    JEQ nosave
    MOVQ    m_g0(R8), SI
    CMPQ    DI, SI
    JEQ nosave
    // Switch to the system stack    CALL    gosave_systemstack_switch<>(SB)
    MOVQ    SI, g(CX)
    MOVQ    (g_sched+gobuf_sp)(SI), SP
    // In the scheduling stack (pthread newly created stack)    // Make sure there is enough space for the four stack-based fast-call registers    // To make windows amd64 call service    SUBQ    $64, SP
    ANDQ    $~15, SP // Align for gcc ABI    MOVQ    DI, 48(SP) // Save g    MOVQ    (g_stack+stack_hi)(DI), DI
    SUBQ    DX, DI
    MOVQ    DI, 40(SP) // Save the stack depth (cannot only save SP, because the stack may be copied during callback)    MOVQ    BX, DI  // DI = AMD64 ABI First parameter    MOVQ    BX, CX  // CX = Win64 First parameter    CALL    AX  // Call fn    // Recover registers, g, stack pointers    get_tls(CX)
    MOVQ    48(SP), DI
    MOVQ    (g_stack+stack_hi)(DI), SI
    SUBQ    40(SP), SI
    MOVQ    DI, g(CX)
    MOVQ    SI, SP
    MOVL    AX, ret+16(FP)
    RET
nosave:
    // Run on the system stack, there may be no g    // No g happens in thread creation or thread ending (such as needm/dropm on Solaris platform)    // This code is similar to the above, but does not save and restore g, and does not consider the stack movement problem (because we are on the system stack, not the goroutine stack)    // If it is already on the system stack, the above code can be used directly, and the following code will be entered on Solaris.    // Use this code to serve all calls that are "already on the system stack" to maintain correctness.    SUBQ    $64, SP
    ANDQ    $~15, SP // ABI Alignment    MOVQ    $0, 48(SP) // The above code saves g, ensures that it is available when debugging    MOVQ    DX, 40(SP) // Save the original stack pointer    MOVQ    BX, DI  // DI = AMD64 ABI First parameter    MOVQ    BX, CX  // CX = Win64 First parameter    CALL    AX
    MOVQ    40(SP), SI // Restore the original stack pointer    MOVQ    SI, SP
    MOVL    AX, ret+16(FP)
    RET

This section moves the current stack to the system stack for execution, because C requires an infinite stack, and executing C functions on Go's stack will cause stack overflow.

Caused a problem

The cgo call will move the current stack to the system stack, and a new thread will be created when the cgo is highly concurrent and blocked for more than 20 microseconds. Go does not destroy threads, which causes thread growth.

Solution

Limit the maximum number of threads for Go programs, default to the number of cpu cores.

(())

Use channel to limit the maximum number of cgo concurrencies to cpu cores

package thread
import "runtime"
var c chan struct{}
func Lock() {
    c <- struct{}{}
}
func Unlock() {
    <-c
}
func init() {
    c = make(chan struct{}, ())
}

Limit for cgo calls that exceed 20 microseconds:

()
(result)
()

The above is the detailed content of source code analysis of the reason why thread growth is caused by using cgo in Go. For more information about the growth of threads in Go, please pay attention to my other related articles!