add Action module

This commit is contained in:
aceld 2024-01-23 16:21:02 +08:00
parent c066ebae48
commit f330c94f4c
5 changed files with 64 additions and 14 deletions

View File

@ -54,3 +54,7 @@ const (
TIDB KisConnType = "tidb"
ES KisConnType = "es"
)
const (
ActionNoJump = "NoJump"
)

View File

@ -37,6 +37,8 @@ type KisFlow struct {
buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
data common.KisDataMap // 流式计算各个层级的数据源
inPut common.KisRowArr // 当前Function的计算输入数据
abort bool // 是否中断Flow
action kis.Action // 当前Flow所携带的Action动作
}
// NewKisFlow 创建一个KisFlow.
@ -149,6 +151,7 @@ func (flow *KisFlow) Run(ctx context.Context) error {
var fn kis.Function
fn = flow.FlowHead
flow.abort = false
if flow.Conf.Status == int(common.FlowDisable) {
//flow被配置关闭
@ -164,7 +167,7 @@ func (flow *KisFlow) Run(ctx context.Context) error {
}
//流式链式调用
for fn != nil {
for fn != nil && flow.abort == false {
// flow记录当前执行到的Function 标记
fid := fn.GetId()
@ -184,21 +187,25 @@ func (flow *KisFlow) Run(ctx context.Context) error {
return err
} else {
//Success
if err := flow.commitCurData(ctx); err != nil {
fn, err = flow.dealAction(ctx, fn)
if err != nil {
return err
}
// 更新上一层FuncitonId游标
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
}
}
return nil
}
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {
// 加载Function FaaS 传递的 Action动作
flow.action = kis.LoadActions(acts)
return nil
}
func (flow *KisFlow) GetName() string {
return flow.Name
}

View File

@ -62,11 +62,48 @@ func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
return flow.data[flow.PrevFunctionId], nil
}
// commitReuseData
func (flow *KisFlow) commitReuseData(ctx context.Context) error {
// 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环
if len(flow.data[flow.PrevFunctionId]) == 0 {
flow.abort = true
return nil
}
// 本层结果数据等于上层结果数据(复用上层结果数据到本层)
flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
// 清空缓冲Buf (如果是ReuseData选项那么提交的全部数据都将不会携带到下一层)
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
func (flow *KisFlow) commitVoidData(ctx context.Context) error {
if len(flow.buffer) != 0 {
return nil
}
// 制作空数据
batch := make(common.KisRowArr, 0)
// 将本层计算的缓冲数据提交到本层结果数据中
flow.data[flow.ThisFunctionId] = batch
log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
//commitCurData 提交Flow当前执行Function的结果数据
func (flow *KisFlow) commitCurData(ctx context.Context) error {
// 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环
if len(flow.buffer) == 0 {
flow.abort = true
return nil
}
@ -89,7 +126,7 @@ func (flow *KisFlow) commitCurData(ctx context.Context) error {
return nil
}
//ClearData 清空flow所有数据
// clearData 清空flow所有数据
func (flow *KisFlow) clearData(data common.KisDataMap) {
for k := range data {
delete(data, k)

View File

@ -29,4 +29,6 @@ type Flow interface {
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
Next(acts ...ActionFunc) error
}

View File

@ -62,7 +62,7 @@ func (pool *kisPool) AddFlow(name string, flow Flow) {
panic(errString)
}
log.Logger().InfoF("Add FlowRouter FlowName=%s\n", name)
log.Logger().InfoF("Add FlowRouter FlowName=%s", name)
}
func (pool *kisPool) GetFlow(name string) Flow {