Requirements
When the interface is called, it receives a list of ten elements, and ten tasks must run concurrently, one per element. Each task returns either a result or an error. The results are then collected into a slice and returned together.
Struct processed by the goroutines
type Order struct {
	Name string `json:"name"`
	Id   int    `json:"id"`
}
Determine the channel capacity
In general, use the number of input elements that need to be processed.
taskNum := 10
Initialize the channel
orderCh := make(chan Order, taskNum) // Receives the returned results
errCh := make(chan error, taskNum)   // Receives the returned errors
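Sizing the buffer to the number of tasks means every task can deliver its one value without waiting for a receiver. The following is a minimal sketch of that property only; the function name `fillWithoutReceiver` is hypothetical and not part of the original template.

```go
package main

import "fmt"

// fillWithoutReceiver (hypothetical helper) sends n values into a channel
// whose capacity is n, with no receiver running, and reports the channel's
// length and capacity afterwards.
func fillWithoutReceiver(n int) (length, capacity int) {
	ch := make(chan int, n)
	for i := 0; i < n; i++ {
		ch <- i // does not block: the buffer has room for all n values
	}
	return len(ch), cap(ch)
}

func main() {
	fmt.Println(fillWithoutReceiver(10))
}
```

With a smaller capacity, the send that finds the buffer full would block until something receives from the channel.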
Initiate execution
We use sync.WaitGroup to wait for all tasks to finish.
wg := sync.WaitGroup{}
for i := 0; i < taskNum; i++ {
	wg.Add(1)
	go func(i int) { // Pass i as an argument so each goroutine gets its own copy
		defer wg.Done()
		if i == 3 { // Simulate an error when i == 3
			err := errors.New("there is an error")
			errCh <- err
			return
		}
		// Assemble the result
		res := Order{
			Name: "num: " + strconv.Itoa(i),
			Id:   i,
		}
		orderCh <- res
	}(i)
}
wg.Wait() // Wait for all tasks to complete
Use for-select to receive execution results
orderList := make([]Order, 0, taskNum) // Length 0, so append does not leave zero-value entries at the front
for i := 0; i < taskNum; i++ {
	select {
	case order, ok := <-orderCh: // Receive from orderCh
		if ok {
			orderList = append(orderList, order)
		}
	case err := <-errCh: // Receive from errCh
		if err != nil {
			return err // Depends on the requirements; in this design the first error stops processing and is returned
		}
	default:
		fmt.Println("done")
	}
}
// After processing the data, close the channels
close(orderCh)
close(errCh)
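The steps so far can be put together into one small program. This is a minimal sketch under stated assumptions rather than the article's exact code: the function name `runTasks` and its `failAt` parameter are hypothetical, and the `default` branch is left out because every task has already sent its value by the time `wg.Wait()` returns.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

type Order struct {
	Name string `json:"name"`
	Id   int    `json:"id"`
}

// runTasks (hypothetical helper) starts taskNum goroutines and collects
// their results. The task whose index equals failAt reports an error.
func runTasks(taskNum, failAt int) ([]Order, error) {
	orderCh := make(chan Order, taskNum) // buffered: senders never block
	errCh := make(chan error, taskNum)

	var wg sync.WaitGroup
	for i := 0; i < taskNum; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i == failAt { // simulate a failing task
				errCh <- errors.New("there is an error")
				return
			}
			orderCh <- Order{Name: "num: " + strconv.Itoa(i), Id: i}
		}(i)
	}
	wg.Wait() // each task has sent exactly one value by now

	orderList := make([]Order, 0, taskNum)
	for i := 0; i < taskNum; i++ {
		select {
		case order := <-orderCh:
			orderList = append(orderList, order)
		case err := <-errCh:
			return nil, err // stop at the first error that is read
		}
	}
	return orderList, nil
}

func main() {
	orders, err := runTasks(10, -1) // -1: no task fails
	fmt.Println(len(orders), err)

	_, err = runTasks(10, 3) // task 3 fails
	fmt.Println(err)
}
```

Because each task sends exactly one message to one of the two channels, reading `taskNum` messages in total is enough to drain both.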
Timeout issue
The execution time of each task must be bounded so that it cannot exceed a set limit. We use a timer to enforce this.
timeoutTime := time.Second * 3          // Timeout duration
taskTimer := time.NewTimer(timeoutTime) // Initialize the timer
orderList := make([]Order, 0, taskNum)
for i := 0; i < taskNum; i++ {
	select {
	....
	case <-taskTimer.C: // Handle the timeout
		err := errors.New("task timeout") // Here a timeout is treated as an error and assigned to err
		return err
	...
	}
	// The timer must be reset on every iteration
	taskTimer.Reset(timeoutTime)
}
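A timer case only takes effect when the `select` actually blocks, so the sketch below receives while the tasks are still running and has no `default` branch. It is an illustration under assumptions, not the article's code: `collectWithTimeout` is a hypothetical name, the tasks are simulated with `time.Sleep`, and the timeout bounds the wait for each next result rather than the total run time.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// collectWithTimeout (hypothetical helper) starts one task per delay. Each
// task sleeps for its delay and then reports its index. Waiting for the
// next result is limited to timeout.
func collectWithTimeout(timeout time.Duration, delays ...time.Duration) ([]int, error) {
	resCh := make(chan int, len(delays)) // buffered so late tasks can still send and exit
	for i, d := range delays {
		go func(i int, d time.Duration) {
			time.Sleep(d) // simulated work
			resCh <- i
		}(i, d)
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	results := make([]int, 0, len(delays))
	for range delays {
		select {
		case r := <-resCh:
			results = append(results, r)
		case <-timer.C:
			return results, errors.New("task timeout")
		}
		// Stop the timer and drain a pending tick before reusing it.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(timeout)
	}
	return results, nil
}

func main() {
	fmt.Println(collectWithTimeout(time.Second, 10*time.Millisecond, 20*time.Millisecond))
	fmt.Println(collectWithTimeout(100*time.Millisecond, 10*time.Millisecond, 2*time.Second))
}
```

The second call returns the one result that arrived in time together with the timeout error; the slow task finishes later and its value stays in the buffered channel.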
Goroutine panic problem
The main goroutine cannot recover from a panic raised in another goroutine, so an unhandled panic in any goroutine aborts the entire program. We recover from it in a deferred function inside each goroutine.
for i := 0; i < taskNum; i++ {
	wg.Add(1)
	go func() {
		defer func() {
			// Recover from panics separately inside each goroutine
			if r := recover(); r != nil {
				// Convert the panic into an error; it can also be handled
				// differently depending on requirements and severity
				errCh <- fmt.Errorf("System panic:%v", r)
			}
			wg.Done() // Signal completion only after any error has been sent
		}()
		........
	}()
}
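The recovery pattern can be tried in isolation. The sketch below is an illustration under assumptions: `safeRun` is a hypothetical helper, and it uses two separate `defer` statements, relying on deferred calls running in last-in, first-out order so that the error is sent before `wg.Done()` is called.

```go
package main

import (
	"fmt"
	"sync"
)

// safeRun (hypothetical helper) runs each task in its own goroutine and
// converts any panic into an error instead of letting it crash the program.
func safeRun(tasks ...func()) []error {
	errCh := make(chan error, len(tasks))
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done() // registered first, so it runs last
			defer func() {
				// recover must be called directly in the deferred function
				if r := recover(); r != nil {
					errCh <- fmt.Errorf("System panic:%v", r)
				}
			}()
			task()
		}(task)
	}
	wg.Wait()
	close(errCh) // safe: all senders have finished

	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return errs
}

func main() {
	errs := safeRun(
		func() {},                // succeeds
		func() { panic("boom") }, // explicit panic
		func() {
			var m map[string]int
			m["x"] = 1 // writing to a nil map panics at run time
		},
	)
	for _, err := range errs {
		fmt.Println(err)
	}
}
```

The program keeps running after both panics, and each one shows up as an ordinary error value.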
Ordering problem
The order of the returned list must match the order of the input elements. To achieve this, we define a struct that carries a sequence number.
// When the original order must be preserved, define a struct with a sequence number
type OrderWithSeq struct {
	Seq       int
	OrderItem Order
}

// Implement sort.Interface so the results can be sorted by sequence number
type BySeq []OrderWithSeq

func (a BySeq) Len() int           { return len(a) }
func (a BySeq) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq }

// Adjust the result channel
orderCh := make(chan OrderWithSeq, taskNum) // Receives structs with a sequence number

// Attach the sequence number when starting each task
for i := 0; i < taskNum; i++ {
	i := i // Per-iteration copy captured by the closure
	wg.Add(1)
	go func() {
		····
		// Assemble the result
		res := Order{
			Name: "num: " + strconv.Itoa(i),
			Id:   i,
		}
		orderCh <- OrderWithSeq{
			Seq:       i, // Carry the sequence number i
			OrderItem: res,
		}
	}()
}

// Receive the results into a slice of structs with sequence numbers
orderSeqList := make([]OrderWithSeq, 0, taskNum)
for i := 0; i < taskNum; i++ {
	select {
	case order, ok := <-orderCh: // Receive from orderCh
		if ok {
			orderSeqList = append(orderSeqList, order)
		}
	.....
	}
}

// Sort back into the original order
sort.Sort(BySeq(orderSeqList))
.... // Reassemble the data and return
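When the sequence number is simply the input index, sorting can be avoided by writing each result straight into its slot. This is an alternative to the `BySeq` sort above, not the article's method; `orderedResults` is a hypothetical name, and the sketch assumes every task succeeds.

```go
package main

import (
	"fmt"
	"strconv"
)

type Order struct {
	Name string `json:"name"`
	Id   int    `json:"id"`
}

type OrderWithSeq struct {
	Seq       int
	OrderItem Order
}

// orderedResults (hypothetical helper) runs taskNum tasks concurrently and
// returns their results in input order by indexing with Seq.
func orderedResults(taskNum int) []Order {
	orderCh := make(chan OrderWithSeq, taskNum)
	for i := 0; i < taskNum; i++ {
		go func(i int) {
			orderCh <- OrderWithSeq{
				Seq:       i, // carry the original position
				OrderItem: Order{Name: "num: " + strconv.Itoa(i), Id: i},
			}
		}(i)
	}

	orderList := make([]Order, taskNum) // one slot per input element
	for n := 0; n < taskNum; n++ {
		item := <-orderCh // results arrive in arbitrary order
		orderList[item.Seq] = item.OrderItem
	}
	return orderList
}

func main() {
	for _, o := range orderedResults(5) {
		fmt.Println(o.Id, o.Name)
	}
}
```

Sorting with `sort.Sort` remains the better fit when some tasks may fail, because failed tasks would otherwise leave zero-value gaps in the slice.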
Summary
The standard template is as follows:
type Order struct {
	Name string `json:"name"`
	Id   int    `json:"id"`
}

// When the original order must be preserved, define a struct with a sequence number
type OrderWithSeq struct {
	Seq       int
	OrderItem Order
}

// Implement sort.Interface so the results can be sorted by sequence number
type BySeq []OrderWithSeq

func (a BySeq) Len() int           { return len(a) }
func (a BySeq) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq }

taskNum := 10
orderCh := make(chan OrderWithSeq, taskNum) // Receives structs with a sequence number
errCh := make(chan error, taskNum)          // Receives the returned errors
wg := sync.WaitGroup{}

// Attach the sequence number when starting each task
for i := 0; i < taskNum; i++ {
	i := i // Per-iteration copy captured by the closure
	wg.Add(1)
	go func() {
		defer func() {
			// Recover from panics separately inside each goroutine
			if r := recover(); r != nil {
				// Convert the panic into an error; it can also be handled
				// differently depending on requirements and severity
				errCh <- fmt.Errorf("System panic:%v", r)
			}
			wg.Done() // Signal completion only after any error has been sent
		}()
		// Assemble the result
		res := Order{
			Name: "num: " + strconv.Itoa(i),
			Id:   i,
		}
		orderCh <- OrderWithSeq{
			Seq:       i, // Carry the sequence number i
			OrderItem: res,
		}
	}()
}
wg.Wait()

// Receive the results into a slice of structs with sequence numbers
orderSeqList := make([]OrderWithSeq, 0, taskNum)
timeoutTime := time.Second * 3
taskTimer := time.NewTimer(timeoutTime)
for i := 0; i < taskNum; i++ {
	select {
	case order, ok := <-orderCh: // Receive from orderCh
		if ok {
			orderSeqList = append(orderSeqList, order)
		}
	case err := <-errCh: // Receive from errCh
		if err != nil {
			return err
		}
	case <-taskTimer.C: // Handle the timeout
		return errors.New("task timeout")
	default:
		fmt.Println("done")
	}
	taskTimer.Reset(timeoutTime)
}
close(orderCh)
close(errCh)

// Sort back into the original order
sort.Sort(BySeq(orderSeqList))
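For reference, the template can also be packaged as a single function that compiles and runs on its own. This is a sketch under assumptions, not a definitive implementation: `processBatch` and its `work` callback are hypothetical names, the `sync.WaitGroup` is dropped because the collector counts exactly one message per task, and the `default` branch is removed so that the timeout case can actually fire.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strconv"
	"time"
)

type Order struct {
	Name string `json:"name"`
	Id   int    `json:"id"`
}

type OrderWithSeq struct {
	Seq       int
	OrderItem Order
}

type BySeq []OrderWithSeq

func (a BySeq) Len() int           { return len(a) }
func (a BySeq) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq }

// processBatch (hypothetical) runs work(i) for i in [0, taskNum) concurrently.
// It returns the orders in input order, or the first error, recovered panic,
// or timeout that the collector observes.
func processBatch(taskNum int, timeout time.Duration, work func(i int) (Order, error)) ([]Order, error) {
	orderCh := make(chan OrderWithSeq, taskNum)
	errCh := make(chan error, taskNum)

	for i := 0; i < taskNum; i++ {
		go func(i int) {
			defer func() {
				if r := recover(); r != nil {
					errCh <- fmt.Errorf("System panic:%v", r)
				}
			}()
			res, err := work(i)
			if err != nil {
				errCh <- err
				return
			}
			orderCh <- OrderWithSeq{Seq: i, OrderItem: res}
		}(i)
	}

	timer := time.NewTimer(timeout)
	defer timer.Stop()
	seqList := make([]OrderWithSeq, 0, taskNum)
	for n := 0; n < taskNum; n++ {
		select {
		case o := <-orderCh:
			seqList = append(seqList, o)
		case err := <-errCh:
			return nil, err
		case <-timer.C:
			return nil, errors.New("task timeout")
		}
		// Stop the timer and drain a pending tick before reusing it.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(timeout)
	}

	sort.Sort(BySeq(seqList)) // restore the original order
	orders := make([]Order, len(seqList))
	for i, o := range seqList {
		orders[i] = o.OrderItem
	}
	return orders, nil
}

func main() {
	orders, err := processBatch(10, 3*time.Second, func(i int) (Order, error) {
		return Order{Name: "num: " + strconv.Itoa(i), Id: i}, nil
	})
	fmt.Println(len(orders), err)
}
```

The channels are left for the garbage collector instead of being closed, since a task that finishes after an early return would otherwise panic when sending on a closed channel.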