chore: refactor config center (#4339)

Signed-off-by: kevin <wanjunfeng@gmail.com>
This commit is contained in:
Kevin Wan 2024-08-28 20:02:48 +08:00 committed by GitHub
parent 44cddec5c3
commit 24d6150073
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 90 additions and 93 deletions

View File

@ -15,8 +15,8 @@ import (
) )
var ( var (
errorEmptyConfig = errors.New("empty config value") errEmptyConfig = errors.New("empty config value")
errorMissUnmarshalerType = errors.New("miss unmarshaler type") errMissingUnmarshalerType = errors.New("missing unmarshaler type")
) )
// Configurator is the interface for configuration center. // Configurator is the interface for configuration center.
@ -32,16 +32,14 @@ type (
Config struct { Config struct {
// Type is the value type, yaml, json or toml. // Type is the value type, yaml, json or toml.
Type string `json:",default=yaml,options=[yaml,json,toml]"` Type string `json:",default=yaml,options=[yaml,json,toml]"`
// Log indicates whether to log the configuration. // Log is the flag to control logging.
Log bool `json:",default=ture"` Log bool `json:",default=true"`
} }
configCenter[T any] struct { configCenter[T any] struct {
conf Config conf Config
unmarshaler LoaderFn unmarshaler LoaderFn
subscriber subscriber.Subscriber subscriber subscriber.Subscriber
listeners []func() listeners []func()
lock sync.Mutex lock sync.Mutex
snapshot atomic.Value snapshot atomic.Value
@ -61,7 +59,6 @@ var _ Configurator[any] = (*configCenter[any])(nil)
func MustNewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) Configurator[T] { func MustNewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) Configurator[T] {
cc, err := NewConfigCenter[T](c, subscriber) cc, err := NewConfigCenter[T](c, subscriber)
logx.Must(err) logx.Must(err)
return cc return cc
} }
@ -76,9 +73,6 @@ func NewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) (Configu
conf: c, conf: c,
unmarshaler: unmarshaler, unmarshaler: unmarshaler,
subscriber: subscriber, subscriber: subscriber,
listeners: nil,
lock: sync.Mutex{},
snapshot: atomic.Value{},
} }
if err := cc.loadConfig(); err != nil { if err := cc.loadConfig(); err != nil {
@ -105,10 +99,10 @@ func (c *configCenter[T]) AddListener(listener func()) {
// GetConfig return structured config. // GetConfig return structured config.
func (c *configCenter[T]) GetConfig() (T, error) { func (c *configCenter[T]) GetConfig() (T, error) {
var r T
v := c.value() v := c.value()
if v == nil || len(v.data) < 1 { if v == nil || len(v.data) == 0 {
return r, errorEmptyConfig var empty T
return empty, errEmptyConfig
} }
return v.marshalData, v.err return v.marshalData, v.err
@ -141,7 +135,9 @@ func (c *configCenter[T]) loadConfig() error {
} }
func (c *configCenter[T]) onChange() { func (c *configCenter[T]) onChange() {
_ = c.loadConfig() if err := c.loadConfig(); err != nil {
return
}
c.lock.Lock() c.lock.Lock()
listeners := make([]func(), len(c.listeners)) listeners := make([]func(), len(c.listeners))
@ -165,40 +161,39 @@ func (c *configCenter[T]) genValue(data string) *value[T] {
v := &value[T]{ v := &value[T]{
data: data, data: data,
} }
if len(data) <= 0 { if len(data) == 0 {
return v return v
} }
t := reflect.TypeOf(v.marshalData) t := reflect.TypeOf(v.marshalData)
// if the type is nil, it means that the user has not set the type of the configuration. // if the type is nil, it means that the user has not set the type of the configuration.
if t == nil { if t == nil {
v.err = errorMissUnmarshalerType v.err = errMissingUnmarshalerType
return v return v
} }
t = mapping.Deref(t) t = mapping.Deref(t)
switch t.Kind() { switch t.Kind() {
case reflect.Struct, reflect.Array, reflect.Slice: case reflect.Struct, reflect.Array, reflect.Slice:
err := c.unmarshaler([]byte(data), &v.marshalData) if err := c.unmarshaler([]byte(data), &v.marshalData); err != nil {
if err != nil {
v.err = err v.err = err
if c.conf.Log { if c.conf.Log {
logx.Errorf("ConfigCenter unmarshal configuration failed, err: %+v, content [%s]", err.Error(), data) logx.Errorf("ConfigCenter unmarshal configuration failed, err: %+v, content [%s]",
err.Error(), data)
} }
} }
case reflect.String: case reflect.String:
if str, ok := any(data).(T); ok { if str, ok := any(data).(T); ok {
v.marshalData = str v.marshalData = str
} else { } else {
v.err = errorMissUnmarshalerType v.err = errMissingUnmarshalerType
} }
default: default:
if c.conf.Log { if c.conf.Log {
logx.Errorf("ConfigCenter unmarshal configuration missing unmarshaler for type: %s, content [%s]", logx.Errorf("ConfigCenter unmarshal configuration missing unmarshaler for type: %s, content [%s]",
t.Kind(), data) t.Kind(), data)
} }
v.err = errorMissUnmarshalerType v.err = errMissingUnmarshalerType
} }
return v return v

View File

@ -150,7 +150,7 @@ func TestConfigCenter_AddListener(t *testing.T) {
func TestConfigCenter_genValue(t *testing.T) { func TestConfigCenter_genValue(t *testing.T) {
t.Run("data is empty", func(t *testing.T) { t.Run("data is empty", func(t *testing.T) {
c := &configCenter[string]{ c := &configCenter[string]{
unmarshaler: defaultRegistry.unmarshalers["json"], unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true}, conf: Config{Log: true},
} }
v := c.genValue("") v := c.genValue("")
@ -159,25 +159,25 @@ func TestConfigCenter_genValue(t *testing.T) {
t.Run("invalid template type", func(t *testing.T) { t.Run("invalid template type", func(t *testing.T) {
c := &configCenter[any]{ c := &configCenter[any]{
unmarshaler: defaultRegistry.unmarshalers["json"], unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true}, conf: Config{Log: true},
} }
v := c.genValue("xxxx") v := c.genValue("xxxx")
assert.Equal(t, errorMissUnmarshalerType, v.err) assert.Equal(t, errMissingUnmarshalerType, v.err)
}) })
t.Run("unsupported template type", func(t *testing.T) { t.Run("unsupported template type", func(t *testing.T) {
c := &configCenter[int]{ c := &configCenter[int]{
unmarshaler: defaultRegistry.unmarshalers["json"], unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true}, conf: Config{Log: true},
} }
v := c.genValue("1") v := c.genValue("1")
assert.Equal(t, errorMissUnmarshalerType, v.err) assert.Equal(t, errMissingUnmarshalerType, v.err)
}) })
t.Run("supported template string type", func(t *testing.T) { t.Run("supported template string type", func(t *testing.T) {
c := &configCenter[string]{ c := &configCenter[string]{
unmarshaler: defaultRegistry.unmarshalers["json"], unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true}, conf: Config{Log: true},
} }
v := c.genValue("12345") v := c.genValue("12345")
@ -189,7 +189,7 @@ func TestConfigCenter_genValue(t *testing.T) {
c := &configCenter[struct { c := &configCenter[struct {
Name string `json:"name"` Name string `json:"name"`
}]{ }]{
unmarshaler: defaultRegistry.unmarshalers["json"], unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true}, conf: Config{Log: true},
} }
v := c.genValue(`{"name":"new name}`) v := c.genValue(`{"name":"new name}`)
@ -201,7 +201,7 @@ func TestConfigCenter_genValue(t *testing.T) {
c := &configCenter[struct { c := &configCenter[struct {
Name string `json:"name"` Name string `json:"name"`
}]{ }]{
unmarshaler: defaultRegistry.unmarshalers["json"], unmarshaler: registry.unmarshalers["json"],
conf: Config{Log: true}, conf: Config{Log: true},
} }
v := c.genValue(`{"name":"new name"}`) v := c.genValue(`{"name":"new name"}`)

View File

@ -12,30 +12,19 @@ type (
} }
// EtcdConf is the configuration for etcd. // EtcdConf is the configuration for etcd.
EtcdConf discov.EtcdConf EtcdConf = discov.EtcdConf
) )
// MustNewEtcdSubscriber returns an etcd Subscriber, exits on errors. // MustNewEtcdSubscriber returns an etcd Subscriber, exits on errors.
func MustNewEtcdSubscriber(conf EtcdConf) Subscriber { func MustNewEtcdSubscriber(conf EtcdConf) Subscriber {
s, err := NewEtcdSubscriber(conf) s, err := NewEtcdSubscriber(conf)
logx.Must(err) logx.Must(err)
return s return s
} }
// NewEtcdSubscriber returns an etcd Subscriber. // NewEtcdSubscriber returns an etcd Subscriber.
func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) { func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) {
var opts = []discov.SubOption{ opts := buildSubOptions(conf)
discov.WithDisablePrefix(),
}
if len(conf.User) != 0 {
opts = append(opts, discov.WithSubEtcdAccount(conf.User, conf.Pass))
}
if len(conf.CertFile) != 0 || len(conf.CertKeyFile) != 0 || len(conf.CACertFile) != 0 {
opts = append(opts,
discov.WithSubEtcdTLS(conf.CertFile, conf.CertKeyFile, conf.CACertFile, conf.InsecureSkipVerify))
}
s, err := discov.NewSubscriber(conf.Hosts, conf.Key, opts...) s, err := discov.NewSubscriber(conf.Hosts, conf.Key, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -44,6 +33,23 @@ func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) {
return &etcdSubscriber{Subscriber: s}, nil return &etcdSubscriber{Subscriber: s}, nil
} }
// buildSubOptions constructs the options for creating a new etcd subscriber.
func buildSubOptions(conf EtcdConf) []discov.SubOption {
opts := []discov.SubOption{
discov.WithExactMatch(),
}
if len(conf.User) > 0 {
opts = append(opts, discov.WithSubEtcdAccount(conf.User, conf.Pass))
}
if len(conf.CertFile) > 0 || len(conf.CertKeyFile) > 0 || len(conf.CACertFile) > 0 {
opts = append(opts, discov.WithSubEtcdTLS(conf.CertFile, conf.CertKeyFile,
conf.CACertFile, conf.InsecureSkipVerify))
}
return opts
}
// AddListener adds a listener to the subscriber. // AddListener adds a listener to the subscriber.
func (s *etcdSubscriber) AddListener(listener func()) error { func (s *etcdSubscriber) AddListener(listener func()) error {
s.Subscriber.AddListener(listener) s.Subscriber.AddListener(listener)
@ -53,8 +59,9 @@ func (s *etcdSubscriber) AddListener(listener func()) error {
// Value returns the value of the subscriber. // Value returns the value of the subscriber.
func (s *etcdSubscriber) Value() (string, error) { func (s *etcdSubscriber) Value() (string, error) {
vs := s.Subscriber.Values() vs := s.Subscriber.Values()
if len(vs) != 0 { if len(vs) > 0 {
return vs[len(vs)-1], nil return vs[len(vs)-1], nil
} }
return "", nil return "", nil
} }

View File

@ -6,41 +6,36 @@ import (
"github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/conf"
) )
type ( var registry = &unmarshalerRegistry{
// unmarshalerRegistry is the registry for unmarshalers.
unmarshalerRegistry struct {
unmarshalers map[string]LoaderFn
mu sync.RWMutex
}
// LoaderFn is the function type for loading configuration.
LoaderFn func([]byte, any) error
)
var defaultRegistry *unmarshalerRegistry
func init() {
defaultRegistry = &unmarshalerRegistry{
unmarshalers: map[string]LoaderFn{ unmarshalers: map[string]LoaderFn{
"json": conf.LoadFromJsonBytes, "json": conf.LoadFromJsonBytes,
"toml": conf.LoadFromTomlBytes, "toml": conf.LoadFromTomlBytes,
"yaml": conf.LoadFromYamlBytes, "yaml": conf.LoadFromYamlBytes,
}, },
} }
type (
// LoaderFn is the function type for loading configuration.
LoaderFn func([]byte, any) error
// unmarshalerRegistry is the registry for unmarshalers.
unmarshalerRegistry struct {
unmarshalers map[string]LoaderFn
mu sync.RWMutex
} }
)
// RegisterUnmarshaler registers an unmarshaler. // RegisterUnmarshaler registers an unmarshaler.
func RegisterUnmarshaler(name string, fn LoaderFn) { func RegisterUnmarshaler(name string, fn LoaderFn) {
defaultRegistry.mu.Lock() registry.mu.Lock()
defaultRegistry.unmarshalers[name] = fn defer registry.mu.Unlock()
defaultRegistry.mu.Unlock() registry.unmarshalers[name] = fn
} }
// Unmarshaler returns the unmarshaler by name. // Unmarshaler returns the unmarshaler by name.
func Unmarshaler(name string) (LoaderFn, bool) { func Unmarshaler(name string) (LoaderFn, bool) {
defaultRegistry.mu.RLock() registry.mu.RLock()
fn, ok := defaultRegistry.unmarshalers[name] defer registry.mu.RUnlock()
defaultRegistry.mu.RUnlock() fn, ok := registry.unmarshalers[name]
return fn, ok return fn, ok
} }

View File

@ -46,7 +46,7 @@ func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
} }
// Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener. // Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener.
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, disablePrefix bool) error { func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, exactMatch bool) error {
c, exists := r.getCluster(endpoints) c, exists := r.getCluster(endpoints)
// if exists, the existing values should be updated to the listener. // if exists, the existing values should be updated to the listener.
if exists { if exists {
@ -56,7 +56,7 @@ func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, dis
} }
} }
return c.monitor(key, l, disablePrefix) return c.monitor(key, l, exactMatch)
} }
func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) { func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
@ -87,7 +87,7 @@ type cluster struct {
watchGroup *threading.RoutineGroup watchGroup *threading.RoutineGroup
done chan lang.PlaceholderType done chan lang.PlaceholderType
lock sync.RWMutex lock sync.RWMutex
disablePrefix bool exactMatch bool
} }
func newCluster(endpoints []string) *cluster { func newCluster(endpoints []string) *cluster {
@ -226,7 +226,7 @@ func (c *cluster) load(cli EtcdClient, key string) int64 {
for { for {
var err error var err error
ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout) ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
if c.disablePrefix { if c.exactMatch {
resp, err = cli.Get(ctx, key) resp, err = cli.Get(ctx, key)
} else { } else {
resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix()) resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
@ -254,10 +254,10 @@ func (c *cluster) load(cli EtcdClient, key string) int64 {
return resp.Header.Revision return resp.Header.Revision
} }
func (c *cluster) monitor(key string, l UpdateListener, disablePrefix bool) error { func (c *cluster) monitor(key string, l UpdateListener, exactMatch bool) error {
c.lock.Lock() c.lock.Lock()
c.listeners[key] = append(c.listeners[key], l) c.listeners[key] = append(c.listeners[key], l)
c.disablePrefix = disablePrefix c.exactMatch = exactMatch
c.lock.Unlock() c.lock.Unlock()
cli, err := c.getClient() cli, err := c.getClient()
@ -328,7 +328,7 @@ func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error {
ops []clientv3.OpOption ops []clientv3.OpOption
watchKey = key watchKey = key
) )
if !c.disablePrefix { if !c.exactMatch {
watchKey = makeKeyPrefix(key) watchKey = makeKeyPrefix(key)
ops = append(ops, clientv3.WithPrefix()) ops = append(ops, clientv3.WithPrefix())
} }

View File

@ -17,7 +17,7 @@ type (
Subscriber struct { Subscriber struct {
endpoints []string endpoints []string
exclusive bool exclusive bool
disablePrefix bool exactMatch bool
items *container items *container
} }
) )
@ -35,7 +35,7 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib
} }
sub.items = newContainer(sub.exclusive) sub.items = newContainer(sub.exclusive)
if err := internal.GetRegistry().Monitor(endpoints, key, sub.items, sub.disablePrefix); err != nil { if err := internal.GetRegistry().Monitor(endpoints, key, sub.items, sub.exactMatch); err != nil {
return nil, err return nil, err
} }
@ -60,10 +60,10 @@ func Exclusive() SubOption {
} }
} }
// WithDisablePrefix turn off querying using key prefixes. // WithExactMatch turn off querying using key prefixes.
func WithDisablePrefix() SubOption { func WithExactMatch() SubOption {
return func(sub *Subscriber) { return func(sub *Subscriber) {
sub.disablePrefix = true sub.exactMatch = true
} }
} }