go-zero/core/discov/subscriber.go

194 lines
4.1 KiB
Go
Raw Normal View History

2020-07-26 17:09:05 +08:00
package discov
import (
"sync"
2020-07-30 22:56:39 +08:00
"sync/atomic"
2020-07-26 17:09:05 +08:00
2020-08-08 16:40:10 +08:00
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/logx"
2020-08-08 16:40:10 +08:00
"github.com/tal-tech/go-zero/core/syncx"
2020-07-26 17:09:05 +08:00
)
type (
// SubOption defines the method to customize a Subscriber.
SubOption func(sub *Subscriber)
2020-07-26 17:09:05 +08:00
// A Subscriber is used to subscribe the given key on a etcd cluster.
2020-07-26 17:09:05 +08:00
Subscriber struct {
endpoints []string
exclusive bool
items *container
2020-07-26 17:09:05 +08:00
}
)
// NewSubscriber returns a Subscriber.
// endpoints is the hosts of the etcd cluster.
// key is the key to subscribe.
// opts are used to customize the Subscriber.
2020-07-30 22:56:39 +08:00
func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscriber, error) {
sub := &Subscriber{
endpoints: endpoints,
}
2020-07-26 17:09:05 +08:00
for _, opt := range opts {
opt(sub)
2020-07-26 17:09:05 +08:00
}
sub.items = newContainer(sub.exclusive)
2020-07-26 17:09:05 +08:00
2020-07-30 22:56:39 +08:00
if err := internal.GetRegistry().Monitor(endpoints, key, sub.items); err != nil {
return nil, err
}
2020-07-26 17:09:05 +08:00
2020-07-30 22:56:39 +08:00
return sub, nil
2020-07-26 17:09:05 +08:00
}
// AddListener adds listener to s.
2020-07-31 16:20:13 +08:00
func (s *Subscriber) AddListener(listener func()) {
s.items.addListener(listener)
}
// Values returns all the subscription values.
2020-07-26 17:09:05 +08:00
func (s *Subscriber) Values() []string {
return s.items.getValues()
}
// Exclusive means that key value can only be 1:1,
2020-07-26 17:09:05 +08:00
// which means later added value will remove the keys associated with the same value previously.
func Exclusive() SubOption {
return func(sub *Subscriber) {
sub.exclusive = true
}
}
// WithSubEtcdAccount returns a SubOption that registers the etcd
// username/password for the Subscriber's endpoints.
func WithSubEtcdAccount(user, pass string) SubOption {
	register := func(s *Subscriber) {
		RegisterAccount(s.endpoints, user, pass)
	}

	return register
}
// WithSubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
func WithSubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) SubOption {
return func(sub *Subscriber) {
logx.Must(RegisterTLS(sub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
2020-07-26 17:09:05 +08:00
}
}
type container struct {
exclusive bool
values map[string][]string
mapping map[string]string
2020-07-30 22:56:39 +08:00
snapshot atomic.Value
dirty *syncx.AtomicBool
2020-07-31 16:20:13 +08:00
listeners []func()
2020-07-26 17:09:05 +08:00
lock sync.Mutex
}
func newContainer(exclusive bool) *container {
return &container{
exclusive: exclusive,
values: make(map[string][]string),
mapping: make(map[string]string),
2020-07-30 22:56:39 +08:00
dirty: syncx.ForAtomicBool(true),
2020-07-26 17:09:05 +08:00
}
}
func (c *container) OnAdd(kv internal.KV) {
c.addKv(kv.Key, kv.Val)
2020-07-31 16:20:13 +08:00
c.notifyChange()
2020-07-26 17:09:05 +08:00
}
func (c *container) OnDelete(kv internal.KV) {
c.removeKey(kv.Key)
2020-07-31 16:20:13 +08:00
c.notifyChange()
2020-07-26 17:09:05 +08:00
}
// addKv adds the kv, returns if there are already other keys associate with the value
func (c *container) addKv(key, value string) ([]string, bool) {
c.lock.Lock()
defer c.lock.Unlock()
2020-07-30 22:56:39 +08:00
c.dirty.Set(true)
2020-07-26 17:09:05 +08:00
keys := c.values[value]
previous := append([]string(nil), keys...)
early := len(keys) > 0
if c.exclusive && early {
for _, each := range keys {
c.doRemoveKey(each)
}
}
c.values[value] = append(c.values[value], key)
c.mapping[key] = value
if early {
return previous, true
}
2021-02-09 13:50:21 +08:00
return nil, false
2020-07-26 17:09:05 +08:00
}
2020-07-31 16:20:13 +08:00
// addListener appends listener to the callbacks run by notifyChange.
func (c *container) addListener(listener func()) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.listeners = append(c.listeners, listener)
}
2020-07-26 17:09:05 +08:00
// doRemoveKey drops key from c.mapping and from the key list of its value.
// When no key remains for that value, the value itself is deleted from
// c.values. Unknown keys are ignored.
// It takes no lock itself; addKv and removeKey call it with c.lock held.
func (c *container) doRemoveKey(key string) {
	value, found := c.mapping[key]
	if !found {
		return
	}
	delete(c.mapping, key)

	// Compact the key list in place, keeping every entry that is not key.
	holders := c.values[value]
	kept := 0
	for i := range holders {
		if holders[i] == key {
			continue
		}
		holders[kept] = holders[i]
		kept++
	}

	if kept == 0 {
		delete(c.values, value)
		return
	}
	c.values[value] = holders[:kept]
}
func (c *container) getValues() []string {
2020-07-30 22:56:39 +08:00
if !c.dirty.True() {
return c.snapshot.Load().([]string)
}
2020-07-26 17:09:05 +08:00
c.lock.Lock()
defer c.lock.Unlock()
2020-07-30 22:56:39 +08:00
var vals []string
2020-07-26 17:09:05 +08:00
for each := range c.values {
2020-07-30 22:56:39 +08:00
vals = append(vals, each)
2020-07-26 17:09:05 +08:00
}
2020-07-30 22:56:39 +08:00
c.snapshot.Store(vals)
c.dirty.Set(false)
return vals
2020-07-26 17:09:05 +08:00
}
2020-07-31 16:20:13 +08:00
// notifyChange invokes every registered listener.
// The listener list is copied under the lock and the callbacks run after the
// lock is released, so a listener may call back into the container.
func (c *container) notifyChange() {
	c.lock.Lock()
	pending := make([]func(), len(c.listeners))
	copy(pending, c.listeners)
	c.lock.Unlock()

	for i := range pending {
		pending[i]()
	}
}
2020-07-26 17:09:05 +08:00
// removeKey removes the kv, returns true if there are still other keys associate with the value
func (c *container) removeKey(key string) {
c.lock.Lock()
defer c.lock.Unlock()
2020-07-30 22:56:39 +08:00
c.dirty.Set(true)
2020-07-26 17:09:05 +08:00
c.doRemoveKey(key)
}