diff --git a/flow/kis_flow.go b/flow/kis_flow.go index 3009747..3fe5f78 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -176,3 +176,15 @@ func (flow *KisFlow) Run(ctx context.Context) error { return nil } + +func (flow *KisFlow) GetName() string { + return flow.Name +} + +func (flow *KisFlow) GetThisFunction() kis.Function { + return flow.ThisFunction +} + +func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig { + return flow.ThisFunction.GetConfig() +} diff --git a/function/kis_function_c.go b/function/kis_function_c.go index 257e3fa..4bb870e 100644 --- a/function/kis_function_c.go +++ b/function/kis_function_c.go @@ -2,7 +2,6 @@ package function import ( "context" - "fmt" "kis-flow/kis" "kis-flow/log" ) @@ -14,13 +13,10 @@ type KisFunctionC struct { func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow) - //TODO 调用具体的Function执行方法 - //处理业务数据 - for i, row := range flow.Input() { - fmt.Printf("In KisFunctionC, row = %+v\n", row) - - // 提交本层计算结果数据 - _ = flow.CommitRow("Data From KisFunctionC, index " + " " + fmt.Sprintf("%d", i)) + // 通过KisPool 路由到具体的执行计算Function中 + if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { + log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) + return err } return nil diff --git a/function/kis_function_e.go b/function/kis_function_e.go index 6a32c13..88b1e4c 100644 --- a/function/kis_function_e.go +++ b/function/kis_function_e.go @@ -2,7 +2,6 @@ package function import ( "context" - "fmt" "kis-flow/kis" "kis-flow/log" ) @@ -14,10 +13,10 @@ type KisFunctionE struct { func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow) - // TODO 调用具体的Function执行方法 - //处理业务数据 - for _, row := range flow.Input() { - fmt.Printf("In KisFunctionE, row = %+v\n", row) + // 通过KisPool 路由到具体的执行计算Function中 + if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { + log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) + return err } return nil diff --git a/function/kis_function_l.go b/function/kis_function_l.go index a0ef4ec..9149e7b 100644 --- a/function/kis_function_l.go +++ b/function/kis_function_l.go @@ -13,7 +13,11 @@ type KisFunctionL struct { func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionL, flow = %+v\n", flow) - // TODO 调用具体的Function执行方法 + // 通过KisPool 路由到具体的执行计算Function中 + if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { + log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) + return err + } return nil } diff --git a/function/kis_function_s.go b/function/kis_function_s.go index 36a1a4d..d9a9516 100644 --- a/function/kis_function_s.go +++ b/function/kis_function_s.go @@ -13,7 +13,11 @@ type KisFunctionS struct { func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionS, flow = %+v\n", flow) - // TODO 调用具体的Function执行方法 + // 通过KisPool 路由到具体的执行计算Function中 + if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { + log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) + return err + } return nil } diff --git a/function/kis_function_v.go b/function/kis_function_v.go index 07f9de6..aa2eb44 100644 --- a/function/kis_function_v.go +++ b/function/kis_function_v.go @@ -13,7 +13,11 @@ type KisFunctionV struct { func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionV, flow = %+v\n", flow) - // TODO 调用具体的Function执行方法 + // 通过KisPool 路由到具体的执行计算Function中 + if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil { + log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err) + return err + } return nil } diff --git a/kis/flow.go b/kis/flow.go index 7b2a46d..61d54f5 100644 --- a/kis/flow.go +++ b/kis/flow.go @@ -15,4 +15,10 @@ type Flow interface { CommitRow(row interface{}) error // Input 得到flow当前执行Function的输入源数据 Input() common.KisRowArr + // GetName 得到Flow的名称 + GetName() string + // GetThisFunction 得到当前正在执行的Function + GetThisFunction() Function + // GetThisFuncConf 得到当前正在执行的Function的配置 + GetThisFuncConf() *config.KisFuncConfig } diff --git a/kis/pool.go b/kis/pool.go new file mode 100644 index 0000000..84e4991 --- /dev/null +++ b/kis/pool.go @@ -0,0 +1,91 @@ +package kis + +import ( + "context" + "errors" + "fmt" + "kis-flow/log" + "sync" +) + +var _poolOnce sync.Once + +// kisPool 用于管理全部的Function和Flow配置的池子 +type kisPool struct { + fnRouter funcRouter // 全部的Function管理路由 + fnLock sync.RWMutex // fnRouter 锁 + + flowRouter flowRouter // 全部的flow对象 + flowLock sync.RWMutex // flowRouter 锁 +} + +// 单例 +var _pool *kisPool + +// Pool 单例构造 +func Pool() *kisPool { + _poolOnce.Do(func() { + //创建kisPool对象 + _pool = new(kisPool) + + // fnRouter初始化 + _pool.fnRouter = make(funcRouter) + + // flowRouter初始化 + _pool.flowRouter = make(flowRouter) + }) + + return _pool +} + +func (pool *kisPool) AddFlow(name string, flow Flow) { + pool.flowLock.Lock() + defer pool.flowLock.Unlock() + + if _, ok := pool.flowRouter[name]; !ok { + pool.flowRouter[name] = flow + } else { + errString := fmt.Sprintf("Pool AddFlow Repeat FlowName=%s\n", name) + panic(errString) + } + + log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name) +} + +func (pool *kisPool) GetFlow(name string) Flow { + pool.flowLock.RLock() + defer pool.flowLock.RUnlock() + + if flow, ok := pool.flowRouter[name]; ok { + return flow + } else { + return nil + } +} + +// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册 +func (pool *kisPool) FaaS(fnName string, f FaaS) { + pool.fnLock.Lock() + defer pool.fnLock.Unlock() + + if _, ok := pool.fnRouter[fnName]; !ok { + pool.fnRouter[fnName] = f + } else { + errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName) + panic(errString) + } + + log.Logger().InfoF("Add KisPool FuncName=%s", fnName) +} + +// CallFunction 调度 Function +func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error { + + if f, ok := pool.fnRouter[fnName]; ok { + return f(ctx, flow) + } + + log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName) + + return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.") +} diff --git a/kis/router.go b/kis/router.go new file mode 100644 index 0000000..fb8eae6 --- /dev/null +++ b/kis/router.go @@ -0,0 +1,16 @@ +package kis + +import "context" + +// FaaS Function as a Service +type FaaS func(context.Context, Flow) error + +// funcRouter +// key: Function Name +// value: Function 回调自定义业务 +type funcRouter map[string]FaaS + +// flowRouter +// key: Flow Name +// value: Flow +type flowRouter map[string]Flow diff --git a/test/kis_pool_test.go b/test/kis_pool_test.go new file mode 100644 index 0000000..1ee5b7e --- /dev/null +++ b/test/kis_pool_test.go @@ -0,0 +1,93 @@ +package test + +import ( + "context" + "fmt" + "kis-flow/common" + "kis-flow/config" + "kis-flow/flow" + "kis-flow/kis" + "testing" +) + +func funcName1Handler(ctx context.Context, flow kis.Flow) error { + fmt.Println("---> Call funcName1Handler ----") + + for index, row := range flow.Input() { + // 打印数据 + str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) + fmt.Println(str) + + // 计算结果数据 + resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) + + // 提交结果数据 + _ = flow.CommitRow(resultStr) + } + + return nil +} + +func funcName2Handler(ctx context.Context, flow kis.Flow) error { + + for _, row := range flow.Input() { + str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) + fmt.Println(str) + } + + return nil +} + +func TestNewKisPool(t *testing.T) { + + ctx := context.Background() + + // 0. 注册Function + kis.Pool().FaaS("funcName1", funcName1Handler) + kis.Pool().FaaS("funcName2", funcName2Handler) + + // 1. 创建2个KisFunction配置实例 + source1 := config.KisSource{ + Name: "公众号抖音商城户订单数据", + Must: []string{"order_id", "user_id"}, + } + + source2 := config.KisSource{ + Name: "用户订单错误率", + Must: []string{"order_id", "user_id"}, + } + + myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil) + if myFuncConfig1 == nil { + panic("myFuncConfig1 is nil") + } + + myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil) + if myFuncConfig2 == nil { + panic("myFuncConfig2 is nil") + } + + // 2. 创建一个 KisFlow 配置实例 + myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable) + + // 3. 创建一个KisFlow对象 + flow1 := flow.NewKisFlow(myFlowConfig1) + + // 4. 拼接Functioin 到 Flow 上 + if err := flow1.Link(myFuncConfig1, nil); err != nil { + panic(err) + } + if err := flow1.Link(myFuncConfig2, nil); err != nil { + panic(err) + } + + // 5. 提交原始数据 + _ = flow1.CommitRow("This is Data1 from Test") + _ = flow1.CommitRow("This is Data2 from Test") + _ = flow1.CommitRow("This is Data3 from Test") + + // 6. 执行flow1 + if err := flow1.Run(ctx); err != nil { + panic(err) + } +}