Merge pull request #16 from aceld/feature/aceld

fix: use Link Fork dosen't work
This commit is contained in:
刘丹冰 2024-03-29 18:09:14 +08:00 committed by GitHub
commit 8e9b124ec4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 72 additions and 8 deletions

View File

@ -164,7 +164,7 @@ func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow,
}
// flow add function
if err := newFlow.Link(funcConfig, fp.Params); err != nil {
if err := newFlow.AppendNewFunction(funcConfig, fp.Params); err != nil {
return err
}
}

View File

@ -90,10 +90,10 @@ func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
for _, fp := range flow.Conf.Flows {
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
// 当前function没有配置Params
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil)
_ = newFlow.AppendNewFunction(flow.Funcs[fp.FuncName].GetConfig(), nil)
} else {
// 当前function有配置Params
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
_ = newFlow.AppendNewFunction(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
}
}
@ -103,10 +103,26 @@ func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
return newFlow
}
// Link 将Function链接到Flow中
// Link 将Function链接到Flow中, 同时会将Function的配置参数添加到Flow的配置中
// fConf: 当前Function策略
// fParams: 当前Flow携带的Function动态参数
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
// Flow 添加Function
_ = flow.AppendNewFunction(fConf, fParams)
// FlowConfig 添加Function
flowFuncParam := config.KisFlowFunctionParam{
FuncName: fConf.FName,
Params: fParams,
}
flow.Conf.AppendFunctionConfig(flowFuncParam)
return nil
}
// AppendNewFunction 将一个新的Function追加到到Flow中
func (flow *KisFlow) AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error {
// 创建Function实例
f := function.NewKisFunction(flow, fConf)

View File

@ -10,8 +10,10 @@ import (
type Flow interface {
// Run 调度Flow依次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function按照配置文件中的配置进行连接
// Link 将Flow中的Function按照配置文件中的配置进行连接, 同时Flow的配置也会更新
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// AppendNewFunction 将一个新的Function追加到到Flow中
AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
CommitRow(row interface{}) error
// CommitRowBatch 提交Flow数据到即将执行的Function层(批量提交)

View File

@ -2,7 +2,11 @@ package test
import (
"context"
"fmt"
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/config"
"github.com/aceld/kis-flow/file"
"github.com/aceld/kis-flow/flow"
"github.com/aceld/kis-flow/kis"
"testing"
)
@ -16,14 +20,46 @@ func TestForkFlow(t *testing.T) {
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
flow1 := kis.Pool().GetFlow("flowFork1")
fmt.Println("----> flow1: ", flow1.GetFuncParamsAllFuncs())
flow1Clone1 := flow1.Fork(ctx)
fmt.Println("----> flow1Clone1: ", flow1Clone1.GetFuncParamsAllFuncs())
// 3. 提交原始数据
_ = flow1Clone1.CommitRow("This is Data1 from Test")
// 4. 执行flow1
if err := flow1Clone1.Run(ctx); err != nil {
panic(err)
}
}
func TestForkFlowWithLink(t *testing.T) {
ctx := context.Background()
// Create a new flow configuration
myFlowConfig1 := config.NewFlowConfig("flowFork1", common.FlowEnable)
// Create new function configuration
func1Config := config.NewFuncConfig("funcName1", common.V, nil, nil)
func3Config := config.NewFuncConfig("funcName3", common.E, nil, nil)
// Create a new flow
flow1 := flow.NewKisFlow(myFlowConfig1)
_ = flow1.Link(func1Config, config.FParam{"school": "TsingHua1", "country": "China1"})
_ = flow1.Link(func3Config, config.FParam{"school": "TsingHua3", "country": "China3"})
fmt.Println("----> flow1: ", flow1.GetFuncParamsAllFuncs())
flow1Clone1 := flow1.Fork(ctx)
fmt.Println("----> flow1Clone1: ", flow1Clone1.GetFuncParamsAllFuncs())
// 3. 提交原始数据
_ = flow1Clone1.CommitRow("This is Data1 from Test")
_ = flow1Clone1.CommitRow("This is Data2 from Test")
_ = flow1Clone1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1Clone1.Run(ctx); err != nil {

View File

@ -0,0 +1,10 @@
kistype: flow
status: 1
flow_name: flowFork1
flows:
- fname: funcName1
params:
myKey1: flowValue1-1
- fname: funcName3
params:
myKey1: flowValue3-1