init kis-flow, kis_config init

This commit is contained in:
aceld 2023-12-30 17:38:08 +08:00
parent 3eb0c0252e
commit 93ca0194c2
5 changed files with 121 additions and 1 deletions

View File

@ -1,6 +1,6 @@
MIT License
Copyright (c) 2023 刘丹冰
Copyright (c) 2023 刘丹冰(Aceld)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

20
common/const.go Normal file
View File

@ -0,0 +1,20 @@
package common
type KisMode string
const (
// V 为校验特征的KisFunction, 主要进行数据的过滤,验证,字段梳理,幂等等前置数据处理
V KisMode = "Verify"
// S 为存储特征的KisFunction, S会通过NsConnector进行将数据进行存储数据的临时声明周期为NsWindow
S KisMode = "Save"
// L 为加载特征的KisFunctionL会通过KisConnector进行数据加载通过该Function可以从逻辑上与对应的S Function进行并流
L KisMode = "Load"
// C 为计算特征的KisFunction, C会通过KisFlow中的数据计算生成新的字段将数据流传递给下游S进行存储或者自己也已直接通过KisConnector进行存储
C KisMode = "Calculate"
// E 为扩展特征的KisFunction作为流式计算的自定义特征FunctionNotify 调度器触发任务的消息发送,删除一些数据,重置状态等。
E KisMode = "Expand"
)

29
example/kisflow_test.go Normal file
View File

@ -0,0 +1,29 @@
package test
import (
"fmt"
"kis-flow/flow/kis_config"
"testing"
)
func TestNewFuncConfig(t *testing.T) {
source := flow.KisSource{
Name: "公众号抖音商城户订单数据",
Must: []string{"order_id", "user_id"},
}
option := flow.KisFuncOption{
Cid: "connector_id",
RetryTimes: 3,
RetryDuriton: 300,
Params: flow.FParam{
"param1": "value1",
"param2": "value2",
},
}
myFunc1 := flow.NewFuncConfig("funcId", "funcName", "Save", &source, &option)
fmt.Printf("myFunc1: %+v\n", myFunc1)
}

View File

@ -0,0 +1,68 @@
package flow
import (
"fmt"
"kis-flow/common"
)
// FParam 在当前Flow中Function定制固定配置参数类型
type FParam map[string]string
// KisSource 表示当前Function的业务源
type KisSource struct {
Name string `yaml:"name"` //本层Function的数据源描述
Must []string `yaml:"must"` //source必传字段
}
// KisFuncOption 可选配置
type KisFuncOption struct {
Cid string `yaml:"cid"`
RetryTimes int `yaml:"retry_times"` //选填,Function调度重试(不包括正常调度)最大次数
RetryDuriton int `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms)
Params FParam `yaml:"default_params"` //选填,在当前Flow中Function定制固定配置参数
}
// KisFuncConfig 一个NsFunction策略配置
type KisFuncConfig struct {
KisType string `yaml:"kistype"`
Fid string `yaml:"fid"`
Fname string `yaml:"fname"`
Fmode string `yaml:"fmode"`
Source KisSource `yaml:"source"`
Option KisFuncOption `yaml:"option"`
}
// NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息
func NewFuncConfig(
funcId string, funcName string, mode common.KisMode,
source *KisSource, option *KisFuncOption) *KisFuncConfig {
config := new(KisFuncConfig)
config.Fid = funcId
config.Fname = funcName
if source == nil {
fmt.Printf("funcName NewConfig Error, source is nil, funcName = %s\n", funcId)
return nil
}
config.Source = *source
config.Fmode = string(mode)
//FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
if mode == common.S || mode == common.L {
if option == nil {
fmt.Printf("Funcion S/L need option->Cid\n")
return nil
} else if option.Cid == "" {
fmt.Printf("Funcion S/L need option->Cid\n")
return nil
}
}
if option != nil {
config.Option = *option
}
return config
}

3
go.mod Normal file
View File

@ -0,0 +1,3 @@
module kis-flow
go 1.18