SoFunction
Updated on 2025-03-04

golang Concurrent Programming Producer and Consumer Detailed Explanation

The most attractive thing about golang may be concurrency. Golang has absolute advantages in both code writing and performance.

Learning the concurrency characteristics of a language, I like to implement a producer and consumer model. This model is very classic and is suitable for many concurrency scenarios. Let me briefly introduce the concurrency programming of golang through this model.

go concurrency syntax

Coroutine go

Coroutines are the smallest unit of golang concurrency, similar to threads in other languages, but the implementation of threads uses the implementation of the operating system. Each thread scheduling is a system call, which requires switching from user mode to kernel mode. This is a very time-consuming operation. Therefore, too many threads in general programs will cause a lot of performance to consume thread switching. This kind of scheduling is implemented inside golang. The switching of coroutines under this scheduling is very lightweight. It is normal for hundreds of coroutines to run in a golang program.

golang is created for concurrency, and the syntax to start a coroutine is very simple. Just use the go keyword

go func () {
    // do something
}

Synchronous signal

Multiple coroutines can be synchronized, which is similar to the semaphore in Linux.

var wg   // Declare a semaphore(1)   // Add one semaphore()   // Decrease the semaphore by one()   // Semaphore is timing blocking,Until the semaphore is0Wake up when

Channel chan

A channel can be understood as a message queue, where the producer puts it into the queue, and the consumer takes it from the queue. The channel can be closed using close

ic := make(chan int, 10)  // Declare a channelic <- 10        // Put it into the passagei := <- ic      // Take from the channelclose(ic)       // Close the channel

Producer and consumer implementation

Define product categories

This product category is defined based on specific business needs

type Product struct {
    name  int
    value int
}

Producer

If the stop flag is not false, keep putting product into the channel, and the semaphore is completed after completion

func producer(wg *, products chan<- Product, name int, stop *bool) {
    for !*stop {
        product := Product{name: name, value: ()}
        products <- product
        ("producer %v produce a product: %#v\n", name, product)
        ((200+(1000)) * )
    }
    ()
}

consumer

Continuously take product from the channel and then perform corresponding processing until the channel is closed and products are empty, the for loop will not terminate, which is exactly what we expect

func consumer(wg *, products <-chan Product, name int) {
    for product := range products {
        ("consumer %v consume a product: %#v\n", name, product)
        ((200+(1000)) * )
    }
    ()
}

Main thread

var wgp 
var wgc 
stop := false
products := make(chan Product, 10)
// Create 5 producers and 5 consumersfor i := 0; i &lt; 5; i++ {
    go producer(&amp;wgp, products, i, &amp;stop)
    go consumer(&amp;wgc, products, i)
    (1)
    (1)
}
((1) * )
stop = true     // Set the producer termination signal()      // Wait for the producer to exitclose(products) // Close the channel()      // Wait for the consumer to exit

Supplement: Go concurrent programming-implemented producer consumer model through channel

Overview

The producer consumer model is a classic model of multi-threaded design, which is widely used in the multi-threaded/process model design of various systems.

This article introduces the characteristics of channel in Go language and implements two producer and consumer models through Go language.

Some features of channel

In Go, channel is a very important means of coroutine communication. Channel is a two-way channel. Through channel, data transmission between coroutines can be realized, and synchronization between coroutines can be realized (there will be introduced later).

The producer consumer model introduced in this article mainly uses the following characteristics of the channel: only one coroutine can access a certain item in the channel at any time.

Single Producer Single Consumer Model

Putting both producers and consumers into a wireless loop is very similar to our server-side task processing. Producers constantly put data into the channel, while consumers constantly extract data from the channel and process (print).

Since the producer's coroutine will not exit, the channel's write will exist permanently, so when no data is placed in the channel, the consumer side will block and wait for the producer side to put the data into it.

The code implementation is as follows:

package main
import (
    "fmt"
    "time"
)
var ch1 chan int = make(chan int)
var bufChan chan int = make(chan int, 1000)
var msgChan chan int = make(chan int)
func sum(a int, b int) {
    ch1 &lt;- a + b
}
// write data to channel
func writer(max int) {
    for {
        for i := 0; i &lt; max; i++ {  // Simply put an integer into the channel            bufChan &lt;- i
            (1 * )  //Control the frequency of placement        }
    }
}
// read data fro m channel
func reader(max int) {
    for {
        r := &lt;-bufChan
        ("read value: %d\n", r)
    }
    // Notify the main thread that the work is over, this step can be omitted    msgChan &lt;- 1
}
func testWriterAndReader(max int) {
    go writer(max)
    go reader(max)
    // The task of writer and reader is over, and the main thread will be notified    res := &lt;-msgChan
    ("task is done: value=%d\n", res)
}
func main() {
    testWriterAndReader(100)
}

Multi-producer consumer model

We can use the channel's feature that only one coroutine can access one of its data at a certain point in time to implement the producer consumer model. Because channel has such characteristics, we do not need to add locks when placing and consuming data.

package main
import (
    "time"
    "fmt"
    "os"
)
var ch1 chan int = make(chan int)
var bufChan chan int = make(chan int, 1000)
var msgChan chan string = make(chan string)
func sum(a int, b int) {
    ch1 &lt;- a + b
}
// write data to channel
func writer(max int) {
    for {
        for i := 0; i &lt; max; i++ {
            bufChan &lt;- i
            (, "%v write: %d\n", (), i)
            (10 * )
        }
    }
}
// read data fro m channel
func reader(name string) {
    for {
        r := &lt;-bufChan
        ("%s read value: %d\n", name, r)
    }
    msgChan &lt;- name
}
func testWriterAndReader(max int) {
    // Turn on the goroutine of multiple writers and constantly write data into the channel    go writer(max)
    go writer(max)
    // Turn on the goroutine of multiple readers, constantly read data from the channel, and process data    go reader("read1")
    go reader("read2")
    go reader("read3")
    // Get the task completion status of three readers    name1 := &lt;-msgChan
    name2 := &lt;-msgChan
    name3 := &lt;-msgChan
    ("%s,%s,%s: All is done!!", name1, name2, name3)
}
func main() {
    testWriterAndReader(100)
}

The output is as follows:

read3 read value: 0

80731 write: 0

80731 write: 0

read1 read value: 0

80731 write: 1

read2 read value: 1

80731 write: 1

read3 read value: 1

80731 write: 2

read2 read value: 2

80731 write: 2

... ...

Summarize

This paper implements a classic producer and consumer model through channel, taking advantage of the characteristics of channel. However, it should be noted that when the consumer's speed is smaller than the producer, the channel may cause congestion, resulting in an increase in memory usage. Therefore, in actual scenarios, the size of the channel's buffer needs to be considered.

Set the channel size. When the production data is greater than the channel capacity, the producer will block. These issues need to be considered in actual scenarios.

One solution is to use a fixed array or slice as a ring buffer instead of a channel, synchronize through the Sync package mechanism to implement the producer consumer model, so as to avoid consumer blockage due to channel fullness.

However, for ring buffers, old data may be overwritten, and specific usage scenarios need to be considered. Regarding the principles and implementation of ring buffers, further analysis will be made when analyzing the use of Sync packages.

The above is personal experience. I hope you can give you a reference and I hope you can support me more. If there are any mistakes or no complete considerations, I would like to give you advice.