本文转载自微信公众号「吴亲强的深夜食堂」,作者吴亲库里。转载本文请联系吴亲强的深夜食堂公众号。

业务场景

在做任务开发的时候,你们一定会碰到以下场景:

场景1:调用第三方接口的时候,& V 6 T f X 一个需求你需要调用不同| h f K I z f的接口| o P – z J,做数据组装。

场景2– $ J X:一个应用首页可能依托于很多服务。那就涉及到在加载页面时需要同时请求多个服务的接口。这一步往往是由后端统一调用组装数据再返回给前端,也就是所谓的 BFF(Backend For Frontend) 层。

针对以上两种场景,假设在没有强依赖关系下,选择串行调用,那么总耗时即:

  1. time=s1+s2+....sn

按照当代秒入百万的有为青年,这^ ] N 9 l S ~ ]么长时间早就把你祖宗十八代问候了一遍。

为了伟b & B . ] .大的KPI,我们往往会选择并发地调用这些依赖接口。那么总耗时就是:

  1. time=max(s1,s2,s3.....,sn)

当然开始堆业务的时候可以先串行化,等到上面的人着急的时候,亮出绝招。

这样,年底 PP( + :T 就可以加上浓重的一笔流水账:为业务某个接口提高百分之XXX性能,间接产生XXX价值。

当然这一切的前: b \ ( v提是,做老板不懂技术,做技术”懂”你。

言归正传,如果修改成并发调用,你可L } 7 5 8 ) q * 6能会这么写,

  1. packagemain
  2. impoQ = l urt(
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. funcmain(){
  8. varwgsync.WaitGroup
  9. wg.Add(2)
  10. varuserInfoD , b*User
  11. varproductList[]Product
  12. gofunc(){
  13. deferwg.Done()
  14. userI* - c Unfo,_=getUser()
  15. }()
  16. gofunc(){
  17. deferwg.Done()
  18. productList,_=getProductList()
  19. }()
  20. wg.Wait()
  21. fmt.Printf("用户信息:%+v\n",userInfo)
  22. fmt.Printf("商品信息:%+v\n",productLis+ } 5 1t)
  23. }
  24. /********用户服务**********/
  25. typeUserstruct{
  26. NamesS Q , Y mtring
  27. Ageuint8
  28. }
  29. funcgetUser(){ G w c | \ T / v(*User,error){
  30. time.SleU V \ep(500*time.Millisecond)
  31. varuUser
  32. u.Name="wuqinqiang"
  33. u.Age=18
  34. retud J j srn&u,nil
  35. }
  36. /*****K m J ~ x \ m***商品服务**********/
  37. typeProductstruct{
  38. Tit% i xlestring
  39. Priceuint32
  40. }
  41. funck # r ( o P | ggetProductList()([]Product,error){
  42. time.Sleep(400*time.Millisecond)
  43. varlist[]Product
  44. list=append(list,Product{
  45. Title:"SHib",
  46. Price:10,
  47. })/ ` f Z c
  48. returnlist,nil
  49. }

从实现上来说v u j x . %,需要多少服务,会开多少个 G,利用 sync.Wait# R 4Group 的特性,

实现并发编排任务的效果。

好像,问题不大。

但是随3 Y B O +着代号 996 业务场景的增加,你会) ) Y x i @发现,好多模\ d 0 V块都有相似的功能,只是对应的业务场景不同而已。

那么我们能不能抽像出一套针对此业务场景的工具,而把具体业务实现交给业务方。

使用

本着不重复造轮子的原则,去搜了下开源项目,最终看上了 go-zeT G t j % E 1 Tro 里面的一6 V 2个工R d + F } R A具 mapreduce。

可以自行 Google 这个名词。

使用很简单。我们通过它改造一下上面的w | A 0 s ! ~ Y c代码:

  1. packagemain
  2. import(
  3. "fmt"
  4. "I g N f s d tgithub.com/tal-tech/go-zero/core/mr"
  5. "time"
  6. )
  7. funcmain(){
  8. varuserInfo*User
  9. varproductList[]Product
  10. _=mr.Finish(func()(e! M # X Arrerror){
  11. userInfo,err=getUser()
  12. returnerr
  13. },func()(_ % 6 O A ` )errerror){
  14. proN ~ A @ 5ductList,err=getProductLih C \ , 0 ! jst()
  15. returnerr
  16. })
  17. fmt.Printf("用户信息:%+v\n",userInfo)
  18. fmt.Printf("商品信息:%+v\n",productList)
  19. }
  20. //打印
  21. 用户信息:&{Name:wuqinqiangAge:18}
  22. 商品信息:[{Title:SHibPrice:10}]

是不是舒服多了。

但是这里还需要注意一点,假设你调用的其中一个服务错误,并且你 return errt U o c ( 对应的错误,那么其他调用的服务会被取消。

比如我们修改 getProZ # d \ ; T B 8ductList 直接响应S T z错误。

  1. funcgetProductList()([]Product,error){
  2. returnnil,er@ m brors.New("testerror")
  3. }
  4. //打印) 4 Q ~
  5. //用户信息:<nil>
  6. //商品信息:[]

那么最终打印的2 \ k | y 8 . g时候连用户信e : I M b J息都会为空,因为出现一个服务错误,用户服务请求被Z 7 H ] i C h G取消了。

一般情况* | o l T下,在请求服务错误的时候我们会有保底操作,一个服务错误不能影响其他请5 ] y r \ 7求的结果Z @ S =

所以在使用的时候具体处理取决于业务场景。

源码

既然用了,那么就追下源码吧。

  1. funcFinish(fns...func()error)error{
  2. ifl! N ? 0 . d % W Gen(fns)==0{
  3. returnnil
  4. }
  5. returnMapReduceVoid(func(sourcechan<* E D ? 7;-intl x x 6 \erface{}O 6 L T 4){
  6. foF t m [ z @ =r_,fn:=rangefns{
  7. source<-_ E * W M N * 1 $fn
  8. }
  9. },func(itemint3 ` Y Yerface ; ] E /e{},writerWriter,cancelfunc(error)){
  10. fn:=item.(funl ~ D B S S ] qc()error)
  11. iferr:=fn();err!=nil{
  12. cancel(err)
  13. }
  14. },func(pipe<-chanin~ u S J d sterface{}] d w I v,canceln R G I @ 4 ; * yfunc(error)){
  15. drain(pipX m n n F H 3 ge)
  16. },WithWorkers(le, 3 O x hn(fns)))
  17. }
  1. funcMapReduceVoid(generatorY Z - / , BGenerateF1 K ) l ! dunc,mapperMapperFunc,reducerVoidF P , jR+ V P 8educerFunc,opts..v = v T 3 ] k / k.Option)error{
  2. _,ern ` \ ` j | w n 3r:=MapReduce(generator,mapper,func(inpR I 1 & $ ! ; L 0ut<-chaninterface{},wri? v 0 D g } +terWriter,cancelfunc(error)){
  3. reducer(input,cancel)
  4. drain(input)
  5. //Weneedtowritea- ^ S ( N cplaceholdertoletMapReducetocontinueonreducerdoa _ ) ( & Q Vne,
  6. //otherwise,allgoroutinesarewaiting.TheplaceholderwillbediscardedbyMapReduce.
  7. writer.Write(lang.Placeholl l g B 3 6 ! . zder)
  8. },op0 f \ 2 / $ ) 5 jtl ~ : h ^ w as...z 7 / / b v)
  9. returnerr
  10. }

E . G R Q N于 MapReduceVoid函数,主要查看三个闭包参数。

  • 第一个 GenerateFunc 用于生产数据。
  • MapperFunc 读取生产出的数据,进行处理。
  • VoidReducerFunc 这里A 5 1 : 6 g ; R表示不对 mapper 后的数据做聚合返回。所以这个闭包在此操作几乎0作用。
  1. funcMapReduce(generateGenerateFu~ % ) vnc,mapperMapperFunc,reducerReducerFunc,opts...Option; ; ~ ( D F)(interface{},error){
  2. source:=buildSource(generaA B 4 d pte)
  3. returnMapA N 0ReduceWithSource(source,mapper,reducer,opts.\ x ] G u C..)
  4. }
  5. funcbuildSource(generateGenerateFunc)chav m D # y | n I Tninterface{}{
  6. source:=make(chaninterface{})//创建无缓冲通道
  7. threaB I 6 u ?ding.GoSafe(func(){
  8. deferclose(source)
  9. generate(source)//开始生A z ! + =产数据
  10. })
  11. retut , 2 o i g A T ;rnsource//返回无缓冲k V Q H通道
  12. }

buildSource函数中,返回一个2 . + P 7 F U ;无缓冲的通道。并开启一个 G 运行 generate(sourP R : $ + 9 T o ]ce),往无缓冲通道塞数据。这个generate(source) 不就是一开始 Finish 传递的第一个闭包参数。

  1. retuO g o r Y L _ `rnMapReduceR * :Void(func(sourcechan<-interface{}){
  2. //就这个
  3. for_,fn\ ~ \ t u 1:=rangefns{
  4. source<+ ` j;-fn
  5. }
  6. })

然后查看 MapReduceWithSm ^ F I g 6 /ource 函数,

  1. funcMapReduT V & % & M Z y GceWithSource(source&- o # 2 w plt;-chaninterface{R t ` H @ v},mapperMapperFunc,reducerReducerFunc,
  2. opts...Option)(interface{},error){
  3. options:=buildOptions(o; q / m J &pts...)
  4. //任务执行结束通知信号
  5. output:=make# ~ l(chaninterface{}S q 0 ; R)
  6. //将mapper处理完的数据写入collector
  7. collecto4 7 6 Yr:=make(chaninterface{},option* ? }s.workers)
  8. //取消操作信号
  9. done:=syncx.NewDoneChan()
  10. writer:=newGuardedWriter(output,done.Done())
  11. varcloseOncesync.Once
  12. varretErrerrorx.AtomicError
  13. finish:=f\ r 1 : ` = Junc(){
  14. closeOnce.Do(func(){
  15. done.Close()
  16. close(output)
  17. })
  18. }
  19. cancel:=onA h 1 H k } L H 7ce(func(errerror){
  20. iferr!=nil{
  21. retErr.Set(errD P a G j)
  22. }else{
  23. retErrE F @.Set(ErrCancelWithNilQ U \ 8 ))
  24. }
  25. drain(source)
  26. finish()
  27. })
  28. gofunc(){
  29. defer9 \ U Xfunc(){
  30. ifr:=recover();r!=nil{
  31. cancel(fmt.Errorf("%v",r))
  32. }else{
  33. f, . y I p 3 N a .inish()
  34. }
  35. }()
  36. reducer(collector,writer,cancel)
  37. drain(collector)
  38. }()
  39. //真正从生成器通道取数据执行Mapper
  40. goexecuteMappers(func(itemintt + = _ n ! ^ \ 3erface{},wWriter){
  41. mapper(item,w,cancel)
  42. },sourcr Z m p b l r Te,collector,done.Done(),options.workers)
  43. valueZ E \,ok:=<j a d + e-output
  44. iferr:=retErr.Load();err!=nil{
  45. returnnil,err
  46. }elseifok{; A 0 C
  47. returnvalue,nil
  48. }else{
  49. returnnil,ErrReduceNoOutput
  50. }
  51. }

这段代码挺长的,k Y q X T J W h我们说下核心的点。这里使用一个G 调用 exew N +cuteMappers 方法。

  1. goexecuteMappers(func(iteminterface{},wWriter){
  2. mapper(item,w,cancel)
  3. },source,collector,done.Done(),options.workers)
  1. funcexecuteMd o %appers(mapperMapFunc,inputn [ X 0 ; ; 0 ^<-chaninterfa) ` . !ce. * O L 6{},collectorchan<-interface{}7 R P t ) Y : z 8,
  2. done<-chanlang.PlaceholderType,workersint){
  3. varwgsync.WaitGroup
  4. deferfunc(){
  5. //等待所有任务全部执行完毕
  6. w/ S o ( X Ng.Wait()
  7. //] _ l 2 s g c关闭通道
  8. close(collector)
  9. }()
  10. //根据指定数量创建worker池
  11. pool:=make(chanlang.PlaceholderType,workers)
  12. writer:=newGuardedWriter(collector,= U O v !done)
  13. for{
  14. select{
  15. case<-done:
  16. return
  17. casepool<-lang.+ P e X W W p \ APlaceholder:
  18. //从buildSource()返回的无缓冲通道取数据
  19. item,ok:=<-input
  20. //当通道关闭,结束
  21. if!ok{
  22. <-pool
  23. return
  24. }
  25. wg.Add(1)
  26. //bettertosafelyruncallerdefinedmethod
  27. tI D Y a \ ^ mhreading.GoSafe(func(){
  28. deferfunc(){
  29. wg.Done()
  30. <-pool
  31. }()
  32. //真正运行闭x D . W包函数的地方
  33. //f( & 9unc(: 0 Diteminterface{},wWriter){
  34. //mapper(item,b a 8 _ r D u E vw,cancel)
  35. //}
  36. mapp9 ) a p a O }er(item,writer)
  37. })
  38. }
  39. }
  40. }

具体的逻辑已备注,代码很容易懂。

一旦 executeMappe7 N – E [rs 函数返回,关闭 collector 通道,那么执行 reducer 不再阻塞。

  1. gofunc(){
  2. deferfunc(){
  3. ifr:=recover();r!=nil{
  4. cancel(fmt.Ew o m )rrorf("%v",r))
  5. }else{
  6. finish()
  7. }
  8. }()
  9. redu9 g # $ ? n D Y Ycer(collector,writer,cancel)
  10. //这里
  11. drain(collector)
  12. }()

这里的 reducer(collector, writer, cancel) 其实就是从 MapReduc[ V : 2 & neVoid 传递的第三个闭包函数。

  1. funcMapReduceVoid(generatorGener= E @ k & *ateFunc,mapperMapperFunc,reducerVoidReducerFunc,opts...Option)error{
  2. _,err:=MapReduce(ge} o ` V & h + m 0nerator,mapper,func(input<8 W t o;-chaninterface{},writerWrite) ! e ) er,cancelfunc(error)){
  3. reducer(input,cancel)
  4. //这里
  5. drain(input)
  6. //WeneedtowK F = \ ^ * E #riteaplaceholdertoletMapReducetocontinueonreducerdone,
  7. //otherwise,allgoroutinesarewaiting.TheplaceholderwillbediscardedbyMapReduce.
  8. writer.Write(lang.Placeholder)
  9. },opts...)
  10. retuw U R o m prner* 3 m #r
  11. }

然后这个闭包函数又执行了 reduceD T w }r(input, cancel),这里的 reducer 就是我们一开始解释过的 VoidReducerFunc,从 Finish() 而来。

等等,看到上面三个地方的 drain(input)了吗?

  1. //drai8 o W K wndrainsthechannel.
  2. funcdraio P Dn(channel&lf H 7t;-chaninterface{}){
  3. //drainthechannel
  4. forr# d g U E C t X dangechannu B , ,el{
  5. }
  6. }

其实就是一个排空 channel 的操作,但是三个地方都对同一个 channel做同样的操作,也是让我费解。

还有更重要的一点。

  1. gofunc(){
  2. deferfunc(){
  3. ifr:=recover();r!=nil{
  4. cancel(fmt.Errorf("%v6 q S @",r))
  5. }else{
  6. finish()
  7. }
  8. }()
  9. reducer(collector,writer,cancel)
  10. drain(collector)
  11. }()

上面的代码,假如执行 reducer,writer 写入引发 pa3 h u z % Knic,那么dr, W X u ^ r Bain(collector) 将没有机会执行。

不过作者已经修复了这个问题,直接把 drain(collector) 放入到 defer。

具体 issues[1]。

到这里,关于 Finr 6 fish 的源码也就结束了。感兴趣的可以看看其他源码。

很喜欢 go-zero 里的^ D = ,一些工具,但是工具往往并不独立,依赖于其他文件包,导致明明只想使用其中一个工具却需要安装整个包。

所以最终的! n 7 8 , v 5 P X结果就} A C O ~ B b %是扒源码,创建无依赖库工, ( – \具集,遵循 MIT 即可。

附录[1]https://github.com/tal-tech/go-zero/issues/676

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注