optimize: improve breaker algorithm on recovery time (#4077)

This commit is contained in:
Kevin Wan 2024-04-18 22:33:25 +08:00 committed by GitHub
parent 95b32b5779
commit 1540bdc4c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 311 additions and 121 deletions

48
core/breaker/bucket.go Normal file
View File

@ -0,0 +1,48 @@
package breaker
// Outcome values recorded into a bucket. Any value other than
// fail or drop is counted as a success.
const (
	success = iota
	fail
	drop
)

// bucket holds the total number of additions (Sum) together with
// per-outcome counters for successes, failures and drops.
type bucket struct {
	Sum     int64
	Success int64
	Failure int64
	Drop    int64
}

// Add records one addition, routing v to the matching counter.
// Unknown values fall through to the success counter.
func (b *bucket) Add(v int64) {
	if v == fail {
		b.fail()
		return
	}
	if v == drop {
		b.drop()
		return
	}
	b.succeed()
}

// Reset zeroes every counter in the bucket.
func (b *bucket) Reset() {
	b.Sum, b.Success, b.Failure, b.Drop = 0, 0, 0, 0
}

// drop bumps the total and the drop counter.
func (b *bucket) drop() {
	b.Drop++
	b.Sum++
}

// fail bumps the total and the failure counter.
func (b *bucket) fail() {
	b.Failure++
	b.Sum++
}

// succeed bumps the total and the success counter.
func (b *bucket) succeed() {
	b.Success++
	b.Sum++
}

View File

@ -0,0 +1,43 @@
package breaker
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestBucketAdd verifies that Add routes each outcome value to the
// matching counter while always incrementing the total.
func TestBucketAdd(t *testing.T) {
	b := &bucket{}

	// Test succeed: use the named constant instead of a magic 0.
	b.Add(success)
	assert.Equal(t, int64(1), b.Sum, "Sum should be incremented")
	assert.Equal(t, int64(1), b.Success, "Success should be incremented")
	assert.Equal(t, int64(0), b.Failure, "Failure should not be incremented")
	assert.Equal(t, int64(0), b.Drop, "Drop should not be incremented")

	// Test failure: only Sum and Failure move.
	b.Add(fail)
	assert.Equal(t, int64(2), b.Sum, "Sum should be incremented")
	assert.Equal(t, int64(1), b.Success, "Success should not be incremented")
	assert.Equal(t, int64(1), b.Failure, "Failure should be incremented")
	assert.Equal(t, int64(0), b.Drop, "Drop should not be incremented")

	// Test drop: only Sum and Drop move.
	b.Add(drop)
	assert.Equal(t, int64(3), b.Sum, "Sum should be incremented")
	assert.Equal(t, int64(1), b.Success, "Success should not be incremented")
	assert.Equal(t, int64(1), b.Failure, "Failure should not be incremented")
	assert.Equal(t, int64(1), b.Drop, "Drop should be incremented")
}
// TestBucketReset ensures Reset zeroes every counter in the bucket.
func TestBucketReset(t *testing.T) {
	b := &bucket{
		Sum:     3,
		Success: 1,
		Failure: 1,
		Drop:    1,
	}
	b.Reset()

	// Check each field against zero via a small name->value table.
	fields := map[string]int64{
		"Sum":     b.Sum,
		"Success": b.Success,
		"Failure": b.Failure,
		"Drop":    b.Drop,
	}
	for name, val := range fields {
		assert.Equal(t, int64(0), val, name+" should be reset to 0")
	}
}

View File

@ -11,21 +11,36 @@ const (
// 250ms for bucket duration // 250ms for bucket duration
window = time.Second * 10 window = time.Second * 10
buckets = 40 buckets = 40
maxFailBucketsToDecreaseK = 30
minBucketsToSpeedUp = 3
k = 1.5 k = 1.5
minK = 1.1
recoveryK = 3 - k
protection = 5 protection = 5
) )
// googleBreaker is a netflixBreaker pattern from google. // googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/ // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct { type (
googleBreaker struct {
k float64 k float64
stat *collection.RollingWindow stat *collection.RollingWindow[int64, *bucket]
proba *mathx.Proba proba *mathx.Proba
} }
windowResult struct {
accepts int64
total int64
failingBuckets int64
workingBuckets int64
}
)
func newGoogleBreaker() *googleBreaker { func newGoogleBreaker() *googleBreaker {
bucketDuration := time.Duration(int64(window) / int64(buckets)) bucketDuration := time.Duration(int64(window) / int64(buckets))
st := collection.NewRollingWindow(buckets, bucketDuration) st := collection.NewRollingWindow[int64, *bucket](func() *bucket {
return new(bucket)
}, buckets, bucketDuration)
return &googleBreaker{ return &googleBreaker{
stat: st, stat: st,
k: k, k: k,
@ -34,15 +49,28 @@ func newGoogleBreaker() *googleBreaker {
} }
func (b *googleBreaker) accept() error { func (b *googleBreaker) accept() error {
accepts, total := b.history() var w float64
weightedAccepts := b.k * float64(accepts) history := b.history()
if history.failingBuckets >= minBucketsToSpeedUp {
w = b.k - float64(history.failingBuckets-1)*(b.k-minK)/maxFailBucketsToDecreaseK
w = mathx.AtLeast(w, minK)
} else {
w = b.k
}
weightedAccepts := w * float64(history.accepts)
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101 // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
// for better performance, no need to care about negative ratio // for better performance, no need to care about the negative ratio
dropRatio := (float64(total-protection) - weightedAccepts) / float64(total+1) dropRatio := (float64(history.total-protection) - weightedAccepts) / float64(history.total+1)
if dropRatio <= 0 { if dropRatio <= 0 {
return nil return nil
} }
// If we have more than 2 working buckets, we are in recovery mode,
// the latest bucket is the current one, so we ignore it.
if history.workingBuckets >= minBucketsToSpeedUp {
dropRatio /= recoveryK
}
if b.proba.TrueOnProba(dropRatio) { if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable return ErrServiceUnavailable
} }
@ -52,7 +80,7 @@ func (b *googleBreaker) accept() error {
func (b *googleBreaker) allow() (internalPromise, error) { func (b *googleBreaker) allow() (internalPromise, error) {
if err := b.accept(); err != nil { if err := b.accept(); err != nil {
b.markFailure() b.markDrop()
return nil, err return nil, err
} }
@ -63,7 +91,7 @@ func (b *googleBreaker) allow() (internalPromise, error) {
func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Acceptable) error { func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Acceptable) error {
if err := b.accept(); err != nil { if err := b.accept(); err != nil {
b.markFailure() b.markDrop()
if fallback != nil { if fallback != nil {
return fallback(err) return fallback(err)
} }
@ -71,10 +99,10 @@ func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Ac
return err return err
} }
var success bool var succ bool
defer func() { defer func() {
// if req() panic, success is false, mark as failure // if req() panic, success is false, mark as failure
if success { if succ {
b.markSuccess() b.markSuccess()
} else { } else {
b.markFailure() b.markFailure()
@ -83,27 +111,43 @@ func (b *googleBreaker) doReq(req func() error, fallback Fallback, acceptable Ac
err := req() err := req()
if acceptable(err) { if acceptable(err) {
success = true succ = true
} }
return err return err
} }
func (b *googleBreaker) markSuccess() { func (b *googleBreaker) markDrop() {
b.stat.Add(1) b.stat.Add(drop)
} }
func (b *googleBreaker) markFailure() { func (b *googleBreaker) markFailure() {
b.stat.Add(0) b.stat.Add(fail)
} }
func (b *googleBreaker) history() (accepts, total int64) { func (b *googleBreaker) markSuccess() {
b.stat.Reduce(func(b *collection.Bucket) { b.stat.Add(success)
accepts += int64(b.Sum) }
total += b.Count
func (b *googleBreaker) history() windowResult {
var result windowResult
b.stat.Reduce(func(b *bucket) {
result.accepts += b.Success
result.total += b.Sum
if b.Failure > 0 {
result.workingBuckets = 0
} else if b.Success > 0 {
result.workingBuckets++
}
if b.Drop > 0 && b.Failure > 0 {
result.failingBuckets++
} else {
result.failingBuckets = 0
}
}) })
return return result
} }
type googlePromise struct { type googlePromise struct {

View File

@ -22,7 +22,9 @@ func init() {
} }
func getGoogleBreaker() *googleBreaker { func getGoogleBreaker() *googleBreaker {
st := collection.NewRollingWindow(testBuckets, testInterval) st := collection.NewRollingWindow[int64, *bucket](func() *bucket {
return new(bucket)
}, testBuckets, testInterval)
return &googleBreaker{ return &googleBreaker{
stat: st, stat: st,
k: 5, k: 5,
@ -63,6 +65,32 @@ func TestGoogleBreakerOpen(t *testing.T) {
}) })
} }
// TestGoogleBreakerRecover exercises the recovery path: after a run of
// failing buckets followed by a run of succeeding buckets, accept()
// should let requests through again.
func TestGoogleBreakerRecover(t *testing.T) {
	// Window twice the usual test size so both the failing phase and the
	// recovering phase fit inside the rolling window at once.
	st := collection.NewRollingWindow[int64, *bucket](func() *bucket {
		return new(bucket)
	}, testBuckets*2, testInterval)
	b := &googleBreaker{
		stat:  st,
		k:     k,
		proba: mathx.NewProba(),
	}
	// First half: record failures (1 == fail per the bucket constants),
	// sleeping one interval per bucket so each lands in its own slot.
	for i := 0; i < testBuckets; i++ {
		for j := 0; j < 100; j++ {
			b.stat.Add(1)
		}
		time.Sleep(testInterval)
	}
	// Second half: record successes (0 == success), so the most recent
	// buckets are healthy when accept() is evaluated.
	for i := 0; i < testBuckets; i++ {
		for j := 0; j < 100; j++ {
			b.stat.Add(0)
		}
		time.Sleep(testInterval)
	}
	// With recent buckets succeeding, accept() should report no error.
	verify(t, func() bool {
		return b.accept() == nil
	})
}
func TestGoogleBreakerFallback(t *testing.T) { func TestGoogleBreakerFallback(t *testing.T) {
b := getGoogleBreaker() b := getGoogleBreaker()
markSuccess(b, 1) markSuccess(b, 1)
@ -164,41 +192,38 @@ func TestGoogleBreakerSelfProtection(t *testing.T) {
} }
func TestGoogleBreakerHistory(t *testing.T) { func TestGoogleBreakerHistory(t *testing.T) {
var b *googleBreaker
var accepts, total int64
sleep := testInterval sleep := testInterval
t.Run("accepts == total", func(t *testing.T) { t.Run("accepts == total", func(t *testing.T) {
b = getGoogleBreaker() b := getGoogleBreaker()
markSuccessWithDuration(b, 10, sleep/2) markSuccessWithDuration(b, 10, sleep/2)
accepts, total = b.history() result := b.history()
assert.Equal(t, int64(10), accepts) assert.Equal(t, int64(10), result.accepts)
assert.Equal(t, int64(10), total) assert.Equal(t, int64(10), result.total)
}) })
t.Run("fail == total", func(t *testing.T) { t.Run("fail == total", func(t *testing.T) {
b = getGoogleBreaker() b := getGoogleBreaker()
markFailedWithDuration(b, 10, sleep/2) markFailedWithDuration(b, 10, sleep/2)
accepts, total = b.history() result := b.history()
assert.Equal(t, int64(0), accepts) assert.Equal(t, int64(0), result.accepts)
assert.Equal(t, int64(10), total) assert.Equal(t, int64(10), result.total)
}) })
t.Run("accepts = 1/2 * total, fail = 1/2 * total", func(t *testing.T) { t.Run("accepts = 1/2 * total, fail = 1/2 * total", func(t *testing.T) {
b = getGoogleBreaker() b := getGoogleBreaker()
markFailedWithDuration(b, 5, sleep/2) markFailedWithDuration(b, 5, sleep/2)
markSuccessWithDuration(b, 5, sleep/2) markSuccessWithDuration(b, 5, sleep/2)
accepts, total = b.history() result := b.history()
assert.Equal(t, int64(5), accepts) assert.Equal(t, int64(5), result.accepts)
assert.Equal(t, int64(10), total) assert.Equal(t, int64(10), result.total)
}) })
t.Run("auto reset rolling counter", func(t *testing.T) { t.Run("auto reset rolling counter", func(t *testing.T) {
b = getGoogleBreaker() b := getGoogleBreaker()
time.Sleep(testInterval * testBuckets) time.Sleep(testInterval * testBuckets)
accepts, total = b.history() result := b.history()
assert.Equal(t, int64(0), accepts) assert.Equal(t, int64(0), result.accepts)
assert.Equal(t, int64(0), total) assert.Equal(t, int64(0), result.total)
}) })
} }

View File

@ -4,18 +4,28 @@ import (
"sync" "sync"
"time" "time"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/timex" "github.com/zeromicro/go-zero/core/timex"
) )
type ( type (
// RollingWindowOption let callers customize the RollingWindow. // BucketInterface is the interface that defines the buckets.
RollingWindowOption func(rollingWindow *RollingWindow) BucketInterface[T Numerical] interface {
Add(v T)
Reset()
}
// RollingWindow defines a rolling window to calculate the events in buckets with time interval. // Numerical is the interface that restricts the numerical type.
RollingWindow struct { Numerical = mathx.Numerical
// RollingWindowOption let callers customize the RollingWindow.
RollingWindowOption[T Numerical, B BucketInterface[T]] func(rollingWindow *RollingWindow[T, B])
// RollingWindow defines a rolling window to calculate the events in buckets with the time interval.
RollingWindow[T Numerical, B BucketInterface[T]] struct {
lock sync.RWMutex lock sync.RWMutex
size int size int
win *window win *window[T, B]
interval time.Duration interval time.Duration
offset int offset int
ignoreCurrent bool ignoreCurrent bool
@ -25,14 +35,15 @@ type (
// NewRollingWindow returns a RollingWindow that with size buckets and time interval, // NewRollingWindow returns a RollingWindow that with size buckets and time interval,
// use opts to customize the RollingWindow. // use opts to customize the RollingWindow.
func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOption) *RollingWindow { func NewRollingWindow[T Numerical, B BucketInterface[T]](newBucket func() B, size int,
interval time.Duration, opts ...RollingWindowOption[T, B]) *RollingWindow[T, B] {
if size < 1 { if size < 1 {
panic("size must be greater than 0") panic("size must be greater than 0")
} }
w := &RollingWindow{ w := &RollingWindow[T, B]{
size: size, size: size,
win: newWindow(size), win: newWindow[T, B](newBucket, size),
interval: interval, interval: interval,
lastTime: timex.Now(), lastTime: timex.Now(),
} }
@ -43,7 +54,7 @@ func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOpt
} }
// Add adds value to current bucket. // Add adds value to current bucket.
func (rw *RollingWindow) Add(v float64) { func (rw *RollingWindow[T, B]) Add(v T) {
rw.lock.Lock() rw.lock.Lock()
defer rw.lock.Unlock() defer rw.lock.Unlock()
rw.updateOffset() rw.updateOffset()
@ -51,13 +62,13 @@ func (rw *RollingWindow) Add(v float64) {
} }
// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set. // Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) { func (rw *RollingWindow[T, B]) Reduce(fn func(b B)) {
rw.lock.RLock() rw.lock.RLock()
defer rw.lock.RUnlock() defer rw.lock.RUnlock()
var diff int var diff int
span := rw.span() span := rw.span()
// ignore current bucket, because of partial data // ignore the current bucket, because of partial data
if span == 0 && rw.ignoreCurrent { if span == 0 && rw.ignoreCurrent {
diff = rw.size - 1 diff = rw.size - 1
} else { } else {
@ -69,7 +80,7 @@ func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
} }
} }
func (rw *RollingWindow) span() int { func (rw *RollingWindow[T, B]) span() int {
offset := int(timex.Since(rw.lastTime) / rw.interval) offset := int(timex.Since(rw.lastTime) / rw.interval)
if 0 <= offset && offset < rw.size { if 0 <= offset && offset < rw.size {
return offset return offset
@ -78,7 +89,7 @@ func (rw *RollingWindow) span() int {
return rw.size return rw.size
} }
func (rw *RollingWindow) updateOffset() { func (rw *RollingWindow[T, B]) updateOffset() {
span := rw.span() span := rw.span()
if span <= 0 { if span <= 0 {
return return
@ -97,54 +108,54 @@ func (rw *RollingWindow) updateOffset() {
} }
// Bucket defines the bucket that holds sum and num of additions. // Bucket defines the bucket that holds sum and num of additions.
type Bucket struct { type Bucket[T Numerical] struct {
Sum float64 Sum T
Count int64 Count int64
} }
func (b *Bucket) add(v float64) { func (b *Bucket[T]) Add(v T) {
b.Sum += v b.Sum += v
b.Count++ b.Count++
} }
func (b *Bucket) reset() { func (b *Bucket[T]) Reset() {
b.Sum = 0 b.Sum = 0
b.Count = 0 b.Count = 0
} }
type window struct { type window[T Numerical, B BucketInterface[T]] struct {
buckets []*Bucket buckets []B
size int size int
} }
func newWindow(size int) *window { func newWindow[T Numerical, B BucketInterface[T]](newBucket func() B, size int) *window[T, B] {
buckets := make([]*Bucket, size) buckets := make([]B, size)
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
buckets[i] = new(Bucket) buckets[i] = newBucket()
} }
return &window{ return &window[T, B]{
buckets: buckets, buckets: buckets,
size: size, size: size,
} }
} }
func (w *window) add(offset int, v float64) { func (w *window[T, B]) add(offset int, v T) {
w.buckets[offset%w.size].add(v) w.buckets[offset%w.size].Add(v)
} }
func (w *window) reduce(start, count int, fn func(b *Bucket)) { func (w *window[T, B]) reduce(start, count int, fn func(b B)) {
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
fn(w.buckets[(start+i)%w.size]) fn(w.buckets[(start+i)%w.size])
} }
} }
func (w *window) resetBucket(offset int) { func (w *window[T, B]) resetBucket(offset int) {
w.buckets[offset%w.size].reset() w.buckets[offset%w.size].Reset()
} }
// IgnoreCurrentBucket lets the Reduce call ignore current bucket. // IgnoreCurrentBucket lets the Reduce call ignore current bucket.
func IgnoreCurrentBucket() RollingWindowOption { func IgnoreCurrentBucket[T Numerical, B BucketInterface[T]]() RollingWindowOption[T, B] {
return func(w *RollingWindow) { return func(w *RollingWindow[T, B]) {
w.ignoreCurrent = true w.ignoreCurrent = true
} }
} }

View File

@ -12,18 +12,24 @@ import (
const duration = time.Millisecond * 50 const duration = time.Millisecond * 50
func TestNewRollingWindow(t *testing.T) { func TestNewRollingWindow(t *testing.T) {
assert.NotNil(t, NewRollingWindow(10, time.Second)) assert.NotNil(t, NewRollingWindow[int64, *Bucket[int64]](func() *Bucket[int64] {
return new(Bucket[int64])
}, 10, time.Second))
assert.Panics(t, func() { assert.Panics(t, func() {
NewRollingWindow(0, time.Second) NewRollingWindow[int64, *Bucket[int64]](func() *Bucket[int64] {
return new(Bucket[int64])
}, 0, time.Second)
}) })
} }
func TestRollingWindowAdd(t *testing.T) { func TestRollingWindowAdd(t *testing.T) {
const size = 3 const size = 3
r := NewRollingWindow(size, duration) r := NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] {
return new(Bucket[float64])
}, size, duration)
listBuckets := func() []float64 { listBuckets := func() []float64 {
var buckets []float64 var buckets []float64
r.Reduce(func(b *Bucket) { r.Reduce(func(b *Bucket[float64]) {
buckets = append(buckets, b.Sum) buckets = append(buckets, b.Sum)
}) })
return buckets return buckets
@ -47,10 +53,12 @@ func TestRollingWindowAdd(t *testing.T) {
func TestRollingWindowReset(t *testing.T) { func TestRollingWindowReset(t *testing.T) {
const size = 3 const size = 3
r := NewRollingWindow(size, duration, IgnoreCurrentBucket()) r := NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] {
return new(Bucket[float64])
}, size, duration, IgnoreCurrentBucket[float64, *Bucket[float64]]())
listBuckets := func() []float64 { listBuckets := func() []float64 {
var buckets []float64 var buckets []float64
r.Reduce(func(b *Bucket) { r.Reduce(func(b *Bucket[float64]) {
buckets = append(buckets, b.Sum) buckets = append(buckets, b.Sum)
}) })
return buckets return buckets
@ -72,15 +80,19 @@ func TestRollingWindowReset(t *testing.T) {
func TestRollingWindowReduce(t *testing.T) { func TestRollingWindowReduce(t *testing.T) {
const size = 4 const size = 4
tests := []struct { tests := []struct {
win *RollingWindow win *RollingWindow[float64, *Bucket[float64]]
expect float64 expect float64
}{ }{
{ {
win: NewRollingWindow(size, duration), win: NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] {
return new(Bucket[float64])
}, size, duration),
expect: 10, expect: 10,
}, },
{ {
win: NewRollingWindow(size, duration, IgnoreCurrentBucket()), win: NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] {
return new(Bucket[float64])
}, size, duration, IgnoreCurrentBucket[float64, *Bucket[float64]]()),
expect: 4, expect: 4,
}, },
} }
@ -97,7 +109,7 @@ func TestRollingWindowReduce(t *testing.T) {
} }
} }
var result float64 var result float64
r.Reduce(func(b *Bucket) { r.Reduce(func(b *Bucket[float64]) {
result += b.Sum result += b.Sum
}) })
assert.Equal(t, test.expect, result) assert.Equal(t, test.expect, result)
@ -108,10 +120,12 @@ func TestRollingWindowReduce(t *testing.T) {
func TestRollingWindowBucketTimeBoundary(t *testing.T) { func TestRollingWindowBucketTimeBoundary(t *testing.T) {
const size = 3 const size = 3
interval := time.Millisecond * 30 interval := time.Millisecond * 30
r := NewRollingWindow(size, interval) r := NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] {
return new(Bucket[float64])
}, size, interval)
listBuckets := func() []float64 { listBuckets := func() []float64 {
var buckets []float64 var buckets []float64
r.Reduce(func(b *Bucket) { r.Reduce(func(b *Bucket[float64]) {
buckets = append(buckets, b.Sum) buckets = append(buckets, b.Sum)
}) })
return buckets return buckets
@ -138,7 +152,9 @@ func TestRollingWindowBucketTimeBoundary(t *testing.T) {
func TestRollingWindowDataRace(t *testing.T) { func TestRollingWindowDataRace(t *testing.T) {
const size = 3 const size = 3
r := NewRollingWindow(size, duration) r := NewRollingWindow[float64, *Bucket[float64]](func() *Bucket[float64] {
return new(Bucket[float64])
}, size, duration)
stop := make(chan bool) stop := make(chan bool)
go func() { go func() {
for { for {
@ -157,7 +173,7 @@ func TestRollingWindowDataRace(t *testing.T) {
case <-stop: case <-stop:
return return
default: default:
r.Reduce(func(b *Bucket) {}) r.Reduce(func(b *Bucket[float64]) {})
} }
} }
}() }()

View File

@ -76,8 +76,8 @@ type (
avgFlyingLock syncx.SpinLock avgFlyingLock syncx.SpinLock
overloadTime *syncx.AtomicDuration overloadTime *syncx.AtomicDuration
droppedRecently *syncx.AtomicBool droppedRecently *syncx.AtomicBool
passCounter *collection.RollingWindow passCounter *collection.RollingWindow[int64, *collection.Bucket[int64]]
rtCounter *collection.RollingWindow rtCounter *collection.RollingWindow[int64, *collection.Bucket[int64]]
} }
) )
@ -107,15 +107,16 @@ func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
opt(&options) opt(&options)
} }
bucketDuration := options.window / time.Duration(options.buckets) bucketDuration := options.window / time.Duration(options.buckets)
newBucket := func() *collection.Bucket[int64] {
return new(collection.Bucket[int64])
}
return &adaptiveShedder{ return &adaptiveShedder{
cpuThreshold: options.cpuThreshold, cpuThreshold: options.cpuThreshold,
windowScale: float64(time.Second) / float64(bucketDuration) / millisecondsPerSecond, windowScale: float64(time.Second) / float64(bucketDuration) / millisecondsPerSecond,
overloadTime: syncx.NewAtomicDuration(), overloadTime: syncx.NewAtomicDuration(),
droppedRecently: syncx.NewAtomicBool(), droppedRecently: syncx.NewAtomicBool(),
passCounter: collection.NewRollingWindow(options.buckets, bucketDuration, passCounter: collection.NewRollingWindow[int64, *collection.Bucket[int64]](newBucket, options.buckets, bucketDuration, collection.IgnoreCurrentBucket[int64, *collection.Bucket[int64]]()),
collection.IgnoreCurrentBucket()), rtCounter: collection.NewRollingWindow[int64, *collection.Bucket[int64]](newBucket, options.buckets, bucketDuration, collection.IgnoreCurrentBucket[int64, *collection.Bucket[int64]]()),
rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
collection.IgnoreCurrentBucket()),
} }
} }
@ -167,15 +168,15 @@ func (as *adaptiveShedder) maxFlight() float64 {
} }
func (as *adaptiveShedder) maxPass() int64 { func (as *adaptiveShedder) maxPass() int64 {
var result float64 = 1 var result int64 = 1
as.passCounter.Reduce(func(b *collection.Bucket) { as.passCounter.Reduce(func(b *collection.Bucket[int64]) {
if b.Sum > result { if b.Sum > result {
result = b.Sum result = b.Sum
} }
}) })
return int64(result) return result
} }
func (as *adaptiveShedder) minRt() float64 { func (as *adaptiveShedder) minRt() float64 {
@ -183,12 +184,12 @@ func (as *adaptiveShedder) minRt() float64 {
// its a reasonable large value to avoid dropping requests. // its a reasonable large value to avoid dropping requests.
result := defaultMinRt result := defaultMinRt
as.rtCounter.Reduce(func(b *collection.Bucket) { as.rtCounter.Reduce(func(b *collection.Bucket[int64]) {
if b.Count <= 0 { if b.Count <= 0 {
return return
} }
avg := math.Round(b.Sum / float64(b.Count)) avg := math.Round(float64(b.Sum) / float64(b.Count))
if avg < result { if avg < result {
result = avg result = avg
} }
@ -283,6 +284,6 @@ func (p *promise) Fail() {
func (p *promise) Pass() { func (p *promise) Pass() {
rt := float64(timex.Since(p.start)) / float64(time.Millisecond) rt := float64(timex.Since(p.start)) / float64(time.Millisecond)
p.shedder.addFlying(-1) p.shedder.addFlying(-1)
p.shedder.rtCounter.Add(math.Ceil(rt)) p.shedder.rtCounter.Add(int64(math.Ceil(rt)))
p.shedder.passCounter.Add(1) p.shedder.passCounter.Add(1)
} }

View File

@ -58,7 +58,7 @@ func TestAdaptiveShedder(t *testing.T) {
func TestAdaptiveShedderMaxPass(t *testing.T) { func TestAdaptiveShedderMaxPass(t *testing.T) {
passCounter := newRollingWindow() passCounter := newRollingWindow()
for i := 1; i <= 10; i++ { for i := 1; i <= 10; i++ {
passCounter.Add(float64(i * 100)) passCounter.Add(int64(i * 100))
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
shedder := &adaptiveShedder{ shedder := &adaptiveShedder{
@ -83,7 +83,7 @@ func TestAdaptiveShedderMinRt(t *testing.T) {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
for j := i*10 + 1; j <= i*10+10; j++ { for j := i*10 + 1; j <= i*10+10; j++ {
rtCounter.Add(float64(j)) rtCounter.Add(int64(j))
} }
} }
shedder := &adaptiveShedder{ shedder := &adaptiveShedder{
@ -107,9 +107,9 @@ func TestAdaptiveShedderMaxFlight(t *testing.T) {
if i > 0 { if i > 0 {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
passCounter.Add(float64((i + 1) * 100)) passCounter.Add(int64((i + 1) * 100))
for j := i*10 + 1; j <= i*10+10; j++ { for j := i*10 + 1; j <= i*10+10; j++ {
rtCounter.Add(float64(j)) rtCounter.Add(int64(j))
} }
} }
shedder := &adaptiveShedder{ shedder := &adaptiveShedder{
@ -129,9 +129,9 @@ func TestAdaptiveShedderShouldDrop(t *testing.T) {
if i > 0 { if i > 0 {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
passCounter.Add(float64((i + 1) * 100)) passCounter.Add(int64((i + 1) * 100))
for j := i*10 + 1; j <= i*10+10; j++ { for j := i*10 + 1; j <= i*10+10; j++ {
rtCounter.Add(float64(j)) rtCounter.Add(int64(j))
} }
} }
shedder := &adaptiveShedder{ shedder := &adaptiveShedder{
@ -184,9 +184,9 @@ func TestAdaptiveShedderStillHot(t *testing.T) {
if i > 0 { if i > 0 {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
passCounter.Add(float64((i + 1) * 100)) passCounter.Add(int64((i + 1) * 100))
for j := i*10 + 1; j <= i*10+10; j++ { for j := i*10 + 1; j <= i*10+10; j++ {
rtCounter.Add(float64(j)) rtCounter.Add(int64(j))
} }
} }
shedder := &adaptiveShedder{ shedder := &adaptiveShedder{
@ -248,9 +248,9 @@ func BenchmarkMaxFlight(b *testing.B) {
if i > 0 { if i > 0 {
time.Sleep(bucketDuration) time.Sleep(bucketDuration)
} }
passCounter.Add(float64((i + 1) * 100)) passCounter.Add(int64((i + 1) * 100))
for j := i*10 + 1; j <= i*10+10; j++ { for j := i*10 + 1; j <= i*10+10; j++ {
rtCounter.Add(float64(j)) rtCounter.Add(int64(j))
} }
} }
shedder := &adaptiveShedder{ shedder := &adaptiveShedder{
@ -265,6 +265,8 @@ func BenchmarkMaxFlight(b *testing.B) {
} }
} }
func newRollingWindow() *collection.RollingWindow { func newRollingWindow() *collection.RollingWindow[int64, *collection.Bucket[int64]] {
return collection.NewRollingWindow(buckets, bucketDuration, collection.IgnoreCurrentBucket()) return collection.NewRollingWindow[int64, *collection.Bucket[int64]](func() *collection.Bucket[int64] {
return new(collection.Bucket[int64])
}, buckets, bucketDuration, collection.IgnoreCurrentBucket[int64, *collection.Bucket[int64]]())
} }

View File

@ -1,13 +1,13 @@
package mathx package mathx
type numerical interface { type Numerical interface {
~int | ~int8 | ~int16 | ~int32 | ~int64 | ~int | ~int8 | ~int16 | ~int32 | ~int64 |
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
~float32 | ~float64 ~float32 | ~float64
} }
// AtLeast returns the greater of x or lower. // AtLeast returns the greater of x or lower.
func AtLeast[T numerical](x, lower T) T { func AtLeast[T Numerical](x, lower T) T {
if x < lower { if x < lower {
return lower return lower
} }
@ -15,7 +15,7 @@ func AtLeast[T numerical](x, lower T) T {
} }
// AtMost returns the smaller of x or upper. // AtMost returns the smaller of x or upper.
func AtMost[T numerical](x, upper T) T { func AtMost[T Numerical](x, upper T) T {
if x > upper { if x > upper {
return upper return upper
} }
@ -23,7 +23,7 @@ func AtMost[T numerical](x, upper T) T {
} }
// Between returns the value of x clamped to the range [lower, upper]. // Between returns the value of x clamped to the range [lower, upper].
func Between[T numerical](x, lower, upper T) T { func Between[T Numerical](x, lower, upper T) T {
if x < lower { if x < lower {
return lower return lower
} }

View File

@ -302,10 +302,10 @@ func TestQueryRowsScanTimeout(t *testing.T) {
Foo string Foo string
} }
conn := NewSqlConnFromDB(db) conn := NewSqlConnFromDB(db)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*2)
defer cancel()
err := conn.QueryRowsCtx(ctx, &val, "any") err := conn.QueryRowsCtx(ctx, &val, "any")
assert.ErrorIs(t, err, context.DeadlineExceeded) assert.ErrorIs(t, err, context.DeadlineExceeded)
cancel()
}) })
} }