diff --git a/README.md b/README.md index cb100a3..07e8b39 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ # kis-flow - -#### KisFlow(Keep It Simple Flowing) +#### KisFlow(Keep It Simple Flowing) 基于Golang的流式计算框架. 为保持简单的流动,强调在进行各种活动或工作时保持简洁、清晰、流畅的过程。 @@ -10,55 +9,47 @@ --- ## KisFlow源代码 + Github Git: https://github.com/aceld/kis-flow Gitee(China) Git: https://gitee.com/Aceld/kis-flow - - ## 《KisFlow开发者文档》 https://www.yuque.com/aceld/kis-flow-doc - ## KisFlow框架开发教程:《基于Golang的流式计算框架实战教程》 https://www.yuque.com/aceld/hsa94o - - - ## KisFlow系统定位 KisFlow为业务上游计算层,上层接数仓/其他业务方ODS层、下游接本业务存储数据中心。
![yuque_diagram (2)](https://github.com/aceld/kis-flow/assets/7778936/b9e1957a-2d11-45d9-84c1-e92c9ac833cc) - + ## KisFlow整体架构图 -| 层级 | 层级说明 | 包括子模块 | -| --- | --- | --- | +| 层级 | 层级说明 | 包括子模块 | +|-------|------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | **KisFlow**:分布式批量消费者,一个KisFlow是由多个KisFunction组合。

**KisConnectors**:计算数据流流中间状态持久存储及连接器。

**KisFunctions**:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。

**KisConfig:** KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。

**KisSource:** 对接ODS的数据源 | -| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。

执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。

**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。

**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 | +| 任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | **任务调度平台可视化**:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。

执行器管理**KisJobs**:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。

**执行器场景KisScenes:** 根据业务划分的逻辑任务集合。

**调度日志及用户管理**:任务调度日志收集、调度详细、调度流程痕迹等。 | ![KisFlow架构图drawio](https://github.com/aceld/kis-flow/assets/7778936/3b829bdb-600d-4ab9-9e62-e14f90737cc3) - - ![KisFlow架构设计-KisFlow整体结构 drawio](https://github.com/aceld/kis-flow/assets/7778936/efc1b29d-9dd4-4945-a35a-fb9a618002d7) - KisFlow是一种流式概念形态,具体表现的特征如下:
1、一个KisFlow可以由任意KisFunction组成,且KisFlow可以动态的调整长度。
2、一个KisFunction可以随时动态的加入到某个KisFlow中,且KisFlow和KisFlow之间的关系可以通过KisFunction的Load和Save节点的加入,进行动态的并流和分流动作。
-3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a service)体系。 - +3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a +service)体系。 ## Example @@ -71,6 +62,7 @@ https://github.com/aceld/kis-flow-usage 中文:https://www.yuque.com/aceld/kis-flow-doc #### 安装KisFlow + ```bash $go get github.com/aceld/kis-flow ``` @@ -79,6 +71,7 @@ $go get github.com/aceld/kis-flow 1. Quick Start(快速开始) ### 案例源代码 + https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start ### 项目目录 @@ -94,7 +87,9 @@ https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start image ### Main + > main.go + ```go package main @@ -147,6 +142,7 @@ func init() { ### Function1 > faas_stu_score_avg.go + ```go package main @@ -188,7 +184,9 @@ func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) erro ``` ### Function2 + > faas_stu_score_avg_print.go + ```go package main @@ -220,6 +218,7 @@ func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgSco ``` ### OutPut + ```bash Add KisPool FuncName=AvgStuScore Add KisPool FuncName=PrintStuAvgScore @@ -229,18 +228,18 @@ stuid: [101], avg score: [90] stuid: [102], avg score: [76.66666666666667] ``` -
2. Quick Start With Config(快速开始) - + ### 案例源代码 https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config 项目目录 + ```bash ├── Makefile ├── conf @@ -253,19 +252,22 @@ https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config ``` ### Flow + image ### Config + #### (1) Flow Config > conf/flow-CalStuAvgScore.yml + ```yaml kistype: flow status: 1 flow_name: CalStuAvgScore flows: - - fname: AvgStuScore - - fname: PrintStuAvgScore + - fname: AvgStuScore + - fname: PrintStuAvgScore ``` #### (2) Function1 Config @@ -277,9 +279,9 @@ kistype: func fname: AvgStuScore fmode: Calculate source: - name: 学生学分 - must: - - stu_id + name: 学生学分 + must: + - stu_id ``` #### (3) Function2(Slink) Config @@ -291,12 +293,13 @@ kistype: func fname: PrintStuAvgScore fmode: Expand source: - name: 学生学分 - must: - - stu_id + name: 学生学分 + must: + - stu_id ``` ### Main + > main.go ```go @@ -344,7 +347,9 @@ func init() { ``` ### Function1 + > faas_stu_score_avg.go + ```go package main @@ -386,7 +391,9 @@ func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) erro ``` ### Function2 + > faas_stu_score_avg_print.go + ```go package main @@ -418,6 +425,7 @@ func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgSco ``` ### OutPut + ```bash Add KisPool FuncName=AvgStuScore Add KisPool FuncName=PrintStuAvgScore @@ -433,23 +441,21 @@ stuid: [102], avg score: [76.66666666666667] ### 开发者 -* 刘丹冰([@aceld](https://github.com/aceld)) +* 刘丹冰([@aceld](https://github.com/aceld)) * 胡辰豪([@ChenHaoHu](https://github.com/ChenHaoHu)) - Thanks to all the developers who contributed to KisFlow! - ### 加入KisFlow 社区 -| platform | Entry | -| ---- | ---- | -| | https://discord.gg/xQ8Xxfyfcz| -| | 加微信: `ace_ld` 或扫二维码,备注`flow`即可。
| -|| **WeChat Public Account** | -|| **QQ Group** | +| platform | Entry | +|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| | https://discord.gg/xQ8Xxfyfcz | +| | 加微信: `ace_ld` 或扫二维码,备注`flow`即可。
| +| | **WeChat Public Account** | +| | **QQ Group** | diff --git a/kis/pool.go b/kis/pool.go index a86217e..310ba80 100644 --- a/kis/pool.go +++ b/kis/pool.go @@ -100,7 +100,8 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) { // CallFunction 调度 Function func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error { - + pool.fnLock.RLock() // 读锁 + defer pool.fnLock.RUnlock() if funcDesc, ok := pool.fnRouter[fnName]; ok { // 被调度Function的形参列表