mirror of
https://github.com/zeromicro/go-zero.git
synced 2025-01-23 09:00:20 +08:00
339 lines
6.7 KiB
Go
339 lines
6.7 KiB
Go
package rq
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"zero/core/discov"
|
|
"zero/core/logx"
|
|
"zero/core/queue"
|
|
"zero/core/service"
|
|
"zero/core/stores/redis"
|
|
"zero/core/stringx"
|
|
"zero/core/threading"
|
|
"zero/rq/internal"
|
|
"zero/rq/internal/update"
|
|
)
|
|
|
|
const keyLen = 6
|
|
|
|
var (
|
|
ErrTimeout = errors.New("timeout error")
|
|
|
|
eventHandlerPlaceholder = dummyEventHandler(0)
|
|
)
|
|
|
|
type (
|
|
ConsumeHandle func(string) error
|
|
|
|
ConsumeHandler interface {
|
|
Consume(string) error
|
|
}
|
|
|
|
EventHandler interface {
|
|
OnEvent(event interface{})
|
|
}
|
|
|
|
QueueOption func(queue *MessageQueue)
|
|
|
|
queueOptions struct {
|
|
renewId int64
|
|
}
|
|
|
|
MessageQueue struct {
|
|
c RmqConf
|
|
redisQueue *queue.Queue
|
|
consumerFactory queue.ConsumerFactory
|
|
options queueOptions
|
|
eventLock sync.Mutex
|
|
lastEvent string
|
|
}
|
|
)
|
|
|
|
func MustNewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) queue.MessageQueue {
|
|
q, err := NewMessageQueue(c, factory, opts...)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
return q
|
|
}
|
|
|
|
func NewMessageQueue(c RmqConf, factory queue.ConsumerFactory, opts ...QueueOption) (queue.MessageQueue, error) {
|
|
if err := c.SetUp(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
q := &MessageQueue{
|
|
c: c,
|
|
}
|
|
|
|
if len(q.c.Redis.Key) == 0 {
|
|
if len(q.c.Name) == 0 {
|
|
q.c.Redis.Key = stringx.Randn(keyLen)
|
|
} else {
|
|
q.c.Redis.Key = fmt.Sprintf("%s-%s", q.c.Name, stringx.Randn(keyLen))
|
|
}
|
|
}
|
|
if q.c.Timeout > 0 {
|
|
factory = wrapWithTimeout(factory, time.Duration(q.c.Timeout)*time.Millisecond)
|
|
}
|
|
factory = wrapWithServerSensitive(q, factory)
|
|
q.consumerFactory = factory
|
|
q.redisQueue = q.buildQueue()
|
|
|
|
for _, opt := range opts {
|
|
opt(q)
|
|
}
|
|
|
|
return q, nil
|
|
}
|
|
|
|
func (q *MessageQueue) Start() {
|
|
serviceGroup := service.NewServiceGroup()
|
|
serviceGroup.Add(q.redisQueue)
|
|
q.maybeAppendRenewer(serviceGroup, q.redisQueue)
|
|
serviceGroup.Start()
|
|
}
|
|
|
|
func (q *MessageQueue) Stop() {
|
|
logx.Close()
|
|
}
|
|
|
|
func (q *MessageQueue) buildQueue() *queue.Queue {
|
|
inboundStore := redis.NewRedis(q.c.Redis.Host, q.c.Redis.Type, q.c.Redis.Pass)
|
|
producerFactory := internal.NewProducerFactory(inboundStore, q.c.Redis.Key,
|
|
internal.TimeSensitive(q.c.DropBefore))
|
|
mq := queue.NewQueue(producerFactory, q.consumerFactory)
|
|
|
|
if len(q.c.Name) > 0 {
|
|
mq.SetName(q.c.Name)
|
|
}
|
|
if q.c.NumConsumers > 0 {
|
|
mq.SetNumConsumer(q.c.NumConsumers)
|
|
}
|
|
if q.c.NumProducers > 0 {
|
|
mq.SetNumProducer(q.c.NumProducers)
|
|
}
|
|
|
|
return mq
|
|
}
|
|
|
|
func (q *MessageQueue) compareAndSetEvent(event string) bool {
|
|
q.eventLock.Lock()
|
|
defer q.eventLock.Unlock()
|
|
|
|
if q.lastEvent == event {
|
|
return false
|
|
}
|
|
|
|
q.lastEvent = event
|
|
return true
|
|
}
|
|
|
|
func (q *MessageQueue) maybeAppendRenewer(group *service.ServiceGroup, mq *queue.Queue) {
|
|
if len(q.c.Etcd.Hosts) > 0 || len(q.c.Etcd.Key) > 0 {
|
|
etcdValue := MarshalQueue(q.c.Redis)
|
|
if q.c.DropBefore > 0 {
|
|
etcdValue = strings.Join([]string{etcdValue, internal.TimedQueueType}, internal.Delimeter)
|
|
}
|
|
keepAliver := discov.NewRenewer(q.c.Etcd.Hosts, q.c.Etcd.Key, etcdValue, q.options.renewId)
|
|
mq.AddListener(pauseResumeHandler{
|
|
Renewer: keepAliver,
|
|
})
|
|
group.Add(keepAliver)
|
|
}
|
|
}
|
|
|
|
func MarshalQueue(rds redis.RedisKeyConf) string {
|
|
return strings.Join([]string{
|
|
rds.Host,
|
|
rds.Type,
|
|
rds.Pass,
|
|
rds.Key,
|
|
}, internal.Delimeter)
|
|
}
|
|
|
|
func WithHandle(handle ConsumeHandle) queue.ConsumerFactory {
|
|
return WithHandler(innerConsumerHandler{handle})
|
|
}
|
|
|
|
func WithHandler(handler ConsumeHandler, eventHandlers ...EventHandler) queue.ConsumerFactory {
|
|
return func() (queue.Consumer, error) {
|
|
if len(eventHandlers) < 1 {
|
|
return eventConsumer{
|
|
consumeHandler: handler,
|
|
eventHandler: eventHandlerPlaceholder,
|
|
}, nil
|
|
} else {
|
|
return eventConsumer{
|
|
consumeHandler: handler,
|
|
eventHandler: eventHandlers[0],
|
|
}, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func WithHandlerFactory(factory func() (ConsumeHandler, error)) queue.ConsumerFactory {
|
|
return func() (queue.Consumer, error) {
|
|
if handler, err := factory(); err != nil {
|
|
return nil, err
|
|
} else {
|
|
return eventlessHandler{handler}, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func WithRenewId(id int64) QueueOption {
|
|
return func(mq *MessageQueue) {
|
|
mq.options.renewId = id
|
|
}
|
|
}
|
|
|
|
func wrapWithServerSensitive(mq *MessageQueue, factory queue.ConsumerFactory) queue.ConsumerFactory {
|
|
return func() (queue.Consumer, error) {
|
|
consumer, err := factory()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &serverSensitiveConsumer{
|
|
mq: mq,
|
|
consumer: consumer,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
func wrapWithTimeout(factory queue.ConsumerFactory, dt time.Duration) queue.ConsumerFactory {
|
|
return func() (queue.Consumer, error) {
|
|
consumer, err := factory()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &timeoutConsumer{
|
|
consumer: consumer,
|
|
dt: dt,
|
|
timer: time.NewTimer(dt),
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
type innerConsumerHandler struct {
|
|
handle ConsumeHandle
|
|
}
|
|
|
|
func (h innerConsumerHandler) Consume(v string) error {
|
|
return h.handle(v)
|
|
}
|
|
|
|
type serverSensitiveConsumer struct {
|
|
mq *MessageQueue
|
|
consumer queue.Consumer
|
|
}
|
|
|
|
func (c *serverSensitiveConsumer) Consume(msg string) error {
|
|
if update.IsServerChange(msg) {
|
|
change, err := update.UnmarshalServerChange(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
code := change.GetCode()
|
|
if !c.mq.compareAndSetEvent(code) {
|
|
return nil
|
|
}
|
|
|
|
oldHash := change.CreatePrevHash()
|
|
newHash := change.CreateCurrentHash()
|
|
hashChange := internal.NewHashChange(oldHash, newHash)
|
|
c.mq.redisQueue.Broadcast(hashChange)
|
|
|
|
return nil
|
|
}
|
|
|
|
return c.consumer.Consume(msg)
|
|
}
|
|
|
|
func (c *serverSensitiveConsumer) OnEvent(event interface{}) {
|
|
c.consumer.OnEvent(event)
|
|
}
|
|
|
|
type timeoutConsumer struct {
|
|
consumer queue.Consumer
|
|
dt time.Duration
|
|
timer *time.Timer
|
|
}
|
|
|
|
func (c *timeoutConsumer) Consume(msg string) error {
|
|
done := make(chan error)
|
|
threading.GoSafe(func() {
|
|
if err := c.consumer.Consume(msg); err != nil {
|
|
done <- err
|
|
}
|
|
close(done)
|
|
})
|
|
|
|
c.timer.Reset(c.dt)
|
|
select {
|
|
case err, ok := <-done:
|
|
c.timer.Stop()
|
|
if ok {
|
|
return err
|
|
} else {
|
|
return nil
|
|
}
|
|
case <-c.timer.C:
|
|
return ErrTimeout
|
|
}
|
|
}
|
|
|
|
func (c *timeoutConsumer) OnEvent(event interface{}) {
|
|
c.consumer.OnEvent(event)
|
|
}
|
|
|
|
type pauseResumeHandler struct {
|
|
discov.Renewer
|
|
}
|
|
|
|
func (pr pauseResumeHandler) OnPause() {
|
|
pr.Pause()
|
|
}
|
|
|
|
func (pr pauseResumeHandler) OnResume() {
|
|
pr.Resume()
|
|
}
|
|
|
|
type eventConsumer struct {
|
|
consumeHandler ConsumeHandler
|
|
eventHandler EventHandler
|
|
}
|
|
|
|
func (ec eventConsumer) Consume(msg string) error {
|
|
return ec.consumeHandler.Consume(msg)
|
|
}
|
|
|
|
func (ec eventConsumer) OnEvent(event interface{}) {
|
|
ec.eventHandler.OnEvent(event)
|
|
}
|
|
|
|
type eventlessHandler struct {
|
|
handler ConsumeHandler
|
|
}
|
|
|
|
func (h eventlessHandler) Consume(msg string) error {
|
|
return h.handler.Consume(msg)
|
|
}
|
|
|
|
func (h eventlessHandler) OnEvent(event interface{}) {
|
|
}
|
|
|
|
type dummyEventHandler int
|
|
|
|
func (eh dummyEventHandler) OnEvent(event interface{}) {
|
|
}
|