From d371ab5479c41499c4dba2e8c1f5df4b68d098f3 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Thu, 18 Apr 2024 23:18:49 +0800 Subject: [PATCH] feat: use breaker with ctx to prevent deadline exceeded (#4091) Signed-off-by: kevin --- core/stores/mon/collection.go | 36 ++++++++--------- core/stores/mon/model.go | 40 ++++++++----------- core/stores/redis/breakerhook.go | 4 +- core/stores/sqlx/sqlconn.go | 10 ++--- core/stores/sqlx/stmt.go | 4 +- rest/httpc/service.go | 2 +- .../clientinterceptors/breakerinterceptor.go | 2 +- .../serverinterceptors/breakerinterceptor.go | 2 +- 8 files changed, 47 insertions(+), 53 deletions(-) diff --git a/core/stores/mon/collection.go b/core/stores/mon/collection.go index a909bb02..5deadbef 100644 --- a/core/stores/mon/collection.go +++ b/core/stores/mon/collection.go @@ -141,7 +141,7 @@ func (c *decoratedCollection) Aggregate(ctx context.Context, pipeline any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { starTime := timex.Now() defer func() { c.logDurationSimple(ctx, aggregate, starTime, err) @@ -161,7 +161,7 @@ func (c *decoratedCollection) BulkWrite(ctx context.Context, models []mongo.Writ endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDurationSimple(ctx, bulkWrite, startTime, err) @@ -181,7 +181,7 @@ func (c *decoratedCollection) CountDocuments(ctx context.Context, filter any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDurationSimple(ctx, countDocuments, startTime, err) @@ -201,7 +201,7 @@ func (c *decoratedCollection) DeleteMany(ctx context.Context, filter any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDurationSimple(ctx, deleteMany, startTime, err) @@ -221,7 +221,7 @@ func (c *decoratedCollection) DeleteOne(ctx context.Context, filter any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, deleteOne, startTime, err, filter) @@ -241,7 +241,7 @@ func (c *decoratedCollection) Distinct(ctx context.Context, fieldName string, fi endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDurationSimple(ctx, distinct, startTime, err) @@ -261,7 +261,7 @@ func (c *decoratedCollection) EstimatedDocumentCount(ctx context.Context, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDurationSimple(ctx, estimatedDocumentCount, startTime, err) @@ -281,7 +281,7 @@ func (c *decoratedCollection) Find(ctx context.Context, filter any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, find, startTime, err, filter) @@ -301,7 +301,7 @@ func (c *decoratedCollection) FindOne(ctx context.Context, filter any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, findOne, startTime, err, filter) @@ -322,7 +322,7 @@ func (c *decoratedCollection) FindOneAndDelete(ctx context.Context, filter any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, findOneAndDelete, startTime, err, filter) @@ -344,7 +344,7 @@ func (c *decoratedCollection) FindOneAndReplace(ctx context.Context, filter any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, findOneAndReplace, startTime, err, filter, replacement) @@ -365,7 +365,7 @@ func (c *decoratedCollection) FindOneAndUpdate(ctx context.Context, filter, upda endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, findOneAndUpdate, startTime, err, filter, update) @@ -386,7 +386,7 @@ func (c *decoratedCollection) InsertMany(ctx context.Context, documents []any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDurationSimple(ctx, insertMany, startTime, err) @@ -406,7 +406,7 @@ func (c *decoratedCollection) InsertOne(ctx context.Context, document any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, insertOne, startTime, err, document) @@ -426,7 +426,7 @@ func (c *decoratedCollection) ReplaceOne(ctx context.Context, filter, replacemen endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, replaceOne, startTime, err, filter, replacement) @@ -446,7 +446,7 @@ func (c *decoratedCollection) UpdateByID(ctx context.Context, id, update any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, updateByID, startTime, err, id, update) @@ -466,7 +466,7 @@ func (c *decoratedCollection) UpdateMany(ctx context.Context, filter, update any endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDurationSimple(ctx, updateMany, startTime, err) @@ -486,7 +486,7 @@ func (c *decoratedCollection) UpdateOne(ctx context.Context, filter, update any, endSpan(span, err) }() - err = c.brk.DoWithAcceptable(func() error { + err = c.brk.DoWithAcceptableCtx(ctx, func() error { startTime := timex.Now() defer func() { c.logDuration(ctx, updateOne, startTime, err, filter, update) diff --git a/core/stores/mon/model.go b/core/stores/mon/model.go index e58df7c8..da1ba7ec 100644 --- a/core/stores/mon/model.go +++ b/core/stores/mon/model.go @@ -69,27 +69,21 @@ func newModel(name string, cli *mongo.Client, coll Collection, brk breaker.Break // StartSession starts a new session. func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session, err error) { - err = m.brk.DoWithAcceptable(func() error { - starTime := timex.Now() - defer func() { - logDuration(context.Background(), m.name, startSession, starTime, err) - }() + starTime := timex.Now() + defer func() { + logDuration(context.Background(), m.name, startSession, starTime, err) + }() - session, sessionErr := m.cli.StartSession(opts...) - if sessionErr != nil { - return sessionErr - } + session, sessionErr := m.cli.StartSession(opts...) + if sessionErr != nil { + return nil, sessionErr + } - sess = &wrappedSession{ - Session: session, - name: m.name, - brk: m.brk, - } - - return nil - }, acceptable) - - return + return &wrappedSession{ + Session: session, + name: m.name, + brk: m.brk, + }, nil } // Aggregate executes an aggregation pipeline. @@ -184,7 +178,7 @@ func (w *wrappedSession) AbortTransaction(ctx context.Context) (err error) { endSpan(span, err) }() - return w.brk.DoWithAcceptable(func() error { + return w.brk.DoWithAcceptableCtx(ctx, func() error { starTime := timex.Now() defer func() { logDuration(ctx, w.name, abortTransaction, starTime, err) @@ -201,7 +195,7 @@ func (w *wrappedSession) CommitTransaction(ctx context.Context) (err error) { endSpan(span, err) }() - return w.brk.DoWithAcceptable(func() error { + return w.brk.DoWithAcceptableCtx(ctx, func() error { starTime := timex.Now() defer func() { logDuration(ctx, w.name, commitTransaction, starTime, err) @@ -222,7 +216,7 @@ func (w *wrappedSession) WithTransaction( endSpan(span, err) }() - err = w.brk.DoWithAcceptable(func() error { + err = w.brk.DoWithAcceptableCtx(ctx, func() error { starTime := timex.Now() defer func() { logDuration(ctx, w.name, withTransaction, starTime, err) @@ -243,7 +237,7 @@ func (w *wrappedSession) EndSession(ctx context.Context) { endSpan(span, err) }() - err = w.brk.DoWithAcceptable(func() error { + err = w.brk.DoWithAcceptableCtx(ctx, func() error { starTime := timex.Now() defer func() { logDuration(ctx, w.name, endSession, starTime, err) diff --git a/core/stores/redis/breakerhook.go b/core/stores/redis/breakerhook.go index 4d0a1bc0..c8bdce7d 100644 --- a/core/stores/redis/breakerhook.go +++ b/core/stores/redis/breakerhook.go @@ -26,7 +26,7 @@ func (h breakerHook) ProcessHook(next red.ProcessHook) red.ProcessHook { return next(ctx, cmd) } - return h.brk.DoWithAcceptable(func() error { + return h.brk.DoWithAcceptableCtx(ctx, func() error { return next(ctx, cmd) }, acceptable) } @@ -34,7 +34,7 @@ func (h breakerHook) ProcessHook(next red.ProcessHook) red.ProcessHook { func (h breakerHook) ProcessPipelineHook(next red.ProcessPipelineHook) red.ProcessPipelineHook { return func(ctx context.Context, cmds []red.Cmder) error { - return h.brk.DoWithAcceptable(func() error { + return h.brk.DoWithAcceptableCtx(ctx, func() error { return next(ctx, cmds) }, acceptable) } diff --git a/core/stores/sqlx/sqlconn.go b/core/stores/sqlx/sqlconn.go index 4dde5d43..ff676b45 100644 --- a/core/stores/sqlx/sqlconn.go +++ b/core/stores/sqlx/sqlconn.go @@ -83,7 +83,7 @@ func NewSqlConn(driverName, datasource string, opts ...SqlOption) SqlConn { } // NewSqlConnFromDB returns a SqlConn with the given sql.DB. -// Use it with caution, it's provided for other ORM to interact with. +// Use it with caution; it's provided for other ORM to interact with. func NewSqlConnFromDB(db *sql.DB, opts ...SqlOption) SqlConn { conn := &commonSqlConn{ connProv: func() (*sql.DB, error) { @@ -120,7 +120,7 @@ func (db *commonSqlConn) ExecCtx(ctx context.Context, q string, args ...any) ( endSpan(span, err) }() - err = db.brk.DoWithAcceptable(func() error { + err = db.brk.DoWithAcceptableCtx(ctx, func() error { var conn *sql.DB conn, err = db.connProv() if err != nil { @@ -148,7 +148,7 @@ func (db *commonSqlConn) PrepareCtx(ctx context.Context, query string) (stmt Stm endSpan(span, err) }() - err = db.brk.DoWithAcceptable(func() error { + err = db.brk.DoWithAcceptableCtx(ctx, func() error { var conn *sql.DB conn, err = db.connProv() if err != nil { @@ -256,7 +256,7 @@ func (db *commonSqlConn) TransactCtx(ctx context.Context, fn func(context.Contex endSpan(span, err) }() - err = db.brk.DoWithAcceptable(func() error { + err = db.brk.DoWithAcceptableCtx(ctx, func() error { return transact(ctx, db, db.beginTx, fn) }, db.acceptable) if errors.Is(err, breaker.ErrServiceUnavailable) { @@ -287,7 +287,7 @@ func (db *commonSqlConn) acceptable(err error) bool { func (db *commonSqlConn) queryRows(ctx context.Context, scanner func(*sql.Rows) error, q string, args ...any) (err error) { var scanFailed bool - err = db.brk.DoWithAcceptable(func() error { + err = db.brk.DoWithAcceptableCtx(ctx, func() error { conn, err := db.connProv() if err != nil { db.onError(ctx, err) diff --git a/core/stores/sqlx/stmt.go b/core/stores/sqlx/stmt.go index e10a110c..a8de0996 100644 --- a/core/stores/sqlx/stmt.go +++ b/core/stores/sqlx/stmt.go @@ -65,7 +65,7 @@ func (s statement) ExecCtx(ctx context.Context, args ...any) (result sql.Result, endSpan(span, err) }() - err = s.brk.DoWithAcceptable(func() error { + err = s.brk.DoWithAcceptableCtx(ctx, func() error { result, err = execStmt(ctx, s.stmt, s.query, args...) return err }, func(err error) bool { @@ -141,7 +141,7 @@ func (s statement) QueryRowsPartialCtx(ctx context.Context, v any, args ...any) func (s statement) queryRows(ctx context.Context, scanFn func(any, rowsScanner) error, v any, args ...any) error { var scanFailed bool - err := s.brk.DoWithAcceptable(func() error { + err := s.brk.DoWithAcceptableCtx(ctx, func() error { return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error { err := scanFn(v, rows) if isScanFailed(err) { diff --git a/rest/httpc/service.go b/rest/httpc/service.go index d69b69e2..a65880e3 100644 --- a/rest/httpc/service.go +++ b/rest/httpc/service.go @@ -63,7 +63,7 @@ func (s namedService) do(r *http.Request) (resp *http.Response, err error) { } brk := breaker.GetBreaker(s.name) - err = brk.DoWithAcceptable(func() error { + err = brk.DoWithAcceptableCtx(r.Context(), func() error { resp, err = s.cli.Do(r) return err }, func(err error) bool { diff --git a/zrpc/internal/clientinterceptors/breakerinterceptor.go b/zrpc/internal/clientinterceptors/breakerinterceptor.go index c4231760..63c068a6 100644 --- a/zrpc/internal/clientinterceptors/breakerinterceptor.go +++ b/zrpc/internal/clientinterceptors/breakerinterceptor.go @@ -13,7 +13,7 @@ import ( func BreakerInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { breakerName := path.Join(cc.Target(), method) - return breaker.DoWithAcceptable(breakerName, func() error { + return breaker.DoWithAcceptableCtx(ctx, breakerName, func() error { return invoker(ctx, method, req, reply, cc, opts...) }, codes.Acceptable) } diff --git a/zrpc/internal/serverinterceptors/breakerinterceptor.go b/zrpc/internal/serverinterceptors/breakerinterceptor.go index 79d8c68c..7d64561b 100644 --- a/zrpc/internal/serverinterceptors/breakerinterceptor.go +++ b/zrpc/internal/serverinterceptors/breakerinterceptor.go @@ -24,7 +24,7 @@ func StreamBreakerInterceptor(svr any, stream grpc.ServerStream, info *grpc.Stre func UnaryBreakerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { breakerName := info.FullMethod - err = breaker.DoWithAcceptable(breakerName, func() error { + err = breaker.DoWithAcceptableCtx(ctx, breakerName, func() error { var err error resp, err = handler(ctx, req) return err