mirror of
https://github.com/aceld/kis-flow.git
synced 2025-01-22 23:20:24 +08:00
358 lines
11 KiB
Go
358 lines
11 KiB
Go
package flow
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aceld/kis-flow/common"
|
|
"github.com/aceld/kis-flow/config"
|
|
"github.com/aceld/kis-flow/conn"
|
|
"github.com/aceld/kis-flow/function"
|
|
"github.com/aceld/kis-flow/id"
|
|
"github.com/aceld/kis-flow/kis"
|
|
"github.com/aceld/kis-flow/log"
|
|
"github.com/aceld/kis-flow/metrics"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
cache "github.com/patrickmn/go-cache"
|
|
)
|
|
|
|
// KisFlow is used to manage the context environment of the entire streaming computation.
|
|
type KisFlow struct {
|
|
// Basic information
|
|
Id string // Distributed instance ID of the Flow (used internally by KisFlow to distinguish different instances)
|
|
Name string // Readable name of the Flow
|
|
Conf *config.KisFlowConfig // Flow configuration policy
|
|
|
|
// List of Functions
|
|
Funcs map[string]kis.Function // All managed Function objects of the current flow, key: FunctionName
|
|
FlowHead kis.Function // Head of the Function list owned by the current Flow
|
|
FlowTail kis.Function // Tail of the Function list owned by the current Flow
|
|
flock sync.RWMutex // Lock for managing linked list insertion and reading
|
|
ThisFunction kis.Function // KisFunction object currently being executed in the Flow
|
|
ThisFunctionId string // ID of the Function currently being executed
|
|
PrevFunctionId string // ID of the previous layer Function
|
|
|
|
// Function list parameters
|
|
funcParams map[string]config.FParam // Custom fixed configuration parameters of the Flow in the current Function, Key: KisID of the function instance, value: FParam
|
|
fplock sync.RWMutex // Lock for managing funcParams read and write
|
|
|
|
// Data
|
|
buffer common.KisRowArr // Internal buffer used to temporarily store input byte data, one data is interface{}, multiple data is []interface{} i.e. KisBatch
|
|
data common.KisDataMap // Data sources at various levels of the streaming computation
|
|
inPut common.KisRowArr // Input data for the current Function computation
|
|
abort bool // Whether to abort the Flow
|
|
action kis.Action // Action carried by the current Flow
|
|
|
|
// Local cache of the flow
|
|
cache *cache.Cache // Temporary cache context environment of the Flow
|
|
|
|
// metaData of the flow
|
|
metaData map[string]interface{} // Custom temporary data of the Flow
|
|
mLock sync.RWMutex // Lock for managing metaData read and write
|
|
}
|
|
|
|
// NewKisFlow creates a KisFlow.
|
|
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
|
|
flow := new(KisFlow)
|
|
// Instance Id
|
|
flow.Id = id.KisID(common.KisIDTypeFlow)
|
|
|
|
// Basic information
|
|
flow.Name = conf.FlowName
|
|
flow.Conf = conf
|
|
|
|
// List of Functions
|
|
flow.Funcs = make(map[string]kis.Function)
|
|
flow.funcParams = make(map[string]config.FParam)
|
|
|
|
// Data
|
|
flow.data = make(common.KisDataMap)
|
|
|
|
// Initialize local cache
|
|
flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute)
|
|
|
|
// Initialize temporary data
|
|
flow.metaData = make(map[string]interface{})
|
|
|
|
return flow
|
|
}
|
|
|
|
// Fork gets a copy (deep copy) of the Flow.
|
|
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
|
|
|
|
cfg := flow.Conf
|
|
|
|
// Generate a new Flow based on the previous configuration
|
|
newFlow := NewKisFlow(cfg)
|
|
|
|
for _, fp := range flow.Conf.Flows {
|
|
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetID()]; !ok {
|
|
// The current function has no Params configured
|
|
_ = newFlow.AppendNewFunction(flow.Funcs[fp.FuncName].GetConfig(), nil)
|
|
} else {
|
|
// The current function has configured Params
|
|
_ = newFlow.AppendNewFunction(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
|
|
}
|
|
}
|
|
|
|
log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams)
|
|
log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs())
|
|
|
|
return newFlow
|
|
}
|
|
|
|
// Link links the Function to the Flow, and also adds the Function's configuration parameters to the Flow's configuration.
|
|
// fConf: Current Function strategy
|
|
// fParams: Dynamic parameters carried by the current Flow's Function
|
|
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
|
|
|
|
// Add Function to Flow
|
|
_ = flow.AppendNewFunction(fConf, fParams)
|
|
|
|
// Add Function to FlowConfig
|
|
flowFuncParam := config.KisFlowFunctionParam{
|
|
FuncName: fConf.FName,
|
|
Params: fParams,
|
|
}
|
|
flow.Conf.AppendFunctionConfig(flowFuncParam)
|
|
|
|
return nil
|
|
}
|
|
|
|
// AppendNewFunction appends a new Function to the Flow.
|
|
func (flow *KisFlow) AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error {
|
|
// Create Function instance
|
|
f := function.NewKisFunction(flow, fConf)
|
|
|
|
if fConf.Option.CName != "" {
|
|
// The current Function has a Connector association and needs to initialize the Connector instance
|
|
|
|
// Get Connector configuration
|
|
connConfig, err := fConf.GetConnConfig()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Create Connector object
|
|
connector := conn.NewKisConnector(connConfig)
|
|
|
|
// Initialize Connector, execute the Connector Init method
|
|
if err = connector.Init(); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Associate the Function instance with the Connector instance
|
|
_ = f.AddConnector(connector)
|
|
}
|
|
|
|
// Add Function to Flow
|
|
if err := flow.appendFunc(f, fParams); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// appendFunc adds the Function to the Flow, linked list operation
|
|
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
|
|
|
|
if function == nil {
|
|
return errors.New("AppendFunc append nil to List")
|
|
}
|
|
|
|
flow.flock.Lock()
|
|
defer flow.flock.Unlock()
|
|
|
|
if flow.FlowHead == nil {
|
|
// First time adding a node
|
|
flow.FlowHead = function
|
|
flow.FlowTail = function
|
|
|
|
function.SetN(nil)
|
|
function.SetP(nil)
|
|
|
|
} else {
|
|
// Insert the function at the end of the linked list
|
|
function.SetP(flow.FlowTail)
|
|
function.SetN(nil)
|
|
|
|
flow.FlowTail.SetN(function)
|
|
flow.FlowTail = function
|
|
}
|
|
|
|
// Add the detailed Function Name-Hash correspondence to the flow object
|
|
flow.Funcs[function.GetConfig().FName] = function
|
|
|
|
// First add the Params parameters carried by the function by default
|
|
params := make(config.FParam)
|
|
for key, value := range function.GetConfig().Option.Params {
|
|
params[key] = value
|
|
}
|
|
|
|
// Then add the function definition parameters carried by the flow (overwriting duplicates)
|
|
for key, value := range fParam {
|
|
params[key] = value
|
|
}
|
|
|
|
// Store the obtained FParams in the flow structure for direct access by the function
|
|
// The key is the KisId of the current Function, not using Fid to prevent adding two Functions with the same strategy Id to a Flow
|
|
flow.funcParams[function.GetID()] = params
|
|
|
|
return nil
|
|
}
|
|
|
|
// Run starts the streaming computation of KisFlow, starting from the initial Function.
|
|
func (flow *KisFlow) Run(ctx context.Context) error {
|
|
|
|
var fn kis.Function
|
|
|
|
fn = flow.FlowHead
|
|
flow.abort = false
|
|
|
|
if flow.Conf.Status == int(common.FlowDisable) {
|
|
// Flow is configured to be disabled
|
|
return nil
|
|
}
|
|
|
|
// Metrics
|
|
var funcStart time.Time
|
|
var flowStart time.Time
|
|
|
|
// Since no Function has been executed at this time, PrevFunctionId is FirstVirtual because there is no previous layer Function
|
|
flow.PrevFunctionId = common.FunctionIDFirstVirtual
|
|
|
|
// Commit the original data stream
|
|
if err := flow.commitSrcData(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Metrics
|
|
if config.GlobalConfig.EnableProm == true {
|
|
// Count the number of Flow schedules
|
|
metrics.Metrics.FlowScheduleCntsToTal.WithLabelValues(flow.Name).Inc()
|
|
// Count the execution time of Flow
|
|
flowStart = time.Now()
|
|
}
|
|
|
|
// Streaming chain call
|
|
for fn != nil && flow.abort == false {
|
|
|
|
// Record the current Function being executed by the flow
|
|
fid := fn.GetID()
|
|
flow.ThisFunction = fn
|
|
flow.ThisFunctionId = fid
|
|
|
|
fName := fn.GetConfig().FName
|
|
fMode := fn.GetConfig().FMode
|
|
|
|
if config.GlobalConfig.EnableProm == true {
|
|
// Count the number of Function schedules
|
|
metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
|
|
|
|
// Count the time consumed by Function, record the start time
|
|
funcStart = time.Now()
|
|
}
|
|
|
|
// Get the source data that the current Function needs to process
|
|
if inputData, err := flow.getCurData(); err != nil {
|
|
log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
|
|
return err
|
|
} else {
|
|
flow.inPut = inputData
|
|
}
|
|
|
|
if err := fn.Call(ctx, flow); err != nil {
|
|
// Error
|
|
return err
|
|
} else {
|
|
// Success
|
|
fn, err = flow.dealAction(ctx, fn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Count the time consumed by Function
|
|
if config.GlobalConfig.EnableProm == true {
|
|
// Function consumption time
|
|
duration := time.Since(funcStart)
|
|
|
|
// Count the current Function metrics, do time statistics
|
|
metrics.Metrics.FunctionDuration.With(
|
|
prometheus.Labels{
|
|
common.LabelFunctionName: fName,
|
|
common.LabelFunctionMode: fMode}).Observe(duration.Seconds() * 1000)
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
// Metrics
|
|
if config.GlobalConfig.EnableProm == true {
|
|
// Count the execution time of Flow
|
|
duration := time.Since(flowStart)
|
|
metrics.Metrics.FlowDuration.WithLabelValues(flow.Name).Observe(duration.Seconds() * 1000)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Next the current Flow enters the Action action carried by the next layer Function.
|
|
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {
|
|
|
|
// Load the Action actions carried by Function FaaS
|
|
flow.action = kis.LoadActions(acts)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (flow *KisFlow) GetName() string {
|
|
return flow.Name
|
|
}
|
|
|
|
func (flow *KisFlow) GetID() string {
|
|
return flow.Id
|
|
}
|
|
|
|
func (flow *KisFlow) GetThisFunction() kis.Function {
|
|
return flow.ThisFunction
|
|
}
|
|
|
|
func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
|
|
return flow.ThisFunction.GetConfig()
|
|
}
|
|
|
|
// GetConnector gets the Connector of the Function currently being executed by the Flow.
|
|
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
|
|
if connector := flow.ThisFunction.GetConnector(); connector != nil {
|
|
return connector, nil
|
|
} else {
|
|
return nil, errors.New("GetConnector(): Connector is nil")
|
|
}
|
|
}
|
|
|
|
// GetConnConf gets the Connector configuration of the Function currently being executed by the Flow.
|
|
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
|
|
if connector := flow.ThisFunction.GetConnector(); connector != nil {
|
|
return connector.GetConfig(), nil
|
|
} else {
|
|
return nil, errors.New("GetConnConf(): Connector is nil")
|
|
}
|
|
}
|
|
|
|
func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
|
|
return flow.Conf
|
|
}
|
|
|
|
// GetFuncConfigByName gets the configuration of the current Flow by Function name.
|
|
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
|
|
if f, ok := flow.Funcs[funcName]; ok {
|
|
return f.GetConfig()
|
|
} else {
|
|
log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)
|
|
return nil
|
|
}
|
|
}
|