package breaker import ( "time" "github.com/zeromicro/go-zero/core/collection" "github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/timex" ) const ( // 250ms for bucket duration window = time.Second * 10 buckets = 40 forcePassDuration = time.Second k = 1.5 minK = 1.1 protection = 5 ) // googleBreaker is a netflixBreaker pattern from google. // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/ type ( googleBreaker struct { k float64 stat *collection.RollingWindow[int64, *bucket] proba *mathx.Proba lastPass *syncx.AtomicDuration } windowResult struct { accepts int64 total int64 failingBuckets int64 workingBuckets int64 } ) func newGoogleBreaker() *googleBreaker { bucketDuration := time.Duration(int64(window) / int64(buckets)) st := collection.NewRollingWindow[int64, *bucket](func() *bucket { return new(bucket) }, buckets, bucketDuration) return &googleBreaker{ stat: st, k: k, proba: mathx.NewProba(), lastPass: syncx.NewAtomicDuration(), } } func (b *googleBreaker) accept() error { var w float64 history := b.history() w = b.k - (b.k-minK)*float64(history.failingBuckets)/buckets weightedAccepts := mathx.AtLeast(w, minK) * float64(history.accepts) // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101 // for better performance, no need to care about the negative ratio dropRatio := (float64(history.total-protection) - weightedAccepts) / float64(history.total+1) if dropRatio <= 0 { return nil } lastPass := b.lastPass.Load() if lastPass > 0 && timex.Since(lastPass) > forcePassDuration { b.lastPass.Set(timex.Now()) return nil } dropRatio *= float64(buckets-history.workingBuckets) / buckets if b.proba.TrueOnProba(dropRatio) { return ErrServiceUnavailable } b.lastPass.Set(timex.Now()) return nil } func (b *googleBreaker) allow() (internalPromise, error) { if err := b.accept(); err != nil { b.markDrop() return nil, err } return googlePromise{ b: b, }, nil } func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Acceptable) error { if err := b.accept(); err != nil { b.markDrop() if fallback != nil { return fallback(err) } return err } var succ bool defer func() { // if req() panic, success is false, mark as failure if succ { b.markSuccess() } else { b.markFailure() } }() err := req() if acceptable(err) { succ = true } return err } func (b *googleBreaker) markDrop() { b.stat.Add(drop) } func (b *googleBreaker) markFailure() { b.stat.Add(fail) } func (b *googleBreaker) markSuccess() { b.stat.Add(success) } func (b *googleBreaker) history() windowResult { var result windowResult b.stat.Reduce(func(b *bucket) { result.accepts += b.Success result.total += b.Sum if b.Failure > 0 { result.workingBuckets = 0 } else if b.Success > 0 { result.workingBuckets++ } if b.Success > 0 { result.failingBuckets = 0 } else if b.Failure > 0 { result.failingBuckets++ } }) return result } type googlePromise struct { b *googleBreaker } func (p googlePromise) Accept() { p.b.markSuccess() } func (p googlePromise) Reject() { p.b.markFailure() }