Preface
Synchronously suitable for multiple consecutive executions. The execution of each step depends on the previous operation. Asynchronous execution has nothing to do with the order of task execution (such as crawling data from 10 sites)
Synchronous execution class RunnerAsync
Supports return timeout detection and system interrupt detection
Error constant definition
//Timeout errorvar ErrTimeout = ("received timeout") //Operating system interrupt errorvar ErrInterrupt = ("received interrupt")
The implementation code is as follows
package task import ( "os" "time" "os/signal" "sync" ) //Execute tasks asynchronouslytype Runner struct { //Operating system signal detection interrupt chan //Record the status of execution completion complete chan error //Timeout detection timeout <-chan //Save all tasks to be executed, and execute them in sequence tasks []func(id int) error waitGroup lock errs []error } //new a Runner objectfunc NewRunner(d ) *Runner { return &Runner{ interrupt: make(chan , 1), complete: make(chan error), timeout: (d), waitGroup: {}, lock: {}, } } //Add a taskfunc (this *Runner) Add(tasks ...func(id int) error) { = append(, tasks...) } //Start Runner and listen for error messagesfunc (this *Runner) Start() error { //Receive operating system signals (, ) //Concurrent execution of tasks go func() { <- () }() select { //Return the execution result case err := <-: return err //Return after timeout case <-: return ErrTimeout } } //Execute all tasks asynchronouslyfunc (this *Runner) Run() error { for id, task := range { if () { return ErrInterrupt } (1) go func(id int) { () //Execute tasks err := task(id) //Save lock in result set = append(, err) () () }(id) } () return nil } //Discern whether the operating system interrupt signal has been receivedfunc (this *Runner) gotInterrupt() bool { select { case <-: //Stop receiving other signals () return true //Operation default: return false } } //Get the executed errorfunc (this *Runner) GetErrs() []error { return }
How to use
Add a task, which receives a closure of type int
Start starts execution injury, returns an error type, nil is the execution is completed, ErrTimeout represents the execution timeout, ErrInterrupt represents the execution is interrupted (similar to Ctrl + C operation)
Test sample code
package task import ( "testing" "time" "fmt" "os" "runtime" ) func TestRunnerAsync_Start(t *) { //Open multi-core (()) //Create runner object and set the timeout time runner := NewRunnerAsync(8 * ) //Add running tasks ( createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), ) ("Synchronous execution of tasks") //Start the task if err := (); err != nil { switch err { case ErrTimeout: ("Execution timeout") (1) case ErrInterrupt: ("Task interrupted") (2) } } ("Execution ends") } //Create the task to be executedfunc createTaskAsync() func(id int) { return func(id int) { ("Executing %v tasks\n", id) //Simulate task execution, sleep for two seconds //(1 * ) } }
Execution results
Execute tasks synchronously Execution is underway0A task Execution is underway1A task Execution is underway2A task Execution is underway3A task Execution is underway4A task Execution is underway5A task Execution is underway6A task Execution is underway7A task Execution is underway8A task Execution is underway9A task Execution is underway10A task Execution is underway11A task Execution is underway12A task runnerAsync_test.go:49: End of execution
Asynchronous execution of Runner
Supports return timeout detection and system interrupt detection
The implementation code is as follows
package task import ( "os" "time" "os/signal" "sync" ) //Execute tasks asynchronouslytype Runner struct { //Operating system signal detection interrupt chan //Record the status of execution completion complete chan error //Timeout detection timeout <-chan //Save all tasks to be executed, and execute them in sequence tasks []func(id int) error waitGroup lock errs []error } //new a Runner objectfunc NewRunner(d ) *Runner { return &Runner{ interrupt: make(chan , 1), complete: make(chan error), timeout: (d), waitGroup: {}, lock: {}, } } //Add a taskfunc (this *Runner) Add(tasks ...func(id int) error) { = append(, tasks...) } //Start Runner and listen for error messagesfunc (this *Runner) Start() error { //Receive operating system signals (, ) //Concurrent execution of tasks go func() { <- () }() select { //Return the execution result case err := <-: return err //Return after timeout case <-: return ErrTimeout } } //Execute all tasks asynchronouslyfunc (this *Runner) Run() error { for id, task := range { if () { return ErrInterrupt } (1) go func(id int) { () //Execute tasks err := task(id) //Save lock in result set = append(, err) () () }(id) } () return nil } //Discern whether the operating system interrupt signal has been receivedfunc (this *Runner) gotInterrupt() bool { select { case <-: //Stop receiving other signals () return true //Operation default: return false } } //Get the executed errorfunc (this *Runner) GetErrs() []error { return }
How to use
Add a task, the task receives an int type, and returns a closure of type error
Start starts execution injury, returns an error type, nil is the execution is completed, ErrTimeout represents the execution timeout, ErrInterrupt represents the execution is interrupted (similar to Ctrl + C operation)
getErrs get all task execution results
Test sample code
package task import ( "testing" "time" "fmt" "os" "runtime" ) func TestRunner_Start(t *) { //Open multi-core (()) //Create runner object and set the timeout time runner := NewRunner(18 * ) //Add running tasks ( createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), ) ("Execute tasks asynchronously") //Start the task if err := (); err != nil { switch err { case ErrTimeout: ("Execution timeout") (1) case ErrInterrupt: ("Task interrupted") (2) } } ("Execution ends") (()) } //Create the task to be executedfunc createTask() func(id int) error { return func(id int) error { ("Executing %v tasks\n", id) //Simulate task execution, sleep //(1 * ) return nil } }
Execution results
Execute tasks asynchronously Execution is underway2A task Execution is underway1A task Execution is underway4A task Execution is underway3A task Execution is underway6A task Execution is underway5A task Execution is underway9A task Execution is underway7A task Execution is underway10A task Execution is underway13A task Execution is underway8A task Execution is underway11A task Execution is underway12A task Execution is underway0A task runner_test.go:49: End of execution runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]
Summarize
The above is the entire content of this article. I hope that the content of this article has certain reference value for everyone's study or work. If you have any questions, you can leave a message to communicate. Thank you for your support.