kis-flow/flow/kis_flow.go
2024-01-01 17:49:27 +08:00

143 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package flow
import (
"context"
"errors"
"kis-flow/common"
"kis-flow/config"
"kis-flow/function"
"kis-flow/id"
"kis-flow/kis"
"sync"
)
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
// 基础信息
Id string // Flow配置策略ID(平台生成管理)
Name string // Flow的可读名称
Conf *config.KisFlowConfig // Flow配置策略
// Function列表
Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionID
FlowHead kis.Function // 当前Flow所拥有的Function列表表头
FlowTail kis.Function // 当前Flow所拥有的Function列表表尾
flock sync.RWMutex // 管理链表插入读写的锁
ThisFunction kis.Function // Flow当前正在执行的KisFunction对象
ThisFunctionId string // 当前执行到的Function ID (策略配置ID)
PrevFunctionId string // 当前执行到的Function 上一层FunctionID(策略配置ID)
// Function列表参数
funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例NsID, value:FParam
fplock sync.RWMutex // 管理funcParams的读写锁
KisId string // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
}
// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// 基础信息
flow.Id = conf.FlowId
flow.Name = conf.FlowName
flow.Conf = conf
// Function列表
flow.Funcs = make(map[string]kis.Function)
flow.funcParams = make(map[string]config.FParam)
flow.KisId = id.KisID(common.KisIdTypeFlow)
return flow
}
// Link 将Function链接到Flow中
// fConf: 当前Function策略
// fParams: 当前Flow携带的Function动态参数
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
// 创建Function
f := function.NewKisFunction(flow, fConf)
// Flow 添加 Function
if err := flow.appendFunc(f, fParams); err != nil {
return err
}
return nil
}
// appendFunc 将Function添加到Flow中, 链表操作
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
if function == nil {
return errors.New("AppendFunc append nil to List")
}
flow.flock.Lock()
defer flow.flock.Unlock()
if flow.FlowHead == nil {
// 首次添加节点
flow.FlowHead = function
flow.FlowTail = function
function.SetN(nil)
function.SetP(nil)
} else {
// 将function插入到链表的尾部
function.SetP(flow.FlowTail)
function.SetN(nil)
flow.FlowTail.SetN(function)
flow.FlowTail = function
}
//将Function ID 详细Hash对应关系添加到flow对象中
flow.Funcs[function.GetId()] = function
//先添加function 默认携带的Params参数
params := make(config.FParam)
for key, value := range function.GetConfig().Option.Params {
params[key] = value
}
//再添加flow携带的function定义参数(重复即覆盖)
for key, value := range fParam {
params[key] = value
}
// 将得到的FParams存留在flow结构体中用来function业务直接通过Hash获取
// key 为当前Function的KisId不用Fid的原因是为了防止一个Flow添加两个相同策略Id的Function
flow.funcParams[function.GetKisId()] = params
return nil
}
// Run 启动KisFlow的流式计算, 从起始Function开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {
var fn kis.Function
fn = flow.FlowHead
if flow.Conf.Status == int(common.FlowDisable) {
//flow被配置关闭
return nil
}
//流式链式调用
for fn != nil {
if err := fn.Call(ctx, flow); err != nil {
//Error
return err
} else {
//Success
fn = fn.Next()
}
}
return nil
}