mirror of
https://github.com/aceld/kis-flow.git
synced 2025-01-22 23:20:24 +08:00
237 lines
6.6 KiB
Go
237 lines
6.6 KiB
Go
package flow
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/aceld/kis-flow/common"
|
|
"github.com/aceld/kis-flow/config"
|
|
"github.com/aceld/kis-flow/log"
|
|
"github.com/aceld/kis-flow/metrics"
|
|
)
|
|
|
|
// CommitRow submits a single row of data to the Flow; multiple rows can be submitted multiple times
|
|
func (flow *KisFlow) CommitRow(row interface{}) error {
|
|
|
|
flow.buffer = append(flow.buffer, row)
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommitRowBatch submits a batch of data to the Flow
|
|
func (flow *KisFlow) CommitRowBatch(rows interface{}) error {
|
|
v := reflect.ValueOf(rows)
|
|
if v.Kind() != reflect.Slice {
|
|
return fmt.Errorf("Commit Data is not a slice")
|
|
}
|
|
|
|
for i := 0; i < v.Len(); i++ {
|
|
row := v.Index(i).Interface().(common.KisRow)
|
|
flow.buffer = append(flow.buffer, row)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Input gets the input data for the currently executing Function in the Flow
|
|
func (flow *KisFlow) Input() common.KisRowArr {
|
|
return flow.inPut
|
|
}
|
|
|
|
// commitSrcData submits the data source data for the current Flow, indicating the first submission of the original data source for the current Flow
|
|
// The flow's temporary data buffer is submitted to the flow's data (data is the source data backup for each Function level)
|
|
// All previous flow data will be cleared
|
|
func (flow *KisFlow) commitSrcData(ctx context.Context) error {
|
|
|
|
// Create a batch of data
|
|
dataCnt := len(flow.buffer)
|
|
batch := make(common.KisRowArr, 0, dataCnt)
|
|
|
|
for _, row := range flow.buffer {
|
|
batch = append(batch, row)
|
|
}
|
|
|
|
// Clear all previous data
|
|
flow.clearData(flow.data)
|
|
|
|
// Record the original data for the flow for the first time
|
|
// Because it is the first submission, PrevFunctionId is FirstVirtual because there is no upper Function
|
|
flow.data[common.FunctionIDFirstVirtual] = batch
|
|
|
|
// Clear the buffer
|
|
flow.buffer = flow.buffer[0:0]
|
|
|
|
// The first submission of data source data, for statistical total data
|
|
if config.GlobalConfig.EnableProm == true {
|
|
// Statistics for total data Metrics.DataTotal accumulates by 1
|
|
metrics.Metrics.DataTotal.Add(float64(dataCnt))
|
|
|
|
// Statistics for current Flow quantity index
|
|
metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
|
|
}
|
|
|
|
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
|
|
|
|
return nil
|
|
}
|
|
|
|
// getCurData gets the input data for the current Function level of the flow
|
|
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
|
|
if flow.PrevFunctionId == "" {
|
|
return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set"))
|
|
}
|
|
|
|
if _, ok := flow.data[flow.PrevFunctionId]; !ok {
|
|
return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId))
|
|
}
|
|
|
|
return flow.data[flow.PrevFunctionId], nil
|
|
}
|
|
|
|
// commitReuseData
|
|
func (flow *KisFlow) commitReuseData(ctx context.Context) error {
|
|
|
|
// Check if there are result data from the upper layer; if not, exit the current Flow Run loop
|
|
if len(flow.data[flow.PrevFunctionId]) == 0 {
|
|
flow.abort = true
|
|
return nil
|
|
}
|
|
|
|
// This layer's result data is equal to the upper layer's result data (reuse the upper layer's result data to this layer)
|
|
flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
|
|
|
|
// Clear the buffer (If it is a ReuseData option, all the submitted data will not be carried to the next layer)
|
|
flow.buffer = flow.buffer[0:0]
|
|
|
|
log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (flow *KisFlow) commitVoidData(ctx context.Context) error {
|
|
if len(flow.buffer) != 0 {
|
|
return nil
|
|
}
|
|
|
|
// Create empty data
|
|
batch := make(common.KisRowArr, 0)
|
|
|
|
// Submit the calculated buffer data of this layer to the result data of this layer
|
|
flow.data[flow.ThisFunctionId] = batch
|
|
|
|
log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
|
|
|
|
return nil
|
|
}
|
|
|
|
// commitCurData submits the result data of the currently executing Function in the Flow
|
|
func (flow *KisFlow) commitCurData(ctx context.Context) error {
|
|
|
|
// Check if this layer's calculation has result data; if not, exit the current Flow Run loop
|
|
if len(flow.buffer) == 0 {
|
|
flow.abort = true
|
|
return nil
|
|
}
|
|
|
|
// Create a batch of data
|
|
batch := make(common.KisRowArr, 0, len(flow.buffer))
|
|
|
|
// If strBuf is empty, no data has been added
|
|
for _, row := range flow.buffer {
|
|
batch = append(batch, row)
|
|
}
|
|
|
|
// Submit the calculated buffer data of this layer to the result data of this layer
|
|
flow.data[flow.ThisFunctionId] = batch
|
|
|
|
// Clear the buffer
|
|
flow.buffer = flow.buffer[0:0]
|
|
|
|
log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
|
|
|
|
return nil
|
|
}
|
|
|
|
// clearData clears all flow data
|
|
func (flow *KisFlow) clearData(data common.KisDataMap) {
|
|
for k := range data {
|
|
delete(data, k)
|
|
}
|
|
}
|
|
|
|
func (flow *KisFlow) GetCacheData(key string) interface{} {
|
|
|
|
if data, found := flow.cache.Get(key); found {
|
|
return data
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
|
|
if Exp == common.DefaultExpiration {
|
|
flow.cache.Set(key, value, 0)
|
|
} else {
|
|
flow.cache.Set(key, value, Exp)
|
|
}
|
|
}
|
|
|
|
// GetMetaData gets the temporary data of the current Flow object
|
|
func (flow *KisFlow) GetMetaData(key string) interface{} {
|
|
flow.mLock.RLock()
|
|
defer flow.mLock.RUnlock()
|
|
|
|
data, ok := flow.metaData[key]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return data
|
|
}
|
|
|
|
// SetMetaData sets the temporary data of the current Flow object
|
|
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
|
|
flow.mLock.Lock()
|
|
defer flow.mLock.Unlock()
|
|
|
|
flow.metaData[key] = value
|
|
}
|
|
|
|
// GetFuncParam gets the default configuration parameters of the currently executing Function in the Flow, retrieves a key-value pair
|
|
func (flow *KisFlow) GetFuncParam(key string) string {
|
|
flow.fplock.RLock()
|
|
defer flow.fplock.RUnlock()
|
|
|
|
if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
|
|
if value, vok := param[key]; vok {
|
|
return value
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
// GetFuncParamAll gets the default configuration parameters of the currently executing Function in the Flow, retrieves all Key-Value pairs
|
|
func (flow *KisFlow) GetFuncParamAll() config.FParam {
|
|
flow.fplock.RLock()
|
|
defer flow.fplock.RUnlock()
|
|
|
|
param, ok := flow.funcParams[flow.ThisFunctionId]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
return param
|
|
}
|
|
|
|
// GetFuncParamsAllFuncs gets the FuncParams of all Functions in the Flow, retrieves all Key-Value pairs
|
|
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
|
|
flow.fplock.RLock()
|
|
defer flow.fplock.RUnlock()
|
|
|
|
return flow.funcParams
|
|
}
|