diff --git a/common/const.go b/common/const.go index 24f63f6..a693993 100644 --- a/common/const.go +++ b/common/const.go @@ -4,11 +4,11 @@ import "time" // 用户生成KisId的字符串前缀 const ( - KisIdTypeFlow = "flow" - KisIdTypeConnnector = "conn" - KisIdTypeFunction = "func" - KisIdTypeGlobal = "global" - KisIdJoinChar = "-" + KisIdTypeFlow = "flow" + KisIdTypeConnector = "conn" + KisIdTypeFunction = "func" + KisIdTypeGlobal = "global" + KisIdJoinChar = "-" ) const ( @@ -38,7 +38,7 @@ const ( ) /* - 是否启动Flow +是否启动Flow */ type KisOnOff int @@ -60,7 +60,7 @@ const ( // cache const ( // DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间 - DeFaultFlowCacheCleanUp = 5 //单位 min + DeFaultFlowCacheCleanUp = 5 // 单位 min // DefaultExpiration 默认GoCahce时间 ,永久保存 DefaultExpiration time.Duration = 0 ) diff --git a/config/kis_func_config.go b/config/kis_func_config.go index 01864a4..55ad785 100644 --- a/config/kis_func_config.go +++ b/config/kis_func_config.go @@ -11,16 +11,16 @@ type FParam map[string]string // KisSource 表示当前Function的业务源 type KisSource struct { - Name string `yaml:"name"` //本层Function的数据源描述 - Must []string `yaml:"must"` //source必传字段 + Name string `yaml:"name"` // 本层Function的数据源描述 + Must []string `yaml:"must"` // source必传字段 } // KisFuncOption 可选配置 type KisFuncOption struct { - CName string `yaml:"cname"` //连接器Connector名称 - RetryTimes int `yaml:"retry_times"` //选填,Function调度重试(不包括正常调度)最大次数 - RetryDuriton int `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms) - Params FParam `yaml:"default_params"` //选填,在当前Flow中Function定制固定配置参数 + CName string `yaml:"cname"` // 连接器Connector名称 + RetryTimes int `yaml:"retry_times"` // 选填,Function调度重试(不包括正常调度)最大次数 + RetryDuration int `yaml:"return_duration"` // 选填,Function调度每次重试最大时间间隔(单位:ms) + Params FParam `yaml:"default_params"` // 选填,在当前Flow中Function定制固定配置参数 } // KisFuncConfig 一个KisFunction策略配置 @@ -49,7 +49,7 @@ func NewFuncConfig( config.FMode = string(mode) - //FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系 + // FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系 if mode == common.S || mode == common.L { if option == nil { log.Logger().ErrorF("Funcion S/L need option->Cid\n") diff --git a/conn/kis_connector.go b/conn/kis_connector.go index df48d45..5e62e06 100644 --- a/conn/kis_connector.go +++ b/conn/kis_connector.go @@ -29,7 +29,7 @@ type KisConnector struct { // NewKisConnector 根据配置策略创建一个KisConnector func NewKisConnector(config *config.KisConnConfig) *KisConnector { conn := new(KisConnector) - conn.CId = id.KisID(common.KisIdTypeConnnector) + conn.CId = id.KisID(common.KisIdTypeConnector) conn.CName = config.CName conn.Conf = config conn.metaData = make(map[string]interface{}) @@ -41,7 +41,7 @@ func NewKisConnector(config *config.KisConnConfig) *KisConnector { func (conn *KisConnector) Init() error { var err error - //一个Connector只能执行初始化业务一次 + // 一个Connector只能执行初始化业务一次 conn.onceInit.Do(func() { err = kis.Pool().CallConnInit(conn) }) diff --git a/file/config_export.go b/file/config_export.go index e42237f..a7fbc43 100644 --- a/file/config_export.go +++ b/file/config_export.go @@ -3,10 +3,11 @@ package file import ( "errors" "fmt" - "gopkg.in/yaml.v3" - "io/ioutil" "kis-flow/common" "kis-flow/kis" + "os" + + "gopkg.in/yaml.v3" ) // ConfigExportYaml 将flow配置输出,且存储本地 @@ -15,13 +16,13 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error { if data, err := yaml.Marshal(flow.GetConfig()); err != nil { return err } else { - //flow - err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644) + // flow + err := os.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644) if err != nil { return err } - //function + // function for _, fp := range flow.GetConfig().Flows { fConf := flow.GetFuncConfigByName(fp.FuncName) if fConf == nil { @@ -31,7 +32,7 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error { if fdata, err := yaml.Marshal(fConf); err != nil { return err } else { - if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil { + if err := os.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil { return err } } @@ -45,7 +46,7 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error { if cdata, err := yaml.Marshal(cConf); err != nil { return err } else { - if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil { + if err := os.WriteFile(savaPath+common.KisIdTypeConnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil { return err } } diff --git a/file/config_import.go b/file/config_import.go index 4dc6aa1..521fb9d 100644 --- a/file/config_import.go +++ b/file/config_import.go @@ -3,8 +3,6 @@ package file import ( "errors" "fmt" - "gopkg.in/yaml.v3" - "io/ioutil" "kis-flow/common" "kis-flow/config" "kis-flow/flow" @@ -13,6 +11,8 @@ import ( "os" "path" "path/filepath" + + "gopkg.in/yaml.v3" ) type allConfig struct { @@ -23,22 +23,22 @@ type allConfig struct { // kisTypeFlowConfigure 解析Flow配置文件,yaml格式 func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { - flow := new(config.KisFlowConfig) - if ok := yaml.Unmarshal(confData, flow); ok != nil { + flowCfg := new(config.KisFlowConfig) + if ok := yaml.Unmarshal(confData, flowCfg); ok != nil { return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType)) } // 如果FLow状态为关闭,则不做配置加载 - if common.KisOnOff(flow.Status) == common.FlowDisable { + if common.KisOnOff(flowCfg.Status) == common.FlowDisable { return nil } - if _, ok := all.Flows[flow.FlowName]; ok { - return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName)) + if _, ok := all.Flows[flowCfg.FlowName]; ok { + return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flowCfg.FlowName)) } // 加入配置集合中 - all.Flows[flow.FlowName] = flow + all.Flows[flowCfg.FlowName] = flowCfg return nil } @@ -105,7 +105,7 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) { } // 读取文件内容 - confData, err := ioutil.ReadFile(filePath) + confData, err := os.ReadFile(filePath) if err != nil { return err } @@ -128,7 +128,7 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) { case common.KisIdTypeFunction: return kisTypeFuncConfigure(all, confData, filePath, kisType) - case common.KisIdTypeConnnector: + case common.KisIdTypeConnector: return kisTypeConnConfigure(all, confData, filePath, kisType) case common.KisIdTypeGlobal: @@ -148,11 +148,11 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) { } func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error { - //加载当前Flow依赖的Function + // 加载当前Flow依赖的Function if funcConfig, ok := all.Funcs[fp.FuncName]; !ok { return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName)) } else { - //flow add connector + // flow add connector if funcConfig.Option.CName != "" { // 加载当前Function依赖的Connector if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok { @@ -163,7 +163,7 @@ func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, } } - //flow add function + // flow add function if err := newFlow.Link(funcConfig, fp.Params); err != nil { return err } @@ -191,7 +191,7 @@ func ConfigImportYaml(loadPath string) error { } } - //将flow添加到FlowPool中 + // 将flow添加到FlowPool中 kis.Pool().AddFlow(flowName, newFlow) } diff --git a/flow/kis_flow.go b/flow/kis_flow.go index 7fcaac5..3ce0a25 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -3,7 +3,6 @@ package flow import ( "context" "errors" - "github.com/patrickmn/go-cache" "kis-flow/common" "kis-flow/config" "kis-flow/conn" @@ -13,6 +12,8 @@ import ( "kis-flow/log" "sync" "time" + + "github.com/patrickmn/go-cache" ) // KisFlow 用于贯穿整条流式计算的上下文环境 @@ -79,17 +80,17 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { // Fork 得到Flow的一个副本(深拷贝) func (flow *KisFlow) Fork(ctx context.Context) kis.Flow { - config := flow.Conf + cfg := flow.Conf // 通过之前的配置生成一个新的Flow - newFlow := NewKisFlow(config) + newFlow := NewKisFlow(cfg) for _, fp := range flow.Conf.Flows { if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok { - //当前function没有配置Params + // 当前function没有配置Params newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil) } else { - //当前function有配置Params + // 当前function有配置Params newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params) } } @@ -163,16 +164,16 @@ func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) err flow.FlowTail = function } - //将Function Name 详细Hash对应关系添加到flow对象中 + // 将Function Name 详细Hash对应关系添加到flow对象中 flow.Funcs[function.GetConfig().FName] = function - //先添加function 默认携带的Params参数 + // 先添加function 默认携带的Params参数 params := make(config.FParam) for key, value := range function.GetConfig().Option.Params { params[key] = value } - //再添加flow携带的function定义参数(重复即覆盖) + // 再添加flow携带的function定义参数(重复即覆盖) for key, value := range fParam { params[key] = value } @@ -193,7 +194,7 @@ func (flow *KisFlow) Run(ctx context.Context) error { flow.abort = false if flow.Conf.Status == int(common.FlowDisable) { - //flow被配置关闭 + // flow被配置关闭 return nil } @@ -205,7 +206,7 @@ func (flow *KisFlow) Run(ctx context.Context) error { return err } - //流式链式调用 + // 流式链式调用 for fn != nil && flow.abort == false { // flow记录当前执行到的Function 标记 @@ -222,10 +223,10 @@ func (flow *KisFlow) Run(ctx context.Context) error { } if err := fn.Call(ctx, flow); err != nil { - //Error + // Error return err } else { - //Success + // Success fn, err = flow.dealAction(ctx, fn) if err != nil { return err @@ -259,8 +260,8 @@ func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig { // GetConnector 得到当前正在执行的Function的Connector func (flow *KisFlow) GetConnector() (kis.Connector, error) { - if conn := flow.ThisFunction.GetConnector(); conn != nil { - return conn, nil + if connector := flow.ThisFunction.GetConnector(); connector != nil { + return connector, nil } else { return nil, errors.New("GetConnector(): Connector is nil") } @@ -268,8 +269,8 @@ func (flow *KisFlow) GetConnector() (kis.Connector, error) { // GetConnConf 得到当前正在执行的Function的Connector的配置 func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) { - if conn := flow.ThisFunction.GetConnector(); conn != nil { - return conn.GetConfig(), nil + if connector := flow.ThisFunction.GetConnector(); connector != nil { + return connector.GetConfig(), nil } else { return nil, errors.New("GetConnConf(): Connector is nil") } diff --git a/test/kis_config_import_test.go b/test/kis_config_import_test.go index a0594ff..f008070 100644 --- a/test/kis_config_import_test.go +++ b/test/kis_config_import_test.go @@ -10,7 +10,7 @@ import ( "testing" ) -func TestConfigImportYmal(t *testing.T) { +func TestConfigImportYaml(t *testing.T) { ctx := context.Background() // 0. 注册Function 回调业务 diff --git a/test/kis_config_test.go b/test/kis_config_test.go index 5d19e04..36d4450 100644 --- a/test/kis_config_test.go +++ b/test/kis_config_test.go @@ -14,9 +14,9 @@ func TestNewFuncConfig(t *testing.T) { } option := config.KisFuncOption{ - CName: "connectorName1", - RetryTimes: 3, - RetryDuriton: 300, + CName: "connectorName1", + RetryTimes: 3, + RetryDuration: 300, Params: config.FParam{ "param1": "value1", @@ -61,9 +61,9 @@ func TestNewConnConfig(t *testing.T) { } option := config.KisFuncOption{ - CName: "connectorName1", - RetryTimes: 3, - RetryDuriton: 300, + CName: "connectorName1", + RetryTimes: 3, + RetryDuration: 300, Params: config.FParam{ "param1": "value1",