update readme to add mapreduce link

This commit is contained in:
kevin 2020-09-07 23:35:57 +08:00
parent 0e674933f3
commit 520f4d7c1b
3 changed files with 169 additions and 197 deletions

View File

@ -1,29 +1,176 @@
# mapreduce用法
# 通过MapReduce降低服务响应时间
## Map
go-zero微服务框架中提供了许多开箱即用的工具好的工具不仅能提升服务的性能而且还能提升代码的鲁棒性避免出错实现代码风格的统一方便他人阅读等等本系列文章将分别介绍go-zero框架中工具的使用及其实现原理
> channel是Map的返回值
## 并发处理工具[MapReduce](https://github.com/tal-tech/go-zero/tree/master/core/mr)
由于Map是个并发操作如果不用range或drain的方式那么在使用返回值的时候可能Map里面的代码还在读写这个返回值可能导致数据不全或者`concurrent read write错误`
[MapReduce](https://zh.wikipedia.org/wiki/MapReduce)是Google提出的一个软件架构用于大规模数据集的并行运算go-zero中的MapReduce工具正是借鉴了这种架构思想
* 如果需要收集Map生成的结果那么使用如下方式
go-zero框架中的MapReduce工具主要用来对批量数据进行并发的处理以此来提升服务的性能
```
for v := range channel {
// v is with type interface{}
![mapreduce原理图](./images/mr.png)
我们通过几个示例来演示MapReduce的用法
MapReduce主要有三个参数第一个参数为generate用以生产数据第二个参数为mapper用以对数据进行处理第三个参数为reducer用以对mapper后的数据做聚合返回还可以通过opts选项设置并发处理的线程数量
场景一: 某些功能的结果往往需要依赖多个服务比如商品详情的结果往往会依赖用户服务、库存服务、订单服务等等一般被依赖的服务都是以rpc的形式对外提供为了降低依赖的耗时我们往往需要对依赖做并行处理
```
func productDetail(uid, pid int64) (*ProductDetail, error) {
var pd ProductDetail
err := mr.Finish(func() (err error) {
pd.User, err = userRpc.User(uid)
return
}, func() (err error) {
pd.Store, err = storeRpc.Store(pid)
return
}, func() (err error) {
pd.Order, err = orderRpc.Order(pid)
return
})
if err != nil {
log.Printf("product detail error: %v", err)
return nil, err
}
```
* 如果不需要收集结果那么就需要显式的调用mapreduce.Drain
```
mapreduce.Drain(channel)
```
## MapReduce
return &pd, nil
}
```
该示例中返回商品详情依赖了多个服务获取数据,因此做并发的依赖处理,对接口的性能有很大的提升
场景二: 很多时候我们需要对一批数据进行处理比如对一批用户id效验每个用户的合法性并且效验过程中有一个出错就认为效验失败返回的结果为效验合法的用户id
```
func checkLegal(uids []int64) ([]int64, error) {
r, err := mr.MapReduce(func(source chan<- interface{}) {
for _, uid := range uids {
source <- uid
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
uid := item.(int64)
ok, err := check(uid)
if err != nil {
cancel(err)
}
if ok {
writer.Write(uid)
}
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
var uids []int64
for p := range pipe {
uids = append(uids, p.(int64))
}
writer.Write(uids)
})
if err != nil {
log.Printf("check error: %v", err)
return nil, err
}
return r.([]int64), nil
}
func check(uid int64) (bool, error) {
// do something check user legal
return true, nil
}
```
该示例中如果check过程出现错误则通过cancel方法结束效验过程并返回error整个效验过程结束如果某个uid效验结果为false则最终结果不返回该uid
**MapReduce使用注意事项**
* mapper和reducer中都可以调用cancel参数为error调用后立即返回返回结果为nil, error
* mapper中如果不调用writer.Write则item最终不会被reducer聚合
* reducer中如果不调用writer.Wirte则返回结果为nil, ErrReduceNoOutput
* reducer为单线程所有mapper出来的结果在这里串行聚合
***实现原理分析:***
MapReduce中首先通过buildSource方法通过执行generate(参数为无缓冲channel)产生数据并返回无缓冲的channelmapper会从该channel中读取数据
```
func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{})
go func() {
defer close(source)
generate(source)
}()
return source
}
```
在MapReduceWithSource方法中定义了cancel方法mapper和reducer中都可以调用该方法调用后主线程收到close信号会立马返回
```
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
// 默认的error
retErr.Set(ErrCancelWithNil)
}
drain(source)
// 调用close(ouput)主线程收到Done信号立马返回
finish()
})
```
在mapperDispatcher方法中调用了executeMappersexecuteMappers消费buildSource产生的数据每一个item都会起一个goroutine单独处理默认最大并发数为16可以通过WithWorkers进行设置
```
var wg sync.WaitGroup
defer func() {
wg.Wait() // 保证所有的item都处理完成
close(collector)
}()
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done) // 将mapper处理完的数据写入collector
for {
select {
case <-done: // 当调用了cancel会触发立即返回
return
case pool <- lang.Placeholder: // 控制最大并发数
item, ok := <-input
if !ok {
<-pool
return
}
wg.Add(1)
go func() {
defer func() {
wg.Done()
<-pool
}()
mapper(item, writer) // 对item进行处理处理完调用writer.Write把结果写入collector对应的channel中
}()
}
}
```
reducer单goroutine对数mapper写入collector的数据进行处理如果reducer中没有手动调用writer.Write则最终会执行finish方法对output进行close避免死锁
```
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
}()
```
在该工具包中还提供了许多针对不同业务场景的方法实现原理与MapReduce大同小异感兴趣的同学可以查看源码学习
* MapReduceVoid 功能和MapReduce类似但没有结果返回只返回error
* Finish 处理固定数量的依赖返回error有一个error立即返回
* FinishVoid 和Finish方法功能类似没有返回值
* Map 只做generate和mapper处理返回channel
* MapVoid 和Map功能类似无返回
本文主要介绍了go-zero框架中的MapReduce工具在实际的项目中非常实用。用好工具对于提升服务性能和开发效率都有很大的帮助希望本篇文章能给大家带来一些收获。
* mapper和reducer方法里可以调用cancel调用了cancel之后返回值会是`nil, false`
* mapper里面如果有item不写入writer那么这个item就不会被reduce收集
* mapper里面如果有处理item时panic那么这个item也不会被reduce收集
* reduce是单线程所有mapper出来的结果在这里串行处理
* reduce里面不写writer或者panic会导致返回`nil, false`

176
doc/mr.md
View File

@ -1,176 +0,0 @@
# go-zero如何通过MapReduce降低服务的响应时间
go-zero微服务框架中提供了许多开箱即用的工具好的工具不仅能提升服务的性能而且还能提升代码的鲁棒性避免出错实现代码风格的统一方便他人阅读等等本系列文章将分别介绍go-zero框架中工具的使用及其实现原理
## 并发处理工具[MapReduce](https://github.com/tal-tech/go-zero/tree/master/core/mr)
[MapReduce](https://zh.wikipedia.org/wiki/MapReduce)是Google提出的一个软件架构用于大规模数据集的并行运算go-zero中的MapReduce工具正是借鉴了这种架构思想
go-zero框架中的MapReduce工具主要用来对批量数据进行并发的处理以此来提升服务的性能
![mapreduce原理图](./images/mr.png)
我们通过几个示例来演示MapReduce的用法
MapReduce主要有三个参数第一个参数为generate用以生产数据第二个参数为mapper用以对数据进行处理第三个参数为reducer用以对mapper后的数据做聚合返回还可以通过opts选项设置并发处理的线程数量
场景一: 某些功能的结果往往需要依赖多个服务比如商品详情的结果往往会依赖用户服务、库存服务、订单服务等等一般被依赖的服务都是以rpc的形式对外提供为了降低依赖的耗时我们往往需要对依赖做并行处理
```
func productDetail(uid, pid int64) (*ProductDetail, error) {
var pd ProductDetail
err := mr.Finish(func() (err error) {
pd.User, err = userRpc.User(uid)
return
}, func() (err error) {
pd.Store, err = storeRpc.Store(pid)
return
}, func() (err error) {
pd.Order, err = orderRpc.Order(pid)
return
})
if err != nil {
log.Printf("product detail error: %v", err)
return nil, err
}
return &pd, nil
}
```
该示例中返回商品详情依赖了多个服务获取数据,因此做并发的依赖处理,对接口的性能有很大的提升
场景二: 很多时候我们需要对一批数据进行处理比如对一批用户id效验每个用户的合法性并且效验过程中有一个出错就认为效验失败返回的结果为效验合法的用户id
```
func checkLegal(uids []int64) ([]int64, error) {
r, err := mr.MapReduce(func(source chan<- interface{}) {
for _, uid := range uids {
source <- uid
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
uid := item.(int64)
ok, err := check(uid)
if err != nil {
cancel(err)
}
if ok {
writer.Write(uid)
}
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
var uids []int64
for p := range pipe {
uids = append(uids, p.(int64))
}
writer.Write(uids)
})
if err != nil {
log.Printf("check error: %v", err)
return nil, err
}
return r.([]int64), nil
}
func check(uid int64) (bool, error) {
// do something check user legal
return true, nil
}
```
该示例中如果check过程出现错误则通过cancel方法结束效验过程并返回error整个效验过程结束如果某个uid效验结果为false则最终结果不返回该uid
**MapReduce使用注意事项**
* mapper和reducer中都可以调用cancel参数为error调用后立即返回返回结果为nil, error
* mapper中如果不调用writer.Write则item最终不会被reducer聚合
* reducer中如果不调用writer.Wirte则返回结果为nil, ErrReduceNoOutput
* reducer为单线程所有mapper出来的结果在这里串行聚合
***实现原理分析:***
MapReduce中首先通过buildSource方法通过执行generate(参数为无缓冲channel)产生数据并返回无缓冲的channelmapper会从该channel中读取数据
```
func buildSource(generate GenerateFunc) chan interface{} {
source := make(chan interface{})
go func() {
defer close(source)
generate(source)
}()
return source
}
```
在MapReduceWithSource方法中定义了cancel方法mapper和reducer中都可以调用该方法调用后主线程收到close信号会立马返回
```
cancel := once(func(err error) {
if err != nil {
retErr.Set(err)
} else {
// 默认的error
retErr.Set(ErrCancelWithNil)
}
drain(source)
// 调用close(ouput)主线程收到Done信号立马返回
finish()
})
```
在mapperDispatcher方法中调用了executeMappersexecuteMappers消费buildSource产生的数据每一个item都会起一个goroutine单独处理默认最大并发数为16可以通过WithWorkers进行设置
```
var wg sync.WaitGroup
defer func() {
wg.Wait() // 保证所有的item都处理完成
close(collector)
}()
pool := make(chan lang.PlaceholderType, workers)
writer := newGuardedWriter(collector, done) // 将mapper处理完的数据写入collector
for {
select {
case <-done: // 当调用了cancel会触发立即返回
return
case pool <- lang.Placeholder: // 控制最大并发数
item, ok := <-input
if !ok {
<-pool
return
}
wg.Add(1)
go func() {
defer func() {
wg.Done()
<-pool
}()
mapper(item, writer) // 对item进行处理处理完调用writer.Write把结果写入collector对应的channel中
}()
}
}
```
reducer单goroutine对数mapper写入collector的数据进行处理如果reducer中没有手动调用writer.Write则最终会执行finish方法对output进行close避免死锁
```
go func() {
defer func() {
if r := recover(); r != nil {
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reducer(collector, writer, cancel)
}()
```
在该工具包中还提供了许多针对不同业务场景的方法实现原理与MapReduce大同小异感兴趣的同学可以查看源码学习
* MapReduceVoid 功能和MapReduce类似但没有结果返回只返回error
* Finish 处理固定数量的依赖返回error有一个error立即返回
* FinishVoid 和Finish方法功能类似没有返回值
* Map 只做generate和mapper处理返回channel
* MapVoid 和Map功能类似无返回
本文主要介绍了go-zero框架中的MapReduce工具在实际的项目中非常实用。用好工具对于提升服务性能和开发效率都有很大的帮助go-zero框架中还提供了许多其他的实用工具由于篇幅有限后续文章再做介绍希望本篇文章能给大家带来帮助

View File

@ -204,6 +204,7 @@ Content-Length: 0
* [快速构建高并发微服务](doc/shorturl.md)
* [快速构建高并发微服务-多RPC版](doc/bookstore.md)
* [goctl使用帮助](doc/goctl.md)
* [通过MapReduce降低服务响应时间](doc/mapreduce.md)
* [关键字替换和敏感词过滤工具](doc/keywords.md)
## 9. 微信交流群