Merge branch 'master' into feature/aceld

This commit is contained in:
aceld 2024-03-04 14:54:20 +08:00
commit 20b05d1246
8 changed files with 62 additions and 60 deletions

View File

@ -5,7 +5,7 @@ import "time"
// 用户生成KisId的字符串前缀 // 用户生成KisId的字符串前缀
const ( const (
KisIdTypeFlow = "flow" KisIdTypeFlow = "flow"
KisIdTypeConnnector = "conn" KisIdTypeConnector = "conn"
KisIdTypeFunction = "func" KisIdTypeFunction = "func"
KisIdTypeGlobal = "global" KisIdTypeGlobal = "global"
KisIdJoinChar = "-" KisIdJoinChar = "-"
@ -38,7 +38,7 @@ const (
) )
/* /*
是否启动Flow 是否启动Flow
*/ */
type KisOnOff int type KisOnOff int
@ -60,7 +60,7 @@ const (
// cache // cache
const ( const (
// DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间 // DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间
DeFaultFlowCacheCleanUp = 5 //单位 min DeFaultFlowCacheCleanUp = 5 // 单位 min
// DefaultExpiration 默认GoCahce时间 ,永久保存 // DefaultExpiration 默认GoCahce时间 ,永久保存
DefaultExpiration time.Duration = 0 DefaultExpiration time.Duration = 0
) )

View File

@ -11,16 +11,16 @@ type FParam map[string]string
// KisSource 表示当前Function的业务源 // KisSource 表示当前Function的业务源
type KisSource struct { type KisSource struct {
Name string `yaml:"name"` //本层Function的数据源描述 Name string `yaml:"name"` // 本层Function的数据源描述
Must []string `yaml:"must"` //source必传字段 Must []string `yaml:"must"` // source必传字段
} }
// KisFuncOption 可选配置 // KisFuncOption 可选配置
type KisFuncOption struct { type KisFuncOption struct {
CName string `yaml:"cname"` //连接器Connector名称 CName string `yaml:"cname"` // 连接器Connector名称
RetryTimes int `yaml:"retry_times"` //选填,Function调度重试(不包括正常调度)最大次数 RetryTimes int `yaml:"retry_times"` // 选填,Function调度重试(不包括正常调度)最大次数
RetryDuriton int `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms) RetryDuration int `yaml:"return_duration"` // 选填,Function调度每次重试最大时间间隔(单位:ms)
Params FParam `yaml:"default_params"` //选填,在当前Flow中Function定制固定配置参数 Params FParam `yaml:"default_params"` // 选填,在当前Flow中Function定制固定配置参数
} }
// KisFuncConfig 一个KisFunction策略配置 // KisFuncConfig 一个KisFunction策略配置
@ -49,7 +49,7 @@ func NewFuncConfig(
config.FMode = string(mode) config.FMode = string(mode)
//FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系 // FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
if mode == common.S || mode == common.L { if mode == common.S || mode == common.L {
if option == nil { if option == nil {
log.Logger().ErrorF("Funcion S/L need option->Cid\n") log.Logger().ErrorF("Funcion S/L need option->Cid\n")

View File

@ -29,7 +29,7 @@ type KisConnector struct {
// NewKisConnector 根据配置策略创建一个KisConnector // NewKisConnector 根据配置策略创建一个KisConnector
func NewKisConnector(config *config.KisConnConfig) *KisConnector { func NewKisConnector(config *config.KisConnConfig) *KisConnector {
conn := new(KisConnector) conn := new(KisConnector)
conn.CId = id.KisID(common.KisIdTypeConnnector) conn.CId = id.KisID(common.KisIdTypeConnector)
conn.CName = config.CName conn.CName = config.CName
conn.Conf = config conn.Conf = config
conn.metaData = make(map[string]interface{}) conn.metaData = make(map[string]interface{})
@ -41,7 +41,7 @@ func NewKisConnector(config *config.KisConnConfig) *KisConnector {
func (conn *KisConnector) Init() error { func (conn *KisConnector) Init() error {
var err error var err error
//一个Connector只能执行初始化业务一次 // 一个Connector只能执行初始化业务一次
conn.onceInit.Do(func() { conn.onceInit.Do(func() {
err = kis.Pool().CallConnInit(conn) err = kis.Pool().CallConnInit(conn)
}) })

View File

@ -3,10 +3,11 @@ package file
import ( import (
"errors" "errors"
"fmt" "fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
"kis-flow/common" "kis-flow/common"
"kis-flow/kis" "kis-flow/kis"
"os"
"gopkg.in/yaml.v3"
) )
// ConfigExportYaml 将flow配置输出且存储本地 // ConfigExportYaml 将flow配置输出且存储本地
@ -15,13 +16,13 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error {
if data, err := yaml.Marshal(flow.GetConfig()); err != nil { if data, err := yaml.Marshal(flow.GetConfig()); err != nil {
return err return err
} else { } else {
//flow // flow
err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644) err := os.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
if err != nil { if err != nil {
return err return err
} }
//function // function
for _, fp := range flow.GetConfig().Flows { for _, fp := range flow.GetConfig().Flows {
fConf := flow.GetFuncConfigByName(fp.FuncName) fConf := flow.GetFuncConfigByName(fp.FuncName)
if fConf == nil { if fConf == nil {
@ -31,7 +32,7 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error {
if fdata, err := yaml.Marshal(fConf); err != nil { if fdata, err := yaml.Marshal(fConf); err != nil {
return err return err
} else { } else {
if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil { if err := os.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
return err return err
} }
} }
@ -45,7 +46,7 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error {
if cdata, err := yaml.Marshal(cConf); err != nil { if cdata, err := yaml.Marshal(cConf); err != nil {
return err return err
} else { } else {
if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil { if err := os.WriteFile(savaPath+common.KisIdTypeConnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
return err return err
} }
} }

View File

@ -3,8 +3,6 @@ package file
import ( import (
"errors" "errors"
"fmt" "fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
"kis-flow/common" "kis-flow/common"
"kis-flow/config" "kis-flow/config"
"kis-flow/flow" "kis-flow/flow"
@ -13,6 +11,8 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"gopkg.in/yaml.v3"
) )
type allConfig struct { type allConfig struct {
@ -23,22 +23,22 @@ type allConfig struct {
// kisTypeFlowConfigure 解析Flow配置文件yaml格式 // kisTypeFlowConfigure 解析Flow配置文件yaml格式
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
flow := new(config.KisFlowConfig) flowCfg := new(config.KisFlowConfig)
if ok := yaml.Unmarshal(confData, flow); ok != nil { if ok := yaml.Unmarshal(confData, flowCfg); ok != nil {
return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType)) return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
} }
// 如果FLow状态为关闭则不做配置加载 // 如果FLow状态为关闭则不做配置加载
if common.KisOnOff(flow.Status) == common.FlowDisable { if common.KisOnOff(flowCfg.Status) == common.FlowDisable {
return nil return nil
} }
if _, ok := all.Flows[flow.FlowName]; ok { if _, ok := all.Flows[flowCfg.FlowName]; ok {
return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName)) return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flowCfg.FlowName))
} }
// 加入配置集合中 // 加入配置集合中
all.Flows[flow.FlowName] = flow all.Flows[flowCfg.FlowName] = flowCfg
return nil return nil
} }
@ -105,7 +105,7 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
} }
// 读取文件内容 // 读取文件内容
confData, err := ioutil.ReadFile(filePath) confData, err := os.ReadFile(filePath)
if err != nil { if err != nil {
return err return err
} }
@ -128,7 +128,7 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
case common.KisIdTypeFunction: case common.KisIdTypeFunction:
return kisTypeFuncConfigure(all, confData, filePath, kisType) return kisTypeFuncConfigure(all, confData, filePath, kisType)
case common.KisIdTypeConnnector: case common.KisIdTypeConnector:
return kisTypeConnConfigure(all, confData, filePath, kisType) return kisTypeConnConfigure(all, confData, filePath, kisType)
case common.KisIdTypeGlobal: case common.KisIdTypeGlobal:
@ -148,11 +148,11 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
} }
func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error { func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {
//加载当前Flow依赖的Function // 加载当前Flow依赖的Function
if funcConfig, ok := all.Funcs[fp.FuncName]; !ok { if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {
return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName)) return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))
} else { } else {
//flow add connector // flow add connector
if funcConfig.Option.CName != "" { if funcConfig.Option.CName != "" {
// 加载当前Function依赖的Connector // 加载当前Function依赖的Connector
if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok { if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {
@ -163,7 +163,7 @@ func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow,
} }
} }
//flow add function // flow add function
if err := newFlow.Link(funcConfig, fp.Params); err != nil { if err := newFlow.Link(funcConfig, fp.Params); err != nil {
return err return err
} }
@ -191,7 +191,7 @@ func ConfigImportYaml(loadPath string) error {
} }
} }
//将flow添加到FlowPool中 // 将flow添加到FlowPool中
kis.Pool().AddFlow(flowName, newFlow) kis.Pool().AddFlow(flowName, newFlow)
} }

View File

@ -3,7 +3,6 @@ package flow
import ( import (
"context" "context"
"errors" "errors"
"github.com/patrickmn/go-cache"
"kis-flow/common" "kis-flow/common"
"kis-flow/config" "kis-flow/config"
"kis-flow/conn" "kis-flow/conn"
@ -13,6 +12,8 @@ import (
"kis-flow/log" "kis-flow/log"
"sync" "sync"
"time" "time"
"github.com/patrickmn/go-cache"
) )
// KisFlow 用于贯穿整条流式计算的上下文环境 // KisFlow 用于贯穿整条流式计算的上下文环境
@ -79,17 +80,17 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
// Fork 得到Flow的一个副本(深拷贝) // Fork 得到Flow的一个副本(深拷贝)
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow { func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
config := flow.Conf cfg := flow.Conf
// 通过之前的配置生成一个新的Flow // 通过之前的配置生成一个新的Flow
newFlow := NewKisFlow(config) newFlow := NewKisFlow(cfg)
for _, fp := range flow.Conf.Flows { for _, fp := range flow.Conf.Flows {
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok { if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
//当前function没有配置Params // 当前function没有配置Params
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil) newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil)
} else { } else {
//当前function有配置Params // 当前function有配置Params
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params) newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
} }
} }
@ -163,16 +164,16 @@ func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) err
flow.FlowTail = function flow.FlowTail = function
} }
//将Function Name 详细Hash对应关系添加到flow对象中 // 将Function Name 详细Hash对应关系添加到flow对象中
flow.Funcs[function.GetConfig().FName] = function flow.Funcs[function.GetConfig().FName] = function
//先添加function 默认携带的Params参数 // 先添加function 默认携带的Params参数
params := make(config.FParam) params := make(config.FParam)
for key, value := range function.GetConfig().Option.Params { for key, value := range function.GetConfig().Option.Params {
params[key] = value params[key] = value
} }
//再添加flow携带的function定义参数(重复即覆盖) // 再添加flow携带的function定义参数(重复即覆盖)
for key, value := range fParam { for key, value := range fParam {
params[key] = value params[key] = value
} }
@ -193,7 +194,7 @@ func (flow *KisFlow) Run(ctx context.Context) error {
flow.abort = false flow.abort = false
if flow.Conf.Status == int(common.FlowDisable) { if flow.Conf.Status == int(common.FlowDisable) {
//flow被配置关闭 // flow被配置关闭
return nil return nil
} }
@ -205,7 +206,7 @@ func (flow *KisFlow) Run(ctx context.Context) error {
return err return err
} }
//流式链式调用 // 流式链式调用
for fn != nil && flow.abort == false { for fn != nil && flow.abort == false {
// flow记录当前执行到的Function 标记 // flow记录当前执行到的Function 标记
@ -222,10 +223,10 @@ func (flow *KisFlow) Run(ctx context.Context) error {
} }
if err := fn.Call(ctx, flow); err != nil { if err := fn.Call(ctx, flow); err != nil {
//Error // Error
return err return err
} else { } else {
//Success // Success
fn, err = flow.dealAction(ctx, fn) fn, err = flow.dealAction(ctx, fn)
if err != nil { if err != nil {
return err return err
@ -259,8 +260,8 @@ func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
// GetConnector 得到当前正在执行的Function的Connector // GetConnector 得到当前正在执行的Function的Connector
func (flow *KisFlow) GetConnector() (kis.Connector, error) { func (flow *KisFlow) GetConnector() (kis.Connector, error) {
if conn := flow.ThisFunction.GetConnector(); conn != nil { if connector := flow.ThisFunction.GetConnector(); connector != nil {
return conn, nil return connector, nil
} else { } else {
return nil, errors.New("GetConnector(): Connector is nil") return nil, errors.New("GetConnector(): Connector is nil")
} }
@ -268,8 +269,8 @@ func (flow *KisFlow) GetConnector() (kis.Connector, error) {
// GetConnConf 得到当前正在执行的Function的Connector的配置 // GetConnConf 得到当前正在执行的Function的Connector的配置
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) { func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
if conn := flow.ThisFunction.GetConnector(); conn != nil { if connector := flow.ThisFunction.GetConnector(); connector != nil {
return conn.GetConfig(), nil return connector.GetConfig(), nil
} else { } else {
return nil, errors.New("GetConnConf(): Connector is nil") return nil, errors.New("GetConnConf(): Connector is nil")
} }

View File

@ -10,7 +10,7 @@ import (
"testing" "testing"
) )
func TestConfigImportYmal(t *testing.T) { func TestConfigImportYaml(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// 0. 注册Function 回调业务 // 0. 注册Function 回调业务

View File

@ -16,7 +16,7 @@ func TestNewFuncConfig(t *testing.T) {
option := config.KisFuncOption{ option := config.KisFuncOption{
CName: "connectorName1", CName: "connectorName1",
RetryTimes: 3, RetryTimes: 3,
RetryDuriton: 300, RetryDuration: 300,
Params: config.FParam{ Params: config.FParam{
"param1": "value1", "param1": "value1",
@ -63,7 +63,7 @@ func TestNewConnConfig(t *testing.T) {
option := config.KisFuncOption{ option := config.KisFuncOption{
CName: "connectorName1", CName: "connectorName1",
RetryTimes: 3, RetryTimes: 3,
RetryDuriton: 300, RetryDuration: 300,
Params: config.FParam{ Params: config.FParam{
"param1": "value1", "param1": "value1",