diff --git a/flow/kis_flow_data.go b/flow/kis_flow_data.go index 73b68d5..df55d10 100644 --- a/flow/kis_flow_data.go +++ b/flow/kis_flow_data.go @@ -9,6 +9,7 @@ import ( "github.com/aceld/kis-flow/log" "github.com/aceld/kis-flow/metrics" "github.com/patrickmn/go-cache" + "reflect" "time" ) @@ -20,6 +21,21 @@ func (flow *KisFlow) CommitRow(row interface{}) error { return nil } +// CommitRowBatch 提交Flow数据, 批量数据 +func (flow *KisFlow) CommitRowBatch(rows interface{}) error { + v := reflect.ValueOf(rows) + if v.Kind() != reflect.Slice { + return fmt.Errorf("Commit Data is not a slice") + } + + for i := 0; i < v.Len(); i++ { + row := v.Index(i).Interface().(common.KisRow) + flow.buffer = append(flow.buffer, row) + } + + return nil +} + // Input 得到flow当前执行Function的输入源数据 func (flow *KisFlow) Input() common.KisRowArr { return flow.inPut diff --git a/kis/flow.go b/kis/flow.go index 34d856e..25b8c86 100644 --- a/kis/flow.go +++ b/kis/flow.go @@ -14,6 +14,9 @@ type Flow interface { Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow 提交Flow数据到即将执行的Function层 CommitRow(row interface{}) error + // CommitRowBatch 提交Flow数据到即将执行的Function层(批量提交) + // row: Must be a slice + CommitRowBatch(row interface{}) error // Input 得到flow当前执行Function的输入源数据 Input() common.KisRowArr // GetName 得到Flow的名称 diff --git a/test/kis_flow_commit_batch_test.go b/test/kis_flow_commit_batch_test.go new file mode 100644 index 0000000..fb919f9 --- /dev/null +++ b/test/kis_flow_commit_batch_test.go @@ -0,0 +1,38 @@ +package test + +import ( + "context" + "github.com/aceld/kis-flow/file" + "github.com/aceld/kis-flow/kis" + "github.com/aceld/kis-flow/log" + "testing" +) + +func TestForkFlowCommitBatch(t *testing.T) { + ctx := context.Background() + + // 1. 加载配置文件并构建Flow + if err := file.ConfigImportYaml("load_conf/"); err != nil { + panic(err) + } + + // 2. 获取Flow + flow1 := kis.Pool().GetFlow("flowName1") + + stringRows := []string{ + "This is Data1 from Test", + "This is Data2 from Test", + "This is Data3 from Test", + } + + // 3. 提交原始数据 + if err := flow1.CommitRowBatch(stringRows); err != nil { + log.Logger().ErrorF("CommitRowBatch Error, err = %+v", err) + panic(err) + } + + // 4. 执行flow1 + if err := flow1.Run(ctx); err != nil { + panic(err) + } +}