The TDengine Go connector, taosdata/driver-go, calls the TDengine API through cgo. In use, we found that the number of threads kept growing. This article starts from a single cgo call, walks through the Go source code, and analyzes the reasons for the thread growth.
Convert cgo code
Run go tool cgo against driver-go/wrapper/. After execution, an _obj folder is generated.
Go code analysis
Take the function TaosResetCurrentDB in taosc. as an example and analyze it.
// TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);
func TaosResetCurrentDB(taosConnect unsafe.Pointer) {
	func() {
		_cgo0 := /*line :161:26*/taosConnect
		_cgoCheckPointer(_cgo0, nil)
		_Cfunc_taos_reset_current_db(_cgo0)
	}()
}

//go:linkname _cgoCheckPointer runtime.cgoCheckPointer
func _cgoCheckPointer(interface{}, interface{})

//go:cgo_unsafe_args
func _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) {
	_cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
	if _Cgo_always_false {
		_Cgo_use(p0)
	}
	return
}

//go:linkname _cgo_runtime_cgocall runtime.cgocall
func _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32

//go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
//go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte
var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)
TaosResetCurrentDB first calls _cgoCheckPointer to check the incoming argument. The directive //go:linkname _cgoCheckPointer runtime.cgoCheckPointer indicates that this function is implemented by runtime.cgoCheckPointer. It panics if the argument breaks the cgo pointer-passing rules, that is, if it is a Go pointer to memory that itself contains Go pointers. A nil argument passes the check.
It then calls _Cfunc_taos_reset_current_db.
In _Cfunc_taos_reset_current_db, _Cgo_always_false is always false at runtime, so only the first statement needs to be analyzed: _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0))).
- _cgo_runtime_cgocall is implemented by runtime.cgocall, which is the focus of the analysis below.
- _cgo_453a0cad50ef_Cfunc_taos_reset_current_db is, as the last lines of the code block above show, the function pointer for taos_reset_current_db.
- uintptr(unsafe.Pointer(&p0)) is the address of the argument p0.
From the above, this statement calls runtime.cgocall with two arguments: the function pointer and the address of the argument.
Analysis of runtime.cgocall
The analysis of this function is based on Go 1.20.4.
func cgocall(fn, arg unsafe.Pointer) int32 {
	if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {
		throw("cgocall unavailable")
	}
	if fn == nil {
		throw("cgocall nil")
	}
	if raceenabled {
		racereleasemerge(unsafe.Pointer(&racecgosync))
	}
	mp := getg().m      // Get the M of the current goroutine
	mp.ncgocall++       // Total number of cgo calls +1
	mp.ncgo++           // Number of cgo calls currently in progress +1
	mp.cgoCallers[0] = 0 // Reset traceback
	entersyscall()      // Enter the system call: save the context and let the scheduler know this M is busy outside Go code
	osPreemptExtEnter(mp) // Disable asynchronous preemption while in external code
	mp.incgo = true     // Mark the M as running cgo
	errno := asmcgocall(fn, arg) // The actual call of the C function
	mp.incgo = false    // Clear the cgo mark
	mp.ncgo--           // Number of cgo calls currently in progress -1
	osPreemptExtExit(mp) // Re-enable asynchronous preemption
	exitsyscall()       // Exit the system call and return control to the scheduler
	if raceenabled {
		raceacquire(unsafe.Pointer(&racecgosync))
	}
	// Prevent the GC from reclaiming these values too early
	KeepAlive(fn)
	KeepAlive(arg)
	KeepAlive(mp)
	return errno
}
The two main functions called here are entersyscall and asmcgocall. The following sections analyze each of them.
Analysis of entersyscall
func entersyscall() {
	reentersyscall(getcallerpc(), getcallersp())
}
entersyscall directly calls reentersyscall. Note this passage in the comment on reentersyscall:
// If the syscall does not block, that is it, we do not emit any other events.
// If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;
If the syscall does not block, no further event is emitted. If it does block, the retaker emits traceGoSysBlock. We therefore need to know how long a call must run before it is considered blocked, so we follow the retake function first.
func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	for i := 0; i < len(allp); i++ {
		pp := allp[i]
		if pp == nil {
			continue
		}
		pd := &pp.sysmontick
		s := pp.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Preempt the G if it has been running for too long
			t := int64(pp.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				preemptone(pp)
				sysretake = true
			}
		}
		// Retake P from system calls
		if s == _Psyscall {
			// If the P has been in the system call for more than one sysmon tick (at least 20us), retake it
			t := int64(pp.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			unlock(&allpLock)
			incidlelocked(-1)
			if atomic.Cas(&pp.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(pp)
					traceProcStop(pp)
				}
				n++
				pp.syscalltick++
				handoffp(pp)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
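The system-call branch of retake can be modelled outside the runtime. The sketch below is a simplified illustration only: the pState type, the shouldRetake function and their field names are hypothetical, the forced-preemption path (sysretake) is left out, and none of this is runtime code. It only shows how "the same system call seen on two consecutive sysmon passes" turns into a retake decision.

```go
package main

import "fmt"

// pState is a hypothetical, simplified stand-in for the per-P fields that
// retake reads; it is not the runtime's real p or sysmontick type.
type pState struct {
	inSyscall   bool   // true while the P is in the _Psyscall state
	syscallTick uint32 // changes whenever the P starts a new system call
	seenTick    uint32 // syscallTick recorded by the previous sysmon pass
	seenWhen    int64  // time in ns at which seenTick was recorded
	runqEmpty   bool   // true if the P has no runnable goroutines queued
}

const tenMilliseconds = 10 * 1000 * 1000 // ns, the same constant retake uses

// shouldRetake models one sysmon pass over a single P. idleOrSpinning is the
// sum of spinning Ms and idle Ps.
func shouldRetake(p *pState, now int64, idleOrSpinning int) bool {
	if !p.inSyscall {
		return false
	}
	if p.seenTick != p.syscallTick {
		// First time this system call is observed: record it and move on.
		p.seenTick = p.syscallTick
		p.seenWhen = now
		return false
	}
	// Same system call as on the previous pass, so it spans at least one tick.
	if p.runqEmpty && idleOrSpinning > 0 && p.seenWhen+tenMilliseconds > now {
		// Nothing is queued on this P and other Ms or Ps can take new work.
		return false
	}
	return true
}

func main() {
	p := &pState{inSyscall: true, syscallTick: 1}
	fmt.Println(shouldRetake(p, 0, 0))      // false: the first pass only records the tick
	fmt.Println(shouldRetake(p, 20_000, 0)) // true: 20us later the same call is still running
}
```

In this model a P with an empty run queue is left alone for up to 10 ms as long as spinning Ms or idle Ps exist, which mirrors the runqempty condition in the function above.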
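From the above, we can see that a P whose system call has been blocked for more than one sysmon tick (at least 20 microseconds) is retaken. A cgo call is treated as a system call here, so its P is forcibly handed off through handoffp. Next, we analyze the handoffp function.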
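func handoffp(pp *p) {
	// ...
	// No local work: if there are no spinning Ms and no idle Ps, a new spinning M must be started.
	if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
		sched.needspinning.Store(0)
		startm(pp, true)
		return
	}
	// ...
}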
handoffp calls startm to start a new M, so we follow the startm function next.
func startm(pp *p, spinning bool) {
	// ...
	nmp := mget()
	if nmp == nil {
		// No M is available, call newm
		id := mReserveID()
		unlock(&sched.lock)
		var fn func()
		if spinning {
			fn = mspinning
		}
		newm(fn, pp, id)
		releasem(mp)
		return
	}
	// ...
}
If no idle M is available at this point, startm calls newm to create a new M. Next, we analyze the newm function.
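func newm(fn func(), pp *p, id int64) {
	acquirem()
	mp := allocm(pp, fn, id)
	mp.nextp.set(pp)
	mp.sigmask = initSigmask
	if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
		lock(&newmHandoff.lock)
		if newmHandoff.haveTemplateThread == 0 {
			throw("on a locked thread with no template thread")
		}
		mp.schedlink = newmHandoff.newm
		newmHandoff.newm.set(mp)
		if newmHandoff.waiting {
			newmHandoff.waiting = false
			notewakeup(&newmHandoff.wake)
		}
		unlock(&newmHandoff.lock)
		releasem(getg().m)
		return
	}
	newm1(mp)
	releasem(getg().m)
}

func newm1(mp *m) {
	if iscgo {
		var ts cgothreadstart
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		ts.g.set(mp.g0)
		ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
		ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
		if msanenabled {
			msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
		}
		if asanenabled {
			asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
		}
		execLock.rlock()
		// Create a new thread
		asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
		execLock.runlock()
		return
	}
	execLock.rlock()
	newosproc(mp)
	execLock.runlock()
}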
From newm we can see that, when the existing threads are all blocked, newm1 is called, and newm1 calls _cgo_thread_start to create a new thread.
The conclusion from the above analysis: when cgo is called with high concurrency and the calls run for more than 20 microseconds, new threads are created.
Analysis of asmcgocall
Only the amd64 implementation, in asm_amd64.s, is analyzed.
TEXT ·asmcgocall(SB),NOSPLIT,$0-20
	MOVQ	fn+0(FP), AX
	MOVQ	arg+8(FP), BX
	MOVQ	SP, DX

	// Work out whether we need to switch to the m.g0 stack.
	// This function is also called to create new OS threads, which are already on the m.g0 stack.
	get_tls(CX)
	MOVQ	g(CX), DI
	CMPQ	DI, $0
	JEQ	nosave
	MOVQ	g_m(DI), R8
	MOVQ	m_gsignal(R8), SI
	CMPQ	DI, SI
	JEQ	nosave
	MOVQ	m_g0(R8), SI
	CMPQ	DI, SI
	JEQ	nosave

	// Switch to the system stack
	CALL	gosave_systemstack_switch<>(SB)
	MOVQ	SI, g(CX)
	MOVQ	(g_sched+gobuf_sp)(SI), SP

	// Now on the scheduling stack (a stack created by pthread).
	// Make sure there is enough room for the four stack-backed fast-call registers
	// required by the Windows amd64 calling convention.
	SUBQ	$64, SP
	ANDQ	$~15, SP	// Alignment for the gcc ABI
	MOVQ	DI, 48(SP)	// Save g
	MOVQ	(g_stack+stack_hi)(DI), DI
	SUBQ	DX, DI
	MOVQ	DI, 40(SP)	// Save the stack depth (saving SP alone is not enough, because the stack may be copied during a callback)
	MOVQ	BX, DI		// DI = first argument in the AMD64 ABI
	MOVQ	BX, CX		// CX = first argument in Win64
	CALL	AX		// Call fn

	// Restore registers, g and the stack pointer
	get_tls(CX)
	MOVQ	48(SP), DI
	MOVQ	(g_stack+stack_hi)(DI), SI
	SUBQ	40(SP), SI
	MOVQ	DI, g(CX)
	MOVQ	SI, SP
	MOVL	AX, ret+16(FP)
	RET

nosave:
	// Running on the system stack, possibly without a g.
	// There is no g during thread creation or thread teardown (for example needm/dropm on Solaris).
	// This code is like the code above, but it does not save and restore g, and it does not
	// worry about the stack moving (we are on the system stack, not a goroutine stack).
	// The code above could be used directly when already on the system stack, but then this
	// path would only be entered on Solaris. Using it for all "already on the system stack"
	// calls keeps it exercised and correct.
	SUBQ	$64, SP
	ANDQ	$~15, SP	// ABI alignment
	MOVQ	$0, 48(SP)	// Where the code above stores g, in case someone looks during debugging
	MOVQ	DX, 40(SP)	// Save the original stack pointer
	MOVQ	BX, DI		// DI = first argument in the AMD64 ABI
	MOVQ	BX, CX		// CX = first argument in Win64
	CALL	AX
	MOVQ	40(SP), SI	// Restore the original stack pointer
	MOVQ	SI, SP
	MOVL	AX, ret+16(FP)
	RET
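This code switches from the current goroutine stack to the system stack before executing the C function. C code expects an effectively unlimited stack, whereas a goroutine stack starts small and C code cannot grow it, so executing C functions on a goroutine stack could overflow it.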
The resulting problem
A cgo call switches from the goroutine stack to the system stack, and when cgo calls are highly concurrent and each blocks for more than 20 microseconds, new threads are created. Go does not destroy these threads afterwards, which causes the thread count to grow.
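One way to watch this growth in a running program is the runtime's built-in threadcreate profile, which counts the OS threads the runtime has created. The sketch below is only a rough indicator and not part of the connector; the helper name threadsCreated is hypothetical.

```go
package main

import (
	"fmt"
	"runtime/pprof"
)

// threadsCreated reports how many OS threads the Go runtime has created in
// this process, as recorded by the predefined "threadcreate" profile.
func threadsCreated() int {
	return pprof.Lookup("threadcreate").Count()
}

func main() {
	// Logging this value periodically around heavy cgo traffic shows
	// whether the thread count keeps climbing.
	fmt.Println("threads created so far:", threadsCreated())
}
```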
Solution
Limit the maximum number of threads that run Go code at the same time. The default is the number of CPU cores.
runtime.GOMAXPROCS(runtime.NumCPU())
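Assuming the setting meant here is the runtime's GOMAXPROCS value, the sketch below shows how to read and change it. The Go documentation notes that threads blocked in system calls on behalf of Go code do not count against GOMAXPROCS, so this setting alone does not cap threads that are stuck in cgo calls. The helper name setProcs is hypothetical.

```go
package main

import (
	"fmt"
	"runtime"
)

// setProcs sets GOMAXPROCS to n and returns the previous and the new value.
func setProcs(n int) (prev, now int) {
	prev = runtime.GOMAXPROCS(n) // a positive argument sets the limit and returns the old one
	now = runtime.GOMAXPROCS(0)  // an argument of 0 only reads the current value
	return prev, now
}

func main() {
	prev, now := setProcs(2)
	fmt.Println("previous:", prev, "now:", now)
}
```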
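Use a channel to limit the maximum number of concurrent cgo calls to the number of CPU cores:
package thread

import "runtime"

// c is a buffered channel used as a counting semaphore.
var c chan struct{}

// Lock takes one slot and blocks while all slots are in use.
func Lock() {
	c <- struct{}{}
}

// Unlock releases one slot.
func Unlock() {
	<-c
}

func init() {
	// One slot per CPU core.
	c = make(chan struct{}, runtime.NumCPU())
}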
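To check that a buffered channel really caps concurrency, the sketch below runs many simulated slow calls under the same kind of limiter and records the peak number in flight. The limiter type, the maxInFlight function and the time.Sleep stand-in for a cgo call are all illustrative assumptions, not code from the connector.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter is a counting semaphore built on a buffered channel.
type limiter chan struct{}

func (l limiter) Lock()   { l <- struct{}{} }
func (l limiter) Unlock() { <-l }

// maxInFlight starts `calls` goroutines that each run a simulated slow call
// under the limiter and returns the highest concurrency observed.
func maxInFlight(limit, calls int) int64 {
	l := make(limiter, limit)
	var inFlight, peak atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.Lock()
			defer l.Unlock()
			n := inFlight.Add(1)
			for { // raise peak to n with a compare-and-swap loop
				old := peak.Load()
				if n <= old || peak.CompareAndSwap(old, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // stand-in for a slow cgo call
			inFlight.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(maxInFlight(4, 64) <= 4) // true
}
```

Because at most `limit` goroutines can hold a slot at once, at most that many threads can be blocked inside the guarded call at any time.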
Apply the limit to cgo calls that take longer than 20 microseconds:
thread.Lock()
(result)
thread.Unlock()
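A possible refinement, shown here as a sketch rather than the connector's actual code, is to wrap the acquire and release in one helper that releases the slot with defer, so a panic inside the guarded call cannot leak a slot. The names slots and Do are hypothetical, and the capacity of 2 is chosen only for the example.

```go
package main

import "fmt"

// slots is a buffered channel used as a counting semaphore.
var slots = make(chan struct{}, 2)

// Do runs f while holding one slot and always releases it, even if f panics.
func Do(f func()) {
	slots <- struct{}{}
	defer func() { <-slots }()
	f()
}

func main() {
	func() {
		defer func() { recover() }() // swallow the panic for this demonstration
		Do(func() { panic("simulated failure inside the guarded call") })
	}()
	fmt.Println("slots still held:", len(slots)) // slots still held: 0
}
```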