mirror of
https://github.com/bufanyun/hotgo.git
synced 2025-01-25 03:58:37 +08:00
145 lines
3.3 KiB
Go
145 lines
3.3 KiB
Go
package queue
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/gogf/gf/v2/errors/gerror"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/gfile"
|
||
"hotgo/internal/library/queue/disk"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// Disk 磁盘队列
|
||
|
||
type DiskProducerMq struct {
|
||
config *disk.Config
|
||
producers map[string]*disk.Queue
|
||
sync.Mutex
|
||
}
|
||
|
||
type DiskConsumerMq struct {
|
||
config *disk.Config
|
||
}
|
||
|
||
func RegisterDiskMqConsumer(config *disk.Config) (client MqConsumer, err error) {
|
||
return &DiskConsumerMq{
|
||
config: config,
|
||
}, nil
|
||
}
|
||
|
||
// ListenReceiveMsgDo 消费数据
|
||
func (q *DiskConsumerMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error) {
|
||
if topic == "" {
|
||
return gerror.New("disk.ListenReceiveMsgDo topic is empty")
|
||
}
|
||
|
||
var (
|
||
queue = NewDiskQueue(topic, q.config)
|
||
sleep = time.Second
|
||
)
|
||
|
||
go func() {
|
||
for {
|
||
if index, offset, data, err := queue.Read(); err == nil {
|
||
var mqMsg MqMsg
|
||
if err = json.Unmarshal(data, &mqMsg); err != nil {
|
||
g.Log().Warningf(ctx, "disk.ListenReceiveMsgDo Unmarshal err:%+v, topic:%v, data:%+v .", err, topic, string(data))
|
||
continue
|
||
}
|
||
if mqMsg.MsgId != "" {
|
||
receiveDo(mqMsg)
|
||
queue.Commit(index, offset)
|
||
sleep = time.Millisecond * 10
|
||
}
|
||
} else {
|
||
sleep = time.Second
|
||
}
|
||
|
||
time.Sleep(sleep)
|
||
}
|
||
}()
|
||
|
||
select {}
|
||
}
|
||
|
||
func RegisterDiskMqProducer(config *disk.Config) (client MqProducer, err error) {
|
||
return &DiskProducerMq{
|
||
config: config,
|
||
producers: make(map[string]*disk.Queue),
|
||
}, nil
|
||
}
|
||
|
||
// SendMsg 按字符串类型生产数据
|
||
func (d *DiskProducerMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error) {
|
||
return d.SendByteMsg(topic, []byte(body))
|
||
}
|
||
|
||
// SendByteMsg 生产数据
|
||
func (d *DiskProducerMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error) {
|
||
if topic == "" {
|
||
return mqMsg, gerror.New("DiskMq topic is empty")
|
||
}
|
||
|
||
mqMsg = MqMsg{
|
||
RunType: SendMsg,
|
||
Topic: topic,
|
||
MsgId: getRandMsgId(),
|
||
Body: body,
|
||
Timestamp: time.Now(),
|
||
}
|
||
|
||
mqMsgJson, err := json.Marshal(mqMsg)
|
||
if err != nil {
|
||
return mqMsg, gerror.New(fmt.Sprint("queue redis 生产者解析json消息失败:", err))
|
||
}
|
||
|
||
queue := d.getProducer(topic)
|
||
if err = queue.Write(mqMsgJson); err != nil {
|
||
return mqMsg, gerror.New(fmt.Sprint("queue disk 生产者添加消息失败:", err))
|
||
}
|
||
return
|
||
}
|
||
|
||
func (d *DiskProducerMq) SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg MqMsg, err error) {
|
||
err = gerror.New("implement me")
|
||
return
|
||
}
|
||
|
||
func (d *DiskProducerMq) getProducer(topic string) *disk.Queue {
|
||
queue, ok := d.producers[topic]
|
||
if ok {
|
||
return queue
|
||
}
|
||
queue = NewDiskQueue(topic, d.config)
|
||
d.Lock()
|
||
defer d.Unlock()
|
||
d.producers[topic] = queue
|
||
return queue
|
||
}
|
||
|
||
func NewDiskQueue(topic string, config *disk.Config) *disk.Queue {
|
||
conf := &disk.Config{
|
||
Path: fmt.Sprintf(config.Path + "/" + config.GroupName + "/" + topic),
|
||
BatchSize: config.BatchSize,
|
||
BatchTime: config.BatchTime * time.Second,
|
||
SegmentSize: config.SegmentSize,
|
||
SegmentLimit: config.SegmentLimit,
|
||
}
|
||
|
||
if !gfile.Exists(conf.Path) {
|
||
if err := gfile.Mkdir(conf.Path); err != nil {
|
||
g.Log().Errorf(ctx, "NewDiskQueue Failed to create the cache directory. Procedure, err:%+v", err)
|
||
return nil
|
||
}
|
||
}
|
||
|
||
queue, err := disk.New(conf)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "NewDiskQueue err:%v", err)
|
||
return nil
|
||
}
|
||
return queue
|
||
}
|