SoFunction
Updated on 2025-03-04

New Go 1.18 feature: using generics for stream processing

Preface

Stream is a stream-processing library built on Go 1.18+ generics. It supports parallel processing of the data in a stream: a parallel stream divides the elements evenly into partitions, runs each partition in its own goroutine, and keeps the elements in their original order once processing is complete.

GitHub - xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream)

Install

Requires Go 1.18 or later.

$ go get /xyctruth/stream

Import it in the code

import "/xyctruth/stream"

Basic usage

s := ([]string{"d", "a", "b", "c", "a"}).
    Filter(func(s string) bool { return s != "b" }).
    Map(func(s string) string { return "class_" + s }).
    Sort().
    Distinct().
    ToSlice()

// The type of the slice elements needs to be converted
s := [int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v > 3 }).
    Map(func(v int) string { return "mapping_" + (v) }).
    Reduce(func(r string, v string) string { return r + v })

Type constraints

any accepts elements of any type, so elements cannot be compared with ==, !=, > or <. As a result, functions such as Sort() and Find() are not available, but SortFunc(fn), FindFunc(fn) and similar functions can be used instead.

type SliceStream[E any] struct {
    slice      []E
}
([]int{1, 2, 3, 7, 1})

comparable accepts types whose elements can be compared with == and !=, but still not with > or <. So functions such as Sort() and Min() are not available, but SortFunc(fn), MinFunc() and similar functions can be used instead.

type SliceComparableStream[E comparable] struct {
    SliceStream[E]
}
([]int{1, 2, 3, 7, 1})

A stream whose element type is constrained to ordered types can compare elements with ==, !=, > and <, so all functions are available.

type SliceOrderedStream[E ] struct {
    SliceComparableStream[E]
}
([]int{1, 2, 3, 7, 1})

Type conversion

Sometimes Map or Reduce needs to convert the type of the slice elements. Unfortunately, Go does not currently support methods with their own type parameters: all type parameters must be declared on the type itself. Until Go supports this, the library uses a workaround.

// SliceMappingStream is used when the type of the slice elements needs to be converted.
// - E elements type
// - MapE map elements type
// - ReduceE reduce elements type
type SliceMappingStream[E any, MapE any, ReduceE any] struct {
    SliceStream[E]
}
s := [int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v >3 }).
    Map(func(v int) string { return "mapping_" + (v) }).
    Reduce(func(r string, v string) string { return r + v })

Parallel

The Parallel function takes a goroutines int parameter. Parallel processing is enabled when goroutines > 1 and disabled otherwise; streams are not parallel by default.

Parallel divides the elements in the stream evenly into multiple partitions, runs each partition in its own goroutine, and ensures that the elements remain in their original order after processing is complete.

s := ([]string{"d", "a", "b", "c", "a"}).
    Parallel(10).
    Filter(func(s string) bool {
        // Some time-consuming operations
        return s != "b"
    }).
    Map(func(s string) string {
        // Some time-consuming operations
        return "class_" + s
    }).
    ForEach(
        func(index int, s string) {
            // Some time-consuming operations
        },
    ).ToSlice()

Parallel types

  • First: parallel processing ends as soon as the first return value is obtained. Used by: AllMatch, AnyMatch, FindFunc
  • All: every element is processed in parallel, and parallel processing ends only after all return values have been collected. Used by: Map, Filter
  • Action: every element is processed in parallel and no return value is needed. Used by: ForEach, Action
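The "First" type is the least obvious of the three, so here is a sketch of how an early-exit AnyMatch could work, assuming a context-based cancellation scheme that may differ from what the library actually does. The function parallelAnyMatch is hypothetical. The result channel is buffered to the number of partitions, so goroutines that finish after the function has returned never block and never leak.

```go
package main

import (
	"context"
	"fmt"
)

// parallelAnyMatch is a hypothetical sketch of the "First" parallel type:
// partitions are scanned concurrently and the call returns on the first match.
func parallelAnyMatch[E any](in []E, goroutines int, pred func(E) bool) bool {
	if len(in) == 0 {
		return false
	}
	if goroutines < 1 {
		goroutines = 1
	}
	if goroutines > len(in) {
		goroutines = len(in)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // on return, tell goroutines still scanning to stop

	// At most `goroutines` partitions are created, so sends never block.
	found := make(chan bool, goroutines)
	size := (len(in) + goroutines - 1) / goroutines // ceiling division
	parts := 0
	for lo := 0; lo < len(in); lo += size {
		hi := lo + size
		if hi > len(in) {
			hi = len(in)
		}
		parts++
		go func(part []E) {
			for _, v := range part {
				if ctx.Err() != nil { // another partition already matched
					found <- false
					return
				}
				if pred(v) {
					found <- true
					return
				}
			}
			found <- false
		}(in[lo:hi])
	}
	for i := 0; i < parts; i++ {
		if <-found { // first match wins; stop waiting for the rest
			return true
		}
	}
	return false
}

func main() {
	nums := []int{1, 3, 5, 8, 9, 11}
	fmt.Println(parallelAnyMatch(nums, 3, func(v int) bool { return v%2 == 0 })) // true
	fmt.Println(parallelAnyMatch(nums, 3, func(v int) bool { return v > 100 }))  // false
}
```

An AllMatch would follow the same pattern with the predicate negated: it can stop at the first element that fails.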

Parallel goroutines

The right number of goroutines differs between CPU-bound and I/O-bound work.

Generally, for CPU-bound work the number of goroutines does not need to exceed the number of CPU cores, whereas for I/O-bound work, where goroutines spend most of their time waiting, it can be set much larger than the number of cores.

CPU-bound operations

NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    (newArray(1000)) // Simulate CPU time-consuming operation
})

Benchmarking using 6 CPU cores

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByCPU
goarch: amd64
pkg: /xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByCPU/no_parallel(0)-6         	     717	   9183119 ns/op
BenchmarkParallelByCPU/goroutines(2)-6          	    1396	   4303113 ns/op
BenchmarkParallelByCPU/goroutines(4)-6          	    2539	   2388197 ns/op
BenchmarkParallelByCPU/goroutines(6)-6          	    2932	   2159407 ns/op
BenchmarkParallelByCPU/goroutines(8)-6          	    2334	   2577405 ns/op
BenchmarkParallelByCPU/goroutines(10)-6         	    2649	   2352926 ns/op

I/O-bound operations

NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
    () // Simulate IO time-consuming operation
})

Benchmarking using 6 CPU cores

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByIO
goos: darwin
goarch: amd64
pkg: /xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByIO/no_parallel(0)-6          	      52	 102023558 ns/op
BenchmarkParallelByIO/goroutines(2)-6           	     100	  55807303 ns/op
BenchmarkParallelByIO/goroutines(4)-6           	     214	  27868725 ns/op
BenchmarkParallelByIO/goroutines(6)-6           	     315	  18925789 ns/op
BenchmarkParallelByIO/goroutines(8)-6           	     411	  14439700 ns/op
BenchmarkParallelByIO/goroutines(10)-6          	     537	  11164758 ns/op
BenchmarkParallelByIO/goroutines(50)-6          	    2629	   2310602 ns/op
BenchmarkParallelByIO/goroutines(100)-6         	    5094	   1221887 ns/op

Project address: /xyctruth/stream
