mirror of
https://github.com/aceld/kis-flow.git
synced 2025-02-02 23:38:39 +08:00
add mertics server
This commit is contained in:
parent
0e269e17ae
commit
765bd4a32e
@ -64,3 +64,11 @@ const (
|
|||||||
// DefaultExpiration 默认GoCahce时间 ,永久保存
|
// DefaultExpiration 默认GoCahce时间 ,永久保存
|
||||||
DefaultExpiration time.Duration = 0
|
DefaultExpiration time.Duration = 0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// metrics
|
||||||
|
const (
|
||||||
|
METRICS_ROUTE string = "/metrics"
|
||||||
|
|
||||||
|
COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
|
||||||
|
COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
|
||||||
|
)
|
||||||
|
15
config/kis_global_config.go
Normal file
15
config/kis_global_config.go
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
type KisGlobalConfig struct {
|
||||||
|
//kistype Global为kisflow的全局配置
|
||||||
|
KisType string `yaml:"kistype"`
|
||||||
|
//是否启动prometheus监控
|
||||||
|
EnableProm bool `yaml:"prometheus_enable"`
|
||||||
|
//是否需要kisflow单独启动端口监听
|
||||||
|
PrometheusListen bool `yaml:"prometheus_listen"`
|
||||||
|
//prometheus取点监听地址
|
||||||
|
PrometheusServe string `yaml:"prometheus_serve"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// GlobalConfig 默认全局配置,全部均为关闭
|
||||||
|
var GlobalConfig = new(KisGlobalConfig)
|
@ -9,6 +9,7 @@ import (
|
|||||||
"kis-flow/config"
|
"kis-flow/config"
|
||||||
"kis-flow/flow"
|
"kis-flow/flow"
|
||||||
"kis-flow/kis"
|
"kis-flow/kis"
|
||||||
|
"kis-flow/metrics"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -62,7 +63,7 @@ func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisT
|
|||||||
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
|
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
|
||||||
conn := new(config.KisConnConfig)
|
conn := new(config.KisConnConfig)
|
||||||
if ok := yaml.Unmarshal(confData, conn); ok != nil {
|
if ok := yaml.Unmarshal(confData, conn); ok != nil {
|
||||||
return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))
|
return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType))
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := all.Conns[conn.CName]; ok {
|
if _, ok := all.Conns[conn.CName]; ok {
|
||||||
@ -75,6 +76,19 @@ func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisT
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// kisTypeGlobalConfigure 解析Global配置文件,yaml格式
|
||||||
|
func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
|
||||||
|
// 全局配置
|
||||||
|
if ok := yaml.Unmarshal(confData, config.GlobalConfig); ok != nil {
|
||||||
|
return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType))
|
||||||
|
}
|
||||||
|
|
||||||
|
// 启动Metrics服务
|
||||||
|
metrics.RunMetrics()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中
|
// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中
|
||||||
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
|
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
|
||||||
|
|
||||||
@ -117,6 +131,9 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
|
|||||||
case common.KisIdTypeConnnector:
|
case common.KisIdTypeConnnector:
|
||||||
return kisTypeConnConfigure(all, confData, filePath, kisType)
|
return kisTypeConnConfigure(all, confData, filePath, kisType)
|
||||||
|
|
||||||
|
case common.KisIdTypeGlobal:
|
||||||
|
return kisTypeGlobalConfigure(confData, filePath, kisType)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
|
return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"kis-flow/common"
|
"kis-flow/common"
|
||||||
"kis-flow/config"
|
"kis-flow/config"
|
||||||
"kis-flow/log"
|
"kis-flow/log"
|
||||||
|
"kis-flow/metrics"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -47,6 +48,12 @@ func (flow *KisFlow) commitSrcData(ctx context.Context) error {
|
|||||||
// 清空缓冲Buf
|
// 清空缓冲Buf
|
||||||
flow.buffer = flow.buffer[0:0]
|
flow.buffer = flow.buffer[0:0]
|
||||||
|
|
||||||
|
// 首次提交数据源数据,进行统计数据总量
|
||||||
|
if config.GlobalConfig.EnableProm == true {
|
||||||
|
// 统计数据总量 Metrics.DataTota 指标累计加1
|
||||||
|
metrics.Metrics.DataTotal.Add(float64(dataCnt))
|
||||||
|
}
|
||||||
|
|
||||||
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
|
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
17
go.mod
17
go.mod
@ -4,6 +4,19 @@ go 1.18
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/google/uuid v1.5.0
|
github.com/google/uuid v1.5.0
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
github.com/prometheus/client_golang v1.14.0
|
||||||
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
|
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||||
|
github.com/golang/protobuf v1.5.2 // indirect
|
||||||
|
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||||
|
github.com/prometheus/client_model v0.3.0 // indirect
|
||||||
|
github.com/prometheus/common v0.37.0 // indirect
|
||||||
|
github.com/prometheus/procfs v0.8.0 // indirect
|
||||||
|
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
|
||||||
|
google.golang.org/protobuf v1.28.1 // indirect
|
||||||
)
|
)
|
||||||
|
57
metrics/kis_metrics.go
Normal file
57
metrics/kis_metrics.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
"kis-flow/common"
|
||||||
|
"kis-flow/config"
|
||||||
|
"kis-flow/log"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// kisMetrics kisFlow的Prometheus监控指标
|
||||||
|
type kisMetrics struct {
|
||||||
|
//数据数量总量
|
||||||
|
DataTotal prometheus.Counter
|
||||||
|
}
|
||||||
|
|
||||||
|
var Metrics *kisMetrics
|
||||||
|
|
||||||
|
// RunMetricsService 启动Prometheus监控服务
|
||||||
|
func RunMetricsService(serverAddr string) error {
|
||||||
|
|
||||||
|
// 注册Prometheus 监控路由路径
|
||||||
|
http.Handle(common.METRICS_ROUTE, promhttp.Handler())
|
||||||
|
|
||||||
|
// 启动HttpServer
|
||||||
|
err := http.ListenAndServe(serverAddr, nil) //多个进程不可监听同一个端口
|
||||||
|
if err != nil {
|
||||||
|
log.Logger().ErrorF("RunMetricsService err = %s\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func InitMetrics() {
|
||||||
|
Metrics = new(kisMetrics)
|
||||||
|
|
||||||
|
// DataTotal初始化Counter
|
||||||
|
Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
|
||||||
|
Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
|
||||||
|
})
|
||||||
|
|
||||||
|
// 注册Metrics
|
||||||
|
prometheus.MustRegister(Metrics.DataTotal)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunMetrics 启动Prometheus指标服务
|
||||||
|
func RunMetrics() {
|
||||||
|
// 初始化Prometheus指标
|
||||||
|
InitMetrics()
|
||||||
|
|
||||||
|
if config.GlobalConfig.EnableProm == true && config.GlobalConfig.PrometheusListen == true {
|
||||||
|
// 启动Prometheus指标Metrics服务
|
||||||
|
go RunMetricsService(config.GlobalConfig.PrometheusServe)
|
||||||
|
}
|
||||||
|
}
|
50
test/kis_metrics_test.go
Normal file
50
test/kis_metrics_test.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"kis-flow/common"
|
||||||
|
"kis-flow/file"
|
||||||
|
"kis-flow/kis"
|
||||||
|
"kis-flow/test/caas"
|
||||||
|
"kis-flow/test/faas"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMetricsDataTotal(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// 0. 注册Function 回调业务
|
||||||
|
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
|
||||||
|
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
|
||||||
|
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
|
||||||
|
|
||||||
|
// 0. 注册ConnectorInit 和 Connector 回调业务
|
||||||
|
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
|
||||||
|
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
|
||||||
|
|
||||||
|
// 1. 加载配置文件并构建Flow
|
||||||
|
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 获取Flow
|
||||||
|
flow1 := kis.Pool().GetFlow("flowName1")
|
||||||
|
|
||||||
|
n := 0
|
||||||
|
|
||||||
|
for n < 10 {
|
||||||
|
// 3. 提交原始数据
|
||||||
|
_ = flow1.CommitRow("This is Data1 from Test")
|
||||||
|
|
||||||
|
// 4. 执行flow1
|
||||||
|
if err := flow1.Run(ctx); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
|
||||||
|
select {}
|
||||||
|
}
|
8
test/load_conf/kis-flow.yml
Normal file
8
test/load_conf/kis-flow.yml
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
#kistype Global为kisflow的全局配置
|
||||||
|
kistype: global
|
||||||
|
#是否启动prometheus监控
|
||||||
|
prometheus_enable: true
|
||||||
|
#是否需要kisflow单独启动端口监听
|
||||||
|
prometheus_listen: true
|
||||||
|
#prometheus取点监听地址
|
||||||
|
prometheus_serve: 0.0.0.0:20004
|
14
test/prometheus_server_test.go
Normal file
14
test/prometheus_server_test.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"kis-flow/metrics"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPrometheusServer(t *testing.T) {
|
||||||
|
|
||||||
|
err := metrics.RunMetricsService("0.0.0.0:20004")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user