mirror of
https://github.com/zeromicro/go-zero.git
synced 2025-01-24 01:30:25 +08:00
c3756a8f1c
* fix: etcd publisher reconnecting problem * chore: fix wrong call
202 lines
4.5 KiB
Go
202 lines
4.5 KiB
Go
package discov
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/zeromicro/go-zero/core/discov/internal"
|
|
"github.com/zeromicro/go-zero/core/lang"
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
"github.com/zeromicro/go-zero/core/proc"
|
|
"github.com/zeromicro/go-zero/core/syncx"
|
|
"github.com/zeromicro/go-zero/core/threading"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
)
|
|
|
|
type (
|
|
// PubOption defines the method to customize a Publisher.
|
|
PubOption func(client *Publisher)
|
|
|
|
// A Publisher can be used to publish the value to an etcd cluster on the given key.
|
|
Publisher struct {
|
|
endpoints []string
|
|
key string
|
|
fullKey string
|
|
id int64
|
|
value string
|
|
lease clientv3.LeaseID
|
|
quit *syncx.DoneChan
|
|
pauseChan chan lang.PlaceholderType
|
|
resumeChan chan lang.PlaceholderType
|
|
}
|
|
)
|
|
|
|
// NewPublisher returns a Publisher.
|
|
// endpoints is the hosts of the etcd cluster.
|
|
// key:value are a pair to be published.
|
|
// opts are used to customize the Publisher.
|
|
func NewPublisher(endpoints []string, key, value string, opts ...PubOption) *Publisher {
|
|
publisher := &Publisher{
|
|
endpoints: endpoints,
|
|
key: key,
|
|
value: value,
|
|
quit: syncx.NewDoneChan(),
|
|
pauseChan: make(chan lang.PlaceholderType),
|
|
resumeChan: make(chan lang.PlaceholderType),
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(publisher)
|
|
}
|
|
|
|
return publisher
|
|
}
|
|
|
|
// KeepAlive keeps key:value alive.
|
|
func (p *Publisher) KeepAlive() error {
|
|
cli, err := p.doRegister()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
proc.AddWrapUpListener(func() {
|
|
p.Stop()
|
|
})
|
|
|
|
return p.keepAliveAsync(cli)
|
|
}
|
|
|
|
// Pause pauses the renewing of key:value.
|
|
func (p *Publisher) Pause() {
|
|
p.pauseChan <- lang.Placeholder
|
|
}
|
|
|
|
// Resume resumes the renewing of key:value.
|
|
func (p *Publisher) Resume() {
|
|
p.resumeChan <- lang.Placeholder
|
|
}
|
|
|
|
// Stop stops the renewing and revokes the registration.
|
|
func (p *Publisher) Stop() {
|
|
p.quit.Close()
|
|
}
|
|
|
|
func (p *Publisher) doKeepAlive() error {
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
select {
|
|
case <-p.quit.Done():
|
|
return nil
|
|
default:
|
|
cli, err := p.doRegister()
|
|
if err != nil {
|
|
logx.Errorf("etcd publisher doRegister: %s", err.Error())
|
|
break
|
|
}
|
|
|
|
if err := p.keepAliveAsync(cli); err != nil {
|
|
logx.Errorf("etcd publisher keepAliveAsync: %s", err.Error())
|
|
break
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Publisher) doRegister() (internal.EtcdClient, error) {
|
|
cli, err := internal.GetRegistry().GetConn(p.endpoints)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
p.lease, err = p.register(cli)
|
|
return cli, err
|
|
}
|
|
|
|
func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
|
|
ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
threading.GoSafe(func() {
|
|
for {
|
|
select {
|
|
case _, ok := <-ch:
|
|
if !ok {
|
|
p.revoke(cli)
|
|
if err := p.doKeepAlive(); err != nil {
|
|
logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
|
|
}
|
|
return
|
|
}
|
|
case <-p.pauseChan:
|
|
logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
|
|
p.revoke(cli)
|
|
select {
|
|
case <-p.resumeChan:
|
|
if err := p.doKeepAlive(); err != nil {
|
|
logx.Errorf("etcd publisher KeepAlive: %s", err.Error())
|
|
}
|
|
return
|
|
case <-p.quit.Done():
|
|
return
|
|
}
|
|
case <-p.quit.Done():
|
|
p.revoke(cli)
|
|
return
|
|
}
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
|
|
resp, err := client.Grant(client.Ctx(), TimeToLive)
|
|
if err != nil {
|
|
return clientv3.NoLease, err
|
|
}
|
|
|
|
lease := resp.ID
|
|
if p.id > 0 {
|
|
p.fullKey = makeEtcdKey(p.key, p.id)
|
|
} else {
|
|
p.fullKey = makeEtcdKey(p.key, int64(lease))
|
|
}
|
|
_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
|
|
|
|
return lease, err
|
|
}
|
|
|
|
func (p *Publisher) revoke(cli internal.EtcdClient) {
|
|
if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
|
|
logx.Errorf("etcd publisher revoke: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
// WithId customizes a Publisher with the id.
|
|
func WithId(id int64) PubOption {
|
|
return func(publisher *Publisher) {
|
|
publisher.id = id
|
|
}
|
|
}
|
|
|
|
// WithPubEtcdAccount provides the etcd username/password.
|
|
func WithPubEtcdAccount(user, pass string) PubOption {
|
|
return func(pub *Publisher) {
|
|
RegisterAccount(pub.endpoints, user, pass)
|
|
}
|
|
}
|
|
|
|
// WithPubEtcdTLS provides the etcd CertFile/CertKeyFile/CACertFile.
|
|
func WithPubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify bool) PubOption {
|
|
return func(pub *Publisher) {
|
|
logx.Must(RegisterTLS(pub.endpoints, certFile, certKeyFile, caFile, insecureSkipVerify))
|
|
}
|
|
}
|