go-zero/core/queue/queue_test.go

170 lines
3.5 KiB
Go
Raw Permalink Normal View History

2020-08-13 17:00:53 +08:00
package queue
import (
2023-05-27 23:57:33 +08:00
"errors"
2023-06-04 23:20:58 +08:00
"math"
2020-08-13 17:00:53 +08:00
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// Sizing shared by every queue test in this file.
const (
	// consumers is the consumer count handed to SetNumConsumer and the number
	// of events the mocked consumer's WaitGroup is primed for.
	consumers = 4
	// rounds is the number of items the mocked producer hands out in the
	// non-broadcast tests.
	rounds = 100
)
// TestQueue wires one mocked producer and a shared mocked consumer into a
// queue, toggles pause/resume before starting, and checks that all `rounds`
// produced items were consumed.
func TestQueue(t *testing.T) {
	prod := newMockedProducer(rounds)
	cons := newMockedConsumer()
	cons.wait.Add(consumers)

	// Both factories hand back the same pre-built mocks so the test can
	// inspect their counters afterwards.
	producerFactory := func() (Producer, error) {
		return prod, nil
	}
	consumerFactory := func() (Consumer, error) {
		return cons, nil
	}

	mq := NewQueue(producerFactory, consumerFactory)
	mq.AddListener(&mockedListener{})
	mq.SetName("mockqueue")
	mq.SetNumConsumer(consumers)
	mq.SetNumProducer(1)
	mq.pause()
	mq.resume()

	// Stop the queue once the producer has handed out its last item.
	stopWhenProduced := func() {
		prod.wait.Wait()
		mq.Stop()
	}
	go stopWhenProduced()

	// NOTE(review): Start is expected to return only after Stop — confirm
	// against the Queue implementation.
	mq.Start()

	consumed := atomic.LoadInt32(&cons.count)
	assert.Equal(t, int32(rounds), consumed)
}
2023-05-27 23:57:33 +08:00
func TestQueue_Broadcast(t *testing.T) {
2023-06-04 23:20:58 +08:00
producer := newMockedProducer(math.MaxInt32)
2023-05-27 23:57:33 +08:00
consumer := newMockedConsumer()
consumer.wait.Add(consumers)
q := NewQueue(func() (Producer, error) {
return producer, nil
}, func() (Consumer, error) {
return consumer, nil
})
q.AddListener(new(mockedListener))
q.SetName("mockqueue")
q.SetNumConsumer(consumers)
q.SetNumProducer(1)
go func() {
2023-06-04 23:20:58 +08:00
time.Sleep(time.Millisecond * 100)
2023-05-27 23:57:33 +08:00
q.Stop()
}()
2023-06-04 23:20:58 +08:00
go q.Start()
time.Sleep(time.Millisecond * 50)
q.Broadcast("message")
2023-05-27 23:57:33 +08:00
consumer.wait.Wait()
assert.Equal(t, int32(consumers), atomic.LoadInt32(&consumer.events))
}
// TestQueue_PauseResume runs a queue to completion, then drives the listener
// that the queue registered on the producer and checks that pause/resume
// flip the queue's active flag between 0 and 1.
func TestQueue_PauseResume(t *testing.T) {
	prod := newMockedProducer(rounds)
	cons := newMockedConsumer()
	cons.wait.Add(consumers)

	producerFactory := func() (Producer, error) {
		return prod, nil
	}
	consumerFactory := func() (Consumer, error) {
		return cons, nil
	}

	mq := NewQueue(producerFactory, consumerFactory)
	mq.AddListener(&mockedListener{})
	mq.SetName("mockqueue")
	mq.SetNumConsumer(consumers)
	mq.SetNumProducer(1)

	// Stop the queue once the producer has handed out its last item.
	stopWhenProduced := func() {
		prod.wait.Wait()
		mq.Stop()
	}
	go stopWhenProduced()

	mq.Start()

	// prod.listener was stored by mockedProducer.AddListener.
	prod.listener.OnProducerPause()
	activeAfterPause := atomic.LoadInt32(&mq.active)
	assert.Equal(t, int32(0), activeAfterPause)

	prod.listener.OnProducerResume()
	activeAfterResume := atomic.LoadInt32(&mq.active)
	assert.Equal(t, int32(1), activeAfterResume)

	consumed := atomic.LoadInt32(&cons.count)
	assert.Equal(t, int32(rounds), consumed)
}
// TestQueue_ConsumeError makes every Consume call fail and checks that the
// queue still hands all `rounds` produced items to the consumer.
func TestQueue_ConsumeError(t *testing.T) {
	prod := newMockedProducer(rounds)
	cons := newMockedConsumer()
	cons.wait.Add(consumers)
	// Every Consume call on the mock returns this error.
	cons.consumeErr = errors.New("consume error")

	producerFactory := func() (Producer, error) {
		return prod, nil
	}
	consumerFactory := func() (Consumer, error) {
		return cons, nil
	}

	mq := NewQueue(producerFactory, consumerFactory)
	mq.AddListener(&mockedListener{})
	mq.SetName("mockqueue")
	mq.SetNumConsumer(consumers)
	mq.SetNumProducer(1)

	// Stop the queue once the producer has handed out its last item.
	stopWhenProduced := func() {
		prod.wait.Wait()
		mq.Stop()
	}
	go stopWhenProduced()

	mq.Start()

	consumed := atomic.LoadInt32(&cons.count)
	assert.Equal(t, int32(rounds), consumed)
}
2020-08-13 17:00:53 +08:00
type mockedConsumer struct {
2023-05-27 23:57:33 +08:00
count int32
events int32
consumeErr error
wait sync.WaitGroup
2020-08-13 17:00:53 +08:00
}
func newMockedConsumer() *mockedConsumer {
return new(mockedConsumer)
}
func (c *mockedConsumer) Consume(string) error {
atomic.AddInt32(&c.count, 1)
2023-05-27 23:57:33 +08:00
return c.consumeErr
2020-08-13 17:00:53 +08:00
}
func (c *mockedConsumer) OnEvent(any) {
2020-08-13 17:00:53 +08:00
if atomic.AddInt32(&c.events, 1) <= consumers {
c.wait.Done()
}
}
type mockedProducer struct {
2023-05-27 23:57:33 +08:00
total int32
count int32
listener ProduceListener
wait sync.WaitGroup
2020-08-13 17:00:53 +08:00
}
func newMockedProducer(total int32) *mockedProducer {
p := new(mockedProducer)
p.total = total
p.wait.Add(int(total))
return p
}
func (p *mockedProducer) AddListener(listener ProduceListener) {
2023-05-27 23:57:33 +08:00
p.listener = listener
2020-08-13 17:00:53 +08:00
}
func (p *mockedProducer) Produce() (string, bool) {
if atomic.AddInt32(&p.count, 1) <= p.total {
p.wait.Done()
return "item", true
}
2021-02-09 13:50:21 +08:00
time.Sleep(time.Second)
return "", false
2020-08-13 17:00:53 +08:00
}
2021-04-15 19:49:17 +08:00
// mockedListener is a no-op listener passed to the queue's AddListener in
// the tests of this file.
type mockedListener struct{}

// OnPause does nothing.
func (l *mockedListener) OnPause() {}

// OnResume does nothing.
func (l *mockedListener) OnResume() {}