mirror of
https://github.com/aceld/kis-flow.git
synced 2025-01-23 07:30:23 +08:00
Stream processing framework based on Golang.
.github | ||
common | ||
config | ||
conn | ||
file | ||
flow | ||
function | ||
id | ||
kis | ||
log | ||
metrics | ||
test | ||
go.mod | ||
LICENSE | ||
README.md |
kis-flow
KisFlow(Keep It Simple Flowing)
基于Golang的流式计算框架. 为保持简单的流动,强调在进行各种活动或工作时保持简洁、清晰、流畅的过程。
KisFlow源代码
Github Git: https://github.com/aceld/kis-flow
Gitee(China) Git: https://gitee.com/Aceld/kis-flow
开发及教程文档
KisFlow 采用开发教程同步执行
教程文档地址:https://www.yuque.com/aceld/hsa94o
KisFlow系统定位
KisFlow为业务上游计算层,上层接数仓/其他业务方ODS层、下游接本业务存储数据中心。
KisFlow整体架构图
层级 | 层级说明 | 包括子模块 |
---|---|---|
流式计算层 | 为KisFlow上游计算层,直接对接业务存储及数仓ODS层,如上游可以为Mysql Binlog、日志、接口数据等,为被动消费模式,提供KisFlow实时计算能力。 | KisFlow:分布式批量消费者,一个KisFlow是由多个KisFunction组合。 KisConnectors:计算数据流流中间状态持久存储及连接器。 KisFunctions:支持算子表达式拼接,Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。 **KisConfig:**KisFunction的绑定的流处理策略,可以绑定ReSource让Function具有固定的独立流处理能力。 **KisSource:**对接ODS的数据源 |
任务调度层 | 定时任务调度及执行器业务逻辑,包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 | 任务调度平台可视化:包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。 执行器管理KisJobs:Golang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。 **执行器场景KisScenes:**根据业务划分的逻辑任务集合。 调度日志及用户管理:任务调度日志收集、调度详细、调度流程痕迹等。 |
流 | 组成 |
---|---|
KisFlow(1) | KisFunction(V) + KisFunction(S) + KisFunction(C) + KisFunction(E) |
KisFlow(2) | KisFunction(V) + KisFunction(L) + KisFunction(S) + KisFunction(C) + KisFunction(E) |
KisFlow(3) | KisFunction(V) + KisFunction(L) + KisFunction(C) + KisFunction(E) |
通过 KisFunction(S) 和 KisFunction(L)的并流组合关系,各个KisFlow有如下关系:
KisFlow(2) = KisFlow(1) + KisFlow(2)
KisFlow(3) = KisFlow(1) + KisFlow(2) + KisFlow(3)
(1) KisFunction配置
kistype: func
fid: 测试KisFunction_S1
fname: 测试KisFunction_S1
fmode: Save
source:
name: 被校验的测试数据源1-用户订单维度
must:
- userid
- orderid
option:
cid: 测试KisConnector_1
retry_times: 3
retry_duration: 500
default_params:
default1: default1_param
default2: default2_param
(2) KisFlow配置
kistype: flow
flow_id: MyFlow1
status: 1
flow_name: MyFlow1
flows:
- fid: 测试PrintInput
params:
args1: value1
args2: value2
- fid: 测试KisFunction_S1
- fid: 测试PrintInput
params:
args1: value11
args2: value22
default2: newDefault
- fid: 测试PrintInput
- fid: 测试KisFunction_S1
params:
my_user_param1: ffffffxxxxxx
- fid: 测试PrintInput
KisFlow是一种流式概念形态,具体表现的特征如下:
1、一个KisFlow可以由任意KisFunction组成,且KisFlow可以动态的调整长度。
2、一个KisFunction可以随时动态的加入到某个KisFlow中,且KisFlow和KisFlow之间的关系可以通过KisFunction的Load和Save节点的加入,进行动态的并流和分流动作。
3、KisFlow在编程行为上,从面向流进行数据业务编程,变成了面向KisFunction的函数单计算逻辑的开发,接近FaaS(Function as a service)体系。
(3) KisConnector配置
kistype: conn
cid: 测试KisConnector_1
cname: 测试KisConnector_1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: userid_orderid_option
params:
args1: value1
args2: value2
load: null
save:
- 测试KisFunction_S1
(4) KisFlow全局配置
#kistype Global为kisflow的全局配置
kistype: global
#是否启动prometheus监控
prometheus_enable: true
#是否需要kisflow单独启动端口监听
prometheus_listen: true
#prometheus取点监听地址
prometheus_serve: 0.0.0.0:20004
Example
下面是简单的应用场景案例,具体应用单元用例请 参考 https://github.com/aceld/kis-flow/tree/master/test
主流程
import (
"context"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/faas"
"testing"
)
func main() {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/XXX/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
func init() {
kis.Pool().FaaS("funcName1", FuncDemo1Handler)
kis.Pool().FaaS("funcName2", FuncDemo2Handler)
kis.Pool().FaaS("funcName3", FuncDemo3Handler)
}
计算逻辑
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName1Handler ----")
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for index, row := range flow.Input() {
// 打印数据
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
// 计算结果数据
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交结果数据
_ = flow.CommitRow(resultStr)
}
return flow.Next()
}
// ... FuncDemo2Handler
// ... FuncDemo3Handler
开发者
- 刘丹冰(@aceld)
- 胡辰豪(@ChenHaoHu)
Thanks to all the developers who contributed to KisFlow!
加入KisFlow 社区
platform | Entry |
---|---|
https://discord.gg/xQ8Xxfyfcz | |
加微信: ace_ld 或扫二维码,备注flow 即可。 |
|
WeChat Public Account | |
QQ Group |