feat: automatic injection parameters

This commit is contained in:
huchenhao 2024-03-20 11:34:15 +08:00
parent cadbd48538
commit 4417653032
4 changed files with 136 additions and 0 deletions

View File

@ -0,0 +1,28 @@
package faas
import (
"context"
"kis-flow/kis"
"kis-flow/test/proto"
)
type AvgStuScoreIn struct {
proto.StuScores
}
type AvgStuScoreOut struct {
proto.StuAvgScore
}
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// 提交结果数据
_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
}
return nil
}

View File

@ -0,0 +1,24 @@
package faas
import (
"context"
"fmt"
"kis-flow/kis"
"kis-flow/test/proto"
)
type PrintStuAvgScoreIn struct {
proto.StuAvgScore
}
type PrintStuAvgScoreOut struct {
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
for _, row := range rows {
fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
}
return nil
}

View File

@ -0,0 +1,71 @@
package test
import (
"context"
"kis-flow/common"
"kis-flow/config"
"kis-flow/flow"
"kis-flow/kis"
"kis-flow/test/faas"
"kis-flow/test/proto"
"testing"
)
func TestAutoInjectParam(t *testing.T) {
ctx := context.Background()
kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)
source1 := config.KisSource{
Name: "Test",
Must: []string{},
}
avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, &source1, nil)
if avgStuScoreConfig == nil {
panic("AvgStuScore is nil")
}
printStuAvgScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.C, &source1, nil)
if printStuAvgScoreConfig == nil {
panic("printStuAvgScoreConfig is nil")
}
myFlowConfig1 := config.NewFlowConfig("cal_stu_avg_score", common.FlowEnable)
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 拼接Functioin 到 Flow 上
if err := flow1.Link(avgStuScoreConfig, nil); err != nil {
panic(err)
}
if err := flow1.Link(printStuAvgScoreConfig, nil); err != nil {
panic(err)
}
// 3. 提交原始数据
_ = flow1.CommitRow(proto.StuScores{
StuId: 100,
Score1: 1,
Score2: 2,
Score3: 3,
})
_ = flow1.CommitRow(proto.StuScores{
StuId: 101,
Score1: 11,
Score2: 22,
Score3: 33,
})
_ = flow1.CommitRow(proto.StuScores{
StuId: 102,
Score1: 111,
Score2: 222,
Score3: 333,
})
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}

13
test/proto/stu_score.go Normal file
View File

@ -0,0 +1,13 @@
package proto
type StuScores struct {
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
type StuAvgScore struct {
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}