kisConnConfig init

This commit is contained in:
aceld 2023-12-31 11:45:47 +08:00
parent abda608f60
commit bb0cda32b7
4 changed files with 102 additions and 2 deletions

View File

@ -137,8 +137,6 @@ params:
load: null
save:
- 测试KisFunction_S1
flow:
- MyFlow1
```
#### (4) KisFlow全局配置

View File

@ -28,3 +28,13 @@ const (
FlowEnable KisOnOff = 1 // 启动
FlowDisable KisOnOff = 0 // 不启动
)
type KisConnType string
const (
REDIS KisConnType = "redis"
MYSQL KisConnType = "mysql"
KAFKA KisConnType = "kafka"
TIDB KisConnType = "tidb"
ES KisConnType = "es"
)

View File

@ -2,6 +2,7 @@ package test
import (
"fmt"
"kis-flow/common"
"kis-flow/flow/kis_config"
"testing"
)
@ -51,3 +52,37 @@ func TestNewFuncConfig(t *testing.T) {
fmt.Printf("myFunc1: %+v\n", myFunc1)
}
func TestNewConnConfig(t *testing.T) {
source := flow.KisSource{
Name: "公众号抖音商城户订单数据",
Must: []string{"order_id", "user_id"},
}
option := flow.KisFuncOption{
Cid: "connector_id",
RetryTimes: 3,
RetryDuriton: 300,
Params: flow.FParam{
"param1": "value1",
"param2": "value2",
},
}
myFunc1 := flow.NewFuncConfig("funcId", "funcName", "Save", &source, &option)
connParams := flow.FParam{
"param1": "value1",
"param2": "value2",
}
myConnector1 := flow.NewConnConfig("connectorId", "connectorName", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
if err := myConnector1.WithFunc(myFunc1); err != nil {
fmt.Printf("WithFunc err: %s\n", err.Error())
}
fmt.Printf("myConnector1: %+v\n", myConnector1)
}

View File

@ -0,0 +1,57 @@
package flow
import (
"errors"
"fmt"
"kis-flow/common"
)
// KisConnConfig KisConnector 策略配置
type KisConnConfig struct {
//配置类型
KisType string `yaml:"kistype"`
//唯一标识
CId string `yaml:"cid"`
//唯一描述标识
CName string `yaml:"cname"`
//基础存储媒介地址
AddrString string `yaml:"addrs"`
//存储媒介引擎类型"Mysql" "Redis" "Kafka"等
Type common.KisConnType `yaml:"type"`
//一次存储的标识如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等
Key string `yaml:"key"`
//配置信息中的自定义参数
Params map[string]string `yaml:"params"`
//存储读取所绑定的NsFuncionID
Load []string `yaml:"load"`
Save []string `yaml:"save"`
}
// NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息
func NewConnConfig(cid string, cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
strategy := new(KisConnConfig)
strategy.CId = cid
strategy.CName = cName
strategy.AddrString = addr
strategy.Type = t
strategy.Key = key
strategy.Params = param
return strategy
}
// WithFunc Connector与Function进行关系绑定
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
switch common.KisMode(fConfig.Fmode) {
case common.S:
cConfig.Save = append(cConfig.Save, fConfig.Fid)
case common.L:
cConfig.Load = append(cConfig.Load, fConfig.Fid)
default:
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.Fmode))
}
return nil
}