add file yaml config import/export

This commit is contained in:
aceld 2024-01-12 17:27:43 +08:00
parent 0fc13e2540
commit c066ebae48
18 changed files with 459 additions and 5 deletions

57
file/config_export.go Normal file
View File

@ -0,0 +1,57 @@
package file
import (
"errors"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
"kis-flow/common"
"kis-flow/kis"
)
// ConfigExportYaml 将flow配置输出且存储本地
func ConfigExportYaml(flow kis.Flow, savaPath string) error {
if data, err := yaml.Marshal(flow.GetConfig()); err != nil {
return err
} else {
//flow
err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
if err != nil {
return err
}
//function
for _, fp := range flow.GetConfig().Flows {
fConf := flow.GetFuncConfigByName(fp.FuncName)
if fConf == nil {
return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName))
}
if fdata, err := yaml.Marshal(fConf); err != nil {
return err
} else {
if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
return err
}
}
// Connector
if fConf.Option.CName != "" {
cConf, err := fConf.GetConnConfig()
if err != nil {
return err
}
if cdata, err := yaml.Marshal(cConf); err != nil {
return err
} else {
if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
return err
}
}
}
}
}
return nil
}

182
file/config_import.go Normal file
View File

@ -0,0 +1,182 @@
package file
import (
"errors"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
"kis-flow/common"
"kis-flow/config"
"kis-flow/flow"
"kis-flow/kis"
"os"
"path"
"path/filepath"
)
type allConfig struct {
Flows map[string]*config.KisFlowConfig
Funcs map[string]*config.KisFuncConfig
Conns map[string]*config.KisConnConfig
}
// kisTypeFlowConfigure 解析Flow配置文件yaml格式
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
flow := new(config.KisFlowConfig)
if ok := yaml.Unmarshal(confData, flow); ok != nil {
return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
}
// 如果FLow状态为关闭则不做配置加载
if common.KisOnOff(flow.Status) == common.FlowDisable {
return nil
}
if _, ok := all.Flows[flow.FlowName]; ok {
return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName))
}
// 加入配置集合中
all.Flows[flow.FlowName] = flow
return nil
}
// kisTypeFuncConfigure 解析Function配置文件yaml格式
func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
function := new(config.KisFuncConfig)
if ok := yaml.Unmarshal(confData, function); ok != nil {
return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
}
if _, ok := all.Funcs[function.FName]; ok {
return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName))
}
// 加入配置集合中
all.Funcs[function.FName] = function
return nil
}
// kisTypeConnConfigure 解析Connector配置文件yaml格式
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
conn := new(config.KisConnConfig)
if ok := yaml.Unmarshal(confData, conn); ok != nil {
return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))
}
if _, ok := all.Conns[conn.CName]; ok {
return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName))
}
// 加入配置集合中
all.Conns[conn.CName] = conn
return nil
}
// parseConfigWalkYaml 全盘解析配置文件yaml格式, 讲配置信息解析到allConfig中
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
all := new(allConfig)
all.Flows = make(map[string]*config.KisFlowConfig)
all.Funcs = make(map[string]*config.KisFuncConfig)
all.Conns = make(map[string]*config.KisConnConfig)
err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
// 校验文件后缀是否合法
if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {
return nil
}
// 读取文件内容
confData, err := ioutil.ReadFile(filePath)
if err != nil {
return err
}
confMap := make(map[string]interface{})
// 校验yaml合法性
if err := yaml.Unmarshal(confData, confMap); err != nil {
return err
}
// 判断kisType是否存在
if kisType, ok := confMap["kistype"]; !ok {
return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))
} else {
switch kisType {
case common.KisIdTypeFlow:
return kisTypeFlowConfigure(all, confData, filePath, kisType)
case common.KisIdTypeFunction:
return kisTypeFuncConfigure(all, confData, filePath, kisType)
case common.KisIdTypeConnnector:
return kisTypeConnConfigure(all, confData, filePath, kisType)
default:
return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
}
}
})
if err != nil {
return nil, err
}
return all, nil
}
func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {
//加载当前Flow依赖的Function
if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {
return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))
} else {
//flow add connector
if funcConfig.Option.CName != "" {
// 加载当前Function依赖的Connector
if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {
return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName))
} else {
// Function Config 关联 Connector Config
_ = funcConfig.AddConnConfig(connConf)
}
}
//flow add function
if err := newFlow.Link(funcConfig, fp.Params); err != nil {
return err
}
}
return nil
}
// ConfigImportYaml 全盘解析配置文件yaml格式
func ConfigImportYaml(loadPath string) error {
all, err := parseConfigWalkYaml(loadPath)
if err != nil {
return err
}
for flowName, flowConfig := range all.Flows {
// 构建一个Flow
newFlow := flow.NewKisFlow(flowConfig)
for _, fp := range flowConfig.Flows {
if err := buildFlow(all, fp, newFlow, flowName); err != nil {
return err
}
}
//将flow添加到FlowPool中
kis.Pool().AddFlow(flowName, newFlow)
}
return nil
}

View File

@ -21,13 +21,13 @@ type KisFlow struct {
Conf *config.KisFlowConfig // Flow配置策略
// Function列表
Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionID
Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
FlowHead kis.Function // 当前Flow所拥有的Function列表表头
FlowTail kis.Function // 当前Flow所拥有的Function列表表尾
flock sync.RWMutex // 管理链表插入读写的锁
ThisFunction kis.Function // Flow当前正在执行的KisFunction对象
ThisFunctionId string // 当前执行到的Function ID (策略配置ID)
PrevFunctionId string // 当前执行到的Function 上一层FunctionID(策略配置ID)
ThisFunctionId string // 当前执行到的Function ID
PrevFunctionId string // 当前执行到的Function 上一层FunctionID
// Function列表参数
funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
@ -122,8 +122,8 @@ func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) err
flow.FlowTail = function
}
//将Function ID 详细Hash对应关系添加到flow对象中
flow.Funcs[function.GetId()] = function
//将Function Name 详细Hash对应关系添加到flow对象中
flow.Funcs[function.GetConfig().FName] = function
//先添加function 默认携带的Params参数
params := make(config.FParam)
@ -228,3 +228,17 @@ func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
return nil, errors.New("GetConnConf(): Connector is nil")
}
}
func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
return flow.Conf
}
// GetFuncConfigByName 得到当前Flow的配置
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
if f, ok := flow.Funcs[funcName]; ok {
return f.GetConfig()
} else {
log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)
return nil
}
}

2
go.mod
View File

@ -3,3 +3,5 @@ module kis-flow
go 1.18
require github.com/google/uuid v1.5.0
require gopkg.in/yaml.v3 v3.0.1 // indirect

View File

@ -25,4 +25,8 @@ type Flow interface {
GetConnector() (Connector, error)
// GetConnConf 得到当前正在执行的Function的Connector的配置
GetConnConf() (*config.KisConnConfig, error)
// GetConfig 得到当前Flow的配置
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
GetFuncConfigByName(funcName string) *config.KisFuncConfig
}

View File

@ -170,3 +170,17 @@ func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connecto
return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}
// GetFlows 得到全部的Flow
func (pool *kisPool) GetFlows() []Flow {
pool.flowLock.RLock() // 读锁
defer pool.flowLock.RUnlock()
var flows []Flow
for _, flow := range pool.flowRouter {
flows = append(flows, flow)
}
return flows
}

View File

@ -0,0 +1,12 @@
kistype: conn
cname: ConnName1
addrs: 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990
type: redis
key: redis-key
params:
args1: value1
args2: value2
load: []
save:
- funcName2
- funcName2

View File

@ -0,0 +1,10 @@
kistype: flow
status: 1
flow_name: flowName1
flows:
- fname: funcName1
params: {}
- fname: funcName2
params: {}
- fname: funcName3
params: {}

View File

@ -0,0 +1,13 @@
kistype: func
fname: funcName1
fmode: Verify
source:
name: 公众号抖音商城户订单数据
must:
- order_id
- user_id
option:
cname: ""
retry_times: 0
return_duration: 0
default_params: {}

View File

@ -0,0 +1,13 @@
kistype: func
fname: funcName2
fmode: Save
source:
name: 用户订单错误率
must:
- order_id
- user_id
option:
cname: ConnName1
retry_times: 0
return_duration: 0
default_params: {}

View File

@ -0,0 +1,13 @@
kistype: func
fname: funcName3
fmode: Calculate
source:
name: 用户订单错误率
must:
- order_id
- user_id
option:
cname: ""
retry_times: 0
return_duration: 0
default_params: {}

View File

@ -0,0 +1,34 @@
package test
import (
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestConfigExportYmal(t *testing.T) {
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 讲构建的内存KisFlow结构配置导出的文件当中
flows := kis.Pool().GetFlows()
for _, flow := range flows {
if err := file.ConfigExportYaml(flow, "/Users/tal/gopath/src/kis-flow/test/export_conf/"); err != nil {
panic(err)
}
}
}

View File

@ -0,0 +1,42 @@
package test
import (
"context"
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestConfigImportYmal(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}

View File

@ -0,0 +1,11 @@
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
args1: value1
args2: value2
load: null
save:
- funcName2

View File

@ -0,0 +1,7 @@
kistype: flow
status: 1
flow_name: flowName1
flows:
- fname: funcName1
- fname: funcName2
- fname: funcName3

View File

@ -0,0 +1,8 @@
kistype: func
fname: funcName1
fmode: Verify
source:
name: 公众号抖音商城户订单数据
must:
- order_id
- user_id

View File

@ -0,0 +1,10 @@
kistype: func
fname: funcName2
fmode: Save
source:
name: 用户订单错误率
must:
- order_id
- user_id
option:
cname: ConnName1

View File

@ -0,0 +1,8 @@
kistype: func
fname: funcName3
fmode: Calculate
source:
name: 用户订单错误率
must:
- order_id
- user_id