go-zero/core/queue/balancedpusher.go

50 lines
1.0 KiB
Go
Raw Normal View History

2020-08-13 17:00:53 +08:00
package queue
import (
"errors"
"sync/atomic"
"github.com/zeromicro/go-zero/core/logx"
2020-08-13 17:00:53 +08:00
)
2021-02-22 16:38:42 +08:00
// ErrNoAvailablePusher indicates no pusher available.
2020-08-13 17:00:53 +08:00
var ErrNoAvailablePusher = errors.New("no available pusher")
2021-02-22 16:38:42 +08:00
// A BalancedPusher is used to push messages to multiple pusher with round robin algorithm.
2020-08-16 23:07:21 +08:00
type BalancedPusher struct {
2020-08-13 17:00:53 +08:00
name string
pushers []Pusher
index uint64
}
2021-02-22 16:38:42 +08:00
// NewBalancedPusher returns a BalancedPusher.
2020-08-16 23:07:21 +08:00
func NewBalancedPusher(pushers []Pusher) Pusher {
return &BalancedPusher{
2020-08-13 17:00:53 +08:00
name: generateName(pushers),
pushers: pushers,
}
}
2021-02-22 16:38:42 +08:00
// Name returns the name of pusher.
2020-08-16 23:07:21 +08:00
func (pusher *BalancedPusher) Name() string {
2020-08-13 17:00:53 +08:00
return pusher.name
}
2021-02-22 16:38:42 +08:00
// Push pushes message to one of the underlying pushers.
2020-08-16 23:07:21 +08:00
func (pusher *BalancedPusher) Push(message string) error {
2020-08-13 17:00:53 +08:00
size := len(pusher.pushers)
for i := 0; i < size; i++ {
index := atomic.AddUint64(&pusher.index, 1) % uint64(size)
target := pusher.pushers[index]
if err := target.Push(message); err != nil {
logx.Error(err)
} else {
return nil
}
}
return ErrNoAvailablePusher
}