From 9d55259727b07192e337145b5a38e8fc2daf5ecc Mon Sep 17 00:00:00 2001 From: aceld Date: Mon, 1 Jan 2024 17:49:27 +0800 Subject: [PATCH] kis-flow v0.1 init KisFlow,KisFunction --- flow/kis_flow.go | 139 ++++++++++++++++++-- function/kis_base_function.go | 67 ++++++++-- function/kis_function_c.go | 6 +- function/kis_function_e.go | 4 +- function/kis_function_l.go | 4 +- function/kis_function_s.go | 4 +- function/kis_function_v.go | 4 +- kis/flow.go | 13 ++ function/kis_function.go => kis/function.go | 65 ++------- {config => test}/kis_config_test.go | 33 ++--- test/kis_flow_test.go | 53 ++++++++ {function => test}/kis_function_test.go | 10 +- 12 files changed, 293 insertions(+), 109 deletions(-) create mode 100644 kis/flow.go rename function/kis_function.go => kis/function.go (51%) rename {config => test}/kis_config_test.go (60%) create mode 100644 test/kis_flow_test.go rename {function => test}/kis_function_test.go (81%) diff --git a/flow/kis_flow.go b/flow/kis_flow.go index 888fa18..5eecfa1 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -1,19 +1,142 @@ package flow -import "kis-flow/config" +import ( + "context" + "errors" + "kis-flow/common" + "kis-flow/config" + "kis-flow/function" + "kis-flow/id" + "kis-flow/kis" + "sync" +) // KisFlow 用于贯穿整条流式计算的上下文环境 type KisFlow struct { - Id string - Name string - // TODO + // 基础信息 + Id string // Flow配置策略ID(平台生成管理) + Name string // Flow的可读名称 + Conf *config.KisFlowConfig // Flow配置策略 + + // Function列表 + Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionID + FlowHead kis.Function // 当前Flow所拥有的Function列表表头 + FlowTail kis.Function // 当前Flow所拥有的Function列表表尾 + flock sync.RWMutex // 管理链表插入读写的锁 + ThisFunction kis.Function // Flow当前正在执行的KisFunction对象 + ThisFunctionId string // 当前执行到的Function ID (策略配置ID) + PrevFunctionId string // 当前执行到的Function 上一层FunctionID(策略配置ID) + + // Function列表参数 + funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例NsID, value:FParam + fplock sync.RWMutex // 管理funcParams的读写锁 + + KisId string // Flow的分布式实例ID(用于KisFlow内部区分不同实例) } -// TODO for test -func NewKisFlow(config *config.KisFlowConfig) *KisFlow { +// NewKisFlow 创建一个KisFlow. +func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { flow := new(KisFlow) - flow.Id = config.FlowId - flow.Name = config.FlowName + + // 基础信息 + flow.Id = conf.FlowId + flow.Name = conf.FlowName + flow.Conf = conf + + // Function列表 + flow.Funcs = make(map[string]kis.Function) + flow.funcParams = make(map[string]config.FParam) + + flow.KisId = id.KisID(common.KisIdTypeFlow) return flow } + +// Link 将Function链接到Flow中 +// fConf: 当前Function策略 +// fParams: 当前Flow携带的Function动态参数 +func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error { + // 创建Function + f := function.NewKisFunction(flow, fConf) + + // Flow 添加 Function + if err := flow.appendFunc(f, fParams); err != nil { + return err + } + + return nil +} + +// appendFunc 将Function添加到Flow中, 链表操作 +func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error { + + if function == nil { + return errors.New("AppendFunc append nil to List") + } + + flow.flock.Lock() + defer flow.flock.Unlock() + + if flow.FlowHead == nil { + // 首次添加节点 + flow.FlowHead = function + flow.FlowTail = function + + function.SetN(nil) + function.SetP(nil) + + } else { + // 将function插入到链表的尾部 + function.SetP(flow.FlowTail) + function.SetN(nil) + + flow.FlowTail.SetN(function) + flow.FlowTail = function + } + + //将Function ID 详细Hash对应关系添加到flow对象中 + flow.Funcs[function.GetId()] = function + + //先添加function 默认携带的Params参数 + params := make(config.FParam) + for key, value := range function.GetConfig().Option.Params { + params[key] = value + } + + //再添加flow携带的function定义参数(重复即覆盖) + for key, value := range fParam { + params[key] = value + } + + // 将得到的FParams存留在flow结构体中,用来function业务直接通过Hash获取 + // key 为当前Function的KisId,不用Fid的原因是为了防止一个Flow添加两个相同策略Id的Function + flow.funcParams[function.GetKisId()] = params + + return nil +} + +// Run 启动KisFlow的流式计算, 从起始Function开始执行流 +func (flow *KisFlow) Run(ctx context.Context) error { + + var fn kis.Function + + fn = flow.FlowHead + + if flow.Conf.Status == int(common.FlowDisable) { + //flow被配置关闭 + return nil + } + + //流式链式调用 + for fn != nil { + if err := fn.Call(ctx, flow); err != nil { + //Error + return err + } else { + //Success + fn = fn.Next() + } + } + + return nil +} diff --git a/function/kis_base_function.go b/function/kis_base_function.go index 57e9762..72352eb 100644 --- a/function/kis_base_function.go +++ b/function/kis_base_function.go @@ -5,18 +5,18 @@ import ( "errors" "kis-flow/common" "kis-flow/config" - "kis-flow/flow" "kis-flow/id" + "kis-flow/kis" ) type BaseFunction struct { Config *config.KisFuncConfig - Flow *flow.KisFlow //上下文环境KisFlow - cid string //当前Function所依赖的KisConnectorID(如果存在) + Flow kis.Flow //上下文环境KisFlow + cid string //当前Function所依赖的KisConnectorID(如果存在) - N KisFunction //下一个流计算Function - P KisFunction //上一个流计算Function + N kis.Function //下一个流计算Function + P kis.Function //上一个流计算Function //KisId , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象 //KisId 和 Function Config中的 Fid的区别在于,Fid用来形容一类Funcion策略的ID, @@ -26,21 +26,21 @@ type BaseFunction struct { // Call // BaseFunction 为空实现,目的为了让其他具体类型的KisFunction,如KisFunction_V 来继承BaseFuncion来重写此方法 -func (base *BaseFunction) Call(ctx context.Context, flow *flow.KisFlow) error { return nil } +func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil } -func (base *BaseFunction) Next() KisFunction { +func (base *BaseFunction) Next() kis.Function { return base.N } -func (base *BaseFunction) Prev() KisFunction { +func (base *BaseFunction) Prev() kis.Function { return base.P } -func (base *BaseFunction) SetN(f KisFunction) { +func (base *BaseFunction) SetN(f kis.Function) { base.N = f } -func (base *BaseFunction) SetP(f KisFunction) { +func (base *BaseFunction) SetP(f kis.Function) { base.P = f } @@ -78,7 +78,7 @@ func (base *BaseFunction) GetConfig() *config.KisFuncConfig { return base.Config } -func (base *BaseFunction) SetFlow(f *flow.KisFlow) error { +func (base *BaseFunction) SetFlow(f kis.Flow) error { if f == nil { return errors.New("KisFlow is nil") } @@ -86,7 +86,7 @@ func (base *BaseFunction) SetFlow(f *flow.KisFlow) error { return nil } -func (base *BaseFunction) GetFlow() *flow.KisFlow { +func (base *BaseFunction) GetFlow() kis.Flow { return base.Flow } @@ -105,3 +105,46 @@ func (base *BaseFunction) CreateKisId() { func (base *BaseFunction) GetKisId() string { return base.KisId } + +// NewKisFunction 创建一个NsFunction +// flow: 当前所属的flow实例 +// s : 当前function的配置策略 +func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function { + var f kis.Function + + //工厂生产泛化对象 + switch common.KisMode(config.Fmode) { + case common.V: + f = new(KisFunctionV) + break + case common.S: + f = new(KisFunctionS) + case common.L: + f = new(KisFunctionL) + case common.C: + f = new(KisFunctionC) + case common.E: + f = new(KisFunctionE) + default: + //LOG ERROR + return nil + } + + //设置基础信息属性 + if err := f.SetConfig(config); err != nil { + panic(err) + } + + if err := f.SetFlow(flow); err != nil { + panic(err) + } + + if config.Option.Cid != "" { + f.SetConnId(config.Option.Cid) + } + + // 生成随机实力唯一ID + f.CreateKisId() + + return f +} diff --git a/function/kis_function_c.go b/function/kis_function_c.go index ebfdcb9..0c92e12 100644 --- a/function/kis_function_c.go +++ b/function/kis_function_c.go @@ -3,15 +3,15 @@ package function import ( "context" "fmt" - "kis-flow/flow" + "kis-flow/kis" ) type KisFunctionC struct { BaseFunction } -func (f *KisFunctionC) Call(ctx context.Context, flow *flow.KisFlow) error { - fmt.Printf("KisFunction_C, flow = %+v\n", flow) +func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error { + fmt.Printf("KisFunctionC, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 diff --git a/function/kis_function_e.go b/function/kis_function_e.go index eec90d8..c26bc75 100644 --- a/function/kis_function_e.go +++ b/function/kis_function_e.go @@ -3,14 +3,14 @@ package function import ( "context" "fmt" - "kis-flow/flow" + "kis-flow/kis" ) type KisFunctionE struct { BaseFunction } -func (f *KisFunctionE) Call(ctx context.Context, flow *flow.KisFlow) error { +func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunctionE, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 diff --git a/function/kis_function_l.go b/function/kis_function_l.go index d020dab..77c5126 100644 --- a/function/kis_function_l.go +++ b/function/kis_function_l.go @@ -3,14 +3,14 @@ package function import ( "context" "fmt" - "kis-flow/flow" + "kis-flow/kis" ) type KisFunctionL struct { BaseFunction } -func (f *KisFunctionL) Call(ctx context.Context, flow *flow.KisFlow) error { +func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunctionL, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 diff --git a/function/kis_function_s.go b/function/kis_function_s.go index d160b7f..3e06695 100644 --- a/function/kis_function_s.go +++ b/function/kis_function_s.go @@ -3,14 +3,14 @@ package function import ( "context" "fmt" - "kis-flow/flow" + "kis-flow/kis" ) type KisFunctionS struct { BaseFunction } -func (f *KisFunctionS) Call(ctx context.Context, flow *flow.KisFlow) error { +func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunctionS, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 diff --git a/function/kis_function_v.go b/function/kis_function_v.go index 98c99e2..fd02514 100644 --- a/function/kis_function_v.go +++ b/function/kis_function_v.go @@ -3,14 +3,14 @@ package function import ( "context" "fmt" - "kis-flow/flow" + "kis-flow/kis" ) type KisFunctionV struct { BaseFunction } -func (f *KisFunctionV) Call(ctx context.Context, flow *flow.KisFlow) error { +func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error { fmt.Printf("KisFunctionV, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 diff --git a/kis/flow.go b/kis/flow.go new file mode 100644 index 0000000..b142ef5 --- /dev/null +++ b/kis/flow.go @@ -0,0 +1,13 @@ +package kis + +import ( + "context" + "kis-flow/config" +) + +type Flow interface { + // Run 调度Flow,依次调度Flow中的Function并且执行 + Run(ctx context.Context) error + // Link 将Flow中的Function按照配置文件中的配置进行连接 + Link(fConf *config.KisFuncConfig, fParams config.FParam) error +} diff --git a/function/kis_function.go b/kis/function.go similarity index 51% rename from function/kis_function.go rename to kis/function.go index 0980649..06964b9 100644 --- a/function/kis_function.go +++ b/kis/function.go @@ -1,17 +1,15 @@ -package function +package kis import ( "context" - "kis-flow/common" "kis-flow/config" - "kis-flow/flow" ) -// KisFunction 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元, +// Function 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元, // 任意个KisFunction可以组合成一个KisFlow -type KisFunction interface { +type Function interface { // Call 执行流式计算逻辑 - Call(ctx context.Context, flow *flow.KisFlow) error + Call(ctx context.Context, flow Flow) error // SetConfig 给当前Function实例配置策略 SetConfig(s *config.KisFuncConfig) error @@ -19,9 +17,9 @@ type KisFunction interface { GetConfig() *config.KisFuncConfig // SetFlow 给当前Function实例设置所依赖的Flow实例 - SetFlow(f *flow.KisFlow) error + SetFlow(f Flow) error // GetFlow 获取当前Functioin实力所依赖的Flow - GetFlow() *flow.KisFlow + GetFlow() Flow // SetConnId 如果当前Function为S或者L 那么建议设置当前Funciton所关联的Connector SetConnId(string) @@ -41,54 +39,11 @@ type KisFunction interface { GetKisId() string // Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil - Next() KisFunction + Next() Function // Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil - Prev() KisFunction + Prev() Function // SetN 设置下一层Function实例 - SetN(f KisFunction) + SetN(f Function) // SetP 设置上一层Function实例 - SetP(f KisFunction) -} - -// NewKisFunction 创建一个NsFunction -// flow: 当前所属的flow实例 -// s : 当前function的配置策略 -func NewKisFunction(flow *flow.KisFlow, config *config.KisFuncConfig) KisFunction { - var f KisFunction - - //工厂生产泛化对象 - switch common.KisMode(config.Fmode) { - case common.V: - f = new(KisFunctionV) - break - case common.S: - f = new(KisFunctionS) - case common.L: - f = new(KisFunctionL) - case common.C: - f = new(KisFunctionC) - case common.E: - f = new(KisFunctionE) - default: - //LOG ERROR - return nil - } - - //设置基础信息属性 - if err := f.SetConfig(config); err != nil { - panic(err) - } - - if err := f.SetFlow(flow); err != nil { - panic(err) - } - - if config.Option.Cid != "" { - f.SetConnId(config.Option.Cid) - } - - // 生成随机实力唯一ID - f.CreateKisId() - - return f + SetP(f Function) } diff --git a/config/kis_config_test.go b/test/kis_config_test.go similarity index 60% rename from config/kis_config_test.go rename to test/kis_config_test.go index fbe1bb6..1c52495 100644 --- a/config/kis_config_test.go +++ b/test/kis_config_test.go @@ -1,29 +1,30 @@ -package config +package test import ( "fmt" "kis-flow/common" + "kis-flow/config" "testing" ) func TestNewFlowConfig(t *testing.T) { - flowFuncParams1 := KisFlowFunctionParam{ + flowFuncParams1 := config.KisFlowFunctionParam{ Fid: "funcId1", - Params: FParam{ + Params: config.FParam{ "flowSetFunParam1": "value1", "flowSetFunParam2": "value2", }, } - flowFuncParams2 := KisFlowFunctionParam{ + flowFuncParams2 := config.KisFlowFunctionParam{ Fid: "funcId2", - Params: FParam{ + Params: config.FParam{ "default": "value1", }, } - myFlow1 := NewFlowConfig("flowId", "flowName", 1) + myFlow1 := config.NewFlowConfig("flowId", "flowName", 1) myFlow1.AppendFunctionConfig(flowFuncParams1) myFlow1.AppendFunctionConfig(flowFuncParams2) @@ -31,53 +32,53 @@ func TestNewFlowConfig(t *testing.T) { } func TestNewFuncConfig(t *testing.T) { - source := KisSource{ + source := config.KisSource{ Name: "公众号抖音商城户订单数据", Must: []string{"order_id", "user_id"}, } - option := KisFuncOption{ + option := config.KisFuncOption{ Cid: "connector_id", RetryTimes: 3, RetryDuriton: 300, - Params: FParam{ + Params: config.FParam{ "param1": "value1", "param2": "value2", }, } - myFunc1 := NewFuncConfig("funcId", "funcName", "Save", &source, &option) + myFunc1 := config.NewFuncConfig("funcId", "funcName", "Save", &source, &option) fmt.Printf("myFunc1: %+v\n", myFunc1) } func TestNewConnConfig(t *testing.T) { - source := KisSource{ + source := config.KisSource{ Name: "公众号抖音商城户订单数据", Must: []string{"order_id", "user_id"}, } - option := KisFuncOption{ + option := config.KisFuncOption{ Cid: "connector_id", RetryTimes: 3, RetryDuriton: 300, - Params: FParam{ + Params: config.FParam{ "param1": "value1", "param2": "value2", }, } - myFunc1 := NewFuncConfig("funcId", "funcName", "Save", &source, &option) + myFunc1 := config.NewFuncConfig("funcId", "funcName", "Save", &source, &option) - connParams := FParam{ + connParams := config.FParam{ "param1": "value1", "param2": "value2", } - myConnector1 := NewConnConfig("connectorId", "connectorName", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams) + myConnector1 := config.NewConnConfig("connectorId", "connectorName", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams) if err := myConnector1.WithFunc(myFunc1); err != nil { fmt.Printf("WithFunc err: %s\n", err.Error()) diff --git a/test/kis_flow_test.go b/test/kis_flow_test.go new file mode 100644 index 0000000..419594c --- /dev/null +++ b/test/kis_flow_test.go @@ -0,0 +1,53 @@ +package test + +import ( + "context" + "kis-flow/common" + "kis-flow/config" + "kis-flow/flow" + "testing" +) + +func TestNewKisFlow(t *testing.T) { + ctx := context.Background() + + // 1. 创建2个KisFunction配置实例 + source1 := config.KisSource{ + Name: "公众号抖音商城户订单数据", + Must: []string{"order_id", "user_id"}, + } + + source2 := config.KisSource{ + Name: "用户订单错误率", + Must: []string{"order_id", "user_id"}, + } + + myFuncConfig1 := config.NewFuncConfig("funcId1", "funcName", common.C, &source1, nil) + if myFuncConfig1 == nil { + panic("myFuncConfig1 is nil") + } + + myFuncConfig2 := config.NewFuncConfig("funcId2", "funcName", common.V, &source2, nil) + if myFuncConfig2 == nil { + panic("myFuncConfig2 is nil") + } + + // 2. 创建一个 KisFlow 配置实例 + myFlowConfig1 := config.NewFlowConfig("flowId", "flowName", common.FlowEnable) + + // 3. 创建一个KisFlow对象 + flow1 := flow.NewKisFlow(myFlowConfig1) + + // 4. 拼接Functioin 到 Flow 上 + if err := flow1.Link(myFuncConfig1, nil); err != nil { + panic(err) + } + if err := flow1.Link(myFuncConfig2, nil); err != nil { + panic(err) + } + + // 5. 执行flow1 + if err := flow1.Run(ctx); err != nil { + panic(err) + } +} diff --git a/function/kis_function_test.go b/test/kis_function_test.go similarity index 81% rename from function/kis_function_test.go rename to test/kis_function_test.go index 020c90c..4e77a45 100644 --- a/function/kis_function_test.go +++ b/test/kis_function_test.go @@ -1,10 +1,11 @@ -package function +package test import ( "context" "kis-flow/common" "kis-flow/config" "kis-flow/flow" + "kis-flow/function" "testing" ) @@ -23,18 +24,13 @@ func TestNewKisFunction(t *testing.T) { } // 2. 创建一个 KisFlow 配置实例 - flowFuncParams1 := config.KisFlowFunctionParam{ - Fid: "funcId1", - } - myFlowConfig1 := config.NewFlowConfig("flowId", "flowName", common.FlowEnable) - myFlowConfig1.AppendFunctionConfig(flowFuncParams1) // 3. 创建一个KisFlow对象 flow1 := flow.NewKisFlow(myFlowConfig1) // 4. 创建一个KisFunction对象 - func1 := NewKisFunction(flow1, myFuncConfig1) + func1 := function.NewKisFunction(flow1, myFuncConfig1) if err := func1.Call(ctx, flow1); err != nil { t.Errorf("func1.Call() error = %v", err)