2023-02-23 17:53:04 +08:00
|
|
|
|
package queue
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
|
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
|
|
|
"github.com/gogf/gf/v2/os/gfile"
|
|
|
|
|
"hotgo/internal/library/queue/disk"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
2023-02-26 14:18:22 +08:00
|
|
|
|
// Disk 磁盘队列
|
|
|
|
|
|
2023-02-23 17:53:04 +08:00
|
|
|
|
type DiskProducerMq struct {
|
|
|
|
|
config *disk.Config
|
|
|
|
|
producers map[string]*disk.Queue
|
|
|
|
|
sync.Mutex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type DiskConsumerMq struct {
|
|
|
|
|
config *disk.Config
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func RegisterDiskMqConsumer(config *disk.Config) (client MqConsumer, err error) {
|
|
|
|
|
return &DiskConsumerMq{
|
|
|
|
|
config: config,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ListenReceiveMsgDo 消费数据
|
|
|
|
|
func (q *DiskConsumerMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error) {
|
|
|
|
|
if topic == "" {
|
|
|
|
|
return gerror.New("disk.ListenReceiveMsgDo topic is empty")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
queue = NewDiskQueue(topic, q.config)
|
|
|
|
|
sleep = time.Second
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
if index, offset, data, err := queue.Read(); err == nil {
|
|
|
|
|
var mqMsg MqMsg
|
|
|
|
|
if err = json.Unmarshal(data, &mqMsg); err != nil {
|
|
|
|
|
g.Log().Warningf(ctx, "disk.ListenReceiveMsgDo Unmarshal err:%+v, topic:%v, data:%+v .", err, topic, string(data))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if mqMsg.MsgId != "" {
|
|
|
|
|
receiveDo(mqMsg)
|
|
|
|
|
queue.Commit(index, offset)
|
|
|
|
|
sleep = time.Millisecond * 1
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
sleep = time.Second
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
time.Sleep(sleep)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
select {}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func RegisterDiskMqProducer(config *disk.Config) (client MqProducer, err error) {
|
|
|
|
|
return &DiskProducerMq{
|
|
|
|
|
config: config,
|
|
|
|
|
producers: make(map[string]*disk.Queue),
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SendMsg 按字符串类型生产数据
|
|
|
|
|
func (d *DiskProducerMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error) {
|
|
|
|
|
return d.SendByteMsg(topic, []byte(body))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SendByteMsg 生产数据
|
|
|
|
|
func (d *DiskProducerMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error) {
|
|
|
|
|
if topic == "" {
|
|
|
|
|
return mqMsg, gerror.New("DiskMq topic is empty")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mqMsg = MqMsg{
|
|
|
|
|
RunType: SendMsg,
|
|
|
|
|
Topic: topic,
|
|
|
|
|
MsgId: getRandMsgId(),
|
|
|
|
|
Body: body,
|
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mqMsgJson, err := json.Marshal(mqMsg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return mqMsg, gerror.New(fmt.Sprint("queue redis 生产者解析json消息失败:", err))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue := d.getProducer(topic)
|
|
|
|
|
if err = queue.Write(mqMsgJson); err != nil {
|
|
|
|
|
return mqMsg, gerror.New(fmt.Sprint("queue disk 生产者添加消息失败:", err))
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (d *DiskProducerMq) getProducer(topic string) *disk.Queue {
|
|
|
|
|
queue, ok := d.producers[topic]
|
|
|
|
|
if ok {
|
|
|
|
|
return queue
|
|
|
|
|
}
|
|
|
|
|
queue = NewDiskQueue(topic, d.config)
|
|
|
|
|
d.Lock()
|
|
|
|
|
defer d.Unlock()
|
|
|
|
|
d.producers[topic] = queue
|
|
|
|
|
return queue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewDiskQueue(topic string, config *disk.Config) *disk.Queue {
|
|
|
|
|
conf := &disk.Config{
|
|
|
|
|
Path: fmt.Sprintf(config.Path + "/" + config.GroupName + "/" + topic),
|
|
|
|
|
BatchSize: config.BatchSize,
|
|
|
|
|
BatchTime: config.BatchTime * time.Second,
|
|
|
|
|
SegmentSize: config.SegmentSize,
|
|
|
|
|
SegmentLimit: config.SegmentLimit,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !gfile.Exists(conf.Path) {
|
|
|
|
|
if err := gfile.Mkdir(conf.Path); err != nil {
|
|
|
|
|
g.Log().Errorf(ctx, "NewDiskQueue Failed to create the cache directory. Procedure, err:%+v", err)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
queue, err := disk.New(conf)
|
|
|
|
|
if err != nil {
|
|
|
|
|
g.Log().Errorf(ctx, "NewDiskQueue err:%v", err)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return queue
|
|
|
|
|
}
|