From 665e4342f015bacf615f288a3f702716581036ee Mon Sep 17 00:00:00 2001 From: aceld Date: Mon, 18 Mar 2024 09:29:05 +0800 Subject: [PATCH] add prometheus metrics --- common/const.go | 20 +++++++++++++ flow/kis_flow.go | 45 ++++++++++++++++++++++++++++ flow/kis_flow_data.go | 3 ++ metrics/kis_metrics.go | 66 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 133 insertions(+), 1 deletion(-) diff --git a/common/const.go b/common/const.go index a693993..9fcaa13 100644 --- a/common/const.go +++ b/common/const.go @@ -69,6 +69,26 @@ const ( const ( METRICS_ROUTE string = "/metrics" + LABEL_FLOW_NAME string = "flow_name" + LABEL_FLOW_ID string = "flow_id" + LABEL_FUNCTION_NAME string = "func_name" + LABEL_FUNCTION_MODE string = "func_mode" + COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total" COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量" + + GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total" + GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量" + + GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts" + GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数" + + GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts" + GANGE_FUNC_SCHE_CNTS_HELP string = "KisFlow各个Function被调度的次数" + + HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration" + HISTOGRAM_FUNCTION_DURATION_HELP string = "Function执行耗时" + + HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration" + HISTOGRAM_FLOW_DURATION_HELP string = "Flow执行耗时" ) diff --git a/flow/kis_flow.go b/flow/kis_flow.go index 3ce0a25..6989c0e 100644 --- a/flow/kis_flow.go +++ b/flow/kis_flow.go @@ -3,6 +3,7 @@ package flow import ( "context" "errors" + "github.com/prometheus/client_golang/prometheus" "kis-flow/common" "kis-flow/config" "kis-flow/conn" @@ -10,6 +11,7 @@ import ( "kis-flow/id" "kis-flow/kis" "kis-flow/log" + "kis-flow/metrics" "sync" "time" @@ -198,6 +200,10 @@ func (flow *KisFlow) Run(ctx context.Context) error { return nil } + // Metrics + var funcStart time.Time + var flowStart time.Time + // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function flow.PrevFunctionId = common.FunctionIdFirstVirtual @@ -206,6 +212,14 @@ func (flow *KisFlow) Run(ctx context.Context) error { return err } + // Metrics + if config.GlobalConfig.EnableProm == true { + // 统计Flow的调度次数 + metrics.Metrics.FlowScheduleCntsToTal.WithLabelValues(flow.Name).Inc() + // 统计Flow的执行消耗时长 + flowStart = time.Now() + } + // 流式链式调用 for fn != nil && flow.abort == false { @@ -214,6 +228,17 @@ func (flow *KisFlow) Run(ctx context.Context) error { flow.ThisFunction = fn flow.ThisFunctionId = fid + fName := fn.GetConfig().FName + fMode := fn.GetConfig().FMode + + if config.GlobalConfig.EnableProm == true { + // 统计Function调度次数 + metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc() + + // 统计Function 耗时 记录开始时间 + funcStart = time.Now() + } + // 得到当前Function要处理与的源数据 if inputData, err := flow.getCurData(); err != nil { log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error()) @@ -231,9 +256,29 @@ func (flow *KisFlow) Run(ctx context.Context) error { if err != nil { return err } + + // 统计Function 耗时 + if config.GlobalConfig.EnableProm == true { + // Function消耗时间 + duration := time.Since(funcStart) + + // 统计当前Function统计指标,做时间统计 + metrics.Metrics.FunctionDuration.With( + prometheus.Labels{ + common.LABEL_FUNCTION_NAME: fName, + common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000) + } + } } + // Metrics + if config.GlobalConfig.EnableProm == true { + // 统计Flow执行耗时 + duration := time.Since(flowStart) + metrics.Metrics.FlowDuration.WithLabelValues(flow.Name).Observe(duration.Seconds() * 1000) + } + return nil } diff --git a/flow/kis_flow_data.go b/flow/kis_flow_data.go index 2974b31..70d1f0a 100644 --- a/flow/kis_flow_data.go +++ b/flow/kis_flow_data.go @@ -52,6 +52,9 @@ func (flow *KisFlow) commitSrcData(ctx context.Context) error { if config.GlobalConfig.EnableProm == true { // 统计数据总量 Metrics.DataTota 指标累计加1 metrics.Metrics.DataTotal.Add(float64(dataCnt)) + + //统计当前Flow数量指标 + metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt)) } log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) diff --git a/metrics/kis_metrics.go b/metrics/kis_metrics.go index b7c202b..50faa18 100644 --- a/metrics/kis_metrics.go +++ b/metrics/kis_metrics.go @@ -11,8 +11,18 @@ import ( // kisMetrics kisFlow的Prometheus监控指标 type kisMetrics struct { - //数据数量总量 + // 数据数量总量 DataTotal prometheus.Counter + // 各Flow处理数据总量 + FlowDataTotal *prometheus.GaugeVec + // Flow被调度次数 + FlowScheduleCntsToTal *prometheus.GaugeVec + // Function被调度次数 + FuncScheduleCntsTotal *prometheus.GaugeVec + // Function执行时间 + FunctionDuration *prometheus.HistogramVec + // Flow执行时间 + FlowDuration *prometheus.HistogramVec } var Metrics *kisMetrics @@ -41,8 +51,62 @@ func InitMetrics() { Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP, }) + // FlowDataTotal初始化GaugeVec + Metrics.FlowDataTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: common.GANGE_FLOW_DATA_TOTAL_NAME, + Help: common.GANGE_FLOW_DATA_TOTAL_HELP, + }, + // 标签名称 + []string{common.LABEL_FLOW_NAME}, + ) + + // FlowScheduleCntsToTal初始化GaugeVec + Metrics.FlowScheduleCntsToTal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: common.GANGE_FLOW_SCHE_CNTS_NAME, + Help: common.GANGE_FLOW_SCHE_CNTS_HELP, + }, + //标签名称 + []string{common.LABEL_FLOW_NAME}, + ) + + // FuncScheduleCntsTotal初始化GaugeVec + Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: common.GANGE_FUNC_SCHE_CNTS_NAME, + Help: common.GANGE_FUNC_SCHE_CNTS_HELP, + }, + //标签名称 + []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE}, + ) + + // FunctionDuration初始化HistogramVec + Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: common.HISTOGRAM_FUNCTION_DURATION_NAME, + Help: common.HISTOGRAM_FUNCTION_DURATION_HELP, + Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, //单位ms,最大半分钟 + }, + []string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE}, + ) + + // FlowDuration初始化HistogramVec + Metrics.FlowDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: common.HISTOGRAM_FLOW_DURATION_NAME, + Help: common.HISTOGRAM_FLOW_DURATION_HELP, + Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000, 60000}, //单位ms,最大1分钟 + }, + []string{common.LABEL_FLOW_NAME}, + ) + // 注册Metrics prometheus.MustRegister(Metrics.DataTotal) + prometheus.MustRegister(Metrics.FlowDataTotal) + prometheus.MustRegister(Metrics.FlowScheduleCntsToTal) + prometheus.MustRegister(Metrics.FuncScheduleCntsTotal) + prometheus.MustRegister(Metrics.FunctionDuration) + prometheus.MustRegister(Metrics.FlowDuration) } // RunMetrics 启动Prometheus指标服务