2024-01-04 16:36:36 +08:00
|
|
|
|
package kis
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
2024-03-26 14:54:50 +08:00
|
|
|
|
"github.com/aceld/kis-flow/common"
|
|
|
|
|
"github.com/aceld/kis-flow/log"
|
2024-03-19 19:16:03 +08:00
|
|
|
|
"reflect"
|
2024-01-04 16:36:36 +08:00
|
|
|
|
"sync"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var _poolOnce sync.Once
|
|
|
|
|
|
2024-03-19 19:16:03 +08:00
|
|
|
|
// kisPool 用于管理全部的Function和Flow配置的池子
|
2024-01-04 16:36:36 +08:00
|
|
|
|
type kisPool struct {
|
|
|
|
|
fnRouter funcRouter // 全部的Function管理路由
|
|
|
|
|
fnLock sync.RWMutex // fnRouter 锁
|
|
|
|
|
|
|
|
|
|
flowRouter flowRouter // 全部的flow对象
|
|
|
|
|
flowLock sync.RWMutex // flowRouter 锁
|
2024-01-09 17:30:58 +08:00
|
|
|
|
|
|
|
|
|
cInitRouter connInitRouter // 全部的Connector初始化路由
|
|
|
|
|
ciLock sync.RWMutex // cInitRouter 锁
|
|
|
|
|
|
2024-01-26 17:27:29 +08:00
|
|
|
|
cTree connTree // 全部Connector管理路由
|
|
|
|
|
cLock sync.RWMutex // cTree 锁
|
2024-01-04 16:36:36 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 单例
|
|
|
|
|
var _pool *kisPool
|
|
|
|
|
|
|
|
|
|
// Pool 单例构造
|
|
|
|
|
func Pool() *kisPool {
|
|
|
|
|
_poolOnce.Do(func() {
|
|
|
|
|
//创建kisPool对象
|
|
|
|
|
_pool = new(kisPool)
|
|
|
|
|
|
|
|
|
|
// fnRouter初始化
|
|
|
|
|
_pool.fnRouter = make(funcRouter)
|
|
|
|
|
|
|
|
|
|
// flowRouter初始化
|
|
|
|
|
_pool.flowRouter = make(flowRouter)
|
2024-01-09 17:30:58 +08:00
|
|
|
|
|
|
|
|
|
// connTree初始化
|
|
|
|
|
_pool.cTree = make(connTree)
|
|
|
|
|
_pool.cInitRouter = make(connInitRouter)
|
2024-01-04 16:36:36 +08:00
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return _pool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (pool *kisPool) AddFlow(name string, flow Flow) {
|
2024-01-09 17:30:58 +08:00
|
|
|
|
pool.flowLock.Lock() // 写锁
|
2024-01-04 16:36:36 +08:00
|
|
|
|
defer pool.flowLock.Unlock()
|
|
|
|
|
|
|
|
|
|
if _, ok := pool.flowRouter[name]; !ok {
|
|
|
|
|
pool.flowRouter[name] = flow
|
|
|
|
|
} else {
|
|
|
|
|
errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name)
|
|
|
|
|
panic(errString)
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-23 16:21:02 +08:00
|
|
|
|
log.Logger().InfoF("Add FlowRouter FlowName=%s", name)
|
2024-01-04 16:36:36 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (pool *kisPool) GetFlow(name string) Flow {
|
2024-01-09 17:30:58 +08:00
|
|
|
|
pool.flowLock.RLock() // 读锁
|
2024-01-04 16:36:36 +08:00
|
|
|
|
defer pool.flowLock.RUnlock()
|
|
|
|
|
|
|
|
|
|
if flow, ok := pool.flowRouter[name]; ok {
|
|
|
|
|
return flow
|
|
|
|
|
} else {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
|
|
|
|
|
func (pool *kisPool) FaaS(fnName string, f FaaS) {
|
2024-03-19 19:16:03 +08:00
|
|
|
|
|
2024-03-25 16:35:20 +08:00
|
|
|
|
// 当注册FaaS计算逻辑回调时,创建一个FaaSDesc描述对象
|
2024-03-19 19:16:03 +08:00
|
|
|
|
faaSDesc, err := NewFaaSDesc(fnName, f)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-09 17:30:58 +08:00
|
|
|
|
pool.fnLock.Lock() // 写锁
|
2024-01-04 16:36:36 +08:00
|
|
|
|
defer pool.fnLock.Unlock()
|
|
|
|
|
|
|
|
|
|
if _, ok := pool.fnRouter[fnName]; !ok {
|
2024-03-25 16:35:20 +08:00
|
|
|
|
// 将FaaSDesc描述对象注册到fnRouter中
|
2024-03-19 19:16:03 +08:00
|
|
|
|
pool.fnRouter[fnName] = faaSDesc
|
2024-01-04 16:36:36 +08:00
|
|
|
|
} else {
|
|
|
|
|
errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
|
|
|
|
|
panic(errString)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// CallFunction 调度 Function
|
|
|
|
|
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
|
2024-03-29 10:13:35 +08:00
|
|
|
|
pool.fnLock.RLock() // 读锁
|
|
|
|
|
defer pool.fnLock.RUnlock()
|
2024-03-19 19:16:03 +08:00
|
|
|
|
if funcDesc, ok := pool.fnRouter[fnName]; ok {
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
|
|
|
|
// 被调度Function的形参列表
|
2024-03-19 19:16:03 +08:00
|
|
|
|
params := make([]reflect.Value, 0, funcDesc.ArgNum)
|
|
|
|
|
|
|
|
|
|
for _, argType := range funcDesc.ArgsType {
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
|
|
|
|
// 如果是Flow类型形参,则将 flow的值传入
|
2024-03-19 19:16:03 +08:00
|
|
|
|
if isFlowType(argType) {
|
|
|
|
|
params = append(params, reflect.ValueOf(flow))
|
|
|
|
|
continue
|
|
|
|
|
}
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
|
|
|
|
// 如果是Context类型形参,则将 ctx的值传入
|
2024-03-19 19:16:03 +08:00
|
|
|
|
if isContextType(argType) {
|
|
|
|
|
params = append(params, reflect.ValueOf(ctx))
|
|
|
|
|
continue
|
|
|
|
|
}
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
|
|
|
|
// 如果是Slice类型形参,则将 flow.Input()的值传入
|
|
|
|
|
if isSliceType(argType) {
|
|
|
|
|
|
|
|
|
|
// 将flow.Input()中的原始数据,反序列化为argType类型的数据
|
|
|
|
|
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
|
2024-03-19 19:16:03 +08:00
|
|
|
|
if err != nil {
|
2024-03-25 16:35:20 +08:00
|
|
|
|
log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
|
2024-03-20 11:50:08 +08:00
|
|
|
|
} else {
|
|
|
|
|
params = append(params, value)
|
|
|
|
|
continue
|
2024-03-19 19:16:03 +08:00
|
|
|
|
}
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
2024-03-19 19:16:03 +08:00
|
|
|
|
}
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
|
|
|
|
// 传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值
|
2024-03-19 19:16:03 +08:00
|
|
|
|
params = append(params, reflect.Zero(argType))
|
|
|
|
|
}
|
|
|
|
|
|
2024-03-25 16:35:20 +08:00
|
|
|
|
// 调用当前Function 的计算逻辑
|
2024-03-19 19:16:03 +08:00
|
|
|
|
retValues := funcDesc.FuncValue.Call(params)
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
|
|
|
|
// 取出第一个返回值,如果是nil,则返回nil
|
2024-03-19 19:16:03 +08:00
|
|
|
|
ret := retValues[0].Interface()
|
|
|
|
|
if ret == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
|
|
|
|
// 如果返回值是error类型,则返回error
|
2024-03-19 19:16:03 +08:00
|
|
|
|
return retValues[0].Interface().(error)
|
2024-01-04 16:36:36 +08:00
|
|
|
|
|
|
|
|
|
}
|
2024-03-25 16:35:20 +08:00
|
|
|
|
|
2024-01-04 16:36:36 +08:00
|
|
|
|
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
|
|
|
|
|
|
|
|
|
|
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
|
|
|
|
|
}
|
2024-01-09 17:30:58 +08:00
|
|
|
|
|
|
|
|
|
// CaaSInit 注册Connector初始化业务
|
|
|
|
|
func (pool *kisPool) CaaSInit(cname string, c ConnInit) {
|
|
|
|
|
pool.ciLock.Lock() // 写锁
|
|
|
|
|
defer pool.ciLock.Unlock()
|
|
|
|
|
|
|
|
|
|
if _, ok := pool.cInitRouter[cname]; !ok {
|
|
|
|
|
pool.cInitRouter[cname] = c
|
|
|
|
|
} else {
|
|
|
|
|
errString := fmt.Sprintf("KisPool Reg CaaSInit Repeat CName=%s\n", cname)
|
|
|
|
|
panic(errString)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Logger().InfoF("Add KisPool CaaSInit CName=%s", cname)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// CallConnInit 调度 ConnInit
|
|
|
|
|
func (pool *kisPool) CallConnInit(conn Connector) error {
|
|
|
|
|
pool.ciLock.RLock() // 读锁
|
|
|
|
|
defer pool.ciLock.RUnlock()
|
|
|
|
|
|
|
|
|
|
init, ok := pool.cInitRouter[conn.GetName()]
|
|
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
|
panic(errors.New(fmt.Sprintf("init connector cname = %s not reg..", conn.GetName())))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return init(conn)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// CaaS 注册Connector Call业务
|
|
|
|
|
func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c CaaS) {
|
|
|
|
|
pool.cLock.Lock() // 写锁
|
|
|
|
|
defer pool.cLock.Unlock()
|
|
|
|
|
|
|
|
|
|
if _, ok := pool.cTree[cname]; !ok {
|
|
|
|
|
//cid 首次注册,不存在,创建二级树NsConnSL
|
|
|
|
|
pool.cTree[cname] = make(connSL)
|
|
|
|
|
|
|
|
|
|
//初始化各类型FunctionMode
|
|
|
|
|
pool.cTree[cname][common.S] = make(connFuncRouter)
|
|
|
|
|
pool.cTree[cname][common.L] = make(connFuncRouter)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if _, ok := pool.cTree[cname][mode][fname]; !ok {
|
|
|
|
|
pool.cTree[cname][mode][fname] = c
|
|
|
|
|
} else {
|
|
|
|
|
errString := fmt.Sprintf("CaaS Repeat CName=%s, FName=%s, Mode =%s\n", cname, fname, mode)
|
|
|
|
|
panic(errString)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Logger().InfoF("Add KisPool CaaS CName=%s, FName=%s, Mode =%s", cname, fname, mode)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// CallConnector 调度 Connector
|
2024-04-03 12:36:34 +08:00
|
|
|
|
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) (interface{}, error) {
|
2024-04-01 13:51:17 +08:00
|
|
|
|
pool.cLock.RLock() // 读锁
|
|
|
|
|
defer pool.cLock.RUnlock()
|
2024-01-09 17:30:58 +08:00
|
|
|
|
fn := flow.GetThisFunction()
|
|
|
|
|
fnConf := fn.GetConfig()
|
|
|
|
|
mode := common.KisMode(fnConf.FMode)
|
|
|
|
|
|
|
|
|
|
if callback, ok := pool.cTree[conn.GetName()][mode][fnConf.FName]; ok {
|
|
|
|
|
return callback(ctx, conn, fn, flow, args)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)
|
|
|
|
|
|
2024-04-03 12:36:34 +08:00
|
|
|
|
return nil, errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
|
2024-01-09 17:30:58 +08:00
|
|
|
|
}
|
2024-01-12 17:27:43 +08:00
|
|
|
|
|
|
|
|
|
// GetFlows 得到全部的Flow
|
|
|
|
|
func (pool *kisPool) GetFlows() []Flow {
|
|
|
|
|
pool.flowLock.RLock() // 读锁
|
|
|
|
|
defer pool.flowLock.RUnlock()
|
|
|
|
|
|
|
|
|
|
var flows []Flow
|
|
|
|
|
|
|
|
|
|
for _, flow := range pool.flowRouter {
|
|
|
|
|
flows = append(flows, flow)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return flows
|
|
|
|
|
}
|