package breaker

import (
	"time"

	"github.com/zeromicro/go-zero/core/collection"
	"github.com/zeromicro/go-zero/core/mathx"
)

const (
	// window is the total time span covered by the rolling statistics.
	window = time.Second * 10

	// buckets is the number of slots the window is divided into;
	// window / buckets gives 250ms per bucket.
	buckets = 40

	// maxFailBucketsToDecreaseK is the divisor used when shrinking the accept
	// multiplier from k towards minK as failing buckets accumulate.
	maxFailBucketsToDecreaseK = 30

	// minBucketsToSpeedUp is the number of failing (or working) buckets that
	// must be reached before the faster trip (or recovery) adjustment applies.
	minBucketsToSpeedUp = 3

	// k is the default multiplier applied to the accepted-request count.
	k = 1.5

	// minK is the lower bound for the multiplier once it starts shrinking.
	minK = 1.1

	// recoveryK is the divisor applied to the drop ratio in recovery mode.
	recoveryK = 3 - k

	// protection is subtracted from the request total before the drop ratio
	// is computed.
	protection = 5
)

// googleBreaker is a circuit breaker that follows the client-side throttling
// approach described by Google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
	k     float64
	stat  *collection.RollingWindow[int64, *bucket]
	proba *mathx.Proba
}

// windowResult is the summary of the rolling window produced by history.
type windowResult struct {
	accepts        int64
	total          int64
	failingBuckets int64
	workingBuckets int64
}

// newGoogleBreaker returns a googleBreaker with the default multiplier k and
// a rolling window of `buckets` slots spanning `window`.
func newGoogleBreaker() *googleBreaker {
	// Constant division: 10s / 40 = 250ms per bucket.
	const interval = window / buckets

	makeBucket := func() *bucket {
		return &bucket{}
	}

	return &googleBreaker{
		stat:  collection.NewRollingWindow[int64, *bucket](makeBucket, buckets, interval),
		k:     k,
		proba: mathx.NewProba(),
	}
}

// accept decides whether the next request may pass. It returns nil when the
// request is let through and ErrServiceUnavailable when it is dropped.
func (b *googleBreaker) accept() error {
	stats := b.history()

	// Start from the configured multiplier; once enough failing buckets have
	// piled up, shrink it (never below minK) so the breaker trips sooner.
	weight := b.k
	if stats.failingBuckets >= minBucketsToSpeedUp {
		shrunk := b.k - float64(stats.failingBuckets-1)*(b.k-minK)/maxFailBucketsToDecreaseK
		weight = mathx.AtLeast(shrunk, minK)
	}
	weightedAccepts := weight * float64(stats.accepts)

	// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
	// A non-positive ratio means "never drop", so it is not clamped to zero.
	dropRatio := (float64(stats.total-protection) - weightedAccepts) / float64(stats.total+1)
	if dropRatio <= 0 {
		return nil
	}

	// With more than 2 working buckets we are in recovery mode, so the drop
	// ratio is reduced.
	// NOTE(review): the previous comment also said the latest (current)
	// bucket is ignored; nothing in this file shows that - confirm against
	// the RollingWindow configuration.
	if stats.workingBuckets >= minBucketsToSpeedUp {
		dropRatio = dropRatio / recoveryK
	}

	// TrueOnProba makes the final call for the given ratio (presumably a
	// random draw with that probability - verify in mathx).
	if !b.proba.TrueOnProba(dropRatio) {
		return nil
	}

	return ErrServiceUnavailable
}

// allow checks the breaker and, when the request is accepted, hands back a
// promise the caller uses to report the outcome. A rejected request is
// recorded as a drop and the rejection error is returned.
func (b *googleBreaker) allow() (internalPromise, error) {
	err := b.accept()
	if err == nil {
		return googlePromise{b: b}, nil
	}

	b.markDrop()
	return nil, err
}

// doReq runs req under the breaker's control.
//
// When the breaker rejects the call, a drop is recorded and the result of
// fallback (if non-nil) or the rejection error itself is returned. Otherwise
// req is executed and its error is returned; the call is recorded as a
// success only if acceptable reports true for that error.
func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Acceptable) error {
	if rejection := b.accept(); rejection != nil {
		b.markDrop()
		if fallback == nil {
			return rejection
		}
		return fallback(rejection)
	}

	// ok stays false if req panics, so the deferred func records a failure.
	ok := false
	defer func() {
		if !ok {
			b.markFailure()
			return
		}
		b.markSuccess()
	}()

	err := req()
	if acceptable(err) {
		ok = true
	}

	return err
}

// markDrop adds a drop sample to the rolling window.
func (b *googleBreaker) markDrop() {
	b.stat.Add(drop)
}

// markFailure adds a fail sample to the rolling window.
func (b *googleBreaker) markFailure() {
	b.stat.Add(fail)
}

// markSuccess adds a success sample to the rolling window.
func (b *googleBreaker) markSuccess() {
	b.stat.Add(success)
}

// history folds the buckets of the rolling window into a windowResult.
//
// accepts and total are sums over every visited bucket. workingBuckets is
// reset by any bucket containing a failure and incremented by a bucket that
// has successes and no failures; buckets with neither leave it untouched.
// failingBuckets is incremented by a bucket that has both drops and failures
// and reset to zero by every other bucket.
// NOTE(review): a bucket holding only drops therefore resets failingBuckets -
// confirm that this is intended.
func (b *googleBreaker) history() windowResult {
	var summary windowResult

	b.stat.Reduce(func(cur *bucket) {
		summary.accepts += cur.Success
		summary.total += cur.Sum

		switch {
		case cur.Failure > 0:
			summary.workingBuckets = 0
		case cur.Success > 0:
			summary.workingBuckets++
		}

		if cur.Drop > 0 && cur.Failure > 0 {
			summary.failingBuckets++
			return
		}
		summary.failingBuckets = 0
	})

	return summary
}

// googlePromise reports the outcome of a request admitted through allow back
// to the breaker that admitted it.
type googlePromise struct {
	b *googleBreaker
}

// Accept records the admitted request as a success.
func (p googlePromise) Accept() {
	p.b.markSuccess()
}

// Reject records the admitted request as a failure.
func (p googlePromise) Reject() {
	p.b.markFailure()
}