Implementing a goroutine pool using a channel
Implementing a goroutine pool through a channel has the drawback of frequently creating and destroying goroutines, but it is simple, flexible, and general-purpose.
```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
)

// Pool is a goroutine pool backed by a buffered channel
type Pool struct {
	queue chan int
	wg    *sync.WaitGroup
}

// New creates a new goroutine pool
func New(size int) *Pool {
	if size <= 0 {
		size = 1
	}
	return &Pool{
		queue: make(chan int, size),
		wg:    &sync.WaitGroup{},
	}
}

// Add registers delta new executions
func (p *Pool) Add(delta int) {
	// If delta is positive, occupy slots; blocks when the pool is full
	for i := 0; i < delta; i++ {
		p.queue <- 1
	}
	// If delta is negative, release slots
	for i := 0; i > delta; i-- {
		<-p.queue
	}
	p.wg.Add(delta)
}

// Done marks one execution as finished and frees its slot
func (p *Pool) Done() {
	<-p.queue
	p.wg.Done()
}

// Wait blocks until all executions have finished
func (p *Pool) Wait() {
	p.wg.Wait()
}

func main() {
	// Limit concurrency to 100
	pool := New(100)
	// Suppose 10 million HTTP requests need to be sent; 100 goroutines at a time do the work
	for i := 0; i < 10000000; i++ {
		// Once 100 goroutines are running, Add blocks here until one of them calls Done
		pool.Add(1)
		go func(i int) {
			resp, err := http.Get("")
			if err != nil {
				fmt.Println(i, err)
			} else {
				defer resp.Body.Close()
				result, _ := ioutil.ReadAll(resp.Body)
				fmt.Println(i, string(result))
			}
			pool.Done()
		}(i)
	}
	pool.Wait()
}
```
Implementing a goroutine pool with the producer-consumer model
The previous approach frequently creates and destroys goroutines. If performance matters, it is better to use a fixed number of goroutines that consume data from a channel, avoiding repeated goroutine creation and teardown.
```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// task bundles a producer and a consumer
type task struct {
	Production
	Consumer
}

// setConsumerPoolSize sets the number of consumers, i.e. the size of the work pool
func (t *task) setConsumerPoolSize(poolSize int) {
	t.Production.Jobs = make(chan *Job, poolSize*10)
	t.Consumer.WorkPoolNum = poolSize
}

// Job is the task data object
type Job struct {
	Data string
}

func NewTask(handler func(jobs chan *Job) (b bool)) (t *task) {
	t = &task{
		Production: Production{Jobs: make(chan *Job, 100)},
		Consumer:   Consumer{WorkPoolNum: 10, Handler: handler},
	}
	return
}

type Production struct {
	Jobs chan *Job
}

func (c Production) AddData(data *Job) {
	c.Jobs <- data
}

type Consumer struct {
	WorkPoolNum int
	Handler     func(chan *Job) (b bool)
	Wg          sync.WaitGroup
}

// disposeData asynchronously starts WorkPoolNum workers to process tasks;
// it returns only after every worker has finished
func (c *Consumer) disposeData(data chan *Job) {
	for i := 0; i < c.WorkPoolNum; i++ {
		c.Wg.Add(1)
		go func() {
			defer func() {
				c.Wg.Done()
			}()
			c.Handler(data)
		}()
	}
	c.Wg.Wait()
}

func main() {
	// A closure that processes the data; business logic goes here
	consumerHandler := func(jobs chan *Job) (b bool) {
		for job := range jobs {
			fmt.Println(job)
		}
		return
	}
	// Create a task-processing object
	t := NewTask(consumerHandler)
	t.setConsumerPoolSize(500) // 500 goroutines consume concurrently
	// Generate data for your own business and feed it through AddData; here, 1 million items
	go func() {
		for i := 0; i < 1000000; i++ {
			job := new(Job)
			iStr := strconv.Itoa(i)
			job.Data = "Define task data format" + iStr
			t.AddData(job)
		}
		// close the channel so the workers' range loops can exit
		close(t.Jobs)
	}()
	// Consume the data
	t.disposeData(t.Jobs)
}
```

Note that the producer must close the channel after the last item; otherwise the workers' `for range` loops block forever and `disposeData` never returns.
This concludes the article on implementing goroutine pool management in Go. For more on goroutine pools in Go, see my previous articles or the related articles below.