From 0ffbf3f1abad94835dda214b898fa8548f778075 Mon Sep 17 00:00:00 2001 From: aceld Date: Wed, 3 Jan 2024 17:22:35 +0800 Subject: [PATCH] add kis flow data model --- common/const.go | 2 +- common/data_type.go | 14 ++++++ flow/kis_flow.go | 41 +++++++++++++++- flow/kis_flow_data.go | 97 ++++++++++++++++++++++++++++++++++++++ function/kis_function_c.go | 10 +++- function/kis_function_e.go | 5 ++ kis/flow.go | 5 ++ test/kis_flow_test.go | 49 +++++++++++++++++++ 8 files changed, 220 insertions(+), 3 deletions(-) create mode 100644 common/data_type.go create mode 100644 flow/kis_flow_data.go diff --git a/common/const.go b/common/const.go index 635b0e7..6a637fb 100644 --- a/common/const.go +++ b/common/const.go @@ -1,6 +1,6 @@ package common -// KisIdType 用户生成KisId的字符串前缀 +// 用户生成KisId的字符串前缀 const ( KisIdTypeFlow = "flow" KisIdTypeConnnector = "conn" diff --git a/common/data_type.go b/common/data_type.go new file mode 100644 index 0000000..35ed934 --- /dev/null +++ b/common/data_type.go @@ -0,0 +1,14 @@ +package common + +// KisRow 一行数据 +type KisRow interface{} + +// KisRowArr 一次业务的批量数据 +type KisRowArr []KisRow + +/* + KisDataMap 当前Flow承载的全部数据 + key : 数据所在的Function ID + value: 对应的KisRow +*/ +type KisDataMap map[string]KisRowArr diff --git a/flow/kis_flow.go b/flow/kis_flow.go index ff822e2..3009747 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -8,6 +8,7 @@ import ( "kis-flow/function" "kis-flow/id" "kis-flow/kis" + "kis-flow/log" "sync" ) @@ -28,8 +29,13 @@ type KisFlow struct { PrevFunctionId string // 当前执行到的Function 上一层FunctionID(策略配置ID) // Function列表参数 - funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例NsID, value:FParam + funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam fplock sync.RWMutex // 管理funcParams的读写锁 + + // 数据 + buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch + data common.KisDataMap // 流式计算各个层级的数据源 + inPut common.KisRowArr // 当前Function的计算输入数据 } // NewKisFlow 创建一个KisFlow. @@ -46,6 +52,9 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { flow.Funcs = make(map[string]kis.Function) flow.funcParams = make(map[string]config.FParam) + // 数据data + flow.data = make(common.KisDataMap) + return flow } @@ -124,13 +133,43 @@ func (flow *KisFlow) Run(ctx context.Context) error { return nil } + // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function + flow.PrevFunctionId = common.FunctionIdFirstVirtual + + // 提交数据流原始数据 + if err := flow.commitSrcData(ctx); err != nil { + return err + } + //流式链式调用 for fn != nil { + + // flow记录当前执行到的Function 标记 + fid := fn.GetId() + flow.ThisFunction = fn + flow.ThisFunctionId = fid + + // 得到当前Function要处理与的源数据 + if inputData, err := flow.getCurData(); err != nil { + log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error()) + return err + } else { + flow.inPut = inputData + } + if err := fn.Call(ctx, flow); err != nil { //Error return err } else { //Success + + if err := flow.commitCurData(ctx); err != nil { + return err + } + + // 更新上一层FuncitonId游标 + flow.PrevFunctionId = flow.ThisFunctionId + fn = fn.Next() } } diff --git a/flow/kis_flow_data.go b/flow/kis_flow_data.go new file mode 100644 index 0000000..627b3ab --- /dev/null +++ b/flow/kis_flow_data.go @@ -0,0 +1,97 @@ +package flow + +import ( + "context" + "errors" + "fmt" + "kis-flow/common" + "kis-flow/log" +) + +// CommitRow 提交Flow数据, 一行数据,如果是批量数据可以提交多次 +func (flow *KisFlow) CommitRow(row interface{}) error { + + flow.buffer = append(flow.buffer, row) + + return nil +} + +// Input 得到flow当前执行Function的输入源数据 +func (flow *KisFlow) Input() common.KisRowArr { + return flow.inPut +} + +// commitSrcData 提交当前Flow的数据源数据, 表示首次提交当前Flow的原始数据源 +// 将flow的临时数据buffer,提交到flow的data中,(data为各个Function层级的源数据备份) +// 会清空之前所有的flow数据 +func (flow *KisFlow) commitSrcData(ctx context.Context) error { + + // 制作批量数据batch + dataCnt := len(flow.buffer) + batch := make(common.KisRowArr, 0, dataCnt) + + for _, row := range flow.buffer { + batch = append(batch, row) + } + + // 清空之前所有数据 + flow.clearData(flow.data) + + // 首次提交,记录flow原始数据 + // 因为首次提交,所以PrevFunctionId为FirstVirtual 因为没有上一层Function + flow.data[common.FunctionIdFirstVirtual] = batch + + // 清空缓冲Buf + flow.buffer = flow.buffer[0:0] + + log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) + + return nil +} + +// getCurData 获取flow当前Function层级的输入数据 +func (flow *KisFlow) getCurData() (common.KisRowArr, error) { + if flow.PrevFunctionId == "" { + return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set")) + } + + if _, ok := flow.data[flow.PrevFunctionId]; !ok { + return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId)) + } + + return flow.data[flow.PrevFunctionId], nil +} + +//commitCurData 提交Flow当前执行Function的结果数据 +func (flow *KisFlow) commitCurData(ctx context.Context) error { + + //判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环 + if len(flow.buffer) == 0 { + return nil + } + + // 制作批量数据batch + batch := make(common.KisRowArr, 0, len(flow.buffer)) + + //如果strBuf为空,则没有添加任何数据 + for _, row := range flow.buffer { + batch = append(batch, row) + } + + //将本层计算的缓冲数据提交到本层结果数据中 + flow.data[flow.ThisFunctionId] = batch + + //清空缓冲Buf + flow.buffer = flow.buffer[0:0] + + log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) + + return nil +} + +//ClearData 清空flow所有数据 +func (flow *KisFlow) clearData(data common.KisDataMap) { + for k := range data { + delete(data, k) + } +} diff --git a/function/kis_function_c.go b/function/kis_function_c.go index a52daf6..257e3fa 100644 --- a/function/kis_function_c.go +++ b/function/kis_function_c.go @@ -2,6 +2,7 @@ package function import ( "context" + "fmt" "kis-flow/kis" "kis-flow/log" ) @@ -13,7 +14,14 @@ type KisFunctionC struct { func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow) - // TODO 调用具体的Function执行方法 + //TODO 调用具体的Function执行方法 + //处理业务数据 + for i, row := range flow.Input() { + fmt.Printf("In KisFunctionC, row = %+v\n", row) + + // 提交本层计算结果数据 + _ = flow.CommitRow("Data From KisFunctionC, index " + " " + fmt.Sprintf("%d", i)) + } return nil } diff --git a/function/kis_function_e.go b/function/kis_function_e.go index 7a87817..6a32c13 100644 --- a/function/kis_function_e.go +++ b/function/kis_function_e.go @@ -2,6 +2,7 @@ package function import ( "context" + "fmt" "kis-flow/kis" "kis-flow/log" ) @@ -14,6 +15,10 @@ func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 + //处理业务数据 + for _, row := range flow.Input() { + fmt.Printf("In KisFunctionE, row = %+v\n", row) + } return nil } diff --git a/kis/flow.go b/kis/flow.go index b142ef5..7b2a46d 100644 --- a/kis/flow.go +++ b/kis/flow.go @@ -2,6 +2,7 @@ package kis import ( "context" + "kis-flow/common" "kis-flow/config" ) @@ -10,4 +11,8 @@ type Flow interface { Run(ctx context.Context) error // Link 将Flow中的Function按照配置文件中的配置进行连接 Link(fConf *config.KisFuncConfig, fParams config.FParam) error + // CommitRow 提交Flow数据到即将执行的Function层 + CommitRow(row interface{}) error + // Input 得到flow当前执行Function的输入源数据 + Input() common.KisRowArr } diff --git a/test/kis_flow_test.go b/test/kis_flow_test.go index 1ed7ddc..557d1ad 100644 --- a/test/kis_flow_test.go +++ b/test/kis_flow_test.go @@ -51,3 +51,52 @@ func TestNewKisFlow(t *testing.T) { panic(err) } } + +func TestNewKisFlowData(t *testing.T) { + ctx := context.Background() + + // 1. 创建2个KisFunction配置实例 + source1 := config.KisSource{ + Name: "公众号抖音商城户订单数据", + Must: []string{"order_id", "user_id"}, + } + + source2 := config.KisSource{ + Name: "用户订单错误率", + Must: []string{"order_id", "user_id"}, + } + + myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil) + if myFuncConfig1 == nil { + panic("myFuncConfig1 is nil") + } + + myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil) + if myFuncConfig2 == nil { + panic("myFuncConfig2 is nil") + } + + // 2. 创建一个 KisFlow 配置实例 + myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable) + + // 3. 创建一个KisFlow对象 + flow1 := flow.NewKisFlow(myFlowConfig1) + + // 4. 拼接Functioin 到 Flow 上 + if err := flow1.Link(myFuncConfig1, nil); err != nil { + panic(err) + } + if err := flow1.Link(myFuncConfig2, nil); err != nil { + panic(err) + } + + // 5. 提交原始数据 + _ = flow1.CommitRow("This is Data1 from Test") + _ = flow1.CommitRow("This is Data2 from Test") + _ = flow1.CommitRow("This is Data3 from Test") + + // 6. 执行flow1 + if err := flow1.Run(ctx); err != nil { + panic(err) + } +}