mirror of
https://github.com/aceld/kis-flow.git
synced 2025-02-02 15:28:38 +08:00
v0.2 kisFunctioin Creator
This commit is contained in:
parent
25ef6d73c1
commit
9e560c6b79
@ -1,5 +1,21 @@
|
|||||||
package common
|
package common
|
||||||
|
|
||||||
|
// KisIdType 用户生成KisId的字符串前缀
|
||||||
|
const (
|
||||||
|
KisIdTypeFlow = "flow"
|
||||||
|
KisIdTypeConnnector = "conn"
|
||||||
|
KisIdTypeFunction = "func"
|
||||||
|
KisIdTypeGlobal = "global"
|
||||||
|
KisIdJoinChar = "-"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// FunctionIdFirstVirtual 为首结点Function上一层虚拟的Function ID
|
||||||
|
FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
|
||||||
|
// FunctionIdLastVirtual 为尾结点Function下一层虚拟的Function ID
|
||||||
|
FunctionIdLastVirtual = "FunctionIdLastVirtual"
|
||||||
|
)
|
||||||
|
|
||||||
type KisMode string
|
type KisMode string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
19
flow/kis_flow.go
Normal file
19
flow/kis_flow.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package flow
|
||||||
|
|
||||||
|
import "kis-flow/config"
|
||||||
|
|
||||||
|
// KisFlow 用于贯穿整条流式计算的上下文环境
|
||||||
|
type KisFlow struct {
|
||||||
|
Id string
|
||||||
|
Name string
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO for test
|
||||||
|
func NewKisFlow(config *config.KisFlowConfig) *KisFlow {
|
||||||
|
flow := new(KisFlow)
|
||||||
|
flow.Id = config.FlowId
|
||||||
|
flow.Name = config.FlowName
|
||||||
|
|
||||||
|
return flow
|
||||||
|
}
|
107
function/kis_base_function.go
Normal file
107
function/kis_base_function.go
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
package function
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"kis-flow/common"
|
||||||
|
"kis-flow/config"
|
||||||
|
"kis-flow/flow"
|
||||||
|
"kis-flow/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BaseFunction struct {
|
||||||
|
Config *config.KisFuncConfig
|
||||||
|
|
||||||
|
Flow *flow.KisFlow //上下文环境KisFlow
|
||||||
|
cid string //当前Function所依赖的KisConnectorID(如果存在)
|
||||||
|
|
||||||
|
N KisFunction //下一个流计算Function
|
||||||
|
P KisFunction //上一个流计算Function
|
||||||
|
|
||||||
|
//KisId , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
|
||||||
|
//KisId 和 Function Config中的 Fid的区别在于,Fid用来形容一类Funcion策略的ID,
|
||||||
|
//而KisId则为在KisFlow中KisFunction已经实例化的 实例对象ID 这个ID是随机生成且唯一
|
||||||
|
KisId string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call
|
||||||
|
// BaseFunction 为空实现,目的为了让其他具体类型的KisFunction,如KisFunction_V 来继承BaseFuncion来重写此方法
|
||||||
|
func (base *BaseFunction) Call(ctx context.Context, flow *flow.KisFlow) error { return nil }
|
||||||
|
|
||||||
|
func (base *BaseFunction) Next() KisFunction {
|
||||||
|
return base.N
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) Prev() KisFunction {
|
||||||
|
return base.P
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) SetN(f KisFunction) {
|
||||||
|
base.N = f
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) SetP(f KisFunction) {
|
||||||
|
base.P = f
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
|
||||||
|
if s == nil {
|
||||||
|
return errors.New("KisFuncConfig is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
base.Config = s
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) GetId() string {
|
||||||
|
return base.GetConfig().Fid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) GetPrevId() string {
|
||||||
|
if base.P == nil {
|
||||||
|
//Function为首结点
|
||||||
|
return common.FunctionIdFirstVirtual
|
||||||
|
}
|
||||||
|
return base.P.GetConfig().Fid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) GetNextId() string {
|
||||||
|
if base.N == nil {
|
||||||
|
//Function为尾结点
|
||||||
|
return common.FunctionIdLastVirtual
|
||||||
|
}
|
||||||
|
return base.N.GetConfig().Fid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
|
||||||
|
return base.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) SetFlow(f *flow.KisFlow) error {
|
||||||
|
if f == nil {
|
||||||
|
return errors.New("KisFlow is nil")
|
||||||
|
}
|
||||||
|
base.Flow = f
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) GetFlow() *flow.KisFlow {
|
||||||
|
return base.Flow
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) GetConnId() string {
|
||||||
|
return base.cid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) SetConnId(id string) {
|
||||||
|
base.cid = id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) CreateKisId() {
|
||||||
|
base.KisId = id.KisID(common.KisIdTypeFunction)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (base *BaseFunction) GetKisId() string {
|
||||||
|
return base.KisId
|
||||||
|
}
|
94
function/kis_function.go
Normal file
94
function/kis_function.go
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
package function
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"kis-flow/common"
|
||||||
|
"kis-flow/config"
|
||||||
|
"kis-flow/flow"
|
||||||
|
)
|
||||||
|
|
||||||
|
// KisFunction 流式计算基础计算模块,KisFunction是一条流式计算的基本计算逻辑单元,
|
||||||
|
// 任意个KisFunction可以组合成一个KisFlow
|
||||||
|
type KisFunction interface {
|
||||||
|
// Call 执行流式计算逻辑
|
||||||
|
Call(ctx context.Context, flow *flow.KisFlow) error
|
||||||
|
|
||||||
|
// SetConfig 给当前Function实例配置策略
|
||||||
|
SetConfig(s *config.KisFuncConfig) error
|
||||||
|
// GetConfig 获取当前Function实例配置策略
|
||||||
|
GetConfig() *config.KisFuncConfig
|
||||||
|
|
||||||
|
// SetFlow 给当前Function实例设置所依赖的Flow实例
|
||||||
|
SetFlow(f *flow.KisFlow) error
|
||||||
|
// GetFlow 获取当前Functioin实力所依赖的Flow
|
||||||
|
GetFlow() *flow.KisFlow
|
||||||
|
|
||||||
|
// SetConnId 如果当前Function为S或者L 那么建议设置当前Funciton所关联的Connector
|
||||||
|
SetConnId(string)
|
||||||
|
// GetConnId 获取所关联的Connector CID
|
||||||
|
GetConnId() string
|
||||||
|
|
||||||
|
// GetPrevId 获取当前Function上一个Function节点FID
|
||||||
|
GetPrevId() string
|
||||||
|
// GetNextId 获取当前Function下一个Function节点FID
|
||||||
|
GetNextId() string
|
||||||
|
// GetId 获取当前Function的FID
|
||||||
|
GetId() string
|
||||||
|
|
||||||
|
// CreateKisId 给当前Funciton实力生成一个随机的实例KisID
|
||||||
|
CreateKisId()
|
||||||
|
// GetKisId 获取当前Function的唯一实例KisID
|
||||||
|
GetKisId() string
|
||||||
|
|
||||||
|
// Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
|
||||||
|
Next() KisFunction
|
||||||
|
// Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
|
||||||
|
Prev() KisFunction
|
||||||
|
// SetN 设置下一层Function实例
|
||||||
|
SetN(f KisFunction)
|
||||||
|
// SetP 设置上一层Function实例
|
||||||
|
SetP(f KisFunction)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewKisFunction 创建一个NsFunction
|
||||||
|
// flow: 当前所属的flow实例
|
||||||
|
// s : 当前function的配置策略
|
||||||
|
func NewKisFunction(flow *flow.KisFlow, config *config.KisFuncConfig) KisFunction {
|
||||||
|
var f KisFunction
|
||||||
|
|
||||||
|
//工厂生产泛化对象
|
||||||
|
switch common.KisMode(config.Fmode) {
|
||||||
|
case common.V:
|
||||||
|
f = new(KisFunctionV)
|
||||||
|
break
|
||||||
|
case common.S:
|
||||||
|
f = new(KisFunctionS)
|
||||||
|
case common.L:
|
||||||
|
f = new(KisFunctionL)
|
||||||
|
case common.C:
|
||||||
|
f = new(KisFunctionC)
|
||||||
|
case common.E:
|
||||||
|
f = new(KisFunctionE)
|
||||||
|
default:
|
||||||
|
//LOG ERROR
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//设置基础信息属性
|
||||||
|
if err := f.SetConfig(config); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := f.SetFlow(flow); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.Option.Cid != "" {
|
||||||
|
f.SetConnId(config.Option.Cid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 生成随机实力唯一ID
|
||||||
|
f.CreateKisId()
|
||||||
|
|
||||||
|
return f
|
||||||
|
}
|
19
function/kis_function_c.go
Normal file
19
function/kis_function_c.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package function
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"kis-flow/flow"
|
||||||
|
)
|
||||||
|
|
||||||
|
type KisFunctionC struct {
|
||||||
|
BaseFunction
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *KisFunctionC) Call(ctx context.Context, flow *flow.KisFlow) error {
|
||||||
|
fmt.Printf("KisFunction_C, flow = %+v\n", flow)
|
||||||
|
|
||||||
|
// TODO 调用具体的Function执行方法
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
19
function/kis_function_e.go
Normal file
19
function/kis_function_e.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package function
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"kis-flow/flow"
|
||||||
|
)
|
||||||
|
|
||||||
|
type KisFunctionE struct {
|
||||||
|
BaseFunction
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *KisFunctionE) Call(ctx context.Context, flow *flow.KisFlow) error {
|
||||||
|
fmt.Printf("KisFunctionE, flow = %+v\n", flow)
|
||||||
|
|
||||||
|
// TODO 调用具体的Function执行方法
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
19
function/kis_function_l.go
Normal file
19
function/kis_function_l.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package function
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"kis-flow/flow"
|
||||||
|
)
|
||||||
|
|
||||||
|
type KisFunctionL struct {
|
||||||
|
BaseFunction
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *KisFunctionL) Call(ctx context.Context, flow *flow.KisFlow) error {
|
||||||
|
fmt.Printf("KisFunctionL, flow = %+v\n", flow)
|
||||||
|
|
||||||
|
// TODO 调用具体的Function执行方法
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
19
function/kis_function_s.go
Normal file
19
function/kis_function_s.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package function
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"kis-flow/flow"
|
||||||
|
)
|
||||||
|
|
||||||
|
type KisFunctionS struct {
|
||||||
|
BaseFunction
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *KisFunctionS) Call(ctx context.Context, flow *flow.KisFlow) error {
|
||||||
|
fmt.Printf("KisFunctionS, flow = %+v\n", flow)
|
||||||
|
|
||||||
|
// TODO 调用具体的Function执行方法
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
42
function/kis_function_test.go
Normal file
42
function/kis_function_test.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package function
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"kis-flow/common"
|
||||||
|
"kis-flow/config"
|
||||||
|
"kis-flow/flow"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewKisFunction(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// 1. 创建一个KisFunction配置实例
|
||||||
|
source := config.KisSource{
|
||||||
|
Name: "公众号抖音商城户订单数据",
|
||||||
|
Must: []string{"order_id", "user_id"},
|
||||||
|
}
|
||||||
|
|
||||||
|
myFuncConfig1 := config.NewFuncConfig("funcId1", "funcName", common.C, &source, nil)
|
||||||
|
if myFuncConfig1 == nil {
|
||||||
|
panic("myFuncConfig1 is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 创建一个 KisFlow 配置实例
|
||||||
|
flowFuncParams1 := config.KisFlowFunctionParam{
|
||||||
|
Fid: "funcId1",
|
||||||
|
}
|
||||||
|
|
||||||
|
myFlowConfig1 := config.NewFlowConfig("flowId", "flowName", common.FlowEnable)
|
||||||
|
myFlowConfig1.AppendFunctionConfig(flowFuncParams1)
|
||||||
|
|
||||||
|
// 3. 创建一个KisFlow对象
|
||||||
|
flow1 := flow.NewKisFlow(myFlowConfig1)
|
||||||
|
|
||||||
|
// 4. 创建一个KisFunction对象
|
||||||
|
func1 := NewKisFunction(flow1, myFuncConfig1)
|
||||||
|
|
||||||
|
if err := func1.Call(ctx, flow1); err != nil {
|
||||||
|
t.Errorf("func1.Call() error = %v", err)
|
||||||
|
}
|
||||||
|
}
|
19
function/kis_function_v.go
Normal file
19
function/kis_function_v.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
package function
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"kis-flow/flow"
|
||||||
|
)
|
||||||
|
|
||||||
|
type KisFunctionV struct {
|
||||||
|
BaseFunction
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *KisFunctionV) Call(ctx context.Context, flow *flow.KisFlow) error {
|
||||||
|
fmt.Printf("KisFunctionV, flow = %+v\n", flow)
|
||||||
|
|
||||||
|
// TODO 调用具体的Function执行方法
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
2
go.mod
2
go.mod
@ -1,3 +1,5 @@
|
|||||||
module kis-flow
|
module kis-flow
|
||||||
|
|
||||||
go 1.18
|
go 1.18
|
||||||
|
|
||||||
|
require github.com/google/uuid v1.5.0
|
||||||
|
34
id/kis_id.go
Normal file
34
id/kis_id.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package id
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"kis-flow/common"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// KisID 获取一个中随机实例ID
|
||||||
|
// 格式为 "prefix1-[prefix2-][prefix3-]ID"
|
||||||
|
// 如:flow-1234567890
|
||||||
|
// 如:func-1234567890
|
||||||
|
// 如: conn-1234567890
|
||||||
|
// 如: func-1-1234567890
|
||||||
|
func KisID(prefix ...string) (kisId string) {
|
||||||
|
|
||||||
|
idStr := strings.Replace(uuid.New().String(), "-", "", -1)
|
||||||
|
kisId = formatKisID(idStr, prefix...)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func formatKisID(idStr string, prefix ...string) string {
|
||||||
|
var kisId string
|
||||||
|
|
||||||
|
for _, fix := range prefix {
|
||||||
|
kisId += fix
|
||||||
|
kisId += common.KisIdJoinChar
|
||||||
|
}
|
||||||
|
|
||||||
|
kisId += idStr
|
||||||
|
|
||||||
|
return kisId
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user