From f330c94f4ce948cb59bdbb0b1dd768aa0b6e310f Mon Sep 17 00:00:00 2001 From: aceld Date: Tue, 23 Jan 2024 16:21:02 +0800 Subject: [PATCH] add Action module --- common/const.go | 4 ++++ flow/kis_flow.go | 23 +++++++++++++-------- flow/kis_flow_data.go | 47 ++++++++++++++++++++++++++++++++++++++----- kis/flow.go | 2 ++ kis/pool.go | 2 +- 5 files changed, 64 insertions(+), 14 deletions(-) diff --git a/common/const.go b/common/const.go index 6a637fb..e81df25 100644 --- a/common/const.go +++ b/common/const.go @@ -54,3 +54,7 @@ const ( TIDB KisConnType = "tidb" ES KisConnType = "es" ) + +const ( + ActionNoJump = "NoJump" +) diff --git a/flow/kis_flow.go b/flow/kis_flow.go index be60604..f6baa56 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -37,6 +37,8 @@ type KisFlow struct { buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch data common.KisDataMap // 流式计算各个层级的数据源 inPut common.KisRowArr // 当前Function的计算输入数据 + abort bool // 是否中断Flow + action kis.Action // 当前Flow所携带的Action动作 } // NewKisFlow 创建一个KisFlow. @@ -149,6 +151,7 @@ func (flow *KisFlow) Run(ctx context.Context) error { var fn kis.Function fn = flow.FlowHead + flow.abort = false if flow.Conf.Status == int(common.FlowDisable) { //flow被配置关闭 @@ -164,7 +167,7 @@ func (flow *KisFlow) Run(ctx context.Context) error { } //流式链式调用 - for fn != nil { + for fn != nil && flow.abort == false { // flow记录当前执行到的Function 标记 fid := fn.GetId() @@ -184,21 +187,25 @@ func (flow *KisFlow) Run(ctx context.Context) error { return err } else { //Success - - if err := flow.commitCurData(ctx); err != nil { + fn, err = flow.dealAction(ctx, fn) + if err != nil { return err } - - // 更新上一层FuncitonId游标 - flow.PrevFunctionId = flow.ThisFunctionId - - fn = fn.Next() } } return nil } +// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作 +func (flow *KisFlow) Next(acts ...kis.ActionFunc) error { + + // 加载Function FaaS 传递的 Action动作 + flow.action = kis.LoadActions(acts) + + return nil +} + func (flow *KisFlow) GetName() string { return flow.Name } diff --git a/flow/kis_flow_data.go b/flow/kis_flow_data.go index 627b3ab..b5adf5d 100644 --- a/flow/kis_flow_data.go +++ b/flow/kis_flow_data.go @@ -62,26 +62,63 @@ func (flow *KisFlow) getCurData() (common.KisRowArr, error) { return flow.data[flow.PrevFunctionId], nil } +// commitReuseData +func (flow *KisFlow) commitReuseData(ctx context.Context) error { + + // 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环 + if len(flow.data[flow.PrevFunctionId]) == 0 { + flow.abort = true + return nil + } + + // 本层结果数据等于上层结果数据(复用上层结果数据到本层) + flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId] + + // 清空缓冲Buf (如果是ReuseData选项,那么提交的全部数据,都将不会携带到下一层) + flow.buffer = flow.buffer[0:0] + + log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) + + return nil +} + +func (flow *KisFlow) commitVoidData(ctx context.Context) error { + if len(flow.buffer) != 0 { + return nil + } + + // 制作空数据 + batch := make(common.KisRowArr, 0) + + // 将本层计算的缓冲数据提交到本层结果数据中 + flow.data[flow.ThisFunctionId] = batch + + log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) + + return nil +} + //commitCurData 提交Flow当前执行Function的结果数据 func (flow *KisFlow) commitCurData(ctx context.Context) error { - //判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环 + // 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环 if len(flow.buffer) == 0 { + flow.abort = true return nil } // 制作批量数据batch batch := make(common.KisRowArr, 0, len(flow.buffer)) - //如果strBuf为空,则没有添加任何数据 + // 如果strBuf为空,则没有添加任何数据 for _, row := range flow.buffer { batch = append(batch, row) } - //将本层计算的缓冲数据提交到本层结果数据中 + // 将本层计算的缓冲数据提交到本层结果数据中 flow.data[flow.ThisFunctionId] = batch - //清空缓冲Buf + // 清空缓冲Buf flow.buffer = flow.buffer[0:0] log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) @@ -89,7 +126,7 @@ func (flow *KisFlow) commitCurData(ctx context.Context) error { return nil } -//ClearData 清空flow所有数据 +// clearData 清空flow所有数据 func (flow *KisFlow) clearData(data common.KisDataMap) { for k := range data { delete(data, k) diff --git a/kis/flow.go b/kis/flow.go index 846db96..86d7f47 100644 --- a/kis/flow.go +++ b/kis/flow.go @@ -29,4 +29,6 @@ type Flow interface { GetConfig() *config.KisFlowConfig // GetFuncConfigByName 得到当前Flow的配置 GetFuncConfigByName(funcName string) *config.KisFuncConfig + // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作 + Next(acts ...ActionFunc) error } diff --git a/kis/pool.go b/kis/pool.go index ba35857..75e275d 100644 --- a/kis/pool.go +++ b/kis/pool.go @@ -62,7 +62,7 @@ func (pool *kisPool) AddFlow(name string, flow Flow) { panic(errString) } - log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name) + log.Logger().InfoF("Add FlowRouter FlowName=%s", name) } func (pool *kisPool) GetFlow(name string) Flow {