mirror of
https://github.com/aceld/kis-flow.git
synced 2025-02-02 15:28:38 +08:00
commit
fa62729026
@ -69,6 +69,26 @@ const (
|
||||
const (
|
||||
METRICS_ROUTE string = "/metrics"
|
||||
|
||||
LABEL_FLOW_NAME string = "flow_name"
|
||||
LABEL_FLOW_ID string = "flow_id"
|
||||
LABEL_FUNCTION_NAME string = "func_name"
|
||||
LABEL_FUNCTION_MODE string = "func_mode"
|
||||
|
||||
COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
|
||||
COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
|
||||
|
||||
GANGE_FLOW_DATA_TOTAL_NAME string = "flow_data_total"
|
||||
GANGE_FLOW_DATA_TOTAL_HELP string = "KisFlow各个FlowID数据流的数据数量总量"
|
||||
|
||||
GANGE_FLOW_SCHE_CNTS_NAME string = "flow_schedule_cnts"
|
||||
GANGE_FLOW_SCHE_CNTS_HELP string = "KisFlow各个FlowID被调度的次数"
|
||||
|
||||
GANGE_FUNC_SCHE_CNTS_NAME string = "func_schedule_cnts"
|
||||
GANGE_FUNC_SCHE_CNTS_HELP string = "KisFlow各个Function被调度的次数"
|
||||
|
||||
HISTOGRAM_FUNCTION_DURATION_NAME string = "func_run_duration"
|
||||
HISTOGRAM_FUNCTION_DURATION_HELP string = "Function执行耗时"
|
||||
|
||||
HISTOGRAM_FLOW_DURATION_NAME string = "flow_run_duration"
|
||||
HISTOGRAM_FLOW_DURATION_HELP string = "Flow执行耗时"
|
||||
)
|
||||
|
@ -3,6 +3,7 @@ package flow
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"kis-flow/common"
|
||||
"kis-flow/config"
|
||||
"kis-flow/conn"
|
||||
@ -10,6 +11,7 @@ import (
|
||||
"kis-flow/id"
|
||||
"kis-flow/kis"
|
||||
"kis-flow/log"
|
||||
"kis-flow/metrics"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -198,6 +200,10 @@ func (flow *KisFlow) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Metrics
|
||||
var funcStart time.Time
|
||||
var flowStart time.Time
|
||||
|
||||
// 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
|
||||
flow.PrevFunctionId = common.FunctionIdFirstVirtual
|
||||
|
||||
@ -206,6 +212,14 @@ func (flow *KisFlow) Run(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Metrics
|
||||
if config.GlobalConfig.EnableProm == true {
|
||||
// 统计Flow的调度次数
|
||||
metrics.Metrics.FlowScheduleCntsToTal.WithLabelValues(flow.Name).Inc()
|
||||
// 统计Flow的执行消耗时长
|
||||
flowStart = time.Now()
|
||||
}
|
||||
|
||||
// 流式链式调用
|
||||
for fn != nil && flow.abort == false {
|
||||
|
||||
@ -214,6 +228,17 @@ func (flow *KisFlow) Run(ctx context.Context) error {
|
||||
flow.ThisFunction = fn
|
||||
flow.ThisFunctionId = fid
|
||||
|
||||
fName := fn.GetConfig().FName
|
||||
fMode := fn.GetConfig().FMode
|
||||
|
||||
if config.GlobalConfig.EnableProm == true {
|
||||
// 统计Function调度次数
|
||||
metrics.Metrics.FuncScheduleCntsTotal.WithLabelValues(fName, fMode).Inc()
|
||||
|
||||
// 统计Function 耗时 记录开始时间
|
||||
funcStart = time.Now()
|
||||
}
|
||||
|
||||
// 得到当前Function要处理与的源数据
|
||||
if inputData, err := flow.getCurData(); err != nil {
|
||||
log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
|
||||
@ -231,7 +256,27 @@ func (flow *KisFlow) Run(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 统计Function 耗时
|
||||
if config.GlobalConfig.EnableProm == true {
|
||||
// Function消耗时间
|
||||
duration := time.Since(funcStart)
|
||||
|
||||
// 统计当前Function统计指标,做时间统计
|
||||
metrics.Metrics.FunctionDuration.With(
|
||||
prometheus.Labels{
|
||||
common.LABEL_FUNCTION_NAME: fName,
|
||||
common.LABEL_FUNCTION_MODE: fMode}).Observe(duration.Seconds() * 1000)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Metrics
|
||||
if config.GlobalConfig.EnableProm == true {
|
||||
// 统计Flow执行耗时
|
||||
duration := time.Since(flowStart)
|
||||
metrics.Metrics.FlowDuration.WithLabelValues(flow.Name).Observe(duration.Seconds() * 1000)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -52,6 +52,9 @@ func (flow *KisFlow) commitSrcData(ctx context.Context) error {
|
||||
if config.GlobalConfig.EnableProm == true {
|
||||
// 统计数据总量 Metrics.DataTota 指标累计加1
|
||||
metrics.Metrics.DataTotal.Add(float64(dataCnt))
|
||||
|
||||
//统计当前Flow数量指标
|
||||
metrics.Metrics.FlowDataTotal.WithLabelValues(flow.Name).Add(float64(dataCnt))
|
||||
}
|
||||
|
||||
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
|
||||
|
@ -11,8 +11,18 @@ import (
|
||||
|
||||
// kisMetrics kisFlow的Prometheus监控指标
|
||||
type kisMetrics struct {
|
||||
//数据数量总量
|
||||
// 数据数量总量
|
||||
DataTotal prometheus.Counter
|
||||
// 各Flow处理数据总量
|
||||
FlowDataTotal *prometheus.GaugeVec
|
||||
// Flow被调度次数
|
||||
FlowScheduleCntsToTal *prometheus.GaugeVec
|
||||
// Function被调度次数
|
||||
FuncScheduleCntsTotal *prometheus.GaugeVec
|
||||
// Function执行时间
|
||||
FunctionDuration *prometheus.HistogramVec
|
||||
// Flow执行时间
|
||||
FlowDuration *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
var Metrics *kisMetrics
|
||||
@ -41,8 +51,62 @@ func InitMetrics() {
|
||||
Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
|
||||
})
|
||||
|
||||
// FlowDataTotal初始化GaugeVec
|
||||
Metrics.FlowDataTotal = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: common.GANGE_FLOW_DATA_TOTAL_NAME,
|
||||
Help: common.GANGE_FLOW_DATA_TOTAL_HELP,
|
||||
},
|
||||
// 标签名称
|
||||
[]string{common.LABEL_FLOW_NAME},
|
||||
)
|
||||
|
||||
// FlowScheduleCntsToTal初始化GaugeVec
|
||||
Metrics.FlowScheduleCntsToTal = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: common.GANGE_FLOW_SCHE_CNTS_NAME,
|
||||
Help: common.GANGE_FLOW_SCHE_CNTS_HELP,
|
||||
},
|
||||
//标签名称
|
||||
[]string{common.LABEL_FLOW_NAME},
|
||||
)
|
||||
|
||||
// FuncScheduleCntsTotal初始化GaugeVec
|
||||
Metrics.FuncScheduleCntsTotal = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: common.GANGE_FUNC_SCHE_CNTS_NAME,
|
||||
Help: common.GANGE_FUNC_SCHE_CNTS_HELP,
|
||||
},
|
||||
//标签名称
|
||||
[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
|
||||
)
|
||||
|
||||
// FunctionDuration初始化HistogramVec
|
||||
Metrics.FunctionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: common.HISTOGRAM_FUNCTION_DURATION_NAME,
|
||||
Help: common.HISTOGRAM_FUNCTION_DURATION_HELP,
|
||||
Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000}, //单位ms,最大半分钟
|
||||
},
|
||||
[]string{common.LABEL_FUNCTION_NAME, common.LABEL_FUNCTION_MODE},
|
||||
)
|
||||
|
||||
// FlowDuration初始化HistogramVec
|
||||
Metrics.FlowDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: common.HISTOGRAM_FLOW_DURATION_NAME,
|
||||
Help: common.HISTOGRAM_FLOW_DURATION_HELP,
|
||||
Buckets: []float64{0.005, 0.01, 0.03, 0.08, 0.1, 0.5, 1.0, 5.0, 10, 100, 1000, 5000, 30000, 60000}, //单位ms,最大1分钟
|
||||
},
|
||||
[]string{common.LABEL_FLOW_NAME},
|
||||
)
|
||||
|
||||
// 注册Metrics
|
||||
prometheus.MustRegister(Metrics.DataTotal)
|
||||
prometheus.MustRegister(Metrics.FlowDataTotal)
|
||||
prometheus.MustRegister(Metrics.FlowScheduleCntsToTal)
|
||||
prometheus.MustRegister(Metrics.FuncScheduleCntsTotal)
|
||||
prometheus.MustRegister(Metrics.FunctionDuration)
|
||||
prometheus.MustRegister(Metrics.FlowDuration)
|
||||
}
|
||||
|
||||
// RunMetrics 启动Prometheus指标服务
|
||||
|
Loading…
Reference in New Issue
Block a user