mirror of
https://github.com/aceld/kis-flow.git
synced 2025-01-23 07:30:23 +08:00
kisflow v0.1
This commit is contained in:
parent
c2b2c1ab02
commit
7a841faec1
@ -10,8 +10,6 @@ import (
|
||||
type KisConnConfig struct {
|
||||
//配置类型
|
||||
KisType string `yaml:"kistype"`
|
||||
//唯一标识
|
||||
CId string `yaml:"cid"`
|
||||
//唯一描述标识
|
||||
CName string `yaml:"cname"`
|
||||
//基础存储媒介地址
|
||||
@ -28,9 +26,8 @@ type KisConnConfig struct {
|
||||
}
|
||||
|
||||
// NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息
|
||||
func NewConnConfig(cid string, cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
|
||||
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
|
||||
strategy := new(KisConnConfig)
|
||||
strategy.CId = cid
|
||||
strategy.CName = cName
|
||||
strategy.AddrString = addr
|
||||
|
||||
@ -44,13 +41,13 @@ func NewConnConfig(cid string, cName string, addr string, t common.KisConnType,
|
||||
// WithFunc Connector与Function进行关系绑定
|
||||
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
|
||||
|
||||
switch common.KisMode(fConfig.Fmode) {
|
||||
switch common.KisMode(fConfig.FMode) {
|
||||
case common.S:
|
||||
cConfig.Save = append(cConfig.Save, fConfig.Fid)
|
||||
cConfig.Save = append(cConfig.Save, fConfig.FName)
|
||||
case common.L:
|
||||
cConfig.Load = append(cConfig.Load, fConfig.Fid)
|
||||
cConfig.Load = append(cConfig.Load, fConfig.FName)
|
||||
default:
|
||||
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.Fmode))
|
||||
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -4,23 +4,21 @@ import "kis-flow/common"
|
||||
|
||||
// KisFlowFunctionParam 一个Flow配置中Function的Id及携带固定配置参数
|
||||
type KisFlowFunctionParam struct {
|
||||
Fid string `yaml:"fid"` //必须
|
||||
Params FParam `yaml:"params"` //选填,在当前Flow中Function定制固定配置参数
|
||||
FuncName string `yaml:"fname"` //必须
|
||||
Params FParam `yaml:"params"` //选填,在当前Flow中Function定制固定配置参数
|
||||
}
|
||||
|
||||
// KisFlowConfig 用户贯穿整条流式计算上下文环境的对象
|
||||
type KisFlowConfig struct {
|
||||
KisType string `yaml:"kistype"`
|
||||
FlowId string `yaml:"flow_id"`
|
||||
Status int `yaml:"status"`
|
||||
FlowName string `yaml:"flow_name"`
|
||||
Flows []KisFlowFunctionParam `yaml:"flows"`
|
||||
}
|
||||
|
||||
// NewFlowConfig 创建一个Flow策略配置对象, 用于描述一个KisFlow信息
|
||||
func NewFlowConfig(flowId string, flowName string, enable common.KisOnOff) *KisFlowConfig {
|
||||
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
|
||||
config := new(KisFlowConfig)
|
||||
config.FlowId = flowId
|
||||
config.FlowName = flowName
|
||||
config.Flows = make([]KisFlowFunctionParam, 0)
|
||||
|
||||
|
@ -16,7 +16,7 @@ type KisSource struct {
|
||||
|
||||
// KisFuncOption 可选配置
|
||||
type KisFuncOption struct {
|
||||
Cid string `yaml:"cid"`
|
||||
CName string `yaml:"cname"` //连接器Connector名称
|
||||
RetryTimes int `yaml:"retry_times"` //选填,Function调度重试(不包括正常调度)最大次数
|
||||
RetryDuriton int `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms)
|
||||
Params FParam `yaml:"default_params"` //选填,在当前Flow中Function定制固定配置参数
|
||||
@ -25,36 +25,34 @@ type KisFuncOption struct {
|
||||
// KisFuncConfig 一个KisFunction策略配置
|
||||
type KisFuncConfig struct {
|
||||
KisType string `yaml:"kistype"`
|
||||
Fid string `yaml:"fid"`
|
||||
Fname string `yaml:"fname"`
|
||||
Fmode string `yaml:"fmode"`
|
||||
FName string `yaml:"fname"`
|
||||
FMode string `yaml:"fmode"`
|
||||
Source KisSource `yaml:"source"`
|
||||
Option KisFuncOption `yaml:"option"`
|
||||
}
|
||||
|
||||
// NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息
|
||||
func NewFuncConfig(
|
||||
funcId string, funcName string, mode common.KisMode,
|
||||
funcName string, mode common.KisMode,
|
||||
source *KisSource, option *KisFuncOption) *KisFuncConfig {
|
||||
|
||||
config := new(KisFuncConfig)
|
||||
config.Fid = funcId
|
||||
config.Fname = funcName
|
||||
config.FName = funcName
|
||||
|
||||
if source == nil {
|
||||
log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcId)
|
||||
log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName)
|
||||
return nil
|
||||
}
|
||||
config.Source = *source
|
||||
|
||||
config.Fmode = string(mode)
|
||||
config.FMode = string(mode)
|
||||
|
||||
//FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
|
||||
if mode == common.S || mode == common.L {
|
||||
if option == nil {
|
||||
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
|
||||
return nil
|
||||
} else if option.Cid == "" {
|
||||
} else if option.CName == "" {
|
||||
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
|
||||
return nil
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
// KisFlow 用于贯穿整条流式计算的上下文环境
|
||||
type KisFlow struct {
|
||||
// 基础信息
|
||||
Id string // Flow配置策略ID(平台生成管理)
|
||||
Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
|
||||
Name string // Flow的可读名称
|
||||
Conf *config.KisFlowConfig // Flow配置策略
|
||||
|
||||
@ -30,16 +30,15 @@ type KisFlow struct {
|
||||
// Function列表参数
|
||||
funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例NsID, value:FParam
|
||||
fplock sync.RWMutex // 管理funcParams的读写锁
|
||||
|
||||
KisId string // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
|
||||
}
|
||||
|
||||
// NewKisFlow 创建一个KisFlow.
|
||||
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
|
||||
flow := new(KisFlow)
|
||||
// 实例Id
|
||||
flow.Id = id.KisID(common.KisIdTypeFlow)
|
||||
|
||||
// 基础信息
|
||||
flow.Id = conf.FlowId
|
||||
flow.Name = conf.FlowName
|
||||
flow.Conf = conf
|
||||
|
||||
@ -47,8 +46,6 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
|
||||
flow.Funcs = make(map[string]kis.Function)
|
||||
flow.funcParams = make(map[string]config.FParam)
|
||||
|
||||
flow.KisId = id.KisID(common.KisIdTypeFlow)
|
||||
|
||||
return flow
|
||||
}
|
||||
|
||||
@ -110,7 +107,7 @@ func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) err
|
||||
|
||||
// 将得到的FParams存留在flow结构体中,用来function业务直接通过Hash获取
|
||||
// key 为当前Function的KisId,不用Fid的原因是为了防止一个Flow添加两个相同策略Id的Function
|
||||
flow.funcParams[function.GetKisId()] = params
|
||||
flow.funcParams[function.GetId()] = params
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -10,18 +10,16 @@ import (
|
||||
)
|
||||
|
||||
type BaseFunction struct {
|
||||
// Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
|
||||
Id string
|
||||
Config *config.KisFuncConfig
|
||||
|
||||
// flow
|
||||
Flow kis.Flow //上下文环境KisFlow
|
||||
cid string //当前Function所依赖的KisConnectorID(如果存在)
|
||||
|
||||
// link
|
||||
N kis.Function //下一个流计算Function
|
||||
P kis.Function //上一个流计算Function
|
||||
|
||||
//KisId , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
|
||||
//KisId 和 Function Config中的 Fid的区别在于,Fid用来形容一类Funcion策略的ID,
|
||||
//而KisId则为在KisFlow中KisFunction已经实例化的 实例对象ID 这个ID是随机生成且唯一
|
||||
KisId string
|
||||
}
|
||||
|
||||
// Call
|
||||
@ -55,7 +53,7 @@ func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
|
||||
}
|
||||
|
||||
func (base *BaseFunction) GetId() string {
|
||||
return base.GetConfig().Fid
|
||||
return base.Id
|
||||
}
|
||||
|
||||
func (base *BaseFunction) GetPrevId() string {
|
||||
@ -63,7 +61,7 @@ func (base *BaseFunction) GetPrevId() string {
|
||||
//Function为首结点
|
||||
return common.FunctionIdFirstVirtual
|
||||
}
|
||||
return base.P.GetConfig().Fid
|
||||
return base.P.GetId()
|
||||
}
|
||||
|
||||
func (base *BaseFunction) GetNextId() string {
|
||||
@ -71,7 +69,7 @@ func (base *BaseFunction) GetNextId() string {
|
||||
//Function为尾结点
|
||||
return common.FunctionIdLastVirtual
|
||||
}
|
||||
return base.N.GetConfig().Fid
|
||||
return base.N.GetId()
|
||||
}
|
||||
|
||||
func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
|
||||
@ -90,20 +88,8 @@ func (base *BaseFunction) GetFlow() kis.Flow {
|
||||
return base.Flow
|
||||
}
|
||||
|
||||
func (base *BaseFunction) GetConnId() string {
|
||||
return base.cid
|
||||
}
|
||||
|
||||
func (base *BaseFunction) SetConnId(id string) {
|
||||
base.cid = id
|
||||
}
|
||||
|
||||
func (base *BaseFunction) CreateKisId() {
|
||||
base.KisId = id.KisID(common.KisIdTypeFunction)
|
||||
}
|
||||
|
||||
func (base *BaseFunction) GetKisId() string {
|
||||
return base.KisId
|
||||
func (base *BaseFunction) CreateId() {
|
||||
base.Id = id.KisID(common.KisIdTypeFunction)
|
||||
}
|
||||
|
||||
// NewKisFunction 创建一个NsFunction
|
||||
@ -113,7 +99,7 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
|
||||
var f kis.Function
|
||||
|
||||
//工厂生产泛化对象
|
||||
switch common.KisMode(config.Fmode) {
|
||||
switch common.KisMode(config.FMode) {
|
||||
case common.V:
|
||||
f = new(KisFunctionV)
|
||||
break
|
||||
@ -130,6 +116,9 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 生成随机实例唯一ID
|
||||
f.CreateId()
|
||||
|
||||
//设置基础信息属性
|
||||
if err := f.SetConfig(config); err != nil {
|
||||
panic(err)
|
||||
@ -139,12 +128,5 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if config.Option.Cid != "" {
|
||||
f.SetConnId(config.Option.Cid)
|
||||
}
|
||||
|
||||
// 生成随机实力唯一ID
|
||||
f.CreateKisId()
|
||||
|
||||
return f
|
||||
}
|
||||
|
@ -21,22 +21,14 @@ type Function interface {
|
||||
// GetFlow 获取当前Functioin实力所依赖的Flow
|
||||
GetFlow() Flow
|
||||
|
||||
// SetConnId 如果当前Function为S或者L 那么建议设置当前Funciton所关联的Connector
|
||||
SetConnId(string)
|
||||
// GetConnId 获取所关联的Connector CID
|
||||
GetConnId() string
|
||||
|
||||
// CreateId 给当前Funciton实力生成一个随机的实例KisID
|
||||
CreateId()
|
||||
// GetId 获取当前Function的FID
|
||||
GetId() string
|
||||
// GetPrevId 获取当前Function上一个Function节点FID
|
||||
GetPrevId() string
|
||||
// GetNextId 获取当前Function下一个Function节点FID
|
||||
GetNextId() string
|
||||
// GetId 获取当前Function的FID
|
||||
GetId() string
|
||||
|
||||
// CreateKisId 给当前Funciton实力生成一个随机的实例KisID
|
||||
CreateKisId()
|
||||
// GetKisId 获取当前Function的唯一实例KisID
|
||||
GetKisId() string
|
||||
|
||||
// Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
|
||||
Next() Function
|
||||
|
@ -10,29 +10,35 @@ type kisDefaultLog struct{}
|
||||
|
||||
func (log *kisDefaultLog) InfoF(str string, v ...interface{}) {
|
||||
fmt.Printf(str, v...)
|
||||
fmt.Printf("\n")
|
||||
}
|
||||
|
||||
func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) {
|
||||
fmt.Printf(str, v...)
|
||||
fmt.Printf("\n")
|
||||
}
|
||||
|
||||
func (log *kisDefaultLog) DebugF(str string, v ...interface{}) {
|
||||
fmt.Printf(str, v...)
|
||||
fmt.Printf("\n")
|
||||
}
|
||||
|
||||
func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) {
|
||||
fmt.Println(ctx)
|
||||
fmt.Printf(str, v...)
|
||||
fmt.Printf("\n")
|
||||
}
|
||||
|
||||
func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) {
|
||||
fmt.Println(ctx)
|
||||
fmt.Printf(str, v...)
|
||||
fmt.Printf("\n")
|
||||
}
|
||||
|
||||
func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) {
|
||||
fmt.Println(ctx)
|
||||
fmt.Printf(str, v...)
|
||||
fmt.Printf("\n")
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -7,30 +7,6 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestNewFlowConfig(t *testing.T) {
|
||||
|
||||
flowFuncParams1 := config.KisFlowFunctionParam{
|
||||
Fid: "funcId1",
|
||||
Params: config.FParam{
|
||||
"flowSetFunParam1": "value1",
|
||||
"flowSetFunParam2": "value2",
|
||||
},
|
||||
}
|
||||
|
||||
flowFuncParams2 := config.KisFlowFunctionParam{
|
||||
Fid: "funcId2",
|
||||
Params: config.FParam{
|
||||
"default": "value1",
|
||||
},
|
||||
}
|
||||
|
||||
myFlow1 := config.NewFlowConfig("flowId", "flowName", 1)
|
||||
myFlow1.AppendFunctionConfig(flowFuncParams1)
|
||||
myFlow1.AppendFunctionConfig(flowFuncParams2)
|
||||
|
||||
log.Logger().InfoF("myFlow1: %+v\n", myFlow1)
|
||||
}
|
||||
|
||||
func TestNewFuncConfig(t *testing.T) {
|
||||
source := config.KisSource{
|
||||
Name: "公众号抖音商城户订单数据",
|
||||
@ -38,7 +14,7 @@ func TestNewFuncConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
option := config.KisFuncOption{
|
||||
Cid: "connector_id",
|
||||
CName: "connectorName1",
|
||||
RetryTimes: 3,
|
||||
RetryDuriton: 300,
|
||||
|
||||
@ -48,9 +24,33 @@ func TestNewFuncConfig(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
myFunc1 := config.NewFuncConfig("funcId", "funcName", "Save", &source, &option)
|
||||
myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
|
||||
|
||||
log.Logger().InfoF("myFunc1: %+v\n", myFunc1)
|
||||
log.Logger().InfoF("funcName1: %+v\n", myFunc1)
|
||||
}
|
||||
|
||||
func TestNewFlowConfig(t *testing.T) {
|
||||
|
||||
flowFuncParams1 := config.KisFlowFunctionParam{
|
||||
FuncName: "funcName1",
|
||||
Params: config.FParam{
|
||||
"flowSetFunParam1": "value1",
|
||||
"flowSetFunParam2": "value2",
|
||||
},
|
||||
}
|
||||
|
||||
flowFuncParams2 := config.KisFlowFunctionParam{
|
||||
FuncName: "funcName2",
|
||||
Params: config.FParam{
|
||||
"default": "value1",
|
||||
},
|
||||
}
|
||||
|
||||
myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable)
|
||||
myFlow1.AppendFunctionConfig(flowFuncParams1)
|
||||
myFlow1.AppendFunctionConfig(flowFuncParams2)
|
||||
|
||||
log.Logger().InfoF("myFlow1: %+v\n", myFlow1)
|
||||
}
|
||||
|
||||
func TestNewConnConfig(t *testing.T) {
|
||||
@ -61,7 +61,7 @@ func TestNewConnConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
option := config.KisFuncOption{
|
||||
Cid: "connector_id",
|
||||
CName: "connectorName1",
|
||||
RetryTimes: 3,
|
||||
RetryDuriton: 300,
|
||||
|
||||
@ -71,14 +71,14 @@ func TestNewConnConfig(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
myFunc1 := config.NewFuncConfig("funcId", "funcName", "Save", &source, &option)
|
||||
myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option)
|
||||
|
||||
connParams := config.FParam{
|
||||
"param1": "value1",
|
||||
"param2": "value2",
|
||||
}
|
||||
|
||||
myConnector1 := config.NewConnConfig("connectorId", "connectorName", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
|
||||
myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams)
|
||||
|
||||
if err := myConnector1.WithFunc(myFunc1); err != nil {
|
||||
log.Logger().ErrorF("WithFunc err: %s\n", err.Error())
|
||||
|
@ -22,18 +22,18 @@ func TestNewKisFlow(t *testing.T) {
|
||||
Must: []string{"order_id", "user_id"},
|
||||
}
|
||||
|
||||
myFuncConfig1 := config.NewFuncConfig("funcId1", "funcName", common.C, &source1, nil)
|
||||
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
|
||||
if myFuncConfig1 == nil {
|
||||
panic("myFuncConfig1 is nil")
|
||||
}
|
||||
|
||||
myFuncConfig2 := config.NewFuncConfig("funcId2", "funcName", common.V, &source2, nil)
|
||||
myFuncConfig2 := config.NewFuncConfig("funcName2", common.V, &source2, nil)
|
||||
if myFuncConfig2 == nil {
|
||||
panic("myFuncConfig2 is nil")
|
||||
}
|
||||
|
||||
// 2. 创建一个 KisFlow 配置实例
|
||||
myFlowConfig1 := config.NewFlowConfig("flowId", "flowName", common.FlowEnable)
|
||||
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
|
||||
|
||||
// 3. 创建一个KisFlow对象
|
||||
flow1 := flow.NewKisFlow(myFlowConfig1)
|
||||
|
@ -18,13 +18,13 @@ func TestNewKisFunction(t *testing.T) {
|
||||
Must: []string{"order_id", "user_id"},
|
||||
}
|
||||
|
||||
myFuncConfig1 := config.NewFuncConfig("funcId1", "funcName", common.C, &source, nil)
|
||||
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source, nil)
|
||||
if myFuncConfig1 == nil {
|
||||
panic("myFuncConfig1 is nil")
|
||||
}
|
||||
|
||||
// 2. 创建一个 KisFlow 配置实例
|
||||
myFlowConfig1 := config.NewFlowConfig("flowId", "flowName", common.FlowEnable)
|
||||
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
|
||||
|
||||
// 3. 创建一个KisFlow对象
|
||||
flow1 := flow.NewKisFlow(myFlowConfig1)
|
||||
|
19
test/kis_log_test.go
Normal file
19
test/kis_log_test.go
Normal file
@ -0,0 +1,19 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"kis-flow/log"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestKisLogger(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
log.Logger().InfoFX(ctx, "TestKisLogger InfoFX")
|
||||
log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX")
|
||||
log.Logger().DebugFX(ctx, "TestKisLogger DebugFX")
|
||||
|
||||
log.Logger().InfoF("TestKisLogger InfoF")
|
||||
log.Logger().ErrorF("TestKisLogger ErrorF")
|
||||
log.Logger().DebugF("TestKisLogger DebugF")
|
||||
}
|
Loading…
Reference in New Issue
Block a user