From 41ea72741671fc32ce8807f56422eb4256eec656 Mon Sep 17 00:00:00 2001 From: aceld Date: Fri, 29 Mar 2024 18:07:57 +0800 Subject: [PATCH] fix: use Link Fork dosen't work --- file/config_import.go | 2 +- flow/kis_flow.go | 22 ++++++++++++-- kis/flow.go | 4 ++- test/kis_fork_test.go | 42 ++++++++++++++++++++++++-- test/load_conf/flow/flow-FlowFork1.yml | 10 ++++++ 5 files changed, 72 insertions(+), 8 deletions(-) create mode 100644 test/load_conf/flow/flow-FlowFork1.yml diff --git a/file/config_import.go b/file/config_import.go index a552e1a..122c9cc 100644 --- a/file/config_import.go +++ b/file/config_import.go @@ -164,7 +164,7 @@ func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, } // flow add function - if err := newFlow.Link(funcConfig, fp.Params); err != nil { + if err := newFlow.AppendNewFunction(funcConfig, fp.Params); err != nil { return err } } diff --git a/flow/kis_flow.go b/flow/kis_flow.go index 0f4320b..1ca0099 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -90,10 +90,10 @@ func (flow *KisFlow) Fork(ctx context.Context) kis.Flow { for _, fp := range flow.Conf.Flows { if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok { // 当前function没有配置Params - newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil) + _ = newFlow.AppendNewFunction(flow.Funcs[fp.FuncName].GetConfig(), nil) } else { // 当前function有配置Params - newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params) + _ = newFlow.AppendNewFunction(flow.Funcs[fp.FuncName].GetConfig(), fp.Params) } } @@ -103,10 +103,26 @@ func (flow *KisFlow) Fork(ctx context.Context) kis.Flow { return newFlow } -// Link 将Function链接到Flow中 +// Link 将Function链接到Flow中, 同时会将Function的配置参数添加到Flow的配置中 // fConf: 当前Function策略 // fParams: 当前Flow携带的Function动态参数 func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error { + + // Flow 添加Function + _ = flow.AppendNewFunction(fConf, fParams) + + // FlowConfig 添加Function + flowFuncParam := config.KisFlowFunctionParam{ + FuncName: fConf.FName, + Params: fParams, + } + flow.Conf.AppendFunctionConfig(flowFuncParam) + + return nil +} + +// AppendNewFunction 将一个新的Function追加到到Flow中 +func (flow *KisFlow) AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error { // 创建Function实例 f := function.NewKisFunction(flow, fConf) diff --git a/kis/flow.go b/kis/flow.go index 25b8c86..11d4b3e 100644 --- a/kis/flow.go +++ b/kis/flow.go @@ -10,8 +10,10 @@ import ( type Flow interface { // Run 调度Flow,依次调度Flow中的Function并且执行 Run(ctx context.Context) error - // Link 将Flow中的Function按照配置文件中的配置进行连接 + // Link 将Flow中的Function按照配置文件中的配置进行连接, 同时Flow的配置也会更新 Link(fConf *config.KisFuncConfig, fParams config.FParam) error + // AppendNewFunction 将一个新的Function追加到到Flow中 + AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow 提交Flow数据到即将执行的Function层 CommitRow(row interface{}) error // CommitRowBatch 提交Flow数据到即将执行的Function层(批量提交) diff --git a/test/kis_fork_test.go b/test/kis_fork_test.go index a139b5e..eb5bb16 100644 --- a/test/kis_fork_test.go +++ b/test/kis_fork_test.go @@ -2,7 +2,11 @@ package test import ( "context" + "fmt" + "github.com/aceld/kis-flow/common" + "github.com/aceld/kis-flow/config" "github.com/aceld/kis-flow/file" + "github.com/aceld/kis-flow/flow" "github.com/aceld/kis-flow/kis" "testing" ) @@ -16,14 +20,46 @@ func TestForkFlow(t *testing.T) { } // 2. 获取Flow - flow1 := kis.Pool().GetFlow("flowName1") + flow1 := kis.Pool().GetFlow("flowFork1") + + fmt.Println("----> flow1: ", flow1.GetFuncParamsAllFuncs()) flow1Clone1 := flow1.Fork(ctx) + fmt.Println("----> flow1Clone1: ", flow1Clone1.GetFuncParamsAllFuncs()) + + // 3. 提交原始数据 + _ = flow1Clone1.CommitRow("This is Data1 from Test") + + // 4. 执行flow1 + if err := flow1Clone1.Run(ctx); err != nil { + panic(err) + } +} + +func TestForkFlowWithLink(t *testing.T) { + ctx := context.Background() + + // Create a new flow configuration + myFlowConfig1 := config.NewFlowConfig("flowFork1", common.FlowEnable) + + // Create new function configuration + func1Config := config.NewFuncConfig("funcName1", common.V, nil, nil) + func3Config := config.NewFuncConfig("funcName3", common.E, nil, nil) + + // Create a new flow + flow1 := flow.NewKisFlow(myFlowConfig1) + + _ = flow1.Link(func1Config, config.FParam{"school": "TsingHua1", "country": "China1"}) + _ = flow1.Link(func3Config, config.FParam{"school": "TsingHua3", "country": "China3"}) + + fmt.Println("----> flow1: ", flow1.GetFuncParamsAllFuncs()) + + flow1Clone1 := flow1.Fork(ctx) + + fmt.Println("----> flow1Clone1: ", flow1Clone1.GetFuncParamsAllFuncs()) // 3. 提交原始数据 _ = flow1Clone1.CommitRow("This is Data1 from Test") - _ = flow1Clone1.CommitRow("This is Data2 from Test") - _ = flow1Clone1.CommitRow("This is Data3 from Test") // 4. 执行flow1 if err := flow1Clone1.Run(ctx); err != nil { diff --git a/test/load_conf/flow/flow-FlowFork1.yml b/test/load_conf/flow/flow-FlowFork1.yml new file mode 100644 index 0000000..d4c718e --- /dev/null +++ b/test/load_conf/flow/flow-FlowFork1.yml @@ -0,0 +1,10 @@ +kistype: flow +status: 1 +flow_name: flowFork1 +flows: + - fname: funcName1 + params: + myKey1: flowValue1-1 + - fname: funcName3 + params: + myKey1: flowValue3-1