// Source file: go-zero/core/mr/mapreduce.go

package mr
2020-07-26 17:09:05 +08:00
import (
"context"
2020-07-26 17:09:05 +08:00
"errors"
"fmt"
"sync"
"github.com/zeromicro/go-zero/core/errorx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/threading"
2020-07-26 17:09:05 +08:00
)
const (
	// defaultWorkers is the worker count newOptions uses when no
	// WithWorkers option is supplied.
	defaultWorkers = 16
	// minWorkers is the lower bound WithWorkers clamps its argument to.
	minWorkers = 1
)
var (
	// ErrCancelWithNil is an error that mapreduce was cancelled with nil.
	// MapReduceChan stores it when the cancel func is invoked with a nil error.
	ErrCancelWithNil = errors.New("mapreduce cancelled with nil")
	// ErrReduceNoOutput is an error that reduce did not output a value.
	// MapReduceChan returns it when the output channel is closed without
	// the reducer having written anything and no error was recorded.
	ErrReduceNoOutput = errors.New("reduce not writing value")
)
type (
	// ForEachFunc is used to do element processing, but no output.
	ForEachFunc func(item interface{})

	// GenerateFunc is used to let callers send elements into source.
	GenerateFunc func(source chan<- interface{})

	// MapFunc is used to do element processing and write the output to writer.
	MapFunc func(item interface{}, writer Writer)

	// MapperFunc is used to do element processing and write the output to writer,
	// use cancel func to cancel the processing.
	MapperFunc func(item interface{}, writer Writer, cancel func(error))

	// ReducerFunc is used to reduce all the mapping output and write to writer,
	// use cancel func to cancel the processing.
	ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))

	// VoidReducerFunc is used to reduce all the mapping output, but no output.
	// Use cancel func to cancel the processing.
	VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))

	// Option defines the method to customize the mapreduce.
	Option func(opts *mapReduceOptions)

	// mapReduceOptions carries the settings that Option funcs mutate:
	// ctx is set by WithContext and workers by WithWorkers.
	mapReduceOptions struct {
		ctx     context.Context
		workers int
	}

	// Writer interface wraps Write method.
	Writer interface {
		Write(v interface{})
	}
)
2021-02-22 09:47:06 +08:00
// Finish runs fns parallelly, cancelled on any error.
2020-07-26 17:09:05 +08:00
func Finish(fns ...func() error) error {
if len(fns) == 0 {
return nil
}
return MapReduceVoid(func(source chan<- interface{}) {
for _, fn := range fns {
source <- fn
}
}, func(item interface{}, writer Writer, cancel func(error)) {
fn := item.(func() error)
if err := fn(); err != nil {
cancel(err)
}
}, func(pipe <-chan interface{}, cancel func(error)) {
}, WithWorkers(len(fns)))
}
2021-02-22 09:47:06 +08:00
// FinishVoid runs fns parallelly.
2020-07-26 17:09:05 +08:00
func FinishVoid(fns ...func()) {
if len(fns) == 0 {
return
}
ForEach(func(source chan<- interface{}) {
2020-07-26 17:09:05 +08:00
for _, fn := range fns {
source <- fn
}
}, func(item interface{}) {
fn := item.(func())
fn()
}, WithWorkers(len(fns)))
}
// ForEach runs mapper on every element produced by generate and discards
// any output; it returns after the channel returned by Map has been drained.
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
	// Adapt the output-less mapper to the MapFunc shape; the writer is unused.
	adapter := func(item interface{}, _ Writer) {
		mapper(item)
	}

	results := Map(generate, adapter, opts...)
	drain(results)
}
2021-02-22 09:47:06 +08:00
// Map maps all elements generated from given generate func, and returns an output channel.
2020-07-26 17:09:05 +08:00
func Map(generate GenerateFunc, mapper MapFunc, opts ...Option) chan interface{} {
options := buildOptions(opts...)
source := buildSource(generate)
collector := make(chan interface{}, options.workers)
2022-01-01 19:24:35 +08:00
done := make(chan lang.PlaceholderType)
2020-07-26 17:09:05 +08:00
2022-01-01 19:24:35 +08:00
go executeMappers(options.ctx, mapper, source, collector, done, options.workers)
2020-07-26 17:09:05 +08:00
return collector
}
2021-02-22 09:47:06 +08:00
// MapReduce maps all elements generated from given generate func,
2021-04-17 20:15:19 +08:00
// and reduces the output elements with given reducer.
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
opts ...Option) (interface{}, error) {
2020-07-26 17:09:05 +08:00
source := buildSource(generate)
return MapReduceChan(source, mapper, reducer, opts...)
2020-07-26 17:09:05 +08:00
}
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
2020-07-26 17:09:05 +08:00
opts ...Option) (interface{}, error) {
options := buildOptions(opts...)
output := make(chan interface{})
2021-08-26 16:47:28 +08:00
defer func() {
for range output {
panic("more than one element written in reducer")
}
}()
2020-07-26 17:09:05 +08:00
collector := make(chan interface{}, options.workers)
2022-01-01 19:24:35 +08:00
done := make(chan lang.PlaceholderType)
writer := newGuardedWriter(options.ctx, output, done)
var closeOnce sync.Once
2020-07-26 17:09:05 +08:00
var retErr errorx.AtomicError
finish := func() {
closeOnce.Do(func() {
2022-01-01 19:24:35 +08:00
close(done)
close(output)
})
}
2020-07-26 17:09:05 +08:00
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
retErr.Set(ErrCancelWithNil)
}
drain(source)
finish()
2020-07-26 17:09:05 +08:00
})
go func() {
defer func() {
2021-05-10 23:10:57 +08:00
drain(collector)
2020-07-26 17:09:05 +08:00
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
2020-07-26 17:09:05 +08:00
}
}()
2021-05-10 23:10:57 +08:00
2020-07-26 17:09:05 +08:00
reducer(collector, writer, cancel)
}()
2020-09-16 16:48:51 +08:00
go executeMappers(options.ctx, func(item interface{}, w Writer) {
2020-09-16 20:03:30 +08:00
mapper(item, w, cancel)
2022-01-01 19:24:35 +08:00
}, source, collector, done, options.workers)
2020-07-26 17:09:05 +08:00
select {
case <-options.ctx.Done():
cancel(context.DeadlineExceeded)
return nil, context.DeadlineExceeded
case value, ok := <-output:
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return value, nil
} else {
return nil, ErrReduceNoOutput
}
2020-07-26 17:09:05 +08:00
}
}
2021-02-22 09:47:06 +08:00
// MapReduceVoid maps all elements generated from given generate,
// and reduce the output elements with given reducer.
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
2020-07-26 17:09:05 +08:00
reducer(input, cancel)
}, opts...)
if errors.Is(err, ErrReduceNoOutput) {
return nil
}
2020-07-26 17:09:05 +08:00
return err
2020-07-26 17:09:05 +08:00
}
// WithContext returns an Option that makes the mapreduce processing use the
// given ctx.
func WithContext(ctx context.Context) Option {
	setCtx := func(o *mapReduceOptions) {
		o.ctx = ctx
	}

	return setCtx
}
2021-02-22 09:47:06 +08:00
// WithWorkers customizes a mapreduce processing with given workers.
2020-07-26 17:09:05 +08:00
func WithWorkers(workers int) Option {
return func(opts *mapReduceOptions) {
if workers < minWorkers {
opts.workers = minWorkers
} else {
opts.workers = workers
}
}
}
// buildOptions starts from the defaults returned by newOptions and applies
// each given Option in order.
func buildOptions(opts ...Option) *mapReduceOptions {
	built := newOptions()
	for i := range opts {
		opts[i](built)
	}

	return built
}
// buildSource creates an unbuffered channel, runs generate against it via
// threading.GoSafe, and returns the channel. The channel is closed when
// generate returns.
func buildSource(generate GenerateFunc) chan interface{} {
	ch := make(chan interface{})
	produce := func() {
		defer close(ch)
		generate(ch)
	}
	threading.GoSafe(produce)

	return ch
}
// drain receives and discards every value from channel, returning only
// after the channel has been closed and emptied.
func drain(channel <-chan interface{}) {
	for {
		if _, open := <-channel; !open {
			return
		}
	}
}
func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{},
collector chan<- interface{}, done <-chan lang.PlaceholderType, workers int) {
2020-07-26 17:09:05 +08:00
var wg sync.WaitGroup
defer func() {
wg.Wait()
close(collector)
}()
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(ctx, collector, done)
2020-07-26 17:09:05 +08:00
for {
select {
case <-ctx.Done():
return
2020-07-26 17:09:05 +08:00
case <-done:
return
case pool <- lang.Placeholder:
item, ok := <-input
if !ok {
<-pool
return
}
wg.Add(1)
// better to safely run caller defined method
threading.GoSafe(func() {
defer func() {
wg.Done()
<-pool
}()
mapper(item, writer)
})
}
}
}
func newOptions() *mapReduceOptions {
return &mapReduceOptions{
ctx: context.Background(),
2020-07-26 17:09:05 +08:00
workers: defaultWorkers,
}
}
// once wraps fn so that only the first invocation of the returned func
// actually calls fn (with that invocation's error); later calls are no-ops.
func once(fn func(error)) func(error) {
	var guard sync.Once

	return func(err error) {
		guard.Do(func() { fn(err) })
	}
}
type guardedWriter struct {
ctx context.Context
2020-07-26 17:09:05 +08:00
channel chan<- interface{}
done <-chan lang.PlaceholderType
}
func newGuardedWriter(ctx context.Context, channel chan<- interface{},
done <-chan lang.PlaceholderType) guardedWriter {
2020-07-26 17:09:05 +08:00
return guardedWriter{
ctx: ctx,
2020-07-26 17:09:05 +08:00
channel: channel,
done: done,
}
}
func (gw guardedWriter) Write(v interface{}) {
select {
case <-gw.ctx.Done():
return
2020-07-26 17:09:05 +08:00
case <-gw.done:
return
default:
gw.channel <- v
}
}