SoFunction
Updated on 2025-03-03

Detailed explanation of Golang's method of using channel to implement data summary

1. Introduction

This article introduces the problem of data aggregation in concurrent programming, and discusses the methods of using mutex and channel to ensure data security in a concurrent environment.

First, through an instance, a case of concurrent pulling data and summarizing is described, and a mutex lock is used to ensure thread safety. Then, some of the disadvantages of mutex locks are discussed, channels are introduced as an alternative, and basic usage and characteristics of channels are introduced. Next, the example shows how to use channels to implement data summary under concurrent.

Finally, an example of using channels to implement data summary under coroutine concurrency in etcd is cited to show the application of channels in actual projects.

2. Problem introduction

During the request processing process, it is often necessary to pull data through the RPC interface. Sometimes, due to the large amount of data, a single data pull operation may result in a longer processing time for the entire request. To speed up processing, we usually consider enabling multiple coroutines to pull data concurrently. Once multiple coroutines pull data concurrently, the main coroutine needs to summarize the data pulled by these coroutines and then return the result. In this process, concurrent access to shared resources is often involved. In order to ensure thread safety, mutex locks are usually used. The following is a simple code to show the process:

package main

import (
        "fmt"
        "sync"
        "time"
)

type Data struct {
        ID   int
        Name string
}

var (
        // Summary of results        dataList []Data
        // Mutex lock        mutex    
)

func fetchData(page int, wg *) {
        // Simulate the time-consuming operation of data pulling by RPC interface        ()

        // Suppose a batch of data is obtained from the RPC interface        data := Data{
                ID:   page,
                Name: ("Data %d", page),
        }

        // Use mutex to protect concurrent access to shared data        ()
        defer ()
        dataList = append(dataList, data)

        ()
}

func main() {
        var wg 

        // Define the number of data pages to be pulled        numPages := 10

        // Start multiple coroutines to pull data concurrently        for i := 1; i <= numPages; i++ {
            (1)
            go fetchData(i, &wg)
        }

        // Wait for all coroutines to complete        ()

        // Print the pulled data        ("Fetched data:")
        for _, data := range dataList {
            ("ID: %d, Name: %s\n", , )
        }
}

In the above example, we define a shareddataListSlices are used to save the pulled data. Each goroutine is called byfetchDataFunctions to simulate the process of pulling data and use mutex locksmutexProtectdataListconcurrent access. Main coroutine useWait for all coroutines to complete the data pull task, and then print out the pulled data. By pulling data concurrently and using mutex locks to ensure thread safety, we can significantly improve the speed of data pulling and ensure data accuracy and consistency.

Looking back at the above implementation, it actually involves multiple coroutines operating the same data, which may lead to thread safety issues. Then here we ensure thread safety through mutex locks. Indeed, using mutex locks can ensure thread safety, but there are also some disadvantages, such as competition and blocking. When two coroutines compete for mutex locks at the same time, only one coroutine can obtain the lock, while other coroutines will be blocked, which may lead to performance bottlenecks. Of course, there is no big problem in this scenario. Secondly, the complexity of the code has increased. Using mutex locks requires careful design and management to ensure the correct acquisition and release of the lock. This increases the complexity and maintenance costs of the code, and if the locks are handled incorrectly in the code, it may be deadlocked and the program cannot continue execution.

Then we actually have a question. In the scenario where data summary is summarized in coroutine concurrency, there are other ways to ensure thread safety without using mutex locks? In fact, there is really,GoIn languagechannelVery suitable for this situation. By using channels, we can achieve thread-safe data sharing and synchronization without explicitly using mutex locks. Let's learn more about it belowchannel

3. Use of channel

3.1 Basic introduction to channel

3.1.1 Basic description

channelIn Go, it is a special data structure for communication and synchronization between coroutines. It is similar to a first-in-first-out (FIFO) queue for data transmission and sharing. In a concurrent environment, data can be sent to or received from the channel, and both operations are thread-safe.

usechannelThe advantage is that it provides a built-in synchronization mechanism that does not require explicit use of mutexes to handle concurrent access.

When a coroutine sends data to a channel, if the channel is full, the sending operation will be blocked until other coroutines receive data from the channel to free space. Similarly, when a coroutine receives data from a channel, if the channel is empty, the reception operation will be blocked until other coroutines send data to the channel.

At the same time, when multiple coroutines access the channel at the same time, the Go runtime system will automatically handle the details of synchronization and concurrent access between coroutines to ensure the correctness and consistency of data. This allows you to use channels in multiple coroutines to send and receive data without the need for additional locking or synchronization mechanisms to ensure thread safety.

Therefore, usechannelIn fact, it can avoid common concurrency problems, such as race conditions and deadlocks, simplifying the complexity of concurrent programming.

3.1.2 Basic use

By the abovechannelWe have already made the basic introductionchannelWith a basic understanding, we can actually roughly understand it as a concurrent and secure queue. Let's learn more about it belowchannelThe basic syntax ofchannel

The basic channel operation is divided into creationchannel, send data tochannel,take overchanneldata in, and closechannel. Here is a brief display:

Create a channel, use make function to create a channel, the type of the channel can be selected according to your needs, for exampleintstringwait:

ch := make(chan int)

Send data to channel: Use the <- operator to send data to the channel

ch <- data

Receive data from channel: Use the <- operator to receive data from the channel

result := <-ch

Close channel and close the channel using the close function. After closing the channel, you can still receive data from the channel, but you can no longer send data to the channel.

close(ch)

Through the four basic operations of the channel above, it is possible to pass data safely between different coroutines. Finally, through an example, we can fully demonstrate the basic use of channel.

package main

import "fmt"

func main() {
        ch := make(chan string) // Create string channel        defer close(ch)
        go func() {
                ch &lt;- "hello, channel!" // Send data to channel        }()

        result := &lt;-ch // Receive data from the channel        (result)
}

In this example, we create a string channelch. Then, in a separate coroutine, we send the string "hello, channel!" to the channel. Finally, the master coroutine receives data from the channel and prints it out.

By using channels, we can realize data transmission and synchronization between coroutines to ensure safe sharing of data and thread safety. The use of channels can simplify the complexity of concurrent programming and provide an efficient and reliable way to handle data transfer in concurrent scenarios.

3.2 Using channel to implement summary data

Below, we usechannelTo implement concurrent data aggregation, replace the previous implementation of using mutex locks to ensure thread safety:

package main

import (
        "fmt"
        "sync"
        "time"
)

type Data struct {
        ID   int
        Name string
}

func fetchData(page int, ch chan Data, wg *) {
        // Simulate the time-consuming operation of data pulling on the RPC interface        ()

        // Suppose a batch of data is obtained from the RPC interface        data := Data{
                ID:   page,
                Name: ("Data %d", page),
        }

        ch &lt;- data // Send data to the channel
        ()
}

func main() {
        var wg 

        // Define the number of data pages to be pulled        numPages := 10

        dataCh := make(chan Data, 10) // Create a channel for receiving data
        // Start multiple coroutines to pull data concurrently        for i := 1; i &lt;= numPages; i++ {
                (1)
                go fetchData(i, dataCh, &amp;wg)
        }

        go func() {
                ()
                close(dataCh) // Close the channel, indicating that all data has been sent.        }()

        // Receive data from the channel and summarize        var dataList []Data
        for data := range dataCh {
            dataList = append(dataList, data)
        }

        // Print the pulled data        ("Fetched data:")
        for _, data := range dataList {
                ("ID: %d, Name: %s\n", , )
        }
}

In the modified code, we create adataCh. Each coroutine sends data tochannelto complete the data summary. Master coroutine passes slavechannelReceive data and add it todataListThe data summary process is implemented. This approach does not require explicit locking and unlocking, and avoids complexity and performance issues caused by mutexes.

By usingchannel, We are able to implement data transfer and synchronization between coroutines in a more intuitive and safer way.channelIt plays a key role in concurrent programming, simplifying the management and implementation of concurrent operations. At the same time, it provides a built-in synchronization mechanism to ensure the correctness and consistency of data and avoid the problems of deadlock and race conditions.

3.3 Summary

The summary data in concurrency between coroutines can be classified as data delivery between coroutines. In this scenario, multiple coroutines pull data concurrently and then aggregate the data into a shared data structure. In order to ensure the correctness and consistency of data, some mechanism is needed to ensure that concurrent access to shared data by multiple coroutines is secure.

In the original implementation, mutex locks were used to protect concurrent access to shared data. Mutexes provide a mechanism for mutually exclusive access, ensuring that only one coroutine can access shared data at the same time, thereby avoiding data competition and inconsistency. This method introduces the overhead and complexity of locks while ensuring thread safety.

and usechannelTo implement secure data transfer between coroutines can be more concise and efficient. Each coroutine can pass the pulled data throughchannelSend to the main coroutine, the main coroutine is received bychanneldata in it is summarized.channelIt provides a concurrent and secure data transfer mechanism, and the data transmission between coroutines is synchronous and orderly. becausechannelIt provides a synchronization mechanism itself, which does not require additional locking and synchronization operations, and can more concisely realize secure data transfer between coroutines.

Therefore, if data transfer needs to be implemented between multiple coroutines, and this may lead to thread safety problems, use it at this timechannelIt is relatively appropriate to implement it.

4. Use in open source projects

Suppose we need toetcdPerform performance testing, at this time, a large number of concurrent requests need to be simulated.etcdPerform load testing and collect results data such as execution time, success/failure status of each request. Then the main coroutine needs to collect the result data of each request, perform statistical calculations, and generate corresponding performance reports. Based on this, statistical information such as total number of requests, request success rate, average execution time, slowest/fastest requests, as well as error distribution and detailed information of slow requests can be calculated.

Judging from the above description, we can actually imagine this model, multiple coroutines are executed concurrently, and then obtain the result data of each request. The main coroutine then needs to collect and summarize this data to generate a performance report based on this. This model is actually the data summary under coroutine concurrency as we mentioned above, sochannelIt is very suitable to implement data transmission between coroutines.

Let's take a look belowetcdcorresponding implementation.etcdThere is one inreportThe implementation of the object can accept the results of a series of requested data, and then generate a performance report and return it back. The structure is defined as follows:

type report struct {
   results   chan Result
   stats Stats
}
func (r *report) Results() chan<- Result { return  }
// Result describes the timings for an operation.
type Result struct {
   Start  
   End    
   Err    error
}
func newReport(precision string) *report {
   r := &report{
      results:   make(chan Result, 16),
   }
   return r
}

ResultThe structure is the result of a single test, andreportThe structure is used for reporting and statistics of the entire test process. By usingresultschannel, which can send the results of each test toreportin structure, for statistics and reports to be generated.

When performing performance pressure measurement, first passnewReportGenerate areportObject, and then multiple coroutines are started to perform stress testing requests at the same time. After each request is processed, a processing result will be generated and stored inResultAmong the objects. Then basedreportThe object'sResultsMethod to obtain the correspondingchannel, transfer the processing results to the main coroutine.

The main coroutine passes through traversalreportIn the objectresultsThe corresponding variablechannel, summarize and calculate all processing results, and based on this, stress measurement results and reports can be generated. Let’s take a look at the specific process below.

First, create areportObject, then start multiple coroutines to process the request, sending the result toreportIn the objectresultsCorrespondingchannelmiddle.

// Here the NewReportSample method is actually an encapsulation of the above newReport methodr := NewReportSample("%f")
// Here it is assumed that there is only one coroutine, which simulates execution of a series of tests and sends the test results to the results channel of the Report object.go func() {
   start := ()
   for i := 0; i &lt; 5; i++ {
      // Make a request without realizing it, just simply obtain the execution result and transmit the test result      end := ()
      () &lt;- Result{Start: start, End: end}
      start = end
   }
   () &lt;- Result{Start: start, End: (), Err: ("oops")}
   // Assume that all stress test requests have been executed   close(())
}()
// The main coroutine summarizes all processing results and then generates a stress test reportstats := &lt;-()

In the above code,rIt is throughNewReportSample("%f")Created oneReportObject. Then, in a separate coroutine, a series of tests are executed and the test results are sent to()in the channel.

The purpose of this code is to simulate the execution of a series of tests and send the test results toReportThe object'sresultsin the channel. By using()The channel returned by the method can send the test results to the report object for statistics and processing.

Next, the main coroutine should continue to run()The method reads data from the channel returned by the method, summarizes all processing results, and generates a stress test report. This method is actually encapsulated()In the method, the specific details are as follows:

func (r *report) Stats() &lt;-chan Stats {
    // Create a channel   donec := make(chan Stats, 1)
   // Start a coroutine to execute   go func() {
      defer close(donec)
      ()
      s := ()
      if  != nil {
          = ()
      }
      // If the execution is completed, return the result      donec &lt;- s
   }()
   // Return to channel   return donec
}
// The actual running tasks in the coroutine started by the Stats methodfunc (r *report) processResults() {
   st := ()
   // traverse the data in the channel in the method and then execute the processing flow   for res := range  {
      (&amp;res)
   }
   // Execute some specific calculation logic later}

The above code isreportTwo methods in a structure, whereStats()Method returns a read-onlyStatsaisle. This method will be executed in a separate coroutine and processedresultsTest results in the channel. In fact, it's a summarychannelThe data in it will be processed for a certain amount and then returned.

5. Summary

This paper introduces the data aggregation problem in concurrent programming and proposes a method to ensure thread safety using mutex locks and channels. Mutex locks are suitable for critical zone protection and shared resources, but there may be deadlocks and performance bottlenecks. In contrast, channels provide a more intuitive and secure way of communication between coroutines, avoiding locking problems and providing a more flexible concurrency mode.

Based on the introduction of the above content, it can be roughly clear that in the scenario of data transmission and summary, usechannelIt may be more appropriate to implement it, which can improve the readability and concurrency security of the code. Hope the above content will be helpful to you.

The above is a detailed explanation of Golang's method of using channel to implement data summary. For more information about Golang's data summary, please pay attention to my other related articles!