From 93ca0194c2bbd9e7f2a3b72acf4e144944d92d51 Mon Sep 17 00:00:00 2001 From: aceld Date: Sat, 30 Dec 2023 17:38:08 +0800 Subject: [PATCH] init kis-flow, kis_config init --- LICENSE | 2 +- common/const.go | 20 +++++++++ example/kisflow_test.go | 29 +++++++++++++ flow/kis_config/kis_func_config.go | 68 ++++++++++++++++++++++++++++++ go.mod | 3 ++ 5 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 common/const.go create mode 100644 example/kisflow_test.go create mode 100644 flow/kis_config/kis_func_config.go create mode 100644 go.mod diff --git a/LICENSE b/LICENSE index b4e14f9..fca1955 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2023 刘丹冰 +Copyright (c) 2023 刘丹冰(Aceld) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/common/const.go b/common/const.go new file mode 100644 index 0000000..737f343 --- /dev/null +++ b/common/const.go @@ -0,0 +1,20 @@ +package common + +type KisMode string + +const ( + // V 为校验特征的KisFunction, 主要进行数据的过滤,验证,字段梳理,幂等等前置数据处理 + V KisMode = "Verify" + + // S 为存储特征的KisFunction, S会通过NsConnector进行将数据进行存储,数据的临时声明周期为NsWindow + S KisMode = "Save" + + // L 为加载特征的KisFunction,L会通过KisConnector进行数据加载,通过该Function可以从逻辑上与对应的S Function进行并流 + L KisMode = "Load" + + // C 为计算特征的KisFunction, C会通过KisFlow中的数据计算,生成新的字段,将数据流传递给下游S进行存储,或者自己也已直接通过KisConnector进行存储 + C KisMode = "Calculate" + + // E 为扩展特征的KisFunction,作为流式计算的自定义特征Function,如,Notify 调度器触发任务的消息发送,删除一些数据,重置状态等。 + E KisMode = "Expand" +) diff --git a/example/kisflow_test.go b/example/kisflow_test.go new file mode 100644 index 0000000..6d1ad25 --- /dev/null +++ b/example/kisflow_test.go @@ -0,0 +1,29 @@ +package test + +import ( + "fmt" + "kis-flow/flow/kis_config" + "testing" +) + +func TestNewFuncConfig(t *testing.T) { + source := flow.KisSource{ + Name: "公众号抖音商城户订单数据", + Must: []string{"order_id", "user_id"}, + } + + option := flow.KisFuncOption{ + Cid: "connector_id", + RetryTimes: 3, + RetryDuriton: 300, + + Params: flow.FParam{ + "param1": "value1", + "param2": "value2", + }, + } + + myFunc1 := flow.NewFuncConfig("funcId", "funcName", "Save", &source, &option) + + fmt.Printf("myFunc1: %+v\n", myFunc1) +} diff --git a/flow/kis_config/kis_func_config.go b/flow/kis_config/kis_func_config.go new file mode 100644 index 0000000..3936722 --- /dev/null +++ b/flow/kis_config/kis_func_config.go @@ -0,0 +1,68 @@ +package flow + +import ( + "fmt" + "kis-flow/common" +) + +// FParam 在当前Flow中Function定制固定配置参数类型 +type FParam map[string]string + +// KisSource 表示当前Function的业务源 +type KisSource struct { + Name string `yaml:"name"` //本层Function的数据源描述 + Must []string `yaml:"must"` //source必传字段 +} + +// KisFuncOption 可选配置 +type KisFuncOption struct { + Cid string `yaml:"cid"` + RetryTimes int `yaml:"retry_times"` //选填,Function调度重试(不包括正常调度)最大次数 + RetryDuriton int `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms) + Params FParam `yaml:"default_params"` //选填,在当前Flow中Function定制固定配置参数 +} + +// KisFuncConfig 一个NsFunction策略配置 +type KisFuncConfig struct { + KisType string `yaml:"kistype"` + Fid string `yaml:"fid"` + Fname string `yaml:"fname"` + Fmode string `yaml:"fmode"` + Source KisSource `yaml:"source"` + Option KisFuncOption `yaml:"option"` +} + +// NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息 +func NewFuncConfig( + funcId string, funcName string, mode common.KisMode, + source *KisSource, option *KisFuncOption) *KisFuncConfig { + + config := new(KisFuncConfig) + config.Fid = funcId + config.Fname = funcName + + if source == nil { + fmt.Printf("funcName NewConfig Error, source is nil, funcName = %s\n", funcId) + return nil + } + config.Source = *source + + config.Fmode = string(mode) + + //FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系 + if mode == common.S || mode == common.L { + if option == nil { + fmt.Printf("Funcion S/L need option->Cid\n") + return nil + } else if option.Cid == "" { + fmt.Printf("Funcion S/L need option->Cid\n") + return nil + } + } + + if option != nil { + config.Option = *option + } + + return config +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..51e77c6 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module kis-flow + +go 1.18