add flow fork

This commit is contained in:
aceld 2024-02-04 16:27:28 +08:00
parent d093901741
commit 188b08f33c
4 changed files with 80 additions and 0 deletions

View File

@ -76,6 +76,30 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
return flow
}
// Fork 得到Flow的一个副本(深拷贝)
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
config := flow.Conf
// 通过之前的配置生成一个新的Flow
newFlow := NewKisFlow(config)
for _, fp := range flow.Conf.Flows {
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
//当前function没有配置Params
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil)
} else {
//当前function有配置Params
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
}
}
log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams)
log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs())
return newFlow
}
// Link 将Function链接到Flow中
// fConf: 当前Function策略
// fParams: 当前Flow携带的Function动态参数

View File

@ -200,3 +200,11 @@ func (flow *KisFlow) GetFuncParamAll() config.FParam {
return param
}
// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams取出全部Key-Value
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
return flow.funcParams
}

View File

@ -44,4 +44,8 @@ type Flow interface {
GetFuncParam(key string) string
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数取出全部Key-Value
GetFuncParamAll() config.FParam
// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams取出全部Key-Value
GetFuncParamsAllFuncs() map[string]config.FParam
// Fork 得到Flow的一个副本(深拷贝)
Fork(ctx context.Context) Flow
}

44
test/kis_fork_test.go Normal file
View File

@ -0,0 +1,44 @@
package test
import (
"context"
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestForkFlow(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
flow1Clone1 := flow1.Fork(ctx)
// 3. 提交原始数据
_ = flow1Clone1.CommitRow("This is Data1 from Test")
_ = flow1Clone1.CommitRow("This is Data2 from Test")
_ = flow1Clone1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1Clone1.Run(ctx); err != nil {
panic(err)
}
}