mirror of
https://github.com/zeromicro/go-zero.git
synced 2025-01-23 17:20:24 +08:00
227 lines
5.2 KiB
Go
227 lines
5.2 KiB
Go
package p2c
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
"github.com/zeromicro/go-zero/core/syncx"
|
|
"github.com/zeromicro/go-zero/core/timex"
|
|
"github.com/zeromicro/go-zero/zrpc/internal/codes"
|
|
"google.golang.org/grpc/balancer"
|
|
"google.golang.org/grpc/balancer/base"
|
|
"google.golang.org/grpc/resolver"
|
|
)
|
|
|
|
const (
|
|
// Name is the name of p2c balancer.
|
|
Name = "p2c_ewma"
|
|
|
|
// default value from finagle
|
|
decayTime = int64(time.Second * 10)
|
|
// if a node is not selected for a period of time, it is forcibly selected.
|
|
forcePick = int64(time.Second)
|
|
// initial success count
|
|
initSuccess = 1000
|
|
// success count to trigger throttling
|
|
throttleSuccess = initSuccess / 2
|
|
// penalty value for load calculation
|
|
penalty = int64(math.MaxInt32)
|
|
// number of pick attempts
|
|
pickTimes = 3
|
|
// log interval for statistics
|
|
logInterval = time.Minute
|
|
)
|
|
|
|
var emptyPickResult balancer.PickResult
|
|
|
|
func init() {
|
|
balancer.Register(newBuilder())
|
|
}
|
|
|
|
type p2cPickerBuilder struct{}
|
|
|
|
func (b *p2cPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
|
|
readySCs := info.ReadySCs
|
|
if len(readySCs) == 0 {
|
|
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
|
|
}
|
|
|
|
conns := make([]*subConn, 0, len(readySCs))
|
|
for conn, connInfo := range readySCs {
|
|
conns = append(conns, &subConn{
|
|
addr: connInfo.Address,
|
|
conn: conn,
|
|
success: initSuccess,
|
|
})
|
|
}
|
|
|
|
return &p2cPicker{
|
|
conns: conns,
|
|
r: rand.New(rand.NewSource(time.Now().UnixNano())),
|
|
stamp: syncx.NewAtomicDuration(),
|
|
}
|
|
}
|
|
|
|
func newBuilder() balancer.Builder {
|
|
return base.NewBalancerBuilder(Name, new(p2cPickerBuilder), base.Config{HealthCheck: true})
|
|
}
|
|
|
|
type p2cPicker struct {
|
|
conns []*subConn
|
|
r *rand.Rand
|
|
stamp *syncx.AtomicDuration
|
|
lock sync.Mutex
|
|
}
|
|
|
|
func (p *p2cPicker) Pick(_ balancer.PickInfo) (balancer.PickResult, error) {
|
|
p.lock.Lock()
|
|
defer p.lock.Unlock()
|
|
|
|
var chosen *subConn
|
|
switch len(p.conns) {
|
|
case 0:
|
|
return emptyPickResult, balancer.ErrNoSubConnAvailable
|
|
case 1:
|
|
chosen = p.choose(p.conns[0], nil)
|
|
case 2:
|
|
chosen = p.choose(p.conns[0], p.conns[1])
|
|
default:
|
|
var node1, node2 *subConn
|
|
for i := 0; i < pickTimes; i++ {
|
|
a := p.r.Intn(len(p.conns))
|
|
b := p.r.Intn(len(p.conns) - 1)
|
|
if b >= a {
|
|
b++
|
|
}
|
|
node1 = p.conns[a]
|
|
node2 = p.conns[b]
|
|
if node1.healthy() && node2.healthy() {
|
|
break
|
|
}
|
|
}
|
|
|
|
chosen = p.choose(node1, node2)
|
|
}
|
|
|
|
atomic.AddInt64(&chosen.inflight, 1)
|
|
atomic.AddInt64(&chosen.requests, 1)
|
|
|
|
return balancer.PickResult{
|
|
SubConn: chosen.conn,
|
|
Done: p.buildDoneFunc(chosen),
|
|
}, nil
|
|
}
|
|
|
|
func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
|
|
start := int64(timex.Now())
|
|
return func(info balancer.DoneInfo) {
|
|
atomic.AddInt64(&c.inflight, -1)
|
|
now := timex.Now()
|
|
last := atomic.SwapInt64(&c.last, int64(now))
|
|
td := int64(now) - last
|
|
if td < 0 {
|
|
td = 0
|
|
}
|
|
|
|
// As the td/decayTime value increases, indicating an increase in delay,
|
|
// the value of w (y-axis) will decrease, inversely proportional.
|
|
// The function curve of y = x^(-x) is as follows:
|
|
// https://github.com/zeromicro/zero-doc/blob/main/doc/images/y_e_x.png?raw=true
|
|
w := math.Exp(float64(-td) / float64(decayTime))
|
|
lag := int64(now) - start
|
|
if lag < 0 {
|
|
lag = 0
|
|
}
|
|
olag := atomic.LoadUint64(&c.lag)
|
|
if olag == 0 {
|
|
w = 0
|
|
}
|
|
|
|
// The smaller the value of w, the lower the impact of historical data.
|
|
atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
|
|
success := initSuccess
|
|
if info.Err != nil && !codes.Acceptable(info.Err) {
|
|
success = 0
|
|
}
|
|
osucc := atomic.LoadUint64(&c.success)
|
|
atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
|
|
|
|
stamp := p.stamp.Load()
|
|
if now-stamp >= logInterval {
|
|
if p.stamp.CompareAndSwap(stamp, now) {
|
|
p.logStats()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
|
|
start := int64(timex.Now())
|
|
if c2 == nil {
|
|
atomic.StoreInt64(&c1.pick, start)
|
|
return c1
|
|
}
|
|
|
|
if c1.load() > c2.load() {
|
|
c1, c2 = c2, c1
|
|
}
|
|
|
|
pick := atomic.LoadInt64(&c2.pick)
|
|
if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
|
|
return c2
|
|
}
|
|
|
|
atomic.StoreInt64(&c1.pick, start)
|
|
return c1
|
|
}
|
|
|
|
func (p *p2cPicker) logStats() {
|
|
stats := make([]string, 0, len(p.conns))
|
|
|
|
p.lock.Lock()
|
|
defer p.lock.Unlock()
|
|
|
|
for _, conn := range p.conns {
|
|
stats = append(stats, fmt.Sprintf("conn: %s, load: %d, reqs: %d",
|
|
conn.addr.Addr, conn.load(), atomic.SwapInt64(&conn.requests, 0)))
|
|
}
|
|
|
|
logx.Statf("p2c - %s", strings.Join(stats, "; "))
|
|
}
|
|
|
|
type subConn struct {
|
|
// The request latency measured by the weighted moving average algorithm.
|
|
lag uint64
|
|
|
|
// The value represents the number of requests that are either pending or just
|
|
// starting at the current node, and it is obtained through atomic addition.
|
|
inflight int64
|
|
success uint64
|
|
requests int64
|
|
last int64
|
|
pick int64
|
|
addr resolver.Address
|
|
conn balancer.SubConn
|
|
}
|
|
|
|
func (c *subConn) healthy() bool {
|
|
return atomic.LoadUint64(&c.success) > throttleSuccess
|
|
}
|
|
|
|
func (c *subConn) load() int64 {
|
|
// plus one to avoid multiply zero
|
|
lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
|
|
load := lag * (atomic.LoadInt64(&c.inflight) + 1)
|
|
if load == 0 {
|
|
return penalty
|
|
}
|
|
|
|
return load
|
|
}
|