diff --git a/test/faas/faas_demo4.go b/test/faas/faas_demo4.go new file mode 100644 index 0000000..79233f3 --- /dev/null +++ b/test/faas/faas_demo4.go @@ -0,0 +1,26 @@ +package faas + +import ( + "context" + "fmt" + "kis-flow/kis" +) + +// type FaaS func(context.Context, Flow) error + +func FuncDemo4Handler(ctx context.Context, flow kis.Flow) error { + fmt.Println("---> Call FuncDemo4Handler ----") + + for index, row := range flow.Input() { + str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) + fmt.Println(str) + + // 计算结果数据 + resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) + + // 提交结果数据 + _ = flow.CommitRow(resultStr) + } + + return nil +} diff --git a/test/init.go b/test/init.go new file mode 100644 index 0000000..fb72805 --- /dev/null +++ b/test/init.go @@ -0,0 +1,25 @@ +package test + +import ( + "kis-flow/common" + "kis-flow/kis" + "kis-flow/test/caas" + "kis-flow/test/faas" +) + +func init() { + // 0. 注册Function 回调业务 + kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) + kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) + kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) + kis.Pool().FaaS("funcName4", faas.FuncDemo4Handler) + kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // abortFunc 业务 + kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // dataReuseFunc 业务 + kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // noResultFunc 业务 + kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // jumpFunc 业务 + + // 0. 注册ConnectorInit 和 Connector 回调业务 + kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) + kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) + +} diff --git a/test/kis_action_test.go b/test/kis_action_test.go index c286adc..c91cd51 100644 --- a/test/kis_action_test.go +++ b/test/kis_action_test.go @@ -2,28 +2,18 @@ package test import ( "context" - "kis-flow/common" + "fmt" "kis-flow/file" "kis-flow/kis" - "kis-flow/test/caas" - "kis-flow/test/faas" "testing" ) func TestActionAbort(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务 - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/aceld/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { + fmt.Println("Wrong Config Yaml Path!") panic(err) } @@ -44,17 +34,9 @@ func TestActionAbort(t *testing.T) { func TestActionDataReuse(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // 添加dataReuesFunc 业务 - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { + fmt.Println("Wrong Config Yaml Path!") panic(err) } @@ -75,17 +57,9 @@ func TestActionDataReuse(t *testing.T) { func TestActionForceEntry(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // 添加noResultFunc 业务 - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { + fmt.Println("Wrong Config Yaml Path!") panic(err) } @@ -106,17 +80,9 @@ func TestActionForceEntry(t *testing.T) { func TestActionJumpFunc(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) - kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // 添加jumpFunc 业务 - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { + fmt.Println("Wrong Config Yaml Path!") panic(err) } diff --git a/test/kis_config_export_test.go b/test/kis_config_export_test.go index 485b324..7a842ab 100644 --- a/test/kis_config_export_test.go +++ b/test/kis_config_export_test.go @@ -1,33 +1,24 @@ package test import ( - "kis-flow/common" + "fmt" "kis-flow/file" "kis-flow/kis" - "kis-flow/test/caas" - "kis-flow/test/faas" "testing" ) func TestConfigExportYmal(t *testing.T) { - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/aceld/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { + fmt.Println("Wrong Config Yaml Path!") panic(err) } // 2. 讲构建的内存KisFlow结构配置导出的文件当中 flows := kis.Pool().GetFlows() for _, flow := range flows { - if err := file.ConfigExportYaml(flow, "/Users/aceld/gopath/src/kis-flow/test/export_conf/"); err != nil { + if err := file.ConfigExportYaml(flow, "/Users/Aceld/go/src/kis-flow/test/export_conf/"); err != nil { panic(err) } } diff --git a/test/kis_config_import_test.go b/test/kis_config_import_test.go index 53293a3..62909e4 100644 --- a/test/kis_config_import_test.go +++ b/test/kis_config_import_test.go @@ -2,28 +2,16 @@ package test import ( "context" - "kis-flow/common" "kis-flow/file" "kis-flow/kis" - "kis-flow/test/caas" - "kis-flow/test/faas" "testing" ) func TestConfigImportYaml(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/aceld/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { panic(err) } diff --git a/test/kis_connector_test.go b/test/kis_connector_test.go index 605e063..764a6c1 100644 --- a/test/kis_connector_test.go +++ b/test/kis_connector_test.go @@ -5,9 +5,6 @@ import ( "kis-flow/common" "kis-flow/config" "kis-flow/flow" - "kis-flow/kis" - "kis-flow/test/caas" - "kis-flow/test/faas" "testing" ) @@ -15,15 +12,6 @@ func TestNewKisConnector(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 创建3个KisFunction配置实例, 其中myFuncConfig2 有Connector配置 source1 := config.KisSource{ Name: "公众号抖音商城户订单数据", diff --git a/test/kis_flow_test.go b/test/kis_flow_test.go index 557d1ad..b4d3d74 100644 --- a/test/kis_flow_test.go +++ b/test/kis_flow_test.go @@ -71,9 +71,9 @@ func TestNewKisFlowData(t *testing.T) { panic("myFuncConfig1 is nil") } - myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil) + myFuncConfig2 := config.NewFuncConfig("funcName4", common.E, &source2, nil) if myFuncConfig2 == nil { - panic("myFuncConfig2 is nil") + panic("myFuncConfig4 is nil") } // 2. 创建一个 KisFlow 配置实例 @@ -82,7 +82,7 @@ func TestNewKisFlowData(t *testing.T) { // 3. 创建一个KisFlow对象 flow1 := flow.NewKisFlow(myFlowConfig1) - // 4. 拼接Functioin 到 Flow 上 + // 4. 拼接 Function 到 Flow 上 if err := flow1.Link(myFuncConfig1, nil); err != nil { panic(err) } diff --git a/test/kis_fork_test.go b/test/kis_fork_test.go index 4325a85..05d6b9c 100644 --- a/test/kis_fork_test.go +++ b/test/kis_fork_test.go @@ -2,28 +2,16 @@ package test import ( "context" - "kis-flow/common" "kis-flow/file" "kis-flow/kis" - "kis-flow/test/caas" - "kis-flow/test/faas" "testing" ) func TestForkFlow(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/aceld/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { panic(err) } diff --git a/test/kis_metrics_test.go b/test/kis_metrics_test.go index 35da706..9d03e06 100644 --- a/test/kis_metrics_test.go +++ b/test/kis_metrics_test.go @@ -2,11 +2,9 @@ package test import ( "context" - "kis-flow/common" + "fmt" "kis-flow/file" "kis-flow/kis" - "kis-flow/test/caas" - "kis-flow/test/faas" "testing" "time" ) @@ -14,17 +12,9 @@ import ( func TestMetricsDataTotal(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/aceld/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { + fmt.Println("Wrong Config Yaml Path!") panic(err) } diff --git a/test/kis_params_test.go b/test/kis_params_test.go index fd75dc1..bfd73e0 100644 --- a/test/kis_params_test.go +++ b/test/kis_params_test.go @@ -2,28 +2,16 @@ package test import ( "context" - "kis-flow/common" "kis-flow/file" "kis-flow/kis" - "kis-flow/test/caas" - "kis-flow/test/faas" "testing" ) func TestParams(t *testing.T) { ctx := context.Background() - // 0. 注册Function 回调业务 - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) - kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) - - // 0. 注册ConnectorInit 和 Connector 回调业务 - kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) - kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/Users/aceld/gopath/src/kis-flow/test/load_conf/"); err != nil { + if err := file.ConfigImportYaml("load_conf/"); err != nil { panic(err) } diff --git a/test/kis_pool_test.go b/test/kis_pool_test.go index 52e7726..5f49483 100644 --- a/test/kis_pool_test.go +++ b/test/kis_pool_test.go @@ -5,8 +5,6 @@ import ( "kis-flow/common" "kis-flow/config" "kis-flow/flow" - "kis-flow/kis" - "kis-flow/test/faas" "testing" ) @@ -14,10 +12,6 @@ func TestNewKisPool(t *testing.T) { ctx := context.Background() - // 0. 注册Function - kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) - kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) - // 1. 创建2个KisFunction配置实例 source1 := config.KisSource{ Name: "公众号抖音商城户订单数据", @@ -34,9 +28,9 @@ func TestNewKisPool(t *testing.T) { panic("myFuncConfig1 is nil") } - myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil) + myFuncConfig2 := config.NewFuncConfig("funcName4", common.E, &source2, nil) if myFuncConfig2 == nil { - panic("myFuncConfig2 is nil") + panic("myFuncConfig4 is nil") } // 2. 创建一个 KisFlow 配置实例