From 0ee7654407303aa2251aee3f0dd3c89834e08c94 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Wed, 4 Aug 2021 18:45:05 +0800 Subject: [PATCH] fix #792 (#873) --- zrpc/internal/client.go | 2 +- zrpc/internal/codes/accept.go | 13 ++++++++ zrpc/internal/rpcserver.go | 2 ++ .../serverinterceptors/breakerinterceptor.go | 33 +++++++++++++++++++ .../breakerinterceptor_test.go | 31 +++++++++++++++++ 5 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 zrpc/internal/serverinterceptors/breakerinterceptor.go create mode 100644 zrpc/internal/serverinterceptors/breakerinterceptor_test.go diff --git a/zrpc/internal/client.go b/zrpc/internal/client.go index 26f76748..96111782 100644 --- a/zrpc/internal/client.go +++ b/zrpc/internal/client.go @@ -69,8 +69,8 @@ func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption { WithUnaryClientInterceptors( clientinterceptors.TracingInterceptor, clientinterceptors.DurationInterceptor, - clientinterceptors.BreakerInterceptor, clientinterceptors.PrometheusInterceptor, + clientinterceptors.BreakerInterceptor, clientinterceptors.TimeoutInterceptor(cliOpts.Timeout), ), } diff --git a/zrpc/internal/codes/accept.go b/zrpc/internal/codes/accept.go index 0ecb1275..106abbe8 100644 --- a/zrpc/internal/codes/accept.go +++ b/zrpc/internal/codes/accept.go @@ -1,6 +1,8 @@ package codes import ( + "context" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -10,6 +12,17 @@ func Acceptable(err error) bool { switch status.Code(err) { case codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss: return false + case codes.Unknown: + return acceptableUnknown(err) + default: + return true + } +} + +func acceptableUnknown(err error) bool { + switch err { + case context.DeadlineExceeded: + return false default: return true } diff --git a/zrpc/internal/rpcserver.go b/zrpc/internal/rpcserver.go index 9d16901d..3b5c9a3e 100644 --- a/zrpc/internal/rpcserver.go +++ b/zrpc/internal/rpcserver.go @@ -58,10 +58,12 @@ func (s *rpcServer) Start(register RegisterFn) error { serverinterceptors.UnaryCrashInterceptor(), serverinterceptors.UnaryStatInterceptor(s.metrics), serverinterceptors.UnaryPrometheusInterceptor(), + serverinterceptors.UnaryBreakerInterceptor(), } unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...) streamInterceptors := []grpc.StreamServerInterceptor{ serverinterceptors.StreamCrashInterceptor, + serverinterceptors.StreamBreakerInterceptor, } streamInterceptors = append(streamInterceptors, s.streamInterceptors...) options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...), diff --git a/zrpc/internal/serverinterceptors/breakerinterceptor.go b/zrpc/internal/serverinterceptors/breakerinterceptor.go new file mode 100644 index 00000000..7a87f253 --- /dev/null +++ b/zrpc/internal/serverinterceptors/breakerinterceptor.go @@ -0,0 +1,33 @@ +package serverinterceptors + +import ( + "context" + + "github.com/tal-tech/go-zero/core/breaker" + "github.com/tal-tech/go-zero/zrpc/internal/codes" + "google.golang.org/grpc" +) + +// StreamBreakerInterceptor is an interceptor that acts as a circuit breaker. +func StreamBreakerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, + handler grpc.StreamHandler) (err error) { + breakerName := info.FullMethod + return breaker.DoWithAcceptable(breakerName, func() error { + return handler(srv, stream) + }, codes.Acceptable) +} + +// UnaryBreakerInterceptor is an interceptor that acts as a circuit breaker. +func UnaryBreakerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler) (resp interface{}, err error) { + breakerName := info.FullMethod + err = breaker.DoWithAcceptable(breakerName, func() error { + var err error + resp, err = handler(ctx, req) + return err + }, codes.Acceptable) + + return resp, err + } +} diff --git a/zrpc/internal/serverinterceptors/breakerinterceptor_test.go b/zrpc/internal/serverinterceptors/breakerinterceptor_test.go new file mode 100644 index 00000000..222b929a --- /dev/null +++ b/zrpc/internal/serverinterceptors/breakerinterceptor_test.go @@ -0,0 +1,31 @@ +package serverinterceptors + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestStreamBreakerInterceptor(t *testing.T) { + err := StreamBreakerInterceptor(nil, nil, &grpc.StreamServerInfo{ + FullMethod: "any", + }, func( + srv interface{}, stream grpc.ServerStream) error { + return status.New(codes.DeadlineExceeded, "any").Err() + }) + assert.NotNil(t, err) +} + +func TestUnaryBreakerInterceptor(t *testing.T) { + interceptor := UnaryBreakerInterceptor() + _, err := interceptor(nil, nil, &grpc.UnaryServerInfo{ + FullMethod: "any", + }, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, status.New(codes.DeadlineExceeded, "any").Err() + }) + assert.NotNil(t, err) +}