diff --git a/server/internal/controller/websocket/handler/common/site.go b/server/internal/controller/websocket/handler/common/site.go index fd9e983..400165e 100644 --- a/server/internal/controller/websocket/handler/common/site.go +++ b/server/internal/controller/websocket/handler/common/site.go @@ -3,7 +3,6 @@ // @Copyright Copyright (c) 2023 HotGo CLI // @Author Ms <133814250@qq.com> // @License https://github.com/bufanyun/hotgo/blob/master/LICENSE -// package common import ( diff --git a/server/internal/dao/sys_dict_type.go b/server/internal/dao/sys_dict_type.go index a874bf7..d5a1a9b 100644 --- a/server/internal/dao/sys_dict_type.go +++ b/server/internal/dao/sys_dict_type.go @@ -57,7 +57,7 @@ func (dao *sysDictTypeDao) GetTypes(ctx context.Context, id int64) (types []stri columns, err := dao.Ctx(ctx).Fields("type"). Where("id", id). WhereOr("pid", id). - Where("status", consts.StatusEnabled).All() + Where("status", consts.StatusEnabled).Array() types = g.NewVar(columns).Strings() return } diff --git a/server/internal/library/network/tcp/client.go b/server/internal/library/network/tcp/client.go index 23c22cf..c56b799 100644 --- a/server/internal/library/network/tcp/client.go +++ b/server/internal/library/network/tcp/client.go @@ -12,6 +12,7 @@ import ( "github.com/gogf/gf/v2/net/gtcp" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtime" "hotgo/utility/simple" "reflect" @@ -49,6 +50,7 @@ type Client struct { closeEvent CallbackEvent // 连接关闭事件 sync.Mutex // 状态锁 heartbeat int64 // 心跳 + msgGo *grpool.Pool // 消息处理协程池 routers map[string]RouterHandler // 已注册的路由 conn *gtcp.Conn // 连接对象 wg sync.WaitGroup // 状态控制 @@ -100,7 +102,8 @@ func NewClient(config *ClientConfig) (client *Client, err error) { client.timeout = config.Timeout } - client.rpc = NewRpc(client.Ctx) + client.msgGo = grpool.New(5) + client.rpc = NewRpc(client.Ctx, client.msgGo, client.Logger) return } @@ -252,8 +255,7 @@ func (client *Client) read() { switch msg.Router { case "ResponseServerLogin", "ResponseServerHeartbeat": // 服务登录、心跳无需验证签名 - ctx, cancel := initCtx(gctx.New(), &Context{}) - doHandleRouterMsg(f, ctx, cancel, msg.Data) + client.doHandleRouterMsg(initCtx(gctx.New(), &Context{}), f, msg.Data) default: // 通用路由消息处理 in, err := VerifySign(msg.Data, client.auth.AppId, client.auth.SecretKey) if err != nil { @@ -261,18 +263,18 @@ func (client *Client) read() { continue } - ctx, cancel := initCtx(gctx.New(), &Context{ + ctx := initCtx(gctx.New(), &Context{ Conn: client.conn, Auth: client.auth, TraceID: in.TraceID, }) // 响应rpc消息 - if client.rpc.HandleMsg(ctx, cancel, msg.Data) { + if client.rpc.HandleMsg(ctx, msg.Data) { return } - doHandleRouterMsg(f, ctx, cancel, msg.Data) + client.doHandleRouterMsg(ctx, f, msg.Data) } } }) @@ -377,3 +379,23 @@ func (client *Client) RpcRequest(ctx context.Context, data interface{}) (res int _ = client.Write(data) }) } + +// doHandleRouterMsg 处理路由消息 +func (client *Client) doHandleRouterMsg(ctx context.Context, fun RouterHandler, args ...interface{}) { + ctx, cancel := context.WithCancel(ctx) + err := client.msgGo.AddWithRecover(ctx, + func(ctx context.Context) { + fun(ctx, args...) + cancel() + }, + func(ctx context.Context, err error) { + client.Logger.Warningf(ctx, "doHandleRouterMsg msgGo exec err:%+v", err) + cancel() + }, + ) + + if err != nil { + client.Logger.Warningf(ctx, "doHandleRouterMsg msgGo Add err:%+v", err) + return + } +} diff --git a/server/internal/library/network/tcp/context.go b/server/internal/library/network/tcp/context.go index cc00cce..3f2dfbe 100644 --- a/server/internal/library/network/tcp/context.go +++ b/server/internal/library/network/tcp/context.go @@ -12,14 +12,13 @@ import ( ) // initCtx 初始化上下文对象指针到上下文对象中,以便后续的请求流程中可以修改 -func initCtx(ctx context.Context, model *Context) (newCtx context.Context, cancel context.CancelFunc) { +func initCtx(ctx context.Context, model *Context) (newCtx context.Context) { if model.TraceID != "" { newCtx, _ = gtrace.WithTraceID(ctx, model.TraceID) } else { newCtx = ctx } newCtx = context.WithValue(newCtx, consts.ContextTCPKey, model) - newCtx, cancel = context.WithCancel(newCtx) return } diff --git a/server/internal/library/network/tcp/router.go b/server/internal/library/network/tcp/router.go index 39ce024..a5ce3c8 100644 --- a/server/internal/library/network/tcp/router.go +++ b/server/internal/library/network/tcp/router.go @@ -10,13 +10,9 @@ import ( "encoding/json" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/net/gtcp" - "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/util/gconv" ) -// GoPool 初始化一个协程池,用于处理消息处理 -var GoPool = grpool.New(20) - // RouterHandler 路由消息处理器 type RouterHandler func(ctx context.Context, args ...interface{}) @@ -64,11 +60,3 @@ func MsgPkg(data interface{}, auth *AuthMeta, traceID string) string { } return msg.TraceID } - -// doHandleRouterMsg 处理路由消息 -func doHandleRouterMsg(fun RouterHandler, ctx context.Context, cancel context.CancelFunc, args ...interface{}) { - _ = GoPool.Add(ctx, func(ctx context.Context) { - fun(ctx, args...) - cancel() - }) -} diff --git a/server/internal/library/network/tcp/rpc.go b/server/internal/library/network/tcp/rpc.go index 2dfc691..4c6c3f5 100644 --- a/server/internal/library/network/tcp/rpc.go +++ b/server/internal/library/network/tcp/rpc.go @@ -10,6 +10,8 @@ import ( "fmt" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/net/gtcp" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/grpool" "hotgo/internal/consts" "hotgo/utility/simple" "sync" @@ -20,6 +22,8 @@ type Rpc struct { ctx context.Context mutex sync.Mutex callbacks map[string]RpcRespFunc + msgGo *grpool.Pool // 消息处理协程池 + logger *glog.Logger // 日志处理器 } // RpcResp 响应结构 @@ -31,10 +35,12 @@ type RpcResp struct { type RpcRespFunc func(resp interface{}, err error) // NewRpc 初始化一个rpc协议 -func NewRpc(ctx context.Context) *Rpc { +func NewRpc(ctx context.Context, msgGo *grpool.Pool, logger *glog.Logger) *Rpc { return &Rpc{ ctx: ctx, callbacks: make(map[string]RpcRespFunc), + msgGo: msgGo, + logger: logger, } } @@ -44,7 +50,7 @@ func (r *Rpc) GetCallId(client *gtcp.Conn, traceID string) string { } // HandleMsg 处理rpc消息 -func (r *Rpc) HandleMsg(ctx context.Context, cancel context.CancelFunc, data interface{}) bool { +func (r *Rpc) HandleMsg(ctx context.Context, data interface{}) bool { user := GetCtx(ctx) callId := r.GetCallId(user.Conn, user.TraceID) @@ -53,10 +59,19 @@ func (r *Rpc) HandleMsg(ctx context.Context, cancel context.CancelFunc, data int delete(r.callbacks, callId) r.mutex.Unlock() - simple.SafeGo(ctx, func(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + + err := r.msgGo.AddWithRecover(ctx, func(ctx context.Context) { call(data, nil) cancel() + }, func(ctx context.Context, err error) { + r.logger.Warningf(ctx, "rpc HandleMsg msgGo exec err:%+v", err) + cancel() }) + + if err != nil { + r.logger.Warningf(ctx, "rpc HandleMsg msgGo Add err:%+v", err) + } return true } return false diff --git a/server/internal/library/network/tcp/server.go b/server/internal/library/network/tcp/server.go index 8263b1b..649c514 100644 --- a/server/internal/library/network/tcp/server.go +++ b/server/internal/library/network/tcp/server.go @@ -12,6 +12,7 @@ import ( "github.com/gogf/gf/v2/net/gtcp" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/grpool" "hotgo/internal/consts" "hotgo/utility/simple" "reflect" @@ -45,6 +46,7 @@ type Server struct { closeFlag bool // 服务关闭标签 clients map[string]*ClientConn // 已登录的认证客户端 mutexConns sync.Mutex // 连接锁,主要用于客户端上下线 + msgGo *grpool.Pool // 消息处理协程池 cronRouters map[string]RouterHandler // 定时任务路由 queueRouters map[string]RouterHandler // 队列路由 authRouters map[string]RouterHandler // 任务路由 @@ -68,15 +70,15 @@ func NewServer(config *ServerConfig) (server *Server, err error) { if config.Name == "" { config.Name = simple.AppName(server.Ctx) } - + server.addr = config.Addr server.name = config.Name server.ln = gtcp.NewServer(server.addr, server.accept, config.Name) server.clients = make(map[string]*ClientConn) server.closeFlag = false server.Logger = g.Log("tcpServer") - server.rpc = NewRpc(server.Ctx) - + server.msgGo = grpool.New(20) + server.rpc = NewRpc(server.Ctx, server.msgGo, server.Logger) server.startCron() return } @@ -103,18 +105,18 @@ func (server *Server) accept(conn *gtcp.Conn) { switch msg.Router { case "ServerLogin": // 服务登录 // 初始化上下文 - ctx, cancel := initCtx(gctx.New(), &Context{ + ctx := initCtx(gctx.New(), &Context{ Conn: conn, }) - doHandleRouterMsg(server.onServerLogin, ctx, cancel, msg.Data) + server.doHandleRouterMsg(ctx, server.onServerLogin, msg.Data) case "ServerHeartbeat": // 心跳 if client == nil { server.Logger.Infof(server.Ctx, "conn not connected, ignore the heartbeat, msg:%+v", msg) continue } // 初始化上下文 - ctx, cancel := initCtx(gctx.New(), &Context{}) - doHandleRouterMsg(server.onServerHeartbeat, ctx, cancel, msg.Data, client) + ctx := initCtx(gctx.New(), &Context{}) + server.doHandleRouterMsg(ctx, server.onServerHeartbeat, msg.Data, client) default: // 通用路由消息处理 if client == nil { server.Logger.Warningf(server.Ctx, "conn is not logged in but sends a routing message. actively conn disconnect, msg:%+v", msg) @@ -137,14 +139,14 @@ func (server *Server) handleRouterMsg(msg *Message, client *ClientConn) { } // 初始化上下文 - ctx, cancel := initCtx(gctx.New(), &Context{ + ctx := initCtx(gctx.New(), &Context{ Conn: client.Conn, Auth: client.Auth, TraceID: in.TraceID, }) // 响应rpc消息 - if server.rpc.HandleMsg(ctx, cancel, msg.Data) { + if server.rpc.HandleMsg(ctx, msg.Data) { return } @@ -159,7 +161,7 @@ func (server *Server) handleRouterMsg(msg *Message, client *ClientConn) { return } - doHandleRouterMsg(f, ctx, cancel, msg.Data) + server.doHandleRouterMsg(ctx, f, msg.Data) } switch client.Auth.Group { @@ -174,6 +176,26 @@ func (server *Server) handleRouterMsg(msg *Message, client *ClientConn) { } } +// doHandleRouterMsg 处理路由消息 +func (server *Server) doHandleRouterMsg(ctx context.Context, fun RouterHandler, args ...interface{}) { + ctx, cancel := context.WithCancel(ctx) + err := server.msgGo.AddWithRecover(ctx, + func(ctx context.Context) { + fun(ctx, args...) + cancel() + }, + func(ctx context.Context, err error) { + server.Logger.Warningf(ctx, "doHandleRouterMsg msgGo exec err:%+v", err) + cancel() + }, + ) + + if err != nil { + server.Logger.Warningf(ctx, "doHandleRouterMsg msgGo Add err:%+v", err) + return + } +} + // getLoginConn 获取指定已登录的连接 func (server *Server) getLoginConn(conn *gtcp.Conn) *ClientConn { client, ok := server.clients[conn.RemoteAddr().String()] diff --git a/server/internal/logic/admin/dept.go b/server/internal/logic/admin/dept.go index fcbf6a5..fd092d7 100644 --- a/server/internal/logic/admin/dept.go +++ b/server/internal/logic/admin/dept.go @@ -190,13 +190,13 @@ func (s *sAdminDept) List(ctx context.Context, in adminin.DeptListInp) (res *adm // 部门名称 if in.Name != "" { - columns, err := dao.AdminDept.Ctx(ctx).Fields("pid").WhereLike("name", "%"+in.Name+"%").All() + columns, err := dao.AdminDept.Ctx(ctx).Fields("pid").WhereLike("name", "%"+in.Name+"%").Array() if err != nil { err = gerror.Wrap(err, "过滤部门列表失败-1!") return nil, err } - ds := g.NewVar(columns.Array()).Int64s() + ds := g.NewVar(columns).Int64s() ids = append(ids, ds...) pids = append(pids, ds...) if len(ids) == 0 { @@ -205,13 +205,13 @@ func (s *sAdminDept) List(ctx context.Context, in adminin.DeptListInp) (res *adm } if in.Code != "" { - columns, err := dao.AdminDept.Ctx(ctx).Fields("pid").WhereLike("code", "%"+in.Code+"%").All() + columns, err := dao.AdminDept.Ctx(ctx).Fields("pid").WhereLike("code", "%"+in.Code+"%").Array() if err != nil { err = gerror.Wrap(err, "过滤部门列表失败-2!") return nil, err } - ds := g.NewVar(columns.Array()).Int64s() + ds := g.NewVar(columns).Int64s() ids = append(ids, ds...) pids = append(pids, ds...) if len(ids) == 0 { diff --git a/server/internal/logic/admin/member.go b/server/internal/logic/admin/member.go index d5cc8a3..85d70d2 100644 --- a/server/internal/logic/admin/member.go +++ b/server/internal/logic/admin/member.go @@ -585,16 +585,13 @@ func (s *sAdminMember) List(ctx context.Context, in adminin.MemberListInp) (list } for _, v := range list { - columns, err := dao.AdminMemberPost.Ctx(ctx). - Fields(dao.AdminMemberPost.Columns().PostId). - Where(dao.AdminMemberPost.Columns().MemberId, v.Id).All() - + columns, err := dao.AdminMemberPost.Ctx(ctx).Fields(dao.AdminMemberPost.Columns().PostId).Where(dao.AdminMemberPost.Columns().MemberId, v.Id).Array() if err != nil { err = gerror.Wrap(err, "获取用户岗位数据失败!") return nil, 0, err } - v.PostIds = g.NewVar(columns.Array()).Int64s() + v.PostIds = g.NewVar(columns).Int64s() } return } diff --git a/server/internal/logic/admin/menu.go b/server/internal/logic/admin/menu.go index 2e95455..1e02dd8 100644 --- a/server/internal/logic/admin/menu.go +++ b/server/internal/logic/admin/menu.go @@ -81,6 +81,7 @@ func (s *sAdminMenu) VerifyUnique(ctx context.Context, in adminin.VerifyUniqueIn func (s *sAdminMenu) Edit(ctx context.Context, in adminin.MenuEditInp) (err error) { // 验证唯一性 err = s.VerifyUnique(ctx, adminin.VerifyUniqueInp{ + Id: in.Id, Where: g.Map{ dao.AdminMenu.Columns().Title: in.Title, dao.AdminMenu.Columns().Name: in.Name, diff --git a/server/internal/logic/admin/notice.go b/server/internal/logic/admin/notice.go index 11712a7..96ff499 100644 --- a/server/internal/logic/admin/notice.go +++ b/server/internal/logic/admin/notice.go @@ -327,7 +327,7 @@ func (s *sAdminNotice) messageIds(ctx context.Context, memberId int64) (ids []in Where("status", consts.StatusEnabled). Where("(`type` IN(?) OR (`type` = ? and JSON_CONTAINS(`receiver`,'"+gconv.String(memberId)+"')))", []int{consts.NoticeTypeNotify, consts.NoticeTypeNotice}, consts.NoticeTypeLetter, - ).All() + ).Array() if err != nil { err = gerror.Wrap(err, "获取我的消息失败!") return diff --git a/server/internal/queues/serve_log.go b/server/internal/queues/serve_log.go index f3a9d38..3592aa1 100644 --- a/server/internal/queues/serve_log.go +++ b/server/internal/queues/serve_log.go @@ -29,10 +29,15 @@ func (q *qServeLog) GetTopic() string { } // Handle 处理消息 -func (q *qServeLog) Handle(ctx context.Context, mqMsg queue.MqMsg) (err error) { +func (q *qServeLog) Handle(ctx context.Context, mqMsg queue.MqMsg) error { var data entity.SysServeLog - if err = json.Unmarshal(mqMsg.Body, &data); err != nil { - return err + if err := json.Unmarshal(mqMsg.Body, &data); err != nil { + queue.Logger().Infof(ctx, "ServeLog Handle Unmarshal err:%+v", err) + return nil } - return service.SysServeLog().RealWrite(ctx, data) + + if err := service.SysServeLog().RealWrite(ctx, data); err != nil { + queue.Logger().Infof(ctx, "ServeLog Handle Write err:%+v", err) + } + return nil } diff --git a/server/internal/router/websocket.go b/server/internal/router/websocket.go index 6e69e8c..4d8a895 100644 --- a/server/internal/router/websocket.go +++ b/server/internal/router/websocket.go @@ -43,5 +43,4 @@ func WebSocket(ctx context.Context, group *ghttp.RouterGroup) { "admin/monitor/trends": admin.Monitor.Trends, // 后台监控,动态数据 "admin/monitor/runInfo": admin.Monitor.RunInfo, // 后台监控,运行信息 }) - } diff --git a/server/internal/websocket/init.go b/server/internal/websocket/init.go index 05085fa..65233e6 100644 --- a/server/internal/websocket/init.go +++ b/server/internal/websocket/init.go @@ -3,13 +3,13 @@ // @Copyright Copyright (c) 2023 HotGo CLI // @Author Ms <133814250@qq.com> // @License https://github.com/bufanyun/hotgo/blob/master/LICENSE -// package websocket import ( "context" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtime" "github.com/gorilla/websocket" "net/http" @@ -19,6 +19,7 @@ var ( ctxManager context.Context // 主上下文 clientManager = NewClientManager() // 客户端管理 routers = make(map[string]EventHandler) // 消息路由 + msgGo = grpool.New(20) upGrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, diff --git a/server/internal/websocket/router.go b/server/internal/websocket/router.go index 21c78b4..1e00ee0 100644 --- a/server/internal/websocket/router.go +++ b/server/internal/websocket/router.go @@ -3,10 +3,10 @@ // @Copyright Copyright (c) 2023 HotGo CLI // @Author Ms <133814250@qq.com> // @License https://github.com/bufanyun/hotgo/blob/master/LICENSE -// package websocket import ( + "context" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" "runtime/debug" @@ -36,7 +36,20 @@ func handlerMsg(client *Client, message []byte) { g.Log().Warningf(ctxManager, "handlerMsg function id %v: not registered", request.Event) return } - fun(client, request) + + err := msgGo.AddWithRecover(ctxManager, + func(ctx context.Context) { + fun(client, request) + }, + func(ctx context.Context, err error) { + g.Log().Warningf(ctxManager, "handlerMsg msgGo exec err:%+v", err) + }, + ) + + if err != nil { + g.Log().Warningf(ctxManager, "handlerMsg msgGo Add err:%+v", err) + return + } } // RegisterMsg 注册消息