kis-flow/flow/kis_flow_data.go

237 lines
6.2 KiB
Go
Raw Normal View History

2024-01-03 17:22:35 +08:00
package flow
import (
"context"
"errors"
"fmt"
2024-03-26 14:54:50 +08:00
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/config"
"github.com/aceld/kis-flow/log"
"github.com/aceld/kis-flow/metrics"
2024-01-26 17:27:29 +08:00
"github.com/patrickmn/go-cache"
2024-03-26 16:47:34 +08:00
"reflect"
2024-01-26 17:27:29 +08:00
"time"
2024-01-03 17:22:35 +08:00
)
// CommitRow 提交Flow数据, 一行数据,如果是批量数据可以提交多次
func (flow *KisFlow) CommitRow(row interface{}) error {
flow.buffer = append(flow.buffer, row)
return nil
}
2024-03-26 16:47:34 +08:00
// CommitRowBatch 提交Flow数据, 批量数据
func (flow *KisFlow) CommitRowBatch(rows interface{}) error {
v := reflect.ValueOf(rows)
if v.Kind() != reflect.Slice {
return fmt.Errorf("Commit Data is not a slice")
}
for i := 0; i < v.Len(); i++ {
row := v.Index(i).Interface().(common.KisRow)
flow.buffer = append(flow.buffer, row)
}
return nil
}
2024-01-03 17:22:35 +08:00
// Input 得到flow当前执行Function的输入源数据
func (flow *KisFlow) Input() common.KisRowArr {
return flow.inPut
}
// commitSrcData 提交当前Flow的数据源数据, 表示首次提交当前Flow的原始数据源
// 将flow的临时数据buffer提交到flow的data中,(data为各个Function层级的源数据备份)
// 会清空之前所有的flow数据
func (flow *KisFlow) commitSrcData(ctx context.Context) error {
// 制作批量数据batch
dataCnt := len(flow.buffer)
batch := make(common.KisRowArr, 0, dataCnt)
for _, row := range flow.buffer {
batch = append(batch, row)
}
// 清空之前所有数据
flow.clearData(flow.data)
// 首次提交记录flow原始数据
// 因为首次提交所以PrevFunctionId为FirstVirtual 因为没有上一层Function
flow.data[common.FunctionIdFirstVirtual] = batch
// 清空缓冲Buf
flow.buffer = flow.buffer[0:0]
2024-03-04 14:53:29 +08:00
// 首次提交数据源数据,进行统计数据总量
if config.GlobalConfig.EnableProm == true {
// 统计数据总量 Metrics.DataTota 指标累计加1
metrics.Metrics.DataTotal.Add(float64(dataCnt))
2024-03-18 09:29:05 +08:00
//统计当前Flow数量指标
metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
2024-03-04 14:53:29 +08:00
}
2024-01-03 17:22:35 +08:00
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
// getCurData 获取flow当前Function层级的输入数据
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
if flow.PrevFunctionId == "" {
return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set"))
}
if _, ok := flow.data[flow.PrevFunctionId]; !ok {
return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId))
}
return flow.data[flow.PrevFunctionId], nil
}
2024-01-23 16:21:02 +08:00
// commitReuseData
func (flow *KisFlow) commitReuseData(ctx context.Context) error {
// 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环
if len(flow.data[flow.PrevFunctionId]) == 0 {
flow.abort = true
return nil
}
// 本层结果数据等于上层结果数据(复用上层结果数据到本层)
flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
// 清空缓冲Buf (如果是ReuseData选项那么提交的全部数据都将不会携带到下一层)
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
func (flow *KisFlow) commitVoidData(ctx context.Context) error {
if len(flow.buffer) != 0 {
return nil
}
// 制作空数据
batch := make(common.KisRowArr, 0)
// 将本层计算的缓冲数据提交到本层结果数据中
flow.data[flow.ThisFunctionId] = batch
log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
2024-01-03 17:22:35 +08:00
//commitCurData 提交Flow当前执行Function的结果数据
func (flow *KisFlow) commitCurData(ctx context.Context) error {
2024-01-23 16:21:02 +08:00
// 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环
2024-01-03 17:22:35 +08:00
if len(flow.buffer) == 0 {
2024-01-23 16:21:02 +08:00
flow.abort = true
2024-01-03 17:22:35 +08:00
return nil
}
// 制作批量数据batch
batch := make(common.KisRowArr, 0, len(flow.buffer))
2024-01-23 16:21:02 +08:00
// 如果strBuf为空则没有添加任何数据
2024-01-03 17:22:35 +08:00
for _, row := range flow.buffer {
batch = append(batch, row)
}
2024-01-23 16:21:02 +08:00
// 将本层计算的缓冲数据提交到本层结果数据中
2024-01-03 17:22:35 +08:00
flow.data[flow.ThisFunctionId] = batch
2024-01-23 16:21:02 +08:00
// 清空缓冲Buf
2024-01-03 17:22:35 +08:00
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
2024-01-23 16:21:02 +08:00
// clearData 清空flow所有数据
2024-01-03 17:22:35 +08:00
func (flow *KisFlow) clearData(data common.KisDataMap) {
for k := range data {
delete(data, k)
}
}
2024-01-26 17:27:29 +08:00
func (flow *KisFlow) GetCacheData(key string) interface{} {
if data, found := flow.cache.Get(key); found {
return data
}
return nil
}
func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
if Exp == common.DefaultExpiration {
flow.cache.Set(key, value, cache.DefaultExpiration)
} else {
flow.cache.Set(key, value, Exp)
}
}
// GetMetaData 得到当前Flow对象的临时数据
func (flow *KisFlow) GetMetaData(key string) interface{} {
flow.mLock.RLock()
defer flow.mLock.RUnlock()
data, ok := flow.metaData[key]
if !ok {
return nil
}
return data
}
// SetMetaData 设置当前Flow对象的临时数据
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
flow.mLock.Lock()
defer flow.mLock.Unlock()
flow.metaData[key] = value
}
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数取出一对key-value
func (flow *KisFlow) GetFuncParam(key string) string {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
if value, vok := param[key]; vok {
return value
}
}
return ""
}
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数取出全部Key-Value
func (flow *KisFlow) GetFuncParamAll() config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
param, ok := flow.funcParams[flow.ThisFunctionId]
if !ok {
return nil
}
return param
}
2024-02-04 16:27:28 +08:00
// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams取出全部Key-Value
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
return flow.funcParams
}