package internal

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/zeromicro/go-zero/core/lang"
	"github.com/zeromicro/go-zero/core/logc"
	"github.com/zeromicro/go-zero/core/mathx"
	"github.com/zeromicro/go-zero/core/syncx"
	"github.com/zeromicro/go-zero/core/threading"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
)

const coolDownDeviation = 0.05

var (
	registry = Registry{
		clusters: make(map[string]*cluster),
	}
	connManager      = syncx.NewResourceManager()
	coolDownUnstable = mathx.NewUnstable(coolDownDeviation)
	errClosed        = errors.New("etcd monitor chan has been closed")
)

// A Registry is a registry that manages the etcd client connections.
type Registry struct {
	clusters map[string]*cluster
	lock     sync.RWMutex
}

// GetRegistry returns a global Registry.
func GetRegistry() *Registry {
	return &registry
}

// GetConn returns an etcd client connection associated with given endpoints.
func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
	c, _ := r.getOrCreateCluster(endpoints)
	return c.getClient()
}
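
// Illustrative usage of GetConn (a minimal sketch; the endpoint value below is
// a hypothetical example, not taken from this file):
//
//	cli, err := GetRegistry().GetConn([]string{"127.0.0.1:2379"})
//	if err != nil {
//		// handle the connection error
//	}
//	_ = cli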

// Monitor monitors the key on the given etcd endpoints and notifies the given
// UpdateListener of changes.
func (r *Registry) Monitor(endpoints []string, key string, exactMatch bool, l UpdateListener) error {
	wkey := watchKey{
		key:        key,
		exactMatch: exactMatch,
	}

	c, exists := r.getOrCreateCluster(endpoints)
	// if the cluster already exists, push the existing values to the listener.
	if exists {
		c.lock.Lock()
		watcher, ok := c.watchers[wkey]
		if ok {
			watcher.listeners = append(watcher.listeners, l)
		}
		c.lock.Unlock()

		if ok {
			kvs := c.getCurrent(wkey)
			for _, kv := range kvs {
				l.OnAdd(kv)
			}

			return nil
		}
	}

	return c.monitor(wkey, l)
}
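
// Illustrative flow for Monitor (a minimal sketch; the endpoints and key are
// hypothetical examples, and loggingListener is a hypothetical type that
// implements UpdateListener, shown only to make the call shape concrete):
//
//	var l UpdateListener = loggingListener{}
//	if err := GetRegistry().Monitor([]string{"127.0.0.1:2379"}, "services/user", false, l); err != nil {
//		// handle the watch setup error
//	}
//	// existing keys are replayed via OnAdd; later changes arrive via OnAdd/OnDelete.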

// Unmonitor removes the given UpdateListener from the watch on the key at the
// given etcd endpoints, and cancels the watch when no listeners remain.
func (r *Registry) Unmonitor(endpoints []string, key string, exactMatch bool, l UpdateListener) {
	c, exists := r.getCluster(endpoints)
	if !exists {
		return
	}

	wkey := watchKey{
		key:        key,
		exactMatch: exactMatch,
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	watcher, ok := c.watchers[wkey]
	if !ok {
		return
	}

	for i, listener := range watcher.listeners {
		if listener == l {
			watcher.listeners = append(watcher.listeners[:i], watcher.listeners[i+1:]...)
			break
		}
	}

	if len(watcher.listeners) == 0 {
		if watcher.cancel != nil {
			watcher.cancel()
		}
		delete(c.watchers, wkey)
	}
}
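
// Illustrative pairing with Monitor (a minimal sketch; endpoints, key and l are
// hypothetical and must match the values previously passed to Monitor):
//
//	GetRegistry().Unmonitor([]string{"127.0.0.1:2379"}, "services/user", false, l)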

func (r *Registry) getCluster(endpoints []string) (*cluster, bool) {
	clusterKey := getClusterKey(endpoints)

	r.lock.RLock()
	c, ok := r.clusters[clusterKey]
	r.lock.RUnlock()

	return c, ok
}

func (r *Registry) getOrCreateCluster(endpoints []string) (c *cluster, exists bool) {
	c, exists = r.getCluster(endpoints)
	if !exists {
		clusterKey := getClusterKey(endpoints)

		r.lock.Lock()
		defer r.lock.Unlock()

		// double-check locking
		c, exists = r.clusters[clusterKey]
		if !exists {
			c = newCluster(endpoints)
			r.clusters[clusterKey] = c
		}
	}

	return
}

type (
	watchKey struct {
		key        string
		exactMatch bool
	}

	watchValue struct {
		listeners []UpdateListener
		values    map[string]string
		cancel    context.CancelFunc
	}

	cluster struct {
		endpoints  []string
		key        string
		watchers   map[watchKey]*watchValue
		watchGroup *threading.RoutineGroup
		done       chan lang.PlaceholderType
		lock       sync.RWMutex
	}
)

func newCluster(endpoints []string) *cluster {
	return &cluster{
		endpoints:  endpoints,
		key:        getClusterKey(endpoints),
		watchers:   make(map[watchKey]*watchValue),
		watchGroup: threading.NewRoutineGroup(),
		done:       make(chan lang.PlaceholderType),
	}
}

func (c *cluster) addListener(key watchKey, l UpdateListener) {
	c.lock.Lock()
	defer c.lock.Unlock()

	watcher, ok := c.watchers[key]
	if ok {
		watcher.listeners = append(watcher.listeners, l)
		return
	}

	val := newWatchValue()
	val.listeners = []UpdateListener{l}
	c.watchers[key] = val
}

func (c *cluster) getClient() (EtcdClient, error) {
	val, err := connManager.GetResource(c.key, func() (io.Closer, error) {
		return c.newClient()
	})
	if err != nil {
		return nil, err
	}

	return val.(EtcdClient), nil
}

func (c *cluster) getCurrent(key watchKey) []KV {
	c.lock.RLock()
	defer c.lock.RUnlock()

	watcher, ok := c.watchers[key]
	if !ok {
		return nil
	}

	var kvs []KV
	for k, v := range watcher.values {
		kvs = append(kvs, KV{
			Key: k,
			Val: v,
		})
	}

	return kvs
}

func (c *cluster) handleChanges(key watchKey, kvs []KV) {
	c.lock.Lock()
	watcher, ok := c.watchers[key]
	if !ok {
		c.lock.Unlock()
		return
	}

	listeners := append([]UpdateListener(nil), watcher.listeners...)
	// watcher.values cannot be nil
	vals := watcher.values
	newVals := make(map[string]string, len(kvs)+len(vals))
	for _, kv := range kvs {
		newVals[kv.Key] = kv.Val
	}
	add, remove := calculateChanges(vals, newVals)
	watcher.values = newVals
	c.lock.Unlock()

	for _, kv := range add {
		for _, l := range listeners {
			l.OnAdd(kv)
		}
	}
	for _, kv := range remove {
		for _, l := range listeners {
			l.OnDelete(kv)
		}
	}
}

func (c *cluster) handleWatchEvents(ctx context.Context, key watchKey, events []*clientv3.Event) {
	c.lock.RLock()
	watcher, ok := c.watchers[key]
	if !ok {
		c.lock.RUnlock()
		return
	}

	listeners := append([]UpdateListener(nil), watcher.listeners...)
	c.lock.RUnlock()

	for _, ev := range events {
		switch ev.Type {
		case clientv3.EventTypePut:
			c.lock.Lock()
			watcher.values[string(ev.Kv.Key)] = string(ev.Kv.Value)
			c.lock.Unlock()
			for _, l := range listeners {
				l.OnAdd(KV{
					Key: string(ev.Kv.Key),
					Val: string(ev.Kv.Value),
				})
			}
		case clientv3.EventTypeDelete:
			c.lock.Lock()
			delete(watcher.values, string(ev.Kv.Key))
			c.lock.Unlock()
			for _, l := range listeners {
				l.OnDelete(KV{
					Key: string(ev.Kv.Key),
					Val: string(ev.Kv.Value),
				})
			}
		default:
			logc.Errorf(ctx, "Unknown event type: %v", ev.Type)
		}
	}
}

func (c *cluster) load(cli EtcdClient, key watchKey) int64 {
	var resp *clientv3.GetResponse
	for {
		var err error
		ctx, cancel := context.WithTimeout(cli.Ctx(), RequestTimeout)
		if key.exactMatch {
			resp, err = cli.Get(ctx, key.key)
		} else {
			resp, err = cli.Get(ctx, makeKeyPrefix(key.key), clientv3.WithPrefix())
		}

		cancel()
		if err == nil {
			break
		}

		logc.Errorf(cli.Ctx(), "%s, key: %s, exactMatch: %t", err.Error(), key.key, key.exactMatch)
		time.Sleep(coolDownUnstable.AroundDuration(coolDownInterval))
	}

	var kvs []KV
	for _, ev := range resp.Kvs {
		kvs = append(kvs, KV{
			Key: string(ev.Key),
			Val: string(ev.Value),
		})
	}

	c.handleChanges(key, kvs)

	return resp.Header.Revision
}

func (c *cluster) monitor(key watchKey, l UpdateListener) error {
	cli, err := c.getClient()
	if err != nil {
		return err
	}

	c.addListener(key, l)
	rev := c.load(cli, key)
	c.watchGroup.Run(func() {
		c.watch(cli, key, rev)
	})

	return nil
}

func (c *cluster) newClient() (EtcdClient, error) {
	cli, err := NewClient(c.endpoints)
	if err != nil {
		return nil, err
	}

	go c.watchConnState(cli)

	return cli, nil
}

func (c *cluster) reload(cli EtcdClient) {
	c.lock.Lock()
	// cancel the previous watches
	close(c.done)
	c.watchGroup.Wait()
	var keys []watchKey
	for wk, wval := range c.watchers {
		keys = append(keys, wk)
		if wval.cancel != nil {
			wval.cancel()
		}
	}

	c.done = make(chan lang.PlaceholderType)
	c.watchGroup = threading.NewRoutineGroup()
	c.lock.Unlock()

	// start new watches
	for _, key := range keys {
		k := key
		c.watchGroup.Run(func() {
			rev := c.load(cli, k)
			c.watch(cli, k, rev)
		})
	}
}

func (c *cluster) watch(cli EtcdClient, key watchKey, rev int64) {
	for {
		err := c.watchStream(cli, key, rev)
		if err == nil {
			return
		}

		if rev != 0 && errors.Is(err, rpctypes.ErrCompacted) {
			logc.Errorf(cli.Ctx(), "etcd watch stream has been compacted, try to reload, rev %d", rev)
			rev = c.load(cli, key)
		}

		// log the error and retry
		logc.Error(cli.Ctx(), err)
	}
}

func (c *cluster) watchStream(cli EtcdClient, key watchKey, rev int64) error {
	ctx, rch := c.setupWatch(cli, key, rev)

	for {
		select {
		case wresp, ok := <-rch:
			if !ok {
				return errClosed
			}
			if wresp.Canceled {
				return fmt.Errorf("etcd monitor chan has been canceled, error: %w", wresp.Err())
			}
			if wresp.Err() != nil {
				return fmt.Errorf("etcd monitor chan error: %w", wresp.Err())
			}

			c.handleWatchEvents(ctx, key, wresp.Events)
		case <-ctx.Done():
			return nil
		case <-c.done:
			return nil
		}
	}
}

func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (context.Context, clientv3.WatchChan) {
	var (
		rch  clientv3.WatchChan
		ops  []clientv3.OpOption
		wkey = key.key
	)

	if !key.exactMatch {
		wkey = makeKeyPrefix(key.key)
		ops = append(ops, clientv3.WithPrefix())
	}
	if rev != 0 {
		ops = append(ops, clientv3.WithRev(rev+1))
	}

	ctx, cancel := context.WithCancel(cli.Ctx())
	if watcher, ok := c.watchers[key]; ok {
		watcher.cancel = cancel
	} else {
		val := newWatchValue()
		val.cancel = cancel

		c.lock.Lock()
		c.watchers[key] = val
		c.lock.Unlock()
	}

	rch = cli.Watch(clientv3.WithRequireLeader(ctx), wkey, ops...)

	return ctx, rch
}

func (c *cluster) watchConnState(cli EtcdClient) {
	watcher := newStateWatcher()
	watcher.addListener(func() {
		go c.reload(cli)
	})
	watcher.watch(cli.ActiveConnection())
}

// DialClient dials an etcd cluster with given endpoints.
func DialClient(endpoints []string) (EtcdClient, error) {
	cfg := clientv3.Config{
		Endpoints:           endpoints,
		AutoSyncInterval:    autoSyncInterval,
		DialTimeout:         DialTimeout,
		RejectOldCluster:    true,
		PermitWithoutStream: true,
	}
	if account, ok := GetAccount(endpoints); ok {
		cfg.Username = account.User
		cfg.Password = account.Pass
	}
	if tlsCfg, ok := GetTLS(endpoints); ok {
		cfg.TLS = tlsCfg
	}

	return clientv3.New(cfg)
}
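
// Illustrative usage of DialClient (a minimal sketch; the endpoint value is a
// hypothetical example, and the returned client should be closed by the caller):
//
//	cli, err := DialClient([]string{"127.0.0.1:2379"})
//	if err != nil {
//		// handle the dial error
//	}
//	defer cli.Close()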

func calculateChanges(oldVals, newVals map[string]string) (add, remove []KV) {
	for k, v := range newVals {
		if val, ok := oldVals[k]; !ok || v != val {
			add = append(add, KV{
				Key: k,
				Val: v,
			})
		}
	}

	for k, v := range oldVals {
		if val, ok := newVals[k]; !ok || v != val {
			remove = append(remove, KV{
				Key: k,
				Val: v,
			})
		}
	}

	return add, remove
}
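
// A worked example of calculateChanges (illustrative values only):
//
//	prev := map[string]string{"a": "1", "b": "2"}
//	curr := map[string]string{"b": "3", "c": "4"}
//	add, remove := calculateChanges(prev, curr)
//	// add: {b 3}, {c 4}  (new keys or changed values)
//	// remove: {a 1}, {b 2}  (deleted keys or superseded old values)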

func getClusterKey(endpoints []string) string {
	sort.Strings(endpoints)
	return strings.Join(endpoints, endpointsSeparator)
}

func makeKeyPrefix(key string) string {
	return fmt.Sprintf("%s%c", key, Delimiter)
}
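
// Illustrative behavior of the key helpers (a sketch assuming endpointsSeparator
// is "," and Delimiter is '/', both of which are defined elsewhere in this package):
//
//	getClusterKey([]string{"b:2379", "a:2379"}) // "a:2379,b:2379" (endpoints are sorted first)
//	makeKeyPrefix("services/user")              // "services/user/"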

// newWatchValue returns a watchValue that makes sure values are not nil.
func newWatchValue() *watchValue {
	return &watchValue{
		values: make(map[string]string),
	}
}