diff --git a/core/executors/periodicalexecutor.go b/core/executors/periodicalexecutor.go index d5372bf6..c5c17b40 100644 --- a/core/executors/periodicalexecutor.go +++ b/core/executors/periodicalexecutor.go @@ -6,6 +6,7 @@ import ( "time" "zero/core/proc" + "zero/core/syncx" "zero/core/threading" "zero/core/timex" ) @@ -30,6 +31,8 @@ type ( interval time.Duration container TaskContainer waitGroup sync.WaitGroup + // avoid race condition on waitGroup when calling wg.Add/Done/Wait(...) + wgBarrier syncx.Barrier guarded bool newTicker func(duration time.Duration) timex.Ticker lock sync.Mutex @@ -74,7 +77,9 @@ func (pe *PeriodicalExecutor) Sync(fn func()) { } func (pe *PeriodicalExecutor) Wait() { - pe.waitGroup.Wait() + pe.wgBarrier.Guard(func() { + pe.waitGroup.Wait() + }) } func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) { @@ -131,8 +136,12 @@ func (pe *PeriodicalExecutor) backgroundFlush() { } func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) bool { - pe.waitGroup.Add(1) - defer pe.waitGroup.Done() + pe.wgBarrier.Guard(func() { + pe.waitGroup.Add(1) + }) + defer pe.wgBarrier.Guard(func() { + pe.waitGroup.Done() + }) ok := pe.hasTasks(tasks) if ok {