diff --git a/flow/kis_flow.go b/flow/kis_flow.go index a883670..7fcaac5 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -76,6 +76,30 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { return flow } +// Fork 得到Flow的一个副本(深拷贝) +func (flow *KisFlow) Fork(ctx context.Context) kis.Flow { + + config := flow.Conf + + // 通过之前的配置生成一个新的Flow + newFlow := NewKisFlow(config) + + for _, fp := range flow.Conf.Flows { + if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok { + //当前function没有配置Params + newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil) + } else { + //当前function有配置Params + newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params) + } + } + + log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams) + log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs()) + + return newFlow +} + // Link 将Function链接到Flow中 // fConf: 当前Function策略 // fParams: 当前Flow携带的Function动态参数 diff --git a/flow/kis_flow_data.go b/flow/kis_flow_data.go index dd223ab..0dffdcb 100644 --- a/flow/kis_flow_data.go +++ b/flow/kis_flow_data.go @@ -200,3 +200,11 @@ func (flow *KisFlow) GetFuncParamAll() config.FParam { return param } + +// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams,取出全部Key-Value +func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam { + flow.fplock.RLock() + defer flow.fplock.RUnlock() + + return flow.funcParams +} diff --git a/kis/flow.go b/kis/flow.go index ddd8a43..6c631f1 100644 --- a/kis/flow.go +++ b/kis/flow.go @@ -44,4 +44,8 @@ type Flow interface { GetFuncParam(key string) string // GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value GetFuncParamAll() config.FParam + // GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams,取出全部Key-Value + GetFuncParamsAllFuncs() map[string]config.FParam + // Fork 得到Flow的一个副本(深拷贝) + Fork(ctx context.Context) Flow } diff --git a/test/kis_fork_test.go b/test/kis_fork_test.go new file mode 100644 index 0000000..718b3f7 --- /dev/null +++ b/test/kis_fork_test.go @@ -0,0 +1,44 @@ +package test + +import ( + "context" + "kis-flow/common" + "kis-flow/file" + "kis-flow/kis" + "kis-flow/test/caas" + "kis-flow/test/faas" + "testing" +) + +func TestForkFlow(t *testing.T) { + ctx := context.Background() + + // 0. 注册Function 回调业务 + kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) + kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) + kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) + + // 0. 注册ConnectorInit 和 Connector 回调业务 + kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) + kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) + + // 1. 加载配置文件并构建Flow + if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { + panic(err) + } + + // 2. 获取Flow + flow1 := kis.Pool().GetFlow("flowName1") + + flow1Clone1 := flow1.Fork(ctx) + + // 3. 提交原始数据 + _ = flow1Clone1.CommitRow("This is Data1 from Test") + _ = flow1Clone1.CommitRow("This is Data2 from Test") + _ = flow1Clone1.CommitRow("This is Data3 from Test") + + // 4. 执行flow1 + if err := flow1Clone1.Run(ctx); err != nil { + panic(err) + } +}