Merge pull request #9 from aceld/feature/aceld

Add function comments for faasDesc and adjust some code structure.
This commit is contained in:
huchenhao 2024-03-25 16:42:40 +08:00 committed by GitHub
commit 64f0868fea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 286 additions and 120 deletions

View File

@ -5,77 +5,120 @@ import (
"fmt"
"reflect"
"runtime"
"strings"
)
// FaaS Function as a Service
// type FaaS func(context.Context, *kisflow, ...interface{}) error
// 这是一个方法类型,会在注入时在方法内判断
// 将
// type FaaS func(context.Context, Flow) error
// 改为
// type FaaS func(context.Context, Flow, ...interface{}) error
// 可以通过可变参数的任意输入类型进行数据传递
type FaaS interface{}
// FaaSDesc FaaS 回调计算业务函数 描述
type FaaSDesc struct {
FnName string
f interface{}
fName string
ArgsType []reflect.Type
ArgNum int
FuncType reflect.Type
FuncValue reflect.Value
FaasSerialize
Serialize // 当前Function的数据输入输出序列化实现
FnName string // Function名称
f interface{} // FaaS 函数
fName string // 函数名称
ArgsType []reflect.Type // 函数参数类型(集合)
ArgNum int // 函数参数个数
FuncType reflect.Type // 函数类型
FuncValue reflect.Value // 函数值(函数地址)
}
var globalFaaSSerialize = &DefaultFaasSerialize{}
// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
var serializeImpl FaasSerialize
// 输入输出序列化实例
var serializeImpl Serialize
// 传入的回调函数FaaS,函数值(函数地址)
funcValue := reflect.ValueOf(f)
// 传入的回调函数FaaS 类型
funcType := funcValue.Type()
if err := validateFuncType(funcType, funcValue); err != nil {
return nil, err
}
argsType := make([]reflect.Type, funcType.NumIn())
fullName := runtime.FuncForPC(funcValue.Pointer()).Name()
containsKisFlow := false
containsCtx := false
for i := 0; i < funcType.NumIn(); i++ {
paramType := funcType.In(i)
if isFlowType(paramType) {
containsKisFlow = true
} else if isContextType(paramType) {
containsCtx = true
} else {
itemType := paramType.Elem()
// 如果切片元素是指针类型,则获取指针所指向的类型
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem()
}
// Check if f implements FaasSerialize interface
if isFaasSerialize(itemType) {
serializeImpl = reflect.New(itemType).Interface().(FaasSerialize)
} else {
serializeImpl = globalFaaSSerialize // Use global default implementation
}
}
argsType[i] = paramType
}
if !containsKisFlow {
return nil, errors.New("function parameters must have Kisflow context")
}
if !containsCtx {
return nil, errors.New("function parameters must have context")
// 判断传递的FaaS指针是否是函数类型
if !isFuncType(funcType) {
return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
}
// 判断传递的FaaS函数是否有返回值类型是只包括(error)
if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
return nil, errors.New("function must have exactly one return value of type error")
}
// FaaS函数的参数类型集合
argsType := make([]reflect.Type, funcType.NumIn())
// 获取FaaS的函数名称
fullName := runtime.FuncForPC(funcValue.Pointer()).Name()
// 确保 FaaS func(context.Context, Flow, ...interface{}) error 形参列表存在context.Context 和 kis.Flow
// 是否包含kis.Flow类型的形参
containsKisFlow := false
// 是否包含context.Context类型的形参
containsCtx := false
// 遍历FaaS的形参类型
for i := 0; i < funcType.NumIn(); i++ {
// 取出第i个形式参数类型
paramType := funcType.In(i)
if isFlowType(paramType) {
// 判断是否包含kis.Flow类型的形参
containsKisFlow = true
} else if isContextType(paramType) {
// 判断是否包含context.Context类型的形参
containsCtx = true
} else if isSliceType(paramType) {
// 获取当前参数Slice的元素类型
itemType := paramType.Elem()
// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // 获取指针指向的结构体类型
}
// Check if f implements Serialize interface
// (检测传递的FaaS函数是否实现了Serialize接口)
if isSerialize(itemType) {
// 如果当前形参实现了Serialize接口则使用当前形参的序列化实现
serializeImpl = reflect.New(itemType).Interface().(Serialize)
} else {
// 如果当前形参没有实现Serialize接口则使用默认的序列化实现
serializeImpl = defaultSerialize // Use global default implementation
}
} else {
// Other types are not supported
}
// 将当前形参类型追加到argsType集合中
argsType[i] = paramType
}
if !containsKisFlow {
// 不包含kis.Flow类型的形参返回错误
return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
if !containsCtx {
// 不包含context.Context类型的形参返回错误
return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
// 返回FaaSDesc描述实例
return &FaaSDesc{
Serialize: serializeImpl,
FnName: fnName,
f: f,
fName: fullName,
@ -83,13 +126,29 @@ func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
ArgNum: len(argsType),
FuncType: funcType,
FuncValue: funcValue,
FaasSerialize: serializeImpl,
}, nil
}
func validateFuncType(funcType reflect.Type, funcValue reflect.Value) error {
if funcType.Kind() != reflect.Func {
return fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
// isFuncType 判断传递进来的 paramType 是否是函数类型
func isFuncType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Func
}
return nil
// isFlowType 判断传递进来的 paramType 是否是 kis.Flow 类型
func isFlowType(paramType reflect.Type) bool {
var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()
return paramType.Implements(flowInterfaceType)
}
// isContextType 判断传递进来的 paramType 是否是 context.Context 类型
func isContextType(paramType reflect.Type) bool {
typeName := paramType.Name()
return strings.Contains(typeName, "Context")
}
// isSliceType 判断传递进来的 paramType 是否是切片类型
func isSliceType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Slice
}

View File

@ -4,7 +4,6 @@ import (
"context"
"kis-flow/common"
"kis-flow/config"
"reflect"
"time"
)
@ -50,9 +49,3 @@ type Flow interface {
// Fork 得到Flow的一个副本(深拷贝)
Fork(ctx context.Context) Flow
}
var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()
func isFlowType(paramType reflect.Type) bool {
return paramType.Implements(flowInterfaceType)
}

View File

@ -78,6 +78,7 @@ func (pool *kisPool) GetFlow(name string) Flow {
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
func (pool *kisPool) FaaS(fnName string, f FaaS) {
// 当注册FaaS计算逻辑回调时创建一个FaaSDesc描述对象
faaSDesc, err := NewFaaSDesc(fnName, f)
if err != nil {
panic(err)
@ -87,6 +88,7 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) {
defer pool.fnLock.Unlock()
if _, ok := pool.fnRouter[fnName]; !ok {
// 将FaaSDesc描述对象注册到fnRouter中
pool.fnRouter[fnName] = faaSDesc
} else {
errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
@ -98,38 +100,58 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) {
// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
if funcDesc, ok := pool.fnRouter[fnName]; ok {
// 被调度Function的形参列表
params := make([]reflect.Value, 0, funcDesc.ArgNum)
for _, argType := range funcDesc.ArgsType {
// 如果是Flow类型形参则将 flow的值传入
if isFlowType(argType) {
params = append(params, reflect.ValueOf(flow))
continue
}
// 如果是Context类型形参则将 ctx的值传入
if isContextType(argType) {
params = append(params, reflect.ValueOf(ctx))
continue
}
if argType.Kind() == reflect.Slice {
value, err := funcDesc.FaasSerialize.DecodeParam(flow.Input(), argType)
// 如果是Slice类型形参则将 flow.Input()的值传入
if isSliceType(argType) {
// 将flow.Input()中的原始数据反序列化为argType类型的数据
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
if err != nil {
log.Logger().ErrorFX(ctx, "funcDesc.FaasSerialize.DecodeParam err=%v", err)
log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
} else {
params = append(params, value)
continue
}
}
// 传递的参数既不是Flow类型也不是Context类型也不是Slice类型则默认给到零值
params = append(params, reflect.Zero(argType))
}
// 调用当前Function 的计算逻辑
retValues := funcDesc.FuncValue.Call(params)
// 取出第一个返回值如果是nil则返回nil
ret := retValues[0].Interface()
if ret == nil {
return nil
}
// 如果返回值是error类型则返回error
return retValues[0].Interface().(error)
}
log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")

View File

@ -5,6 +5,8 @@ import (
"kis-flow/common"
)
// FaaS 定义移植到 faas.go 中
/*
Function Call
*/

View File

@ -2,16 +2,22 @@ package kis
import (
"kis-flow/common"
"kis-flow/serialize"
"reflect"
)
type FaasSerialize interface {
DecodeParam(common.KisRowArr, reflect.Type) (reflect.Value, error)
EncodeParam(interface{}) (common.KisRowArr, error)
// Serialize 数据序列化接口
type Serialize interface {
// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
// Marshal 用于将指定类型的值序列化为 KisRowArr。
Marshal(interface{}) (common.KisRowArr, error)
}
var serializeInterfaceType = reflect.TypeOf((*FaasSerialize)(nil)).Elem()
// defaultSerialize KisFlow提供的默认序列化实现(开发者可以自定义)
var defaultSerialize = &serialize.DefaultSerialize{}
func isFaasSerialize(paramType reflect.Type) bool {
return paramType.Implements(serializeInterfaceType)
// isSerialize 判断传递进来的 paramType 是否实现了 Serialize 接口
func isSerialize(paramType reflect.Type) bool {
return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}

View File

@ -1,11 +0,0 @@
package kis
import (
"reflect"
"strings"
)
func isContextType(paramType reflect.Type) bool {
typeName := paramType.Name()
return strings.Contains(typeName, "Context")
}

View File

@ -1,4 +1,8 @@
package kis
/*
DefaultSerialize 实现了 Serialize 接口用于将 KisRowArr 序列化为指定类型的值或将指定类型的值序列化为 KisRowArr
这部分是KisFlow默认提供的序列化办法默认均是josn序列化开发者可以根据自己的需求实现自己的序列化办法
*/
package serialize
import (
"encoding/json"
@ -7,42 +11,50 @@ import (
"reflect"
)
type DefaultFaasSerialize struct {
}
type DefaultSerialize struct{}
// DecodeParam 用于将 KisRowArr 反序列化为指定类型的值。
func (f DefaultFaasSerialize) DecodeParam(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
// 确保传入的类型是一个切片
if r.Kind() != reflect.Slice {
return reflect.Value{}, fmt.Errorf("r must be a slice")
}
slice := reflect.MakeSlice(r, 0, len(arr))
// 遍历每个元素并尝试反序列化
for _, row := range arr {
var elem reflect.Value
var err error
// 先尝试断言为结构体或指针
elem, err = decodeStruct(row, r.Elem())
if err != nil {
// 如果失败,则尝试直接反序列化字符串
elem, err = decodeString(row, r.Elem())
if err != nil {
// 如果还失败,则尝试先序列化为 JSON 再反序列化
elem, err = decodeJSON(row, r.Elem())
if err != nil {
return reflect.Value{}, fmt.Errorf("failed to decode row: %v ", err)
}
}
// 尝试断言为结构体或指针
elem, err = unMarshalStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// 尝试直接反序列化字符串
elem, err = unMarshalJsonString(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// 尝试先序列化为 JSON 再反序列化
elem, err = unMarshalJsonStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
} else {
return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
}
}
return slice, nil
}
// 尝试断言为结构体或指针
func decodeStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 检查 row 是否为结构体或结构体指针类型
rowType := reflect.TypeOf(row)
if rowType == nil {
@ -54,14 +66,19 @@ func decodeStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, erro
// 如果 row 是指针类型,则获取它指向的类型
if rowType.Kind() == reflect.Ptr {
// 空指针
if reflect.ValueOf(row).IsNil() {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
}
row = reflect.ValueOf(row).Elem().Interface() // 解引用
// 解引用
row = reflect.ValueOf(row).Elem().Interface()
// 拿到解引用后的类型
rowType = reflect.TypeOf(row)
}
// 检查是否可以将 row 断言为 elemType
// 检查是否可以将 row 断言为 elemType(目标类型)
if !rowType.AssignableTo(elemType) {
return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
}
@ -70,17 +87,18 @@ func decodeStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, erro
return reflect.ValueOf(row), nil
}
// 尝试直接反序列化字符串
func decodeString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 尝试直接反序列化字符串(将Json字符串 反序列化为 结构体)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 判断源数据是否可以断言成string
str, ok := row.(string)
if !ok {
return reflect.Value{}, fmt.Errorf("not a string")
}
// 创建一个新的结构体实例,用于存储反序列化后的值
// 创建一个新的结构体实例,用于存储反序列化后的值
elem := reflect.New(elemType).Elem()
// 尝试将字符串反序列化为结构体。
// 尝试将json字符串反序列化为结构体。
if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
}
@ -88,14 +106,18 @@ func decodeString(row common.KisRow, elemType reflect.Type) (reflect.Value, erro
return elem, nil
}
// 尝试先序列化为 JSON 再反序列化
func decodeJSON(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 尝试先序列化为 JSON 再反序列化(将结构体转换成Json字符串再将Json字符串 反序列化为 结构体)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 将 row 序列化为 JSON 字符串
jsonBytes, err := json.Marshal(row)
if err != nil {
return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v ", err)
}
// 创建一个新的结构体实例,用于存储反序列化后的值
elem := reflect.New(elemType).Interface()
// 将 JSON 字符串反序列化为结构体
if err := json.Unmarshal(jsonBytes, elem); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v ", err)
}
@ -103,7 +125,8 @@ func decodeJSON(row common.KisRow, elemType reflect.Type) (reflect.Value, error)
return reflect.ValueOf(elem).Elem(), nil
}
func (f DefaultFaasSerialize) EncodeParam(i interface{}) (common.KisRowArr, error) {
// Marshal 用于将指定类型的值序列化为 KisRowArr(json 序列化)。
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
var arr common.KisRowArr
switch reflect.TypeOf(i).Kind() {

View File

@ -3,17 +3,21 @@ package faas
import (
"context"
"kis-flow/kis"
"kis-flow/serialize"
"kis-flow/test/proto"
)
type AvgStuScoreIn struct {
serialize.DefaultSerialize
proto.StuScores
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{

View File

@ -4,14 +4,17 @@ import (
"context"
"fmt"
"kis-flow/kis"
"kis-flow/serialize"
"kis-flow/test/proto"
)
type PrintStuAvgScoreIn struct {
serialize.DefaultSerialize
proto.StuAvgScore
}
type PrintStuAvgScoreOut struct {
serialize.DefaultSerialize
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

View File

@ -4,6 +4,7 @@ import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/file"
"kis-flow/flow"
"kis-flow/kis"
"kis-flow/test/faas"
@ -11,6 +12,50 @@ import (
"testing"
)
func TestAutoInjectParamWithConfig(t *testing.T) {
ctx := context.Background()
kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("StuAvg")
if flow1 == nil {
panic("flow1 is nil")
}
// 3. 提交原始数据
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
},
})
_ = flow1.CommitRow(faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
},
})
// 提交原始数据json字符串
_ = flow1.CommitRow(`{"stu_id":101}`)
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
func TestAutoInjectParam(t *testing.T) {
ctx := context.Background()
@ -46,7 +91,7 @@ func TestAutoInjectParam(t *testing.T) {
// 3. 提交原始数据
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
proto.StuScores{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
@ -55,7 +100,7 @@ func TestAutoInjectParam(t *testing.T) {
})
_ = flow1.CommitRow(`{"stu_id":101}`)
_ = flow1.CommitRow(faas.AvgStuScoreIn{
proto.StuScores{
StuScores: proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,

View File

@ -0,0 +1,6 @@
kistype: flow
status: 1
flow_name: StuAvg
flows:
- fname: AvgStuScore
- fname: PrintStuAvgScore

View File

@ -0,0 +1,7 @@
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
name: 学生平均分
must:
- stu_id

View File

@ -0,0 +1,7 @@
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
name: 学生平均分
must:
- stu_id