Merge pull request #4 from aceld/feature/aceld

Feature/aceld
This commit is contained in:
刘丹冰 2024-03-04 17:10:13 +08:00 committed by GitHub
commit 52c53bf43e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 192 additions and 3 deletions

View File

@ -64,3 +64,11 @@ const (
// DefaultExpiration 默认GoCahce时间 ,永久保存
DefaultExpiration time.Duration = 0
)
// metrics
const (
METRICS_ROUTE string = "/metrics"
COUNTER_KISFLOW_DATA_TOTAL_NAME string = "kisflow_data_total"
COUNTER_KISFLOW_DATA_TOTAL_HELP string = "KisFlow全部Flow的数据总量"
)

View File

@ -0,0 +1,15 @@
package config
type KisGlobalConfig struct {
//kistype Global为kisflow的全局配置
KisType string `yaml:"kistype"`
//是否启动prometheus监控
EnableProm bool `yaml:"prometheus_enable"`
//是否需要kisflow单独启动端口监听
PrometheusListen bool `yaml:"prometheus_listen"`
//prometheus取点监听地址
PrometheusServe string `yaml:"prometheus_serve"`
}
// GlobalConfig 默认全局配置,全部均为关闭
var GlobalConfig = new(KisGlobalConfig)

View File

@ -7,6 +7,7 @@ import (
"kis-flow/config"
"kis-flow/flow"
"kis-flow/kis"
"kis-flow/metrics"
"os"
"path"
"path/filepath"
@ -62,7 +63,7 @@ func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisT
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
conn := new(config.KisConnConfig)
if ok := yaml.Unmarshal(confData, conn); ok != nil {
return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))
return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType))
}
if _, ok := all.Conns[conn.CName]; ok {
@ -75,6 +76,19 @@ func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisT
return nil
}
// kisTypeGlobalConfigure 解析Global配置文件yaml格式
func kisTypeGlobalConfigure(confData []byte, fileName string, kisType interface{}) error {
// 全局配置
if ok := yaml.Unmarshal(confData, config.GlobalConfig); ok != nil {
return errors.New(fmt.Sprintf("%s is wrong format kisType = %s", fileName, kisType))
}
// 启动Metrics服务
metrics.RunMetrics()
return nil
}
// parseConfigWalkYaml 全盘解析配置文件yaml格式, 讲配置信息解析到allConfig中
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
@ -117,6 +131,9 @@ func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
case common.KisIdTypeConnector:
return kisTypeConnConfigure(all, confData, filePath, kisType)
case common.KisIdTypeGlobal:
return kisTypeGlobalConfigure(confData, filePath, kisType)
default:
return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
}

View File

@ -8,6 +8,7 @@ import (
"kis-flow/common"
"kis-flow/config"
"kis-flow/log"
"kis-flow/metrics"
"time"
)
@ -47,6 +48,12 @@ func (flow *KisFlow) commitSrcData(ctx context.Context) error {
// 清空缓冲Buf
flow.buffer = flow.buffer[0:0]
// 首次提交数据源数据,进行统计数据总量
if config.GlobalConfig.EnableProm == true {
// 统计数据总量 Metrics.DataTota 指标累计加1
metrics.Metrics.DataTotal.Add(float64(dataCnt))
}
log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil

17
go.mod
View File

@ -4,6 +4,19 @@ go 1.18
require (
github.com/google/uuid v1.5.0
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.14.0
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
google.golang.org/protobuf v1.28.1 // indirect
)

57
metrics/kis_metrics.go Normal file
View File

@ -0,0 +1,57 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"kis-flow/common"
"kis-flow/config"
"kis-flow/log"
"net/http"
)
// kisMetrics kisFlow的Prometheus监控指标
type kisMetrics struct {
//数据数量总量
DataTotal prometheus.Counter
}
var Metrics *kisMetrics
// RunMetricsService 启动Prometheus监控服务
func RunMetricsService(serverAddr string) error {
// 注册Prometheus 监控路由路径
http.Handle(common.METRICS_ROUTE, promhttp.Handler())
// 启动HttpServer
err := http.ListenAndServe(serverAddr, nil) //多个进程不可监听同一个端口
if err != nil {
log.Logger().ErrorF("RunMetricsService err = %s\n", err)
}
return err
}
func InitMetrics() {
Metrics = new(kisMetrics)
// DataTotal初始化Counter
Metrics.DataTotal = prometheus.NewCounter(prometheus.CounterOpts{
Name: common.COUNTER_KISFLOW_DATA_TOTAL_NAME,
Help: common.COUNTER_KISFLOW_DATA_TOTAL_HELP,
})
// 注册Metrics
prometheus.MustRegister(Metrics.DataTotal)
}
// RunMetrics 启动Prometheus指标服务
func RunMetrics() {
// 初始化Prometheus指标
InitMetrics()
if config.GlobalConfig.EnableProm == true && config.GlobalConfig.PrometheusListen == true {
// 启动Prometheus指标Metrics服务
go RunMetricsService(config.GlobalConfig.PrometheusServe)
}
}

50
test/kis_metrics_test.go Normal file
View File

@ -0,0 +1,50 @@
package test
import (
"context"
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
"time"
)
func TestMetricsDataTotal(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
n := 0
for n < 10 {
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
time.Sleep(1 * time.Second)
n++
}
select {}
}

View File

@ -0,0 +1,8 @@
#kistype Global为kisflow的全局配置
kistype: global
#是否启动prometheus监控
prometheus_enable: true
#是否需要kisflow单独启动端口监听
prometheus_listen: true
#prometheus取点监听地址
prometheus_serve: 0.0.0.0:20004

View File

@ -0,0 +1,14 @@
package test
import (
"kis-flow/metrics"
"testing"
)
func TestPrometheusServer(t *testing.T) {
err := metrics.RunMetricsService("0.0.0.0:20004")
if err != nil {
panic(err)
}
}