mirror of
https://github.com/aceld/kis-flow.git
synced 2025-02-02 15:28:38 +08:00
commit
f284daaa6a
@ -5,7 +5,7 @@ import "time"
|
||||
// 用户生成KisId的字符串前缀
|
||||
const (
|
||||
KisIdTypeFlow = "flow"
|
||||
KisIdTypeConnnector = "conn"
|
||||
KisIdTypeConnector = "conn"
|
||||
KisIdTypeFunction = "func"
|
||||
KisIdTypeGlobal = "global"
|
||||
KisIdJoinChar = "-"
|
||||
|
@ -19,7 +19,7 @@ type KisSource struct {
|
||||
type KisFuncOption struct {
|
||||
CName string `yaml:"cname"` // 连接器Connector名称
|
||||
RetryTimes int `yaml:"retry_times"` // 选填,Function调度重试(不包括正常调度)最大次数
|
||||
RetryDuriton int `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms)
|
||||
RetryDuration int `yaml:"return_duration"` // 选填,Function调度每次重试最大时间间隔(单位:ms)
|
||||
Params FParam `yaml:"default_params"` // 选填,在当前Flow中Function定制固定配置参数
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ type KisConnector struct {
|
||||
// NewKisConnector 根据配置策略创建一个KisConnector
|
||||
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
|
||||
conn := new(KisConnector)
|
||||
conn.CId = id.KisID(common.KisIdTypeConnnector)
|
||||
conn.CId = id.KisID(common.KisIdTypeConnector)
|
||||
conn.CName = config.CName
|
||||
conn.Conf = config
|
||||
conn.metaData = make(map[string]interface{})
|
||||
|
@ -3,10 +3,11 @@ package file
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io/ioutil"
|
||||
"kis-flow/common"
|
||||
"kis-flow/kis"
|
||||
"os"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// ConfigExportYaml 将flow配置输出,且存储本地
|
||||
@ -16,7 +17,7 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error {
|
||||
return err
|
||||
} else {
|
||||
// flow
|
||||
err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
|
||||
err := os.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -31,7 +32,7 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error {
|
||||
if fdata, err := yaml.Marshal(fConf); err != nil {
|
||||
return err
|
||||
} else {
|
||||
if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
|
||||
if err := os.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -45,7 +46,7 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error {
|
||||
if cdata, err := yaml.Marshal(cConf); err != nil {
|
||||
return err
|
||||
} else {
|
||||
if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
|
||||
if err := os.WriteFile(savaPath+common.KisIdTypeConnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -3,8 +3,6 @@ package file
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"gopkg.in/yaml.v3"
|
||||
"io/ioutil"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"kis-flow/flow"
|
||||
@ -12,6 +10,8 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
type allConfig struct {
|
||||
@ -22,22 +22,22 @@ type allConfig struct {
|
||||
|
||||
// kisTypeFlowConfigure 解析Flow配置文件,yaml格式
|
||||
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
|
||||
flow := new(config.KisFlowConfig)
|
||||
if ok := yaml.Unmarshal(confData, flow); ok != nil {
|
||||
flowCfg := new(config.KisFlowConfig)
|
||||
if ok := yaml.Unmarshal(confData, flowCfg); ok != nil {
|
||||
return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
|
||||
}
|
||||
|
||||
// 如果FLow状态为关闭,则不做配置加载
|
||||
if common.KisOnOff(flow.Status) == common.FlowDisable {
|
||||
if common.KisOnOff(flowCfg.Status) == common.FlowDisable {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, ok := all.Flows[flow.FlowName]; ok {
|
||||
return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName))
|
||||
if _, ok := all.Flows[flowCfg.FlowName]; ok {
|
||||
return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flowCfg.FlowName))
|
||||
}
|
||||
|
||||
// 加入配置集合中
|
||||
all.Flows[flow.FlowName] = flow
|
||||
all.Flows[flowCfg.FlowName] = flowCfg
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -91,7 +91,7 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
|
||||
}
|
||||
|
||||
// 读取文件内容
|
||||
confData, err := ioutil.ReadFile(filePath)
|
||||
confData, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -114,7 +114,7 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
|
||||
case common.KisIdTypeFunction:
|
||||
return kisTypeFuncConfigure(all, confData, filePath, kisType)
|
||||
|
||||
case common.KisIdTypeConnnector:
|
||||
case common.KisIdTypeConnector:
|
||||
return kisTypeConnConfigure(all, confData, filePath, kisType)
|
||||
|
||||
default:
|
||||
|
@ -3,7 +3,6 @@ package flow
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"kis-flow/conn"
|
||||
@ -13,6 +12,8 @@ import (
|
||||
"kis-flow/log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
// KisFlow 用于贯穿整条流式计算的上下文环境
|
||||
@ -79,10 +80,10 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
|
||||
// Fork 得到Flow的一个副本(深拷贝)
|
||||
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
|
||||
|
||||
config := flow.Conf
|
||||
cfg := flow.Conf
|
||||
|
||||
// 通过之前的配置生成一个新的Flow
|
||||
newFlow := NewKisFlow(config)
|
||||
newFlow := NewKisFlow(cfg)
|
||||
|
||||
for _, fp := range flow.Conf.Flows {
|
||||
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
|
||||
@ -259,8 +260,8 @@ func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
|
||||
|
||||
// GetConnector 得到当前正在执行的Function的Connector
|
||||
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
|
||||
if conn := flow.ThisFunction.GetConnector(); conn != nil {
|
||||
return conn, nil
|
||||
if connector := flow.ThisFunction.GetConnector(); connector != nil {
|
||||
return connector, nil
|
||||
} else {
|
||||
return nil, errors.New("GetConnector(): Connector is nil")
|
||||
}
|
||||
@ -268,8 +269,8 @@ func (flow *KisFlow) GetConnector() (kis.Connector, error) {
|
||||
|
||||
// GetConnConf 得到当前正在执行的Function的Connector的配置
|
||||
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
|
||||
if conn := flow.ThisFunction.GetConnector(); conn != nil {
|
||||
return conn.GetConfig(), nil
|
||||
if connector := flow.ThisFunction.GetConnector(); connector != nil {
|
||||
return connector.GetConfig(), nil
|
||||
} else {
|
||||
return nil, errors.New("GetConnConf(): Connector is nil")
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestConfigImportYmal(t *testing.T) {
|
||||
func TestConfigImportYaml(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// 0. 注册Function 回调业务
|
||||
|
@ -16,7 +16,7 @@ func TestNewFuncConfig(t *testing.T) {
|
||||
option := config.KisFuncOption{
|
||||
CName: "connectorName1",
|
||||
RetryTimes: 3,
|
||||
RetryDuriton: 300,
|
||||
RetryDuration: 300,
|
||||
|
||||
Params: config.FParam{
|
||||
"param1": "value1",
|
||||
@ -63,7 +63,7 @@ func TestNewConnConfig(t *testing.T) {
|
||||
option := config.KisFuncOption{
|
||||
CName: "connectorName1",
|
||||
RetryTimes: 3,
|
||||
RetryDuriton: 300,
|
||||
RetryDuration: 300,
|
||||
|
||||
Params: config.FParam{
|
||||
"param1": "value1",
|
||||
|
Loading…
Reference in New Issue
Block a user