feat: added configuration center function (#3035)

Co-authored-by: aiden.ma <Aiden.ma@yijinin.com>
This commit is contained in:
MarkJoyMa 2024-08-28 14:47:52 +08:00 committed by GitHub
parent 47d13e5ef8
commit 44cddec5c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 628 additions and 25 deletions

View File

@ -0,0 +1,205 @@
package configurator
import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"github.com/zeromicro/go-zero/core/configcenter/subscriber"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mapping"
"github.com/zeromicro/go-zero/core/threading"
)
var (
errorEmptyConfig = errors.New("empty config value")
errorMissUnmarshalerType = errors.New("miss unmarshaler type")
)
// Configurator is the interface for configuration center.
type Configurator[T any] interface {
// GetConfig returns the subscription value.
GetConfig() (T, error)
// AddListener adds a listener to the subscriber.
AddListener(listener func())
}
type (
// Config is the configuration for Configurator.
Config struct {
// Type is the value type, yaml, json or toml.
Type string `json:",default=yaml,options=[yaml,json,toml]"`
// Log indicates whether to log the configuration.
Log bool `json:",default=ture"`
}
configCenter[T any] struct {
conf Config
unmarshaler LoaderFn
subscriber subscriber.Subscriber
listeners []func()
lock sync.Mutex
snapshot atomic.Value
}
value[T any] struct {
data string
marshalData T
err error
}
)
// Configurator is the interface for configuration center.
var _ Configurator[any] = (*configCenter[any])(nil)
// MustNewConfigCenter returns a Configurator, exits on errors.
func MustNewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) Configurator[T] {
cc, err := NewConfigCenter[T](c, subscriber)
logx.Must(err)
return cc
}
// NewConfigCenter returns a Configurator.
func NewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) (Configurator[T], error) {
unmarshaler, ok := Unmarshaler(strings.ToLower(c.Type))
if !ok {
return nil, fmt.Errorf("unknown format: %s", c.Type)
}
cc := &configCenter[T]{
conf: c,
unmarshaler: unmarshaler,
subscriber: subscriber,
listeners: nil,
lock: sync.Mutex{},
snapshot: atomic.Value{},
}
if err := cc.loadConfig(); err != nil {
return nil, err
}
if err := cc.subscriber.AddListener(cc.onChange); err != nil {
return nil, err
}
if _, err := cc.GetConfig(); err != nil {
return nil, err
}
return cc, nil
}
// AddListener adds listener to s.
func (c *configCenter[T]) AddListener(listener func()) {
c.lock.Lock()
defer c.lock.Unlock()
c.listeners = append(c.listeners, listener)
}
// GetConfig return structured config.
func (c *configCenter[T]) GetConfig() (T, error) {
var r T
v := c.value()
if v == nil || len(v.data) < 1 {
return r, errorEmptyConfig
}
return v.marshalData, v.err
}
// Value returns the subscription value.
func (c *configCenter[T]) Value() string {
v := c.value()
if v == nil {
return ""
}
return v.data
}
func (c *configCenter[T]) loadConfig() error {
v, err := c.subscriber.Value()
if err != nil {
if c.conf.Log {
logx.Errorf("ConfigCenter loads changed configuration, error: %v", err)
}
return err
}
if c.conf.Log {
logx.Infof("ConfigCenter loads changed configuration, content [%s]", v)
}
c.snapshot.Store(c.genValue(v))
return nil
}
func (c *configCenter[T]) onChange() {
_ = c.loadConfig()
c.lock.Lock()
listeners := make([]func(), len(c.listeners))
copy(listeners, c.listeners)
c.lock.Unlock()
for _, l := range listeners {
threading.GoSafe(l)
}
}
func (c *configCenter[T]) value() *value[T] {
content := c.snapshot.Load()
if content == nil {
return nil
}
return content.(*value[T])
}
func (c *configCenter[T]) genValue(data string) *value[T] {
v := &value[T]{
data: data,
}
if len(data) <= 0 {
return v
}
t := reflect.TypeOf(v.marshalData)
// if the type is nil, it means that the user has not set the type of the configuration.
if t == nil {
v.err = errorMissUnmarshalerType
return v
}
t = mapping.Deref(t)
switch t.Kind() {
case reflect.Struct, reflect.Array, reflect.Slice:
err := c.unmarshaler([]byte(data), &v.marshalData)
if err != nil {
v.err = err
if c.conf.Log {
logx.Errorf("ConfigCenter unmarshal configuration failed, err: %+v, content [%s]", err.Error(), data)
}
}
case reflect.String:
if str, ok := any(data).(T); ok {
v.marshalData = str
} else {
v.err = errorMissUnmarshalerType
}
default:
if c.conf.Log {
logx.Errorf("ConfigCenter unmarshal configuration missing unmarshaler for type: %s, content [%s]",
t.Kind(), data)
}
v.err = errorMissUnmarshalerType
}
return v
}

View File

@ -0,0 +1,233 @@
package configurator
import (
"errors"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestNewConfigCenter(t *testing.T) {
_, err := NewConfigCenter[any](Config{
Log: true,
}, &mockSubscriber{})
assert.Error(t, err)
_, err = NewConfigCenter[any](Config{
Type: "json",
Log: true,
}, &mockSubscriber{})
assert.Error(t, err)
}
func TestConfigCenter_GetConfig(t *testing.T) {
mock := &mockSubscriber{}
type Data struct {
Name string `json:"name"`
}
mock.v = `{"name": "go-zero"}`
c1, err := NewConfigCenter[Data](Config{
Type: "json",
Log: true,
}, mock)
assert.NoError(t, err)
data, err := c1.GetConfig()
assert.NoError(t, err)
assert.Equal(t, "go-zero", data.Name)
mock.v = `{"name": "111"}`
c2, err := NewConfigCenter[Data](Config{Type: "json"}, mock)
assert.NoError(t, err)
mock.v = `{}`
c3, err := NewConfigCenter[string](Config{
Type: "json",
Log: true,
}, mock)
assert.NoError(t, err)
_, err = c3.GetConfig()
assert.NoError(t, err)
data, err = c2.GetConfig()
assert.NoError(t, err)
mock.lisErr = errors.New("mock error")
_, err = NewConfigCenter[Data](Config{
Type: "json",
Log: true,
}, mock)
assert.Error(t, err)
}
func TestConfigCenter_onChange(t *testing.T) {
mock := &mockSubscriber{}
type Data struct {
Name string `json:"name"`
}
mock.v = `{"name": "go-zero"}`
c1, err := NewConfigCenter[Data](Config{Type: "json", Log: true}, mock)
assert.NoError(t, err)
data, err := c1.GetConfig()
assert.NoError(t, err)
assert.Equal(t, "go-zero", data.Name)
mock.v = `{"name": "go-zero2"}`
mock.change()
data, err = c1.GetConfig()
assert.NoError(t, err)
assert.Equal(t, "go-zero2", data.Name)
mock.valErr = errors.New("mock error")
_, err = NewConfigCenter[Data](Config{Type: "json", Log: false}, mock)
assert.Error(t, err)
}
func TestConfigCenter_Value(t *testing.T) {
mock := &mockSubscriber{}
mock.v = "1234"
c, err := NewConfigCenter[string](Config{
Type: "json",
Log: true,
}, mock)
assert.NoError(t, err)
cc := c.(*configCenter[string])
assert.Equal(t, cc.Value(), "1234")
mock.valErr = errors.New("mock error")
_, err = NewConfigCenter[any](Config{
Type: "json",
Log: true,
}, mock)
assert.Error(t, err)
}
func TestConfigCenter_AddListener(t *testing.T) {
mock := &mockSubscriber{}
mock.v = "1234"
c, err := NewConfigCenter[string](Config{
Type: "json",
Log: true,
}, mock)
assert.NoError(t, err)
cc := c.(*configCenter[string])
var a, b int
var mutex sync.Mutex
cc.AddListener(func() {
mutex.Lock()
a = 1
mutex.Unlock()
})
cc.AddListener(func() {
mutex.Lock()
b = 2
mutex.Unlock()
})
assert.Equal(t, 2, len(cc.listeners))
mock.change()
time.Sleep(time.Millisecond * 100)
mutex.Lock()
assert.Equal(t, 1, a)
assert.Equal(t, 2, b)
mutex.Unlock()
}
func TestConfigCenter_genValue(t *testing.T) {
t.Run("data is empty", func(t *testing.T) {
c := &configCenter[string]{
unmarshaler: defaultRegistry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue("")
assert.Equal(t, "", v.data)
})
t.Run("invalid template type", func(t *testing.T) {
c := &configCenter[any]{
unmarshaler: defaultRegistry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue("xxxx")
assert.Equal(t, errorMissUnmarshalerType, v.err)
})
t.Run("unsupported template type", func(t *testing.T) {
c := &configCenter[int]{
unmarshaler: defaultRegistry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue("1")
assert.Equal(t, errorMissUnmarshalerType, v.err)
})
t.Run("supported template string type", func(t *testing.T) {
c := &configCenter[string]{
unmarshaler: defaultRegistry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue("12345")
assert.NoError(t, v.err)
assert.Equal(t, "12345", v.data)
})
t.Run("unmarshal fail", func(t *testing.T) {
c := &configCenter[struct {
Name string `json:"name"`
}]{
unmarshaler: defaultRegistry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue(`{"name":"new name}`)
assert.Equal(t, `{"name":"new name}`, v.data)
assert.Error(t, v.err)
})
t.Run("success", func(t *testing.T) {
c := &configCenter[struct {
Name string `json:"name"`
}]{
unmarshaler: defaultRegistry.unmarshalers["json"],
conf: Config{Log: true},
}
v := c.genValue(`{"name":"new name"}`)
assert.Equal(t, `{"name":"new name"}`, v.data)
assert.Equal(t, "new name", v.marshalData.Name)
assert.NoError(t, v.err)
})
}
type mockSubscriber struct {
v string
lisErr, valErr error
listener func()
}
func (m *mockSubscriber) AddListener(listener func()) error {
m.listener = listener
return m.lisErr
}
func (m *mockSubscriber) Value() (string, error) {
return m.v, m.valErr
}
func (m *mockSubscriber) change() {
if m.listener != nil {
m.listener()
}
}

View File

@ -0,0 +1,60 @@
package subscriber
import (
"github.com/zeromicro/go-zero/core/discov"
"github.com/zeromicro/go-zero/core/logx"
)
type (
// etcdSubscriber is a subscriber that subscribes to etcd.
etcdSubscriber struct {
*discov.Subscriber
}
// EtcdConf is the configuration for etcd.
EtcdConf discov.EtcdConf
)
// MustNewEtcdSubscriber returns an etcd Subscriber, exits on errors.
func MustNewEtcdSubscriber(conf EtcdConf) Subscriber {
s, err := NewEtcdSubscriber(conf)
logx.Must(err)
return s
}
// NewEtcdSubscriber returns an etcd Subscriber.
func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) {
var opts = []discov.SubOption{
discov.WithDisablePrefix(),
}
if len(conf.User) != 0 {
opts = append(opts, discov.WithSubEtcdAccount(conf.User, conf.Pass))
}
if len(conf.CertFile) != 0 || len(conf.CertKeyFile) != 0 || len(conf.CACertFile) != 0 {
opts = append(opts,
discov.WithSubEtcdTLS(conf.CertFile, conf.CertKeyFile, conf.CACertFile, conf.InsecureSkipVerify))
}
s, err := discov.NewSubscriber(conf.Hosts, conf.Key, opts...)
if err != nil {
return nil, err
}
return &etcdSubscriber{Subscriber: s}, nil
}
// AddListener adds a listener to the subscriber.
func (s *etcdSubscriber) AddListener(listener func()) error {
s.Subscriber.AddListener(listener)
return nil
}
// Value returns the value of the subscriber.
func (s *etcdSubscriber) Value() (string, error) {
vs := s.Subscriber.Values()
if len(vs) != 0 {
return vs[len(vs)-1], nil
}
return "", nil
}

View File

@ -0,0 +1,9 @@
package subscriber
// Subscriber is the interface for configcenter subscribers.
type Subscriber interface {
// AddListener adds a listener to the subscriber.
AddListener(listener func()) error
// Value returns the value of the subscriber.
Value() (string, error)
}

View File

@ -0,0 +1,46 @@
package configurator
import (
"sync"
"github.com/zeromicro/go-zero/core/conf"
)
type (
// unmarshalerRegistry is the registry for unmarshalers.
unmarshalerRegistry struct {
unmarshalers map[string]LoaderFn
mu sync.RWMutex
}
// LoaderFn is the function type for loading configuration.
LoaderFn func([]byte, any) error
)
var defaultRegistry *unmarshalerRegistry
func init() {
defaultRegistry = &unmarshalerRegistry{
unmarshalers: map[string]LoaderFn{
"json": conf.LoadFromJsonBytes,
"toml": conf.LoadFromTomlBytes,
"yaml": conf.LoadFromYamlBytes,
},
}
}
// RegisterUnmarshaler registers an unmarshaler.
func RegisterUnmarshaler(name string, fn LoaderFn) {
defaultRegistry.mu.Lock()
defaultRegistry.unmarshalers[name] = fn
defaultRegistry.mu.Unlock()
}
// Unmarshaler returns the unmarshaler by name.
func Unmarshaler(name string) (LoaderFn, bool) {
defaultRegistry.mu.RLock()
fn, ok := defaultRegistry.unmarshalers[name]
defaultRegistry.mu.RUnlock()
return fn, ok
}

View File

@ -0,0 +1,28 @@
package configurator
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestRegisterUnmarshaler(t *testing.T) {
RegisterUnmarshaler("test", func(data []byte, v interface{}) error {
return nil
})
_, ok := Unmarshaler("test")
assert.True(t, ok)
_, ok = Unmarshaler("test2")
assert.False(t, ok)
_, ok = Unmarshaler("json")
assert.True(t, ok)
_, ok = Unmarshaler("toml")
assert.True(t, ok)
_, ok = Unmarshaler("yaml")
assert.True(t, ok)
}

View File

@ -10,13 +10,14 @@ import (
"sync"
"time"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/zeromicro/go-zero/core/contextx"
"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
)
var (
@ -45,7 +46,7 @@ func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
}
// Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener.
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error {
func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, disablePrefix bool) error {
c, exists := r.getCluster(endpoints)
// if exists, the existing values should be updated to the listener.
if exists {
@ -55,7 +56,7 @@ func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) err
}
}
return c.monitor(key, l)
return c.monitor(key, l, disablePrefix)
}
func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
@ -79,13 +80,14 @@ func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
}
type cluster struct {
endpoints []string
key string
values map[string]map[string]string
listeners map[string][]UpdateListener
watchGroup *threading.RoutineGroup
done chan lang.PlaceholderType
lock sync.RWMutex
endpoints []string
key string
values map[string]map[string]string
listeners map[string][]UpdateListener
watchGroup *threading.RoutineGroup
done chan lang.PlaceholderType
lock sync.RWMutex
disablePrefix bool
}
func newCluster(endpoints []string) *cluster {
@ -224,7 +226,12 @@ func (c *cluster) load(cli EtcdClient, key string) int64 {
for {
var err error
ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
if c.disablePrefix {
resp, err = cli.Get(ctx, key)
} else {
resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
}
cancel()
if err == nil {
break
@ -247,9 +254,10 @@ func (c *cluster) load(cli EtcdClient, key string) int64 {
return resp.Header.Revision
}
func (c *cluster) monitor(key string, l UpdateListener) error {
func (c *cluster) monitor(key string, l UpdateListener, disablePrefix bool) error {
c.lock.Lock()
c.listeners[key] = append(c.listeners[key], l)
c.disablePrefix = disablePrefix
c.lock.Unlock()
cli, err := c.getClient()
@ -315,14 +323,20 @@ func (c *cluster) watch(cli EtcdClient, key string, rev int64) {
}
func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error {
var rch clientv3.WatchChan
if rev != 0 {
rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key),
clientv3.WithPrefix(), clientv3.WithRev(rev+1))
} else {
rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key),
clientv3.WithPrefix())
var (
rch clientv3.WatchChan
ops []clientv3.OpOption
watchKey = key
)
if !c.disablePrefix {
watchKey = makeKeyPrefix(key)
ops = append(ops, clientv3.WithPrefix())
}
if rev != 0 {
ops = append(ops, clientv3.WithRev(rev+1))
}
rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), watchKey, ops...)
for {
select {

View File

@ -289,7 +289,7 @@ func TestRegistry_Monitor(t *testing.T) {
},
}
GetRegistry().lock.Unlock()
assert.Error(t, GetRegistry().Monitor(endpoints, "foo", new(mockListener)))
assert.Error(t, GetRegistry().Monitor(endpoints, "foo", new(mockListener), false))
}
type mockListener struct {

View File

@ -15,9 +15,10 @@ type (
// A Subscriber is used to subscribe the given key on an etcd cluster.
Subscriber struct {
endpoints []string
exclusive bool
items *container
endpoints []string
exclusive bool
disablePrefix bool
items *container
}
)
@ -34,7 +35,7 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib
}
sub.items = newContainer(sub.exclusive)
if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
if err := internal.GetRegistry().Monitor(endpoints, key, sub.items, sub.disablePrefix); err != nil {
return nil, err
}
@ -59,6 +60,13 @@ func Exclusive() SubOption {
}
}
// WithDisablePrefix turn off querying using key prefixes.
func WithDisablePrefix() SubOption {
return func(sub *Subscriber) {
sub.disablePrefix = true
}
}
// WithSubEtcdAccount provides the etcd username/password.
func WithSubEtcdAccount(user, pass string) SubOption {
return func(sub *Subscriber) {