diff --git a/file/config_export.go b/file/config_export.go new file mode 100644 index 0000000..e42237f --- /dev/null +++ b/file/config_export.go @@ -0,0 +1,57 @@ +package file + +import ( + "errors" + "fmt" + "gopkg.in/yaml.v3" + "io/ioutil" + "kis-flow/common" + "kis-flow/kis" +) + +// ConfigExportYaml 将flow配置输出,且存储本地 +func ConfigExportYaml(flow kis.Flow, savaPath string) error { + + if data, err := yaml.Marshal(flow.GetConfig()); err != nil { + return err + } else { + //flow + err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644) + if err != nil { + return err + } + + //function + for _, fp := range flow.GetConfig().Flows { + fConf := flow.GetFuncConfigByName(fp.FuncName) + if fConf == nil { + return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName)) + } + + if fdata, err := yaml.Marshal(fConf); err != nil { + return err + } else { + if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil { + return err + } + } + + // Connector + if fConf.Option.CName != "" { + cConf, err := fConf.GetConnConfig() + if err != nil { + return err + } + if cdata, err := yaml.Marshal(cConf); err != nil { + return err + } else { + if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil { + return err + } + } + } + } + } + + return nil +} diff --git a/file/config_import.go b/file/config_import.go new file mode 100644 index 0000000..ec4ebc6 --- /dev/null +++ b/file/config_import.go @@ -0,0 +1,182 @@ +package file + +import ( + "errors" + "fmt" + "gopkg.in/yaml.v3" + "io/ioutil" + "kis-flow/common" + "kis-flow/config" + "kis-flow/flow" + "kis-flow/kis" + "os" + "path" + "path/filepath" +) + +type allConfig struct { + Flows map[string]*config.KisFlowConfig + Funcs map[string]*config.KisFuncConfig + Conns map[string]*config.KisConnConfig +} + +// kisTypeFlowConfigure 解析Flow配置文件,yaml格式 +func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { + flow := new(config.KisFlowConfig) + if ok := yaml.Unmarshal(confData, flow); ok != nil { + return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType)) + } + + // 如果FLow状态为关闭,则不做配置加载 + if common.KisOnOff(flow.Status) == common.FlowDisable { + return nil + } + + if _, ok := all.Flows[flow.FlowName]; ok { + return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName)) + } + + // 加入配置集合中 + all.Flows[flow.FlowName] = flow + + return nil +} + +// kisTypeFuncConfigure 解析Function配置文件,yaml格式 +func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { + function := new(config.KisFuncConfig) + if ok := yaml.Unmarshal(confData, function); ok != nil { + return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType)) + } + if _, ok := all.Funcs[function.FName]; ok { + return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName)) + } + + // 加入配置集合中 + all.Funcs[function.FName] = function + + return nil +} + +// kisTypeConnConfigure 解析Connector配置文件,yaml格式 +func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { + conn := new(config.KisConnConfig) + if ok := yaml.Unmarshal(confData, conn); ok != nil { + return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType)) + } + + if _, ok := all.Conns[conn.CName]; ok { + return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName)) + } + + // 加入配置集合中 + all.Conns[conn.CName] = conn + + return nil +} + +// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中 +func parseConfigWalkYaml(loadPath string) (*allConfig, error) { + + all := new(allConfig) + + all.Flows = make(map[string]*config.KisFlowConfig) + all.Funcs = make(map[string]*config.KisFuncConfig) + all.Conns = make(map[string]*config.KisConnConfig) + + err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error { + // 校验文件后缀是否合法 + if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" { + return nil + } + + // 读取文件内容 + confData, err := ioutil.ReadFile(filePath) + if err != nil { + return err + } + + confMap := make(map[string]interface{}) + + // 校验yaml合法性 + if err := yaml.Unmarshal(confData, confMap); err != nil { + return err + } + + // 判断kisType是否存在 + if kisType, ok := confMap["kistype"]; !ok { + return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath)) + } else { + switch kisType { + case common.KisIdTypeFlow: + return kisTypeFlowConfigure(all, confData, filePath, kisType) + + case common.KisIdTypeFunction: + return kisTypeFuncConfigure(all, confData, filePath, kisType) + + case common.KisIdTypeConnnector: + return kisTypeConnConfigure(all, confData, filePath, kisType) + + default: + return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType)) + } + } + }) + + if err != nil { + return nil, err + } + + return all, nil +} + +func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error { + //加载当前Flow依赖的Function + if funcConfig, ok := all.Funcs[fp.FuncName]; !ok { + return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName)) + } else { + //flow add connector + if funcConfig.Option.CName != "" { + // 加载当前Function依赖的Connector + if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok { + return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName)) + } else { + // Function Config 关联 Connector Config + _ = funcConfig.AddConnConfig(connConf) + } + } + + //flow add function + if err := newFlow.Link(funcConfig, fp.Params); err != nil { + return err + } + } + + return nil +} + +// ConfigImportYaml 全盘解析配置文件,yaml格式 +func ConfigImportYaml(loadPath string) error { + + all, err := parseConfigWalkYaml(loadPath) + if err != nil { + return err + } + + for flowName, flowConfig := range all.Flows { + + // 构建一个Flow + newFlow := flow.NewKisFlow(flowConfig) + + for _, fp := range flowConfig.Flows { + if err := buildFlow(all, fp, newFlow, flowName); err != nil { + return err + } + } + + //将flow添加到FlowPool中 + kis.Pool().AddFlow(flowName, newFlow) + } + + return nil +} diff --git a/flow/kis_flow.go b/flow/kis_flow.go index 352b152..be60604 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -21,13 +21,13 @@ type KisFlow struct { Conf *config.KisFlowConfig // Flow配置策略 // Function列表 - Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionID + Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName FlowHead kis.Function // 当前Flow所拥有的Function列表表头 FlowTail kis.Function // 当前Flow所拥有的Function列表表尾 flock sync.RWMutex // 管理链表插入读写的锁 ThisFunction kis.Function // Flow当前正在执行的KisFunction对象 - ThisFunctionId string // 当前执行到的Function ID (策略配置ID) - PrevFunctionId string // 当前执行到的Function 上一层FunctionID(策略配置ID) + ThisFunctionId string // 当前执行到的Function ID + PrevFunctionId string // 当前执行到的Function 上一层FunctionID // Function列表参数 funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam @@ -122,8 +122,8 @@ func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) err flow.FlowTail = function } - //将Function ID 详细Hash对应关系添加到flow对象中 - flow.Funcs[function.GetId()] = function + //将Function Name 详细Hash对应关系添加到flow对象中 + flow.Funcs[function.GetConfig().FName] = function //先添加function 默认携带的Params参数 params := make(config.FParam) @@ -228,3 +228,17 @@ func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) { return nil, errors.New("GetConnConf(): Connector is nil") } } + +func (flow *KisFlow) GetConfig() *config.KisFlowConfig { + return flow.Conf +} + +// GetFuncConfigByName 得到当前Flow的配置 +func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig { + if f, ok := flow.Funcs[funcName]; ok { + return f.GetConfig() + } else { + log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName) + return nil + } +} diff --git a/go.mod b/go.mod index 84dbffe..30791b8 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,5 @@ module kis-flow go 1.18 require github.com/google/uuid v1.5.0 + +require gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/kis/flow.go b/kis/flow.go index e6fe744..846db96 100644 --- a/kis/flow.go +++ b/kis/flow.go @@ -25,4 +25,8 @@ type Flow interface { GetConnector() (Connector, error) // GetConnConf 得到当前正在执行的Function的Connector的配置 GetConnConf() (*config.KisConnConfig, error) + // GetConfig 得到当前Flow的配置 + GetConfig() *config.KisFlowConfig + // GetFuncConfigByName 得到当前Flow的配置 + GetFuncConfigByName(funcName string) *config.KisFuncConfig } diff --git a/kis/pool.go b/kis/pool.go index 4acb116..ba35857 100644 --- a/kis/pool.go +++ b/kis/pool.go @@ -170,3 +170,17 @@ func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connecto return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode)) } + +// GetFlows 得到全部的Flow +func (pool *kisPool) GetFlows() []Flow { + pool.flowLock.RLock() // 读锁 + defer pool.flowLock.RUnlock() + + var flows []Flow + + for _, flow := range pool.flowRouter { + flows = append(flows, flow) + } + + return flows +} diff --git a/test/export_conf/conn-ConnName1.yaml b/test/export_conf/conn-ConnName1.yaml new file mode 100644 index 0000000..c86affb --- /dev/null +++ b/test/export_conf/conn-ConnName1.yaml @@ -0,0 +1,12 @@ +kistype: conn +cname: ConnName1 +addrs: 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 +type: redis +key: redis-key +params: + args1: value1 + args2: value2 +load: [] +save: + - funcName2 + - funcName2 diff --git a/test/export_conf/flow-flowName1.yaml b/test/export_conf/flow-flowName1.yaml new file mode 100644 index 0000000..8f2ca6a --- /dev/null +++ b/test/export_conf/flow-flowName1.yaml @@ -0,0 +1,10 @@ +kistype: flow +status: 1 +flow_name: flowName1 +flows: + - fname: funcName1 + params: {} + - fname: funcName2 + params: {} + - fname: funcName3 + params: {} diff --git a/test/export_conf/func-funcName1.yaml b/test/export_conf/func-funcName1.yaml new file mode 100644 index 0000000..ebc213c --- /dev/null +++ b/test/export_conf/func-funcName1.yaml @@ -0,0 +1,13 @@ +kistype: func +fname: funcName1 +fmode: Verify +source: + name: 公众号抖音商城户订单数据 + must: + - order_id + - user_id +option: + cname: "" + retry_times: 0 + return_duration: 0 + default_params: {} diff --git a/test/export_conf/func-funcName2.yaml b/test/export_conf/func-funcName2.yaml new file mode 100644 index 0000000..647c19e --- /dev/null +++ b/test/export_conf/func-funcName2.yaml @@ -0,0 +1,13 @@ +kistype: func +fname: funcName2 +fmode: Save +source: + name: 用户订单错误率 + must: + - order_id + - user_id +option: + cname: ConnName1 + retry_times: 0 + return_duration: 0 + default_params: {} diff --git a/test/export_conf/func-funcName3.yaml b/test/export_conf/func-funcName3.yaml new file mode 100644 index 0000000..49deac0 --- /dev/null +++ b/test/export_conf/func-funcName3.yaml @@ -0,0 +1,13 @@ +kistype: func +fname: funcName3 +fmode: Calculate +source: + name: 用户订单错误率 + must: + - order_id + - user_id +option: + cname: "" + retry_times: 0 + return_duration: 0 + default_params: {} diff --git a/test/kis_config_export_test.go b/test/kis_config_export_test.go new file mode 100644 index 0000000..1a2d31c --- /dev/null +++ b/test/kis_config_export_test.go @@ -0,0 +1,34 @@ +package test + +import ( + "kis-flow/common" + "kis-flow/file" + "kis-flow/kis" + "kis-flow/test/caas" + "kis-flow/test/faas" + "testing" +) + +func TestConfigExportYmal(t *testing.T) { + // 0. 注册Function 回调业务 + kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) + kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) + kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) + + // 0. 注册ConnectorInit 和 Connector 回调业务 + kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) + kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) + + // 1. 加载配置文件并构建Flow + if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { + panic(err) + } + + // 2. 讲构建的内存KisFlow结构配置导出的文件当中 + flows := kis.Pool().GetFlows() + for _, flow := range flows { + if err := file.ConfigExportYaml(flow, "/Users/tal/gopath/src/kis-flow/test/export_conf/"); err != nil { + panic(err) + } + } +} diff --git a/test/kis_config_import_test.go b/test/kis_config_import_test.go new file mode 100644 index 0000000..a0594ff --- /dev/null +++ b/test/kis_config_import_test.go @@ -0,0 +1,42 @@ +package test + +import ( + "context" + "kis-flow/common" + "kis-flow/file" + "kis-flow/kis" + "kis-flow/test/caas" + "kis-flow/test/faas" + "testing" +) + +func TestConfigImportYmal(t *testing.T) { + ctx := context.Background() + + // 0. 注册Function 回调业务 + kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) + kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) + kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) + + // 0. 注册ConnectorInit 和 Connector 回调业务 + kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) + kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) + + // 1. 加载配置文件并构建Flow + if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { + panic(err) + } + + // 2. 获取Flow + flow1 := kis.Pool().GetFlow("flowName1") + + // 3. 提交原始数据 + _ = flow1.CommitRow("This is Data1 from Test") + _ = flow1.CommitRow("This is Data2 from Test") + _ = flow1.CommitRow("This is Data3 from Test") + + // 4. 执行flow1 + if err := flow1.Run(ctx); err != nil { + panic(err) + } +} diff --git a/test/load_conf/conn/conn-ConnName1.yml b/test/load_conf/conn/conn-ConnName1.yml new file mode 100644 index 0000000..1d59ddc --- /dev/null +++ b/test/load_conf/conn/conn-ConnName1.yml @@ -0,0 +1,11 @@ +kistype: conn +cname: ConnName1 +addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990' +type: redis +key: redis-key +params: + args1: value1 + args2: value2 +load: null +save: + - funcName2 \ No newline at end of file diff --git a/test/load_conf/flow/flow-FlowName1.yml b/test/load_conf/flow/flow-FlowName1.yml new file mode 100644 index 0000000..dcc97d2 --- /dev/null +++ b/test/load_conf/flow/flow-FlowName1.yml @@ -0,0 +1,7 @@ +kistype: flow +status: 1 +flow_name: flowName1 +flows: + - fname: funcName1 + - fname: funcName2 + - fname: funcName3 diff --git a/test/load_conf/func/func-FuncName1.yml b/test/load_conf/func/func-FuncName1.yml new file mode 100644 index 0000000..a9c0c49 --- /dev/null +++ b/test/load_conf/func/func-FuncName1.yml @@ -0,0 +1,8 @@ +kistype: func +fname: funcName1 +fmode: Verify +source: + name: 公众号抖音商城户订单数据 + must: + - order_id + - user_id \ No newline at end of file diff --git a/test/load_conf/func/func-FuncName2.yml b/test/load_conf/func/func-FuncName2.yml new file mode 100644 index 0000000..f1589d7 --- /dev/null +++ b/test/load_conf/func/func-FuncName2.yml @@ -0,0 +1,10 @@ +kistype: func +fname: funcName2 +fmode: Save +source: + name: 用户订单错误率 + must: + - order_id + - user_id +option: + cname: ConnName1 \ No newline at end of file diff --git a/test/load_conf/func/func-FuncName3.yml b/test/load_conf/func/func-FuncName3.yml new file mode 100644 index 0000000..98f2fc1 --- /dev/null +++ b/test/load_conf/func/func-FuncName3.yml @@ -0,0 +1,8 @@ +kistype: func +fname: funcName3 +fmode: Calculate +source: + name: 用户订单错误率 + must: + - order_id + - user_id \ No newline at end of file