Stream processing framework based on Golang.
Go to file
2024-04-07 12:02:02 +08:00
.github Create FUNDING.yml 2024-03-07 15:15:03 +08:00
common update 2024-03-28 14:57:35 +08:00
config update Function Cname When call AddConnConfig() 2024-04-03 19:06:46 +08:00
conn change CaaS Function Type, add return value:interface{} 2024-04-03 12:36:34 +08:00
file fix: use Link Fork dosen't work 2024-03-29 18:07:57 +08:00
flow fix: use Link Fork dosen't work 2024-03-29 18:07:57 +08:00
function change some output Info->Debug 2024-03-26 15:59:48 +08:00
id change module export 2024-03-26 14:54:50 +08:00
kis change CaaS Function Type, add return value:interface{} 2024-04-03 12:36:34 +08:00
log fix: 抽象 slog 修改 2024-04-07 12:02:02 +08:00
metrics change module export 2024-03-26 14:54:50 +08:00
serialize change module export 2024-03-26 14:54:50 +08:00
test change CaaS Function Type, add return value:interface{} 2024-04-03 12:36:34 +08:00
go.mod change some output Info->Debug 2024-03-26 15:48:36 +08:00
LICENSE init kis-flow, kis_config init 2023-12-30 17:38:08 +08:00
README.md Update README.md 2024-04-03 14:22:38 +08:00

License Discord KisFlow-tutorial KisFlow-Doc

KisFlow(Keep It Simple Flowing)

基于Golang的流式计算框架. 为保持简单的流动,强调在进行各种活动或工作时保持简洁、清晰、流畅的过程。

KisFlow源代码

Github Git: https://github.com/aceld/kis-flow

Gitee(China) Git: https://gitee.com/Aceld/kis-flow

《KisFlow开发者文档》

https://www.yuque.com/aceld/kis-flow-doc

KisFlow框架开发教程

《基于Golang的流式计算框架实战教程》

https://www.yuque.com/aceld/hsa94o

KisFlow系统定位

KisFlow为业务上游计算层上层接数仓/其他业务方ODS层、下游接本业务存储数据中心。

KisFlow整体架构图

层级 层级说明 包括子模块
流式计算层 为KisFlow上游计算层直接对接业务存储及数仓ODS层如上游可以为Mysql Binlog、日志、接口数据等为被动消费模式提供KisFlow实时计算能力。 KisFlow:分布式批量消费者,一个KisFlow是由多个KisFunction组合。

KisConnectors:计算数据流流中间状态持久存储及连接器。

KisFunctions支持算子表达式拼接Connectors集成、策略配置、Stateful Function模式、Slink流式拼接等。

KisConfig KisFunction的绑定的流处理策略可以绑定ReSource让Function具有固定的独立流处理能力。

KisSource 对接ODS的数据源
任务调度层 定时任务调度及执行器业务逻辑包括任务调度平台、执行器管理、调度日志及用户管理等。提供KisFlow的定时任务、统计、聚合运算等调度计算能力。 任务调度平台可视化包括任务的运行报表、调度报表、成功比例、任务管理、配置管理、GLUE IDE等可视化管理平台。

执行器管理KisJobsGolang SDK及计算自定义业务逻辑、执行器的自动注册、任务触发、终止及摘除等。

执行器场景KisScenes 根据业务划分的逻辑任务集合。

调度日志及用户管理:任务调度日志收集、调度详细、调度流程痕迹等。

KisFlow架构图drawio

KisFlow架构设计-KisFlow整体结构 drawio

KisFlow是一种流式概念形态具体表现的特征如下

1、一个KisFlow可以由任意KisFunction组成且KisFlow可以动态的调整长度。

2、一个KisFunction可以随时动态的加入到某个KisFlow中且KisFlow和KisFlow之间的关系可以通过KisFunction的Load和Save节点的加入进行动态的并流和分流动作。

3、KisFlow在编程行为上从面向流进行数据业务编程变成了面向KisFunction的函数单计算逻辑的开发接近FaaS(Function as a service)体系。

Example

下面是简单的应用场景案例,具体应用单元用例请 参考

https://github.com/aceld/kis-flow-usage

《KisFlow开发者文档》

https://www.yuque.com/aceld/kis-flow-doc

安装KisFlow

$go get github.com/aceld/kis-flow
1. Quick Start快速开始

案例源代码

https://github.com/aceld/kis-flow-usage/tree/main/1-quick_start

项目目录

├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go

Flow

image

Main

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/common"
	"github.com/aceld/kis-flow/config"
	"github.com/aceld/kis-flow/flow"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Create a new flow configuration
	myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)

	// Create new function configuration
	avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
	printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)

	// Create a new flow
	flow1 := flow.NewKisFlow(myFlowConfig1)

	// Link functions to the flow
	_ = flow1.Link(avgStuScoreConfig, nil)
	_ = flow1.Link(printStuScoreConfig, nil)

	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

func init() {
	// Register functions
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

Function1

faas_stu_score_avg.go

package main

import (
	"context"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {

		out := AvgStuScoreOut{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}

		// 提交结果数据
		_ = flow.CommitRow(out)
	}

	return nil
}

Function2

faas_stu_score_avg_print.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
	serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return nil
}

OutPut

Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source.
funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source.
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]
2. Quick Start With Config快速开始

案例源代码

https://github.com/aceld/kis-flow-usage/tree/main/2-quick_start_with_config

项目目录

├── Makefile
├── conf
│   ├── flow-CalStuAvgScore.yml
│   ├── func-AvgStuScore.yml
│   └── func-PrintStuAvgScore.yml
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go

Flow

image

Config

(1) Flow Config

conf/flow-CalStuAvgScore.yml

kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
  - fname: AvgStuScore
  - fname: PrintStuAvgScore

(2) Function1 Config

conf/func-AvgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
  name: 学生学分
  must:
    - stu_id

conf/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
  name: 学生学分
  must:
    - stu_id

Main

main.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	// Get the flow
	flow1 := kis.Pool().GetFlow("CalStuAvgScore")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
	// Submit a string
	_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

	// Run the flow
	if err := flow1.Run(ctx); err != nil {
		fmt.Println("err: ", err)
	}

	return
}

func init() {
	// Register functions
	kis.Pool().FaaS("AvgStuScore", AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

Function1

faas_stu_score_avg.go

package main

import (
	"context"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {

		out := AvgStuScoreOut{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}

		// 提交结果数据
		_ = flow.CommitRow(out)
	}

	return nil
}

Function2

faas_stu_score_avg_print.go

package main

import (
	"context"
	"fmt"
	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
	serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return nil
}

OutPut

Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]

开发者

Thanks to all the developers who contributed to KisFlow!

加入KisFlow 社区

platform Entry
https://discord.gg/xQ8Xxfyfcz
加微信: ace_ld 或扫二维码,备注flow即可。
WeChat Public Account
QQ Group