mirror of
https://github.com/aceld/kis-flow.git
synced 2025-01-23 07:30:23 +08:00
Add function comments for faasDesc and adjust some code structure.
This commit is contained in:
parent
954e1a4c93
commit
59def93350
185
kis/faas.go
185
kis/faas.go
@ -5,91 +5,150 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// FaaS Function as a Service
|
||||
// type FaaS func(context.Context, *kisflow, ...interface{}) error
|
||||
// 这是一个方法类型,会在注入时在方法内判断
|
||||
|
||||
// 将
|
||||
// type FaaS func(context.Context, Flow) error
|
||||
// 改为
|
||||
// type FaaS func(context.Context, Flow, ...interface{}) error
|
||||
// 可以通过可变参数的任意输入类型进行数据传递
|
||||
type FaaS interface{}
|
||||
|
||||
// FaaSDesc FaaS 回调计算业务函数 描述
|
||||
type FaaSDesc struct {
|
||||
FnName string
|
||||
f interface{}
|
||||
fName string
|
||||
ArgsType []reflect.Type
|
||||
ArgNum int
|
||||
FuncType reflect.Type
|
||||
FuncValue reflect.Value
|
||||
FaasSerialize
|
||||
Serialize // 当前Function的数据输入输出序列化实现
|
||||
FnName string // Function名称
|
||||
f interface{} // FaaS 函数
|
||||
fName string // 函数名称
|
||||
ArgsType []reflect.Type // 函数参数类型(集合)
|
||||
ArgNum int // 函数参数个数
|
||||
FuncType reflect.Type // 函数类型
|
||||
FuncValue reflect.Value // 函数值(函数地址)
|
||||
}
|
||||
|
||||
var globalFaaSSerialize = &DefaultFaasSerialize{}
|
||||
|
||||
// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
|
||||
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
|
||||
|
||||
var serializeImpl FaasSerialize
|
||||
// 输入输出序列化实例
|
||||
var serializeImpl Serialize
|
||||
|
||||
// 传入的回调函数FaaS,函数值(函数地址)
|
||||
funcValue := reflect.ValueOf(f)
|
||||
|
||||
// 传入的回调函数FaaS 类型
|
||||
funcType := funcValue.Type()
|
||||
|
||||
if err := validateFuncType(funcType, funcValue); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
argsType := make([]reflect.Type, funcType.NumIn())
|
||||
fullName := runtime.FuncForPC(funcValue.Pointer()).Name()
|
||||
containsKisFlow := false
|
||||
containsCtx := false
|
||||
|
||||
for i := 0; i < funcType.NumIn(); i++ {
|
||||
paramType := funcType.In(i)
|
||||
if isFlowType(paramType) {
|
||||
containsKisFlow = true
|
||||
} else if isContextType(paramType) {
|
||||
containsCtx = true
|
||||
} else {
|
||||
itemType := paramType.Elem()
|
||||
// 如果切片元素是指针类型,则获取指针所指向的类型
|
||||
if itemType.Kind() == reflect.Ptr {
|
||||
itemType = itemType.Elem()
|
||||
}
|
||||
// Check if f implements FaasSerialize interface
|
||||
if isFaasSerialize(itemType) {
|
||||
serializeImpl = reflect.New(itemType).Interface().(FaasSerialize)
|
||||
} else {
|
||||
serializeImpl = globalFaaSSerialize // Use global default implementation
|
||||
}
|
||||
|
||||
}
|
||||
argsType[i] = paramType
|
||||
}
|
||||
|
||||
if !containsKisFlow {
|
||||
return nil, errors.New("function parameters must have Kisflow context")
|
||||
}
|
||||
if !containsCtx {
|
||||
return nil, errors.New("function parameters must have context")
|
||||
// 判断传递的FaaS指针是否是函数类型
|
||||
if !isFuncType(funcType) {
|
||||
return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
|
||||
}
|
||||
|
||||
// 判断传递的FaaS函数是否有返回值类型是只包括(error)
|
||||
if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
|
||||
return nil, errors.New("function must have exactly one return value of type error")
|
||||
}
|
||||
|
||||
// FaaS函数的参数类型集合
|
||||
argsType := make([]reflect.Type, funcType.NumIn())
|
||||
|
||||
// 获取FaaS的函数名称
|
||||
fullName := runtime.FuncForPC(funcValue.Pointer()).Name()
|
||||
|
||||
// 确保 FaaS func(context.Context, Flow, ...interface{}) error 形参列表,存在context.Context 和 kis.Flow
|
||||
|
||||
// 是否包含kis.Flow类型的形参
|
||||
containsKisFlow := false
|
||||
// 是否包含context.Context类型的形参
|
||||
containsCtx := false
|
||||
|
||||
// 遍历FaaS的形参类型
|
||||
for i := 0; i < funcType.NumIn(); i++ {
|
||||
|
||||
// 取出第i个形式参数类型
|
||||
paramType := funcType.In(i)
|
||||
|
||||
if isFlowType(paramType) {
|
||||
// 判断是否包含kis.Flow类型的形参
|
||||
containsKisFlow = true
|
||||
|
||||
} else if isContextType(paramType) {
|
||||
// 判断是否包含context.Context类型的形参
|
||||
containsCtx = true
|
||||
|
||||
} else if isSliceType(paramType) {
|
||||
|
||||
// 获取当前参数Slice的元素类型
|
||||
itemType := paramType.Elem()
|
||||
|
||||
// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
|
||||
if itemType.Kind() == reflect.Ptr {
|
||||
itemType = itemType.Elem() // 获取指针指向的结构体类型
|
||||
}
|
||||
|
||||
// Check if f implements Serialize interface
|
||||
// (检测传递的FaaS函数是否实现了Serialize接口)
|
||||
if isSerialize(itemType) {
|
||||
// 如果当前形参实现了Serialize接口,则使用当前形参的序列化实现
|
||||
serializeImpl = reflect.New(itemType).Interface().(Serialize)
|
||||
|
||||
} else {
|
||||
// 如果当前形参没有实现Serialize接口,则使用默认的序列化实现
|
||||
serializeImpl = defaultSerialize // Use global default implementation
|
||||
}
|
||||
} else {
|
||||
// Other types are not supported
|
||||
}
|
||||
|
||||
// 将当前形参类型追加到argsType集合中
|
||||
argsType[i] = paramType
|
||||
}
|
||||
|
||||
if !containsKisFlow {
|
||||
// 不包含kis.Flow类型的形参,返回错误
|
||||
return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
|
||||
}
|
||||
|
||||
if !containsCtx {
|
||||
// 不包含context.Context类型的形参,返回错误
|
||||
return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
|
||||
}
|
||||
|
||||
// 返回FaaSDesc描述实例
|
||||
return &FaaSDesc{
|
||||
FnName: fnName,
|
||||
f: f,
|
||||
fName: fullName,
|
||||
ArgsType: argsType,
|
||||
ArgNum: len(argsType),
|
||||
FuncType: funcType,
|
||||
FuncValue: funcValue,
|
||||
FaasSerialize: serializeImpl,
|
||||
Serialize: serializeImpl,
|
||||
FnName: fnName,
|
||||
f: f,
|
||||
fName: fullName,
|
||||
ArgsType: argsType,
|
||||
ArgNum: len(argsType),
|
||||
FuncType: funcType,
|
||||
FuncValue: funcValue,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func validateFuncType(funcType reflect.Type, funcValue reflect.Value) error {
|
||||
if funcType.Kind() != reflect.Func {
|
||||
return fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
|
||||
}
|
||||
return nil
|
||||
// isFuncType 判断传递进来的 paramType 是否是函数类型
|
||||
func isFuncType(paramType reflect.Type) bool {
|
||||
return paramType.Kind() == reflect.Func
|
||||
}
|
||||
|
||||
// isFlowType 判断传递进来的 paramType 是否是 kis.Flow 类型
|
||||
func isFlowType(paramType reflect.Type) bool {
|
||||
var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()
|
||||
|
||||
return paramType.Implements(flowInterfaceType)
|
||||
}
|
||||
|
||||
// isContextType 判断传递进来的 paramType 是否是 context.Context 类型
|
||||
func isContextType(paramType reflect.Type) bool {
|
||||
typeName := paramType.Name()
|
||||
|
||||
return strings.Contains(typeName, "Context")
|
||||
}
|
||||
|
||||
// isSliceType 判断传递进来的 paramType 是否是切片类型
|
||||
func isSliceType(paramType reflect.Type) bool {
|
||||
return paramType.Kind() == reflect.Slice
|
||||
}
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -50,9 +49,3 @@ type Flow interface {
|
||||
// Fork 得到Flow的一个副本(深拷贝)
|
||||
Fork(ctx context.Context) Flow
|
||||
}
|
||||
|
||||
var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()
|
||||
|
||||
func isFlowType(paramType reflect.Type) bool {
|
||||
return paramType.Implements(flowInterfaceType)
|
||||
}
|
||||
|
28
kis/pool.go
28
kis/pool.go
@ -78,6 +78,7 @@ func (pool *kisPool) GetFlow(name string) Flow {
|
||||
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
|
||||
func (pool *kisPool) FaaS(fnName string, f FaaS) {
|
||||
|
||||
// 当注册FaaS计算逻辑回调时,创建一个FaaSDesc描述对象
|
||||
faaSDesc, err := NewFaaSDesc(fnName, f)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@ -87,6 +88,7 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) {
|
||||
defer pool.fnLock.Unlock()
|
||||
|
||||
if _, ok := pool.fnRouter[fnName]; !ok {
|
||||
// 将FaaSDesc描述对象注册到fnRouter中
|
||||
pool.fnRouter[fnName] = faaSDesc
|
||||
} else {
|
||||
errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
|
||||
@ -98,38 +100,58 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) {
|
||||
|
||||
// CallFunction 调度 Function
|
||||
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
|
||||
|
||||
if funcDesc, ok := pool.fnRouter[fnName]; ok {
|
||||
|
||||
// 被调度Function的形参列表
|
||||
params := make([]reflect.Value, 0, funcDesc.ArgNum)
|
||||
|
||||
for _, argType := range funcDesc.ArgsType {
|
||||
|
||||
// 如果是Flow类型形参,则将 flow的值传入
|
||||
if isFlowType(argType) {
|
||||
params = append(params, reflect.ValueOf(flow))
|
||||
continue
|
||||
}
|
||||
|
||||
// 如果是Context类型形参,则将 ctx的值传入
|
||||
if isContextType(argType) {
|
||||
params = append(params, reflect.ValueOf(ctx))
|
||||
continue
|
||||
}
|
||||
if argType.Kind() == reflect.Slice {
|
||||
value, err := funcDesc.FaasSerialize.DecodeParam(flow.Input(), argType)
|
||||
|
||||
// 如果是Slice类型形参,则将 flow.Input()的值传入
|
||||
if isSliceType(argType) {
|
||||
|
||||
// 将flow.Input()中的原始数据,反序列化为argType类型的数据
|
||||
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
|
||||
if err != nil {
|
||||
log.Logger().ErrorFX(ctx, "funcDesc.FaasSerialize.DecodeParam err=%v", err)
|
||||
log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
|
||||
} else {
|
||||
params = append(params, value)
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值
|
||||
params = append(params, reflect.Zero(argType))
|
||||
}
|
||||
|
||||
// 调用当前Function 的计算逻辑
|
||||
retValues := funcDesc.FuncValue.Call(params)
|
||||
|
||||
// 取出第一个返回值,如果是nil,则返回nil
|
||||
ret := retValues[0].Interface()
|
||||
if ret == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 如果返回值是error类型,则返回error
|
||||
return retValues[0].Interface().(error)
|
||||
|
||||
}
|
||||
|
||||
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
|
||||
|
||||
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
"kis-flow/common"
|
||||
)
|
||||
|
||||
// FaaS 定义移植到 faas.go 中
|
||||
|
||||
/*
|
||||
Function Call
|
||||
*/
|
||||
|
@ -2,16 +2,22 @@ package kis
|
||||
|
||||
import (
|
||||
"kis-flow/common"
|
||||
"kis-flow/serialize"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type FaasSerialize interface {
|
||||
DecodeParam(common.KisRowArr, reflect.Type) (reflect.Value, error)
|
||||
EncodeParam(interface{}) (common.KisRowArr, error)
|
||||
// Serialize 数据序列化接口
|
||||
type Serialize interface {
|
||||
// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
|
||||
UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
|
||||
// Marshal 用于将指定类型的值序列化为 KisRowArr。
|
||||
Marshal(interface{}) (common.KisRowArr, error)
|
||||
}
|
||||
|
||||
var serializeInterfaceType = reflect.TypeOf((*FaasSerialize)(nil)).Elem()
|
||||
// defaultSerialize KisFlow提供的默认序列化实现(开发者可以自定义)
|
||||
var defaultSerialize = &serialize.DefaultSerialize{}
|
||||
|
||||
func isFaasSerialize(paramType reflect.Type) bool {
|
||||
return paramType.Implements(serializeInterfaceType)
|
||||
// isSerialize 判断传递进来的 paramType 是否实现了 Serialize 接口
|
||||
func isSerialize(paramType reflect.Type) bool {
|
||||
return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
|
||||
}
|
||||
|
11
kis/utils.go
11
kis/utils.go
@ -1,11 +0,0 @@
|
||||
package kis
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func isContextType(paramType reflect.Type) bool {
|
||||
typeName := paramType.Name()
|
||||
return strings.Contains(typeName, "Context")
|
||||
}
|
@ -1,4 +1,8 @@
|
||||
package kis
|
||||
/*
|
||||
DefaultSerialize 实现了 Serialize 接口,用于将 KisRowArr 序列化为指定类型的值,或将指定类型的值序列化为 KisRowArr。
|
||||
这部分是KisFlow默认提供的序列化办法,默认均是josn序列化,开发者可以根据自己的需求实现自己的序列化办法。
|
||||
*/
|
||||
package serialize
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@ -7,42 +11,50 @@ import (
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type DefaultFaasSerialize struct {
|
||||
}
|
||||
type DefaultSerialize struct{}
|
||||
|
||||
// DecodeParam 用于将 KisRowArr 反序列化为指定类型的值。
|
||||
func (f DefaultFaasSerialize) DecodeParam(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
|
||||
// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
|
||||
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
|
||||
// 确保传入的类型是一个切片
|
||||
if r.Kind() != reflect.Slice {
|
||||
return reflect.Value{}, fmt.Errorf("r must be a slice")
|
||||
}
|
||||
|
||||
slice := reflect.MakeSlice(r, 0, len(arr))
|
||||
|
||||
// 遍历每个元素并尝试反序列化
|
||||
for _, row := range arr {
|
||||
var elem reflect.Value
|
||||
var err error
|
||||
|
||||
// 先尝试断言为结构体或指针
|
||||
elem, err = decodeStruct(row, r.Elem())
|
||||
if err != nil {
|
||||
// 如果失败,则尝试直接反序列化字符串
|
||||
elem, err = decodeString(row, r.Elem())
|
||||
if err != nil {
|
||||
// 如果还失败,则尝试先序列化为 JSON 再反序列化
|
||||
elem, err = decodeJSON(row, r.Elem())
|
||||
if err != nil {
|
||||
return reflect.Value{}, fmt.Errorf("failed to decode row: %v ", err)
|
||||
}
|
||||
}
|
||||
// 尝试断言为结构体或指针
|
||||
elem, err = unMarshalStruct(row, r.Elem())
|
||||
if err == nil {
|
||||
slice = reflect.Append(slice, elem)
|
||||
continue
|
||||
}
|
||||
|
||||
slice = reflect.Append(slice, elem)
|
||||
// 尝试直接反序列化字符串
|
||||
elem, err = unMarshalJsonString(row, r.Elem())
|
||||
if err == nil {
|
||||
slice = reflect.Append(slice, elem)
|
||||
continue
|
||||
}
|
||||
|
||||
// 尝试先序列化为 JSON 再反序列化
|
||||
elem, err = unMarshalJsonStruct(row, r.Elem())
|
||||
if err == nil {
|
||||
slice = reflect.Append(slice, elem)
|
||||
} else {
|
||||
return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return slice, nil
|
||||
}
|
||||
|
||||
// 尝试断言为结构体或指针
|
||||
func decodeStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
|
||||
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
|
||||
// 检查 row 是否为结构体或结构体指针类型
|
||||
rowType := reflect.TypeOf(row)
|
||||
if rowType == nil {
|
||||
@ -54,14 +66,19 @@ func decodeStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, erro
|
||||
|
||||
// 如果 row 是指针类型,则获取它指向的类型
|
||||
if rowType.Kind() == reflect.Ptr {
|
||||
// 空指针
|
||||
if reflect.ValueOf(row).IsNil() {
|
||||
return reflect.Value{}, fmt.Errorf("row is nil pointer")
|
||||
}
|
||||
row = reflect.ValueOf(row).Elem().Interface() // 解引用
|
||||
|
||||
// 解引用
|
||||
row = reflect.ValueOf(row).Elem().Interface()
|
||||
|
||||
// 拿到解引用后的类型
|
||||
rowType = reflect.TypeOf(row)
|
||||
}
|
||||
|
||||
// 检查是否可以将 row 断言为 elemType
|
||||
// 检查是否可以将 row 断言为 elemType(目标类型)
|
||||
if !rowType.AssignableTo(elemType) {
|
||||
return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
|
||||
}
|
||||
@ -70,17 +87,18 @@ func decodeStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, erro
|
||||
return reflect.ValueOf(row), nil
|
||||
}
|
||||
|
||||
// 尝试直接反序列化字符串
|
||||
func decodeString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
|
||||
// 尝试直接反序列化字符串(将Json字符串 反序列化为 结构体)
|
||||
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
|
||||
// 判断源数据是否可以断言成string
|
||||
str, ok := row.(string)
|
||||
if !ok {
|
||||
return reflect.Value{}, fmt.Errorf("not a string")
|
||||
}
|
||||
|
||||
// 创建一个新的结构体实例,用于存储反序列化后的值。
|
||||
// 创建一个新的结构体实例,用于存储反序列化后的值
|
||||
elem := reflect.New(elemType).Elem()
|
||||
|
||||
// 尝试将字符串反序列化为结构体。
|
||||
// 尝试将json字符串反序列化为结构体。
|
||||
if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
|
||||
return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
|
||||
}
|
||||
@ -88,14 +106,18 @@ func decodeString(row common.KisRow, elemType reflect.Type) (reflect.Value, erro
|
||||
return elem, nil
|
||||
}
|
||||
|
||||
// 尝试先序列化为 JSON 再反序列化
|
||||
func decodeJSON(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
|
||||
// 尝试先序列化为 JSON 再反序列化(将结构体转换成Json字符串,再将Json字符串 反序列化为 结构体)
|
||||
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
|
||||
// 将 row 序列化为 JSON 字符串
|
||||
jsonBytes, err := json.Marshal(row)
|
||||
if err != nil {
|
||||
return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v ", err)
|
||||
}
|
||||
|
||||
// 创建一个新的结构体实例,用于存储反序列化后的值
|
||||
elem := reflect.New(elemType).Interface()
|
||||
|
||||
// 将 JSON 字符串反序列化为结构体
|
||||
if err := json.Unmarshal(jsonBytes, elem); err != nil {
|
||||
return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v ", err)
|
||||
}
|
||||
@ -103,7 +125,8 @@ func decodeJSON(row common.KisRow, elemType reflect.Type) (reflect.Value, error)
|
||||
return reflect.ValueOf(elem).Elem(), nil
|
||||
}
|
||||
|
||||
func (f DefaultFaasSerialize) EncodeParam(i interface{}) (common.KisRowArr, error) {
|
||||
// Marshal 用于将指定类型的值序列化为 KisRowArr(json 序列化)。
|
||||
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
|
||||
var arr common.KisRowArr
|
||||
|
||||
switch reflect.TypeOf(i).Kind() {
|
@ -3,17 +3,21 @@ package faas
|
||||
import (
|
||||
"context"
|
||||
"kis-flow/kis"
|
||||
"kis-flow/serialize"
|
||||
"kis-flow/test/proto"
|
||||
)
|
||||
|
||||
type AvgStuScoreIn struct {
|
||||
serialize.DefaultSerialize
|
||||
proto.StuScores
|
||||
}
|
||||
|
||||
type AvgStuScoreOut struct {
|
||||
serialize.DefaultSerialize
|
||||
proto.StuAvgScore
|
||||
}
|
||||
|
||||
// AvgStuScore(FaaS) 计算学生平均分
|
||||
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
|
||||
for _, row := range rows {
|
||||
avgScore := proto.StuAvgScore{
|
||||
|
@ -4,14 +4,17 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"kis-flow/kis"
|
||||
"kis-flow/serialize"
|
||||
"kis-flow/test/proto"
|
||||
)
|
||||
|
||||
type PrintStuAvgScoreIn struct {
|
||||
serialize.DefaultSerialize
|
||||
proto.StuAvgScore
|
||||
}
|
||||
|
||||
type PrintStuAvgScoreOut struct {
|
||||
serialize.DefaultSerialize
|
||||
}
|
||||
|
||||
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"kis-flow/file"
|
||||
"kis-flow/flow"
|
||||
"kis-flow/kis"
|
||||
"kis-flow/test/faas"
|
||||
@ -11,6 +12,50 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAutoInjectParamWithConfig(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
|
||||
kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)
|
||||
|
||||
// 1. 加载配置文件并构建Flow
|
||||
if err := file.ConfigImportYaml("load_conf/"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 2. 获取Flow
|
||||
flow1 := kis.Pool().GetFlow("StuAvg")
|
||||
if flow1 == nil {
|
||||
panic("flow1 is nil")
|
||||
}
|
||||
|
||||
// 3. 提交原始数据
|
||||
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
|
||||
StuScores: proto.StuScores{
|
||||
StuId: 100,
|
||||
Score1: 1,
|
||||
Score2: 2,
|
||||
Score3: 3,
|
||||
},
|
||||
})
|
||||
_ = flow1.CommitRow(faas.AvgStuScoreIn{
|
||||
StuScores: proto.StuScores{
|
||||
StuId: 100,
|
||||
Score1: 1,
|
||||
Score2: 2,
|
||||
Score3: 3,
|
||||
},
|
||||
})
|
||||
|
||||
// 提交原始数据(json字符串)
|
||||
_ = flow1.CommitRow(`{"stu_id":101}`)
|
||||
|
||||
// 4. 执行flow1
|
||||
if err := flow1.Run(ctx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutoInjectParam(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
@ -46,7 +91,7 @@ func TestAutoInjectParam(t *testing.T) {
|
||||
|
||||
// 3. 提交原始数据
|
||||
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
|
||||
proto.StuScores{
|
||||
StuScores: proto.StuScores{
|
||||
StuId: 100,
|
||||
Score1: 1,
|
||||
Score2: 2,
|
||||
@ -55,7 +100,7 @@ func TestAutoInjectParam(t *testing.T) {
|
||||
})
|
||||
_ = flow1.CommitRow(`{"stu_id":101}`)
|
||||
_ = flow1.CommitRow(faas.AvgStuScoreIn{
|
||||
proto.StuScores{
|
||||
StuScores: proto.StuScores{
|
||||
StuId: 100,
|
||||
Score1: 1,
|
||||
Score2: 2,
|
||||
|
6
test/load_conf/flow/flow-StuAvg.yml
Normal file
6
test/load_conf/flow/flow-StuAvg.yml
Normal file
@ -0,0 +1,6 @@
|
||||
kistype: flow
|
||||
status: 1
|
||||
flow_name: StuAvg
|
||||
flows:
|
||||
- fname: AvgStuScore
|
||||
- fname: PrintStuAvgScore
|
7
test/load_conf/func/func-AvgStuScore.yml
Normal file
7
test/load_conf/func/func-AvgStuScore.yml
Normal file
@ -0,0 +1,7 @@
|
||||
kistype: func
|
||||
fname: AvgStuScore
|
||||
fmode: Calculate
|
||||
source:
|
||||
name: 学生平均分
|
||||
must:
|
||||
- stu_id
|
7
test/load_conf/func/func-PrintStuAvgScore.yml
Normal file
7
test/load_conf/func/func-PrintStuAvgScore.yml
Normal file
@ -0,0 +1,7 @@
|
||||
kistype: func
|
||||
fname: PrintStuAvgScore
|
||||
fmode: Expand
|
||||
source:
|
||||
name: 学生平均分
|
||||
must:
|
||||
- stu_id
|
Loading…
Reference in New Issue
Block a user