go-zero/core/queue/queue.go

252 lines
5.2 KiB
Go
Raw Normal View History

2020-08-13 17:00:53 +08:00
package queue
import (
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/rescue"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/core/timex"
2020-08-13 17:00:53 +08:00
)
const queueName = "queue"
type (
2021-02-22 16:38:42 +08:00
// A Queue is a message queue.
2020-08-13 17:00:53 +08:00
Queue struct {
name string
metrics *stat.Metrics
producerFactory ProducerFactory
producerRoutineGroup *threading.RoutineGroup
consumerFactory ConsumerFactory
consumerRoutineGroup *threading.RoutineGroup
producerCount int
consumerCount int
active int32
channel chan string
quit chan struct{}
listeners []Listener
eventLock sync.Mutex
eventChannels []chan any
2020-08-13 17:00:53 +08:00
}
2021-02-22 16:38:42 +08:00
// A Listener interface represents a listener that can be notified with queue events.
2020-08-13 17:00:53 +08:00
Listener interface {
OnPause()
OnResume()
}
2021-02-22 16:38:42 +08:00
// A Poller interface wraps the method Poll.
2020-08-13 17:00:53 +08:00
Poller interface {
Name() string
Poll() string
}
2021-02-22 16:38:42 +08:00
// A Pusher interface wraps the method Push.
2020-08-13 17:00:53 +08:00
Pusher interface {
Name() string
Push(string) error
}
)
2021-02-22 16:38:42 +08:00
// NewQueue returns a Queue.
2020-08-13 17:00:53 +08:00
func NewQueue(producerFactory ProducerFactory, consumerFactory ConsumerFactory) *Queue {
2021-02-22 16:38:42 +08:00
q := &Queue{
2020-08-13 17:00:53 +08:00
metrics: stat.NewMetrics(queueName),
producerFactory: producerFactory,
producerRoutineGroup: threading.NewRoutineGroup(),
consumerFactory: consumerFactory,
consumerRoutineGroup: threading.NewRoutineGroup(),
producerCount: runtime.NumCPU(),
consumerCount: runtime.NumCPU() << 1,
channel: make(chan string),
quit: make(chan struct{}),
}
2021-02-22 16:38:42 +08:00
q.SetName(queueName)
2020-08-13 17:00:53 +08:00
2021-02-22 16:38:42 +08:00
return q
2020-08-13 17:00:53 +08:00
}
2021-05-10 00:09:00 +08:00
// AddListener adds a listener to q.
2021-02-22 16:38:42 +08:00
func (q *Queue) AddListener(listener Listener) {
q.listeners = append(q.listeners, listener)
2020-08-13 17:00:53 +08:00
}
2021-02-22 16:38:42 +08:00
// Broadcast broadcasts message to all event channels.
func (q *Queue) Broadcast(message any) {
2020-08-13 17:00:53 +08:00
go func() {
2021-02-22 16:38:42 +08:00
q.eventLock.Lock()
defer q.eventLock.Unlock()
2020-08-13 17:00:53 +08:00
2021-02-22 16:38:42 +08:00
for _, channel := range q.eventChannels {
2020-08-13 17:00:53 +08:00
channel <- message
}
}()
}
2021-02-22 16:38:42 +08:00
// SetName sets the name of q.
func (q *Queue) SetName(name string) {
q.name = name
q.metrics.SetName(name)
2020-08-13 17:00:53 +08:00
}
2021-02-22 16:38:42 +08:00
// SetNumConsumer sets the number of consumers.
func (q *Queue) SetNumConsumer(count int) {
q.consumerCount = count
2020-08-13 17:00:53 +08:00
}
2021-02-22 16:38:42 +08:00
// SetNumProducer sets the number of producers.
func (q *Queue) SetNumProducer(count int) {
q.producerCount = count
2020-08-13 17:00:53 +08:00
}
2021-02-22 16:38:42 +08:00
// Start starts q.
func (q *Queue) Start() {
q.startProducers(q.producerCount)
q.startConsumers(q.consumerCount)
2020-08-13 17:00:53 +08:00
2021-02-22 16:38:42 +08:00
q.producerRoutineGroup.Wait()
close(q.channel)
q.consumerRoutineGroup.Wait()
2020-08-13 17:00:53 +08:00
}
2021-02-22 16:38:42 +08:00
// Stop stops q.
func (q *Queue) Stop() {
close(q.quit)
2020-08-13 17:00:53 +08:00
}
func (q *Queue) consume(eventChan chan any) {
2020-08-13 17:00:53 +08:00
var consumer Consumer
for {
var err error
2021-02-22 16:38:42 +08:00
if consumer, err = q.consumerFactory(); err != nil {
2020-08-13 17:00:53 +08:00
logx.Errorf("Error on creating consumer: %v", err)
time.Sleep(time.Second)
} else {
break
}
}
for {
select {
2021-02-22 16:38:42 +08:00
case message, ok := <-q.channel:
2020-08-13 17:00:53 +08:00
if ok {
2021-02-22 16:38:42 +08:00
q.consumeOne(consumer, message)
2020-08-13 17:00:53 +08:00
} else {
logx.Info("Task channel was closed, quitting consumer...")
return
}
case event := <-eventChan:
consumer.OnEvent(event)
}
}
}
2021-02-22 16:38:42 +08:00
func (q *Queue) consumeOne(consumer Consumer, message string) {
2020-08-13 17:00:53 +08:00
threading.RunSafe(func() {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
2021-02-22 16:38:42 +08:00
q.metrics.Add(stat.Task{
2020-08-13 17:00:53 +08:00
Duration: duration,
})
logx.WithDuration(duration).Infof("%s", message)
}()
if err := consumer.Consume(message); err != nil {
logx.Errorf("Error occurred while consuming %v: %v", message, err)
}
})
}
2021-02-22 16:38:42 +08:00
func (q *Queue) pause() {
for _, listener := range q.listeners {
2020-08-13 17:00:53 +08:00
listener.OnPause()
}
}
2021-02-22 16:38:42 +08:00
func (q *Queue) produce() {
2020-08-13 17:00:53 +08:00
var producer Producer
for {
var err error
2021-02-22 16:38:42 +08:00
if producer, err = q.producerFactory(); err != nil {
2020-08-13 17:00:53 +08:00
logx.Errorf("Error on creating producer: %v", err)
time.Sleep(time.Second)
} else {
break
}
}
2021-02-22 16:38:42 +08:00
atomic.AddInt32(&q.active, 1)
2020-08-13 17:00:53 +08:00
producer.AddListener(routineListener{
2021-02-22 16:38:42 +08:00
queue: q,
2020-08-13 17:00:53 +08:00
})
for {
select {
2021-02-22 16:38:42 +08:00
case <-q.quit:
2020-08-13 17:00:53 +08:00
logx.Info("Quitting producer")
return
default:
2021-02-22 16:38:42 +08:00
if v, ok := q.produceOne(producer); ok {
q.channel <- v
2020-08-13 17:00:53 +08:00
}
}
}
}
2021-02-22 16:38:42 +08:00
func (q *Queue) produceOne(producer Producer) (string, bool) {
2020-08-13 17:00:53 +08:00
// avoid panic quit the producer, just log it and continue
defer rescue.Recover()
return producer.Produce()
}
2021-02-22 16:38:42 +08:00
func (q *Queue) resume() {
for _, listener := range q.listeners {
2020-08-13 17:00:53 +08:00
listener.OnResume()
}
}
2021-02-22 16:38:42 +08:00
func (q *Queue) startConsumers(number int) {
2020-08-13 17:00:53 +08:00
for i := 0; i < number; i++ {
eventChan := make(chan any)
2021-02-22 16:38:42 +08:00
q.eventLock.Lock()
q.eventChannels = append(q.eventChannels, eventChan)
q.eventLock.Unlock()
q.consumerRoutineGroup.Run(func() {
q.consume(eventChan)
2020-08-13 17:00:53 +08:00
})
}
}
2021-02-22 16:38:42 +08:00
func (q *Queue) startProducers(number int) {
2020-08-13 17:00:53 +08:00
for i := 0; i < number; i++ {
2021-02-22 16:38:42 +08:00
q.producerRoutineGroup.Run(func() {
q.produce()
2020-08-13 17:00:53 +08:00
})
}
}
type routineListener struct {
queue *Queue
}
func (rl routineListener) OnProducerPause() {
if atomic.AddInt32(&rl.queue.active, -1) <= 0 {
rl.queue.pause()
}
}
func (rl routineListener) OnProducerResume() {
if atomic.AddInt32(&rl.queue.active, 1) == 1 {
rl.queue.resume()
}
}