diff --git a/core/mr/mapreduce.go b/core/mr/mapreduce.go index e5ab7041..edb32808 100644 --- a/core/mr/mapreduce.go +++ b/core/mr/mapreduce.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "github.com/zeromicro/go-zero/core/errorx" - "github.com/zeromicro/go-zero/core/lang" ) const ( @@ -24,30 +23,30 @@ var ( type ( // ForEachFunc is used to do element processing, but no output. - ForEachFunc func(item any) + ForEachFunc[T any] func(item T) // GenerateFunc is used to let callers send elements into source. - GenerateFunc func(source chan<- any) + GenerateFunc[T any] func(source chan<- T) // MapFunc is used to do element processing and write the output to writer. - MapFunc func(item any, writer Writer) + MapFunc[T, U any] func(item T, writer Writer[U]) // MapperFunc is used to do element processing and write the output to writer, // use cancel func to cancel the processing. - MapperFunc func(item any, writer Writer, cancel func(error)) + MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error)) // ReducerFunc is used to reduce all the mapping output and write to writer, // use cancel func to cancel the processing. - ReducerFunc func(pipe <-chan any, writer Writer, cancel func(error)) + ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error)) // VoidReducerFunc is used to reduce all the mapping output, but no output. // Use cancel func to cancel the processing. - VoidReducerFunc func(pipe <-chan any, cancel func(error)) + VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error)) // Option defines the method to customize the mapreduce. Option func(opts *mapReduceOptions) - mapperContext struct { + mapperContext[T, U any] struct { ctx context.Context - mapper MapFunc - source <-chan any + mapper MapFunc[T, U] + source <-chan T panicChan *onceChan - collector chan<- any - doneChan <-chan lang.PlaceholderType + collector chan<- U + doneChan <-chan struct{} workers int } @@ -57,8 +56,8 @@ type ( } // Writer interface wraps Write method. - Writer interface { - Write(v any) + Writer[T any] interface { + Write(v T) } ) @@ -68,12 +67,11 @@ func Finish(fns ...func() error) error { return nil } - return MapReduceVoid(func(source chan<- any) { + return MapReduceVoid(func(source chan<- func() error) { for _, fn := range fns { source <- fn } - }, func(item any, writer Writer, cancel func(error)) { - fn := item.(func() error) + }, func(fn func() error, writer Writer[any], cancel func(error)) { if err := fn(); err != nil { cancel(err) } @@ -87,27 +85,26 @@ func FinishVoid(fns ...func()) { return } - ForEach(func(source chan<- any) { + ForEach(func(source chan<- func()) { for _, fn := range fns { source <- fn } - }, func(item any) { - fn := item.(func()) + }, func(fn func()) { fn() }, WithWorkers(len(fns))) } // ForEach maps all elements from given generate but no output. -func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) { +func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option) { options := buildOptions(opts...) panicChan := &onceChan{channel: make(chan any)} source := buildSource(generate, panicChan) collector := make(chan any) - done := make(chan lang.PlaceholderType) + done := make(chan struct{}) - go executeMappers(mapperContext{ + go executeMappers(mapperContext[T, any]{ ctx: options.ctx, - mapper: func(item any, _ Writer) { + mapper: func(item T, _ Writer[any]) { mapper(item) }, source: source, @@ -131,26 +128,26 @@ func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) { // MapReduce maps all elements generated from given generate func, // and reduces the output elements with given reducer. -func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, - opts ...Option) (any, error) { +func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V], + opts ...Option) (V, error) { panicChan := &onceChan{channel: make(chan any)} source := buildSource(generate, panicChan) return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) } // MapReduceChan maps all elements from source, and reduce the output elements with given reducer. -func MapReduceChan(source <-chan any, mapper MapperFunc, reducer ReducerFunc, - opts ...Option) (any, error) { +func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reducer ReducerFunc[U, V], + opts ...Option) (V, error) { panicChan := &onceChan{channel: make(chan any)} return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...) } // mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer. -func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper MapperFunc, - reducer ReducerFunc, opts ...Option) (any, error) { +func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U], + reducer ReducerFunc[U, V], opts ...Option) (val V, err error) { options := buildOptions(opts...) // output is used to write the final result - output := make(chan any) + output := make(chan V) defer func() { // reducer can only write once, if more, panic for range output { @@ -159,12 +156,12 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe }() // collector is used to collect data from mapper, and consume in reducer - collector := make(chan any, options.workers) + collector := make(chan U, options.workers) // if done is closed, all mappers and reducer should stop processing - done := make(chan lang.PlaceholderType) + done := make(chan struct{}) writer := newGuardedWriter(options.ctx, output, done) var closeOnce sync.Once - // use atomic.Value to avoid data race + // use atomic type to avoid data race var retErr errorx.AtomicError finish := func() { closeOnce.Do(func() { @@ -195,9 +192,9 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe reducer(collector, writer, cancel) }() - go executeMappers(mapperContext{ + go executeMappers(mapperContext[T, U]{ ctx: options.ctx, - mapper: func(item any, w Writer) { + mapper: func(item T, w Writer[U]) { mapper(item, w, cancel) }, source: source, @@ -210,26 +207,29 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe select { case <-options.ctx.Done(): cancel(context.DeadlineExceeded) - return nil, context.DeadlineExceeded + err = context.DeadlineExceeded case v := <-panicChan.channel: // drain output here, otherwise for loop panic in defer drain(output) panic(v) case v, ok := <-output: - if err := retErr.Load(); err != nil { - return nil, err + if e := retErr.Load(); e != nil { + err = e } else if ok { - return v, nil + val = v } else { - return nil, ErrReduceNoOutput + err = ErrReduceNoOutput } } + + return } // MapReduceVoid maps all elements generated from given generate, // and reduce the output elements with given reducer. -func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error { - _, err := MapReduce(generate, mapper, func(input <-chan any, writer Writer, cancel func(error)) { +func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U], + reducer VoidReducerFunc[U], opts ...Option) error { + _, err := MapReduce(generate, mapper, func(input <-chan U, writer Writer[any], cancel func(error)) { reducer(input, cancel) }, opts...) if errors.Is(err, ErrReduceNoOutput) { @@ -266,8 +266,8 @@ func buildOptions(opts ...Option) *mapReduceOptions { return options } -func buildSource(generate GenerateFunc, panicChan *onceChan) chan any { - source := make(chan any) +func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T { + source := make(chan T) go func() { defer func() { if r := recover(); r != nil { @@ -283,13 +283,13 @@ func buildSource(generate GenerateFunc, panicChan *onceChan) chan any { } // drain drains the channel. -func drain(channel <-chan any) { +func drain[T any](channel <-chan T) { // drain the channel for range channel { } } -func executeMappers(mCtx mapperContext) { +func executeMappers[T, U any](mCtx mapperContext[T, U]) { var wg sync.WaitGroup defer func() { wg.Wait() @@ -298,7 +298,7 @@ func executeMappers(mCtx mapperContext) { }() var failed int32 - pool := make(chan lang.PlaceholderType, mCtx.workers) + pool := make(chan struct{}, mCtx.workers) writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan) for atomic.LoadInt32(&failed) == 0 { select { @@ -306,7 +306,7 @@ func executeMappers(mCtx mapperContext) { return case <-mCtx.doneChan: return - case pool <- lang.Placeholder: + case pool <- struct{}{}: item, ok := <-mCtx.source if !ok { <-pool @@ -346,22 +346,21 @@ func once(fn func(error)) func(error) { } } -type guardedWriter struct { +type guardedWriter[T any] struct { ctx context.Context - channel chan<- any - done <-chan lang.PlaceholderType + channel chan<- T + done <-chan struct{} } -func newGuardedWriter(ctx context.Context, channel chan<- any, - done <-chan lang.PlaceholderType) guardedWriter { - return guardedWriter{ +func newGuardedWriter[T any](ctx context.Context, channel chan<- T, done <-chan struct{}) guardedWriter[T] { + return guardedWriter[T]{ ctx: ctx, channel: channel, done: done, } } -func (gw guardedWriter) Write(v any) { +func (gw guardedWriter[T]) Write(v T) { select { case <-gw.ctx.Done(): return diff --git a/core/mr/mapreduce_fuzz_test.go b/core/mr/mapreduce_fuzz_test.go index 15199808..936d3623 100644 --- a/core/mr/mapreduce_fuzz_test.go +++ b/core/mr/mapreduce_fuzz_test.go @@ -1,6 +1,3 @@ -//go:build go1.18 -// +build go1.18 - package mr import ( @@ -18,9 +15,9 @@ import ( func FuzzMapReduce(f *testing.F) { rand.Seed(time.Now().UnixNano()) - f.Add(uint(10), uint(runtime.NumCPU())) - f.Fuzz(func(t *testing.T, num, workers uint) { - n := int64(num)%5000 + 5000 + f.Add(int64(10), runtime.NumCPU()) + f.Fuzz(func(t *testing.T, n int64, workers int) { + n = n%5000 + 5000 genPanic := rand.Intn(100) == 0 mapperPanic := rand.Intn(100) == 0 reducerPanic := rand.Intn(100) == 0 @@ -29,34 +26,33 @@ func FuzzMapReduce(f *testing.F) { reducerIdx := rand.Int63n(n) squareSum := (n - 1) * n * (2*n - 1) / 6 - fn := func() (any, error) { + fn := func() (int64, error) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - return MapReduce(func(source chan<- any) { + return MapReduce(func(source chan<- int64) { for i := int64(0); i < n; i++ { source <- i if genPanic && i == genIdx { panic("foo") } } - }, func(item any, writer Writer, cancel func(error)) { - v := item.(int64) + }, func(v int64, writer Writer[int64], cancel func(error)) { if mapperPanic && v == mapperIdx { panic("bar") } writer.Write(v * v) - }, func(pipe <-chan any, writer Writer, cancel func(error)) { + }, func(pipe <-chan int64, writer Writer[int64], cancel func(error)) { var idx int64 var total int64 for v := range pipe { if reducerPanic && idx == reducerIdx { panic("baz") } - total += v.(int64) + total += v idx++ } writer.Write(total) - }, WithWorkers(int(workers)%50+runtime.NumCPU()/2)) + }, WithWorkers(workers%50+runtime.NumCPU())) } if genPanic || mapperPanic || reducerPanic { @@ -72,7 +68,7 @@ func FuzzMapReduce(f *testing.F) { } else { val, err := fn() assert.Nil(t, err) - assert.Equal(t, squareSum, val.(int64)) + assert.Equal(t, squareSum, val) } }) } diff --git a/core/mr/mapreduce_fuzzcase_test.go b/core/mr/mapreduce_fuzzcase_test.go index 39b3ab39..7ce2b58f 100644 --- a/core/mr/mapreduce_fuzzcase_test.go +++ b/core/mr/mapreduce_fuzzcase_test.go @@ -54,28 +54,27 @@ func TestMapReduceRandom(t *testing.T) { reducerIdx := rand.Int63n(n) squareSum := (n - 1) * n * (2*n - 1) / 6 - fn := func() (any, error) { - return MapReduce(func(source chan<- any) { + fn := func() (int64, error) { + return MapReduce(func(source chan<- int64) { for i := int64(0); i < n; i++ { source <- i if genPanic && i == genIdx { panic("foo") } } - }, func(item any, writer Writer, cancel func(error)) { - v := item.(int64) + }, func(v int64, writer Writer[int64], cancel func(error)) { if mapperPanic && v == mapperIdx { panic("bar") } writer.Write(v * v) - }, func(pipe <-chan any, writer Writer, cancel func(error)) { + }, func(pipe <-chan int64, writer Writer[int64], cancel func(error)) { var idx int64 var total int64 for v := range pipe { if reducerPanic && idx == reducerIdx { panic("baz") } - total += v.(int64) + total += v idx++ } writer.Write(total) @@ -95,7 +94,7 @@ func TestMapReduceRandom(t *testing.T) { } else { val, err := fn() assert.Nil(t, err) - assert.Equal(t, squareSum, val.(int64)) + assert.Equal(t, squareSum, val) } bar.Increment() }) diff --git a/core/mr/mapreduce_test.go b/core/mr/mapreduce_test.go index 79030fc7..00418cc8 100644 --- a/core/mr/mapreduce_test.go +++ b/core/mr/mapreduce_test.go @@ -3,7 +3,7 @@ package mr import ( "context" "errors" - "io" + "io/ioutil" "log" "runtime" "sync/atomic" @@ -17,7 +17,7 @@ import ( var errDummy = errors.New("dummy") func init() { - log.SetOutput(io.Discard) + log.SetOutput(ioutil.Discard) } func TestFinish(t *testing.T) { @@ -91,11 +91,11 @@ func TestForEach(t *testing.T) { defer goleak.VerifyNone(t) var count uint32 - ForEach(func(source chan<- any) { + ForEach(func(source chan<- int) { for i := 0; i < tasks; i++ { source <- i } - }, func(item any) { + }, func(item int) { atomic.AddUint32(&count, 1) }, WithWorkers(-1)) @@ -106,12 +106,12 @@ func TestForEach(t *testing.T) { defer goleak.VerifyNone(t) var count uint32 - ForEach(func(source chan<- any) { + ForEach(func(source chan<- int) { for i := 0; i < tasks; i++ { source <- i } - }, func(item any) { - if item.(int)%2 == 0 { + }, func(item int) { + if item%2 == 0 { atomic.AddUint32(&count, 1) } }) @@ -123,11 +123,11 @@ func TestForEach(t *testing.T) { defer goleak.VerifyNone(t) assert.PanicsWithValue(t, "foo", func() { - ForEach(func(source chan<- any) { + ForEach(func(source chan<- int) { for i := 0; i < tasks; i++ { source <- i } - }, func(item any) { + }, func(item int) { panic("foo") }) }) @@ -139,9 +139,9 @@ func TestGeneratePanic(t *testing.T) { t.Run("all", func(t *testing.T) { assert.PanicsWithValue(t, "foo", func() { - ForEach(func(source chan<- any) { + ForEach(func(source chan<- int) { panic("foo") - }, func(item any) { + }, func(item int) { }) }) }) @@ -154,14 +154,14 @@ func TestMapperPanic(t *testing.T) { var run int32 t.Run("all", func(t *testing.T) { assert.PanicsWithValue(t, "foo", func() { - _, _ = MapReduce(func(source chan<- any) { + _, _ = MapReduce(func(source chan<- int) { for i := 0; i < tasks; i++ { source <- i } - }, func(item any, writer Writer, cancel func(error)) { + }, func(item int, writer Writer[int], cancel func(error)) { atomic.AddInt32(&run, 1) panic("foo") - }, func(pipe <-chan any, writer Writer, cancel func(error)) { + }, func(pipe <-chan int, writer Writer[int], cancel func(error)) { }) }) assert.True(t, atomic.LoadInt32(&run) < tasks/2) @@ -173,10 +173,10 @@ func TestMapReduce(t *testing.T) { tests := []struct { name string - mapper MapperFunc - reducer ReducerFunc + mapper MapperFunc[int, int] + reducer ReducerFunc[int, int] expectErr error - expectValue any + expectValue int }{ { name: "simple", @@ -185,8 +185,7 @@ func TestMapReduce(t *testing.T) { }, { name: "cancel with error", - mapper: func(item any, writer Writer, cancel func(error)) { - v := item.(int) + mapper: func(v int, writer Writer[int], cancel func(error)) { if v%3 == 0 { cancel(errDummy) } @@ -196,22 +195,20 @@ func TestMapReduce(t *testing.T) { }, { name: "cancel with nil", - mapper: func(item any, writer Writer, cancel func(error)) { - v := item.(int) + mapper: func(v int, writer Writer[int], cancel func(error)) { if v%3 == 0 { cancel(nil) } writer.Write(v * v) }, - expectErr: ErrCancelWithNil, - expectValue: nil, + expectErr: ErrCancelWithNil, }, { name: "cancel with more", - reducer: func(pipe <-chan any, writer Writer, cancel func(error)) { + reducer: func(pipe <-chan int, writer Writer[int], cancel func(error)) { var result int for item := range pipe { - result += item.(int) + result += item if result > 10 { cancel(errDummy) } @@ -226,21 +223,20 @@ func TestMapReduce(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { if test.mapper == nil { - test.mapper = func(item any, writer Writer, cancel func(error)) { - v := item.(int) + test.mapper = func(v int, writer Writer[int], cancel func(error)) { writer.Write(v * v) } } if test.reducer == nil { - test.reducer = func(pipe <-chan any, writer Writer, cancel func(error)) { + test.reducer = func(pipe <-chan int, writer Writer[int], cancel func(error)) { var result int for item := range pipe { - result += item.(int) + result += item } writer.Write(result) } } - value, err := MapReduce(func(source chan<- any) { + value, err := MapReduce(func(source chan<- int) { for i := 1; i < 5; i++ { source <- i } @@ -256,22 +252,21 @@ func TestMapReduce(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { if test.mapper == nil { - test.mapper = func(item any, writer Writer, cancel func(error)) { - v := item.(int) + test.mapper = func(v int, writer Writer[int], cancel func(error)) { writer.Write(v * v) } } if test.reducer == nil { - test.reducer = func(pipe <-chan any, writer Writer, cancel func(error)) { + test.reducer = func(pipe <-chan int, writer Writer[int], cancel func(error)) { var result int for item := range pipe { - result += item.(int) + result += item } writer.Write(result) } } - source := make(chan any) + source := make(chan int) go func() { for i := 1; i < 5; i++ { source <- i @@ -291,13 +286,13 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) { defer goleak.VerifyNone(t) assert.Panics(t, func() { - MapReduce(func(source chan<- any) { + MapReduce(func(source chan<- int) { for i := 0; i < 10; i++ { source <- i } - }, func(item any, writer Writer, cancel func(error)) { + }, func(item int, writer Writer[int], cancel func(error)) { writer.Write(item) - }, func(pipe <-chan any, writer Writer, cancel func(error)) { + }, func(pipe <-chan int, writer Writer[string], cancel func(error)) { drain(pipe) writer.Write("one") writer.Write("two") @@ -311,8 +306,8 @@ func TestMapReduceVoid(t *testing.T) { var value uint32 tests := []struct { name string - mapper MapperFunc - reducer VoidReducerFunc + mapper MapperFunc[int, int] + reducer VoidReducerFunc[int] expectValue uint32 expectErr error }{ @@ -323,8 +318,7 @@ func TestMapReduceVoid(t *testing.T) { }, { name: "cancel with error", - mapper: func(item any, writer Writer, cancel func(error)) { - v := item.(int) + mapper: func(v int, writer Writer[int], cancel func(error)) { if v%3 == 0 { cancel(errDummy) } @@ -334,8 +328,7 @@ func TestMapReduceVoid(t *testing.T) { }, { name: "cancel with nil", - mapper: func(item any, writer Writer, cancel func(error)) { - v := item.(int) + mapper: func(v int, writer Writer[int], cancel func(error)) { if v%3 == 0 { cancel(nil) } @@ -345,9 +338,9 @@ func TestMapReduceVoid(t *testing.T) { }, { name: "cancel with more", - reducer: func(pipe <-chan any, cancel func(error)) { + reducer: func(pipe <-chan int, cancel func(error)) { for item := range pipe { - result := atomic.AddUint32(&value, uint32(item.(int))) + result := atomic.AddUint32(&value, uint32(item)) if result > 10 { cancel(errDummy) } @@ -362,19 +355,18 @@ func TestMapReduceVoid(t *testing.T) { atomic.StoreUint32(&value, 0) if test.mapper == nil { - test.mapper = func(item any, writer Writer, cancel func(error)) { - v := item.(int) + test.mapper = func(v int, writer Writer[int], cancel func(error)) { writer.Write(v * v) } } if test.reducer == nil { - test.reducer = func(pipe <-chan any, cancel func(error)) { + test.reducer = func(pipe <-chan int, cancel func(error)) { for item := range pipe { - atomic.AddUint32(&value, uint32(item.(int))) + atomic.AddUint32(&value, uint32(item)) } } } - err := MapReduceVoid(func(source chan<- any) { + err := MapReduceVoid(func(source chan<- int) { for i := 1; i < 5; i++ { source <- i } @@ -392,18 +384,17 @@ func TestMapReduceVoidWithDelay(t *testing.T) { defer goleak.VerifyNone(t) var result []int - err := MapReduceVoid(func(source chan<- any) { + err := MapReduceVoid(func(source chan<- int) { source <- 0 source <- 1 - }, func(item any, writer Writer, cancel func(error)) { - i := item.(int) + }, func(i int, writer Writer[int], cancel func(error)) { if i == 0 { time.Sleep(time.Millisecond * 50) } writer.Write(i) - }, func(pipe <-chan any, cancel func(error)) { + }, func(pipe <-chan int, cancel func(error)) { for item := range pipe { - i := item.(int) + i := item result = append(result, i) } }) @@ -417,13 +408,12 @@ func TestMapReducePanic(t *testing.T) { defer goleak.VerifyNone(t) assert.Panics(t, func() { - _, _ = MapReduce(func(source chan<- any) { + _, _ = MapReduce(func(source chan<- int) { source <- 0 source <- 1 - }, func(item any, writer Writer, cancel func(error)) { - i := item.(int) + }, func(i int, writer Writer[int], cancel func(error)) { writer.Write(i) - }, func(pipe <-chan any, writer Writer, cancel func(error)) { + }, func(pipe <-chan int, writer Writer[int], cancel func(error)) { for range pipe { panic("panic") } @@ -435,17 +425,16 @@ func TestMapReducePanicOnce(t *testing.T) { defer goleak.VerifyNone(t) assert.Panics(t, func() { - _, _ = MapReduce(func(source chan<- any) { + _, _ = MapReduce(func(source chan<- int) { for i := 0; i < 100; i++ { source <- i } - }, func(item any, writer Writer, cancel func(error)) { - i := item.(int) + }, func(i int, writer Writer[int], cancel func(error)) { if i == 0 { panic("foo") } writer.Write(i) - }, func(pipe <-chan any, writer Writer, cancel func(error)) { + }, func(pipe <-chan int, writer Writer[int], cancel func(error)) { for range pipe { panic("bar") } @@ -457,12 +446,12 @@ func TestMapReducePanicBothMapperAndReducer(t *testing.T) { defer goleak.VerifyNone(t) assert.Panics(t, func() { - _, _ = MapReduce(func(source chan<- any) { + _, _ = MapReduce(func(source chan<- int) { source <- 0 source <- 1 - }, func(item any, writer Writer, cancel func(error)) { + }, func(item int, writer Writer[int], cancel func(error)) { panic("foo") - }, func(pipe <-chan any, writer Writer, cancel func(error)) { + }, func(pipe <-chan int, writer Writer[int], cancel func(error)) { panic("bar") }) }) @@ -472,18 +461,17 @@ func TestMapReduceVoidCancel(t *testing.T) { defer goleak.VerifyNone(t) var result []int - err := MapReduceVoid(func(source chan<- any) { + err := MapReduceVoid(func(source chan<- int) { source <- 0 source <- 1 - }, func(item any, writer Writer, cancel func(error)) { - i := item.(int) + }, func(i int, writer Writer[int], cancel func(error)) { if i == 1 { cancel(errors.New("anything")) } writer.Write(i) - }, func(pipe <-chan any, cancel func(error)) { + }, func(pipe <-chan int, cancel func(error)) { for item := range pipe { - i := item.(int) + i := item result = append(result, i) } }) @@ -496,21 +484,19 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) { var done int32 var result []int - err := MapReduceVoid(func(source chan<- any) { + err := MapReduceVoid(func(source chan<- int) { for i := 0; i < defaultWorkers*2; i++ { source <- i } atomic.AddInt32(&done, 1) - }, func(item any, writer Writer, cancel func(error)) { - i := item.(int) + }, func(i int, writer Writer[int], cancel func(error)) { if i == defaultWorkers/2 { cancel(errors.New("anything")) } writer.Write(i) - }, func(pipe <-chan any, cancel func(error)) { + }, func(pipe <-chan int, cancel func(error)) { for item := range pipe { - i := item.(int) - result = append(result, i) + result = append(result, item) } }) assert.NotNil(t, err) @@ -522,18 +508,18 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) { defer goleak.VerifyNone(t) uids := []int{1, 2, 3} - res, err := MapReduce(func(source chan<- any) { + res, err := MapReduce(func(source chan<- int) { for _, uid := range uids { source <- uid } - }, func(item any, writer Writer, cancel func(error)) { + }, func(item int, writer Writer[int], cancel func(error)) { writer.Write(item) - }, func(pipe <-chan any, writer Writer, cancel func(error)) { + }, func(pipe <-chan int, writer Writer[int], cancel func(error)) { drain(pipe) // not calling writer.Write(...), should not panic }) assert.Equal(t, ErrReduceNoOutput, err) - assert.Nil(t, res) + assert.Equal(t, 0, res) } func TestMapReduceVoidPanicInReducer(t *testing.T) { @@ -542,15 +528,14 @@ func TestMapReduceVoidPanicInReducer(t *testing.T) { const message = "foo" assert.Panics(t, func() { var done int32 - _ = MapReduceVoid(func(source chan<- any) { + _ = MapReduceVoid(func(source chan<- int) { for i := 0; i < defaultWorkers*2; i++ { source <- i } atomic.AddInt32(&done, 1) - }, func(item any, writer Writer, cancel func(error)) { - i := item.(int) + }, func(i int, writer Writer[int], cancel func(error)) { writer.Write(i) - }, func(pipe <-chan any, cancel func(error)) { + }, func(pipe <-chan int, cancel func(error)) { panic(message) }, WithWorkers(1)) }) @@ -561,13 +546,12 @@ func TestForEachWithContext(t *testing.T) { var done int32 ctx, cancel := context.WithCancel(context.Background()) - ForEach(func(source chan<- any) { + ForEach(func(source chan<- int) { for i := 0; i < defaultWorkers*2; i++ { source <- i } atomic.AddInt32(&done, 1) - }, func(item any) { - i := item.(int) + }, func(i int) { if i == defaultWorkers/2 { cancel() } @@ -580,20 +564,19 @@ func TestMapReduceWithContext(t *testing.T) { var done int32 var result []int ctx, cancel := context.WithCancel(context.Background()) - err := MapReduceVoid(func(source chan<- any) { + err := MapReduceVoid(func(source chan<- int) { for i := 0; i < defaultWorkers*2; i++ { source <- i } atomic.AddInt32(&done, 1) - }, func(item any, writer Writer, c func(error)) { - i := item.(int) + }, func(i int, writer Writer[int], c func(error)) { if i == defaultWorkers/2 { cancel() } writer.Write(i) - }, func(pipe <-chan any, cancel func(error)) { + }, func(pipe <-chan int, cancel func(error)) { for item := range pipe { - i := item.(int) + i := item result = append(result, i) } }, WithContext(ctx)) @@ -604,19 +587,19 @@ func TestMapReduceWithContext(t *testing.T) { func BenchmarkMapReduce(b *testing.B) { b.ReportAllocs() - mapper := func(v any, writer Writer, cancel func(error)) { - writer.Write(v.(int64) * v.(int64)) + mapper := func(v int64, writer Writer[int64], cancel func(error)) { + writer.Write(v * v) } - reducer := func(input <-chan any, writer Writer, cancel func(error)) { + reducer := func(input <-chan int64, writer Writer[int64], cancel func(error)) { var result int64 for v := range input { - result += v.(int64) + result += v } writer.Write(result) } for i := 0; i < b.N; i++ { - MapReduce(func(input chan<- any) { + MapReduce(func(input chan<- int64) { for j := 0; j < 2; j++ { input <- int64(j) } diff --git a/core/mr/readme-cn.md b/core/mr/readme-cn.md index 888868ca..7aaf3006 100644 --- a/core/mr/readme-cn.md +++ b/core/mr/readme-cn.md @@ -58,20 +58,19 @@ import ( ) func main() { - val, err := mr.MapReduce(func(source chan<- any) { + val, err := mr.MapReduce(func(source chan<- int) { // generator for i := 0; i < 10; i++ { source <- i } - }, func(item any, writer mr.Writer, cancel func(error)) { + }, func(i int, writer mr.Writer[int], cancel func(error)) { // mapper - i := item.(int) writer.Write(i * i) - }, func(pipe <-chan any, writer mr.Writer, cancel func(error)) { + }, func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) { // reducer var sum int for i := range pipe { - sum += i.(int) + sum += i } writer.Write(sum) }) diff --git a/core/mr/readme.md b/core/mr/readme.md index ba719a6c..3bb916b5 100644 --- a/core/mr/readme.md +++ b/core/mr/readme.md @@ -59,20 +59,19 @@ import ( ) func main() { - val, err := mr.MapReduce(func(source chan<- any) { + val, err := mr.MapReduce(func(source chan<- int) { // generator for i := 0; i < 10; i++ { source <- i } - }, func(item any, writer mr.Writer, cancel func(error)) { + }, func(i int, writer mr.Writer[int], cancel func(error)) { // mapper - i := item.(int) writer.Write(i * i) - }, func(pipe <-chan any, writer mr.Writer, cancel func(error)) { + }, func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) { // reducer var sum int for i := range pipe { - sum += i.(int) + sum += i } writer.Write(sum) }) diff --git a/gateway/server.go b/gateway/server.go index 489796e0..c6972709 100644 --- a/gateway/server.go +++ b/gateway/server.go @@ -63,12 +63,11 @@ func (s *Server) build() error { return err } - return mr.MapReduceVoid(func(source chan<- any) { + return mr.MapReduceVoid(func(source chan<- Upstream) { for _, up := range s.upstreams { source <- up } - }, func(item any, writer mr.Writer, cancel func(error)) { - up := item.(Upstream) + }, func(up Upstream, writer mr.Writer[rest.Route], cancel func(error)) { cli := zrpc.MustNewClient(up.Grpc) source, err := s.createDescriptorSource(cli, up) if err != nil { @@ -109,9 +108,8 @@ func (s *Server) build() error { Handler: s.buildHandler(source, resolver, cli, m.RpcPath), }) } - }, func(pipe <-chan any, cancel func(error)) { - for item := range pipe { - route := item.(rest.Route) + }, func(pipe <-chan rest.Route, cancel func(error)) { + for route := range pipe { s.Server.AddRoute(route) } }) diff --git a/internal/health/health.go b/internal/health/health.go index fd4c8811..7c238d9c 100644 --- a/internal/health/health.go +++ b/internal/health/health.go @@ -9,7 +9,7 @@ import ( "github.com/zeromicro/go-zero/core/syncx" ) -// defaultHealthManager is global comboHealthManager for byone self. +// defaultHealthManager is global comboHealthManager. var defaultHealthManager = newComboHealthManager() type (