mirror of
https://github.com/aceld/kis-flow.git
synced 2025-01-23 07:30:23 +08:00
Update README.md Add Example
This commit is contained in:
parent
bf15648a62
commit
9bf098f211
502
README.md
502
README.md
@ -1,4 +1,4 @@
|
||||
# kis-flow
|
||||
<img width="770" alt="image" src="https://github.com/aceld/kis-flow/assets/7778936/0d907741-8909-465f-95e6-fa3671a4eb5e"># kis-flow
|
||||
|
||||
|
||||
#### KisFlow(Keep It Simple Flowing)
|
||||
@ -17,17 +17,15 @@ Gitee(China)
|
||||
Git: https://gitee.com/Aceld/kis-flow
|
||||
|
||||
|
||||
## 开发及教程文档
|
||||
|
||||
## 《KisFlow开发者文档》
|
||||
|
||||
https://www.yuque.com/aceld/kis-flow-doc
|
||||
|
||||
|
||||
KisFlow 采用开发教程同步执行
|
||||
|
||||
教程文档地址:https://www.yuque.com/aceld/hsa94o
|
||||
|
||||
|
||||
|
||||
---
|
||||
## KisFlow框架开发教程:《基于Golang的流式计算框架实战教程》
|
||||
|
||||
https://www.yuque.com/aceld/hsa94o
|
||||
|
||||
|
||||
|
||||
@ -43,69 +41,11 @@ KisFlow为业务上游计算层,上层接数仓/其他业务方ODS层、下游
|
||||
|
||||
| 层级 | 层级说明 | 包括子模块 |
|
||||
| --- | --- | --- |
|
||||
| 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。<br /><br />**KisConnectors**:计算数据流流中间状态持久存储及连接器。<br /><br />**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。<br /><br />**KisConfig:**KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。<br /><br />**KisSource:**对接ODS的数据源 |
|
||||
| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。<br /><br />执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。<br /><br />**执行器场景KisScenes:**根据业务划分的逻辑任务集合。<br /><br />**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 |
|
||||
| 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。<br /><br />**KisConnectors**:计算数据流流中间状态持久存储及连接器。<br /><br />**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。<br /><br />**KisConfig:** KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。<br /><br />**KisSource:** 对接ODS的数据源 |
|
||||
| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。<br /><br />执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。<br /><br />**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。<br /><br />**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 |
|
||||
|
||||
![KisFlow架构图drawio](https://github.com/aceld/kis-flow/assets/7778936/3b829bdb-600d-4ab9-9e62-e14f90737cc3)
|
||||
|
||||
| 流 | 组成 |
|
||||
| --- | --- |
|
||||
| KisFlow(1) | KisFunction(V) + KisFunction(S) + KisFunction(C) + KisFunction(E) |
|
||||
| KisFlow(2) | KisFunction(V) + KisFunction(L) + KisFunction(S) + KisFunction(C) + KisFunction(E) |
|
||||
| KisFlow(3) | KisFunction(V) + KisFunction(L) + KisFunction(C) + KisFunction(E) |
|
||||
|
||||
|
||||
通过 KisFunction(S) 和 KisFunction(L)的并流组合关系,各个KisFlow有如下关系:
|
||||
```yaml
|
||||
KisFlow(2) = KisFlow(1) + KisFlow(2)
|
||||
KisFlow(3) = KisFlow(1) + KisFlow(2) + KisFlow(3)
|
||||
```
|
||||
|
||||
|
||||
#### (1) KisFunction配置
|
||||
```yaml
|
||||
kistype: func
|
||||
fid: 测试KisFunction_S1
|
||||
fname: 测试KisFunction_S1
|
||||
fmode: Save
|
||||
source:
|
||||
name: 被校验的测试数据源1-用户订单维度
|
||||
must:
|
||||
- userid
|
||||
- orderid
|
||||
|
||||
option:
|
||||
cid: 测试KisConnector_1
|
||||
retry_times: 3
|
||||
retry_duration: 500
|
||||
default_params:
|
||||
default1: default1_param
|
||||
default2: default2_param
|
||||
```
|
||||
|
||||
#### (2) KisFlow配置
|
||||
```yaml
|
||||
kistype: flow
|
||||
flow_id: MyFlow1
|
||||
status: 1
|
||||
flow_name: MyFlow1
|
||||
flows:
|
||||
- fid: 测试PrintInput
|
||||
params:
|
||||
args1: value1
|
||||
args2: value2
|
||||
- fid: 测试KisFunction_S1
|
||||
- fid: 测试PrintInput
|
||||
params:
|
||||
args1: value11
|
||||
args2: value22
|
||||
default2: newDefault
|
||||
- fid: 测试PrintInput
|
||||
- fid: 测试KisFunction_S1
|
||||
params:
|
||||
my_user_param1: ffffffxxxxxx
|
||||
- fid: 测试PrintInput
|
||||
```
|
||||
|
||||
|
||||
![KisFlow架构设计-KisFlow整体结构 drawio](https://github.com/aceld/kis-flow/assets/7778936/efc1b29d-9dd4-4945-a35a-fb9a618002d7)
|
||||
@ -119,120 +59,372 @@ KisFlow是一种流式概念形态,具体表现的特征如下:<br />
|
||||
|
||||
3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a service)体系。
|
||||
|
||||
#### (3) KisConnector配置
|
||||
|
||||
```yaml
|
||||
kistype: conn
|
||||
cid: 测试KisConnector_1
|
||||
cname: 测试KisConnector_1
|
||||
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
|
||||
type: redis
|
||||
key: userid_orderid_option
|
||||
params:
|
||||
args1: value1
|
||||
args2: value2
|
||||
load: null
|
||||
save:
|
||||
- 测试KisFunction_S1
|
||||
```
|
||||
|
||||
#### (4) KisFlow全局配置
|
||||
|
||||
```yaml
|
||||
#kistype Global为kisflow的全局配置
|
||||
kistype: global
|
||||
#是否启动prometheus监控
|
||||
prometheus_enable: true
|
||||
#是否需要kisflow单独启动端口监听
|
||||
prometheus_listen: true
|
||||
#prometheus取点监听地址
|
||||
prometheus_serve: 0.0.0.0:20004
|
||||
```
|
||||
|
||||
|
||||
## Example
|
||||
|
||||
下面是简单的应用场景案例,具体应用单元用例请 参考
|
||||
https://github.com/aceld/kis-flow/tree/master/test
|
||||
|
||||
### 主流程
|
||||
https://github.com/aceld/kis-flow-usage
|
||||
|
||||
```go
|
||||
import (
|
||||
"context"
|
||||
"kis-flow/file"
|
||||
"kis-flow/kis"
|
||||
"kis-flow/test/faas"
|
||||
"testing"
|
||||
)
|
||||
#### 《KisFlow开发者文档》
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
// 1. 加载配置文件并构建Flow
|
||||
if err := file.ConfigImportYaml("/XXX/kis-flow/test/load_conf/"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// 2. 获取Flow
|
||||
flow1 := kis.Pool().GetFlow("flowName1")
|
||||
|
||||
// 3. 提交原始数据
|
||||
_ = flow1.CommitRow("This is Data1 from Test")
|
||||
_ = flow1.CommitRow("This is Data2 from Test")
|
||||
_ = flow1.CommitRow("This is Data3 from Test")
|
||||
|
||||
// 4. 执行flow1
|
||||
if err := flow1.Run(ctx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
kis.Pool().FaaS("funcName1", FuncDemo1Handler)
|
||||
kis.Pool().FaaS("funcName2", FuncDemo2Handler)
|
||||
kis.Pool().FaaS("funcName3", FuncDemo3Handler)
|
||||
}
|
||||
中文:https://www.yuque.com/aceld/kis-flow-doc
|
||||
|
||||
#### 安装KisFlow
|
||||
```bash
|
||||
$go get github.com/aceld/kis-flow
|
||||
```
|
||||
|
||||
### 计算逻辑
|
||||
<details>
|
||||
<summary>1. Quick Start(快速开始)</summary>
|
||||
|
||||
### 案例源代码
|
||||
https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start
|
||||
|
||||
### 项目目录
|
||||
|
||||
```bash
|
||||
├── faas_stu_score_avg.go
|
||||
├── faas_stu_score_avg_print.go
|
||||
└── main.go
|
||||
```
|
||||
|
||||
### Flow
|
||||
|
||||
<img width="770" alt="image" src="https://github.com/aceld/kis-flow/assets/7778936/3747ed10-aba1-417e-a3c1-c6205a02444b">
|
||||
|
||||
### Main
|
||||
> main.go
|
||||
```go
|
||||
package faas
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"kis-flow/kis"
|
||||
"github.com/aceld/kis-flow/common"
|
||||
"github.com/aceld/kis-flow/config"
|
||||
"github.com/aceld/kis-flow/flow"
|
||||
"github.com/aceld/kis-flow/kis"
|
||||
)
|
||||
|
||||
// type FaaS func(context.Context, Flow) error
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
|
||||
fmt.Println("---> Call funcName1Handler ----")
|
||||
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
|
||||
// Create a new flow configuration
|
||||
myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)
|
||||
|
||||
for index, row := range flow.Input() {
|
||||
// 打印数据
|
||||
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
|
||||
fmt.Println(str)
|
||||
// Create new function configuration
|
||||
avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
|
||||
printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)
|
||||
|
||||
// 计算结果数据
|
||||
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
|
||||
// Create a new flow
|
||||
flow1 := flow.NewKisFlow(myFlowConfig1)
|
||||
|
||||
// 提交结果数据
|
||||
_ = flow.CommitRow(resultStr)
|
||||
// Link functions to the flow
|
||||
_ = flow1.Link(avgStuScoreConfig, nil)
|
||||
_ = flow1.Link(printStuScoreConfig, nil)
|
||||
|
||||
// Submit a string
|
||||
_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
|
||||
// Submit a string
|
||||
_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
|
||||
|
||||
// Run the flow
|
||||
if err := flow1.Run(ctx); err != nil {
|
||||
fmt.Println("err: ", err)
|
||||
}
|
||||
|
||||
return flow.Next()
|
||||
return
|
||||
}
|
||||
|
||||
// ... FuncDemo2Handler
|
||||
|
||||
// ... FuncDemo3Handler
|
||||
func init() {
|
||||
// Register functions
|
||||
kis.Pool().FaaS("AvgStuScore", AvgStuScore)
|
||||
kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
|
||||
}
|
||||
```
|
||||
|
||||
### Function1
|
||||
|
||||
> faas_stu_score_avg.go
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/aceld/kis-flow/kis"
|
||||
"github.com/aceld/kis-flow/serialize"
|
||||
)
|
||||
|
||||
type AvgStuScoreIn struct {
|
||||
serialize.DefaultSerialize
|
||||
StuId int `json:"stu_id"`
|
||||
Score1 int `json:"score_1"`
|
||||
Score2 int `json:"score_2"`
|
||||
Score3 int `json:"score_3"`
|
||||
}
|
||||
|
||||
type AvgStuScoreOut struct {
|
||||
serialize.DefaultSerialize
|
||||
StuId int `json:"stu_id"`
|
||||
AvgScore float64 `json:"avg_score"`
|
||||
}
|
||||
|
||||
// AvgStuScore(FaaS) 计算学生平均分
|
||||
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
|
||||
for _, row := range rows {
|
||||
|
||||
out := AvgStuScoreOut{
|
||||
StuId: row.StuId,
|
||||
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
|
||||
}
|
||||
|
||||
// 提交结果数据
|
||||
_ = flow.CommitRow(out)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
### Function2
|
||||
> faas_stu_score_avg_print.go
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/aceld/kis-flow/kis"
|
||||
"github.com/aceld/kis-flow/serialize"
|
||||
)
|
||||
|
||||
type PrintStuAvgScoreIn struct {
|
||||
serialize.DefaultSerialize
|
||||
StuId int `json:"stu_id"`
|
||||
AvgScore float64 `json:"avg_score"`
|
||||
}
|
||||
|
||||
type PrintStuAvgScoreOut struct {
|
||||
serialize.DefaultSerialize
|
||||
}
|
||||
|
||||
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
|
||||
|
||||
for _, row := range rows {
|
||||
fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
### OutPut
|
||||
```bash
|
||||
Add KisPool FuncName=AvgStuScore
|
||||
Add KisPool FuncName=PrintStuAvgScore
|
||||
funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source.
|
||||
funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source.
|
||||
stuid: [101], avg score: [90]
|
||||
stuid: [102], avg score: [76.66666666666667]
|
||||
```
|
||||
|
||||
|
||||
</details>
|
||||
|
||||
|
||||
<details>
|
||||
<summary>2. Quick Start With Config(快速开始)</summary>
|
||||
|
||||
### 案例源代码
|
||||
|
||||
https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config
|
||||
|
||||
项目目录
|
||||
```bash
|
||||
├── Makefile
|
||||
├── conf
|
||||
│ ├── flow-CalStuAvgScore.yml
|
||||
│ ├── func-AvgStuScore.yml
|
||||
│ └── func-PrintStuAvgScore.yml
|
||||
├── faas_stu_score_avg.go
|
||||
├── faas_stu_score_avg_print.go
|
||||
└── main.go
|
||||
```
|
||||
|
||||
### Flow
|
||||
<img width="770" alt="image" src="https://github.com/aceld/kis-flow/assets/7778936/3747ed10-aba1-417e-a3c1-c6205a02444b">
|
||||
|
||||
### Config
|
||||
#### (1) Flow Config
|
||||
|
||||
> conf/flow-CalStuAvgScore.yml
|
||||
```yaml
|
||||
kistype: flow
|
||||
status: 1
|
||||
flow_name: CalStuAvgScore
|
||||
flows:
|
||||
- fname: AvgStuScore
|
||||
- fname: PrintStuAvgScore
|
||||
```
|
||||
|
||||
#### (2) Function1 Config
|
||||
|
||||
> conf/func-AvgStuScore.yml
|
||||
|
||||
```yaml
|
||||
kistype: func
|
||||
fname: AvgStuScore
|
||||
fmode: Calculate
|
||||
source:
|
||||
name: 学生学分
|
||||
must:
|
||||
- stu_id
|
||||
```
|
||||
|
||||
#### (3) Function2(Slink) Config
|
||||
|
||||
> conf/func-PrintStuAvgScore.yml
|
||||
|
||||
```yaml
|
||||
kistype: func
|
||||
fname: PrintStuAvgScore
|
||||
fmode: Expand
|
||||
source:
|
||||
name: 学生学分
|
||||
must:
|
||||
- stu_id
|
||||
```
|
||||
|
||||
### Main
|
||||
> main.go
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/aceld/kis-flow/file"
|
||||
"github.com/aceld/kis-flow/kis"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
// Load Configuration from file
|
||||
if err := file.ConfigImportYaml("conf/"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Get the flow
|
||||
flow1 := kis.Pool().GetFlow("CalStuAvgScore")
|
||||
if flow1 == nil {
|
||||
panic("flow1 is nil")
|
||||
}
|
||||
|
||||
// Submit a string
|
||||
_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
|
||||
// Submit a string
|
||||
_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
|
||||
|
||||
// Run the flow
|
||||
if err := flow1.Run(ctx); err != nil {
|
||||
fmt.Println("err: ", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
```
|
||||
|
||||
### Function1
|
||||
> faas_stu_score_avg.go
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/aceld/kis-flow/kis"
|
||||
"github.com/aceld/kis-flow/serialize"
|
||||
)
|
||||
|
||||
type AvgStuScoreIn struct {
|
||||
serialize.DefaultSerialize
|
||||
StuId int `json:"stu_id"`
|
||||
Score1 int `json:"score_1"`
|
||||
Score2 int `json:"score_2"`
|
||||
Score3 int `json:"score_3"`
|
||||
}
|
||||
|
||||
type AvgStuScoreOut struct {
|
||||
serialize.DefaultSerialize
|
||||
StuId int `json:"stu_id"`
|
||||
AvgScore float64 `json:"avg_score"`
|
||||
}
|
||||
|
||||
// AvgStuScore(FaaS) 计算学生平均分
|
||||
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
|
||||
for _, row := range rows {
|
||||
|
||||
out := AvgStuScoreOut{
|
||||
StuId: row.StuId,
|
||||
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
|
||||
}
|
||||
|
||||
// 提交结果数据
|
||||
_ = flow.CommitRow(out)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
### Function2
|
||||
> faas_stu_score_avg_print.go
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/aceld/kis-flow/kis"
|
||||
"github.com/aceld/kis-flow/serialize"
|
||||
)
|
||||
|
||||
type PrintStuAvgScoreIn struct {
|
||||
serialize.DefaultSerialize
|
||||
StuId int `json:"stu_id"`
|
||||
AvgScore float64 `json:"avg_score"`
|
||||
}
|
||||
|
||||
type PrintStuAvgScoreOut struct {
|
||||
serialize.DefaultSerialize
|
||||
}
|
||||
|
||||
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
|
||||
|
||||
for _, row := range rows {
|
||||
fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
### OutPut
|
||||
```bash
|
||||
Add KisPool FuncName=AvgStuScore
|
||||
Add KisPool FuncName=PrintStuAvgScore
|
||||
Add FlowRouter FlowName=CalStuAvgScore
|
||||
stuid: [101], avg score: [90]
|
||||
stuid: [102], avg score: [76.66666666666667]
|
||||
```
|
||||
|
||||
</details>
|
||||
|
||||
|
||||
---
|
||||
|
||||
### 开发者
|
||||
|
||||
* 刘丹冰([@aceld](https://github.com/aceld))
|
||||
|
Loading…
Reference in New Issue
Block a user