diff --git a/core/breaker/bucket.go b/core/breaker/bucket.go new file mode 100644 index 00000000..2dec1ab9 --- /dev/null +++ b/core/breaker/bucket.go @@ -0,0 +1,48 @@ +package breaker + +const ( + success = iota + fail + drop +) + +// bucket defines the bucket that holds sum and num of additions. +type bucket struct { + Sum int64 + Success int64 + Failure int64 + Drop int64 +} + +func (b *bucket) Add(v int64) { + switch v { + case fail: + b.fail() + case drop: + b.drop() + default: + b.succeed() + } +} + +func (b *bucket) Reset() { + b.Sum = 0 + b.Success = 0 + b.Failure = 0 + b.Drop = 0 +} + +func (b *bucket) drop() { + b.Sum++ + b.Drop++ +} + +func (b *bucket) fail() { + b.Sum++ + b.Failure++ +} + +func (b *bucket) succeed() { + b.Sum++ + b.Success++ +} diff --git a/core/breaker/bucket_test.go b/core/breaker/bucket_test.go new file mode 100644 index 00000000..ec43331b --- /dev/null +++ b/core/breaker/bucket_test.go @@ -0,0 +1,43 @@ +package breaker + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBucketAdd(t *testing.T) { + b := &bucket{} + + // Test succeed + b.Add(0) // Using 0 for success + assert.Equal(t, int64(1), b.Sum, "Sum should be incremented") + assert.Equal(t, int64(1), b.Success, "Success should be incremented") + assert.Equal(t, int64(0), b.Failure, "Failure should not be incremented") + assert.Equal(t, int64(0), b.Drop, "Drop should not be incremented") + + // Test failure + b.Add(fail) + assert.Equal(t, int64(2), b.Sum, "Sum should be incremented") + assert.Equal(t, int64(1), b.Failure, "Failure should be incremented") + assert.Equal(t, int64(0), b.Drop, "Drop should not be incremented") + + // Test drop + b.Add(drop) + assert.Equal(t, int64(3), b.Sum, "Sum should be incremented") + assert.Equal(t, int64(1), b.Drop, "Drop should be incremented") +} + +func TestBucketReset(t *testing.T) { + b := &bucket{ + Sum: 3, + Success: 1, + Failure: 1, + Drop: 1, + } + b.Reset() + assert.Equal(t, int64(0), b.Sum, "Sum should be reset to 0") + assert.Equal(t, int64(0), b.Success, "Success should be reset to 0") + assert.Equal(t, int64(0), b.Failure, "Failure should be reset to 0") + assert.Equal(t, int64(0), b.Drop, "Drop should be reset to 0") +} diff --git a/core/breaker/googlebreaker.go b/core/breaker/googlebreaker.go index 46272956..f4846426 100644 --- a/core/breaker/googlebreaker.go +++ b/core/breaker/googlebreaker.go @@ -9,23 +9,38 @@ import ( const ( // 250ms for bucket duration - window = time.Second * 10 - buckets = 40 - k = 1.5 - protection = 5 + window = time.Second * 10 + buckets = 40 + maxFailBucketsToDecreaseK = 30 + minBucketsToSpeedUp = 3 + k = 1.5 + minK = 1.1 + recoveryK = 3 - k + protection = 5 ) // googleBreaker is a netflixBreaker pattern from google. // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/ -type googleBreaker struct { - k float64 - stat *collection.RollingWindow - proba *mathx.Proba -} +type ( + googleBreaker struct { + k float64 + stat *collection.RollingWindow[int64, *bucket] + proba *mathx.Proba + } + + windowResult struct { + accepts int64 + total int64 + failingBuckets int64 + workingBuckets int64 + } +) func newGoogleBreaker() *googleBreaker { bucketDuration := time.Duration(int64(window) / int64(buckets)) - st := collection.NewRollingWindow(buckets, bucketDuration) + st := collection.NewRollingWindow[int64, *bucket](func() *bucket { + return new(bucket) + }, buckets, bucketDuration) return &googleBreaker{ stat: st, k: k, @@ -34,15 +49,28 @@ func newGoogleBreaker() *googleBreaker { } func (b *googleBreaker) accept() error { - accepts, total := b.history() - weightedAccepts := b.k * float64(accepts) + var w float64 + history := b.history() + if history.failingBuckets >= minBucketsToSpeedUp { + w = b.k - float64(history.failingBuckets-1)*(b.k-minK)/maxFailBucketsToDecreaseK + w = mathx.AtLeast(w, minK) + } else { + w = b.k + } + weightedAccepts := w * float64(history.accepts) // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101 - // for better performance, no need to care about negative ratio - dropRatio := (float64(total-protection) - weightedAccepts) / float64(total+1) + // for better performance, no need to care about the negative ratio + dropRatio := (float64(history.total-protection) - weightedAccepts) / float64(history.total+1) if dropRatio <= 0 { return nil } + // If we have more than 2 working buckets, we are in recovery mode, + // the latest bucket is the current one, so we ignore it. + if history.workingBuckets >= minBucketsToSpeedUp { + dropRatio /= recoveryK + } + if b.proba.TrueOnProba(dropRatio) { return ErrServiceUnavailable } @@ -52,7 +80,7 @@ func (b *googleBreaker) accept() error { func (b *googleBreaker) allow() (internalPromise, error) { if err := b.accept(); err != nil { - b.markFailure() + b.markDrop() return nil, err } @@ -63,7 +91,7 @@ func (b *googleBreaker) allow() (internalPromise, error) { func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Acceptable) error { if err := b.accept(); err != nil { - b.markFailure() + b.markDrop() if fallback != nil { return fallback(err) } @@ -71,10 +99,10 @@ func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Ac return err } - var success bool + var succ bool defer func() { // if req() panic, success is false, mark as failure - if success { + if succ { b.markSuccess() } else { b.markFailure() @@ -83,27 +111,43 @@ func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Ac err := req() if acceptable(err) { - success = true + succ = true } return err } -func (b *googleBreaker) markSuccess() { - b.stat.Add(1) +func (b *googleBreaker) markDrop() { + b.stat.Add(drop) } func (b *googleBreaker) markFailure() { - b.stat.Add(0) + b.stat.Add(fail) } -func (b *googleBreaker) history() (accepts, total int64) { - b.stat.Reduce(func(b *collection.Bucket) { - accepts += int64(b.Sum) - total += b.Count +func (b *googleBreaker) markSuccess() { + b.stat.Add(success) +} + +func (b *googleBreaker) history() windowResult { + var result windowResult + + b.stat.Reduce(func(b *bucket) { + result.accepts += b.Success + result.total += b.Sum + if b.Failure > 0 { + result.workingBuckets = 0 + } else if b.Success > 0 { + result.workingBuckets++ + } + if b.Drop > 0 && b.Failure > 0 { + result.failingBuckets++ + } else { + result.failingBuckets = 0 + } }) - return + return result } type googlePromise struct { diff --git a/core/breaker/googlebreaker_test.go b/core/breaker/googlebreaker_test.go index 85d76660..cbd7e6d9 100644 --- a/core/breaker/googlebreaker_test.go +++ b/core/breaker/googlebreaker_test.go @@ -22,7 +22,9 @@ func init() { } func getGoogleBreaker() *googleBreaker { - st := collection.NewRollingWindow(testBuckets, testInterval) + st := collection.NewRollingWindow[int64, *bucket](func() *bucket { + return new(bucket) + }, testBuckets, testInterval) return &googleBreaker{ stat: st, k: 5, @@ -63,6 +65,32 @@ func TestGoogleBreakerOpen(t *testing.T) { }) } +func TestGoogleBreakerRecover(t *testing.T) { + st := collection.NewRollingWindow[int64, *bucket](func() *bucket { + return new(bucket) + }, testBuckets*2, testInterval) + b := &googleBreaker{ + stat: st, + k: k, + proba: mathx.NewProba(), + } + for i := 0; i < testBuckets; i++ { + for j := 0; j < 100; j++ { + b.stat.Add(1) + } + time.Sleep(testInterval) + } + for i := 0; i < testBuckets; i++ { + for j := 0; j < 100; j++ { + b.stat.Add(0) + } + time.Sleep(testInterval) + } + verify(t, func() bool { + return b.accept() == nil + }) +} + func TestGoogleBreakerFallback(t *testing.T) { b := getGoogleBreaker() markSuccess(b, 1) @@ -164,41 +192,38 @@ func TestGoogleBreakerSelfProtection(t *testing.T) { } func TestGoogleBreakerHistory(t *testing.T) { - var b *googleBreaker - var accepts, total int64 - sleep := testInterval t.Run("accepts == total", func(t *testing.T) { - b = getGoogleBreaker() + b := getGoogleBreaker() markSuccessWithDuration(b, 10, sleep/2) - accepts, total = b.history() - assert.Equal(t, int64(10), accepts) - assert.Equal(t, int64(10), total) + result := b.history() + assert.Equal(t, int64(10), result.accepts) + assert.Equal(t, int64(10), result.total) }) t.Run("fail == total", func(t *testing.T) { - b = getGoogleBreaker() + b := getGoogleBreaker() markFailedWithDuration(b, 10, sleep/2) - accepts, total = b.history() - assert.Equal(t, int64(0), accepts) - assert.Equal(t, int64(10), total) + result := b.history() + assert.Equal(t, int64(0), result.accepts) + assert.Equal(t, int64(10), result.total) }) t.Run("accepts = 1/2 * total, fail = 1/2 * total", func(t *testing.T) { - b = getGoogleBreaker() + b := getGoogleBreaker() markFailedWithDuration(b, 5, sleep/2) markSuccessWithDuration(b, 5, sleep/2) - accepts, total = b.history() - assert.Equal(t, int64(5), accepts) - assert.Equal(t, int64(10), total) + result := b.history() + assert.Equal(t, int64(5), result.accepts) + assert.Equal(t, int64(10), result.total) }) t.Run("auto reset rolling counter", func(t *testing.T) { - b = getGoogleBreaker() + b := getGoogleBreaker() time.Sleep(testInterval * testBuckets) - accepts, total = b.history() - assert.Equal(t, int64(0), accepts) - assert.Equal(t, int64(0), total) + result := b.history() + assert.Equal(t, int64(0), result.accepts) + assert.Equal(t, int64(0), result.total) }) } diff --git a/core/collection/rollingwindow.go b/core/collection/rollingwindow.go index 81894be3..1e17784f 100644 --- a/core/collection/rollingwindow.go +++ b/core/collection/rollingwindow.go @@ -4,18 +4,28 @@ import ( "sync" "time" + "github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/timex" ) type ( - // RollingWindowOption let callers customize the RollingWindow. - RollingWindowOption func(rollingWindow *RollingWindow) + // BucketInterface is the interface that defines the buckets. + BucketInterface[T Numerical] interface { + Add(v T) + Reset() + } - // RollingWindow defines a rolling window to calculate the events in buckets with time interval. - RollingWindow struct { + // Numerical is the interface that restricts the numerical type. + Numerical = mathx.Numerical + + // RollingWindowOption let callers customize the RollingWindow. + RollingWindowOption[T Numerical, B BucketInterface[T]] func(rollingWindow *RollingWindow[T, B]) + + // RollingWindow defines a rolling window to calculate the events in buckets with the time interval. + RollingWindow[T Numerical, B BucketInterface[T]] struct { lock sync.RWMutex size int - win *window + win *window[T, B] interval time.Duration offset int ignoreCurrent bool @@ -25,14 +35,15 @@ type ( // NewRollingWindow returns a RollingWindow that with size buckets and time interval, // use opts to customize the RollingWindow. -func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOption) *RollingWindow { +func NewRollingWindow[T Numerical, B BucketInterface[T]](newBucket func() B, size int, + interval time.Duration, opts ...RollingWindowOption[T, B]) *RollingWindow[T, B] { if size < 1 { panic("size must be greater than 0") } - w := &RollingWindow{ + w := &RollingWindow[T, B]{ size: size, - win: newWindow(size), + win: newWindow[T, B](newBucket, size), interval: interval, lastTime: timex.Now(), } @@ -43,7 +54,7 @@ func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOpt } // Add adds value to current bucket. -func (rw *RollingWindow) Add(v float64) { +func (rw *RollingWindow[T, B]) Add(v T) { rw.lock.Lock() defer rw.lock.Unlock() rw.updateOffset() @@ -51,13 +62,13 @@ func (rw *RollingWindow) Add(v float64) { } // Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set. -func (rw *RollingWindow) Reduce(fn func(b *Bucket)) { +func (rw *RollingWindow[T, B]) Reduce(fn func(b B)) { rw.lock.RLock() defer rw.lock.RUnlock() var diff int span := rw.span() - // ignore current bucket, because of partial data + // ignore the current bucket, because of partial data if span == 0 && rw.ignoreCurrent { diff = rw.size - 1 } else { @@ -69,7 +80,7 @@ func (rw *RollingWindow) Reduce(fn func(b *Bucket)) { } } -func (rw *RollingWindow) span() int { +func (rw *RollingWindow[T, B]) span() int { offset := int(timex.Since(rw.lastTime) / rw.interval) if 0 <= offset && offset < rw.size { return offset @@ -78,7 +89,7 @@ func (rw *RollingWindow) span() int { return rw.size } -func (rw *RollingWindow) updateOffset() { +func (rw *RollingWindow[T, B]) updateOffset() { span := rw.span() if span <= 0 { return @@ -97,54 +108,54 @@ func (rw *RollingWindow) updateOffset() { } // Bucket defines the bucket that holds sum and num of additions. -type Bucket struct { - Sum float64 +type Bucket[T Numerical] struct { + Sum T Count int64 } -func (b *Bucket) add(v float64) { +func (b *Bucket[T]) Add(v T) { b.Sum += v b.Count++ } -func (b *Bucket) reset() { +func (b *Bucket[T]) Reset() { b.Sum = 0 b.Count = 0 } -type window struct { - buckets []*Bucket +type window[T Numerical, B BucketInterface[T]] struct { + buckets []B size int } -func newWindow(size int) *window { - buckets := make([]*Bucket, size) +func newWindow[T Numerical, B BucketInterface[T]](newBucket func() B, size int) *window[T, B] { + buckets := make([]B, size) for i := 0; i < size; i++ { - buckets[i] = new(Bucket) + buckets[i] = newBucket() } - return &window{ + return &window[T, B]{ buckets: buckets, size: size, } } -func (w *window) add(offset int, v float64) { - w.buckets[offset%w.size].add(v) +func (w *window[T, B]) add(offset int, v T) { + w.buckets[offset%w.size].Add(v) } -func (w *window) reduce(start, count int, fn func(b *Bucket)) { +func (w *window[T, B]) reduce(start, count int, fn func(b B)) { for i := 0; i < count; i++ { fn(w.buckets[(start+i)%w.size]) } } -func (w *window) resetBucket(offset int) { - w.buckets[offset%w.size].reset() +func (w *window[T, B]) resetBucket(offset int) { + w.buckets[offset%w.size].Reset() } // IgnoreCurrentBucket lets the Reduce call ignore current bucket. -func IgnoreCurrentBucket() RollingWindowOption { - return func(w *RollingWindow) { +func IgnoreCurrentBucket[T Numerical, B BucketInterface[T]]() RollingWindowOption[T, B] { + return func(w *RollingWindow[T, B]) { w.ignoreCurrent = true } } diff --git a/core/collection/rollingwindow_test.go b/core/collection/rollingwindow_test.go index 201dce0e..05bd956a 100644 --- a/core/collection/rollingwindow_test.go +++ b/core/collection/rollingwindow_test.go @@ -12,18 +12,24 @@ import ( const duration = time.Millisecond * 50 func TestNewRollingWindow(t *testing.T) { - assert.NotNil(t, NewRollingWindow(10, time.Second)) + assert.NotNil(t, NewRollingWindow[int64, *Bucket[int64]](func() *Bucket[int64] { + return new(Bucket[int64]) + }, 10, time.Second)) assert.Panics(t, func() { - NewRollingWindow(0, time.Second) + NewRollingWindow[int64, *Bucket[int64]](func() *Bucket[int64] { + return new(Bucket[int64]) + }, 0, time.Second) }) } func TestRollingWindowAdd(t *testing.T) { const size = 3 - r := NewRollingWindow(size, duration) + r := NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] { + return new(Bucket[float64]) + }, size, duration) listBuckets := func() []float64 { var buckets []float64 - r.Reduce(func(b *Bucket) { + r.Reduce(func(b *Bucket[float64]) { buckets = append(buckets, b.Sum) }) return buckets @@ -47,10 +53,12 @@ func TestRollingWindowAdd(t *testing.T) { func TestRollingWindowReset(t *testing.T) { const size = 3 - r := NewRollingWindow(size, duration, IgnoreCurrentBucket()) + r := NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] { + return new(Bucket[float64]) + }, size, duration, IgnoreCurrentBucket[float64, *Bucket[float64]]()) listBuckets := func() []float64 { var buckets []float64 - r.Reduce(func(b *Bucket) { + r.Reduce(func(b *Bucket[float64]) { buckets = append(buckets, b.Sum) }) return buckets @@ -72,15 +80,19 @@ func TestRollingWindowReset(t *testing.T) { func TestRollingWindowReduce(t *testing.T) { const size = 4 tests := []struct { - win *RollingWindow + win *RollingWindow[float64, *Bucket[float64]] expect float64 }{ { - win: NewRollingWindow(size, duration), + win: NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] { + return new(Bucket[float64]) + }, size, duration), expect: 10, }, { - win: NewRollingWindow(size, duration, IgnoreCurrentBucket()), + win: NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] { + return new(Bucket[float64]) + }, size, duration, IgnoreCurrentBucket[float64, *Bucket[float64]]()), expect: 4, }, } @@ -97,7 +109,7 @@ func TestRollingWindowReduce(t *testing.T) { } } var result float64 - r.Reduce(func(b *Bucket) { + r.Reduce(func(b *Bucket[float64]) { result += b.Sum }) assert.Equal(t, test.expect, result) @@ -108,10 +120,12 @@ func TestRollingWindowReduce(t *testing.T) { func TestRollingWindowBucketTimeBoundary(t *testing.T) { const size = 3 interval := time.Millisecond * 30 - r := NewRollingWindow(size, interval) + r := NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] { + return new(Bucket[float64]) + }, size, interval) listBuckets := func() []float64 { var buckets []float64 - r.Reduce(func(b *Bucket) { + r.Reduce(func(b *Bucket[float64]) { buckets = append(buckets, b.Sum) }) return buckets @@ -138,7 +152,9 @@ func TestRollingWindowBucketTimeBoundary(t *testing.T) { func TestRollingWindowDataRace(t *testing.T) { const size = 3 - r := NewRollingWindow(size, duration) + r := NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] { + return new(Bucket[float64]) + }, size, duration) stop := make(chan bool) go func() { for { @@ -157,7 +173,7 @@ func TestRollingWindowDataRace(t *testing.T) { case <-stop: return default: - r.Reduce(func(b *Bucket) {}) + r.Reduce(func(b *Bucket[float64]) {}) } } }() diff --git a/core/load/adaptiveshedder.go b/core/load/adaptiveshedder.go index 2afc43a2..a1f9c21d 100644 --- a/core/load/adaptiveshedder.go +++ b/core/load/adaptiveshedder.go @@ -76,8 +76,8 @@ type ( avgFlyingLock syncx.SpinLock overloadTime *syncx.AtomicDuration droppedRecently *syncx.AtomicBool - passCounter *collection.RollingWindow - rtCounter *collection.RollingWindow + passCounter *collection.RollingWindow[int64, *collection.Bucket[int64]] + rtCounter *collection.RollingWindow[int64, *collection.Bucket[int64]] } ) @@ -107,15 +107,16 @@ func NewAdaptiveShedder(opts ...ShedderOption) Shedder { opt(&options) } bucketDuration := options.window / time.Duration(options.buckets) + newBucket := func() *collection.Bucket[int64] { + return new(collection.Bucket[int64]) + } return &adaptiveShedder{ cpuThreshold: options.cpuThreshold, windowScale: float64(time.Second) / float64(bucketDuration) / millisecondsPerSecond, overloadTime: syncx.NewAtomicDuration(), droppedRecently: syncx.NewAtomicBool(), - passCounter: collection.NewRollingWindow(options.buckets, bucketDuration, - collection.IgnoreCurrentBucket()), - rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration, - collection.IgnoreCurrentBucket()), + passCounter: collection.NewRollingWindow[int64, *collection.Bucket[int64]](newBucket, options.buckets, bucketDuration, collection.IgnoreCurrentBucket[int64, *collection.Bucket[int64]]()), + rtCounter: collection.NewRollingWindow[int64, *collection.Bucket[int64]](newBucket, options.buckets, bucketDuration, collection.IgnoreCurrentBucket[int64, *collection.Bucket[int64]]()), } } @@ -167,15 +168,15 @@ func (as *adaptiveShedder) maxFlight() float64 { } func (as *adaptiveShedder) maxPass() int64 { - var result float64 = 1 + var result int64 = 1 - as.passCounter.Reduce(func(b *collection.Bucket) { + as.passCounter.Reduce(func(b *collection.Bucket[int64]) { if b.Sum > result { result = b.Sum } }) - return int64(result) + return result } func (as *adaptiveShedder) minRt() float64 { @@ -183,12 +184,12 @@ func (as *adaptiveShedder) minRt() float64 { // its a reasonable large value to avoid dropping requests. result := defaultMinRt - as.rtCounter.Reduce(func(b *collection.Bucket) { + as.rtCounter.Reduce(func(b *collection.Bucket[int64]) { if b.Count <= 0 { return } - avg := math.Round(b.Sum / float64(b.Count)) + avg := math.Round(float64(b.Sum) / float64(b.Count)) if avg < result { result = avg } @@ -283,6 +284,6 @@ func (p *promise) Fail() { func (p *promise) Pass() { rt := float64(timex.Since(p.start)) / float64(time.Millisecond) p.shedder.addFlying(-1) - p.shedder.rtCounter.Add(math.Ceil(rt)) + p.shedder.rtCounter.Add(int64(math.Ceil(rt))) p.shedder.passCounter.Add(1) } diff --git a/core/load/adaptiveshedder_test.go b/core/load/adaptiveshedder_test.go index 104be20e..c30460c3 100644 --- a/core/load/adaptiveshedder_test.go +++ b/core/load/adaptiveshedder_test.go @@ -58,7 +58,7 @@ func TestAdaptiveShedder(t *testing.T) { func TestAdaptiveShedderMaxPass(t *testing.T) { passCounter := newRollingWindow() for i := 1; i <= 10; i++ { - passCounter.Add(float64(i * 100)) + passCounter.Add(int64(i * 100)) time.Sleep(bucketDuration) } shedder := &adaptiveShedder{ @@ -83,7 +83,7 @@ func TestAdaptiveShedderMinRt(t *testing.T) { time.Sleep(bucketDuration) } for j := i*10 + 1; j <= i*10+10; j++ { - rtCounter.Add(float64(j)) + rtCounter.Add(int64(j)) } } shedder := &adaptiveShedder{ @@ -107,9 +107,9 @@ func TestAdaptiveShedderMaxFlight(t *testing.T) { if i > 0 { time.Sleep(bucketDuration) } - passCounter.Add(float64((i + 1) * 100)) + passCounter.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { - rtCounter.Add(float64(j)) + rtCounter.Add(int64(j)) } } shedder := &adaptiveShedder{ @@ -129,9 +129,9 @@ func TestAdaptiveShedderShouldDrop(t *testing.T) { if i > 0 { time.Sleep(bucketDuration) } - passCounter.Add(float64((i + 1) * 100)) + passCounter.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { - rtCounter.Add(float64(j)) + rtCounter.Add(int64(j)) } } shedder := &adaptiveShedder{ @@ -184,9 +184,9 @@ func TestAdaptiveShedderStillHot(t *testing.T) { if i > 0 { time.Sleep(bucketDuration) } - passCounter.Add(float64((i + 1) * 100)) + passCounter.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { - rtCounter.Add(float64(j)) + rtCounter.Add(int64(j)) } } shedder := &adaptiveShedder{ @@ -248,9 +248,9 @@ func BenchmarkMaxFlight(b *testing.B) { if i > 0 { time.Sleep(bucketDuration) } - passCounter.Add(float64((i + 1) * 100)) + passCounter.Add(int64((i + 1) * 100)) for j := i*10 + 1; j <= i*10+10; j++ { - rtCounter.Add(float64(j)) + rtCounter.Add(int64(j)) } } shedder := &adaptiveShedder{ @@ -265,6 +265,8 @@ func BenchmarkMaxFlight(b *testing.B) { } } -func newRollingWindow() *collection.RollingWindow { - return collection.NewRollingWindow(buckets, bucketDuration, collection.IgnoreCurrentBucket()) +func newRollingWindow() *collection.RollingWindow[int64, *collection.Bucket[int64]] { + return collection.NewRollingWindow[int64, *collection.Bucket[int64]](func() *collection.Bucket[int64] { + return new(collection.Bucket[int64]) + }, buckets, bucketDuration, collection.IgnoreCurrentBucket[int64, *collection.Bucket[int64]]()) } diff --git a/core/mathx/range.go b/core/mathx/range.go index d74ac4dd..17e63fd5 100644 --- a/core/mathx/range.go +++ b/core/mathx/range.go @@ -1,13 +1,13 @@ package mathx -type numerical interface { +type Numerical interface { ~int | ~int8 | ~int16 | ~int32 | ~int64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~float32 | ~float64 } // AtLeast returns the greater of x or lower. -func AtLeast[T numerical](x, lower T) T { +func AtLeast[T Numerical](x, lower T) T { if x < lower { return lower } @@ -15,7 +15,7 @@ func AtLeast[T numerical](x, lower T) T { } // AtMost returns the smaller of x or upper. -func AtMost[T numerical](x, upper T) T { +func AtMost[T Numerical](x, upper T) T { if x > upper { return upper } @@ -23,7 +23,7 @@ func AtMost[T numerical](x, upper T) T { } // Between returns the value of x clamped to the range [lower, upper]. -func Between[T numerical](x, lower, upper T) T { +func Between[T Numerical](x, lower, upper T) T { if x < lower { return lower } diff --git a/core/stores/sqlx/stmt_test.go b/core/stores/sqlx/stmt_test.go index a2b0b1bb..8d443379 100644 --- a/core/stores/sqlx/stmt_test.go +++ b/core/stores/sqlx/stmt_test.go @@ -302,10 +302,10 @@ func TestQueryRowsScanTimeout(t *testing.T) { Foo string } conn := NewSqlConnFromDB(db) - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*2) err := conn.QueryRowsCtx(ctx, &val, "any") assert.ErrorIs(t, err, context.DeadlineExceeded) + cancel() }) }