From 44cddec5c30a11b38334ffbdca2cba090874a4d7 Mon Sep 17 00:00:00 2001 From: MarkJoyMa <64180138+MarkJoyMa@users.noreply.github.com> Date: Wed, 28 Aug 2024 14:47:52 +0800 Subject: [PATCH] feat: added configuration center function (#3035) Co-authored-by: aiden.ma --- core/configcenter/configurator.go | 205 ++++++++++++++++++ core/configcenter/configurator_test.go | 233 +++++++++++++++++++++ core/configcenter/subscriber/etcd.go | 60 ++++++ core/configcenter/subscriber/subscriber.go | 9 + core/configcenter/unmarshaler.go | 46 ++++ core/configcenter/unmarshaler_test.go | 28 +++ core/discov/internal/registry.go | 54 +++-- core/discov/internal/registry_test.go | 2 +- core/discov/subscriber.go | 16 +- 9 files changed, 628 insertions(+), 25 deletions(-) create mode 100644 core/configcenter/configurator.go create mode 100644 core/configcenter/configurator_test.go create mode 100644 core/configcenter/subscriber/etcd.go create mode 100644 core/configcenter/subscriber/subscriber.go create mode 100644 core/configcenter/unmarshaler.go create mode 100644 core/configcenter/unmarshaler_test.go diff --git a/core/configcenter/configurator.go b/core/configcenter/configurator.go new file mode 100644 index 00000000..809dedff --- /dev/null +++ b/core/configcenter/configurator.go @@ -0,0 +1,205 @@ +package configurator + +import ( + "errors" + "fmt" + "reflect" + "strings" + "sync" + "sync/atomic" + + "github.com/zeromicro/go-zero/core/configcenter/subscriber" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/mapping" + "github.com/zeromicro/go-zero/core/threading" +) + +var ( + errorEmptyConfig = errors.New("empty config value") + errorMissUnmarshalerType = errors.New("miss unmarshaler type") +) + +// Configurator is the interface for configuration center. +type Configurator[T any] interface { + // GetConfig returns the subscription value. + GetConfig() (T, error) + // AddListener adds a listener to the subscriber. + AddListener(listener func()) +} + +type ( + // Config is the configuration for Configurator. + Config struct { + // Type is the value type, yaml, json or toml. + Type string `json:",default=yaml,options=[yaml,json,toml]"` + // Log indicates whether to log the configuration. + Log bool `json:",default=ture"` + } + + configCenter[T any] struct { + conf Config + unmarshaler LoaderFn + + subscriber subscriber.Subscriber + + listeners []func() + lock sync.Mutex + snapshot atomic.Value + } + + value[T any] struct { + data string + marshalData T + err error + } +) + +// Configurator is the interface for configuration center. +var _ Configurator[any] = (*configCenter[any])(nil) + +// MustNewConfigCenter returns a Configurator, exits on errors. +func MustNewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) Configurator[T] { + cc, err := NewConfigCenter[T](c, subscriber) + logx.Must(err) + + return cc +} + +// NewConfigCenter returns a Configurator. +func NewConfigCenter[T any](c Config, subscriber subscriber.Subscriber) (Configurator[T], error) { + unmarshaler, ok := Unmarshaler(strings.ToLower(c.Type)) + if !ok { + return nil, fmt.Errorf("unknown format: %s", c.Type) + } + + cc := &configCenter[T]{ + conf: c, + unmarshaler: unmarshaler, + subscriber: subscriber, + listeners: nil, + lock: sync.Mutex{}, + snapshot: atomic.Value{}, + } + + if err := cc.loadConfig(); err != nil { + return nil, err + } + + if err := cc.subscriber.AddListener(cc.onChange); err != nil { + return nil, err + } + + if _, err := cc.GetConfig(); err != nil { + return nil, err + } + + return cc, nil +} + +// AddListener adds listener to s. +func (c *configCenter[T]) AddListener(listener func()) { + c.lock.Lock() + defer c.lock.Unlock() + c.listeners = append(c.listeners, listener) +} + +// GetConfig return structured config. +func (c *configCenter[T]) GetConfig() (T, error) { + var r T + v := c.value() + if v == nil || len(v.data) < 1 { + return r, errorEmptyConfig + } + + return v.marshalData, v.err +} + +// Value returns the subscription value. +func (c *configCenter[T]) Value() string { + v := c.value() + if v == nil { + return "" + } + return v.data +} + +func (c *configCenter[T]) loadConfig() error { + v, err := c.subscriber.Value() + if err != nil { + if c.conf.Log { + logx.Errorf("ConfigCenter loads changed configuration, error: %v", err) + } + return err + } + + if c.conf.Log { + logx.Infof("ConfigCenter loads changed configuration, content [%s]", v) + } + + c.snapshot.Store(c.genValue(v)) + return nil +} + +func (c *configCenter[T]) onChange() { + _ = c.loadConfig() + + c.lock.Lock() + listeners := make([]func(), len(c.listeners)) + copy(listeners, c.listeners) + c.lock.Unlock() + + for _, l := range listeners { + threading.GoSafe(l) + } +} + +func (c *configCenter[T]) value() *value[T] { + content := c.snapshot.Load() + if content == nil { + return nil + } + return content.(*value[T]) +} + +func (c *configCenter[T]) genValue(data string) *value[T] { + v := &value[T]{ + data: data, + } + if len(data) <= 0 { + return v + } + + t := reflect.TypeOf(v.marshalData) + // if the type is nil, it means that the user has not set the type of the configuration. + if t == nil { + v.err = errorMissUnmarshalerType + return v + } + + t = mapping.Deref(t) + + switch t.Kind() { + case reflect.Struct, reflect.Array, reflect.Slice: + err := c.unmarshaler([]byte(data), &v.marshalData) + if err != nil { + v.err = err + if c.conf.Log { + logx.Errorf("ConfigCenter unmarshal configuration failed, err: %+v, content [%s]", err.Error(), data) + } + } + case reflect.String: + if str, ok := any(data).(T); ok { + v.marshalData = str + } else { + v.err = errorMissUnmarshalerType + } + default: + if c.conf.Log { + logx.Errorf("ConfigCenter unmarshal configuration missing unmarshaler for type: %s, content [%s]", + t.Kind(), data) + } + v.err = errorMissUnmarshalerType + } + + return v +} diff --git a/core/configcenter/configurator_test.go b/core/configcenter/configurator_test.go new file mode 100644 index 00000000..703afe7f --- /dev/null +++ b/core/configcenter/configurator_test.go @@ -0,0 +1,233 @@ +package configurator + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewConfigCenter(t *testing.T) { + _, err := NewConfigCenter[any](Config{ + Log: true, + }, &mockSubscriber{}) + assert.Error(t, err) + + _, err = NewConfigCenter[any](Config{ + Type: "json", + Log: true, + }, &mockSubscriber{}) + assert.Error(t, err) +} + +func TestConfigCenter_GetConfig(t *testing.T) { + mock := &mockSubscriber{} + type Data struct { + Name string `json:"name"` + } + + mock.v = `{"name": "go-zero"}` + c1, err := NewConfigCenter[Data](Config{ + Type: "json", + Log: true, + }, mock) + assert.NoError(t, err) + + data, err := c1.GetConfig() + assert.NoError(t, err) + assert.Equal(t, "go-zero", data.Name) + + mock.v = `{"name": "111"}` + c2, err := NewConfigCenter[Data](Config{Type: "json"}, mock) + assert.NoError(t, err) + + mock.v = `{}` + c3, err := NewConfigCenter[string](Config{ + Type: "json", + Log: true, + }, mock) + assert.NoError(t, err) + _, err = c3.GetConfig() + assert.NoError(t, err) + + data, err = c2.GetConfig() + assert.NoError(t, err) + + mock.lisErr = errors.New("mock error") + _, err = NewConfigCenter[Data](Config{ + Type: "json", + Log: true, + }, mock) + assert.Error(t, err) +} + +func TestConfigCenter_onChange(t *testing.T) { + mock := &mockSubscriber{} + type Data struct { + Name string `json:"name"` + } + + mock.v = `{"name": "go-zero"}` + c1, err := NewConfigCenter[Data](Config{Type: "json", Log: true}, mock) + assert.NoError(t, err) + + data, err := c1.GetConfig() + assert.NoError(t, err) + assert.Equal(t, "go-zero", data.Name) + + mock.v = `{"name": "go-zero2"}` + mock.change() + + data, err = c1.GetConfig() + assert.NoError(t, err) + assert.Equal(t, "go-zero2", data.Name) + + mock.valErr = errors.New("mock error") + _, err = NewConfigCenter[Data](Config{Type: "json", Log: false}, mock) + assert.Error(t, err) +} + +func TestConfigCenter_Value(t *testing.T) { + mock := &mockSubscriber{} + mock.v = "1234" + + c, err := NewConfigCenter[string](Config{ + Type: "json", + Log: true, + }, mock) + assert.NoError(t, err) + + cc := c.(*configCenter[string]) + + assert.Equal(t, cc.Value(), "1234") + + mock.valErr = errors.New("mock error") + + _, err = NewConfigCenter[any](Config{ + Type: "json", + Log: true, + }, mock) + assert.Error(t, err) +} + +func TestConfigCenter_AddListener(t *testing.T) { + mock := &mockSubscriber{} + mock.v = "1234" + c, err := NewConfigCenter[string](Config{ + Type: "json", + Log: true, + }, mock) + assert.NoError(t, err) + + cc := c.(*configCenter[string]) + var a, b int + var mutex sync.Mutex + cc.AddListener(func() { + mutex.Lock() + a = 1 + mutex.Unlock() + }) + cc.AddListener(func() { + mutex.Lock() + b = 2 + mutex.Unlock() + }) + + assert.Equal(t, 2, len(cc.listeners)) + + mock.change() + + time.Sleep(time.Millisecond * 100) + + mutex.Lock() + assert.Equal(t, 1, a) + assert.Equal(t, 2, b) + mutex.Unlock() +} + +func TestConfigCenter_genValue(t *testing.T) { + t.Run("data is empty", func(t *testing.T) { + c := &configCenter[string]{ + unmarshaler: defaultRegistry.unmarshalers["json"], + conf: Config{Log: true}, + } + v := c.genValue("") + assert.Equal(t, "", v.data) + }) + + t.Run("invalid template type", func(t *testing.T) { + c := &configCenter[any]{ + unmarshaler: defaultRegistry.unmarshalers["json"], + conf: Config{Log: true}, + } + v := c.genValue("xxxx") + assert.Equal(t, errorMissUnmarshalerType, v.err) + }) + + t.Run("unsupported template type", func(t *testing.T) { + c := &configCenter[int]{ + unmarshaler: defaultRegistry.unmarshalers["json"], + conf: Config{Log: true}, + } + v := c.genValue("1") + assert.Equal(t, errorMissUnmarshalerType, v.err) + }) + + t.Run("supported template string type", func(t *testing.T) { + c := &configCenter[string]{ + unmarshaler: defaultRegistry.unmarshalers["json"], + conf: Config{Log: true}, + } + v := c.genValue("12345") + assert.NoError(t, v.err) + assert.Equal(t, "12345", v.data) + }) + + t.Run("unmarshal fail", func(t *testing.T) { + c := &configCenter[struct { + Name string `json:"name"` + }]{ + unmarshaler: defaultRegistry.unmarshalers["json"], + conf: Config{Log: true}, + } + v := c.genValue(`{"name":"new name}`) + assert.Equal(t, `{"name":"new name}`, v.data) + assert.Error(t, v.err) + }) + + t.Run("success", func(t *testing.T) { + c := &configCenter[struct { + Name string `json:"name"` + }]{ + unmarshaler: defaultRegistry.unmarshalers["json"], + conf: Config{Log: true}, + } + v := c.genValue(`{"name":"new name"}`) + assert.Equal(t, `{"name":"new name"}`, v.data) + assert.Equal(t, "new name", v.marshalData.Name) + assert.NoError(t, v.err) + }) +} + +type mockSubscriber struct { + v string + lisErr, valErr error + listener func() +} + +func (m *mockSubscriber) AddListener(listener func()) error { + m.listener = listener + return m.lisErr +} + +func (m *mockSubscriber) Value() (string, error) { + return m.v, m.valErr +} + +func (m *mockSubscriber) change() { + if m.listener != nil { + m.listener() + } +} diff --git a/core/configcenter/subscriber/etcd.go b/core/configcenter/subscriber/etcd.go new file mode 100644 index 00000000..942dd532 --- /dev/null +++ b/core/configcenter/subscriber/etcd.go @@ -0,0 +1,60 @@ +package subscriber + +import ( + "github.com/zeromicro/go-zero/core/discov" + "github.com/zeromicro/go-zero/core/logx" +) + +type ( + // etcdSubscriber is a subscriber that subscribes to etcd. + etcdSubscriber struct { + *discov.Subscriber + } + + // EtcdConf is the configuration for etcd. + EtcdConf discov.EtcdConf +) + +// MustNewEtcdSubscriber returns an etcd Subscriber, exits on errors. +func MustNewEtcdSubscriber(conf EtcdConf) Subscriber { + s, err := NewEtcdSubscriber(conf) + logx.Must(err) + + return s +} + +// NewEtcdSubscriber returns an etcd Subscriber. +func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) { + var opts = []discov.SubOption{ + discov.WithDisablePrefix(), + } + if len(conf.User) != 0 { + opts = append(opts, discov.WithSubEtcdAccount(conf.User, conf.Pass)) + } + if len(conf.CertFile) != 0 || len(conf.CertKeyFile) != 0 || len(conf.CACertFile) != 0 { + opts = append(opts, + discov.WithSubEtcdTLS(conf.CertFile, conf.CertKeyFile, conf.CACertFile, conf.InsecureSkipVerify)) + } + + s, err := discov.NewSubscriber(conf.Hosts, conf.Key, opts...) + if err != nil { + return nil, err + } + + return &etcdSubscriber{Subscriber: s}, nil +} + +// AddListener adds a listener to the subscriber. +func (s *etcdSubscriber) AddListener(listener func()) error { + s.Subscriber.AddListener(listener) + return nil +} + +// Value returns the value of the subscriber. +func (s *etcdSubscriber) Value() (string, error) { + vs := s.Subscriber.Values() + if len(vs) != 0 { + return vs[len(vs)-1], nil + } + return "", nil +} diff --git a/core/configcenter/subscriber/subscriber.go b/core/configcenter/subscriber/subscriber.go new file mode 100644 index 00000000..e0ad9fc8 --- /dev/null +++ b/core/configcenter/subscriber/subscriber.go @@ -0,0 +1,9 @@ +package subscriber + +// Subscriber is the interface for configcenter subscribers. +type Subscriber interface { + // AddListener adds a listener to the subscriber. + AddListener(listener func()) error + // Value returns the value of the subscriber. + Value() (string, error) +} diff --git a/core/configcenter/unmarshaler.go b/core/configcenter/unmarshaler.go new file mode 100644 index 00000000..c8b68e31 --- /dev/null +++ b/core/configcenter/unmarshaler.go @@ -0,0 +1,46 @@ +package configurator + +import ( + "sync" + + "github.com/zeromicro/go-zero/core/conf" +) + +type ( + // unmarshalerRegistry is the registry for unmarshalers. + unmarshalerRegistry struct { + unmarshalers map[string]LoaderFn + + mu sync.RWMutex + } + + // LoaderFn is the function type for loading configuration. + LoaderFn func([]byte, any) error +) + +var defaultRegistry *unmarshalerRegistry + +func init() { + defaultRegistry = &unmarshalerRegistry{ + unmarshalers: map[string]LoaderFn{ + "json": conf.LoadFromJsonBytes, + "toml": conf.LoadFromTomlBytes, + "yaml": conf.LoadFromYamlBytes, + }, + } +} + +// RegisterUnmarshaler registers an unmarshaler. +func RegisterUnmarshaler(name string, fn LoaderFn) { + defaultRegistry.mu.Lock() + defaultRegistry.unmarshalers[name] = fn + defaultRegistry.mu.Unlock() +} + +// Unmarshaler returns the unmarshaler by name. +func Unmarshaler(name string) (LoaderFn, bool) { + defaultRegistry.mu.RLock() + fn, ok := defaultRegistry.unmarshalers[name] + defaultRegistry.mu.RUnlock() + return fn, ok +} diff --git a/core/configcenter/unmarshaler_test.go b/core/configcenter/unmarshaler_test.go new file mode 100644 index 00000000..2498dc92 --- /dev/null +++ b/core/configcenter/unmarshaler_test.go @@ -0,0 +1,28 @@ +package configurator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRegisterUnmarshaler(t *testing.T) { + RegisterUnmarshaler("test", func(data []byte, v interface{}) error { + return nil + }) + + _, ok := Unmarshaler("test") + assert.True(t, ok) + + _, ok = Unmarshaler("test2") + assert.False(t, ok) + + _, ok = Unmarshaler("json") + assert.True(t, ok) + + _, ok = Unmarshaler("toml") + assert.True(t, ok) + + _, ok = Unmarshaler("yaml") + assert.True(t, ok) +} diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index 6b892e08..f4652f3a 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -10,13 +10,14 @@ import ( "sync" "time" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/zeromicro/go-zero/core/contextx" "github.com/zeromicro/go-zero/core/lang" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/threading" - "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - clientv3 "go.etcd.io/etcd/client/v3" ) var ( @@ -45,7 +46,7 @@ func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) { } // Monitor monitors the key on given etcd endpoints, notify with the given UpdateListener. -func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) error { +func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener, disablePrefix bool) error { c, exists := r.getCluster(endpoints) // if exists, the existing values should be updated to the listener. if exists { @@ -55,7 +56,7 @@ func (r *Registry) Monitor(endpoints []string, key string, l UpdateListener) err } } - return c.monitor(key, l) + return c.monitor(key, l, disablePrefix) } func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) { @@ -79,13 +80,14 @@ func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) { } type cluster struct { - endpoints []string - key string - values map[string]map[string]string - listeners map[string][]UpdateListener - watchGroup *threading.RoutineGroup - done chan lang.PlaceholderType - lock sync.RWMutex + endpoints []string + key string + values map[string]map[string]string + listeners map[string][]UpdateListener + watchGroup *threading.RoutineGroup + done chan lang.PlaceholderType + lock sync.RWMutex + disablePrefix bool } func newCluster(endpoints []string) *cluster { @@ -224,7 +226,12 @@ func (c *cluster) load(cli EtcdClient, key string) int64 { for { var err error ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout) - resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix()) + if c.disablePrefix { + resp, err = cli.Get(ctx, key) + } else { + resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix()) + } + cancel() if err == nil { break @@ -247,9 +254,10 @@ func (c *cluster) load(cli EtcdClient, key string) int64 { return resp.Header.Revision } -func (c *cluster) monitor(key string, l UpdateListener) error { +func (c *cluster) monitor(key string, l UpdateListener, disablePrefix bool) error { c.lock.Lock() c.listeners[key] = append(c.listeners[key], l) + c.disablePrefix = disablePrefix c.lock.Unlock() cli, err := c.getClient() @@ -315,14 +323,20 @@ func (c *cluster) watch(cli EtcdClient, key string, rev int64) { } func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error { - var rch clientv3.WatchChan - if rev != 0 { - rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), - clientv3.WithPrefix(), clientv3.WithRev(rev+1)) - } else { - rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), - clientv3.WithPrefix()) + var ( + rch clientv3.WatchChan + ops []clientv3.OpOption + watchKey = key + ) + if !c.disablePrefix { + watchKey = makeKeyPrefix(key) + ops = append(ops, clientv3.WithPrefix()) } + if rev != 0 { + ops = append(ops, clientv3.WithRev(rev+1)) + } + + rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), watchKey, ops...) for { select { diff --git a/core/discov/internal/registry_test.go b/core/discov/internal/registry_test.go index 6e2469a9..bb9fd629 100644 --- a/core/discov/internal/registry_test.go +++ b/core/discov/internal/registry_test.go @@ -289,7 +289,7 @@ func TestRegistry_Monitor(t *testing.T) { }, } GetRegistry().lock.Unlock() - assert.Error(t, GetRegistry().Monitor(endpoints, "foo", new(mockListener))) + assert.Error(t, GetRegistry().Monitor(endpoints, "foo", new(mockListener), false)) } type mockListener struct { diff --git a/core/discov/subscriber.go b/core/discov/subscriber.go index 809dee2a..e0dd5de0 100644 --- a/core/discov/subscriber.go +++ b/core/discov/subscriber.go @@ -15,9 +15,10 @@ type ( // A Subscriber is used to subscribe the given key on an etcd cluster. Subscriber struct { - endpoints []string - exclusive bool - items *container + endpoints []string + exclusive bool + disablePrefix bool + items *container } ) @@ -34,7 +35,7 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib } sub.items = newContainer(sub.exclusive) - if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil { + if err := internal.GetRegistry().Monitor(endpoints, key, sub.items, sub.disablePrefix); err != nil { return nil, err } @@ -59,6 +60,13 @@ func Exclusive() SubOption { } } +// WithDisablePrefix turn off querying using key prefixes. +func WithDisablePrefix() SubOption { + return func(sub *Subscriber) { + sub.disablePrefix = true + } +} + // WithSubEtcdAccount provides the etcd username/password. func WithSubEtcdAccount(user, pass string) SubOption { return func(sub *Subscriber) {