diff --git a/test/faas/faas_stu_score_avg.go b/test/faas/faas_stu_score_avg.go new file mode 100644 index 0000000..755acc5 --- /dev/null +++ b/test/faas/faas_stu_score_avg.go @@ -0,0 +1,28 @@ +package faas + +import ( + "context" + "kis-flow/kis" + "kis-flow/test/proto" +) + +type AvgStuScoreIn struct { + proto.StuScores +} + +type AvgStuScoreOut struct { + proto.StuAvgScore +} + +func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error { + for _, row := range rows { + avgScore := proto.StuAvgScore{ + StuId: row.StuId, + AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3, + } + // 提交结果数据 + _ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore}) + } + + return nil +} diff --git a/test/faas/faas_stu_score_avg_print.go b/test/faas/faas_stu_score_avg_print.go new file mode 100644 index 0000000..7632d4d --- /dev/null +++ b/test/faas/faas_stu_score_avg_print.go @@ -0,0 +1,24 @@ +package faas + +import ( + "context" + "fmt" + "kis-flow/kis" + "kis-flow/test/proto" +) + +type PrintStuAvgScoreIn struct { + proto.StuAvgScore +} + +type PrintStuAvgScoreOut struct { +} + +func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error { + + for _, row := range rows { + fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore) + } + + return nil +} diff --git a/test/kis_auto_inject_param_test.go b/test/kis_auto_inject_param_test.go new file mode 100644 index 0000000..8da2932 --- /dev/null +++ b/test/kis_auto_inject_param_test.go @@ -0,0 +1,71 @@ +package test + +import ( + "context" + "kis-flow/common" + "kis-flow/config" + "kis-flow/flow" + "kis-flow/kis" + "kis-flow/test/faas" + "kis-flow/test/proto" + "testing" +) + +func TestAutoInjectParam(t *testing.T) { + ctx := context.Background() + + kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore) + kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore) + + source1 := config.KisSource{ + Name: "Test", + Must: []string{}, + } + + avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, &source1, nil) + if avgStuScoreConfig == nil { + panic("AvgStuScore is nil") + } + + printStuAvgScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.C, &source1, nil) + if printStuAvgScoreConfig == nil { + panic("printStuAvgScoreConfig is nil") + } + + myFlowConfig1 := config.NewFlowConfig("cal_stu_avg_score", common.FlowEnable) + + flow1 := flow.NewKisFlow(myFlowConfig1) + + // 4. 拼接Functioin 到 Flow 上 + if err := flow1.Link(avgStuScoreConfig, nil); err != nil { + panic(err) + } + if err := flow1.Link(printStuAvgScoreConfig, nil); err != nil { + panic(err) + } + + // 3. 提交原始数据 + _ = flow1.CommitRow(proto.StuScores{ + StuId: 100, + Score1: 1, + Score2: 2, + Score3: 3, + }) + _ = flow1.CommitRow(proto.StuScores{ + StuId: 101, + Score1: 11, + Score2: 22, + Score3: 33, + }) + _ = flow1.CommitRow(proto.StuScores{ + StuId: 102, + Score1: 111, + Score2: 222, + Score3: 333, + }) + + // 4. 执行flow1 + if err := flow1.Run(ctx); err != nil { + panic(err) + } +} diff --git a/test/proto/stu_score.go b/test/proto/stu_score.go new file mode 100644 index 0000000..3aa1119 --- /dev/null +++ b/test/proto/stu_score.go @@ -0,0 +1,13 @@ +package proto + +type StuScores struct { + StuId int `json:"stu_id"` + Score1 int `json:"score_1"` + Score2 int `json:"score_2"` + Score3 int `json:"score_3"` +} + +type StuAvgScore struct { + StuId int `json:"stu_id"` + AvgScore float64 `json:"avg_score"` +}