mirror of
https://github.com/aceld/kis-flow.git
synced 2025-02-02 15:28:38 +08:00
add kis connector model
This commit is contained in:
parent
634c7b4508
commit
0fc13e2540
@ -1,6 +1,7 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"kis-flow/common"
|
||||
"kis-flow/log"
|
||||
)
|
||||
@ -24,11 +25,12 @@ type KisFuncOption struct {
|
||||
|
||||
// KisFuncConfig 一个KisFunction策略配置
|
||||
type KisFuncConfig struct {
|
||||
KisType string `yaml:"kistype"`
|
||||
FName string `yaml:"fname"`
|
||||
FMode string `yaml:"fmode"`
|
||||
Source KisSource `yaml:"source"`
|
||||
Option KisFuncOption `yaml:"option"`
|
||||
KisType string `yaml:"kistype"`
|
||||
FName string `yaml:"fname"`
|
||||
FMode string `yaml:"fmode"`
|
||||
Source KisSource `yaml:"source"`
|
||||
Option KisFuncOption `yaml:"option"`
|
||||
connConf *KisConnConfig
|
||||
}
|
||||
|
||||
// NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息
|
||||
@ -64,3 +66,25 @@ func NewFuncConfig(
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
|
||||
if cConf == nil {
|
||||
return errors.New("KisConnConfig is nil")
|
||||
}
|
||||
|
||||
// Function需要和Connector进行关联
|
||||
fConf.connConf = cConf
|
||||
|
||||
// Connector需要和Function进行关联
|
||||
_ = cConf.WithFunc(fConf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
|
||||
if fConf.connConf == nil {
|
||||
return nil, errors.New("KisFuncConfig.connConf not set")
|
||||
}
|
||||
|
||||
return fConf.connConf, nil
|
||||
}
|
||||
|
65
conn/kis_connector.go
Normal file
65
conn/kis_connector.go
Normal file
@ -0,0 +1,65 @@
|
||||
package conn
|
||||
|
||||
import (
|
||||
"context"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"kis-flow/id"
|
||||
"kis-flow/kis"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type KisConnector struct {
|
||||
// Connector ID
|
||||
CId string
|
||||
// Connector Name
|
||||
CName string
|
||||
// Connector Config
|
||||
Conf *config.KisConnConfig
|
||||
|
||||
// Connector Init
|
||||
onceInit sync.Once
|
||||
}
|
||||
|
||||
// NewKisConnector 根据配置策略创建一个KisConnector
|
||||
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
|
||||
conn := new(KisConnector)
|
||||
conn.CId = id.KisID(common.KisIdTypeConnnector)
|
||||
conn.CName = config.CName
|
||||
conn.Conf = config
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
// Init 初始化Connector所关联的存储引擎链接等
|
||||
func (conn *KisConnector) Init() error {
|
||||
var err error
|
||||
|
||||
//一个Connector只能执行初始化业务一次
|
||||
conn.onceInit.Do(func() {
|
||||
err = kis.Pool().CallConnInit(conn)
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Call 调用Connector 外挂存储逻辑的读写操作
|
||||
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) error {
|
||||
if err := kis.Pool().CallConnector(ctx, flow, conn, args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (conn *KisConnector) GetName() string {
|
||||
return conn.CName
|
||||
}
|
||||
|
||||
func (conn *KisConnector) GetConfig() *config.KisConnConfig {
|
||||
return conn.Conf
|
||||
}
|
||||
|
||||
func (conn *KisConnector) GetId() string {
|
||||
return conn.CId
|
||||
}
|
@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"kis-flow/conn"
|
||||
"kis-flow/function"
|
||||
"kis-flow/id"
|
||||
"kis-flow/kis"
|
||||
@ -62,9 +63,30 @@ func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
|
||||
// fConf: 当前Function策略
|
||||
// fParams: 当前Flow携带的Function动态参数
|
||||
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
|
||||
// 创建Function
|
||||
// 创建Function实例
|
||||
f := function.NewKisFunction(flow, fConf)
|
||||
|
||||
if fConf.Option.CName != "" {
|
||||
// 当前Function有Connector关联,需要初始化Connector实例
|
||||
|
||||
// 获取Connector配置
|
||||
connConfig, err := fConf.GetConnConfig()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 创建Connector对象
|
||||
connector := conn.NewKisConnector(connConfig)
|
||||
|
||||
// 初始化Connector, 执行Connector Init 方法
|
||||
if err = connector.Init(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 关联Function实例和Connector实例关系
|
||||
_ = f.AddConnector(connector)
|
||||
}
|
||||
|
||||
// Flow 添加 Function
|
||||
if err := flow.appendFunc(f, fParams); err != nil {
|
||||
return err
|
||||
@ -188,3 +210,21 @@ func (flow *KisFlow) GetThisFunction() kis.Function {
|
||||
func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
|
||||
return flow.ThisFunction.GetConfig()
|
||||
}
|
||||
|
||||
// GetConnector 得到当前正在执行的Function的Connector
|
||||
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
|
||||
if conn := flow.ThisFunction.GetConnector(); conn != nil {
|
||||
return conn, nil
|
||||
} else {
|
||||
return nil, errors.New("GetConnector(): Connector is nil")
|
||||
}
|
||||
}
|
||||
|
||||
// GetConnConf 得到当前正在执行的Function的Connector的配置
|
||||
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
|
||||
if conn := flow.ThisFunction.GetConnector(); conn != nil {
|
||||
return conn.GetConfig(), nil
|
||||
} else {
|
||||
return nil, errors.New("GetConnConf(): Connector is nil")
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,10 @@ type BaseFunction struct {
|
||||
Config *config.KisFuncConfig
|
||||
|
||||
// flow
|
||||
Flow kis.Flow //上下文环境KisFlow
|
||||
flow kis.Flow //上下文环境KisFlow
|
||||
|
||||
// connector
|
||||
connector kis.Connector
|
||||
|
||||
// link
|
||||
N kis.Function //下一个流计算Function
|
||||
@ -80,12 +83,28 @@ func (base *BaseFunction) SetFlow(f kis.Flow) error {
|
||||
if f == nil {
|
||||
return errors.New("KisFlow is nil")
|
||||
}
|
||||
base.Flow = f
|
||||
base.flow = f
|
||||
return nil
|
||||
}
|
||||
|
||||
func (base *BaseFunction) GetFlow() kis.Flow {
|
||||
return base.Flow
|
||||
return base.flow
|
||||
}
|
||||
|
||||
// AddConnector 给当前Function实例添加一个Connector
|
||||
func (base *BaseFunction) AddConnector(conn kis.Connector) error {
|
||||
if conn == nil {
|
||||
return errors.New("conn is nil")
|
||||
}
|
||||
|
||||
base.connector = conn
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConnector 获取当前Function实例所关联的Connector
|
||||
func (base *BaseFunction) GetConnector() kis.Connector {
|
||||
return base.connector
|
||||
}
|
||||
|
||||
func (base *BaseFunction) CreateId() {
|
||||
@ -119,11 +138,12 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
|
||||
// 生成随机实例唯一ID
|
||||
f.CreateId()
|
||||
|
||||
//设置基础信息属性
|
||||
// 设置基础信息属性
|
||||
if err := f.SetConfig(config); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 设置Flow
|
||||
if err := f.SetFlow(flow); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
19
kis/connector.go
Normal file
19
kis/connector.go
Normal file
@ -0,0 +1,19 @@
|
||||
package kis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"kis-flow/config"
|
||||
)
|
||||
|
||||
type Connector interface {
|
||||
// Init 初始化Connector所关联的存储引擎链接等
|
||||
Init() error
|
||||
// Call 调用Connector 外挂存储逻辑的读写操作
|
||||
Call(ctx context.Context, flow Flow, args interface{}) error
|
||||
// GetId 获取Connector的ID
|
||||
GetId() string
|
||||
// GetName 获取Connector的名称
|
||||
GetName() string
|
||||
// GetConfig 获取Connector的配置信息
|
||||
GetConfig() *config.KisConnConfig
|
||||
}
|
@ -21,4 +21,8 @@ type Flow interface {
|
||||
GetThisFunction() Function
|
||||
// GetThisFuncConf 得到当前正在执行的Function的配置
|
||||
GetThisFuncConf() *config.KisFuncConfig
|
||||
// GetConnector 得到当前正在执行的Function的Connector
|
||||
GetConnector() (Connector, error)
|
||||
// GetConnConf 得到当前正在执行的Function的Connector的配置
|
||||
GetConnConf() (*config.KisConnConfig, error)
|
||||
}
|
||||
|
@ -21,6 +21,11 @@ type Function interface {
|
||||
// GetFlow 获取当前Functioin实力所依赖的Flow
|
||||
GetFlow() Flow
|
||||
|
||||
// AddConnector 给当前Function实例添加一个Connector
|
||||
AddConnector(conn Connector) error
|
||||
// GetConnector 获取当前Function实例所关联的Connector
|
||||
GetConnector() Connector
|
||||
|
||||
// CreateId 给当前Funciton实力生成一个随机的实例KisID
|
||||
CreateId()
|
||||
// GetId 获取当前Function的FID
|
||||
|
87
kis/pool.go
87
kis/pool.go
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"kis-flow/common"
|
||||
"kis-flow/log"
|
||||
"sync"
|
||||
)
|
||||
@ -17,6 +18,13 @@ type kisPool struct {
|
||||
|
||||
flowRouter flowRouter // 全部的flow对象
|
||||
flowLock sync.RWMutex // flowRouter 锁
|
||||
|
||||
cInitRouter connInitRouter // 全部的Connector初始化路由
|
||||
ciLock sync.RWMutex // cInitRouter 锁
|
||||
|
||||
cTree connTree //全部Connector管理路由
|
||||
connectors map[string]Connector // 全部的Connector对象
|
||||
cLock sync.RWMutex // cTree 锁
|
||||
}
|
||||
|
||||
// 单例
|
||||
@ -33,13 +41,18 @@ func Pool() *kisPool {
|
||||
|
||||
// flowRouter初始化
|
||||
_pool.flowRouter = make(flowRouter)
|
||||
|
||||
// connTree初始化
|
||||
_pool.cTree = make(connTree)
|
||||
_pool.cInitRouter = make(connInitRouter)
|
||||
_pool.connectors = make(map[string]Connector)
|
||||
})
|
||||
|
||||
return _pool
|
||||
}
|
||||
|
||||
func (pool *kisPool) AddFlow(name string, flow Flow) {
|
||||
pool.flowLock.Lock()
|
||||
pool.flowLock.Lock() // 写锁
|
||||
defer pool.flowLock.Unlock()
|
||||
|
||||
if _, ok := pool.flowRouter[name]; !ok {
|
||||
@ -53,7 +66,7 @@ func (pool *kisPool) AddFlow(name string, flow Flow) {
|
||||
}
|
||||
|
||||
func (pool *kisPool) GetFlow(name string) Flow {
|
||||
pool.flowLock.RLock()
|
||||
pool.flowLock.RLock() // 读锁
|
||||
defer pool.flowLock.RUnlock()
|
||||
|
||||
if flow, ok := pool.flowRouter[name]; ok {
|
||||
@ -65,7 +78,7 @@ func (pool *kisPool) GetFlow(name string) Flow {
|
||||
|
||||
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
|
||||
func (pool *kisPool) FaaS(fnName string, f FaaS) {
|
||||
pool.fnLock.Lock()
|
||||
pool.fnLock.Lock() // 写锁
|
||||
defer pool.fnLock.Unlock()
|
||||
|
||||
if _, ok := pool.fnRouter[fnName]; !ok {
|
||||
@ -89,3 +102,71 @@ func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow)
|
||||
|
||||
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
|
||||
}
|
||||
|
||||
// CaaSInit 注册Connector初始化业务
|
||||
func (pool *kisPool) CaaSInit(cname string, c ConnInit) {
|
||||
pool.ciLock.Lock() // 写锁
|
||||
defer pool.ciLock.Unlock()
|
||||
|
||||
if _, ok := pool.cInitRouter[cname]; !ok {
|
||||
pool.cInitRouter[cname] = c
|
||||
} else {
|
||||
errString := fmt.Sprintf("KisPool Reg CaaSInit Repeat CName=%s\n", cname)
|
||||
panic(errString)
|
||||
}
|
||||
|
||||
log.Logger().InfoF("Add KisPool CaaSInit CName=%s", cname)
|
||||
}
|
||||
|
||||
// CallConnInit 调度 ConnInit
|
||||
func (pool *kisPool) CallConnInit(conn Connector) error {
|
||||
pool.ciLock.RLock() // 读锁
|
||||
defer pool.ciLock.RUnlock()
|
||||
|
||||
init, ok := pool.cInitRouter[conn.GetName()]
|
||||
|
||||
if !ok {
|
||||
panic(errors.New(fmt.Sprintf("init connector cname = %s not reg..", conn.GetName())))
|
||||
}
|
||||
|
||||
return init(conn)
|
||||
}
|
||||
|
||||
// CaaS 注册Connector Call业务
|
||||
func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c CaaS) {
|
||||
pool.cLock.Lock() // 写锁
|
||||
defer pool.cLock.Unlock()
|
||||
|
||||
if _, ok := pool.cTree[cname]; !ok {
|
||||
//cid 首次注册,不存在,创建二级树NsConnSL
|
||||
pool.cTree[cname] = make(connSL)
|
||||
|
||||
//初始化各类型FunctionMode
|
||||
pool.cTree[cname][common.S] = make(connFuncRouter)
|
||||
pool.cTree[cname][common.L] = make(connFuncRouter)
|
||||
}
|
||||
|
||||
if _, ok := pool.cTree[cname][mode][fname]; !ok {
|
||||
pool.cTree[cname][mode][fname] = c
|
||||
} else {
|
||||
errString := fmt.Sprintf("CaaS Repeat CName=%s, FName=%s, Mode =%s\n", cname, fname, mode)
|
||||
panic(errString)
|
||||
}
|
||||
|
||||
log.Logger().InfoF("Add KisPool CaaS CName=%s, FName=%s, Mode =%s", cname, fname, mode)
|
||||
}
|
||||
|
||||
// CallConnector 调度 Connector
|
||||
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) error {
|
||||
fn := flow.GetThisFunction()
|
||||
fnConf := fn.GetConfig()
|
||||
mode := common.KisMode(fnConf.FMode)
|
||||
|
||||
if callback, ok := pool.cTree[conn.GetName()][mode][fnConf.FName]; ok {
|
||||
return callback(ctx, conn, fn, flow, args)
|
||||
}
|
||||
|
||||
log.Logger().ErrorFX(ctx, "CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.\n", conn.GetName(), fnConf.FName, mode)
|
||||
|
||||
return errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
|
||||
}
|
||||
|
@ -1,7 +1,13 @@
|
||||
package kis
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"kis-flow/common"
|
||||
)
|
||||
|
||||
/*
|
||||
Function Call
|
||||
*/
|
||||
// FaaS Function as a Service
|
||||
type FaaS func(context.Context, Flow) error
|
||||
|
||||
@ -14,3 +20,34 @@ type funcRouter map[string]FaaS
|
||||
// key: Flow Name
|
||||
// value: Flow
|
||||
type flowRouter map[string]Flow
|
||||
|
||||
/*
|
||||
Connector Init
|
||||
*/
|
||||
// ConnInit Connector 第三方挂载存储初始化
|
||||
type ConnInit func(conn Connector) error
|
||||
|
||||
// connInitRouter
|
||||
//key:
|
||||
type connInitRouter map[string]ConnInit
|
||||
|
||||
/*
|
||||
Connector Call
|
||||
*/
|
||||
// CaaS Connector的存储读取业务实现
|
||||
type CaaS func(context.Context, Connector, Function, Flow, interface{}) error
|
||||
|
||||
// connFuncRouter 通过FunctionName索引到CaaS回调存储业务的映射关系
|
||||
// key: Function Name
|
||||
// value: Connector的存储读取业务实现
|
||||
type connFuncRouter map[string]CaaS
|
||||
|
||||
// connSL 通过KisMode 将connFuncRouter分为两个子树
|
||||
// key: Function KisMode S/L
|
||||
// value: NsConnRouter
|
||||
type connSL map[common.KisMode]connFuncRouter
|
||||
|
||||
// connTree
|
||||
// key: Connector Name
|
||||
// value: connSL 二级树
|
||||
type connTree map[string]connSL
|
||||
|
18
test/caas/caas_demo1.go
Normal file
18
test/caas/caas_demo1.go
Normal file
@ -0,0 +1,18 @@
|
||||
package caas
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"kis-flow/kis"
|
||||
)
|
||||
|
||||
// type CaaS func(context.Context, Connector, Function, Flow, interface{}) error
|
||||
|
||||
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
|
||||
fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
|
||||
flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)
|
||||
|
||||
fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)
|
||||
|
||||
return nil
|
||||
}
|
20
test/caas/caas_init1.go
Normal file
20
test/caas/caas_init1.go
Normal file
@ -0,0 +1,20 @@
|
||||
package caas
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"kis-flow/kis"
|
||||
)
|
||||
|
||||
// type ConnInit func(conn Connector) error
|
||||
|
||||
func InitConnDemo1(connector kis.Connector) error {
|
||||
fmt.Println("===> Call Connector InitDemo1")
|
||||
//config info
|
||||
connConf := connector.GetConfig()
|
||||
|
||||
fmt.Println(connConf)
|
||||
|
||||
// init connector , 如 初始化数据库链接等
|
||||
|
||||
return nil
|
||||
}
|
27
test/faas/faas_demo1.go
Normal file
27
test/faas/faas_demo1.go
Normal file
@ -0,0 +1,27 @@
|
||||
package faas
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"kis-flow/kis"
|
||||
)
|
||||
|
||||
// type FaaS func(context.Context, Flow) error
|
||||
|
||||
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
|
||||
fmt.Println("---> Call funcName1Handler ----")
|
||||
|
||||
for index, row := range flow.Input() {
|
||||
// 打印数据
|
||||
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
|
||||
fmt.Println(str)
|
||||
|
||||
// 计算结果数据
|
||||
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
|
||||
|
||||
// 提交结果数据
|
||||
_ = flow.CommitRow(resultStr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
38
test/faas/faas_demo2.go
Normal file
38
test/faas/faas_demo2.go
Normal file
@ -0,0 +1,38 @@
|
||||
package faas
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"kis-flow/kis"
|
||||
"kis-flow/log"
|
||||
)
|
||||
|
||||
// type FaaS func(context.Context, Flow) error
|
||||
|
||||
func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
|
||||
fmt.Println("---> Call funcName2Handler ----")
|
||||
|
||||
for index, row := range flow.Input() {
|
||||
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
|
||||
fmt.Println(str)
|
||||
|
||||
conn, err := flow.GetConnector()
|
||||
if err != nil {
|
||||
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
if conn.Call(ctx, flow, row) != nil {
|
||||
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// 计算结果数据
|
||||
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
|
||||
|
||||
// 提交结果数据
|
||||
_ = flow.CommitRow(resultStr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
20
test/faas/faas_demo3.go
Normal file
20
test/faas/faas_demo3.go
Normal file
@ -0,0 +1,20 @@
|
||||
package faas
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"kis-flow/kis"
|
||||
)
|
||||
|
||||
// type FaaS func(context.Context, Flow) error
|
||||
|
||||
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
|
||||
fmt.Println("---> Call funcName3Handler ----")
|
||||
|
||||
for _, row := range flow.Input() {
|
||||
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
|
||||
fmt.Println(str)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
92
test/kis_connector_test.go
Normal file
92
test/kis_connector_test.go
Normal file
@ -0,0 +1,92 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"kis-flow/flow"
|
||||
"kis-flow/kis"
|
||||
"kis-flow/test/caas"
|
||||
"kis-flow/test/faas"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestNewKisConnector(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// 0. 注册Function 回调业务
|
||||
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
|
||||
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
|
||||
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
|
||||
|
||||
// 0. 注册ConnectorInit 和 Connector 回调业务
|
||||
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
|
||||
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
|
||||
|
||||
// 1. 创建3个KisFunction配置实例, 其中myFuncConfig2 有Connector配置
|
||||
source1 := config.KisSource{
|
||||
Name: "公众号抖音商城户订单数据",
|
||||
Must: []string{"order_id", "user_id"},
|
||||
}
|
||||
|
||||
source2 := config.KisSource{
|
||||
Name: "用户订单错误率",
|
||||
Must: []string{"order_id", "user_id"},
|
||||
}
|
||||
|
||||
myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil)
|
||||
if myFuncConfig1 == nil {
|
||||
panic("myFuncConfig1 is nil")
|
||||
}
|
||||
|
||||
option := config.KisFuncOption{
|
||||
CName: "ConnName1",
|
||||
}
|
||||
|
||||
myFuncConfig2 := config.NewFuncConfig("funcName2", common.S, &source2, &option)
|
||||
if myFuncConfig2 == nil {
|
||||
panic("myFuncConfig2 is nil")
|
||||
}
|
||||
|
||||
myFuncConfig3 := config.NewFuncConfig("funcName3", common.E, &source2, nil)
|
||||
if myFuncConfig3 == nil {
|
||||
panic("myFuncConfig3 is nil")
|
||||
}
|
||||
|
||||
// 2. 创建一个KisConnector配置实例
|
||||
myConnConfig1 := config.NewConnConfig("ConnName1", "0.0.0.0:9998", common.REDIS, "redis-key", nil)
|
||||
if myConnConfig1 == nil {
|
||||
panic("myConnConfig1 is nil")
|
||||
}
|
||||
|
||||
// 3. 将KisConnector配置实例绑定到KisFunction配置实例上
|
||||
_ = myFuncConfig2.AddConnConfig(myConnConfig1)
|
||||
|
||||
// 4. 创建一个 KisFlow 配置实例
|
||||
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
|
||||
|
||||
// 5. 创建一个KisFlow对象
|
||||
flow1 := flow.NewKisFlow(myFlowConfig1)
|
||||
|
||||
// 6. 拼接Functioin 到 Flow 上
|
||||
if err := flow1.Link(myFuncConfig1, nil); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := flow1.Link(myFuncConfig2, nil); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := flow1.Link(myFuncConfig3, nil); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 7. 提交原始数据
|
||||
_ = flow1.CommitRow("This is Data1 from Test")
|
||||
_ = flow1.CommitRow("This is Data2 from Test")
|
||||
_ = flow1.CommitRow("This is Data3 from Test")
|
||||
|
||||
// 8. 执行flow1
|
||||
if err := flow1.Run(ctx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
@ -2,49 +2,21 @@ package test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"kis-flow/flow"
|
||||
"kis-flow/kis"
|
||||
"kis-flow/test/faas"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func funcName1Handler(ctx context.Context, flow kis.Flow) error {
|
||||
fmt.Println("---> Call funcName1Handler ----")
|
||||
|
||||
for index, row := range flow.Input() {
|
||||
// 打印数据
|
||||
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
|
||||
fmt.Println(str)
|
||||
|
||||
// 计算结果数据
|
||||
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
|
||||
|
||||
// 提交结果数据
|
||||
_ = flow.CommitRow(resultStr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func funcName2Handler(ctx context.Context, flow kis.Flow) error {
|
||||
|
||||
for _, row := range flow.Input() {
|
||||
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
|
||||
fmt.Println(str)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestNewKisPool(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// 0. 注册Function
|
||||
kis.Pool().FaaS("funcName1", funcName1Handler)
|
||||
kis.Pool().FaaS("funcName2", funcName2Handler)
|
||||
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
|
||||
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
|
||||
|
||||
// 1. 创建2个KisFunction配置实例
|
||||
source1 := config.KisSource{
|
||||
|
Loading…
Reference in New Issue
Block a user