add golang golint, workerflow

This commit is contained in:
aceld 2024-04-15 17:50:02 +08:00
parent 142622dfc5
commit 955bed4804
70 changed files with 806 additions and 757 deletions

20
.github/workflows/reviewdog.yml vendored Normal file
View File

@ -0,0 +1,20 @@
name: reviewdog
on: [pull_request]
jobs:
golangci-lint:
runs-on: ubuntu-latest
name: runner / golangci-lint
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v3
- name: golangci-lint
uses: reviewdog/action-golangci-lint@v2
with:
github_token: ${{ secrets.github_token }}
# Change reviewdog reporter if you need [github-pr-check,github-check,github-pr-review].
reporter: github-pr-review
# Report all results.
filter_mode: nofilter
# Exit with 1 when it finds at least one finding.
fail_on_error: true
#golangci_lint_flags

28
.golangci.yaml Normal file
View File

@ -0,0 +1,28 @@
run:
timeout: 30m
skip-dirs:
- test
linters:
disable-all: true
enable:
#- unused
- ineffassign
- goimports
- gofmt
- misspell
- unparam
- unconvert
- govet
# - errcheck
- staticcheck
linters-settings:
staticcheck:
go: "1.17"
checks:
- "all"
- "-SA1019"
unused:
go: "1.17"

View File

@ -2,93 +2,100 @@ package common
import "time"
// 用户生成KisId的字符串前缀
// Prefix string for generating KisId by users
const (
KisIdTypeFlow = "flow"
KisIdTypeConnector = "conn"
KisIdTypeFunction = "func"
KisIdTypeGlobal = "global"
KisIdJoinChar = "-"
KisIDTypeFlow = "flow" // KisId type for Flow
KisIDTypeConnector = "conn" // KisId type for Connector
KisIDTypeFunction = "func" // KisId type for Function
KisIDTypeGlobal = "global" // KisId type for Global
KisIDJoinChar = "-" // Joining character for KisId
)
const (
// FunctionIdFirstVirtual 为首结点Function上一层虚拟的Function ID
FunctionIdFirstVirtual = "FunctionIdFirstVirtual"
// FunctionIdLastVirtual 为尾结点Function下一层虚拟的Function ID
FunctionIdLastVirtual = "FunctionIdLastVirtual"
// FunctionIDFirstVirtual is the virtual Function ID for the first node Function
FunctionIDFirstVirtual = "FunctionIDFirstVirtual"
// FunctionIDLastVirtual is the virtual Function ID for the last node Function
FunctionIDLastVirtual = "FunctionIDLastVirtual"
)
// KisMode represents the mode of KisFunction
type KisMode string
const (
// V 为校验特征的KisFunction, 主要进行数据的过滤,验证,字段梳理,幂等等前置数据处理
// V is for Verify, which mainly performs data filtering, validation, field sorting, idempotence, etc.
V KisMode = "Verify"
// S 为存储特征的KisFunction, S会通过KisConnector进行将数据进行存储. S Function 会通过KisConnector进行数据存储,具备相同Connector的Function在逻辑上可以进行并流
// S is for Save, S Function will store data through KisConnector. Functions with the same Connector can logically merge.
S KisMode = "Save"
// L 为加载特征的KisFunctionL会通过KisConnector进行数据加载L Function 会通过KisConnector进行数据读取具备相同Connector的Function可以从逻辑上与对应的S Function进行并流
// L is for Load, L Function will load data through KisConnector. Functions with the same Connector can logically merge with corresponding S Function.
L KisMode = "Load"
// C 为计算特征的KisFunction, 可以生成新的字段,计算新的值,进行数据的聚合,分析等
// C is for Calculate, which can generate new fields, calculate new values, and perform data aggregation, analysis, etc.
C KisMode = "Calculate"
// E 为扩展特征的KisFunction作为流式计算的自定义特征Function也同时是KisFlow当前流中的最后一个Function概念类似Sink。
// E is for Expand, which serves as a custom feature Function for stream computing and is also the last Function in the current KisFlow, similar to Sink.
E KisMode = "Expand"
)
/*
是否启动Flow
*/
// KisOnOff Whether to enable the Flow
type KisOnOff int
const (
FlowEnable KisOnOff = 1 // 启动
FlowDisable KisOnOff = 0 // 不启动
// FlowEnable Enabled
FlowEnable KisOnOff = 1
// FlowDisable Disabled
FlowDisable KisOnOff = 0
)
// KisConnType represents the type of KisConnector
type KisConnType string
const (
// REDIS is the type of Redis
REDIS KisConnType = "redis"
// MYSQL is the type of MySQL
MYSQL KisConnType = "mysql"
// KAFKA is the type of Kafka
KAFKA KisConnType = "kafka"
// TIDB is the type of TiDB
TIDB KisConnType = "tidb"
// ES is the type of Elasticsearch
ES KisConnType = "es"
)
// cache
const (
// DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间
DeFaultFlowCacheCleanUp = 5 // 单位 min
// DefaultExpiration 默认GoCahce时间 ,永久保存
// DeFaultFlowCacheCleanUp is the default cleanup time for Cache in KisFlow's Flow object Cache
DeFaultFlowCacheCleanUp = 5 // unit: min
// DefaultExpiration is the default time for GoCahce, permanent storage
DefaultExpiration time.Duration = 0
)
// metrics
const (
METRICS_ROUTE string = "/metrics"
MetricsRoute string = "/metrics"
LABEL_FLOW_NAME string = "flow_name"
LABEL_FLOW_ID string = "flow_id"
LABEL_FUNCTION_NAME string = "func_name"
LABEL_FUNCTION_MODE string = "func_mode"
LabelFlowName string = "flow_name"
LabelFlowID string = "flow_id"
LabelFunctionName string = "func_name"
LabelFunctionMode string = "func_mode"
COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
CounterKisflowDataTotalName string = "kisflow_data_total"
CounterKisflowDataTotalHelp string = "Total data volume of all KisFlow Flows"
GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
GamgeFlowDataTotalName string = "flow_data_total"
GamgeFlowDataTotalHelp string = "Total data volume of each KisFlow FlowID data stream"
GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数"
GangeFlowScheCntsName string = "flow_schedule_cnts"
GangeFlowScheCntsHelp string = "Number of times each KisFlow FlowID is scheduled"
GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
GANGE_FUNC_SCHE_CNTS_HELP string = "KisFlow各个Function被调度的次数"
GangeFuncScheCntsName string = "func_schedule_cnts"
GangeFuncScheCntsHelp string = "Number of times each KisFlow Function is scheduled"
HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
HISTOGRAM_FUNCTION_DURATION_HELP string = "Function执行耗时"
HistogramFunctionDurationName string = "func_run_duration"
HistogramFunctionDurationHelp string = "Function execution time"
HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration"
HISTOGRAM_FLOW_DURATION_HELP string = "Flow执行耗时"
HistogramFlowDurationName string = "flow_run_duration"
HistogramFlowDurationHelp string = "Flow execution time"
)

View File

@ -1,14 +1,12 @@
package common
// KisRow 一行数据
// KisRow represents a single row of data
type KisRow interface{}
// KisRowArr 一次业务的批量数据
// KisRowArr represents a batch of data for a single business operation
type KisRowArr []KisRow
/*
KisDataMap 当前Flow承载的全部数据
key : 数据所在的Function ID
value: 对应的KisRow
*/
// KisDataMap contains all the data carried by the current Flow
// key : Function ID where the data resides
// value: Corresponding KisRow
type KisDataMap map[string]KisRowArr

View File

@ -1,36 +1,29 @@
package config
import (
"errors"
"fmt"
"github.com/aceld/kis-flow/common"
)
// KisConnConfig KisConnector 策略配置
// KisConnConfig describes the KisConnector strategy configuration
type KisConnConfig struct {
//配置类型
KisType string `yaml:"kistype"`
//唯一描述标识
CName string `yaml:"cname"`
//基础存储媒介地址
AddrString string `yaml:"addrs"`
//存储媒介引擎类型"Mysql" "Redis" "Kafka"等
Type common.KisConnType `yaml:"type"`
//一次存储的标识如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等
Key string `yaml:"key"`
//配置信息中的自定义参数
Params map[string]string `yaml:"params"`
//存储读取所绑定的NsFuncionID
KisType string `yaml:"kistype"` // Configuration type
CName string `yaml:"cname"` // Unique descriptive identifier
AddrString string `yaml:"addrs"` // Base storage medium address
Type common.KisConnType `yaml:"type"` // Storage medium engine type: "Mysql", "Redis", "Kafka", etc.
Key string `yaml:"key"` // Identifier for a single storage: Key name for Redis, Table name for Mysql, Topic name for Kafka, etc.
Params map[string]string `yaml:"params"` // Custom parameters in the configuration information
// NsFuncionID bound to storage reading
Load []string `yaml:"load"`
Save []string `yaml:"save"`
}
// NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig {
// NewConnConfig creates a KisConnector strategy configuration object, used to describe a KisConnector information
func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param map[string]string) *KisConnConfig {
strategy := new(KisConnConfig)
strategy.CName = cName
strategy.AddrString = addr
strategy.Type = t
strategy.Key = key
strategy.Params = param
@ -38,7 +31,7 @@ func NewConnConfig(cName string, addr string, t common.KisConnType, key string,
return strategy
}
// WithFunc Connector与Function进行关系绑定
// WithFunc binds Connector to Function
func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
switch common.KisMode(fConfig.FMode) {
@ -47,7 +40,7 @@ func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error {
case common.L:
cConfig.Load = append(cConfig.Load, fConfig.FName)
default:
return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode))
return fmt.Errorf("Wrong KisMode %s", fConfig.FMode)
}
return nil

View File

@ -2,13 +2,13 @@ package config
import "github.com/aceld/kis-flow/common"
// KisFlowFunctionParam 一个Flow配置中Function的Id及携带固定配置参数
// KisFlowFunctionParam represents the Id of a Function and carries fixed configuration parameters in a Flow configuration
type KisFlowFunctionParam struct {
FuncName string `yaml:"fname"` //必须
Params FParam `yaml:"params"` //选填,在当前Flow中Function定制固定配置参数
FuncName string `yaml:"fname"` // Required
Params FParam `yaml:"params"` // Optional, custom fixed configuration parameters for the Function in the current Flow
}
// KisFlowConfig 用户贯穿整条流式计算上下文环境的对象
// KisFlowConfig represents the object that spans the entire stream computing context environment
type KisFlowConfig struct {
KisType string `yaml:"kistype"`
Status int `yaml:"status"`
@ -16,7 +16,7 @@ type KisFlowConfig struct {
Flows []KisFlowFunctionParam `yaml:"flows"`
}
// NewFlowConfig 创建一个Flow策略配置对象, 用于描述一个KisFlow信息
// NewFlowConfig creates a Flow strategy configuration object, used to describe a KisFlow information
func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
config := new(KisFlowConfig)
config.FlowName = flowName
@ -27,7 +27,7 @@ func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig {
return config
}
// AppendFunctionConfig 添加一个Function Config 到当前Flow中
// AppendFunctionConfig adds a Function Config to the current Flow
func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) {
fConfig.Flows = append(fConfig.Flows, params)
}

View File

@ -6,24 +6,24 @@ import (
"github.com/aceld/kis-flow/log"
)
// FParam 在当前Flow中Function定制固定配置参数类型
// FParam represents the type for custom fixed configuration parameters for the Function in the current Flow
type FParam map[string]string
// KisSource 表示当前Function的业务源
// KisSource represents the business source of the current Function
type KisSource struct {
Name string `yaml:"name"` // 本层Function的数据源描述
Must []string `yaml:"must"` // source必传字段
Name string `yaml:"name"` // Description of the data source for this layer Function
Must []string `yaml:"must"` // Required fields for the source
}
// KisFuncOption 可选配置
// KisFuncOption represents optional configurations
type KisFuncOption struct {
CName string `yaml:"cname"` // 连接器Connector名称
RetryTimes int `yaml:"retry_times"` // 选填,Function调度重试(不包括正常调度)最大次数
RetryDuration int `yaml:"return_duration"` // 选填,Function调度每次重试最大时间间隔(单位:ms)
Params FParam `yaml:"default_params"` // 选填,在当前Flow中Function定制固定配置参数
CName string `yaml:"cname"` // Connector name
RetryTimes int `yaml:"retry_times"` // Optional, maximum retry times for Function scheduling (excluding normal scheduling)
RetryDuration int `yaml:"return_duration"` // Optional, maximum time interval for each retry in Function scheduling (unit: ms)
Params FParam `yaml:"default_params"` // Optional, custom fixed configuration parameters for the Function in the current Flow
}
// KisFuncConfig 一个KisFunction策略配置
// KisFuncConfig represents a KisFunction strategy configuration
type KisFuncConfig struct {
KisType string `yaml:"kistype"`
FName string `yaml:"fname"`
@ -33,7 +33,7 @@ type KisFuncConfig struct {
connConf *KisConnConfig
}
// NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息
// NewFuncConfig creates a Function strategy configuration object, used to describe a KisFunction information
func NewFuncConfig(
funcName string, mode common.KisMode,
source *KisSource, option *KisFuncOption) *KisFuncConfig {
@ -53,13 +53,13 @@ func NewFuncConfig(
config.FMode = string(mode)
/*
// FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系
// Functions S and L require the KisConnector parameters to be passed as they need to establish streaming relationships through Connector
if mode == common.S || mode == common.L {
if option == nil {
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
log.Logger().ErrorF("Function S/L needs option->Cid\n")
return nil
} else if option.CName == "" {
log.Logger().ErrorF("Funcion S/L need option->Cid\n")
log.Logger().ErrorF("Function S/L needs option->Cid\n")
return nil
}
}
@ -72,23 +72,25 @@ func NewFuncConfig(
return config
}
// AddConnConfig WithConn binds Function to Connector
func (fConf *KisFuncConfig) AddConnConfig(cConf *KisConnConfig) error {
if cConf == nil {
return errors.New("KisConnConfig is nil")
}
// Function需要和Connector进行关联
// Function needs to be associated with Connector
fConf.connConf = cConf
// Connector需要和Function进行关联
// Connector needs to be associated with Function
_ = cConf.WithFunc(fConf)
// 更新Function配置中的CName
// Update CName in Function configuration
fConf.Option.CName = cConf.CName
return nil
}
// GetConnConfig gets the Connector configuration
func (fConf *KisFuncConfig) GetConnConfig() (*KisConnConfig, error) {
if fConf.connConf == nil {
return nil, errors.New("KisFuncConfig.connConf not set")

View File

@ -1,15 +1,16 @@
package config
// KisGlobalConfig represents the global configuration for KisFlow
type KisGlobalConfig struct {
//kistype Global为kisflow的全局配置
// KisType Global is the global configuration for kisflow
KisType string `yaml:"kistype"`
//是否启动prometheus监控
// EnableProm indicates whether to start Prometheus monitoring
EnableProm bool `yaml:"prometheus_enable"`
//是否需要kisflow单独启动端口监听
// PrometheusListen indicates whether kisflow needs to start a separate port for listening
PrometheusListen bool `yaml:"prometheus_listen"`
//prometheus取点监听地址
// PrometheusServe is the address for Prometheus scraping
PrometheusServe string `yaml:"prometheus_serve"`
}
// GlobalConfig 默认全局配置,全部均为关闭
// GlobalConfig is the default global configuration, all are set to off
var GlobalConfig = new(KisGlobalConfig)

View File

@ -9,6 +9,7 @@ import (
"sync"
)
// KisConnector represents a KisConnector instance
type KisConnector struct {
// Connector ID
CId string
@ -20,16 +21,16 @@ type KisConnector struct {
// Connector Init
onceInit sync.Once
// KisConnector的自定义临时数据
// KisConnector's custom temporary data
metaData map[string]interface{}
// 管理metaData的读写锁
// Lock for reading and writing metaData
mLock sync.RWMutex
}
// NewKisConnector 根据配置策略创建一个KisConnector
// NewKisConnector creates a KisConnector based on the given configuration
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
conn := new(KisConnector)
conn.CId = id.KisID(common.KisIdTypeConnector)
conn.CId = id.KisID(common.KisIDTypeConnector)
conn.CName = config.CName
conn.Conf = config
conn.metaData = make(map[string]interface{})
@ -37,11 +38,11 @@ func NewKisConnector(config *config.KisConnConfig) *KisConnector {
return conn
}
// Init 初始化Connector所关联的存储引擎链接等
// Init initializes the connection to the associated storage engine of the Connector
func (conn *KisConnector) Init() error {
var err error
// 一个Connector只能执行初始化业务一次
// The initialization business of a Connector can only be executed once
conn.onceInit.Do(func() {
err = kis.Pool().CallConnInit(conn)
})
@ -49,7 +50,7 @@ func (conn *KisConnector) Init() error {
return err
}
// Call 调用Connector 外挂存储逻辑的读写操作
// Call invokes the read-write operations of the external storage logic through the Connector
func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interface{}) (interface{}, error) {
var result interface{}
var err error
@ -62,19 +63,22 @@ func (conn *KisConnector) Call(ctx context.Context, flow kis.Flow, args interfac
return result, nil
}
// GetName returns the name of the Connector
func (conn *KisConnector) GetName() string {
return conn.CName
}
// GetConfig returns the configuration of the Connector
func (conn *KisConnector) GetConfig() *config.KisConnConfig {
return conn.Conf
}
func (conn *KisConnector) GetId() string {
// GetID returns the ID of the Connector
func (conn *KisConnector) GetID() string {
return conn.CId
}
// GetMetaData 得到当前Connector的临时数据
// GetMetaData gets the temporary data of the current Connector
func (conn *KisConnector) GetMetaData(key string) interface{} {
conn.mLock.RLock()
defer conn.mLock.RUnlock()
@ -87,7 +91,7 @@ func (conn *KisConnector) GetMetaData(key string) interface{} {
return data
}
// SetMetaData 设置当前Connector的临时数据
// SetMetaData sets the temporary data of the current Connector
func (conn *KisConnector) SetMetaData(key string, value interface{}) {
conn.mLock.Lock()
defer conn.mLock.Unlock()

View File

@ -1,7 +1,6 @@
package file
import (
"errors"
"fmt"
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/kis"
@ -10,14 +9,19 @@ import (
"gopkg.in/yaml.v3"
)
// ConfigExportYaml 将flow配置输出且存储本地
func ConfigExportYaml(flow kis.Flow, savaPath string) error {
// ConfigExportYaml exports the flow configuration and saves it locally
func ConfigExportYaml(flow kis.Flow, savePath string) error {
if data, err := yaml.Marshal(flow.GetConfig()); err != nil {
var data []byte
var err error
data, err = yaml.Marshal(flow.GetConfig())
if err != nil {
return err
} else {
}
// flow
err := os.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
err = os.WriteFile(savePath+common.KisIDTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
if err != nil {
return err
}
@ -26,15 +30,16 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error {
for _, fp := range flow.GetConfig().Flows {
fConf := flow.GetFuncConfigByName(fp.FuncName)
if fConf == nil {
return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName))
return fmt.Errorf("function name = %s config is nil ", fp.FuncName)
}
if fdata, err := yaml.Marshal(fConf); err != nil {
return err
} else {
if err := os.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
fData, err := yaml.Marshal(fConf)
if err != nil {
return err
}
if err := os.WriteFile(savePath+common.KisIDTypeFunction+"-"+fp.FuncName+".yaml", fData, 0644); err != nil {
return err
}
// Connector
@ -43,13 +48,14 @@ func ConfigExportYaml(flow kis.Flow, savaPath string) error {
if err != nil {
return err
}
if cdata, err := yaml.Marshal(cConf); err != nil {
return err
} else {
if err := os.WriteFile(savaPath+common.KisIdTypeConnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
cdata, err := yaml.Marshal(cConf)
if err != nil {
return err
}
}
if err := os.WriteFile(savePath+common.KisIDTypeConnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
return err
}
}
}

View File

@ -1,18 +1,16 @@
package file
import (
"errors"
"fmt"
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/config"
"github.com/aceld/kis-flow/flow"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/metrics"
"gopkg.in/yaml.v3"
"os"
"path"
"path/filepath"
"gopkg.in/yaml.v3"
)
type allConfig struct {
@ -21,75 +19,75 @@ type allConfig struct {
Conns map[string]*config.KisConnConfig
}
// kisTypeFlowConfigure 解析Flow配置文件yaml格式
// kisTypeFlowConfigure parses Flow configuration file in yaml format
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
flowCfg := new(config.KisFlowConfig)
if ok := yaml.Unmarshal(confData, flowCfg); ok != nil {
return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
return fmt.Errorf("%s has wrong format kisType = %s", fileName, kisType)
}
// 如果FLow状态为关闭则不做配置加载
// Skip the configuration loading if the Flow status is disabled
if common.KisOnOff(flowCfg.Status) == common.FlowDisable {
return nil
}
if _, ok := all.Flows[flowCfg.FlowName]; ok {
return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flowCfg.FlowName))
return fmt.Errorf("%s set repeat flow_id:%s", fileName, flowCfg.FlowName)
}
// 加入配置集合中
// Add to the configuration set
all.Flows[flowCfg.FlowName] = flowCfg
return nil
}
// kisTypeFuncConfigure 解析Function配置文件yaml格式
// kisTypeFuncConfigure parses Function configuration file in yaml format
func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
function := new(config.KisFuncConfig)
if ok := yaml.Unmarshal(confData, function); ok != nil {
return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
return fmt.Errorf("%s has wrong format kisType = %s", fileName, kisType)
}
if _, ok := all.Funcs[function.FName]; ok {
return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName))
return fmt.Errorf("%s set repeat function_id:%s", fileName, function.FName)
}
// 加入配置集合中
// Add to the configuration set
all.Funcs[function.FName] = function
return nil
}
// kisTypeConnConfigure 解析Connector配置文件yaml格式
// kisTypeConnConfigure parses Connector configuration file in yaml format
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
conn := new(config.KisConnConfig)
if ok := yaml.Unmarshal(confData, conn); ok != nil {
return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType))
return fmt.Errorf("%s has wrong format kisType = %s", fileName, kisType)
}
if _, ok := all.Conns[conn.CName]; ok {
return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName))
return fmt.Errorf("%s set repeat conn_id:%s", fileName, conn.CName)
}
// 加入配置集合中
// Add to the configuration set
all.Conns[conn.CName] = conn
return nil
}
// kisTypeGlobalConfigure 解析Global配置文件yaml格式
// kisTypeGlobalConfigure parses Global configuration file in yaml format
func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
// 全局配置
// Global configuration
if ok := yaml.Unmarshal(confData, config.GlobalConfig); ok != nil {
return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType))
return fmt.Errorf("%s is wrong format kisType = %s", fileName, kisType)
}
// 启动Metrics服务
// Start Metrics service
metrics.RunMetrics()
return nil
}
// parseConfigWalkYaml 全盘解析配置文件yaml格式, 讲配置信息解析到allConfig中
// parseConfigWalkYaml recursively parses all configuration files in yaml format and stores the configuration information in allConfig
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
all := new(allConfig)
@ -99,12 +97,12 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
all.Conns = make(map[string]*config.KisConnConfig)
err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
// 校验文件后缀是否合法
// Validate the file extension
if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {
return nil
}
// 读取文件内容
// Read file content
confData, err := os.ReadFile(filePath)
if err != nil {
return err
@ -112,31 +110,34 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
confMap := make(map[string]interface{})
// 校验yaml合法性
// Validate yaml format
if err := yaml.Unmarshal(confData, confMap); err != nil {
return err
}
// 判断kisType是否存在
if kisType, ok := confMap["kistype"]; !ok {
return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))
} else {
// Check if kisType exists
var kisType interface{}
kisType, ok := confMap["kistype"]
if !ok {
return fmt.Errorf("%s has no field [kistype]", filePath)
}
switch kisType {
case common.KisIdTypeFlow:
case common.KisIDTypeFlow:
return kisTypeFlowConfigure(all, confData, filePath, kisType)
case common.KisIdTypeFunction:
case common.KisIDTypeFunction:
return kisTypeFuncConfigure(all, confData, filePath, kisType)
case common.KisIdTypeConnector:
case common.KisIDTypeConnector:
return kisTypeConnConfigure(all, confData, filePath, kisType)
case common.KisIdTypeGlobal:
case common.KisIDTypeGlobal:
return kisTypeGlobalConfigure(confData, filePath, kisType)
default:
return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
}
return fmt.Errorf("%s set wrong kistype %s", filePath, kisType)
}
})
@ -148,17 +149,17 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
}
func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {
// 加载当前Flow依赖的Function
// Load the Functions that the current Flow depends on
if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {
return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))
return fmt.Errorf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName)
} else {
// flow add connector
if funcConfig.Option.CName != "" {
// 加载当前Function依赖的Connector
// Load the Connectors that the current Function depends on
if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {
return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName))
return fmt.Errorf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName)
} else {
// Function Config 关联 Connector Config
// Function Config associates with Connector Config
_ = funcConfig.AddConnConfig(connConf)
}
}
@ -172,7 +173,7 @@ func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow,
return nil
}
// ConfigImportYaml 全盘解析配置文件yaml格式
// ConfigImportYaml recursively parses all configuration files in yaml format
func ConfigImportYaml(loadPath string) error {
all, err := parseConfigWalkYaml(loadPath)
@ -182,7 +183,7 @@ func ConfigImportYaml(loadPath string) error {
for flowName, flowConfig := range all.Flows {
// 构建一个Flow
// Build a new Flow
newFlow := flow.NewKisFlow(flowConfig)
for _, fp := range flowConfig.Flows {
@ -191,7 +192,7 @@ func ConfigImportYaml(loadPath string) error {
}
}
// 将flow添加到FlowPool中
// Add the flow to FlowPool
kis.Pool().AddFlow(flowName, newFlow)
}

View File

@ -18,81 +18,81 @@ import (
"github.com/patrickmn/go-cache"
)
// KisFlow 用于贯穿整条流式计算的上下文环境
// KisFlow is used to manage the context environment of the entire streaming computation.
type KisFlow struct {
// 基础信息
Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
Name string // Flow的可读名称
Conf *config.KisFlowConfig // Flow配置策略
// Basic information
Id string // Distributed instance ID of the Flow (used internally by KisFlow to distinguish different instances)
Name string // Readable name of the Flow
Conf *config.KisFlowConfig // Flow configuration policy
// Function列表
Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
FlowHead kis.Function // 当前Flow所拥有的Function列表表头
FlowTail kis.Function // 当前Flow所拥有的Function列表表尾
flock sync.RWMutex // 管理链表插入读写的锁
ThisFunction kis.Function // Flow当前正在执行的KisFunction对象
ThisFunctionId string // 当前执行到的Function ID
PrevFunctionId string // 当前执行到的Function 上一层FunctionID
// List of Functions
Funcs map[string]kis.Function // All managed Function objects of the current flow, key: FunctionName
FlowHead kis.Function // Head of the Function list owned by the current Flow
FlowTail kis.Function // Tail of the Function list owned by the current Flow
flock sync.RWMutex // Lock for managing linked list insertion and reading
ThisFunction kis.Function // KisFunction object currently being executed in the Flow
ThisFunctionId string // ID of the Function currently being executed
PrevFunctionId string // ID of the previous layer Function
// Function列表参数
funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
fplock sync.RWMutex // 管理funcParams的读写锁
// Function list parameters
funcParams map[string]config.FParam // Custom fixed configuration parameters of the Flow in the current Function, Key: KisID of the function instance, value: FParam
fplock sync.RWMutex // Lock for managing funcParams read and write
// 数据
buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
data common.KisDataMap // 流式计算各个层级的数据源
inPut common.KisRowArr // 当前Function的计算输入数据
abort bool // 是否中断Flow
action kis.Action // 当前Flow所携带的Action动作
// Data
buffer common.KisRowArr // Internal buffer used to temporarily store input byte data, one data is interface{}, multiple data is []interface{} i.e. KisBatch
data common.KisDataMap // Data sources at various levels of the streaming computation
inPut common.KisRowArr // Input data for the current Function computation
abort bool // Whether to abort the Flow
action kis.Action // Action carried by the current Flow
// flow的本地缓存
cache *cache.Cache // Flow流的临时缓存上线文环境
// Local cache of the flow
cache *cache.Cache // Temporary cache context environment of the Flow
// flow的metaData
metaData map[string]interface{} // Flow的自定义临时数据
mLock sync.RWMutex // 管理metaData的读写锁
// metaData of the flow
metaData map[string]interface{} // Custom temporary data of the Flow
mLock sync.RWMutex // Lock for managing metaData read and write
}
// NewKisFlow 创建一个KisFlow.
// NewKisFlow creates a KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// 实例Id
flow.Id = id.KisID(common.KisIdTypeFlow)
// Instance Id
flow.Id = id.KisID(common.KisIDTypeFlow)
// 基础信息
// Basic information
flow.Name = conf.FlowName
flow.Conf = conf
// Function列表
// List of Functions
flow.Funcs = make(map[string]kis.Function)
flow.funcParams = make(map[string]config.FParam)
// 数据data
// Data
flow.data = make(common.KisDataMap)
// 初始化本地缓存
// Initialize local cache
flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute)
// 初始化临时数据
// Initialize temporary data
flow.metaData = make(map[string]interface{})
return flow
}
// Fork 得到Flow的一个副本(深拷贝)
// Fork gets a copy (deep copy) of the Flow.
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
cfg := flow.Conf
// 通过之前的配置生成一个新的Flow
// Generate a new Flow based on the previous configuration
newFlow := NewKisFlow(cfg)
for _, fp := range flow.Conf.Flows {
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
// 当前function没有配置Params
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetID()]; !ok {
// The current function has no Params configured
_ = newFlow.AppendNewFunction(flow.Funcs[fp.FuncName].GetConfig(), nil)
} else {
// 当前function有配置Params
// The current function has configured Params
_ = newFlow.AppendNewFunction(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
}
}
@ -103,15 +103,15 @@ func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
return newFlow
}
// Link 将Function链接到Flow中, 同时会将Function的配置参数添加到Flow的配置中
// fConf: 当前Function策略
// fParams: 当前Flow携带的Function动态参数
// Link links the Function to the Flow, and also adds the Function's configuration parameters to the Flow's configuration.
// fConf: Current Function strategy
// fParams: Dynamic parameters carried by the current Flow's Function
func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) error {
// Flow 添加Function
// Add Function to Flow
_ = flow.AppendNewFunction(fConf, fParams)
// FlowConfig 添加Function
// Add Function to FlowConfig
flowFuncParam := config.KisFlowFunctionParam{
FuncName: fConf.FName,
Params: fParams,
@ -121,33 +121,33 @@ func (flow *KisFlow) Link(fConf *config.KisFuncConfig, fParams config.FParam) er
return nil
}
// AppendNewFunction 将一个新的Function追加到到Flow中
// AppendNewFunction appends a new Function to the Flow.
func (flow *KisFlow) AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error {
// 创建Function实例
// Create Function instance
f := function.NewKisFunction(flow, fConf)
if fConf.Option.CName != "" {
// 当前Function有Connector关联需要初始化Connector实例
// The current Function has a Connector association and needs to initialize the Connector instance
// 获取Connector配置
// Get Connector configuration
connConfig, err := fConf.GetConnConfig()
if err != nil {
panic(err)
}
// 创建Connector对象
// Create Connector object
connector := conn.NewKisConnector(connConfig)
// 初始化Connector, 执行Connector Init 方法
// Initialize Connector, execute the Connector Init method
if err = connector.Init(); err != nil {
panic(err)
}
// 关联Function实例和Connector实例关系
// Associate the Function instance with the Connector instance
_ = f.AddConnector(connector)
}
// Flow 添加 Function
// Add Function to Flow
if err := flow.appendFunc(f, fParams); err != nil {
return err
}
@ -155,7 +155,7 @@ func (flow *KisFlow) AppendNewFunction(fConf *config.KisFuncConfig, fParams conf
return nil
}
// appendFunc 将Function添加到Flow中, 链表操作
// appendFunc adds the Function to the Flow, linked list operation
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
if function == nil {
@ -166,7 +166,7 @@ func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) err
defer flow.flock.Unlock()
if flow.FlowHead == nil {
// 首次添加节点
// First time adding a node
flow.FlowHead = function
flow.FlowTail = function
@ -174,7 +174,7 @@ func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) err
function.SetP(nil)
} else {
// 将function插入到链表的尾部
// Insert the function at the end of the linked list
function.SetP(flow.FlowTail)
function.SetN(nil)
@ -182,28 +182,28 @@ func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) err
flow.FlowTail = function
}
// 将Function Name 详细Hash对应关系添加到flow对象中
// Add the detailed Function Name-Hash correspondence to the flow object
flow.Funcs[function.GetConfig().FName] = function
// 先添加function 默认携带的Params参数
// First add the Params parameters carried by the function by default
params := make(config.FParam)
for key, value := range function.GetConfig().Option.Params {
params[key] = value
}
// 再添加flow携带的function定义参数(重复即覆盖)
// Then add the function definition parameters carried by the flow (overwriting duplicates)
for key, value := range fParam {
params[key] = value
}
// 将得到的FParams存留在flow结构体中用来function业务直接通过Hash获取
// key 为当前Function的KisId不用Fid的原因是为了防止一个Flow添加两个相同策略Id的Function
flow.funcParams[function.GetId()] = params
// Store the obtained FParams in the flow structure for direct access by the function
// The key is the KisId of the current Function, not using Fid to prevent adding two Functions with the same strategy Id to a Flow
flow.funcParams[function.GetID()] = params
return nil
}
// Run 启动KisFlow的流式计算, 从起始Function开始执行流
// Run starts the streaming computation of KisFlow, starting from the initial Function.
func (flow *KisFlow) Run(ctx context.Context) error {
var fn kis.Function
@ -212,7 +212,7 @@ func (flow *KisFlow) Run(ctx context.Context) error {
flow.abort = false
if flow.Conf.Status == int(common.FlowDisable) {
// flow被配置关闭
// Flow is configured to be disabled
return nil
}
@ -220,27 +220,27 @@ func (flow *KisFlow) Run(ctx context.Context) error {
var funcStart time.Time
var flowStart time.Time
// 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
flow.PrevFunctionId = common.FunctionIdFirstVirtual
// Since no Function has been executed at this time, PrevFunctionId is FirstVirtual because there is no previous layer Function
flow.PrevFunctionId = common.FunctionIDFirstVirtual
// 提交数据流原始数据
// Commit the original data stream
if err := flow.commitSrcData(ctx); err != nil {
return err
}
// Metrics
if config.GlobalConfig.EnableProm == true {
// 统计Flow的调度次数
// Count the number of Flow schedules
metrics.Metrics.FlowScheduleCntsToTal.WithLabelValues(flow.Name).Inc()
// 统计Flow的执行消耗时长
// Count the execution time of Flow
flowStart = time.Now()
}
// 流式链式调用
// Streaming chain call
for fn != nil && flow.abort == false {
// flow记录当前执行到的Function 标记
fid := fn.GetId()
// Record the current Function being executed by the flow
fid := fn.GetID()
flow.ThisFunction = fn
flow.ThisFunctionId = fid
@ -248,14 +248,14 @@ func (flow *KisFlow) Run(ctx context.Context) error {
fMode := fn.GetConfig().FMode
if config.GlobalConfig.EnableProm == true {
// 统计Function调度次数
// Count the number of Function schedules
metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
// 统计Function 耗时 记录开始时间
// Count the time consumed by Function, record the start time
funcStart = time.Now()
}
// 得到当前Function要处理与的源数据
// Get the source data that the current Function needs to process
if inputData, err := flow.getCurData(); err != nil {
log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
return err
@ -273,16 +273,16 @@ func (flow *KisFlow) Run(ctx context.Context) error {
return err
}
// 统计Function 耗时
// Count the time consumed by Function
if config.GlobalConfig.EnableProm == true {
// Function消耗时间
// Function consumption time
duration := time.Since(funcStart)
// 统计当前Function统计指标,做时间统计
// Count the current Function metrics, do time statistics
metrics.Metrics.FunctionDuration.With(
prometheus.Labels{
common.LABEL_FUNCTION_NAME: fName,
common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000)
common.LabelFunctionName: fName,
common.LabelFunctionMode: fMode}).Observe(duration.Seconds() * 1000)
}
}
@ -290,7 +290,7 @@ func (flow *KisFlow) Run(ctx context.Context) error {
// Metrics
if config.GlobalConfig.EnableProm == true {
// 统计Flow执行耗时
// Count the execution time of Flow
duration := time.Since(flowStart)
metrics.Metrics.FlowDuration.WithLabelValues(flow.Name).Observe(duration.Seconds() * 1000)
}
@ -298,10 +298,10 @@ func (flow *KisFlow) Run(ctx context.Context) error {
return nil
}
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
// Next the current Flow enters the Action action carried by the next layer Function.
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {
// 加载Function FaaS 传递的 Action动作
// Load the Action actions carried by Function FaaS
flow.action = kis.LoadActions(acts)
return nil
@ -311,7 +311,7 @@ func (flow *KisFlow) GetName() string {
return flow.Name
}
func (flow *KisFlow) GetId() string {
func (flow *KisFlow) GetID() string {
return flow.Id
}
@ -323,7 +323,7 @@ func (flow *KisFlow) GetThisFuncConf() *config.KisFuncConfig {
return flow.ThisFunction.GetConfig()
}
// GetConnector 得到当前正在执行的Function的Connector
// GetConnector gets the Connector of the Function currently being executed by the Flow.
func (flow *KisFlow) GetConnector() (kis.Connector, error) {
if connector := flow.ThisFunction.GetConnector(); connector != nil {
return connector, nil
@ -332,7 +332,7 @@ func (flow *KisFlow) GetConnector() (kis.Connector, error) {
}
}
// GetConnConf 得到当前正在执行的Function的Connector的配置
// GetConnConf gets the Connector configuration of the Function currently being executed by the Flow.
func (flow *KisFlow) GetConnConf() (*config.KisConnConfig, error) {
if connector := flow.ThisFunction.GetConnector(); connector != nil {
return connector.GetConfig(), nil
@ -345,7 +345,7 @@ func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
return flow.Conf
}
// GetFuncConfigByName 得到当前Flow的配置
// GetFuncConfigByName gets the configuration of the current Flow by Function name.
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
if f, ok := flow.Funcs[funcName]; ok {
return f.GetConfig()

View File

@ -7,7 +7,7 @@ import (
"github.com/aceld/kis-flow/kis"
)
// dealAction 处理Action决定接下来Flow的流程走向
// dealAction handles Action to determine the next direction of the Flow.
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
// DataReuse Action
@ -32,31 +32,31 @@ func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Funct
// JumpFunc Action
if flow.action.JumpFunc != "" {
if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
//当前JumpFunc不在flow中
// The current JumpFunc is not in the flow
return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc))
}
jumpFunction := flow.Funcs[flow.action.JumpFunc]
// 更新上层Function
// Update the upper layer Function
flow.PrevFunctionId = jumpFunction.GetPrevId()
fn = jumpFunction
// 如果设置跳跃,强制跳跃
// If set to jump, force the jump
flow.abort = false
} else {
// 更新上一层 FuncitonId 游标
// Update the upper layer FunctionId cursor
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
}
// Abort Action 强制终止
// Abort Action force termination
if flow.action.Abort {
flow.abort = true
}
// 清空Action
// Clear Action
flow.action = kis.Action{}
return fn, nil

View File

@ -13,7 +13,7 @@ import (
"time"
)
// CommitRow 提交Flow数据, 一行数据,如果是批量数据可以提交多次
// CommitRow submits a single row of data to the Flow; multiple rows can be submitted multiple times
func (flow *KisFlow) CommitRow(row interface{}) error {
flow.buffer = append(flow.buffer, row)
@ -21,7 +21,7 @@ func (flow *KisFlow) CommitRow(row interface{}) error {
return nil
}
// CommitRowBatch 提交Flow数据, 批量数据
// CommitRowBatch submits a batch of data to the Flow
func (flow *KisFlow) CommitRowBatch(rows interface{}) error {
v := reflect.ValueOf(rows)
if v.Kind() != reflect.Slice {
@ -36,17 +36,17 @@ func (flow *KisFlow) CommitRowBatch(rows interface{}) error {
return nil
}
// Input 得到flow当前执行Function的输入源数据
// Input gets the input data for the currently executing Function in the Flow
func (flow *KisFlow) Input() common.KisRowArr {
return flow.inPut
}
// commitSrcData 提交当前Flow的数据源数据, 表示首次提交当前Flow的原始数据源
// 将flow的临时数据buffer提交到flow的data中,(data为各个Function层级的源数据备份)
// 会清空之前所有的flow数据
// commitSrcData submits the data source data for the current Flow, indicating the first submission of the original data source for the current Flow
// The flow's temporary data buffer is submitted to the flow's data (data is the source data backup for each Function level)
// All previous flow data will be cleared
func (flow *KisFlow) commitSrcData(ctx context.Context) error {
// 制作批量数据batch
// Create a batch of data
dataCnt := len(flow.buffer)
batch := make(common.KisRowArr, 0, dataCnt)
@ -54,22 +54,22 @@ func (flow *KisFlow) commitSrcData(ctx context.Context) error {
batch = append(batch, row)
}
// 清空之前所有数据
// Clear all previous data
flow.clearData(flow.data)
// 首次提交记录flow原始数据
// 因为首次提交所以PrevFunctionId为FirstVirtual 因为没有上一层Function
flow.data[common.FunctionIdFirstVirtual] = batch
// Record the original data for the flow for the first time
// Because it is the first submission, PrevFunctionId is FirstVirtual because there is no upper Function
flow.data[common.FunctionIDFirstVirtual] = batch
// 清空缓冲Buf
// Clear the buffer
flow.buffer = flow.buffer[0:0]
// 首次提交数据源数据,进行统计数据总量
// The first submission of data source data, for statistical total data
if config.GlobalConfig.EnableProm == true {
// 统计数据总量 Metrics.DataTota 指标累计加1
// Statistics for total data Metrics.DataTotal accumulates by 1
metrics.Metrics.DataTotal.Add(float64(dataCnt))
//统计当前Flow数量指标
// Statistics for current Flow quantity index
metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
}
@ -78,7 +78,7 @@ func (flow *KisFlow) commitSrcData(ctx context.Context) error {
return nil
}
// getCurData 获取flow当前Function层级的输入数据
// getCurData gets the input data for the current Function level of the flow
func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
if flow.PrevFunctionId == "" {
return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set"))
@ -94,16 +94,16 @@ func (flow *KisFlow) getCurData() (common.KisRowArr, error) {
// commitReuseData
func (flow *KisFlow) commitReuseData(ctx context.Context) error {
// 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环
// Check if there are result data from the upper layer; if not, exit the current Flow Run loop
if len(flow.data[flow.PrevFunctionId]) == 0 {
flow.abort = true
return nil
}
// 本层结果数据等于上层结果数据(复用上层结果数据到本层)
// This layer's result data is equal to the upper layer's result data (reuse the upper layer's result data to this layer)
flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
// 清空缓冲Buf (如果是ReuseData选项那么提交的全部数据都将不会携带到下一层)
// Clear the buffer (If it is a ReuseData option, all the submitted data will not be carried to the next layer)
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
@ -116,10 +116,10 @@ func (flow *KisFlow) commitVoidData(ctx context.Context) error {
return nil
}
// 制作空数据
// Create empty data
batch := make(common.KisRowArr, 0)
// 将本层计算的缓冲数据提交到本层结果数据中
// Submit the calculated buffer data of this layer to the result data of this layer
flow.data[flow.ThisFunctionId] = batch
log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
@ -127,27 +127,27 @@ func (flow *KisFlow) commitVoidData(ctx context.Context) error {
return nil
}
//commitCurData 提交Flow当前执行Function的结果数据
// commitCurData submits the result data of the currently executing Function in the Flow
func (flow *KisFlow) commitCurData(ctx context.Context) error {
// 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环
// Check if this layer's calculation has result data; if not, exit the current Flow Run loop
if len(flow.buffer) == 0 {
flow.abort = true
return nil
}
// 制作批量数据batch
// Create a batch of data
batch := make(common.KisRowArr, 0, len(flow.buffer))
// 如果strBuf为空则没有添加任何数据
// If strBuf is empty, no data has been added
for _, row := range flow.buffer {
batch = append(batch, row)
}
// 将本层计算的缓冲数据提交到本层结果数据中
// Submit the calculated buffer data of this layer to the result data of this layer
flow.data[flow.ThisFunctionId] = batch
// 清空缓冲Buf
// Clear the buffer
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
@ -155,7 +155,7 @@ func (flow *KisFlow) commitCurData(ctx context.Context) error {
return nil
}
// clearData 清空flow所有数据
// clearData clears all flow data
func (flow *KisFlow) clearData(data common.KisDataMap) {
for k := range data {
delete(data, k)
@ -179,7 +179,7 @@ func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Durati
}
}
// GetMetaData 得到当前Flow对象的临时数据
// GetMetaData gets the temporary data of the current Flow object
func (flow *KisFlow) GetMetaData(key string) interface{} {
flow.mLock.RLock()
defer flow.mLock.RUnlock()
@ -192,7 +192,7 @@ func (flow *KisFlow) GetMetaData(key string) interface{} {
return data
}
// SetMetaData 设置当前Flow对象的临时数据
// SetMetaData sets the temporary data of the current Flow object
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
flow.mLock.Lock()
defer flow.mLock.Unlock()
@ -200,7 +200,7 @@ func (flow *KisFlow) SetMetaData(key string, value interface{}) {
flow.metaData[key] = value
}
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数取出一对key-value
// GetFuncParam gets the default configuration parameters of the currently executing Function in the Flow, retrieves a key-value pair
func (flow *KisFlow) GetFuncParam(key string) string {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
@ -214,7 +214,7 @@ func (flow *KisFlow) GetFuncParam(key string) string {
return ""
}
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数取出全部Key-Value
// GetFuncParamAll gets the default configuration parameters of the currently executing Function in the Flow, retrieves all Key-Value pairs
func (flow *KisFlow) GetFuncParamAll() config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
@ -227,7 +227,7 @@ func (flow *KisFlow) GetFuncParamAll() config.FParam {
return param
}
// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams取出全部Key-Value
// GetFuncParamsAllFuncs gets the FuncParams of all Functions in the Flow, retrieves all Key-Value pairs
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()

View File

@ -11,28 +11,29 @@ import (
)
type BaseFunction struct {
// Id , KisFunction的实例ID用于KisFlow内部区分不同的实例对象
// Id, the instance ID of KisFunction, used to differentiate different instance objects within KisFlow
Id string
Config *config.KisFuncConfig
// flow
flow kis.Flow //上下文环境KisFlow
flow kis.Flow // Context environment KisFlow
// connector
connector kis.Connector
// Function的自定义临时数据
// Custom temporary data of Function
metaData map[string]interface{}
// 管理metaData的读写锁
// Manage the read-write lock of metaData
mLock sync.RWMutex
// link
N kis.Function //下一个流计算Function
P kis.Function //上一个流计算Function
N kis.Function // Next flow computing Function
P kis.Function // Previous flow computing Function
}
// Call
// BaseFunction 为空实现目的为了让其他具体类型的KisFunction如KisFunction_V 来继承BaseFuncion来重写此方法
// BaseFunction is an empty implementation, designed to allow other specific types of KisFunction,
// such as KisFunction_V, to inherit BaseFuncion and override this method
func (base *BaseFunction) Call(ctx context.Context, flow kis.Flow) error { return nil }
func (base *BaseFunction) Next() kis.Function {
@ -61,24 +62,24 @@ func (base *BaseFunction) SetConfig(s *config.KisFuncConfig) error {
return nil
}
func (base *BaseFunction) GetId() string {
func (base *BaseFunction) GetID() string {
return base.Id
}
func (base *BaseFunction) GetPrevId() string {
if base.P == nil {
//Function为首结点
return common.FunctionIdFirstVirtual
// Function is the first node
return common.FunctionIDFirstVirtual
}
return base.P.GetId()
return base.P.GetID()
}
func (base *BaseFunction) GetNextId() string {
if base.N == nil {
//Function为尾结点
return common.FunctionIdLastVirtual
// Function is the last node
return common.FunctionIDLastVirtual
}
return base.N.GetId()
return base.N.GetID()
}
func (base *BaseFunction) GetConfig() *config.KisFuncConfig {
@ -97,7 +98,7 @@ func (base *BaseFunction) GetFlow() kis.Flow {
return base.flow
}
// AddConnector 给当前Function实例添加一个Connector
// AddConnector adds a Connector to the current Function instance
func (base *BaseFunction) AddConnector(conn kis.Connector) error {
if conn == nil {
return errors.New("conn is nil")
@ -108,22 +109,22 @@ func (base *BaseFunction) AddConnector(conn kis.Connector) error {
return nil
}
// GetConnector 获取当前Function实例所关联的Connector
// GetConnector gets the Connector associated with the current Function instance
func (base *BaseFunction) GetConnector() kis.Connector {
return base.connector
}
func (base *BaseFunction) CreateId() {
base.Id = id.KisID(common.KisIdTypeFunction)
base.Id = id.KisID(common.KisIDTypeFunction)
}
// NewKisFunction 创建一个NsFunction
// flow: 当前所属的flow实例
// s : 当前function的配置策略
// NewKisFunction creates a new NsFunction
// flow: the current belonging flow instance
// s: the configuration strategy of the current function
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
var f kis.Function
//工厂生产泛化对象
// Factory produces generic objects
switch common.KisMode(config.FMode) {
case common.V:
f = NewKisFunctionV()
@ -136,19 +137,19 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
case common.E:
f = NewKisFunctionE()
default:
//LOG ERROR
// LOG ERROR
return nil
}
// 生成随机实例唯一ID
// Generate a random unique instance ID
f.CreateId()
// 设置基础信息属性
// Set basic information attributes
if err := f.SetConfig(config); err != nil {
panic(err)
}
// 设置Flow
// Set Flow
if err := f.SetFlow(flow); err != nil {
panic(err)
}
@ -156,7 +157,7 @@ func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
return f
}
// GetMetaData 得到当前Function的临时数据
// GetMetaData gets the temporary data of the current Function
func (base *BaseFunction) GetMetaData(key string) interface{} {
base.mLock.RLock()
defer base.mLock.RUnlock()
@ -169,7 +170,7 @@ func (base *BaseFunction) GetMetaData(key string) interface{} {
return data
}
// SetMetaData 设置当前Function的临时数据
// SetMetaData sets the temporary data of the current Function
func (base *BaseFunction) SetMetaData(key string, value interface{}) {
base.mLock.Lock()
defer base.mLock.Unlock()

View File

@ -13,7 +13,7 @@ type KisFunctionC struct {
func NewKisFunctionC() kis.Function {
f := new(KisFunctionC)
// 初始化metaData
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
@ -22,7 +22,7 @@ func NewKisFunctionC() kis.Function {
func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().DebugF("KisFunctionC, flow = %+v\n", flow)
// 通过KisPool 路由到具体的执行计算Function中
// Route to the specific computing Function through KisPool
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
return err

View File

@ -13,7 +13,7 @@ type KisFunctionE struct {
func NewKisFunctionE() kis.Function {
f := new(KisFunctionE)
// 初始化metaData
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
@ -22,7 +22,7 @@ func NewKisFunctionE() kis.Function {
func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().DebugF("KisFunctionE, flow = %+v\n", flow)
// 通过KisPool 路由到具体的执行计算Function中
// Route to the specific computing Function through KisPool
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
return err

View File

@ -13,7 +13,7 @@ type KisFunctionL struct {
func NewKisFunctionL() kis.Function {
f := new(KisFunctionL)
// 初始化metaData
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
@ -22,7 +22,7 @@ func NewKisFunctionL() kis.Function {
func (f *KisFunctionL) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().DebugF("KisFunctionL, flow = %+v\n", flow)
// 通过KisPool 路由到具体的执行计算Function中
// Route to the specific computing Function through KisPool
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
return err

View File

@ -13,7 +13,7 @@ type KisFunctionS struct {
func NewKisFunctionS() kis.Function {
f := new(KisFunctionS)
// 初始化metaData
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
@ -22,7 +22,7 @@ func NewKisFunctionS() kis.Function {
func (f *KisFunctionS) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().DebugF("KisFunctionS, flow = %+v\n", flow)
// 通过KisPool 路由到具体的执行计算Function中
// Route to the specific computing Function through KisPool
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
return err

View File

@ -13,7 +13,7 @@ type KisFunctionV struct {
func NewKisFunctionV() kis.Function {
f := new(KisFunctionV)
// 初始化metaData
// Initialize metaData
f.metaData = make(map[string]interface{})
return f
@ -22,7 +22,7 @@ func NewKisFunctionV() kis.Function {
func (f *KisFunctionV) Call(ctx context.Context, flow kis.Flow) error {
log.Logger().DebugF("KisFunctionV, flow = %+v\n", flow)
// 通过KisPool 路由到具体的执行计算Function中
// Route to the specific computing Function through KisPool
if err := kis.Pool().CallFunction(ctx, f.Config.FName, flow); err != nil {
log.Logger().ErrorFX(ctx, "Function Called Error err = %s\n", err)
return err

2
go.mod
View File

@ -17,6 +17,6 @@ require (
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/sys v0.19.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)

View File

@ -6,12 +6,12 @@ import (
"strings"
)
// KisID 获取一个中随机实例ID
// 格式为 "prefix1-[prefix2-][prefix3-]ID"
// 如:flow-1234567890
// 如:func-1234567890
// : conn-1234567890
// : func-1-1234567890
// KisID generates a random instance ID.
// The format is "prefix1-[prefix2-][prefix3-]ID"
// Example: flow-1234567890
// Example: func-1234567890
// Example: conn-1234567890
// Example: func-1-1234567890
func KisID(prefix ...string) (kisId string) {
idStr := strings.Replace(uuid.New().String(), "-", "", -1)
@ -25,7 +25,7 @@ func formatKisID(idStr string, prefix ...string) string {
for _, fix := range prefix {
kisId += fix
kisId += common.KisIdJoinChar
kisId += common.KisIDJoinChar
}
kisId += idStr

View File

@ -1,25 +1,27 @@
package kis
// Action KisFlow执行流程Actions
// Action defines the actions for KisFlow execution.
type Action struct {
// DataReuse 是否复用上层Function数据
// DataReuse indicates whether to reuse data from the upper Function.
DataReuse bool
// 默认Next()为如果本层Function计算结果为0条数据之后Function将不会继续执行
// ForceEntryNext 为忽略上述默认规则没有数据强制进入下一层Function
// ForceEntryNext overrides the default rule, where if the current Function's calculation result is 0 data entries,
// subsequent Functions will not continue execution.
// With ForceEntryNext set to true, the next Function will be entered regardless of the data.
ForceEntryNext bool
// JumpFunc 跳转到指定Function继续执行
// JumpFunc specifies the Function to jump to for continued execution.
// (Note: This can easily lead to Flow loop calls, causing an infinite loop.)
JumpFunc string
// Abort 终止Flow的执行
// Abort terminates the execution of the Flow.
Abort bool
}
// ActionFunc KisFlow Functional Option 类型
// ActionFunc is the type for KisFlow Functional Option.
type ActionFunc func(ops *Action)
// LoadActions 加载Actions依次执行ActionFunc操作函数
// LoadActions loads Actions and sequentially executes the ActionFunc operations.
func LoadActions(acts []ActionFunc) Action {
action := Action{}
@ -34,25 +36,25 @@ func LoadActions(acts []ActionFunc) Action {
return action
}
// ActionDataReuse Next复用上层Function数据Option
// ActionDataReuse sets the option for reusing data from the upper Function.
func ActionDataReuse(act *Action) {
act.DataReuse = true
}
// ActionForceEntryNext 强制进入下一层
// ActionForceEntryNext sets the option to forcefully enter the next layer.
func ActionForceEntryNext(act *Action) {
act.ForceEntryNext = true
}
// ActionJumpFunc 会返回一个ActionFunc函数并且会将funcName赋值给Action.JumpFunc
// (注意容易出现Flow循环调用导致死循环)
// ActionJumpFunc returns an ActionFunc function and sets the funcName to Action.JumpFunc.
// (Note: This can easily lead to Flow loop calls, causing an infinite loop.)
func ActionJumpFunc(funcName string) ActionFunc {
return func(act *Action) {
act.JumpFunc = funcName
}
}
// ActionAbort 终止Flow的执行
// ActionAbort terminates the execution of the Flow.
func ActionAbort(action *Action) {
action.Abort = true
}

View File

@ -5,19 +5,20 @@ import (
"github.com/aceld/kis-flow/config"
)
// Connector defines the interface for connectors associated with external storage.
type Connector interface {
// Init 初始化Connector所关联的存储引擎链接等
// Init initializes the connection to the storage engine associated with the Connector.
Init() error
// Call 调用Connector 外挂存储逻辑的读写操作
// Call invokes the read-write operations of the external storage logic.
Call(ctx context.Context, flow Flow, args interface{}) (interface{}, error)
// GetId 获取Connector的ID
GetId() string
// GetName 获取Connector的名称
// GetID returns the ID of the Connector.
GetID() string
// GetName returns the name of the Connector.
GetName() string
// GetConfig 获取Connector的配置信息
// GetConfig returns the configuration information of the Connector.
GetConfig() *config.KisConnConfig
// GetMetaData 得到当前Connector的临时数据
// GetMetaData gets the temporary data of the current Connector.
GetMetaData(key string) interface{}
// SetMetaData 设置当前Connector的临时数据
// SetMetaData sets the temporary data of the current Connector.
SetMetaData(key string, value interface{})
}

View File

@ -10,113 +10,111 @@ import (
// FaaS Function as a Service
//
// Change the type definition from:
// type FaaS func(context.Context, Flow) error
// 改为
// to:
// type FaaS func(context.Context, Flow, ...interface{}) error
// 可以通过可变参数的任意输入类型进行数据传递
// This allows passing data through variadic parameters of any type.
type FaaS interface{}
// FaaSDesc FaaS 回调计算业务函数 描述
// FaaSDesc describes the FaaS callback computation function.
type FaaSDesc struct {
Serialize // 当前Function的数据输入输出序列化实现
FnName string // Function名称
f interface{} // FaaS 函数
fName string // 函数名称
ArgsType []reflect.Type // 函数参数类型(集合)
ArgNum int // 函数参数个数
FuncType reflect.Type // 函数类型
FuncValue reflect.Value // 函数值(函数地址)
Serialize // Serialization implementation for the current Function's data input and output
FnName string // Function name
f interface{} // FaaS function
fName string // Function name
ArgsType []reflect.Type // Function parameter types (collection)
ArgNum int // Number of function parameters
FuncType reflect.Type // Function type
FuncValue reflect.Value // Function value (function address)
}
// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
// NewFaaSDesc creates an instance of FaaSDesc description based on the registered FnName and FaaS callback function.
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
// 输入输出序列化实例
// Serialization instance
var serializeImpl Serialize
// 传入的回调函数FaaS,函数值(函数地址)
// Callback function value (function address)
funcValue := reflect.ValueOf(f)
// 传入的回调函数FaaS 类型
// Callback function type
funcType := funcValue.Type()
// 判断传递的FaaS指针是否是函数类型
// Check if the provided FaaS pointer is a function type
if !isFuncType(funcType) {
return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
}
// 判断传递的FaaS函数是否有返回值类型是只包括(error)
// Check if the FaaS function has a return value that only includes (error)
if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
return nil, errors.New("function must have exactly one return value of type error")
}
// FaaS函数的参数类型集合
// FaaS function parameter types
argsType := make([]reflect.Type, funcType.NumIn())
// 获取FaaS的函数名称
// Get the FaaS function name
fullName := runtime.FuncForPC(funcValue.Pointer()).Name()
// 确保 FaaS func(context.Context, Flow, ...interface{}) error 形参列表存在context.Context 和 kis.Flow
// 是否包含kis.Flow类型的形参
// Ensure that the FaaS function parameter list contains context.Context and kis.Flow
// Check if the function contains a parameter of type kis.Flow
containsKisFlow := false
// 是否包含context.Context类型的形参
// Check if the function contains a parameter of type context.Context
containsCtx := false
// 遍历FaaS的形参类型
// Iterate over the FaaS function parameter types
for i := 0; i < funcType.NumIn(); i++ {
// 取出第i个形式参数类型
// Get the i-th formal parameter type
paramType := funcType.In(i)
if isFlowType(paramType) {
// 判断是否包含kis.Flow类型的形参
// Check if the function contains a parameter of type kis.Flow
containsKisFlow = true
} else if isContextType(paramType) {
// 判断是否包含context.Context类型的形参
// Check if the function contains a parameter of type context.Context
containsCtx = true
} else if isSliceType(paramType) {
// 获取当前参数Slice的元素类型
// Get the element type of the current parameter Slice
itemType := paramType.Elem()
// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
// If the current parameter is a pointer type, get the struct type that the pointer points to
if itemType.Kind() == reflect.Ptr {
itemType = itemType.Elem() // 获取指针指向的结构体类型
itemType = itemType.Elem() // Get the struct type that the pointer points to
}
// Check if f implements Serialize interface
// (检测传递的FaaS函数是否实现了Serialize接口)
if isSerialize(itemType) {
// 如果当前形参实现了Serialize接口则使用当前形参的序列化实现
// If the current parameter implements the Serialize interface, use the serialization implementation of the current parameter
serializeImpl = reflect.New(itemType).Interface().(Serialize)
} else {
// 如果当前形参没有实现Serialize接口则使用默认的序列化实现
// If the current parameter does not implement the Serialize interface, use the default serialization implementation
serializeImpl = defaultSerialize // Use global default implementation
}
} else {
// Other types are not supported
}
// 将当前形参类型追加到argsType集合中
// Append the current parameter type to the argsType collection
argsType[i] = paramType
}
if !containsKisFlow {
// 不包含kis.Flow类型的形参返回错误
// If the function parameter list does not contain a parameter of type kis.Flow, return an error
return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
if !containsCtx {
// 不包含context.Context类型的形参返回错误
// If the function parameter list does not contain a parameter of type context.Context, return an error
return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
}
// 返回FaaSDesc描述实例
// Return the FaaSDesc description instance
return &FaaSDesc{
Serialize: serializeImpl,
FnName: fnName,
@ -129,26 +127,26 @@ func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
}, nil
}
// isFuncType 判断传递进来的 paramType 是否是函数类型
// isFuncType checks whether the provided paramType is a function type
func isFuncType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Func
}
// isFlowType 判断传递进来的 paramType 是否是 kis.Flow 类型
// isFlowType checks whether the provided paramType is of type kis.Flow
func isFlowType(paramType reflect.Type) bool {
var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()
return paramType.Implements(flowInterfaceType)
}
// isContextType 判断传递进来的 paramType 是否是 context.Context 类型
// isContextType checks whether the provided paramType is of type context.Context
func isContextType(paramType reflect.Type) bool {
typeName := paramType.Name()
return strings.Contains(typeName, "Context")
}
// isSliceType 判断传递进来的 paramType 是否是切片类型
// isSliceType checks whether the provided paramType is a slice type
func isSliceType(paramType reflect.Type) bool {
return paramType.Kind() == reflect.Slice
}

View File

@ -8,51 +8,51 @@ import (
)
type Flow interface {
// Run 调度Flow依次调度Flow中的Function并且执行
// Run schedules the Flow, sequentially dispatching and executing Functions in the Flow
Run(ctx context.Context) error
// Link 将Flow中的Function按照配置文件中的配置进行连接, 同时Flow的配置也会更新
// Link connects the Functions in the Flow according to the configuration in the config file, and the Flow's configuration will also be updated
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// AppendNewFunction 将一个新的Function追加到到Flow中
// AppendNewFunction appends a new Function to the Flow
AppendNewFunction(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
// CommitRow submits Flow data to the upcoming Function layer
CommitRow(row interface{}) error
// CommitRowBatch 提交Flow数据到即将执行的Function层(批量提交)
// CommitRowBatch submits Flow data to the upcoming Function layer (batch submission)
// row: Must be a slice
CommitRowBatch(row interface{}) error
// Input 得到flow当前执行Function的输入源数据
// Input gets the input source data of the currently executing Function in the Flow
Input() common.KisRowArr
// GetName 得到Flow的名称
// GetName gets the name of the Flow
GetName() string
// GetThisFunction 得到当前正在执行的Function
// GetThisFunction gets the currently executing Function
GetThisFunction() Function
// GetThisFuncConf 得到当前正在执行的Function的配置
// GetThisFuncConf gets the configuration of the currently executing Function
GetThisFuncConf() *config.KisFuncConfig
// GetConnector 得到当前正在执行的Function的Connector
// GetConnector gets the Connector of the currently executing Function
GetConnector() (Connector, error)
// GetConnConf 得到当前正在执行的Function的Connector的配置
// GetConnConf gets the configuration of the Connector of the currently executing Function
GetConnConf() (*config.KisConnConfig, error)
// GetConfig 得到当前Flow的配置
// GetConfig gets the configuration of the current Flow
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
// GetFuncConfigByName gets the configuration of the current Flow by Function name
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
// Next carries the Action actions of the next layer Function that the current Flow is executing
Next(acts ...ActionFunc) error
// GetCacheData 得到当前Flow的缓存数据
// GetCacheData gets the cached data of the current Flow
GetCacheData(key string) interface{}
// SetCacheData 设置当前Flow的缓存数据
// SetCacheData sets the cached data of the current Flow
SetCacheData(key string, value interface{}, Exp time.Duration)
// GetMetaData 得到当前Flow的临时数据
// GetMetaData gets the temporary data of the current Flow
GetMetaData(key string) interface{}
// SetMetaData 设置当前Flow的临时数据
// SetMetaData sets the temporary data of the current Flow
SetMetaData(key string, value interface{})
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数取出一对key-value
// GetFuncParam gets the default parameters of the current Flow's currently executing Function, retrieving a key-value pair
GetFuncParam(key string) string
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数取出全部Key-Value
// GetFuncParamAll gets the default parameters of the current Flow's currently executing Function, retrieving all Key-Value pairs
GetFuncParamAll() config.FParam
// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams取出全部Key-Value
// GetFuncParamsAllFuncs gets the FuncParams of all Functions in the Flow, retrieving all Key-Value pairs
GetFuncParamsAllFuncs() map[string]config.FParam
// Fork 得到Flow的一个副本(深拷贝)
// Fork gets a copy of the Flow (deep copy)
Fork(ctx context.Context) Flow
// GetId 得到Flow的Id
GetId() string
// GetID gets the Id of the Flow
GetID() string
}

View File

@ -5,46 +5,46 @@ import (
"github.com/aceld/kis-flow/config"
)
// Function 流式计算基础计算模块KisFunction是一条流式计算的基本计算逻辑单元
// 任意个KisFunction可以组合成一个KisFlow
// Function is the basic computation unit of streaming computation. KisFunction is a basic logical unit of streaming computation,
// any number of KisFunctions can be combined into a KisFlow
type Function interface {
// Call 执行流式计算逻辑
// Call executes the streaming computation logic
Call(ctx context.Context, flow Flow) error
// SetConfig 给当前Function实例配置策略
// SetConfig configures the current Function instance
SetConfig(s *config.KisFuncConfig) error
// GetConfig 获取当前Function实例配置策略
// GetConfig retrieves the configuration of the current Function instance
GetConfig() *config.KisFuncConfig
// SetFlow 给当前Function实例设置所依赖的Flow实例
// SetFlow sets the Flow instance that the current Function instance depends on
SetFlow(f Flow) error
// GetFlow 获取当前Functioin实力所依赖的Flow
// GetFlow retrieves the Flow instance that the current Function instance depends on
GetFlow() Flow
// AddConnector 给当前Function实例添加一个Connector
// AddConnector adds a Connector to the current Function instance
AddConnector(conn Connector) error
// GetConnector 获取当前Function实例所关联的Connector
// GetConnector retrieves the Connector associated with the current Function instance
GetConnector() Connector
// CreateId 给当前Funciton实力生成一个随机的实例KisID
// CreateId generates a random KisID for the current Function instance
CreateId()
// GetId 获取当前Function的FID
GetId() string
// GetPrevId 获取当前Function上一个Function节点FID
// GetID retrieves the FID of the current Function
GetID() string
// GetPrevId retrieves the FID of the previous Function node of the current Function
GetPrevId() string
// GetNextId 获取当前Function下一个Function节点FID
// GetNextId retrieves the FID of the next Function node of the current Function
GetNextId() string
// Next 返回下一层计算流Function如果当前层为最后一层则返回nil
// Next returns the next layer of the computation flow Function. If the current layer is the last layer, it returns nil
Next() Function
// Prev 返回上一层计算流Function如果当前层为最后一层则返回nil
// Prev returns the previous layer of the computation flow Function. If the current layer is the last layer, it returns nil
Prev() Function
// SetN 设置下一层Function实例
// SetN sets the next Function instance
SetN(f Function)
// SetP 设置上一层Function实例
// SetP sets the previous Function instance
SetP(f Function)
// GetMetaData 得到当前Function的临时数据
// GetMetaData retrieves the temporary data of the current Function
GetMetaData(key string) interface{}
// SetMetaData 设置当前Function的临时数据
// SetMetaData sets the temporary data of the current Function
SetMetaData(key string, value interface{})
}

View File

@ -12,37 +12,37 @@ import (
var _poolOnce sync.Once
// kisPool 用于管理全部的Function和Flow配置的池子
type kisPool struct {
fnRouter funcRouter // 全部的Function管理路由
fnLock sync.RWMutex // fnRouter
// KisPool manages all Function and Flow configurations
type KisPool struct {
fnRouter funcRouter // All Function management routes
fnLock sync.RWMutex // fnRouter lock
flowRouter flowRouter // 全部的flow对象
flowLock sync.RWMutex // flowRouter
flowRouter flowRouter // All flow objects
flowLock sync.RWMutex // flowRouter lock
cInitRouter connInitRouter // 全部的Connector初始化路由
ciLock sync.RWMutex // cInitRouter
cInitRouter connInitRouter // All Connector initialization routes
ciLock sync.RWMutex // cInitRouter lock
cTree connTree // 全部Connector管理路由
cLock sync.RWMutex // cTree
cTree connTree // All Connector management routes
cLock sync.RWMutex // cTree lock
}
// 单例
var _pool *kisPool
// Singleton
var _pool *KisPool
// Pool 单例构造
func Pool() *kisPool {
// Pool Singleton constructor
func Pool() *KisPool {
_poolOnce.Do(func() {
//创建kisPool对象
_pool = new(kisPool)
// Create KisPool object
_pool = &KisPool{}
// fnRouter初始化
// Initialize fnRouter
_pool.fnRouter = make(funcRouter)
// flowRouter初始化
// Initialize flowRouter
_pool.flowRouter = make(flowRouter)
// connTree初始化
// Initialize connTree
_pool.cTree = make(connTree)
_pool.cInitRouter = make(connInitRouter)
})
@ -50,8 +50,8 @@ func Pool() *kisPool {
return _pool
}
func (pool *kisPool) AddFlow(name string, flow Flow) {
pool.flowLock.Lock() // 写锁
func (pool *KisPool) AddFlow(name string, flow Flow) {
pool.flowLock.Lock() // Write lock
defer pool.flowLock.Unlock()
if _, ok := pool.flowRouter[name]; !ok {
@ -64,8 +64,8 @@ func (pool *kisPool) AddFlow(name string, flow Flow) {
log.Logger().InfoF("Add FlowRouter FlowName=%s", name)
}
func (pool *kisPool) GetFlow(name string) Flow {
pool.flowLock.RLock() // 读锁
func (pool *KisPool) GetFlow(name string) Flow {
pool.flowLock.RLock() // Read lock
defer pool.flowLock.RUnlock()
if flow, ok := pool.flowRouter[name]; ok {
@ -75,20 +75,20 @@ func (pool *kisPool) GetFlow(name string) Flow {
}
}
// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
func (pool *kisPool) FaaS(fnName string, f FaaS) {
// FaaS registers Function computation business logic, indexed and registered by Function Name
func (pool *KisPool) FaaS(fnName string, f FaaS) {
// 当注册FaaS计算逻辑回调时创建一个FaaSDesc描述对象
// When registering the FaaS computation logic callback, create a FaaSDesc description object
faaSDesc, err := NewFaaSDesc(fnName, f)
if err != nil {
panic(err)
}
pool.fnLock.Lock() // 写锁
pool.fnLock.Lock() // Write lock
defer pool.fnLock.Unlock()
if _, ok := pool.fnRouter[fnName]; !ok {
// 将FaaSDesc描述对象注册到fnRouter中
// Register the FaaSDesc description object to fnRouter
pool.fnRouter[fnName] = faaSDesc
} else {
errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
@ -98,33 +98,33 @@ func (pool *kisPool) FaaS(fnName string, f FaaS) {
log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}
// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
pool.fnLock.RLock() // 读锁
// CallFunction schedules Function
func (pool *KisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {
pool.fnLock.RLock() // Read lock
defer pool.fnLock.RUnlock()
if funcDesc, ok := pool.fnRouter[fnName]; ok {
// 被调度Function的形参列表
// Parameters list for the scheduled Function
params := make([]reflect.Value, 0, funcDesc.ArgNum)
for _, argType := range funcDesc.ArgsType {
// 如果是Flow类型形参则将 flow的值传入
// If it is a Flow type parameter, pass in the value of flow
if isFlowType(argType) {
params = append(params, reflect.ValueOf(flow))
continue
}
// 如果是Context类型形参则将 ctx的值传入
// If it is a Context type parameter, pass in the value of ctx
if isContextType(argType) {
params = append(params, reflect.ValueOf(ctx))
continue
}
// 如果是Slice类型形参则将 flow.Input()的值传入
// If it is a Slice type parameter, pass in the value of flow.Input()
if isSliceType(argType) {
// 将flow.Input()中的原始数据反序列化为argType类型的数据
// Deserialize the raw data in flow.Input() to data of type argType
value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
if err != nil {
log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
@ -135,20 +135,20 @@ func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow)
}
// 传递的参数既不是Flow类型也不是Context类型也不是Slice类型则默认给到零值
// If the passed parameter is neither a Flow type, nor a Context type, nor a Slice type, it defaults to zero value
params = append(params, reflect.Zero(argType))
}
// 调用当前Function 的计算逻辑
// Call the computation logic of the current Function
retValues := funcDesc.FuncValue.Call(params)
// 取出第一个返回值如果是nil则返回nil
// Extract the first return value, if it is nil, return nil
ret := retValues[0].Interface()
if ret == nil {
return nil
}
// 如果返回值是error类型则返回error
// If the return value is of type error, return error
return retValues[0].Interface().(error)
}
@ -158,9 +158,9 @@ func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow)
return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}
// CaaSInit 注册Connector初始化业务
func (pool *kisPool) CaaSInit(cname string, c ConnInit) {
pool.ciLock.Lock() // 写锁
// CaaSInit registers Connector initialization business
func (pool *KisPool) CaaSInit(cname string, c ConnInit) {
pool.ciLock.Lock() // Write lock
defer pool.ciLock.Unlock()
if _, ok := pool.cInitRouter[cname]; !ok {
@ -173,9 +173,9 @@ func (pool *kisPool) CaaSInit(cname string, c ConnInit) {
log.Logger().InfoF("Add KisPool CaaSInit CName=%s", cname)
}
// CallConnInit 调度 ConnInit
func (pool *kisPool) CallConnInit(conn Connector) error {
pool.ciLock.RLock() // 读锁
// CallConnInit schedules ConnInit
func (pool *KisPool) CallConnInit(conn Connector) error {
pool.ciLock.RLock() // Read lock
defer pool.ciLock.RUnlock()
init, ok := pool.cInitRouter[conn.GetName()]
@ -187,16 +187,16 @@ func (pool *kisPool) CallConnInit(conn Connector) error {
return init(conn)
}
// CaaS 注册Connector Call业务
func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c CaaS) {
pool.cLock.Lock() // 写锁
// CaaS registers Connector Call business
func (pool *KisPool) CaaS(cname string, fname string, mode common.KisMode, c CaaS) {
pool.cLock.Lock() // Write lock
defer pool.cLock.Unlock()
if _, ok := pool.cTree[cname]; !ok {
//cid 首次注册,不存在,创建二级树NsConnSL
//cid First registration, does not exist, create a second-level tree NsConnSL
pool.cTree[cname] = make(connSL)
//初始化各类型FunctionMode
// Initialize various FunctionMode
pool.cTree[cname][common.S] = make(connFuncRouter)
pool.cTree[cname][common.L] = make(connFuncRouter)
}
@ -211,9 +211,9 @@ func (pool *kisPool) CaaS(cname string, fname string, mode common.KisMode, c Caa
log.Logger().InfoF("Add KisPool CaaS CName=%s, FName=%s, Mode =%s", cname, fname, mode)
}
// CallConnector 调度 Connector
func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) (interface{}, error) {
pool.cLock.RLock() // 读锁
// CallConnector schedules Connector
func (pool *KisPool) CallConnector(ctx context.Context, flow Flow, conn Connector, args interface{}) (interface{}, error) {
pool.cLock.RLock() // Read lock
defer pool.cLock.RUnlock()
fn := flow.GetThisFunction()
fnConf := fn.GetConfig()
@ -228,9 +228,9 @@ func (pool *kisPool) CallConnector(ctx context.Context, flow Flow, conn Connecto
return nil, errors.New(fmt.Sprintf("CName:%s FName:%s mode:%s Can not find in KisPool, Not Added.", conn.GetName(), fnConf.FName, mode))
}
// GetFlows 得到全部的Flow
func (pool *kisPool) GetFlows() []Flow {
pool.flowLock.RLock() // 读锁
// GetFlows retrieves all Flows
func (pool *KisPool) GetFlows() []Flow {
pool.flowLock.RLock() // Read lock
defer pool.flowLock.RUnlock()
var flows []Flow

View File

@ -5,14 +5,12 @@ import (
"github.com/aceld/kis-flow/common"
)
// FaaS 定义移植到 faas.go 中
/*
Function Call
*/
// funcRouter
// key: Function Name
// value: FaaSDesc 回调自定义业务的描述
// value: FaaSDesc callback description for custom business
type funcRouter map[string]*FaaSDesc
// flowRouter
@ -23,30 +21,31 @@ type flowRouter map[string]Flow
/*
Connector Init
*/
// ConnInit Connector 第三方挂载存储初始化
// ConnInit Connector third-party storage initialization
type ConnInit func(conn Connector) error
// connInitRouter
// key:
// key: Connector Name
// value: ConnInit
type connInitRouter map[string]ConnInit
/*
Connector Call
*/
// CaaS Connector的存储读取业务实现
// CaaS Connector storage read/write business implementation
type CaaS func(context.Context, Connector, Function, Flow, interface{}) (interface{}, error)
// connFuncRouter 通过FunctionName索引到CaaS回调存储业务的映射关系
// connFuncRouter Maps CaaS callback storage business to FunctionName
// key: Function Name
// value: Connector的存储读取业务实现
// value: Connector storage read/write business implementation
type connFuncRouter map[string]CaaS
// connSL 通过KisMode 将connFuncRouter分为两个子树
// connSL Splits connFuncRouter into two subtrees based on KisMode
// key: Function KisMode S/L
// value: NsConnRouter
// value: connFuncRouter
type connSL map[common.KisMode]connFuncRouter
// connTree
// key: Connector Name
// value: connSL 二级树
// value: connSL second-level tree
type connTree map[string]connSL

View File

@ -6,18 +6,18 @@ import (
"reflect"
)
// Serialize 数据序列化接口
// Serialize Data serialization interface
type Serialize interface {
// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
// UnMarshal is used to deserialize KisRowArr to a value of the specified type.
UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
// Marshal 用于将指定类型的值序列化为 KisRowArr。
// Marshal is used to serialize a value of the specified type to KisRowArr.
Marshal(interface{}) (common.KisRowArr, error)
}
// defaultSerialize KisFlow提供的默认序列化实现(开发者可以自定义)
// defaultSerialize Default serialization implementation provided by KisFlow (developers can customize)
var defaultSerialize = &serialize.DefaultSerialize{}
// isSerialize 判断传递进来的 paramType 是否实现了 Serialize 接口
// isSerialize checks if the provided paramType implements the Serialize interface
func isSerialize(paramType reflect.Type) bool {
return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}

View File

@ -6,7 +6,7 @@ import (
"sync"
)
// kisDefaultLog 默认提供的日志对象
// kisDefaultLog Default provided log object
type kisDefaultLog struct {
debugMode bool
mu sync.Mutex
@ -60,7 +60,7 @@ func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interfac
}
func init() {
// 如果没有设置Logger, 则启动时使用默认的kisDefaultLog对象
// If no logger is set, use the default kisDefaultLog object at startup
if Logger() == nil {
SetLogger(&kisDefaultLog{})
}

View File

@ -3,33 +3,33 @@ package log
import "context"
type KisLogger interface {
// InfoFX 有上下文的Info级别日志接口, format字符串格式
// InfoFX with context Info-level log interface, format string format
InfoFX(ctx context.Context, str string, v ...interface{})
// ErrorFX 有上下文的Error级别日志接口, format字符串格式
// ErrorFX with context Error-level log interface, format string format
ErrorFX(ctx context.Context, str string, v ...interface{})
// DebugFX 有上下文的Debug级别日志接口, format字符串格式
// DebugFX with context Debug-level log interface, format string format
DebugFX(ctx context.Context, str string, v ...interface{})
// InfoF 无上下文的Info级别日志接口, format字符串格式
// InfoF without context Info-level log interface, format string format
InfoF(str string, v ...interface{})
// ErrorF 无上下文的Error级别日志接口, format字符串格式
// ErrorF without context Error-level log interface, format string format
ErrorF(str string, v ...interface{})
// DebugF 无上下文的Debug级别日志接口, format字符串格式
// DebugF without context Debug-level log interface, format string format
DebugF(str string, v ...interface{})
// SetDebugMode 设置Debug模式
// SetDebugMode set Debug mode
SetDebugMode(enable bool)
}
// kisLog 默认的KisLog 对象, 提供默认的日志打印方式, 均是打印在标准输出上。
// kisLog Default KisLog object, providing default log printing methods, all of which print to standard output.
var kisLog KisLogger
// SetLogger 设置KisLog对象, 可以是用户自定义的Logger对象
// SetLogger set KisLog object, can be a user-defined Logger object
func SetLogger(newlog KisLogger) {
kisLog = newlog
}
// Logger 获取到kisLog对象
// Logger get the kisLog object
func Logger() KisLogger {
return kisLog
}

View File

@ -9,32 +9,32 @@ import (
"net/http"
)
// kisMetrics kisFlow的Prometheus监控指标
type kisMetrics struct {
// 数据数量总量
// KisMetrics kisFlow's Prometheus monitoring metrics
type KisMetrics struct {
// Total data quantity
DataTotal prometheus.Counter
// 各Flow处理数据总量
// Total data processed by each Flow
FlowDataTotal *prometheus.GaugeVec
// Flow被调度次数
// Flow scheduling counts
FlowScheduleCntsToTal *prometheus.GaugeVec
// Function被调度次数
// Function scheduling counts
FuncScheduleCntsTotal *prometheus.GaugeVec
// Function执行时间
// Function execution time
FunctionDuration *prometheus.HistogramVec
// Flow执行时间
// Flow execution time
FlowDuration *prometheus.HistogramVec
}
var Metrics *kisMetrics
var Metrics *KisMetrics
// RunMetricsService 启动Prometheus监控服务
// RunMetricsService starts the Prometheus monitoring service
func RunMetricsService(serverAddr string) error {
// 注册Prometheus 监控路由路径
http.Handle(common.METRICS_ROUTE, promhttp.Handler())
// Register Prometheus monitoring route path
http.Handle(common.MetricsRoute, promhttp.Handler())
// 启动HttpServer
err := http.ListenAndServe(serverAddr, nil) //多个进程不可监听同一个端口
// Start HttpServer
err := http.ListenAndServe(serverAddr, nil) // Multiple processes cannot listen on the same port
if err != nil {
log.Logger().ErrorF("RunMetricsService err = %s\n", err)
}
@ -43,64 +43,64 @@ func RunMetricsService(serverAddr string) error {
}
func InitMetrics() {
Metrics = new(kisMetrics)
Metrics = new(KisMetrics)
// DataTotal初始化Counter
// Initialize DataTotal Counter
Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
Name: common.CounterKisflowDataTotalName,
Help: common.CounterKisflowDataTotalHelp,
})
// FlowDataTotal初始化GaugeVec
// Initialize FlowDataTotal GaugeVec
Metrics.FlowDataTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
Name: common.GamgeFlowDataTotalName,
Help: common.GamgeFlowDataTotalHelp,
},
// 标签名称
[]string{common.LABEL_FLOW_NAME},
// Label names
[]string{common.LabelFlowName},
)
// FlowScheduleCntsToTal初始化GaugeVec
// Initialize FlowScheduleCntsToTal GaugeVec
Metrics.FlowScheduleCntsToTal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
Name: common.GangeFlowScheCntsName,
Help: common.GangeFlowScheCntsHelp,
},
//标签名称
[]string{common.LABEL_FLOW_NAME},
// Label names
[]string{common.LabelFlowName},
)
// FuncScheduleCntsTotal初始化GaugeVec
// Initialize FuncScheduleCntsTotal GaugeVec
Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: common.GANGE_FUNC_SCHE_CNTS_NAME,
Help: common.GANGE_FUNC_SCHE_CNTS_HELP,
Name: common.GangeFuncScheCntsName,
Help: common.GangeFuncScheCntsHelp,
},
//标签名称
[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
// Label names
[]string{common.LabelFunctionName, common.LabelFunctionMode},
)
// FunctionDuration初始化HistogramVec
// Initialize FunctionDuration HistogramVec
Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: common.HISTOGRAM_FUNCTION_DURATION_NAME,
Help: common.HISTOGRAM_FUNCTION_DURATION_HELP,
Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, //单位ms,最大半分钟
Name: common.HistogramFunctionDurationName,
Help: common.HistogramFunctionDurationHelp,
Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, // Unit: ms, maximum half a minute
},
[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
[]string{common.LabelFunctionName, common.LabelFunctionMode},
)
// FlowDuration初始化HistogramVec
// Initialize FlowDuration HistogramVec
Metrics.FlowDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: common.HISTOGRAM_FLOW_DURATION_NAME,
Help: common.HISTOGRAM_FLOW_DURATION_HELP,
Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000, 60000}, //单位ms,最大1分钟
Name: common.HistogramFlowDurationName,
Help: common.HistogramFlowDurationHelp,
Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000, 60000}, // Unit: ms, maximum 1 minute
},
[]string{common.LABEL_FLOW_NAME},
[]string{common.LabelFlowName},
)
// 注册Metrics
// Register Metrics
prometheus.MustRegister(Metrics.DataTotal)
prometheus.MustRegister(Metrics.FlowDataTotal)
prometheus.MustRegister(Metrics.FlowScheduleCntsToTal)
@ -109,13 +109,13 @@ func InitMetrics() {
prometheus.MustRegister(Metrics.FlowDuration)
}
// RunMetrics 启动Prometheus指标服务
// RunMetrics starts the Prometheus metrics service
func RunMetrics() {
// 初始化Prometheus指标
// Initialize Prometheus metrics
InitMetrics()
if config.GlobalConfig.EnableProm == true && config.GlobalConfig.PrometheusListen == true {
// 启动Prometheus指标Metrics服务
// Start Prometheus metrics service
go RunMetricsService(config.GlobalConfig.PrometheusServe)
}
}

View File

@ -1,9 +1,14 @@
/*
DefaultSerialize 实现了 Serialize 接口用于将 KisRowArr 序列化为指定类型的值或将指定类型的值序列化为 KisRowArr
这部分是KisFlow默认提供的序列化办法默认均是josn序列化开发者可以根据自己的需求实现自己的序列化办法
*/
package serialize
/*
DefaultSerialize implements the Serialize interface,
which is used to serialize KisRowArr into a specified type, or serialize a specified type into KisRowArr.
This section is the default serialization method provided by KisFlow, and it defaults to json serialization.
Developers can implement their own serialization methods according to their needs.
*/
import (
"encoding/json"
"fmt"
@ -13,35 +18,35 @@ import (
type DefaultSerialize struct{}
// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
// UnMarshal is used to deserialize KisRowArr into a specified type.
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
// 确保传入的类型是一个切片
// Ensure the passed-in type is a slice
if r.Kind() != reflect.Slice {
return reflect.Value{}, fmt.Errorf("r must be a slice")
}
slice := reflect.MakeSlice(r, 0, len(arr))
// 遍历每个元素并尝试反序列化
// Iterate through each element and attempt deserialization
for _, row := range arr {
var elem reflect.Value
var err error
// 尝试断言为结构体或指针
// Try to assert as a struct or pointer
elem, err = unMarshalStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// 尝试直接反序列化字符串
// Try to directly deserialize the string
elem, err = unMarshalJsonString(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
continue
}
// 尝试先序列化为 JSON 再反序列化
// Try to serialize to JSON first and then deserialize
elem, err = unMarshalJsonStruct(row, r.Elem())
if err == nil {
slice = reflect.Append(slice, elem)
@ -53,9 +58,9 @@ func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (refl
return slice, nil
}
// 尝试断言为结构体或指针
// Try to assert as a struct or pointer
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 检查 row 是否为结构体或结构体指针类型
// Check if row is of struct or struct pointer type
rowType := reflect.TypeOf(row)
if rowType == nil {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
@ -64,41 +69,41 @@ func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, e
return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
}
// 如果 row 是指针类型,则获取它指向的类型
// If row is a pointer type, get the type it points to
if rowType.Kind() == reflect.Ptr {
// 空指针
// Nil pointer
if reflect.ValueOf(row).IsNil() {
return reflect.Value{}, fmt.Errorf("row is nil pointer")
}
// 解引用
// Dereference
row = reflect.ValueOf(row).Elem().Interface()
// 拿到解引用后的类型
// Get the type after dereferencing
rowType = reflect.TypeOf(row)
}
// 检查是否可以将 row 断言为 elemType(目标类型)
// Check if row can be asserted to elemType(target type)
if !rowType.AssignableTo(elemType) {
return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
}
// 将 row 转换为 reflect.Value 并返回
// Convert row to reflect.Value and return
return reflect.ValueOf(row), nil
}
// 尝试直接反序列化字符串(将Json字符串 反序列化为 结构体)
// Try to directly deserialize the string(Deserialize the Json string into a struct)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 判断源数据是否可以断言成string
// Check if the source data can be asserted as a string
str, ok := row.(string)
if !ok {
return reflect.Value{}, fmt.Errorf("not a string")
}
// 创建一个新的结构体实例,用于存储反序列化后的值
// Create a new struct instance to store the deserialized value
elem := reflect.New(elemType).Elem()
// 尝试将json字符串反序列化为结构体。
// Try to deserialize the json string into a struct.
if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
}
@ -106,18 +111,18 @@ func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Valu
return elem, nil
}
// 尝试先序列化为 JSON 再反序列化(将结构体转换成Json字符串再将Json字符串 反序列化为 结构体)
// Try to serialize to JSON first and then deserialize(Serialize the struct to Json string, and then deserialize the Json string into a struct)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
// 将 row 序列化为 JSON 字符串
// Serialize row to JSON string
jsonBytes, err := json.Marshal(row)
if err != nil {
return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v ", err)
}
// 创建一个新的结构体实例,用于存储反序列化后的值
// Create a new struct instance to store the deserialized value
elem := reflect.New(elemType).Interface()
// 将 JSON 字符串反序列化为结构体
// Deserialize the JSON string into a struct
if err := json.Unmarshal(jsonBytes, elem); err != nil {
return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v ", err)
}
@ -125,7 +130,7 @@ func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Valu
return reflect.ValueOf(elem).Elem(), nil
}
// Marshal 用于将指定类型的值序列化为 KisRowArr(json 序列化)。
// Marshal is used to serialize a specified type into KisRowArr(json serialization).
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
var arr common.KisRowArr
@ -133,7 +138,7 @@ func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
case reflect.Slice, reflect.Array:
slice := reflect.ValueOf(i)
for i := 0; i < slice.Len(); i++ {
// 序列化每个元素为 JSON 字符串,并将其添加到切片中。
// Serialize each element to a JSON string and append it to the slice.
jsonBytes, err := json.Marshal(slice.Index(i).Interface())
if err != nil {
return nil, fmt.Errorf("failed to marshal element to JSON: %v ", err)
@ -141,7 +146,7 @@ func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
arr = append(arr, string(jsonBytes))
}
default:
// 如果不是切片或数组类型,则直接序列化整个结构体为 JSON 字符串。
// If it's not a slice or array type, serialize the entire struct to a JSON string directly.
jsonBytes, err := json.Marshal(i)
if err != nil {
return nil, fmt.Errorf("failed to marshal element to JSON: %v ", err)

View File

@ -2,7 +2,7 @@ kistype: func
fname: funcName1
fmode: Verify
source:
name: 公众号抖音商城户订单数据
name: TikTok-Order
must:
- order_id
- user_id

View File

@ -2,7 +2,7 @@ kistype: func
fname: funcName2
fmode: Save
source:
name: 用户订单错误率
name: UserOrderErrorRate
must:
- order_id
- user_id

View File

@ -2,7 +2,7 @@ kistype: func
fname: funcName3
fmode: Calculate
source:
name: 用户订单错误率
name: UserOrderErrorRate
must:
- order_id
- user_id

View File

@ -12,7 +12,7 @@ func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call AbortFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetID(), row)
fmt.Println(str)
}

View File

@ -12,13 +12,11 @@ func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call DataReuseFuncHandler ----")
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetID(), row)
fmt.Println(str)
// 计算结果数据
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交结果数据
_ = flow.CommitRow(resultStr)
}

View File

@ -13,14 +13,11 @@ func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for index, row := range flow.Input() {
// 打印数据
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetID(), row)
fmt.Println(str)
// 计算结果数据
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交结果数据
_ = flow.CommitRow(resultStr)
}

View File

@ -14,7 +14,7 @@ func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetID(), row)
fmt.Println(str)
conn, err := flow.GetConnector()
@ -28,10 +28,8 @@ func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
return err
}
// 计算结果数据
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交结果数据
_ = flow.CommitRow(resultStr)
}

View File

@ -13,7 +13,7 @@ func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetID(), row)
fmt.Println(str)
}

View File

@ -12,13 +12,11 @@ func FuncDemo4Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call FuncDemo4Handler ----")
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetID(), row)
fmt.Println(str)
// 计算结果数据
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交结果数据
_ = flow.CommitRow(resultStr)
}

View File

@ -12,7 +12,7 @@ func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call JumpFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetID(), row)
fmt.Println(str)
}

View File

@ -12,7 +12,7 @@ func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call NoResultFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetID(), row)
fmt.Println(str)
}

View File

@ -17,14 +17,15 @@ type AvgStuScoreOut struct {
proto.StuAvgScore
}
// AvgStuScore(FaaS) 计算学生平均分
// AvgStuScore(FaaS) calculates the average score of students
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
avgScore := proto.StuAvgScore{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// 提交结果数据
// Submit result data
_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
}

View File

@ -8,18 +8,17 @@ import (
)
func init() {
// 0. 注册Function 回调业务
// Register Function callback business
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
kis.Pool().FaaS("funcName4", faas.FuncDemo4Handler)
kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // abortFunc 业务
kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // dataReuseFunc 业务
kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // noResultFunc 业务
kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // jumpFunc 业务
kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // abortFunc business
kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // dataReuseFunc business
kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // noResultFunc business
kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // jumpFunc business
// 0. 注册ConnectorInit 和 Connector 回调业务
// Register ConnectorInit and Connector callback business
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
}

View File

@ -11,21 +11,21 @@ import (
func TestActionAbort(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
// 1. Load the configuration file and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
fmt.Println("Wrong Config Yaml Path!")
panic(err)
}
// 2. 获取Flow
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("flowName2")
// 3. 提交原始数据
// 3. Commit original data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
@ -34,21 +34,21 @@ func TestActionAbort(t *testing.T) {
func TestActionDataReuse(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
// 1. Load the configuration file and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
fmt.Println("Wrong Config Yaml Path!")
panic(err)
}
// 2. 获取Flow
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("flowName3")
// 3. 提交原始数据
// 3. Commit original data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
@ -57,21 +57,21 @@ func TestActionDataReuse(t *testing.T) {
func TestActionForceEntry(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
// 1. Load the configuration file and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
fmt.Println("Wrong Config Yaml Path!")
panic(err)
}
// 2. 获取Flow
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("flowName4")
// 3. 提交原始数据
// 3. Commit original data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
@ -80,21 +80,21 @@ func TestActionForceEntry(t *testing.T) {
func TestActionJumpFunc(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
// 1. Load the configuration file and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
fmt.Println("Wrong Config Yaml Path!")
panic(err)
}
// 2. 获取Flow
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("flowName5")
// 3. 提交原始数据
// 3. Commit original data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -18,18 +18,18 @@ func TestAutoInjectParamWithConfig(t *testing.T) {
kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)
// 1. 加载配置文件并构建Flow
// 1. Load the configuration file and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("StuAvg")
if flow1 == nil {
panic("flow1 is nil")
}
// 3. 提交原始数据
// 3. Commit original data
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
@ -47,10 +47,10 @@ func TestAutoInjectParamWithConfig(t *testing.T) {
},
})
// 提交原始数据json字符串
// Commit original data (as JSON string)
_ = flow1.CommitRow(`{"stu_id":101}`)
// 4. 执行flow1
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
@ -81,7 +81,7 @@ func TestAutoInjectParam(t *testing.T) {
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 拼接Functioin 到 Flow 上
// 4. Link Functions to Flow
if err := flow1.Link(avgStuScoreConfig, nil); err != nil {
panic(err)
}
@ -89,7 +89,7 @@ func TestAutoInjectParam(t *testing.T) {
panic(err)
}
// 3. 提交原始数据
// 3. Commit original data
_ = flow1.CommitRow(&faas.AvgStuScoreIn{
StuScores: proto.StuScores{
StuId: 100,
@ -108,7 +108,7 @@ func TestAutoInjectParam(t *testing.T) {
},
})
// 4. 执行flow1
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -7,15 +7,15 @@ import (
"testing"
)
func TestConfigExportYmal(t *testing.T) {
func TestConfigExportYaml(t *testing.T) {
// 1. 加载配置文件并构建Flow
// 1. Load the configuration file and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
fmt.Println("Wrong Config Yaml Path!")
panic(err)
}
// 2. 讲构建的内存KisFlow结构配置导出的文件当中
// 2. Export the built memory KisFlow structure configuration to files
flows := kis.Pool().GetFlows()
for _, flow := range flows {
if err := file.ConfigExportYaml(flow, "/Users/Aceld/go/src/kis-flow/test/export_conf/"); err != nil {

View File

@ -10,20 +10,20 @@ import (
func TestConfigImportYaml(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
// 1. Load the configuration file and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("flowName1")
// 3. 提交原始数据
// 3. Commit the raw data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -9,7 +9,7 @@ import (
func TestNewFuncConfig(t *testing.T) {
source := config.KisSource{
Name: "公众号抖音商城户订单数据",
Name: "TikTokOrder",
Must: []string{"order_id", "user_id"},
}
@ -56,7 +56,7 @@ func TestNewFlowConfig(t *testing.T) {
func TestNewConnConfig(t *testing.T) {
source := config.KisSource{
Name: "公众号抖音商城户订单数据",
Name: "TikTokOrder",
Must: []string{"order_id", "user_id"},
}

View File

@ -12,14 +12,14 @@ func TestNewKisConnector(t *testing.T) {
ctx := context.Background()
// 1. 创建3个KisFunction配置实例, 其中myFuncConfig2 有Connector配置
// 1. Create three KisFunction configuration instances, with myFuncConfig2 having a Connector configuration
source1 := config.KisSource{
Name: "公众号抖音商城户订单数据",
Name: "TikTokOrder",
Must: []string{"order_id", "user_id"},
}
source2 := config.KisSource{
Name: "用户订单错误率",
Name: "UserOrderErrorRate",
Must: []string{"order_id", "user_id"},
}
@ -42,22 +42,22 @@ func TestNewKisConnector(t *testing.T) {
panic("myFuncConfig3 is nil")
}
// 2. 创建一个KisConnector配置实例
// 2. Create a KisConnector configuration instance
myConnConfig1 := config.NewConnConfig("ConnName1", "0.0.0.0:9998", common.REDIS, "redis-key", nil)
if myConnConfig1 == nil {
panic("myConnConfig1 is nil")
}
// 3. 将KisConnector配置实例绑定到KisFunction配置实例上
// 3. Bind the KisConnector configuration instance to the KisFunction configuration instance
_ = myFuncConfig2.AddConnConfig(myConnConfig1)
// 4. 创建一个 KisFlow 配置实例
// 4. Create a KisFlow configuration instance
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 5. 创建一个KisFlow对象
// 5. Create a KisFlow object
flow1 := flow.NewKisFlow(myFlowConfig1)
// 6. 拼接Functioin 到 Flow 上
// 6. Link Functions to the Flow
if err := flow1.Link(myFuncConfig1, nil); err != nil {
panic(err)
}
@ -68,12 +68,12 @@ func TestNewKisConnector(t *testing.T) {
panic(err)
}
// 7. 提交原始数据
// 7. Commit raw data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 8. 执行flow1
// 8. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -11,12 +11,12 @@ import (
func TestForkFlowCommitBatch(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
// 1. Load the configuration file and build the Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("flowName1")
stringRows := []string{
@ -25,13 +25,13 @@ func TestForkFlowCommitBatch(t *testing.T) {
"This is Data3 from Test",
}
// 3. 提交原始数据
// 3. Commit raw data
if err := flow1.CommitRowBatch(stringRows); err != nil {
log.Logger().ErrorF("CommitRowBatch Error, err = %+v", err)
panic(err)
}
// 4. 执行flow1
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -11,14 +11,14 @@ import (
func TestNewKisFlow(t *testing.T) {
ctx := context.Background()
// 1. 创建2个KisFunction配置实例
// 1. Create 2 KisFunction configuration instances
source1 := config.KisSource{
Name: "公众号抖音商城户订单数据",
Name: "TikTokOrder",
Must: []string{"order_id", "user_id"},
}
source2 := config.KisSource{
Name: "用户订单错误率",
Name: "UserOrderErrorRate",
Must: []string{"order_id", "user_id"},
}
@ -32,13 +32,13 @@ func TestNewKisFlow(t *testing.T) {
panic("myFuncConfig2 is nil")
}
// 2. 创建一个 KisFlow 配置实例
// 2. Create a KisFlow configuration instance
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 3. 创建一个KisFlow对象
// 3. Create a KisFlow object
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 拼接Functioin 到 Flow 上
// 4. Link functions to the Flow
if err := flow1.Link(myFuncConfig1, nil); err != nil {
panic(err)
}
@ -46,7 +46,7 @@ func TestNewKisFlow(t *testing.T) {
panic(err)
}
// 5. 执行flow1
// 5. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
@ -55,14 +55,14 @@ func TestNewKisFlow(t *testing.T) {
func TestNewKisFlowData(t *testing.T) {
ctx := context.Background()
// 1. 创建2个KisFunction配置实例
// 1. Create 2 KisFunction configuration instances
source1 := config.KisSource{
Name: "公众号抖音商城户订单数据",
Name: "TikTokOrder",
Must: []string{"order_id", "user_id"},
}
source2 := config.KisSource{
Name: "用户订单错误率",
Name: "UserOrderErrorRate",
Must: []string{"order_id", "user_id"},
}
@ -76,13 +76,13 @@ func TestNewKisFlowData(t *testing.T) {
panic("myFuncConfig4 is nil")
}
// 2. 创建一个 KisFlow 配置实例
// 2. Create a KisFlow configuration instance
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 3. 创建一个KisFlow对象
// 3. Create a KisFlow object
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 拼接 Function 到 Flow 上
// 4. Link Function to the Flow
if err := flow1.Link(myFuncConfig1, nil); err != nil {
panic(err)
}
@ -90,12 +90,12 @@ func TestNewKisFlowData(t *testing.T) {
panic(err)
}
// 5. 提交原始数据
// 5. Commit raw data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 6. 执行flow1
// 6. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -14,12 +14,12 @@ import (
func TestForkFlow(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
// 1. Load configuration file and build Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
// 2. Get Flow
flow1 := kis.Pool().GetFlow("flowFork1")
fmt.Println("----> flow1: ", flow1.GetFuncParamsAllFuncs())
@ -27,10 +27,10 @@ func TestForkFlow(t *testing.T) {
flow1Clone1 := flow1.Fork(ctx)
fmt.Println("----> flow1Clone1: ", flow1Clone1.GetFuncParamsAllFuncs())
// 3. 提交原始数据
// 3. Commit raw data
_ = flow1Clone1.CommitRow("This is Data1 from Test")
// 4. 执行flow1
// 4. Execute flow1
if err := flow1Clone1.Run(ctx); err != nil {
panic(err)
}
@ -58,10 +58,10 @@ func TestForkFlowWithLink(t *testing.T) {
fmt.Println("----> flow1Clone1: ", flow1Clone1.GetFuncParamsAllFuncs())
// 3. 提交原始数据
// 3. Commit raw data
_ = flow1Clone1.CommitRow("This is Data1 from Test")
// 4. 执行flow1
// 4. Execute flow1
if err := flow1Clone1.Run(ctx); err != nil {
panic(err)
}

View File

@ -12,9 +12,9 @@ import (
func TestNewKisFunction(t *testing.T) {
ctx := context.Background()
// 1. 创建一个KisFunction配置实例
// 1. Create a KisFunction configuration instance
source := config.KisSource{
Name: "公众号抖音商城户订单数据",
Name: "TikTokOrder",
Must: []string{"order_id", "user_id"},
}
@ -23,13 +23,13 @@ func TestNewKisFunction(t *testing.T) {
panic("myFuncConfig1 is nil")
}
// 2. 创建一个 KisFlow 配置实例
// 2. Create a KisFlow configuration instance
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 3. 创建一个KisFlow对象
// 3. Create a KisFlow object
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 创建一个KisFunction对象
// 4. Create a KisFunction object
func1 := function.NewKisFunction(flow1, myFuncConfig1)
if err := func1.Call(ctx, flow1); err != nil {

View File

@ -12,22 +12,18 @@ import (
func TestMetricsDataTotal(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
fmt.Println("Wrong Config Yaml Path!")
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
n := 0
for n < 10 {
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -10,20 +10,16 @@ import (
func TestParams(t *testing.T) {
ctx := context.Background()
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -12,14 +12,14 @@ func TestNewKisPool(t *testing.T) {
ctx := context.Background()
// 1. 创建2个KisFunction配置实例
// 1. Create 2 KisFunction configuration instances
source1 := config.KisSource{
Name: "公众号抖音商城户订单数据",
Name: "TickTokOrder",
Must: []string{"order_id", "user_id"},
}
source2 := config.KisSource{
Name: "用户订单错误率",
Name: "UserOrderErrorRate",
Must: []string{"order_id", "user_id"},
}
@ -33,13 +33,13 @@ func TestNewKisPool(t *testing.T) {
panic("myFuncConfig4 is nil")
}
// 2. 创建一个 KisFlow 配置实例
// 2. Create a KisFlow configuration instance
myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable)
// 3. 创建一个KisFlow对象
// 3. Create a KisFlow object
flow1 := flow.NewKisFlow(myFlowConfig1)
// 4. 拼接Functioin 到 Flow 上
// 4. Link Functions to Flow
if err := flow1.Link(myFuncConfig1, nil); err != nil {
panic(err)
}
@ -47,12 +47,12 @@ func TestNewKisPool(t *testing.T) {
panic(err)
}
// 5. 提交原始数据
// 5. Commit raw data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 6. 执行flow1
// 6. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}

View File

@ -2,7 +2,7 @@ kistype: func
fname: abortFunc
fmode: Calculate
source:
name: 用户订单错误率
name: UserOrderErrorRate
must:
- order_id
- user_id

View File

@ -2,6 +2,6 @@ kistype: func
fname: AvgStuScore
fmode: Calculate
source:
name: 学生平均分
name: StudentAverageScore
must:
- stu_id

View File

@ -2,7 +2,7 @@ kistype: func
fname: funcName1
fmode: Verify
source:
name: 公众号抖音商城户订单数据
name: TiktokOrder
must:
- order_id
- user_id

View File

@ -2,7 +2,7 @@ kistype: func
fname: funcName2
fmode: Save
source:
name: 用户订单错误率
name: UserOrderErrorRate
must:
- order_id
- user_id

View File

@ -2,7 +2,7 @@ kistype: func
fname: funcName3
fmode: Calculate
source:
name: 用户订单错误率
name: UserOrderErrorRate
must:
- order_id
- user_id

View File

@ -2,7 +2,7 @@ kistype: func
fname: noResultFunc
fmode: Calculate
source:
name: 用户订单错误率
name: UserOrderErrorRate
must:
- order_id
- user_id

View File

@ -2,6 +2,6 @@ kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
name: 学生平均分
name: StudentAverageScore
must:
- stu_id

View File

@ -2,7 +2,7 @@ kistype: func
fname: dataReuseFunc
fmode: Calculate
source:
name: 用户订单错误率
name: UserOrderErrorRate
must:
- order_id
- user_id

View File

@ -2,7 +2,7 @@ kistype: func
fname: jumpFunc
fmode: Calculate
source:
name: 用户订单错误率
name: UserOrderErrorRate
must:
- order_id
- user_id

View File

@ -1,8 +1,8 @@
#kistype Global为kisflow的全局配置
# kistype Global is the global configuration for kisflow
kistype: global
#是否启动prometheus监控
# Whether to enable prometheus monitoring
prometheus_enable: true
#是否需要kisflow单独启动端口监听
# Whether kisflow needs to listen on a separate port
prometheus_listen: true
#prometheus取点监听地址
# Prometheus scrape address
prometheus_serve: 0.0.0.0:20004