mirror of
https://github.com/bufanyun/hotgo.git
synced 2025-02-03 10:58:44 +08:00
102 lines
1.9 KiB
Go
102 lines
1.9 KiB
Go
// Package tcp
|
|
// @Link https://github.com/bufanyun/hotgo
|
|
// @Copyright Copyright (c) 2023 HotGo CLI
|
|
// @Author Ms <133814250@qq.com>
|
|
// @License https://github.com/bufanyun/hotgo/blob/master/LICENSE
|
|
package tcp
|
|
|
|
import (
|
|
"context"
|
|
"github.com/gogf/gf/v2/container/gtype"
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// RPC .
|
|
type RPC struct {
|
|
mutex sync.Mutex
|
|
callbacks map[string]RPCResponseFunc
|
|
task RoutineTask
|
|
}
|
|
|
|
// RPCResponse 响应结构
|
|
type RPCResponse struct {
|
|
res interface{}
|
|
err error
|
|
}
|
|
|
|
type RPCResponseFunc func(resp interface{}, err error)
|
|
|
|
// NewRPC 初始化RPC
|
|
func NewRPC(task RoutineTask) *RPC {
|
|
return &RPC{
|
|
task: task,
|
|
callbacks: make(map[string]RPCResponseFunc),
|
|
}
|
|
}
|
|
|
|
// Request 发起RPC请求
|
|
func (r *RPC) Request(ctx context.Context, msgId string, send func()) (res interface{}, err error) {
|
|
resCh := make(chan RPCResponse, 1)
|
|
isClose := gtype.NewBool(false)
|
|
|
|
defer func() {
|
|
isClose.Set(true)
|
|
close(resCh)
|
|
r.popCallback(msgId)
|
|
}()
|
|
|
|
r.mutex.Lock()
|
|
r.callbacks[msgId] = func(res interface{}, err error) {
|
|
if !isClose.Val() {
|
|
resCh <- RPCResponse{res: res, err: err}
|
|
}
|
|
}
|
|
r.mutex.Unlock()
|
|
|
|
r.task(ctx, send)
|
|
|
|
select {
|
|
case <-time.After(time.Second * RPCTimeout):
|
|
err = gerror.New("RPC response timeout")
|
|
return
|
|
case got := <-resCh:
|
|
return got.res, got.err
|
|
}
|
|
}
|
|
|
|
// Response RPC消息响应
|
|
func (r *RPC) Response(ctx context.Context, msg *Message) bool {
|
|
if len(msg.MsgId) == 0 {
|
|
return false
|
|
}
|
|
|
|
f, ok := r.popCallback(msg.MsgId)
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
var msgError error
|
|
if len(msg.Error) > 0 {
|
|
msgError = gerror.New(msg.Error)
|
|
}
|
|
|
|
r.task(ctx, func() {
|
|
f(msg.Data, msgError)
|
|
})
|
|
return true
|
|
}
|
|
|
|
// popCallback 弹出回调
|
|
func (r *RPC) popCallback(msgId string) (RPCResponseFunc, bool) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
|
|
call, ok := r.callbacks[msgId]
|
|
if ok {
|
|
delete(r.callbacks, msgId)
|
|
}
|
|
return call, ok
|
|
}
|