From 9bf098f2118979592b8f1d49744c3ef01606120a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E4=B8=B9=E5=86=B0?= Date: Wed, 27 Mar 2024 14:46:13 +0800 Subject: [PATCH] Update README.md Add Example --- README.md | 502 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 347 insertions(+), 155 deletions(-) diff --git a/README.md b/README.md index a1f7f31..5ec5493 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# kis-flow +image# kis-flow #### KisFlow(Keep It Simple Flowing) @@ -17,17 +17,15 @@ Gitee(China) Git: https://gitee.com/Aceld/kis-flow -## 开发及教程文档 + +## 《KisFlow开发者文档》 + +https://www.yuque.com/aceld/kis-flow-doc -KisFlow 采用开发教程同步执行 - -教程文档地址:https://www.yuque.com/aceld/hsa94o - - - ---- +## KisFlow框架开发教程:《基于Golang的流式计算框架实战教程》 +https://www.yuque.com/aceld/hsa94o @@ -43,69 +41,11 @@ KisFlow为业务上游计算层,上层接数仓/其他业务方ODS层、下游 | 层级 | 层级说明 | 包括子模块 | | --- | --- | --- | -| 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。

**KisConnectors**:计算数据流流中间状态持久存储及连接器。

**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。

**KisConfig:**KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。

**KisSource:**对接ODS的数据源 | -| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。

执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。

**执行器场景KisScenes:**根据业务划分的逻辑任务集合。

**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 | +| 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。

**KisConnectors**:计算数据流流中间状态持久存储及连接器。

**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。

**KisConfig:** KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。

**KisSource:** 对接ODS的数据源 | +| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。

执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。

**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。

**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 | ![KisFlow架构图drawio](https://github.com/aceld/kis-flow/assets/7778936/3b829bdb-600d-4ab9-9e62-e14f90737cc3) -| 流 | 组成 | -| --- | --- | -| KisFlow(1) | KisFunction(V) + KisFunction(S) + KisFunction(C) + KisFunction(E) | -| KisFlow(2) | KisFunction(V) + KisFunction(L) + KisFunction(S) + KisFunction(C) + KisFunction(E) | -| KisFlow(3) | KisFunction(V) + KisFunction(L) + KisFunction(C) + KisFunction(E) | - - -通过 KisFunction(S) 和 KisFunction(L)的并流组合关系,各个KisFlow有如下关系: -```yaml -KisFlow(2) = KisFlow(1) + KisFlow(2) -KisFlow(3) = KisFlow(1) + KisFlow(2) + KisFlow(3) -``` - - -#### (1) KisFunction配置 -```yaml -kistype: func -fid: 测试KisFunction_S1 -fname: 测试KisFunction_S1 -fmode: Save -source: - name: 被校验的测试数据源1-用户订单维度 - must: - - userid - - orderid - -option: - cid: 测试KisConnector_1 - retry_times: 3 - retry_duration: 500 - default_params: - default1: default1_param - default2: default2_param -``` - -#### (2) KisFlow配置 -```yaml -kistype: flow -flow_id: MyFlow1 -status: 1 -flow_name: MyFlow1 -flows: - - fid: 测试PrintInput - params: - args1: value1 - args2: value2 - - fid: 测试KisFunction_S1 - - fid: 测试PrintInput - params: - args1: value11 - args2: value22 - default2: newDefault - - fid: 测试PrintInput - - fid: 测试KisFunction_S1 - params: - my_user_param1: ffffffxxxxxx - - fid: 测试PrintInput -``` ![KisFlow架构设计-KisFlow整体结构 drawio](https://github.com/aceld/kis-flow/assets/7778936/efc1b29d-9dd4-4945-a35a-fb9a618002d7) @@ -119,120 +59,372 @@ KisFlow是一种流式概念形态,具体表现的特征如下:
3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a service)体系。 -#### (3) KisConnector配置 - -```yaml -kistype: conn -cid: 测试KisConnector_1 -cname: 测试KisConnector_1 -addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990' -type: redis -key: userid_orderid_option -params: - args1: value1 - args2: value2 -load: null -save: - - 测试KisFunction_S1 -``` - -#### (4) KisFlow全局配置 - -```yaml -#kistype Global为kisflow的全局配置 -kistype: global -#是否启动prometheus监控 -prometheus_enable: true -#是否需要kisflow单独启动端口监听 -prometheus_listen: true -#prometheus取点监听地址 -prometheus_serve: 0.0.0.0:20004 -``` - ## Example 下面是简单的应用场景案例,具体应用单元用例请 参考 -https://github.com/aceld/kis-flow/tree/master/test -### 主流程 +https://github.com/aceld/kis-flow-usage -```go -import ( - "context" - "kis-flow/file" - "kis-flow/kis" - "kis-flow/test/faas" - "testing" -) +#### 《KisFlow开发者文档》 -func main() { - ctx := context.Background() - - // 1. 加载配置文件并构建Flow - if err := file.ConfigImportYaml("/XXX/kis-flow/test/load_conf/"); err != nil { - panic(err) - } - - // 2. 获取Flow - flow1 := kis.Pool().GetFlow("flowName1") - - // 3. 提交原始数据 - _ = flow1.CommitRow("This is Data1 from Test") - _ = flow1.CommitRow("This is Data2 from Test") - _ = flow1.CommitRow("This is Data3 from Test") - - // 4. 执行flow1 - if err := flow1.Run(ctx); err != nil { - panic(err) - } -} - -func init() { - kis.Pool().FaaS("funcName1", FuncDemo1Handler) - kis.Pool().FaaS("funcName2", FuncDemo2Handler) - kis.Pool().FaaS("funcName3", FuncDemo3Handler) -} +中文:https://www.yuque.com/aceld/kis-flow-doc +#### 安装KisFlow +```bash +$go get github.com/aceld/kis-flow ``` -### 计算逻辑 +
+1. Quick Start(快速开始) +### 案例源代码 +https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start + +### 项目目录 + +```bash +├── faas_stu_score_avg.go +├── faas_stu_score_avg_print.go +└── main.go +``` + +### Flow + +image + +### Main +> main.go ```go -package faas +package main import ( "context" "fmt" - "kis-flow/kis" + "github.com/aceld/kis-flow/common" + "github.com/aceld/kis-flow/config" + "github.com/aceld/kis-flow/flow" + "github.com/aceld/kis-flow/kis" ) -// type FaaS func(context.Context, Flow) error +func main() { + ctx := context.Background() -func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error { - fmt.Println("---> Call funcName1Handler ----") - fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) + // Create a new flow configuration + myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable) - for index, row := range flow.Input() { - // 打印数据 - str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) - fmt.Println(str) + // Create new function configuration + avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil) + printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil) - // 计算结果数据 - resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) + // Create a new flow + flow1 := flow.NewKisFlow(myFlowConfig1) - // 提交结果数据 - _ = flow.CommitRow(resultStr) + // Link functions to the flow + _ = flow1.Link(avgStuScoreConfig, nil) + _ = flow1.Link(printStuScoreConfig, nil) + + // Submit a string + _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`) + // Submit a string + _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`) + + // Run the flow + if err := flow1.Run(ctx); err != nil { + fmt.Println("err: ", err) } - return flow.Next() + return } -// ... FuncDemo2Handler - -// ... FuncDemo3Handler +func init() { + // Register functions + kis.Pool().FaaS("AvgStuScore", AvgStuScore) + kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore) +} ``` +### Function1 + +> faas_stu_score_avg.go +```go +package main + +import ( + "context" + "github.com/aceld/kis-flow/kis" + "github.com/aceld/kis-flow/serialize" +) + +type AvgStuScoreIn struct { + serialize.DefaultSerialize + StuId int `json:"stu_id"` + Score1 int `json:"score_1"` + Score2 int `json:"score_2"` + Score3 int `json:"score_3"` +} + +type AvgStuScoreOut struct { + serialize.DefaultSerialize + StuId int `json:"stu_id"` + AvgScore float64 `json:"avg_score"` +} + +// AvgStuScore(FaaS) 计算学生平均分 +func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error { + for _, row := range rows { + + out := AvgStuScoreOut{ + StuId: row.StuId, + AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3, + } + + // 提交结果数据 + _ = flow.CommitRow(out) + } + + return nil +} +``` + +### Function2 +> faas_stu_score_avg_print.go +```go +package main + +import ( + "context" + "fmt" + "github.com/aceld/kis-flow/kis" + "github.com/aceld/kis-flow/serialize" +) + +type PrintStuAvgScoreIn struct { + serialize.DefaultSerialize + StuId int `json:"stu_id"` + AvgScore float64 `json:"avg_score"` +} + +type PrintStuAvgScoreOut struct { + serialize.DefaultSerialize +} + +func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error { + + for _, row := range rows { + fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore) + } + + return nil +} +``` + +### OutPut +```bash +Add KisPool FuncName=AvgStuScore +Add KisPool FuncName=PrintStuAvgScore +funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source. +funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source. +stuid: [101], avg score: [90] +stuid: [102], avg score: [76.66666666666667] +``` + + +
+ + +
+2. Quick Start With Config(快速开始) + +### 案例源代码 + +https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config + +项目目录 +```bash +├── Makefile +├── conf +│ ├── flow-CalStuAvgScore.yml +│ ├── func-AvgStuScore.yml +│ └── func-PrintStuAvgScore.yml +├── faas_stu_score_avg.go +├── faas_stu_score_avg_print.go +└── main.go +``` + +### Flow +image + +### Config +#### (1) Flow Config + +> conf/flow-CalStuAvgScore.yml +```yaml +kistype: flow +status: 1 +flow_name: CalStuAvgScore +flows: + - fname: AvgStuScore + - fname: PrintStuAvgScore +``` + +#### (2) Function1 Config + +> conf/func-AvgStuScore.yml + +```yaml +kistype: func +fname: AvgStuScore +fmode: Calculate +source: + name: 学生学分 + must: + - stu_id +``` + +#### (3) Function2(Slink) Config + +> conf/func-PrintStuAvgScore.yml + +```yaml +kistype: func +fname: PrintStuAvgScore +fmode: Expand +source: + name: 学生学分 + must: + - stu_id +``` + +### Main +> main.go + +```go +package main + +import ( + "context" + "fmt" + "github.com/aceld/kis-flow/file" + "github.com/aceld/kis-flow/kis" +) + +func main() { + ctx := context.Background() + + // Load Configuration from file + if err := file.ConfigImportYaml("conf/"); err != nil { + panic(err) + } + + // Get the flow + flow1 := kis.Pool().GetFlow("CalStuAvgScore") + if flow1 == nil { + panic("flow1 is nil") + } + + // Submit a string + _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`) + // Submit a string + _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`) + + // Run the flow + if err := flow1.Run(ctx); err != nil { + fmt.Println("err: ", err) + } + + return +} +``` + +### Function1 +> faas_stu_score_avg.go +```go +package main + +import ( + "context" + "github.com/aceld/kis-flow/kis" + "github.com/aceld/kis-flow/serialize" +) + +type AvgStuScoreIn struct { + serialize.DefaultSerialize + StuId int `json:"stu_id"` + Score1 int `json:"score_1"` + Score2 int `json:"score_2"` + Score3 int `json:"score_3"` +} + +type AvgStuScoreOut struct { + serialize.DefaultSerialize + StuId int `json:"stu_id"` + AvgScore float64 `json:"avg_score"` +} + +// AvgStuScore(FaaS) 计算学生平均分 +func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error { + for _, row := range rows { + + out := AvgStuScoreOut{ + StuId: row.StuId, + AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3, + } + + // 提交结果数据 + _ = flow.CommitRow(out) + } + + return nil +} +``` + +### Function2 +> faas_stu_score_avg_print.go +```go +package main + +import ( + "context" + "fmt" + "github.com/aceld/kis-flow/kis" + "github.com/aceld/kis-flow/serialize" +) + +type PrintStuAvgScoreIn struct { + serialize.DefaultSerialize + StuId int `json:"stu_id"` + AvgScore float64 `json:"avg_score"` +} + +type PrintStuAvgScoreOut struct { + serialize.DefaultSerialize +} + +func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error { + + for _, row := range rows { + fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore) + } + + return nil +} +``` + +### OutPut +```bash +Add KisPool FuncName=AvgStuScore +Add KisPool FuncName=PrintStuAvgScore +Add FlowRouter FlowName=CalStuAvgScore +stuid: [101], avg score: [90] +stuid: [102], avg score: [76.66666666666667] +``` + +
+ + +--- + ### 开发者 * 刘丹冰([@aceld](https://github.com/aceld))