2023-12-31 18:04:28 +08:00
package flow
2024-01-01 17:49:27 +08:00
import (
"context"
"errors"
2024-03-26 14:54:50 +08:00
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/config"
"github.com/aceld/kis-flow/conn"
"github.com/aceld/kis-flow/function"
"github.com/aceld/kis-flow/id"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/log"
"github.com/aceld/kis-flow/metrics"
2024-03-18 09:29:05 +08:00
"github.com/prometheus/client_golang/prometheus"
2024-01-01 17:49:27 +08:00
"sync"
2024-01-26 17:27:29 +08:00
"time"
2024-03-01 16:29:07 +08:00
"github.com/patrickmn/go-cache"
2024-01-01 17:49:27 +08:00
)
2023-12-31 18:04:28 +08:00
2024-04-15 17:50:02 +08:00
// KisFlow is used to manage the context environment of the entire streaming computation.
2023-12-31 18:04:28 +08:00
type KisFlow struct {
2024-04-15 17:50:02 +08:00
// Basic information
Id string // Distributed instance ID of the Flow (used internally by KisFlow to distinguish different instances)
Name string // Readable name of the Flow
Conf * config . KisFlowConfig // Flow configuration policy
// List of Functions
Funcs map [ string ] kis . Function // All managed Function objects of the current flow, key: FunctionName
FlowHead kis . Function // Head of the Function list owned by the current Flow
FlowTail kis . Function // Tail of the Function list owned by the current Flow
flock sync . RWMutex // Lock for managing linked list insertion and reading
ThisFunction kis . Function // KisFunction object currently being executed in the Flow
ThisFunctionId string // ID of the Function currently being executed
PrevFunctionId string // ID of the previous layer Function
// Function list parameters
funcParams map [ string ] config . FParam // Custom fixed configuration parameters of the Flow in the current Function, Key: KisID of the function instance, value: FParam
fplock sync . RWMutex // Lock for managing funcParams read and write
// Data
buffer common . KisRowArr // Internal buffer used to temporarily store input byte data, one data is interface{}, multiple data is []interface{} i.e. KisBatch
data common . KisDataMap // Data sources at various levels of the streaming computation
inPut common . KisRowArr // Input data for the current Function computation
abort bool // Whether to abort the Flow
action kis . Action // Action carried by the current Flow
// Local cache of the flow
cache * cache . Cache // Temporary cache context environment of the Flow
// metaData of the flow
metaData map [ string ] interface { } // Custom temporary data of the Flow
mLock sync . RWMutex // Lock for managing metaData read and write
2023-12-31 18:04:28 +08:00
}
2024-04-15 17:50:02 +08:00
// NewKisFlow creates a KisFlow.
2024-01-01 17:49:27 +08:00
func NewKisFlow ( conf * config . KisFlowConfig ) kis . Flow {
2023-12-31 18:04:28 +08:00
flow := new ( KisFlow )
2024-04-15 17:50:02 +08:00
// Instance Id
flow . Id = id . KisID ( common . KisIDTypeFlow )
2024-01-01 17:49:27 +08:00
2024-04-15 17:50:02 +08:00
// Basic information
2024-01-01 17:49:27 +08:00
flow . Name = conf . FlowName
flow . Conf = conf
2024-04-15 17:50:02 +08:00
// List of Functions
2024-01-01 17:49:27 +08:00
flow . Funcs = make ( map [ string ] kis . Function )
flow . funcParams = make ( map [ string ] config . FParam )
2024-04-15 17:50:02 +08:00
// Data
2024-01-03 17:22:35 +08:00
flow . data = make ( common . KisDataMap )
2024-04-15 17:50:02 +08:00
// Initialize local cache
2024-01-26 17:27:29 +08:00
flow . cache = cache . New ( cache . NoExpiration , common . DeFaultFlowCacheCleanUp * time . Minute )
2024-04-15 17:50:02 +08:00
// Initialize temporary data
2024-01-26 17:27:29 +08:00
flow . metaData = make ( map [ string ] interface { } )
2023-12-31 18:04:28 +08:00
return flow
}
2024-01-01 17:49:27 +08:00
2024-04-15 17:50:02 +08:00
// Fork gets a copy (deep copy) of the Flow.
2024-02-04 16:27:28 +08:00
func ( flow * KisFlow ) Fork ( ctx context . Context ) kis . Flow {
2024-03-01 16:29:07 +08:00
cfg := flow . Conf
2024-02-04 16:27:28 +08:00
2024-04-15 17:50:02 +08:00
// Generate a new Flow based on the previous configuration
2024-03-01 16:29:07 +08:00
newFlow := NewKisFlow ( cfg )
2024-02-04 16:27:28 +08:00
for _ , fp := range flow . Conf . Flows {
2024-04-15 17:50:02 +08:00
if _ , ok := flow . funcParams [ flow . Funcs [ fp . FuncName ] . GetID ( ) ] ; ! ok {
// The current function has no Params configured
2024-03-29 18:07:57 +08:00
_ = newFlow . AppendNewFunction ( flow . Funcs [ fp . FuncName ] . GetConfig ( ) , nil )
2024-02-04 16:27:28 +08:00
} else {
2024-04-15 17:50:02 +08:00
// The current function has configured Params
2024-03-29 18:07:57 +08:00
_ = newFlow . AppendNewFunction ( flow . Funcs [ fp . FuncName ] . GetConfig ( ) , fp . Params )
2024-02-04 16:27:28 +08:00
}
}
2024-04-15 11:17:47 +08:00
log . Logger ( ) . DebugFX ( ctx , "=====>Flow Fork, oldFlow.funcParams = %+v\n" , flow . funcParams )
log . Logger ( ) . DebugFX ( ctx , "=====>Flow Fork, newFlow.funcParams = %+v\n" , newFlow . GetFuncParamsAllFuncs ( ) )
2024-02-04 16:27:28 +08:00
return newFlow
}
2024-04-15 17:50:02 +08:00
// Link links the Function to the Flow, and also adds the Function's configuration parameters to the Flow's configuration.
// fConf: Current Function strategy
// fParams: Dynamic parameters carried by the current Flow's Function
2024-01-01 17:49:27 +08:00
func ( flow * KisFlow ) Link ( fConf * config . KisFuncConfig , fParams config . FParam ) error {
2024-03-29 18:07:57 +08:00
2024-04-15 17:50:02 +08:00
// Add Function to Flow
2024-03-29 18:07:57 +08:00
_ = flow . AppendNewFunction ( fConf , fParams )
2024-04-15 17:50:02 +08:00
// Add Function to FlowConfig
2024-03-29 18:07:57 +08:00
flowFuncParam := config . KisFlowFunctionParam {
FuncName : fConf . FName ,
Params : fParams ,
}
flow . Conf . AppendFunctionConfig ( flowFuncParam )
return nil
}
2024-04-15 17:50:02 +08:00
// AppendNewFunction appends a new Function to the Flow.
2024-03-29 18:07:57 +08:00
func ( flow * KisFlow ) AppendNewFunction ( fConf * config . KisFuncConfig , fParams config . FParam ) error {
2024-04-15 17:50:02 +08:00
// Create Function instance
2024-01-01 17:49:27 +08:00
f := function . NewKisFunction ( flow , fConf )
2024-01-09 17:30:58 +08:00
if fConf . Option . CName != "" {
2024-04-15 17:50:02 +08:00
// The current Function has a Connector association and needs to initialize the Connector instance
2024-01-09 17:30:58 +08:00
2024-04-15 17:50:02 +08:00
// Get Connector configuration
2024-01-09 17:30:58 +08:00
connConfig , err := fConf . GetConnConfig ( )
if err != nil {
panic ( err )
}
2024-04-15 17:50:02 +08:00
// Create Connector object
2024-01-09 17:30:58 +08:00
connector := conn . NewKisConnector ( connConfig )
2024-04-15 17:50:02 +08:00
// Initialize Connector, execute the Connector Init method
2024-01-09 17:30:58 +08:00
if err = connector . Init ( ) ; err != nil {
panic ( err )
}
2024-04-15 17:50:02 +08:00
// Associate the Function instance with the Connector instance
2024-01-09 17:30:58 +08:00
_ = f . AddConnector ( connector )
}
2024-04-15 17:50:02 +08:00
// Add Function to Flow
2024-01-01 17:49:27 +08:00
if err := flow . appendFunc ( f , fParams ) ; err != nil {
return err
}
return nil
}
2024-04-15 17:50:02 +08:00
// appendFunc adds the Function to the Flow, linked list operation
2024-01-01 17:49:27 +08:00
func ( flow * KisFlow ) appendFunc ( function kis . Function , fParam config . FParam ) error {
if function == nil {
return errors . New ( "AppendFunc append nil to List" )
}
flow . flock . Lock ( )
defer flow . flock . Unlock ( )
if flow . FlowHead == nil {
2024-04-15 17:50:02 +08:00
// First time adding a node
2024-01-01 17:49:27 +08:00
flow . FlowHead = function
flow . FlowTail = function
function . SetN ( nil )
function . SetP ( nil )
} else {
2024-04-15 17:50:02 +08:00
// Insert the function at the end of the linked list
2024-01-01 17:49:27 +08:00
function . SetP ( flow . FlowTail )
function . SetN ( nil )
flow . FlowTail . SetN ( function )
flow . FlowTail = function
}
2024-04-15 17:50:02 +08:00
// Add the detailed Function Name-Hash correspondence to the flow object
2024-01-12 17:27:43 +08:00
flow . Funcs [ function . GetConfig ( ) . FName ] = function
2024-01-01 17:49:27 +08:00
2024-04-15 17:50:02 +08:00
// First add the Params parameters carried by the function by default
2024-01-01 17:49:27 +08:00
params := make ( config . FParam )
for key , value := range function . GetConfig ( ) . Option . Params {
params [ key ] = value
}
2024-04-15 17:50:02 +08:00
// Then add the function definition parameters carried by the flow (overwriting duplicates)
2024-01-01 17:49:27 +08:00
for key , value := range fParam {
params [ key ] = value
}
2024-04-15 17:50:02 +08:00
// Store the obtained FParams in the flow structure for direct access by the function
// The key is the KisId of the current Function, not using Fid to prevent adding two Functions with the same strategy Id to a Flow
flow . funcParams [ function . GetID ( ) ] = params
2024-01-01 17:49:27 +08:00
return nil
}
2024-04-15 17:50:02 +08:00
// Run starts the streaming computation of KisFlow, starting from the initial Function.
2024-01-01 17:49:27 +08:00
func ( flow * KisFlow ) Run ( ctx context . Context ) error {
var fn kis . Function
fn = flow . FlowHead
2024-01-23 16:21:02 +08:00
flow . abort = false
2024-01-01 17:49:27 +08:00
if flow . Conf . Status == int ( common . FlowDisable ) {
2024-04-15 17:50:02 +08:00
// Flow is configured to be disabled
2024-01-01 17:49:27 +08:00
return nil
}
2024-03-18 09:29:05 +08:00
// Metrics
var funcStart time . Time
var flowStart time . Time
2024-04-15 17:50:02 +08:00
// Since no Function has been executed at this time, PrevFunctionId is FirstVirtual because there is no previous layer Function
flow . PrevFunctionId = common . FunctionIDFirstVirtual
2024-01-03 17:22:35 +08:00
2024-04-15 17:50:02 +08:00
// Commit the original data stream
2024-01-03 17:22:35 +08:00
if err := flow . commitSrcData ( ctx ) ; err != nil {
return err
}
2024-03-18 09:29:05 +08:00
// Metrics
if config . GlobalConfig . EnableProm == true {
2024-04-15 17:50:02 +08:00
// Count the number of Flow schedules
2024-03-18 09:29:05 +08:00
metrics . Metrics . FlowScheduleCntsToTal . WithLabelValues ( flow . Name ) . Inc ( )
2024-04-15 17:50:02 +08:00
// Count the execution time of Flow
2024-03-18 09:29:05 +08:00
flowStart = time . Now ( )
}
2024-04-15 17:50:02 +08:00
// Streaming chain call
2024-01-23 16:21:02 +08:00
for fn != nil && flow . abort == false {
2024-01-03 17:22:35 +08:00
2024-04-15 17:50:02 +08:00
// Record the current Function being executed by the flow
fid := fn . GetID ( )
2024-01-03 17:22:35 +08:00
flow . ThisFunction = fn
flow . ThisFunctionId = fid
2024-03-18 09:29:05 +08:00
fName := fn . GetConfig ( ) . FName
fMode := fn . GetConfig ( ) . FMode
if config . GlobalConfig . EnableProm == true {
2024-04-15 17:50:02 +08:00
// Count the number of Function schedules
2024-03-18 09:29:05 +08:00
metrics . Metrics . FuncScheduleCntsTotal . WithLabelValues ( fName , fMode ) . Inc ( )
2024-04-15 17:50:02 +08:00
// Count the time consumed by Function, record the start time
2024-03-18 09:29:05 +08:00
funcStart = time . Now ( )
}
2024-04-15 17:50:02 +08:00
// Get the source data that the current Function needs to process
2024-01-03 17:22:35 +08:00
if inputData , err := flow . getCurData ( ) ; err != nil {
2024-04-15 11:17:47 +08:00
log . Logger ( ) . ErrorFX ( ctx , "flow.Run(): getCurData err = %s\n" , err . Error ( ) )
2024-01-03 17:22:35 +08:00
return err
} else {
flow . inPut = inputData
}
2024-01-01 17:49:27 +08:00
if err := fn . Call ( ctx , flow ) ; err != nil {
2024-03-01 16:29:07 +08:00
// Error
2024-01-01 17:49:27 +08:00
return err
} else {
2024-03-01 16:29:07 +08:00
// Success
2024-01-23 16:21:02 +08:00
fn , err = flow . dealAction ( ctx , fn )
if err != nil {
2024-01-03 17:22:35 +08:00
return err
}
2024-03-18 09:29:05 +08:00
2024-04-15 17:50:02 +08:00
// Count the time consumed by Function
2024-03-18 09:29:05 +08:00
if config . GlobalConfig . EnableProm == true {
2024-04-15 17:50:02 +08:00
// Function consumption time
2024-03-18 09:29:05 +08:00
duration := time . Since ( funcStart )
2024-04-15 17:50:02 +08:00
// Count the current Function metrics, do time statistics
2024-03-18 09:29:05 +08:00
metrics . Metrics . FunctionDuration . With (
prometheus . Labels {
2024-04-15 17:50:02 +08:00
common . LabelFunctionName : fName ,
common . LabelFunctionMode : fMode } ) . Observe ( duration . Seconds ( ) * 1000 )
2024-03-18 09:29:05 +08:00
}
2024-01-01 17:49:27 +08:00
}
}
2024-03-18 09:29:05 +08:00
// Metrics
if config . GlobalConfig . EnableProm == true {
2024-04-15 17:50:02 +08:00
// Count the execution time of Flow
2024-03-18 09:29:05 +08:00
duration := time . Since ( flowStart )
metrics . Metrics . FlowDuration . WithLabelValues ( flow . Name ) . Observe ( duration . Seconds ( ) * 1000 )
}
2024-01-01 17:49:27 +08:00
return nil
}
2024-01-04 16:36:36 +08:00
2024-04-15 17:50:02 +08:00
// Next the current Flow enters the Action action carried by the next layer Function.
2024-01-23 16:21:02 +08:00
func ( flow * KisFlow ) Next ( acts ... kis . ActionFunc ) error {
2024-04-15 17:50:02 +08:00
// Load the Action actions carried by Function FaaS
2024-01-23 16:21:02 +08:00
flow . action = kis . LoadActions ( acts )
return nil
}
2024-01-04 16:36:36 +08:00
func ( flow * KisFlow ) GetName ( ) string {
return flow . Name
}
2024-04-15 17:50:02 +08:00
func ( flow * KisFlow ) GetID ( ) string {
2024-04-15 10:44:57 +08:00
return flow . Id
}
2024-01-04 16:36:36 +08:00
func ( flow * KisFlow ) GetThisFunction ( ) kis . Function {
return flow . ThisFunction
}
func ( flow * KisFlow ) GetThisFuncConf ( ) * config . KisFuncConfig {
return flow . ThisFunction . GetConfig ( )
}
2024-01-09 17:30:58 +08:00
2024-04-15 17:50:02 +08:00
// GetConnector gets the Connector of the Function currently being executed by the Flow.
2024-01-09 17:30:58 +08:00
func ( flow * KisFlow ) GetConnector ( ) ( kis . Connector , error ) {
2024-03-01 16:29:07 +08:00
if connector := flow . ThisFunction . GetConnector ( ) ; connector != nil {
return connector , nil
2024-01-09 17:30:58 +08:00
} else {
return nil , errors . New ( "GetConnector(): Connector is nil" )
}
}
2024-04-15 17:50:02 +08:00
// GetConnConf gets the Connector configuration of the Function currently being executed by the Flow.
2024-01-09 17:30:58 +08:00
func ( flow * KisFlow ) GetConnConf ( ) ( * config . KisConnConfig , error ) {
2024-03-01 16:29:07 +08:00
if connector := flow . ThisFunction . GetConnector ( ) ; connector != nil {
return connector . GetConfig ( ) , nil
2024-01-09 17:30:58 +08:00
} else {
return nil , errors . New ( "GetConnConf(): Connector is nil" )
}
}
2024-01-12 17:27:43 +08:00
func ( flow * KisFlow ) GetConfig ( ) * config . KisFlowConfig {
return flow . Conf
}
2024-04-15 17:50:02 +08:00
// GetFuncConfigByName gets the configuration of the current Flow by Function name.
2024-01-12 17:27:43 +08:00
func ( flow * KisFlow ) GetFuncConfigByName ( funcName string ) * config . KisFuncConfig {
if f , ok := flow . Funcs [ funcName ] ; ok {
return f . GetConfig ( )
} else {
2024-04-15 11:17:47 +08:00
log . Logger ( ) . ErrorF ( "GetFuncConfigByName(): Function %s not found" , funcName )
2024-01-12 17:27:43 +08:00
return nil
}
}