go-zero/core/discov/publisher.go
2021-06-21 09:05:20 +08:00

154 lines
3.4 KiB
Go

package discov
import (
"github.com/tal-tech/go-zero/core/discov/internal"
"github.com/tal-tech/go-zero/core/lang"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/proc"
"github.com/tal-tech/go-zero/core/syncx"
"github.com/tal-tech/go-zero/core/threading"
clientv3 "go.etcd.io/etcd/client/v3"
)
type (
// PublisherOption defines the method to customize a Publisher.
PublisherOption func(client *Publisher)
// A Publisher can be used to publish the value to an etcd cluster on the given key.
Publisher struct {
endpoints []string
key string
fullKey string
id int64
value string
lease clientv3.LeaseID
quit *syncx.DoneChan
pauseChan chan lang.PlaceholderType
resumeChan chan lang.PlaceholderType
}
)
// NewPublisher returns a Publisher.
// endpoints is the hosts of the etcd cluster.
// key:value are a pair to be published.
// opts are used to customize the Publisher.
func NewPublisher(endpoints []string, key, value string, opts ...PublisherOption) *Publisher {
publisher := &Publisher{
endpoints: endpoints,
key: key,
value: value,
quit: syncx.NewDoneChan(),
pauseChan: make(chan lang.PlaceholderType),
resumeChan: make(chan lang.PlaceholderType),
}
for _, opt := range opts {
opt(publisher)
}
return publisher
}
// KeepAlive keeps key:value alive.
func (p *Publisher) KeepAlive() error {
cli, err := internal.GetRegistry().GetConn(p.endpoints)
if err != nil {
return err
}
p.lease, err = p.register(cli)
if err != nil {
return err
}
proc.AddWrapUpListener(func() {
p.Stop()
})
return p.keepAliveAsync(cli)
}
// Pause pauses the renewing of key:value.
func (p *Publisher) Pause() {
p.pauseChan <- lang.Placeholder
}
// Resume resumes the renewing of key:value.
func (p *Publisher) Resume() {
p.resumeChan <- lang.Placeholder
}
// Stop stops the renewing and revokes the registration.
func (p *Publisher) Stop() {
p.quit.Close()
}
func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
ch, err := cli.KeepAlive(cli.Ctx(), p.lease)
if err != nil {
return err
}
threading.GoSafe(func() {
for {
select {
case _, ok := <-ch:
if !ok {
p.revoke(cli)
if err := p.KeepAlive(); err != nil {
logx.Errorf("KeepAlive: %s", err.Error())
}
return
}
case <-p.pauseChan:
logx.Infof("paused etcd renew, key: %s, value: %s", p.key, p.value)
p.revoke(cli)
select {
case <-p.resumeChan:
if err := p.KeepAlive(); err != nil {
logx.Errorf("KeepAlive: %s", err.Error())
}
return
case <-p.quit.Done():
return
}
case <-p.quit.Done():
p.revoke(cli)
return
}
}
})
return nil
}
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
resp, err := client.Grant(client.Ctx(), TimeToLive)
if err != nil {
return clientv3.NoLease, err
}
lease := resp.ID
if p.id > 0 {
p.fullKey = makeEtcdKey(p.key, p.id)
} else {
p.fullKey = makeEtcdKey(p.key, int64(lease))
}
_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
return lease, err
}
func (p *Publisher) revoke(cli internal.EtcdClient) {
if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
logx.Error(err)
}
}
// WithId customizes a Publisher with the id.
func WithId(id int64) PublisherOption {
return func(publisher *Publisher) {
publisher.id = id
}
}