SoFunction
Updated on 2025-03-05

Detailed explanation of the synchronous and asynchronous execution of multiple tasks (Runner and RunnerAsync)

Preface

Synchronously suitable for multiple consecutive executions. The execution of each step depends on the previous operation. Asynchronous execution has nothing to do with the order of task execution (such as crawling data from 10 sites)

Synchronous execution class RunnerAsync

Supports return timeout detection and system interrupt detection

Error constant definition

//Timeout errorvar ErrTimeout = ("received timeout")
//Operating system interrupt errorvar ErrInterrupt = ("received interrupt")

The implementation code is as follows

package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)
 
//Execute tasks asynchronouslytype Runner struct {
 //Operating system signal detection interrupt chan 
 //Record the status of execution completion complete chan error
 //Timeout detection timeout <-chan 
 //Save all tasks to be executed, and execute them in sequence tasks []func(id int) error
 waitGroup 
 lock 
 errs []error
}
 
//new a Runner objectfunc NewRunner(d ) *Runner {
 return &Runner{
 interrupt: make(chan , 1),
 complete: make(chan error),
 timeout: (d),
 waitGroup: {},
 lock: {},
 }
}
 
//Add a taskfunc (this *Runner) Add(tasks ...func(id int) error) {
  = append(, tasks...)
}
 
//Start Runner and listen for error messagesfunc (this *Runner) Start() error {
 //Receive operating system signals (, )
 //Concurrent execution of tasks go func() {
  <- ()
 }()
 select {
 //Return the execution result case err := <-:
 return err
 //Return after timeout case <-:
 return ErrTimeout
 }
}
 
//Execute all tasks asynchronouslyfunc (this *Runner) Run() error {
 for id, task := range  {
 if () {
  return ErrInterrupt
 }
 (1)
 go func(id int) {
  ()
  //Execute tasks  err := task(id)
  //Save lock in result set   = append(, err)
 
  ()
  ()
 }(id)
 }
 ()
 
 return nil
}
 
//Discern whether the operating system interrupt signal has been receivedfunc (this *Runner) gotInterrupt() bool {
 select {
 case <-:
 //Stop receiving other signals ()
 return true
 //Operation default:
 return false
 }
}
 
//Get the executed errorfunc (this *Runner) GetErrs() []error {
 return 
}

How to use 

Add a task, which receives a closure of type int

Start starts execution injury, returns an error type, nil is the execution is completed, ErrTimeout represents the execution timeout, ErrInterrupt represents the execution is interrupted (similar to Ctrl + C operation)

Test sample code

package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)
 
func TestRunnerAsync_Start(t *) {
 //Open multi-core (())
 //Create runner object and set the timeout time runner := NewRunnerAsync(8 * )
 //Add running tasks (
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 )
 ("Synchronous execution of tasks")
 //Start the task if err := (); err != nil {
 switch err {
 case ErrTimeout:
  ("Execution timeout")
  (1)
 case ErrInterrupt:
  ("Task interrupted")
  (2)
 }
 }
 ("Execution ends")
}
 
//Create the task to be executedfunc createTaskAsync() func(id int) {
 return func(id int) {
 ("Executing %v tasks\n", id)
 //Simulate task execution, sleep for two seconds //(1 * )
 }
}

Execution results

Execute tasks synchronously
Execution is underway0A task
Execution is underway1A task
Execution is underway2A task
Execution is underway3A task
Execution is underway4A task
Execution is underway5A task
Execution is underway6A task
Execution is underway7A task
Execution is underway8A task
Execution is underway9A task
Execution is underway10A task
Execution is underway11A task
Execution is underway12A task
 runnerAsync_test.go:49: End of execution

Asynchronous execution of Runner

Supports return timeout detection and system interrupt detection

The implementation code is as follows

package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)
 
//Execute tasks asynchronouslytype Runner struct {
 //Operating system signal detection interrupt chan 
 //Record the status of execution completion complete chan error
 //Timeout detection timeout <-chan 
 //Save all tasks to be executed, and execute them in sequence tasks []func(id int) error
 waitGroup 
 lock 
 errs []error
}
 
//new a Runner objectfunc NewRunner(d ) *Runner {
 return &Runner{
  interrupt: make(chan , 1),
  complete: make(chan error),
  timeout: (d),
  waitGroup: {},
  lock:  {},
 }
}
 
//Add a taskfunc (this *Runner) Add(tasks ...func(id int) error) {
  = append(, tasks...)
}
 
//Start Runner and listen for error messagesfunc (this *Runner) Start() error {
 //Receive operating system signals (, )
 //Concurrent execution of tasks go func() {
   <- ()
 }()
 select {
 //Return the execution result case err := <-:
  return err
  //Return after timeout case <-:
  return ErrTimeout
 }
}
 
//Execute all tasks asynchronouslyfunc (this *Runner) Run() error {
 for id, task := range  {
  if () {
   return ErrInterrupt
  }
  (1)
  go func(id int) {
   ()
   //Execute tasks   err := task(id)
   //Save lock in result set    = append(, err)
   ()
   ()
  }(id)
 }
 ()
 return nil
}
 
//Discern whether the operating system interrupt signal has been receivedfunc (this *Runner) gotInterrupt() bool {
 select {
 case <-:
  //Stop receiving other signals  ()
  return true
  //Operation default:
  return false
 }
}
 
//Get the executed errorfunc (this *Runner) GetErrs() []error {
 return 
}

How to use 

Add a task, the task receives an int type, and returns a closure of type error

Start starts execution injury, returns an error type, nil is the execution is completed, ErrTimeout represents the execution timeout, ErrInterrupt represents the execution is interrupted (similar to Ctrl + C operation)

getErrs get all task execution results

Test sample code

package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)
 
func TestRunner_Start(t *) {
 //Open multi-core (())
 //Create runner object and set the timeout time runner := NewRunner(18 * )
 //Add running tasks (
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
 )
 ("Execute tasks asynchronously")
 //Start the task if err := (); err != nil {
  switch err {
  case ErrTimeout:
   ("Execution timeout")
   (1)
  case ErrInterrupt:
   ("Task interrupted")
   (2)
  }
 }
 ("Execution ends")
 (())
}
 
//Create the task to be executedfunc createTask() func(id int) error {
 return func(id int) error {
  ("Executing %v tasks\n", id)
  //Simulate task execution, sleep  //(1 * )
  return nil
 }
}

Execution results

Execute tasks asynchronously
Execution is underway2A task
Execution is underway1A task
Execution is underway4A task
Execution is underway3A task
Execution is underway6A task
Execution is underway5A task
Execution is underway9A task
Execution is underway7A task
Execution is underway10A task
Execution is underway13A task
Execution is underway8A task
Execution is underway11A task
Execution is underway12A task
Execution is underway0A task
 runner_test.go:49: End of execution
 runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]

Summarize

The above is the entire content of this article. I hope that the content of this article has certain reference value for everyone's study or work. If you have any questions, you can leave a message to communicate. Thank you for your support.