From 765bd4a32e6e4a5e642c732464ef847d3ad48c77 Mon Sep 17 00:00:00 2001 From: aceld Date: Mon, 4 Mar 2024 14:53:29 +0800 Subject: [PATCH] add mertics server --- common/const.go | 8 +++++ config/kis_global_config.go | 15 +++++++++ file/config_import.go | 19 +++++++++++- flow/kis_flow_data.go | 7 +++++ go.mod | 17 ++++++++-- metrics/kis_metrics.go | 57 ++++++++++++++++++++++++++++++++++ test/kis_metrics_test.go | 50 +++++++++++++++++++++++++++++ test/load_conf/kis-flow.yml | 8 +++++ test/prometheus_server_test.go | 14 +++++++++ 9 files changed, 192 insertions(+), 3 deletions(-) create mode 100644 config/kis_global_config.go create mode 100644 metrics/kis_metrics.go create mode 100644 test/kis_metrics_test.go create mode 100644 test/load_conf/kis-flow.yml create mode 100644 test/prometheus_server_test.go diff --git a/common/const.go b/common/const.go index d9d2901..24f63f6 100644 --- a/common/const.go +++ b/common/const.go @@ -64,3 +64,11 @@ const ( // DefaultExpiration 默认GoCahce时间 ,永久保存 DefaultExpiration time.Duration = 0 ) + +// metrics +const ( + METRICS_ROUTE string = "/metrics" + + COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total" + COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量" +) diff --git a/config/kis_global_config.go b/config/kis_global_config.go new file mode 100644 index 0000000..8220967 --- /dev/null +++ b/config/kis_global_config.go @@ -0,0 +1,15 @@ +package config + +type KisGlobalConfig struct { + //kistype Global为kisflow的全局配置 + KisType string `yaml:"kistype"` + //是否启动prometheus监控 + EnableProm bool `yaml:"prometheus_enable"` + //是否需要kisflow单独启动端口监听 + PrometheusListen bool `yaml:"prometheus_listen"` + //prometheus取点监听地址 + PrometheusServe string `yaml:"prometheus_serve"` +} + +// GlobalConfig 默认全局配置,全部均为关闭 +var GlobalConfig = new(KisGlobalConfig) diff --git a/file/config_import.go b/file/config_import.go index ec4ebc6..4dc6aa1 100644 --- a/file/config_import.go +++ b/file/config_import.go @@ -9,6 +9,7 @@ import ( "kis-flow/config" "kis-flow/flow" "kis-flow/kis" + "kis-flow/metrics" "os" "path" "path/filepath" @@ -62,7 +63,7 @@ func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisT func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { conn := new(config.KisConnConfig) if ok := yaml.Unmarshal(confData, conn); ok != nil { - return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType)) + return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType)) } if _, ok := all.Conns[conn.CName]; ok { @@ -75,6 +76,19 @@ func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisT return nil } +// kisTypeGlobalConfigure 解析Global配置文件,yaml格式 +func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error { + // 全局配置 + if ok := yaml.Unmarshal(confData, config.GlobalConfig); ok != nil { + return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType)) + } + + // 启动Metrics服务 + metrics.RunMetrics() + + return nil +} + // parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中 func parseConfigWalkYaml(loadPath string) (*allConfig, error) { @@ -117,6 +131,9 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) { case common.KisIdTypeConnnector: return kisTypeConnConfigure(all, confData, filePath, kisType) + case common.KisIdTypeGlobal: + return kisTypeGlobalConfigure(confData, filePath, kisType) + default: return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType)) } diff --git a/flow/kis_flow_data.go b/flow/kis_flow_data.go index 0dffdcb..2974b31 100644 --- a/flow/kis_flow_data.go +++ b/flow/kis_flow_data.go @@ -8,6 +8,7 @@ import ( "kis-flow/common" "kis-flow/config" "kis-flow/log" + "kis-flow/metrics" "time" ) @@ -47,6 +48,12 @@ func (flow *KisFlow) commitSrcData(ctx context.Context) error { // 清空缓冲Buf flow.buffer = flow.buffer[0:0] + // 首次提交数据源数据,进行统计数据总量 + if config.GlobalConfig.EnableProm == true { + // 统计数据总量 Metrics.DataTota 指标累计加1 + metrics.Metrics.DataTotal.Add(float64(dataCnt)) + } + log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) return nil diff --git a/go.mod b/go.mod index 6231234..8d94937 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,19 @@ go 1.18 require ( github.com/google/uuid v1.5.0 - github.com/patrickmn/go-cache v2.1.0+incompatible // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/prometheus/client_golang v1.14.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.37.0 // indirect + github.com/prometheus/procfs v0.8.0 // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/metrics/kis_metrics.go b/metrics/kis_metrics.go new file mode 100644 index 0000000..b7c202b --- /dev/null +++ b/metrics/kis_metrics.go @@ -0,0 +1,57 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "kis-flow/common" + "kis-flow/config" + "kis-flow/log" + "net/http" +) + +// kisMetrics kisFlow的Prometheus监控指标 +type kisMetrics struct { + //数据数量总量 + DataTotal prometheus.Counter +} + +var Metrics *kisMetrics + +// RunMetricsService 启动Prometheus监控服务 +func RunMetricsService(serverAddr string) error { + + // 注册Prometheus 监控路由路径 + http.Handle(common.METRICS_ROUTE, promhttp.Handler()) + + // 启动HttpServer + err := http.ListenAndServe(serverAddr, nil) //多个进程不可监听同一个端口 + if err != nil { + log.Logger().ErrorF("RunMetricsService err = %s\n", err) + } + + return err +} + +func InitMetrics() { + Metrics = new(kisMetrics) + + // DataTotal初始化Counter + Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME, + Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP, + }) + + // 注册Metrics + prometheus.MustRegister(Metrics.DataTotal) +} + +// RunMetrics 启动Prometheus指标服务 +func RunMetrics() { + // 初始化Prometheus指标 + InitMetrics() + + if config.GlobalConfig.EnableProm == true && config.GlobalConfig.PrometheusListen == true { + // 启动Prometheus指标Metrics服务 + go RunMetricsService(config.GlobalConfig.PrometheusServe) + } +} diff --git a/test/kis_metrics_test.go b/test/kis_metrics_test.go new file mode 100644 index 0000000..9cbe41c --- /dev/null +++ b/test/kis_metrics_test.go @@ -0,0 +1,50 @@ +package test + +import ( + "context" + "kis-flow/common" + "kis-flow/file" + "kis-flow/kis" + "kis-flow/test/caas" + "kis-flow/test/faas" + "testing" + "time" +) + +func TestMetricsDataTotal(t *testing.T) { + ctx := context.Background() + + // 0. 注册Function 回调业务 + kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) + kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) + kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) + + // 0. 注册ConnectorInit 和 Connector 回调业务 + kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) + kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) + + // 1. 加载配置文件并构建Flow + if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { + panic(err) + } + + // 2. 获取Flow + flow1 := kis.Pool().GetFlow("flowName1") + + n := 0 + + for n < 10 { + // 3. 提交原始数据 + _ = flow1.CommitRow("This is Data1 from Test") + + // 4. 执行flow1 + if err := flow1.Run(ctx); err != nil { + panic(err) + } + + time.Sleep(1 * time.Second) + n++ + } + + select {} +} diff --git a/test/load_conf/kis-flow.yml b/test/load_conf/kis-flow.yml new file mode 100644 index 0000000..5eb695d --- /dev/null +++ b/test/load_conf/kis-flow.yml @@ -0,0 +1,8 @@ +#kistype Global为kisflow的全局配置 +kistype: global +#是否启动prometheus监控 +prometheus_enable: true +#是否需要kisflow单独启动端口监听 +prometheus_listen: true +#prometheus取点监听地址 +prometheus_serve: 0.0.0.0:20004 \ No newline at end of file diff --git a/test/prometheus_server_test.go b/test/prometheus_server_test.go new file mode 100644 index 0000000..223119e --- /dev/null +++ b/test/prometheus_server_test.go @@ -0,0 +1,14 @@ +package test + +import ( + "kis-flow/metrics" + "testing" +) + +func TestPrometheusServer(t *testing.T) { + + err := metrics.RunMetricsService("0.0.0.0:20004") + if err != nil { + panic(err) + } +}