diff --git a/zrpc/resolver/internal/kubebuilder.go b/zrpc/resolver/internal/kubebuilder.go index 04616b56..63900377 100644 --- a/zrpc/resolver/internal/kubebuilder.go +++ b/zrpc/resolver/internal/kubebuilder.go @@ -3,7 +3,6 @@ package internal import ( "context" "fmt" - "runtime/debug" "time" "github.com/zeromicro/go-zero/core/logx" @@ -23,18 +22,18 @@ const ( type kubeResolver struct { cc resolver.ClientConn - stopCh chan struct{} inf informers.SharedInformerFactory + stopCh chan struct{} } +func (r *kubeResolver) ResolveNow(_ resolver.ResolveNowOptions) {} + func (r *kubeResolver) start() { threading.GoSafe(func() { r.inf.Start(r.stopCh) }) } -func (r *kubeResolver) ResolveNow(_ resolver.ResolveNowOptions) {} - func (r *kubeResolver) Close() { close(r.stopCh) } @@ -43,8 +42,6 @@ type kubeBuilder struct{} func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { - logx.Debugf("target: %s, callstack: %s, cc ptr: %p", target, string(debug.Stack()), cc) - svc, err := kube.ParseTarget(target) if err != nil { return nil, err @@ -61,10 +58,13 @@ func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, } if svc.Port == 0 { - endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{}) + // getting endpoints is only to get the port + endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get( + context.Background(), svc.Name, v1.GetOptions{}) if err != nil { return nil, err } + svc.Port = int(endpoints.Subsets[0].Ports[0].Port) } @@ -93,22 +93,24 @@ func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, return nil, err } - endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, v1.GetOptions{}) + // get the initial endpoints, cannot use the previous endpoints, + // because the endpoints may be updated before/after the informer is started. + endpoints, err := cs.CoreV1().Endpoints(svc.Namespace).Get( + context.Background(), svc.Name, v1.GetOptions{}) if err != nil { return nil, err } handler.Update(endpoints) - resolver := &kubeResolver{ + r := &kubeResolver{ cc: cc, - stopCh: make(chan struct{}), inf: inf, + stopCh: make(chan struct{}), } + r.start() - resolver.start() - - return resolver, nil + return r, nil } func (b *kubeBuilder) Scheme() string {