diff --git a/README.md b/README.md
index a1f7f31..5ec5493 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# kis-flow
+# kis-flow
#### KisFlow(Keep It Simple Flowing)
@@ -17,17 +17,15 @@ Gitee(China)
Git: https://gitee.com/Aceld/kis-flow
-## 开发及教程文档
+
+## 《KisFlow开发者文档》
+
+https://www.yuque.com/aceld/kis-flow-doc
-KisFlow 采用开发教程同步执行
-
-教程文档地址:https://www.yuque.com/aceld/hsa94o
-
-
-
----
+## KisFlow框架开发教程:《基于Golang的流式计算框架实战教程》
+https://www.yuque.com/aceld/hsa94o
@@ -43,69 +41,11 @@ KisFlow为业务上游计算层,上层接数仓/其他业务方ODS层、下游
| 层级 | 层级说明 | 包括子模块 |
| --- | --- | --- |
-| 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。
**KisConnectors**:计算数据流流中间状态持久存储及连接器。
**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。
**KisConfig:**KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。
**KisSource:**对接ODS的数据源 |
-| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。
执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。
**执行器场景KisScenes:**根据业务划分的逻辑任务集合。
**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 |
+| 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。
**KisConnectors**:计算数据流流中间状态持久存储及连接器。
**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。
**KisConfig:** KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。
**KisSource:** 对接ODS的数据源 |
+| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。
执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。
**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。
**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 |
![KisFlow架构图drawio](https://github.com/aceld/kis-flow/assets/7778936/3b829bdb-600d-4ab9-9e62-e14f90737cc3)
-| 流 | 组成 |
-| --- | --- |
-| KisFlow(1) | KisFunction(V) + KisFunction(S) + KisFunction(C) + KisFunction(E) |
-| KisFlow(2) | KisFunction(V) + KisFunction(L) + KisFunction(S) + KisFunction(C) + KisFunction(E) |
-| KisFlow(3) | KisFunction(V) + KisFunction(L) + KisFunction(C) + KisFunction(E) |
-
-
-通过 KisFunction(S) 和 KisFunction(L)的并流组合关系,各个KisFlow有如下关系:
-```yaml
-KisFlow(2) = KisFlow(1) + KisFlow(2)
-KisFlow(3) = KisFlow(1) + KisFlow(2) + KisFlow(3)
-```
-
-
-#### (1) KisFunction配置
-```yaml
-kistype: func
-fid: 测试KisFunction_S1
-fname: 测试KisFunction_S1
-fmode: Save
-source:
- name: 被校验的测试数据源1-用户订单维度
- must:
- - userid
- - orderid
-
-option:
- cid: 测试KisConnector_1
- retry_times: 3
- retry_duration: 500
- default_params:
- default1: default1_param
- default2: default2_param
-```
-
-#### (2) KisFlow配置
-```yaml
-kistype: flow
-flow_id: MyFlow1
-status: 1
-flow_name: MyFlow1
-flows:
- - fid: 测试PrintInput
- params:
- args1: value1
- args2: value2
- - fid: 测试KisFunction_S1
- - fid: 测试PrintInput
- params:
- args1: value11
- args2: value22
- default2: newDefault
- - fid: 测试PrintInput
- - fid: 测试KisFunction_S1
- params:
- my_user_param1: ffffffxxxxxx
- - fid: 测试PrintInput
-```
![KisFlow架构设计-KisFlow整体结构 drawio](https://github.com/aceld/kis-flow/assets/7778936/efc1b29d-9dd4-4945-a35a-fb9a618002d7)
@@ -119,120 +59,372 @@ KisFlow是一种流式概念形态,具体表现的特征如下:
3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a service)体系。
-#### (3) KisConnector配置
-
-```yaml
-kistype: conn
-cid: 测试KisConnector_1
-cname: 测试KisConnector_1
-addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
-type: redis
-key: userid_orderid_option
-params:
- args1: value1
- args2: value2
-load: null
-save:
- - 测试KisFunction_S1
-```
-
-#### (4) KisFlow全局配置
-
-```yaml
-#kistype Global为kisflow的全局配置
-kistype: global
-#是否启动prometheus监控
-prometheus_enable: true
-#是否需要kisflow单独启动端口监听
-prometheus_listen: true
-#prometheus取点监听地址
-prometheus_serve: 0.0.0.0:20004
-```
-
## Example
下面是简单的应用场景案例,具体应用单元用例请 参考
-https://github.com/aceld/kis-flow/tree/master/test
-### 主流程
+https://github.com/aceld/kis-flow-usage
-```go
-import (
- "context"
- "kis-flow/file"
- "kis-flow/kis"
- "kis-flow/test/faas"
- "testing"
-)
+#### 《KisFlow开发者文档》
-func main() {
- ctx := context.Background()
-
- // 1. 加载配置文件并构建Flow
- if err := file.ConfigImportYaml("/XXX/kis-flow/test/load_conf/"); err != nil {
- panic(err)
- }
-
- // 2. 获取Flow
- flow1 := kis.Pool().GetFlow("flowName1")
-
- // 3. 提交原始数据
- _ = flow1.CommitRow("This is Data1 from Test")
- _ = flow1.CommitRow("This is Data2 from Test")
- _ = flow1.CommitRow("This is Data3 from Test")
-
- // 4. 执行flow1
- if err := flow1.Run(ctx); err != nil {
- panic(err)
- }
-}
-
-func init() {
- kis.Pool().FaaS("funcName1", FuncDemo1Handler)
- kis.Pool().FaaS("funcName2", FuncDemo2Handler)
- kis.Pool().FaaS("funcName3", FuncDemo3Handler)
-}
+中文:https://www.yuque.com/aceld/kis-flow-doc
+#### 安装KisFlow
+```bash
+$go get github.com/aceld/kis-flow
```
-### 计算逻辑
+
+1. Quick Start(快速开始)
+### 案例源代码
+https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start
+
+### 项目目录
+
+```bash
+├── faas_stu_score_avg.go
+├── faas_stu_score_avg_print.go
+└── main.go
+```
+
+### Flow
+
+
+
+### Main
+> main.go
```go
-package faas
+package main
import (
"context"
"fmt"
- "kis-flow/kis"
+ "github.com/aceld/kis-flow/common"
+ "github.com/aceld/kis-flow/config"
+ "github.com/aceld/kis-flow/flow"
+ "github.com/aceld/kis-flow/kis"
)
-// type FaaS func(context.Context, Flow) error
+func main() {
+ ctx := context.Background()
-func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
- fmt.Println("---> Call funcName1Handler ----")
- fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
+ // Create a new flow configuration
+ myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)
- for index, row := range flow.Input() {
- // 打印数据
- str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
- fmt.Println(str)
+ // Create new function configuration
+ avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
+ printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)
- // 计算结果数据
- resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
+ // Create a new flow
+ flow1 := flow.NewKisFlow(myFlowConfig1)
- // 提交结果数据
- _ = flow.CommitRow(resultStr)
+ // Link functions to the flow
+ _ = flow1.Link(avgStuScoreConfig, nil)
+ _ = flow1.Link(printStuScoreConfig, nil)
+
+ // Submit a string
+ _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
+ // Submit a string
+ _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
+
+ // Run the flow
+ if err := flow1.Run(ctx); err != nil {
+ fmt.Println("err: ", err)
}
- return flow.Next()
+ return
}
-// ... FuncDemo2Handler
-
-// ... FuncDemo3Handler
+func init() {
+ // Register functions
+ kis.Pool().FaaS("AvgStuScore", AvgStuScore)
+ kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
+}
```
+### Function1
+
+> faas_stu_score_avg.go
+```go
+package main
+
+import (
+ "context"
+ "github.com/aceld/kis-flow/kis"
+ "github.com/aceld/kis-flow/serialize"
+)
+
+type AvgStuScoreIn struct {
+ serialize.DefaultSerialize
+ StuId int `json:"stu_id"`
+ Score1 int `json:"score_1"`
+ Score2 int `json:"score_2"`
+ Score3 int `json:"score_3"`
+}
+
+type AvgStuScoreOut struct {
+ serialize.DefaultSerialize
+ StuId int `json:"stu_id"`
+ AvgScore float64 `json:"avg_score"`
+}
+
+// AvgStuScore(FaaS) 计算学生平均分
+func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
+ for _, row := range rows {
+
+ out := AvgStuScoreOut{
+ StuId: row.StuId,
+ AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
+ }
+
+ // 提交结果数据
+ _ = flow.CommitRow(out)
+ }
+
+ return nil
+}
+```
+
+### Function2
+> faas_stu_score_avg_print.go
+```go
+package main
+
+import (
+ "context"
+ "fmt"
+ "github.com/aceld/kis-flow/kis"
+ "github.com/aceld/kis-flow/serialize"
+)
+
+type PrintStuAvgScoreIn struct {
+ serialize.DefaultSerialize
+ StuId int `json:"stu_id"`
+ AvgScore float64 `json:"avg_score"`
+}
+
+type PrintStuAvgScoreOut struct {
+ serialize.DefaultSerialize
+}
+
+func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
+
+ for _, row := range rows {
+ fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
+ }
+
+ return nil
+}
+```
+
+### OutPut
+```bash
+Add KisPool FuncName=AvgStuScore
+Add KisPool FuncName=PrintStuAvgScore
+funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source.
+funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source.
+stuid: [101], avg score: [90]
+stuid: [102], avg score: [76.66666666666667]
+```
+
+
+
+
+
+
+2. Quick Start With Config(快速开始)
+
+### 案例源代码
+
+https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config
+
+项目目录
+```bash
+├── Makefile
+├── conf
+│ ├── flow-CalStuAvgScore.yml
+│ ├── func-AvgStuScore.yml
+│ └── func-PrintStuAvgScore.yml
+├── faas_stu_score_avg.go
+├── faas_stu_score_avg_print.go
+└── main.go
+```
+
+### Flow
+
+
+### Config
+#### (1) Flow Config
+
+> conf/flow-CalStuAvgScore.yml
+```yaml
+kistype: flow
+status: 1
+flow_name: CalStuAvgScore
+flows:
+ - fname: AvgStuScore
+ - fname: PrintStuAvgScore
+```
+
+#### (2) Function1 Config
+
+> conf/func-AvgStuScore.yml
+
+```yaml
+kistype: func
+fname: AvgStuScore
+fmode: Calculate
+source:
+ name: 学生学分
+ must:
+ - stu_id
+```
+
+#### (3) Function2(Slink) Config
+
+> conf/func-PrintStuAvgScore.yml
+
+```yaml
+kistype: func
+fname: PrintStuAvgScore
+fmode: Expand
+source:
+ name: 学生学分
+ must:
+ - stu_id
+```
+
+### Main
+> main.go
+
+```go
+package main
+
+import (
+ "context"
+ "fmt"
+ "github.com/aceld/kis-flow/file"
+ "github.com/aceld/kis-flow/kis"
+)
+
+func main() {
+ ctx := context.Background()
+
+ // Load Configuration from file
+ if err := file.ConfigImportYaml("conf/"); err != nil {
+ panic(err)
+ }
+
+ // Get the flow
+ flow1 := kis.Pool().GetFlow("CalStuAvgScore")
+ if flow1 == nil {
+ panic("flow1 is nil")
+ }
+
+ // Submit a string
+ _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
+ // Submit a string
+ _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
+
+ // Run the flow
+ if err := flow1.Run(ctx); err != nil {
+ fmt.Println("err: ", err)
+ }
+
+ return
+}
+```
+
+### Function1
+> faas_stu_score_avg.go
+```go
+package main
+
+import (
+ "context"
+ "github.com/aceld/kis-flow/kis"
+ "github.com/aceld/kis-flow/serialize"
+)
+
+type AvgStuScoreIn struct {
+ serialize.DefaultSerialize
+ StuId int `json:"stu_id"`
+ Score1 int `json:"score_1"`
+ Score2 int `json:"score_2"`
+ Score3 int `json:"score_3"`
+}
+
+type AvgStuScoreOut struct {
+ serialize.DefaultSerialize
+ StuId int `json:"stu_id"`
+ AvgScore float64 `json:"avg_score"`
+}
+
+// AvgStuScore(FaaS) 计算学生平均分
+func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
+ for _, row := range rows {
+
+ out := AvgStuScoreOut{
+ StuId: row.StuId,
+ AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
+ }
+
+ // 提交结果数据
+ _ = flow.CommitRow(out)
+ }
+
+ return nil
+}
+```
+
+### Function2
+> faas_stu_score_avg_print.go
+```go
+package main
+
+import (
+ "context"
+ "fmt"
+ "github.com/aceld/kis-flow/kis"
+ "github.com/aceld/kis-flow/serialize"
+)
+
+type PrintStuAvgScoreIn struct {
+ serialize.DefaultSerialize
+ StuId int `json:"stu_id"`
+ AvgScore float64 `json:"avg_score"`
+}
+
+type PrintStuAvgScoreOut struct {
+ serialize.DefaultSerialize
+}
+
+func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
+
+ for _, row := range rows {
+ fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
+ }
+
+ return nil
+}
+```
+
+### OutPut
+```bash
+Add KisPool FuncName=AvgStuScore
+Add KisPool FuncName=PrintStuAvgScore
+Add FlowRouter FlowName=CalStuAvgScore
+stuid: [101], avg score: [90]
+stuid: [102], avg score: [76.66666666666667]
+```
+
+
+
+
+---
+
### 开发者
* 刘丹冰([@aceld](https://github.com/aceld))