kis-flow/flow/kis_flow_data.go
2024-04-16 14:54:11 +08:00

237 lines
6.6 KiB
Go

package flow
import (
"context"
"errors"
"fmt"
"reflect"
"time"
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/config"
"github.com/aceld/kis-flow/log"
"github.com/aceld/kis-flow/metrics"
)
// CommitRow submits a single row of data to the Flow; multiple rows can be submitted multiple times
func (flow *KisFlow) CommitRow(row interface{}) error {
flow.buffer = append(flow.buffer, row)
return nil
}
// CommitRowBatch submits a batch of data to the Flow
func (flow *KisFlow) CommitRowBatch(rows interface{}) error {
v := reflect.ValueOf(rows)
if v.Kind() != reflect.Slice {
return fmt.Errorf("Commit Data is not a slice")
}
for i := 0; i < v.Len(); i++ {
row := v.Index(i).Interface().(common.KisRow)
flow.buffer = append(flow.buffer, row)
}
return nil
}
// Input gets the input data for the currently executing Function in the Flow
func (flow *KisFlow) Input() common.KisRowArr {
return flow.inPut
}
// commitSrcData submits the data source data for the current Flow, indicating the first submission of the original data source for the current Flow
// The flow's temporary data buffer is submitted to the flow's data (data is the source data backup for each Function level)
// All previous flow data will be cleared
func (flow *KisFlow) commitSrcData(ctx context.Context) error {
// Create a batch of data
dataCnt := len(flow.buffer)
batch := make(common.KisRowArr, 0, dataCnt)
for _, row := range flow.buffer {
batch = append(batch, row)
}
// Clear all previous data
flow.clearData(flow.data)
// Record the original data for the flow for the first time
// Because it is the first submission, PrevFunctionId is FirstVirtual because there is no upper Function
flow.data[common.FunctionIDFirstVirtual] = batch
// Clear the buffer
flow.buffer = flow.buffer[0:0]
// The first submission of data source data, for statistical total data
if config.GlobalConfig.EnableProm == true {
// Statistics for total data Metrics.DataTotal accumulates by 1
metrics.Metrics.DataTotal.Add(float64(dataCnt))
// Statistics for current Flow quantity index
metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
}
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
// getCurData gets the input data for the current Function level of the flow
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
if flow.PrevFunctionId == "" {
return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set"))
}
if _, ok := flow.data[flow.PrevFunctionId]; !ok {
return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId))
}
return flow.data[flow.PrevFunctionId], nil
}
// commitReuseData
func (flow *KisFlow) commitReuseData(ctx context.Context) error {
// Check if there are result data from the upper layer; if not, exit the current Flow Run loop
if len(flow.data[flow.PrevFunctionId]) == 0 {
flow.abort = true
return nil
}
// This layer's result data is equal to the upper layer's result data (reuse the upper layer's result data to this layer)
flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
// Clear the buffer (If it is a ReuseData option, all the submitted data will not be carried to the next layer)
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
func (flow *KisFlow) commitVoidData(ctx context.Context) error {
if len(flow.buffer) != 0 {
return nil
}
// Create empty data
batch := make(common.KisRowArr, 0)
// Submit the calculated buffer data of this layer to the result data of this layer
flow.data[flow.ThisFunctionId] = batch
log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
// commitCurData submits the result data of the currently executing Function in the Flow
func (flow *KisFlow) commitCurData(ctx context.Context) error {
// Check if this layer's calculation has result data; if not, exit the current Flow Run loop
if len(flow.buffer) == 0 {
flow.abort = true
return nil
}
// Create a batch of data
batch := make(common.KisRowArr, 0, len(flow.buffer))
// If strBuf is empty, no data has been added
for _, row := range flow.buffer {
batch = append(batch, row)
}
// Submit the calculated buffer data of this layer to the result data of this layer
flow.data[flow.ThisFunctionId] = batch
// Clear the buffer
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
// clearData clears all flow data
func (flow *KisFlow) clearData(data common.KisDataMap) {
for k := range data {
delete(data, k)
}
}
func (flow *KisFlow) GetCacheData(key string) interface{} {
if data, found := flow.cache.Get(key); found {
return data
}
return nil
}
func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
if Exp == common.DefaultExpiration {
flow.cache.Set(key, value, 0)
} else {
flow.cache.Set(key, value, Exp)
}
}
// GetMetaData gets the temporary data of the current Flow object
func (flow *KisFlow) GetMetaData(key string) interface{} {
flow.mLock.RLock()
defer flow.mLock.RUnlock()
data, ok := flow.metaData[key]
if !ok {
return nil
}
return data
}
// SetMetaData sets the temporary data of the current Flow object
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
flow.mLock.Lock()
defer flow.mLock.Unlock()
flow.metaData[key] = value
}
// GetFuncParam gets the default configuration parameters of the currently executing Function in the Flow, retrieves a key-value pair
func (flow *KisFlow) GetFuncParam(key string) string {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
if value, vok := param[key]; vok {
return value
}
}
return ""
}
// GetFuncParamAll gets the default configuration parameters of the currently executing Function in the Flow, retrieves all Key-Value pairs
func (flow *KisFlow) GetFuncParamAll() config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
param, ok := flow.funcParams[flow.ThisFunctionId]
if !ok {
return nil
}
return param
}
// GetFuncParamsAllFuncs gets the FuncParams of all Functions in the Flow, retrieves all Key-Value pairs
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
return flow.funcParams
}