Preface
Stream is a stream-processing library built on Go 1.18+ generics. It supports parallel processing of the elements in a stream. A parallel stream divides the elements evenly into partitions, runs one goroutine per partition, and guarantees that the elements keep their original order once processing completes.
GitHub - xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream)
Install
Go 1.18 or later is required.
$ go get /xyctruth/stream
Import it in your code:
import "/xyctruth/stream"
Base
s := ([]string{"d", "a", "b", "c", "a"}).
    Filter(func(s string) bool {
        return s != "b"
    }).
    Map(func(s string) string {
        return "class_" + s
    }).
    Sort().
    Distinct().
    ToSlice()

// The type of the slice elements needs to be converted
s := [int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool {
        return v > 3
    }).
    Map(func(v int) string {
        return "mapping_" + (v)
    }).
    Reduce(func(r string, v string) string {
        return r + v
    })
Type constraints
any
Accepts elements of any type, so ==, !=, >, and < cannot be used to compare elements. As a result, functions such as Sort() and Find() are unavailable, but you can use SortFunc(fn), FindFunc(fn), ... instead.
type SliceStream[E any] struct {
    slice []E
}

([]int{1, 2, 3, 7, 1})
comparable
Accepts comparable types, so == and != can be used to compare elements, but > and < still cannot. Functions such as Sort() and Min() are therefore unavailable, but you can use SortFunc(fn), MinFunc(), ... instead.
type SliceComparableStream[E comparable] struct {
    SliceStream[E]
}

([]int{1, 2, 3, 7, 1})
For ordered element types, ==, !=, >, and < can all be used to compare elements, so every function is available.
type SliceOrderedStream[E ] struct {
    SliceComparableStream[E]
}

([]int{1, 2, 3, 7, 1})
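The difference between the three constraint levels can be seen without the library. The following is a minimal standalone sketch, not the library's code: the helper names (indexFunc, index, minOf) and the local ordered interface are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// ordered is a hypothetical local constraint listing a few types that support < and >.
type ordered interface {
	~int | ~int64 | ~float64 | ~string
}

// indexFunc works for any element type, so the caller must supply the comparison.
func indexFunc[E any](s []E, match func(E) bool) int {
	for i, v := range s {
		if match(v) {
			return i
		}
	}
	return -1
}

// index uses ==, so E must be comparable.
func index[E comparable](s []E, target E) int {
	for i, v := range s {
		if v == target {
			return i
		}
	}
	return -1
}

// minOf uses <, so E must be ordered. It returns false for an empty slice.
func minOf[E ordered](s []E) (E, bool) {
	var zero E
	if len(s) == 0 {
		return zero, false
	}
	m := s[0]
	for _, v := range s[1:] {
		if v < m {
			m = v
		}
	}
	return m, true
}

func main() {
	nums := []int{1, 2, 3, 7, 1}
	fmt.Println(indexFunc(nums, func(v int) bool { return v > 2 })) // prints 2
	fmt.Println(index(nums, 7))                                     // prints 3
	fmt.Println(minOf(nums))                                        // prints 1 true
}
```

Writing v == target inside indexFunc, or v < m inside index, would fail to compile, which is the restriction the three stream types reflect.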
Type conversion
Sometimes we need to use Map and Reduce to convert the type of the slice elements. Unfortunately, Go does not currently support methods with additional type parameters, so all type parameters must be declared on the struct itself. Until Go supports this, we use a temporary workaround.
// SliceMappingStream Need to convert the type of slice elements.
// - E elements type
// - MapE map elements type
// - ReduceE reduce elements type
type SliceMappingStream[E any, MapE any, ReduceE any] struct {
    SliceStream[E]
}

s := [int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool {
        return v > 3
    }).
    Map(func(v int) string {
        return "mapping_" + (v)
    }).
    Reduce(func(r string, v string) string {
        return r + v
    })
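To make the limitation concrete, here is a small standalone sketch of the same idea. The mappingStream type and the labels helper are hypothetical and are not the library's implementation. They only show why the target type has to appear in the struct's type parameter list.

```go
package main

import (
	"fmt"
	"strconv"
)

// Go rejects a method that declares its own type parameter, for example:
//
//	func (s stream[E]) Map[M any](f func(E) M) stream[M]
//
// so the target type M is declared on the struct instead.

// mappingStream is a hypothetical stand-in used only for this illustration.
type mappingStream[E any, M any] struct {
	items []E
}

// Map converts every E into an M and returns a stream whose elements are M.
func (s mappingStream[E, M]) Map(f func(E) M) mappingStream[M, M] {
	out := make([]M, 0, len(s.items))
	for _, v := range s.items {
		out = append(out, f(v))
	}
	return mappingStream[M, M]{items: out}
}

// ToSlice returns the underlying elements.
func (s mappingStream[E, M]) ToSlice() []E {
	return s.items
}

// labels maps ints to strings; both types are fixed when the stream is created.
func labels(nums []int) []string {
	return mappingStream[int, string]{items: nums}.
		Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
		ToSlice()
}

func main() {
	fmt.Println(labels([]int{4, 5})) // prints [mapping_4 mapping_5]
}
```

The cost of this design is that the caller must name every type up front, as in the [int, string, string] instantiation above, even when only one conversion is used.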
Parallel
The Parallel function takes a goroutines int parameter. If goroutines > 1, parallel processing is turned on; otherwise it is turned off. A stream is not parallel by default.
Parallel divides the elements in the stream evenly into partitions, runs one goroutine per partition, and guarantees that the elements keep their original order once processing completes.
s := ([]string{"d", "a", "b", "c", "a"}).
    Parallel(10).
    Filter(func(s string) bool {
        // Some time-consuming operations
        return s != "b"
    }).
    Map(func(s string) string {
        // Some time-consuming operations
        return "class_" + s
    }).
    ForEach(
        func(index int, s string) {
            // Some time-consuming operations
        },
    ).ToSlice()
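The partition-and-preserve-order behaviour described above can be sketched with the standard library alone. This is an assumption about how such a scheme might look, not the library's actual implementation. The parallelMap name is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelMap splits items into at most `goroutines` contiguous partitions,
// processes each partition in its own goroutine, and writes every result to
// the index of its input, so the output order matches the input order.
func parallelMap[E, R any](items []E, goroutines int, f func(E) R) []R {
	out := make([]R, len(items))
	if goroutines < 1 {
		goroutines = 1
	}
	// Ceiling division so that all elements are covered.
	size := (len(items) + goroutines - 1) / goroutines

	var wg sync.WaitGroup
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		wg.Add(1)
		go func(start, end int) {
			defer wg.Done()
			for i := start; i < end; i++ {
				out[i] = f(items[i]) // each goroutine writes to disjoint indices
			}
		}(start, end)
	}
	wg.Wait()
	return out
}

func main() {
	in := []string{"d", "a", "b", "c", "a"}
	out := parallelMap(in, 2, func(s string) string { return "class_" + s })
	fmt.Println(out) // prints [class_d class_a class_b class_c class_a]
}
```

Because each goroutine owns a disjoint index range of the output slice, no mutex is needed and no reordering step is required afterwards.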
Parallel Type
- First: Parallel processing ends as soon as the first return value is obtained. For: AllMatch, AnyMatch, FindFunc
- ALL: All elements are processed in parallel, and parallel processing ends once all return values have been collected. For: Map, Filter
- Action: All elements are processed in parallel and no return value is required. For: ForEach, Action
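The First type differs from the other two because workers can stop early. A minimal sketch of that idea follows, assuming a shared flag that every goroutine checks before each element. The anyMatchParallel name is hypothetical and this is not how the library is necessarily written.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// anyMatchParallel reports whether pred holds for at least one element.
// Once any goroutine finds a match, the others stop at their next check.
func anyMatchParallel[E any](items []E, goroutines int, pred func(E) bool) bool {
	if goroutines < 1 {
		goroutines = 1
	}
	var found int32 // 0 = no match yet, 1 = match found
	size := (len(items) + goroutines - 1) / goroutines

	var wg sync.WaitGroup
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		wg.Add(1)
		go func(part []E) {
			defer wg.Done()
			for _, v := range part {
				if atomic.LoadInt32(&found) == 1 {
					return // another goroutine already has the answer
				}
				if pred(v) {
					atomic.StoreInt32(&found, 1)
					return
				}
			}
		}(items[start:end])
	}
	wg.Wait()
	return atomic.LoadInt32(&found) == 1
}

func main() {
	nums := []int{1, 2, 3, 4, 5}
	fmt.Println(anyMatchParallel(nums, 3, func(v int) bool { return v == 4 })) // prints true
}
```

An ALL or Action operation would omit the flag entirely, since every element has to be visited regardless of intermediate results.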
Parallel goroutines
The right number of parallel goroutines depends on whether the work is CPU-bound or IO-bound.
For CPU-bound work, there is generally no benefit in setting the number of goroutines higher than the number of CPU cores. For IO-bound work, the number of goroutines can be set much higher than the number of CPU cores.
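As a rough illustration of that rule of thumb, the goroutine count can be derived from runtime.NumCPU(). The goroutinesFor helper and the ioMultiplier value below are hypothetical. A suitable multiplier depends on the workload and should be found by benchmarking.

```go
package main

import (
	"fmt"
	"runtime"
)

// ioMultiplier is an arbitrary illustrative value, not a recommendation.
const ioMultiplier = 8

// goroutinesFor returns a starting point for the goroutine count:
// the number of logical CPUs for CPU-bound work, and a multiple of it
// for IO-bound work, where goroutines spend most of their time waiting.
func goroutinesFor(ioBound bool) int {
	n := runtime.NumCPU()
	if ioBound {
		return n * ioMultiplier
	}
	return n
}

func main() {
	fmt.Println("CPU-bound:", goroutinesFor(false))
	fmt.Println("IO-bound: ", goroutinesFor(true))
}
```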
CPU operation
NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    (newArray(1000)) // Simulate CPU time-consuming operation
})
Benchmarking using 6 CPU cores
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByCPU

goarch: amd64
pkg: /xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByCPU/no_parallel(0)-6      717    9183119 ns/op
BenchmarkParallelByCPU/goroutines(2)-6      1396    4303113 ns/op
BenchmarkParallelByCPU/goroutines(4)-6      2539    2388197 ns/op
BenchmarkParallelByCPU/goroutines(6)-6      2932    2159407 ns/op
BenchmarkParallelByCPU/goroutines(8)-6      2334    2577405 ns/op
BenchmarkParallelByCPU/goroutines(10)-6     2649    2352926 ns/op
IO Operation
NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    () // Simulate IO time-consuming operation
})
Benchmarking using 6 CPU cores
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByIO

goos: darwin
goarch: amd64
pkg: /xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByIO/no_parallel(0)-6        52    102023558 ns/op
BenchmarkParallelByIO/goroutines(2)-6        100     55807303 ns/op
BenchmarkParallelByIO/goroutines(4)-6        214     27868725 ns/op
BenchmarkParallelByIO/goroutines(6)-6        315     18925789 ns/op
BenchmarkParallelByIO/goroutines(8)-6        411     14439700 ns/op
BenchmarkParallelByIO/goroutines(10)-6       537     11164758 ns/op
BenchmarkParallelByIO/goroutines(50)-6      2629      2310602 ns/op
BenchmarkParallelByIO/goroutines(100)-6     5094      1221887 ns/op
Project address: /xyctruth/stream