diff --git a/core/stores/mon/bulkinserter.go b/core/stores/mon/bulkinserter.go new file mode 100644 index 00000000..c3fdfa23 --- /dev/null +++ b/core/stores/mon/bulkinserter.go @@ -0,0 +1,91 @@ +package mon + +import ( + "context" + "time" + + "github.com/zeromicro/go-zero/core/executors" + "github.com/zeromicro/go-zero/core/logx" + "go.mongodb.org/mongo-driver/mongo" +) + +const ( + flushInterval = time.Second + maxBulkRows = 1000 +) + +type ( + // ResultHandler is a handler that used to handle results. + ResultHandler func(*mongo.InsertManyResult, error) + + // A BulkInserter is used to insert bulk of mongo records. + BulkInserter struct { + executor *executors.PeriodicalExecutor + inserter *dbInserter + } +) + +// NewBulkInserter returns a BulkInserter. +func NewBulkInserter(coll *mongo.Collection, interval ...time.Duration) *BulkInserter { + inserter := &dbInserter{ + collection: coll, + } + + duration := flushInterval + if len(interval) > 0 { + duration = interval[0] + } + + return &BulkInserter{ + executor: executors.NewPeriodicalExecutor(duration, inserter), + inserter: inserter, + } +} + +// Flush flushes the inserter, writes all pending records. +func (bi *BulkInserter) Flush() { + bi.executor.Flush() +} + +// Insert inserts doc. +func (bi *BulkInserter) Insert(doc interface{}) { + bi.executor.Add(doc) +} + +// SetResultHandler sets the result handler. +func (bi *BulkInserter) SetResultHandler(handler ResultHandler) { + bi.executor.Sync(func() { + bi.inserter.resultHandler = handler + }) +} + +type dbInserter struct { + collection *mongo.Collection + documents []interface{} + resultHandler ResultHandler +} + +func (in *dbInserter) AddTask(doc interface{}) bool { + in.documents = append(in.documents, doc) + return len(in.documents) >= maxBulkRows +} + +func (in *dbInserter) Execute(objs interface{}) { + docs := objs.([]interface{}) + if len(docs) == 0 { + return + } + + result, err := in.collection.InsertMany(context.Background(), docs) + if in.resultHandler != nil { + in.resultHandler(result, err) + } else if err != nil { + logx.Error(err) + } +} + +func (in *dbInserter) RemoveAll() interface{} { + documents := in.documents + in.documents = nil + return documents +} diff --git a/core/stores/mon/bulkinserter_test.go b/core/stores/mon/bulkinserter_test.go new file mode 100644 index 00000000..0d6aa214 --- /dev/null +++ b/core/stores/mon/bulkinserter_test.go @@ -0,0 +1,27 @@ +package mon + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/integration/mtest" +) + +func TestBulkInserter(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "ok", Value: 1}}...)) + bulk := NewBulkInserter(mt.Coll) + bulk.SetResultHandler(func(result *mongo.InsertManyResult, err error) { + assert.Nil(t, err) + assert.Equal(t, 2, len(result.InsertedIDs)) + }) + bulk.Insert(bson.D{{Key: "foo", Value: "bar"}}) + bulk.Insert(bson.D{{Key: "foo", Value: "baz"}}) + bulk.Flush() + }) +} diff --git a/core/stores/mon/clientmanager.go b/core/stores/mon/clientmanager.go new file mode 100644 index 00000000..5a6c0278 --- /dev/null +++ b/core/stores/mon/clientmanager.go @@ -0,0 +1,51 @@ +package mon + +import ( + "context" + "io" + "time" + + "github.com/zeromicro/go-zero/core/syncx" + "go.mongodb.org/mongo-driver/mongo" + mopt "go.mongodb.org/mongo-driver/mongo/options" +) + +const defaultTimeout = time.Second + +var clientManager = syncx.NewResourceManager() + +// ClosableClient wraps *mongo.Client and provides a Close method. +type ClosableClient struct { + *mongo.Client +} + +// Close disconnects the underlying *mongo.Client. +func (cs *ClosableClient) Close() error { + return cs.Client.Disconnect(context.Background()) +} + +// Inject injects a *mongo.Client into the client manager. +// Typically, this is used to inject a *mongo.Client for test purpose. +func Inject(key string, client *mongo.Client) { + clientManager.Inject(key, &ClosableClient{client}) +} + +func getClient(url string) (*mongo.Client, error) { + val, err := clientManager.GetResource(url, func() (io.Closer, error) { + cli, err := mongo.Connect(context.Background(), mopt.Client().ApplyURI(url)) + if err != nil { + return nil, err + } + + concurrentSess := &ClosableClient{ + Client: cli, + } + + return concurrentSess, nil + }) + if err != nil { + return nil, err + } + + return val.(*ClosableClient).Client, nil +} diff --git a/core/stores/mon/clientmanager_test.go b/core/stores/mon/clientmanager_test.go new file mode 100644 index 00000000..bc6b70f8 --- /dev/null +++ b/core/stores/mon/clientmanager_test.go @@ -0,0 +1,20 @@ +package mon + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/mongo/integration/mtest" +) + +func TestClientManger_getClient(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + Inject(mtest.ClusterURI(), mt.Client) + cli, err := getClient(mtest.ClusterURI()) + assert.Nil(t, err) + assert.Equal(t, mt.Client, cli) + }) +} diff --git a/core/stores/mon/collection.go b/core/stores/mon/collection.go new file mode 100644 index 00000000..c9e3ed2a --- /dev/null +++ b/core/stores/mon/collection.go @@ -0,0 +1,418 @@ +package mon + +import ( + "context" + "encoding/json" + "time" + + "github.com/zeromicro/go-zero/core/breaker" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/timex" + "go.mongodb.org/mongo-driver/mongo" + mopt "go.mongodb.org/mongo-driver/mongo/options" +) + +const defaultSlowThreshold = time.Millisecond * 500 + +// ErrNotFound is an alias of mongo.ErrNoDocuments +var ErrNotFound = mongo.ErrNoDocuments + +type ( + // Collection defines a MongoDB collection. + Collection interface { + // Aggregate executes an aggregation pipeline. + Aggregate(ctx context.Context, pipeline interface{}, opts ...*mopt.AggregateOptions) ( + *mongo.Cursor, error) + // BulkWrite performs a bulk write operation. + BulkWrite(ctx context.Context, models []mongo.WriteModel, opts ...*mopt.BulkWriteOptions) ( + *mongo.BulkWriteResult, error) + // Clone creates a copy of this collection with the same settings. + Clone(opts ...*mopt.CollectionOptions) (*mongo.Collection, error) + // CountDocuments returns the number of documents in the collection that match the filter. + CountDocuments(ctx context.Context, filter interface{}, opts ...*mopt.CountOptions) (int64, error) + // Database returns the database that this collection is a part of. + Database() *mongo.Database + // DeleteMany deletes documents from the collection that match the filter. + DeleteMany(ctx context.Context, filter interface{}, opts ...*mopt.DeleteOptions) ( + *mongo.DeleteResult, error) + // DeleteOne deletes at most one document from the collection that matches the filter. + DeleteOne(ctx context.Context, filter interface{}, opts ...*mopt.DeleteOptions) ( + *mongo.DeleteResult, error) + // Distinct returns a list of distinct values for the given key across the collection. + Distinct(ctx context.Context, fieldName string, filter interface{}, + opts ...*mopt.DistinctOptions) ([]interface{}, error) + // Drop drops this collection from database. + Drop(ctx context.Context) error + // EstimatedDocumentCount returns an estimate of the count of documents in a collection + // using collection metadata. + EstimatedDocumentCount(ctx context.Context, opts ...*mopt.EstimatedDocumentCountOptions) (int64, error) + // Find finds the documents matching the provided filter. + Find(ctx context.Context, filter interface{}, opts ...*mopt.FindOptions) (*mongo.Cursor, error) + // FindOne returns up to one document that matches the provided filter. + FindOne(ctx context.Context, filter interface{}, opts ...*mopt.FindOneOptions) ( + *mongo.SingleResult, error) + // FindOneAndDelete returns at most one document that matches the filter. If the filter + // matches multiple documents, only the first document is deleted. + FindOneAndDelete(ctx context.Context, filter interface{}, opts ...*mopt.FindOneAndDeleteOptions) ( + *mongo.SingleResult, error) + // FindOneAndReplace returns at most one document that matches the filter. If the filter + // matches multiple documents, FindOneAndReplace returns the first document in the + // collection that matches the filter. + FindOneAndReplace(ctx context.Context, filter interface{}, replacement interface{}, + opts ...*mopt.FindOneAndReplaceOptions) (*mongo.SingleResult, error) + // FindOneAndUpdate returns at most one document that matches the filter. If the filter + // matches multiple documents, FindOneAndUpdate returns the first document in the + // collection that matches the filter. + FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{}, + opts ...*mopt.FindOneAndUpdateOptions) (*mongo.SingleResult, error) + // Indexes returns the index view for this collection. + Indexes() mongo.IndexView + // InsertMany inserts the provided documents. + InsertMany(ctx context.Context, documents []interface{}, opts ...*mopt.InsertManyOptions) ( + *mongo.InsertManyResult, error) + // InsertOne inserts the provided document. + InsertOne(ctx context.Context, document interface{}, opts ...*mopt.InsertOneOptions) ( + *mongo.InsertOneResult, error) + // ReplaceOne replaces at most one document that matches the filter. + ReplaceOne(ctx context.Context, filter interface{}, replacement interface{}, + opts ...*mopt.ReplaceOptions) (*mongo.UpdateResult, error) + // UpdateByID updates a single document matching the provided filter. + UpdateByID(ctx context.Context, id interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) + // UpdateMany updates the provided documents. + UpdateMany(ctx context.Context, filter interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) + // UpdateOne updates a single document matching the provided filter. + UpdateOne(ctx context.Context, filter interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) + // Watch returns a change stream cursor used to receive notifications of changes to the collection. + Watch(ctx context.Context, pipeline interface{}, opts ...*mopt.ChangeStreamOptions) ( + *mongo.ChangeStream, error) + } + + decoratedCollection struct { + *mongo.Collection + name string + brk breaker.Breaker + } + + keepablePromise struct { + promise breaker.Promise + log func(error) + } +) + +func newCollection(collection *mongo.Collection, brk breaker.Breaker) Collection { + return &decoratedCollection{ + Collection: collection, + name: collection.Name(), + brk: brk, + } +} + +func (c *decoratedCollection) Aggregate(ctx context.Context, pipeline interface{}, + opts ...*mopt.AggregateOptions) (cur *mongo.Cursor, err error) { + err = c.brk.DoWithAcceptable(func() error { + starTime := timex.Now() + defer func() { + c.logDurationSimple("Aggregate", starTime, err) + }() + + cur, err = c.Collection.Aggregate(ctx, pipeline, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) BulkWrite(ctx context.Context, models []mongo.WriteModel, + opts ...*mopt.BulkWriteOptions) (res *mongo.BulkWriteResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDurationSimple("BulkWrite", startTime, err) + }() + + res, err = c.Collection.BulkWrite(ctx, models, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) CountDocuments(ctx context.Context, filter interface{}, + opts ...*mopt.CountOptions) (count int64, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDurationSimple("CountDocuments", startTime, err) + }() + + count, err = c.Collection.CountDocuments(ctx, filter, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) DeleteMany(ctx context.Context, filter interface{}, + opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDurationSimple("DeleteMany", startTime, err) + }() + + res, err = c.Collection.DeleteMany(ctx, filter, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) DeleteOne(ctx context.Context, filter interface{}, + opts ...*mopt.DeleteOptions) (res *mongo.DeleteResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("DeleteOne", startTime, err, filter) + }() + + res, err = c.Collection.DeleteOne(ctx, filter, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) Distinct(ctx context.Context, fieldName string, filter interface{}, + opts ...*mopt.DistinctOptions) (val []interface{}, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDurationSimple("Distinct", startTime, err) + }() + + val, err = c.Collection.Distinct(ctx, fieldName, filter, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) EstimatedDocumentCount(ctx context.Context, + opts ...*mopt.EstimatedDocumentCountOptions) (val int64, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDurationSimple("EstimatedDocumentCount", startTime, err) + }() + + val, err = c.Collection.EstimatedDocumentCount(ctx, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) Find(ctx context.Context, filter interface{}, + opts ...*mopt.FindOptions) (cur *mongo.Cursor, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("Find", startTime, err, filter) + }() + + cur, err = c.Collection.Find(ctx, filter, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) FindOne(ctx context.Context, filter interface{}, + opts ...*mopt.FindOneOptions) (res *mongo.SingleResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("FindOne", startTime, err, filter) + }() + + res = c.Collection.FindOne(ctx, filter, opts...) + err = res.Err() + return err + }, acceptable) + return +} + +func (c *decoratedCollection) FindOneAndDelete(ctx context.Context, filter interface{}, + opts ...*mopt.FindOneAndDeleteOptions) (res *mongo.SingleResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("FindOneAndDelete", startTime, err, filter) + }() + + res = c.Collection.FindOneAndDelete(ctx, filter, opts...) + err = res.Err() + return err + }, acceptable) + return +} + +func (c *decoratedCollection) FindOneAndReplace(ctx context.Context, filter interface{}, + replacement interface{}, opts ...*mopt.FindOneAndReplaceOptions) ( + res *mongo.SingleResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("FindOneAndReplace", startTime, err, filter, replacement) + }() + + res = c.Collection.FindOneAndReplace(ctx, filter, replacement, opts...) + err = res.Err() + return err + }, acceptable) + return +} + +func (c *decoratedCollection) FindOneAndUpdate(ctx context.Context, filter interface{}, update interface{}, + opts ...*mopt.FindOneAndUpdateOptions) (res *mongo.SingleResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("FindOneAndUpdate", startTime, err, filter, update) + }() + + res = c.Collection.FindOneAndUpdate(ctx, filter, update, opts...) + err = res.Err() + return err + }, acceptable) + return +} + +func (c *decoratedCollection) InsertMany(ctx context.Context, documents []interface{}, + opts ...*mopt.InsertManyOptions) (res *mongo.InsertManyResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDurationSimple("InsertMany", startTime, err) + }() + + res, err = c.Collection.InsertMany(ctx, documents, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) InsertOne(ctx context.Context, document interface{}, + opts ...*mopt.InsertOneOptions) (res *mongo.InsertOneResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("InsertOne", startTime, err, document) + }() + + res, err = c.Collection.InsertOne(ctx, document, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) ReplaceOne(ctx context.Context, filter interface{}, replacement interface{}, + opts ...*mopt.ReplaceOptions) (res *mongo.UpdateResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("ReplaceOne", startTime, err, filter, replacement) + }() + + res, err = c.Collection.ReplaceOne(ctx, filter, replacement, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) UpdateByID(ctx context.Context, id interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("UpdateByID", startTime, err, id, update) + }() + + res, err = c.Collection.UpdateByID(ctx, id, update, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) UpdateMany(ctx context.Context, filter interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDurationSimple("UpdateMany", startTime, err) + }() + + res, err = c.Collection.UpdateMany(ctx, filter, update, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) UpdateOne(ctx context.Context, filter interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (res *mongo.UpdateResult, err error) { + err = c.brk.DoWithAcceptable(func() error { + startTime := timex.Now() + defer func() { + c.logDuration("UpdateOne", startTime, err, filter, update) + }() + + res, err = c.Collection.UpdateOne(ctx, filter, update, opts...) + return err + }, acceptable) + return +} + +func (c *decoratedCollection) logDuration(method string, startTime time.Duration, err error, + docs ...interface{}) { + duration := timex.Since(startTime) + content, e := json.Marshal(docs) + if e != nil { + logx.Error(err) + } else if err != nil { + if duration > slowThreshold.Load() { + logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s", + c.name, method, err.Error(), string(content)) + } else { + logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s", + c.name, method, err.Error(), string(content)) + } + } else { + if duration > slowThreshold.Load() { + logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s", + c.name, method, string(content)) + } else { + logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content)) + } + } +} + +func (c *decoratedCollection) logDurationSimple(method string, startTime time.Duration, err error) { + logDuration(c.name, method, startTime, err) +} + +func (p keepablePromise) accept(err error) error { + p.promise.Accept() + p.log(err) + return err +} + +func (p keepablePromise) keep(err error) error { + if acceptable(err) { + p.promise.Accept() + } else { + p.promise.Reject(err.Error()) + } + + p.log(err) + return err +} + +func acceptable(err error) bool { + return err == nil || err == mongo.ErrNoDocuments || err == mongo.ErrNilValue || + err == mongo.ErrNilDocument || err == mongo.ErrNilCursor || err == mongo.ErrEmptySlice +} diff --git a/core/stores/mon/collection_test.go b/core/stores/mon/collection_test.go new file mode 100644 index 00000000..2a3f62ae --- /dev/null +++ b/core/stores/mon/collection_test.go @@ -0,0 +1,608 @@ +package mon + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/zeromicro/go-zero/core/breaker" + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/stringx" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/integration/mtest" + mopt "go.mongodb.org/mongo-driver/mongo/options" +) + +var errDummy = errors.New("dummy") + +func init() { + logx.Disable() +} + +func TestKeepPromise_accept(t *testing.T) { + p := new(mockPromise) + kp := keepablePromise{ + promise: p, + log: func(error) {}, + } + assert.Nil(t, kp.accept(nil)) + assert.Equal(t, ErrNotFound, kp.accept(ErrNotFound)) +} + +func TestKeepPromise_keep(t *testing.T) { + tests := []struct { + err error + accepted bool + reason string + }{ + { + err: nil, + accepted: true, + reason: "", + }, + { + err: ErrNotFound, + accepted: true, + reason: "", + }, + { + err: errors.New("any"), + accepted: false, + reason: "any", + }, + } + + for _, test := range tests { + t.Run(stringx.RandId(), func(t *testing.T) { + p := new(mockPromise) + kp := keepablePromise{ + promise: p, + log: func(error) {}, + } + assert.Equal(t, test.err, kp.keep(test.err)) + assert.Equal(t, test.accepted, p.accepted) + assert.Equal(t, test.reason, p.reason) + }) + } +} + +func TestNewCollection(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + mt.Run("test", func(mt *mtest.T) { + coll := mt.Coll + assert.NotNil(t, coll) + col := newCollection(coll, breaker.GetBreaker("localhost")) + assert.Equal(t, t.Name()+"/test", col.(*decoratedCollection).name) + }) +} + +func TestCollection_Aggregate(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + mt.Run("test", func(mt *mtest.T) { + coll := mt.Coll + assert.NotNil(t, coll) + col := newCollection(coll, breaker.GetBreaker("localhost")) + ns := mt.Coll.Database().Name() + "." + mt.Coll.Name() + aggRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch) + mt.AddMockResponses(aggRes) + assert.Equal(t, t.Name()+"/test", col.(*decoratedCollection).name) + cursor, err := col.Aggregate(context.Background(), mongo.Pipeline{}, mopt.Aggregate()) + assert.Nil(t, err) + cursor.Close(context.Background()) + }) +} + +func TestCollection_BulkWrite(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "ok", Value: 1}}...)) + res, err := c.BulkWrite(context.Background(), []mongo.WriteModel{ + mongo.NewInsertOneModel().SetDocument(bson.D{{Key: "foo", Value: 1}})}, + ) + assert.Nil(t, err) + assert.NotNil(t, res) + c.brk = new(dropBreaker) + _, err = c.BulkWrite(context.Background(), []mongo.WriteModel{ + mongo.NewInsertOneModel().SetDocument(bson.D{{Key: "foo", Value: 1}})}, + ) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_CountDocuments(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.FirstBatch, + bson.D{ + {Key: "n", Value: 1}, + })) + res, err := c.CountDocuments(context.Background(), bson.D{}) + assert.Nil(t, err) + assert.Equal(t, int64(1), res) + + c.brk = new(dropBreaker) + _, err = c.CountDocuments(context.Background(), bson.D{{Key: "foo", Value: 1}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestDecoratedCollection_DeleteMany(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + res, err := c.DeleteMany(context.Background(), bson.D{}) + assert.Nil(t, err) + assert.Equal(t, int64(1), res.DeletedCount) + + c.brk = new(dropBreaker) + _, err = c.DeleteMany(context.Background(), bson.D{{Key: "foo", Value: 1}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_Distinct(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(bson.D{{Key: "ok", Value: 1}, {Key: "values", Value: []int{1}}}) + resp, err := c.Distinct(context.Background(), "foo", bson.D{}) + assert.Nil(t, err) + assert.Equal(t, 1, len(resp)) + + c.brk = new(dropBreaker) + _, err = c.Distinct(context.Background(), "foo", bson.D{{Key: "foo", Value: 1}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_EstimatedDocumentCount(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(bson.D{{Key: "ok", Value: 1}, {Key: "n", Value: 1}}) + res, err := c.EstimatedDocumentCount(context.Background()) + assert.Nil(t, err) + assert.Equal(t, int64(1), res) + + c.brk = new(dropBreaker) + _, err = c.EstimatedDocumentCount(context.Background()) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollectionFind(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + find := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.FirstBatch, + bson.D{ + {Key: "name", Value: "John"}, + }) + getMore := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.NextBatch, + bson.D{ + {Key: "name", Value: "Mary"}, + }) + killCursors := mtest.CreateCursorResponse( + 0, + "DBName.CollectionName", + mtest.NextBatch) + mt.AddMockResponses(find, getMore, killCursors) + filter := bson.D{{Key: "x", Value: 1}} + cursor, err := c.Find(context.Background(), filter, mopt.Find()) + assert.Nil(t, err) + defer cursor.Close(context.Background()) + + var val []struct { + ID primitive.ObjectID `bson:"_id"` + Name string `bson:"name"` + } + assert.Nil(t, cursor.All(context.Background(), &val)) + assert.Equal(t, 2, len(val)) + assert.Equal(t, "John", val[0].Name) + assert.Equal(t, "Mary", val[1].Name) + + c.brk = new(dropBreaker) + _, err = c.Find(context.Background(), filter, mopt.Find()) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollectionFindOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + find := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.FirstBatch, + bson.D{ + {Key: "name", Value: "John"}, + }) + getMore := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.NextBatch, + bson.D{ + {Key: "name", Value: "Mary"}, + }) + killCursors := mtest.CreateCursorResponse( + 0, + "DBName.CollectionName", + mtest.NextBatch) + mt.AddMockResponses(find, getMore, killCursors) + filter := bson.D{{Key: "x", Value: 1}} + resp, err := c.FindOne(context.Background(), filter) + assert.Nil(t, err) + var val struct { + ID primitive.ObjectID `bson:"_id"` + Name string `bson:"name"` + } + assert.Nil(t, resp.Decode(&val)) + assert.Equal(t, "John", val.Name) + + c.brk = new(dropBreaker) + _, err = c.FindOne(context.Background(), filter) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_FindOneAndDelete(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + filter := bson.D{} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{}...)) + _, err := c.FindOneAndDelete(context.Background(), filter, mopt.FindOneAndDelete()) + assert.Equal(t, mongo.ErrNoDocuments, err) + + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "name", Value: "John"}}}, + }...)) + resp, err := c.FindOneAndDelete(context.Background(), filter, mopt.FindOneAndDelete()) + assert.Nil(t, err) + var val struct { + Name string `bson:"name"` + } + assert.Nil(t, resp.Decode(&val)) + assert.Equal(t, "John", val.Name) + + c.brk = new(dropBreaker) + _, err = c.FindOneAndDelete(context.Background(), bson.D{{Key: "foo", Value: "bar"}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_FindOneAndReplace(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{}...)) + filter := bson.D{{Key: "x", Value: 1}} + replacement := bson.D{{Key: "x", Value: 2}} + opts := mopt.FindOneAndReplace().SetUpsert(true) + _, err := c.FindOneAndReplace(context.Background(), filter, replacement, opts) + assert.Equal(t, mongo.ErrNoDocuments, err) + mt.AddMockResponses(bson.D{{Key: "ok", Value: 1}, {Key: "value", Value: bson.D{ + {Key: "name", Value: "John"}, + }}}) + resp, err := c.FindOneAndReplace(context.Background(), filter, replacement, opts) + assert.Nil(t, err) + var val struct { + Name string `bson:"name"` + } + assert.Nil(t, resp.Decode(&val)) + assert.Equal(t, "John", val.Name) + + c.brk = new(dropBreaker) + _, err = c.FindOneAndReplace(context.Background(), filter, replacement, opts) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_FindOneAndUpdate(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(bson.D{{Key: "ok", Value: 1}}) + filter := bson.D{{Key: "x", Value: 1}} + update := bson.D{{Key: "$x", Value: 2}} + opts := mopt.FindOneAndUpdate().SetUpsert(true) + _, err := c.FindOneAndUpdate(context.Background(), filter, update, opts) + assert.Equal(t, mongo.ErrNoDocuments, err) + + mt.AddMockResponses(bson.D{{Key: "ok", Value: 1}, {Key: "value", Value: bson.D{ + {Key: "name", Value: "John"}, + }}}) + resp, err := c.FindOneAndUpdate(context.Background(), filter, update, opts) + assert.Nil(t, err) + var val struct { + Name string `bson:"name"` + } + assert.Nil(t, resp.Decode(&val)) + assert.Equal(t, "John", val.Name) + + c.brk = new(dropBreaker) + _, err = c.FindOneAndUpdate(context.Background(), filter, update, opts) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_InsertOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "ok", Value: 1}}...)) + res, err := c.InsertOne(context.Background(), bson.D{{Key: "foo", Value: "bar"}}) + assert.Nil(t, err) + assert.NotNil(t, res) + + c.brk = new(dropBreaker) + _, err = c.InsertOne(context.Background(), bson.D{{Key: "foo", Value: "bar"}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_InsertMany(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "ok", Value: 1}}...)) + res, err := c.InsertMany(context.Background(), []interface{}{ + bson.D{{Key: "foo", Value: "bar"}}, + bson.D{{Key: "foo", Value: "baz"}}, + }) + assert.Nil(t, err) + assert.NotNil(t, res) + assert.Equal(t, 2, len(res.InsertedIDs)) + + c.brk = new(dropBreaker) + _, err = c.InsertMany(context.Background(), []interface{}{bson.D{{Key: "foo", Value: "bar"}}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_Remove(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + res, err := c.DeleteOne(context.Background(), bson.D{{Key: "foo", Value: "bar"}}) + assert.Nil(t, err) + assert.Equal(t, int64(1), res.DeletedCount) + + c.brk = new(dropBreaker) + _, err = c.DeleteOne(context.Background(), bson.D{{Key: "foo", Value: "bar"}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollectionRemoveAll(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + res, err := c.DeleteMany(context.Background(), bson.D{{Key: "foo", Value: "bar"}}) + assert.Nil(t, err) + assert.Equal(t, int64(1), res.DeletedCount) + + c.brk = new(dropBreaker) + _, err = c.DeleteMany(context.Background(), bson.D{{Key: "foo", Value: "bar"}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_ReplaceOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + res, err := c.ReplaceOne(context.Background(), bson.D{{Key: "foo", Value: "bar"}}, + bson.D{{Key: "foo", Value: "baz"}}, + ) + assert.Nil(t, err) + assert.Equal(t, int64(1), res.MatchedCount) + + c.brk = new(dropBreaker) + _, err = c.ReplaceOne(context.Background(), bson.D{{Key: "foo", Value: "bar"}}, + bson.D{{Key: "foo", Value: "baz"}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_UpdateOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + resp, err := c.UpdateOne(context.Background(), bson.D{{Key: "foo", Value: "bar"}}, + bson.D{{Key: "$set", Value: bson.D{{Key: "baz", Value: "qux"}}}}) + assert.Nil(t, err) + assert.Equal(t, int64(1), resp.MatchedCount) + + c.brk = new(dropBreaker) + _, err = c.UpdateOne(context.Background(), bson.D{{Key: "foo", Value: "bar"}}, + bson.D{{Key: "$set", Value: bson.D{{Key: "baz", Value: "qux"}}}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_UpdateByID(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + resp, err := c.UpdateByID(context.Background(), primitive.NewObjectID(), + bson.D{{Key: "$set", Value: bson.D{{Key: "baz", Value: "qux"}}}}) + assert.Nil(t, err) + assert.Equal(t, int64(1), resp.MatchedCount) + + c.brk = new(dropBreaker) + _, err = c.UpdateByID(context.Background(), primitive.NewObjectID(), + bson.D{{Key: "$set", Value: bson.D{{Key: "baz", Value: "qux"}}}}) + assert.Equal(t, errDummy, err) + }) +} + +func TestCollection_UpdateMany(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + c := decoratedCollection{ + Collection: mt.Coll, + brk: breaker.NewBreaker(), + } + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + resp, err := c.UpdateMany(context.Background(), bson.D{{Key: "foo", Value: "bar"}}, + bson.D{{Key: "$set", Value: bson.D{{Key: "baz", Value: "qux"}}}}) + assert.Nil(t, err) + assert.Equal(t, int64(1), resp.MatchedCount) + + c.brk = new(dropBreaker) + _, err = c.UpdateMany(context.Background(), bson.D{{Key: "foo", Value: "bar"}}, + bson.D{{Key: "$set", Value: bson.D{{Key: "baz", Value: "qux"}}}}) + assert.Equal(t, errDummy, err) + }) +} + +type mockPromise struct { + accepted bool + reason string +} + +func (p *mockPromise) Accept() { + p.accepted = true +} + +func (p *mockPromise) Reject(reason string) { + p.reason = reason +} + +type dropBreaker struct{} + +func (d *dropBreaker) Name() string { + return "dummy" +} + +func (d *dropBreaker) Allow() (breaker.Promise, error) { + return nil, errDummy +} + +func (d *dropBreaker) Do(req func() error) error { + return nil +} + +func (d *dropBreaker) DoWithAcceptable(req func() error, acceptable breaker.Acceptable) error { + return errDummy +} + +func (d *dropBreaker) DoWithFallback(req func() error, fallback func(err error) error) error { + return nil +} + +func (d *dropBreaker) DoWithFallbackAcceptable(_ func() error, _ func(err error) error, + _ breaker.Acceptable) error { + return nil +} diff --git a/core/stores/mon/model.go b/core/stores/mon/model.go new file mode 100644 index 00000000..7d07bbcd --- /dev/null +++ b/core/stores/mon/model.go @@ -0,0 +1,154 @@ +package mon + +import ( + "context" + "log" + "strings" + + "github.com/zeromicro/go-zero/core/breaker" + "github.com/zeromicro/go-zero/core/timex" + "go.mongodb.org/mongo-driver/mongo" + mopt "go.mongodb.org/mongo-driver/mongo/options" +) + +// Model is a mongodb store model that represents a collection. +type Model struct { + Collection + name string + cli *mongo.Client + brk breaker.Breaker + opts []Option +} + +// MustNewModel returns a Model, exits on errors. +func MustNewModel(uri, db, collection string, opts ...Option) *Model { + model, err := NewModel(uri, db, collection, opts...) + if err != nil { + log.Fatal(err) + } + + return model +} + +// NewModel returns a Model. +func NewModel(uri, db, collection string, opts ...Option) (*Model, error) { + cli, err := getClient(uri) + if err != nil { + return nil, err + } + + name := strings.Join([]string{uri, collection}, "/") + brk := breaker.GetBreaker(uri) + coll := newCollection(cli.Database(db).Collection(collection), brk) + return newModel(name, cli, coll, brk, opts...), nil +} + +func newModel(name string, cli *mongo.Client, coll Collection, brk breaker.Breaker, + opts ...Option) *Model { + return &Model{ + name: name, + Collection: coll, + cli: cli, + brk: brk, + opts: opts, + } +} + +// StartSession starts a new session. +func (m *Model) StartSession(opts ...*mopt.SessionOptions) (sess mongo.Session, err error) { + err = m.brk.DoWithAcceptable(func() error { + starTime := timex.Now() + defer func() { + logDuration(m.name, "StartSession", starTime, err) + }() + + sess, err = m.cli.StartSession(opts...) + return err + }, acceptable) + return +} + +// Aggregate executes an aggregation pipeline. +func (m *Model) Aggregate(ctx context.Context, v, pipeline interface{}, opts ...*mopt.AggregateOptions) error { + cur, err := m.Collection.Aggregate(ctx, pipeline, opts...) + if err != nil { + return err + } + defer cur.Close(ctx) + + return cur.All(ctx, v) +} + +// DeleteMany deletes documents that match the filter. +func (m *Model) DeleteMany(ctx context.Context, filter interface{}, opts ...*mopt.DeleteOptions) (int64, error) { + res, err := m.Collection.DeleteMany(ctx, filter, opts...) + if err != nil { + return 0, err + } + + return res.DeletedCount, nil +} + +// DeleteOne deletes the first document that matches the filter. +func (m *Model) DeleteOne(ctx context.Context, filter interface{}, opts ...*mopt.DeleteOptions) (int64, error) { + res, err := m.Collection.DeleteOne(ctx, filter, opts...) + if err != nil { + return 0, err + } + + return res.DeletedCount, nil +} + +// Find finds documents that match the filter. +func (m *Model) Find(ctx context.Context, v, filter interface{}, opts ...*mopt.FindOptions) error { + cur, err := m.Collection.Find(ctx, filter, opts...) + if err != nil { + return err + } + defer cur.Close(ctx) + + return cur.All(ctx, v) +} + +// FindOne finds the first document that matches the filter. +func (m *Model) FindOne(ctx context.Context, v, filter interface{}, opts ...*mopt.FindOneOptions) error { + res, err := m.Collection.FindOne(ctx, filter, opts...) + if err != nil { + return err + } + + return res.Decode(v) +} + +// FindOneAndDelete finds a single document and deletes it. +func (m *Model) FindOneAndDelete(ctx context.Context, v, filter interface{}, + opts ...*mopt.FindOneAndDeleteOptions) error { + res, err := m.Collection.FindOneAndDelete(ctx, filter, opts...) + if err != nil { + return err + } + + return res.Decode(v) +} + +// FindOneAndReplace finds a single document and replaces it. +func (m *Model) FindOneAndReplace(ctx context.Context, v, filter interface{}, replacement interface{}, + opts ...*mopt.FindOneAndReplaceOptions) error { + res, err := m.Collection.FindOneAndReplace(ctx, filter, replacement, opts...) + if err != nil { + return err + } + + return res.Decode(v) +} + +// FindOneAndUpdate finds a single document and updates it. +func (m *Model) FindOneAndUpdate(ctx context.Context, v, filter interface{}, update interface{}, + opts ...*mopt.FindOneAndUpdateOptions) error { + res, err := m.Collection.FindOneAndUpdate(ctx, filter, update, opts...) + if err != nil { + return err + } + + return res.Decode(v) +} diff --git a/core/stores/mon/model_test.go b/core/stores/mon/model_test.go new file mode 100644 index 00000000..7adb0a0e --- /dev/null +++ b/core/stores/mon/model_test.go @@ -0,0 +1,233 @@ +package mon + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/integration/mtest" +) + +func TestModel_StartSession(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + sess, err := m.StartSession() + assert.Nil(t, err) + sess.EndSession(context.Background()) + }) +} + +func TestModel_Aggregate(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + find := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.FirstBatch, + bson.D{ + {Key: "name", Value: "John"}, + }) + getMore := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.NextBatch, + bson.D{ + {Key: "name", Value: "Mary"}, + }) + killCursors := mtest.CreateCursorResponse( + 0, + "DBName.CollectionName", + mtest.NextBatch) + mt.AddMockResponses(find, getMore, killCursors) + var result []interface{} + err := m.Aggregate(context.Background(), &result, mongo.Pipeline{}) + assert.Nil(t, err) + assert.Equal(t, 2, len(result)) + assert.Equal(t, "John", result[0].(bson.D).Map()["name"]) + assert.Equal(t, "Mary", result[1].(bson.D).Map()["name"]) + + triggerBreaker(m) + assert.Equal(t, errDummy, m.Aggregate(context.Background(), &result, mongo.Pipeline{})) + }) +} + +func TestModel_DeleteMany(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + val, err := m.DeleteMany(context.Background(), bson.D{}) + assert.Nil(t, err) + assert.Equal(t, int64(1), val) + + triggerBreaker(m) + _, err = m.DeleteMany(context.Background(), bson.D{}) + assert.Equal(t, errDummy, err) + }) +} + +func TestModel_DeleteOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + val, err := m.DeleteOne(context.Background(), bson.D{}) + assert.Nil(t, err) + assert.Equal(t, int64(1), val) + + triggerBreaker(m) + _, err = m.DeleteOne(context.Background(), bson.D{}) + assert.Equal(t, errDummy, err) + }) +} + +func TestModel_Find(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + find := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.FirstBatch, + bson.D{ + {Key: "name", Value: "John"}, + }) + getMore := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.NextBatch, + bson.D{ + {Key: "name", Value: "Mary"}, + }) + killCursors := mtest.CreateCursorResponse( + 0, + "DBName.CollectionName", + mtest.NextBatch) + mt.AddMockResponses(find, getMore, killCursors) + var result []interface{} + err := m.Find(context.Background(), &result, bson.D{}) + assert.Nil(t, err) + assert.Equal(t, 2, len(result)) + assert.Equal(t, "John", result[0].(bson.D).Map()["name"]) + assert.Equal(t, "Mary", result[1].(bson.D).Map()["name"]) + + triggerBreaker(m) + assert.Equal(t, errDummy, m.Find(context.Background(), &result, bson.D{})) + }) +} + +func TestModel_FindOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + find := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.FirstBatch, + bson.D{ + {Key: "name", Value: "John"}, + }) + killCursors := mtest.CreateCursorResponse( + 0, + "DBName.CollectionName", + mtest.NextBatch) + mt.AddMockResponses(find, killCursors) + var result bson.D + err := m.FindOne(context.Background(), &result, bson.D{}) + assert.Nil(t, err) + assert.Equal(t, "John", result.Map()["name"]) + + triggerBreaker(m) + assert.Equal(t, errDummy, m.FindOne(context.Background(), &result, bson.D{})) + }) +} + +func TestModel_FindOneAndDelete(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "name", Value: "John"}}}, + }...)) + var result bson.D + err := m.FindOneAndDelete(context.Background(), &result, bson.D{}) + assert.Nil(t, err) + assert.Equal(t, "John", result.Map()["name"]) + + triggerBreaker(m) + assert.Equal(t, errDummy, m.FindOneAndDelete(context.Background(), &result, bson.D{})) + }) +} + +func TestModel_FindOneAndReplace(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "name", Value: "John"}}}, + }...)) + var result bson.D + err := m.FindOneAndReplace(context.Background(), &result, bson.D{}, bson.D{ + {Key: "name", Value: "Mary"}, + }) + assert.Nil(t, err) + assert.Equal(t, "John", result.Map()["name"]) + + triggerBreaker(m) + assert.Equal(t, errDummy, m.FindOneAndReplace(context.Background(), &result, bson.D{}, bson.D{ + {Key: "name", Value: "Mary"}, + })) + }) +} + +func TestModel_FindOneAndUpdate(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(mt) + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "name", Value: "John"}}}, + }...)) + var result bson.D + err := m.FindOneAndUpdate(context.Background(), &result, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "name", Value: "Mary"}}}, + }) + assert.Nil(t, err) + assert.Equal(t, "John", result.Map()["name"]) + + triggerBreaker(m) + assert.Equal(t, errDummy, m.FindOneAndUpdate(context.Background(), &result, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "name", Value: "Mary"}}}, + })) + }) +} + +func createModel(mt *mtest.T) *Model { + Inject(mt.Name(), mt.Client) + return MustNewModel(mt.Name(), mt.DB.Name(), mt.Coll.Name()) +} + +func triggerBreaker(m *Model) { + m.Collection.(*decoratedCollection).brk = new(dropBreaker) +} diff --git a/core/stores/mon/options.go b/core/stores/mon/options.go new file mode 100644 index 00000000..31f9ecdb --- /dev/null +++ b/core/stores/mon/options.go @@ -0,0 +1,29 @@ +package mon + +import ( + "time" + + "github.com/zeromicro/go-zero/core/syncx" +) + +var slowThreshold = syncx.ForAtomicDuration(defaultSlowThreshold) + +type ( + options struct { + timeout time.Duration + } + + // Option defines the method to customize a mongo model. + Option func(opts *options) +) + +// SetSlowThreshold sets the slow threshold. +func SetSlowThreshold(threshold time.Duration) { + slowThreshold.Set(threshold) +} + +func defaultOptions() *options { + return &options{ + timeout: defaultTimeout, + } +} diff --git a/core/stores/mon/options_test.go b/core/stores/mon/options_test.go new file mode 100644 index 00000000..4cefad2a --- /dev/null +++ b/core/stores/mon/options_test.go @@ -0,0 +1,18 @@ +package mon + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSetSlowThreshold(t *testing.T) { + assert.Equal(t, defaultSlowThreshold, slowThreshold.Load()) + SetSlowThreshold(time.Second) + assert.Equal(t, time.Second, slowThreshold.Load()) +} + +func TestDefaultOptions(t *testing.T) { + assert.Equal(t, defaultTimeout, defaultOptions().timeout) +} diff --git a/core/stores/mon/util.go b/core/stores/mon/util.go new file mode 100644 index 00000000..619f872b --- /dev/null +++ b/core/stores/mon/util.go @@ -0,0 +1,25 @@ +package mon + +import ( + "strings" + "time" + + "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/timex" +) + +const mongoAddrSep = "," + +// FormatAddr formats mongo hosts to a string. +func FormatAddr(hosts []string) string { + return strings.Join(hosts, mongoAddrSep) +} + +func logDuration(name, method string, startTime time.Duration, err error) { + duration := timex.Since(startTime) + if err != nil { + logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s)", name, method, err.Error()) + } else { + logx.WithDuration(duration).Infof("mongo(%s) - %s - ok", name, method) + } +} diff --git a/core/stores/mon/util_test.go b/core/stores/mon/util_test.go new file mode 100644 index 00000000..559ad919 --- /dev/null +++ b/core/stores/mon/util_test.go @@ -0,0 +1,35 @@ +package mon + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFormatAddrs(t *testing.T) { + tests := []struct { + addrs []string + expect string + }{ + { + addrs: []string{"a", "b"}, + expect: "a,b", + }, + { + addrs: []string{"a", "b", "c"}, + expect: "a,b,c", + }, + { + addrs: []string{}, + expect: "", + }, + { + addrs: nil, + expect: "", + }, + } + + for _, test := range tests { + assert.Equal(t, test.expect, FormatAddr(test.addrs)) + } +} diff --git a/core/stores/monc/cachedmodel.go b/core/stores/monc/cachedmodel.go new file mode 100644 index 00000000..93d27694 --- /dev/null +++ b/core/stores/monc/cachedmodel.go @@ -0,0 +1,281 @@ +package monc + +import ( + "context" + "log" + + "github.com/zeromicro/go-zero/core/stores/cache" + "github.com/zeromicro/go-zero/core/stores/mon" + "github.com/zeromicro/go-zero/core/stores/redis" + "github.com/zeromicro/go-zero/core/syncx" + "go.mongodb.org/mongo-driver/mongo" + mopt "go.mongodb.org/mongo-driver/mongo/options" +) + +var ( + // ErrNotFound is an alias of mongo.ErrNoDocuments. + ErrNotFound = mongo.ErrNoDocuments + + // can't use one SingleFlight per conn, because multiple conns may share the same cache key. + singleFlight = syncx.NewSingleFlight() + stats = cache.NewStat("monc") +) + +// A Model is a mongo model that built with cache capability. +type Model struct { + *mon.Model + cache cache.Cache +} + +// MustNewModel returns a Model with a cache cluster, exists on errors. +func MustNewModel(uri, db, collection string, c cache.CacheConf, opts ...cache.Option) *Model { + model, err := NewModel(uri, db, collection, c, opts...) + if err != nil { + log.Fatal(err) + } + + return model +} + +// MustNewNodeModel returns a Model with a cache node, exists on errors. +func MustNewNodeModel(uri, db, collection string, rds *redis.Redis, opts ...cache.Option) *Model { + model, err := NewNodeModel(uri, db, collection, rds, opts...) + if err != nil { + log.Fatal(err) + } + + return model +} + +// NewModel returns a Model with a cache cluster. +func NewModel(uri, db, collection string, conf cache.CacheConf, opts ...cache.Option) (*Model, error) { + c := cache.New(conf, singleFlight, stats, mongo.ErrNoDocuments, opts...) + return NewModelWithCache(uri, db, collection, c) +} + +// NewModelWithCache returns a Model with a custom cache. +func NewModelWithCache(uri, db, collection string, c cache.Cache) (*Model, error) { + return newModel(uri, db, collection, c) +} + +// NewNodeModel returns a Model with a cache node. +func NewNodeModel(uri, db, collection string, rds *redis.Redis, opts ...cache.Option) (*Model, error) { + c := cache.NewNode(rds, singleFlight, stats, mongo.ErrNoDocuments, opts...) + return NewModelWithCache(uri, db, collection, c) +} + +// newModel returns a Model with the given cache. +func newModel(uri, db, collection string, c cache.Cache) (*Model, error) { + model, err := mon.NewModel(uri, db, collection) + if err != nil { + return nil, err + } + + return &Model{ + Model: model, + cache: c, + }, nil +} + +// DelCache deletes the cache with given keys. +func (mm *Model) DelCache(ctx context.Context, keys ...string) error { + return mm.cache.DelCtx(ctx, keys...) +} + +// DeleteOne deletes the document with given filter, and remove it from cache. +func (mm *Model) DeleteOne(ctx context.Context, key string, filter interface{}, + opts ...*mopt.DeleteOptions) (int64, error) { + val, err := mm.Model.DeleteOne(ctx, filter, opts...) + if err != nil { + return 0, err + } + + if err := mm.DelCache(ctx, key); err != nil { + return 0, err + } + + return val, nil +} + +// DeleteOneNoCache deletes the document with given filter. +func (mm *Model) DeleteOneNoCache(ctx context.Context, filter interface{}, + opts ...*mopt.DeleteOptions) (int64, error) { + return mm.Model.DeleteOne(ctx, filter, opts...) +} + +// FindOne unmarshals a record into v with given key and query. +func (mm *Model) FindOne(ctx context.Context, key string, v, filter interface{}, + opts ...*mopt.FindOneOptions) error { + return mm.cache.TakeCtx(ctx, v, key, func(v interface{}) error { + return mm.Model.FindOne(ctx, v, filter, opts...) + }) +} + +// FindOneNoCache unmarshals a record into v with query, without cache. +func (mm *Model) FindOneNoCache(ctx context.Context, v, filter interface{}, + opts ...*mopt.FindOneOptions) error { + return mm.Model.FindOne(ctx, v, filter, opts...) +} + +// FindOneAndDelete deletes the document with given filter, and unmarshals it into v. +func (mm *Model) FindOneAndDelete(ctx context.Context, key string, v, filter interface{}, + opts ...*mopt.FindOneAndDeleteOptions) error { + if err := mm.Model.FindOneAndDelete(ctx, v, filter, opts...); err != nil { + return err + } + + return mm.DelCache(ctx, key) +} + +// FindOneAndDeleteNoCache deletes the document with given filter, and unmarshals it into v. +func (mm *Model) FindOneAndDeleteNoCache(ctx context.Context, v, filter interface{}, + opts ...*mopt.FindOneAndDeleteOptions) error { + return mm.Model.FindOneAndDelete(ctx, v, filter, opts...) +} + +// FindOneAndReplace replaces the document with given filter with replacement, and unmarshals it into v. +func (mm *Model) FindOneAndReplace(ctx context.Context, key string, v, filter interface{}, + replacement interface{}, opts ...*mopt.FindOneAndReplaceOptions) error { + if err := mm.Model.FindOneAndReplace(ctx, v, filter, replacement, opts...); err != nil { + return err + } + + return mm.DelCache(ctx, key) +} + +// FindOneAndReplaceNoCache replaces the document with given filter with replacement, and unmarshals it into v. +func (mm *Model) FindOneAndReplaceNoCache(ctx context.Context, v, filter interface{}, + replacement interface{}, opts ...*mopt.FindOneAndReplaceOptions) error { + return mm.Model.FindOneAndReplace(ctx, v, filter, replacement, opts...) +} + +// FindOneAndUpdate updates the document with given filter with update, and unmarshals it into v. +func (mm *Model) FindOneAndUpdate(ctx context.Context, key string, v, filter interface{}, + update interface{}, opts ...*mopt.FindOneAndUpdateOptions) error { + if err := mm.Model.FindOneAndUpdate(ctx, v, filter, update, opts...); err != nil { + return err + } + + return mm.DelCache(ctx, key) +} + +// FindOneAndUpdateNoCache updates the document with given filter with update, and unmarshals it into v. +func (mm *Model) FindOneAndUpdateNoCache(ctx context.Context, v, filter interface{}, + update interface{}, opts ...*mopt.FindOneAndUpdateOptions) error { + return mm.Model.FindOneAndUpdate(ctx, v, filter, update, opts...) +} + +// GetCache unmarshal the cache into v with given key. +func (mm *Model) GetCache(key string, v interface{}) error { + return mm.cache.Get(key, v) +} + +// InsertOne inserts a single document into the collection, and remove the cache placeholder. +func (mm *Model) InsertOne(ctx context.Context, key string, document interface{}, + opts ...*mopt.InsertOneOptions) (*mongo.InsertOneResult, error) { + res, err := mm.Model.InsertOne(ctx, document, opts...) + if err != nil { + return nil, err + } + + if err = mm.DelCache(ctx, key); err != nil { + return nil, err + } + + return res, nil +} + +// InsertOneNoCache inserts a single document into the collection. +func (mm *Model) InsertOneNoCache(ctx context.Context, document interface{}, + opts ...*mopt.InsertOneOptions) (*mongo.InsertOneResult, error) { + return mm.Model.InsertOne(ctx, document, opts...) +} + +// ReplaceOne replaces a single document in the collection, and remove the cache. +func (mm *Model) ReplaceOne(ctx context.Context, key string, filter interface{}, replacement interface{}, + opts ...*mopt.ReplaceOptions) (*mongo.UpdateResult, error) { + res, err := mm.Model.ReplaceOne(ctx, filter, replacement, opts...) + if err != nil { + return nil, err + } + + if err = mm.DelCache(ctx, key); err != nil { + return nil, err + } + + return res, nil +} + +// ReplaceOneNoCache replaces a single document in the collection. +func (mm *Model) ReplaceOneNoCache(ctx context.Context, filter interface{}, replacement interface{}, + opts ...*mopt.ReplaceOptions) (*mongo.UpdateResult, error) { + return mm.Model.ReplaceOne(ctx, filter, replacement, opts...) +} + +// SetCache sets the cache with given key and value. +func (mm *Model) SetCache(key string, v interface{}) error { + return mm.cache.Set(key, v) +} + +// UpdateByID updates the document with given id with update, and remove the cache. +func (mm *Model) UpdateByID(ctx context.Context, key string, id interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) { + res, err := mm.Model.UpdateByID(ctx, id, update, opts...) + if err != nil { + return nil, err + } + + if err = mm.DelCache(ctx, key); err != nil { + return nil, err + } + + return res, nil +} + +// UpdateByIDNoCache updates the document with given id with update. +func (mm *Model) UpdateByIDNoCache(ctx context.Context, id interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) { + return mm.Model.UpdateByID(ctx, id, update, opts...) +} + +// UpdateMany updates the documents that match filter with update, and remove the cache. +func (mm *Model) UpdateMany(ctx context.Context, keys []string, filter interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) { + res, err := mm.Model.UpdateMany(ctx, filter, update, opts...) + if err != nil { + return nil, err + } + + if err = mm.DelCache(ctx, keys...); err != nil { + return nil, err + } + + return res, nil +} + +// UpdateManyNoCache updates the documents that match filter with update. +func (mm *Model) UpdateManyNoCache(ctx context.Context, filter interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) { + return mm.Model.UpdateMany(ctx, filter, update, opts...) +} + +// UpdateOne updates the first document that matches filter with update, and remove the cache. +func (mm *Model) UpdateOne(ctx context.Context, key string, filter interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) { + res, err := mm.Model.UpdateOne(ctx, filter, update, opts...) + if err != nil { + return nil, err + } + + if err = mm.DelCache(ctx, key); err != nil { + return nil, err + } + + return res, nil +} + +// UpdateOneNoCache updates the first document that matches filter with update. +func (mm *Model) UpdateOneNoCache(ctx context.Context, filter interface{}, update interface{}, + opts ...*mopt.UpdateOptions) (*mongo.UpdateResult, error) { + return mm.Model.UpdateOne(ctx, filter, update, opts...) +} diff --git a/core/stores/monc/cachedmodel_test.go b/core/stores/monc/cachedmodel_test.go new file mode 100644 index 00000000..0abc68e0 --- /dev/null +++ b/core/stores/monc/cachedmodel_test.go @@ -0,0 +1,581 @@ +package monc + +import ( + "context" + "errors" + "sync/atomic" + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/stretchr/testify/assert" + "github.com/zeromicro/go-zero/core/stores/cache" + "github.com/zeromicro/go-zero/core/stores/mon" + "github.com/zeromicro/go-zero/core/stores/redis" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/integration/mtest" +) + +func TestNewModel(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + _, err := newModel("foo", mt.DB.Name(), mt.Coll.Name(), nil) + assert.NotNil(mt, err) + }) +} + +func TestModel_DelCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + assert.Nil(t, m.cache.Set("bar", "baz")) + assert.Nil(t, m.DelCache(context.Background(), "foo", "bar")) + var v string + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + assert.True(t, m.cache.IsNotFound(m.cache.Get("bar", &v))) + }) +} + +func TestModel_DeleteOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + val, err := m.DeleteOne(context.Background(), "foo", bson.D{{Key: "foo", Value: "bar"}}) + assert.Nil(t, err) + assert.Equal(t, int64(1), val) + var v string + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + _, err = m.DeleteOne(context.Background(), "foo", bson.D{{Key: "foo", Value: "bar"}}) + assert.NotNil(t, err) + + m.cache = mockedCache{m.cache} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + _, err = m.DeleteOne(context.Background(), "foo", bson.D{{Key: "foo", Value: "bar"}}) + assert.Equal(t, errMocked, err) + }) +} + +func TestModel_DeleteOneNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "n", Value: 1}}...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + val, err := m.DeleteOneNoCache(context.Background(), bson.D{{Key: "foo", Value: "bar"}}) + assert.Nil(t, err) + assert.Equal(t, int64(1), val) + var v string + assert.Nil(t, m.cache.Get("foo", &v)) + }) +} + +func TestModel_FindOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + resp := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.FirstBatch, + bson.D{ + {Key: "foo", Value: "bar"}, + }) + mt.AddMockResponses(resp) + m := createModel(t, mt) + var v struct { + Foo string `bson:"foo"` + } + assert.Nil(t, m.FindOne(context.Background(), "foo", &v, bson.D{})) + assert.Equal(t, "bar", v.Foo) + assert.Nil(t, m.cache.Set("foo", "bar")) + }) +} + +func TestModel_FindOneNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + resp := mtest.CreateCursorResponse( + 1, + "DBName.CollectionName", + mtest.FirstBatch, + bson.D{ + {Key: "foo", Value: "bar"}, + }) + mt.AddMockResponses(resp) + m := createModel(t, mt) + v := struct { + Foo string `bson:"foo"` + }{} + assert.Nil(t, m.FindOneNoCache(context.Background(), &v, bson.D{})) + assert.Equal(t, "bar", v.Foo) + }) +} + +func TestModel_FindOneAndDelete(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + v := struct { + Foo string `bson:"foo"` + }{} + assert.Nil(t, m.FindOneAndDelete(context.Background(), "foo", &v, bson.D{})) + assert.Equal(t, "bar", v.Foo) + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + assert.NotNil(t, m.FindOneAndDelete(context.Background(), "foo", &v, bson.D{})) + + m.cache = mockedCache{m.cache} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + assert.Equal(t, errMocked, m.FindOneAndDelete(context.Background(), "foo", &v, bson.D{})) + }) +} + +func TestModel_FindOneAndDeleteNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + v := struct { + Foo string `bson:"foo"` + }{} + assert.Nil(t, m.FindOneAndDeleteNoCache(context.Background(), &v, bson.D{})) + assert.Equal(t, "bar", v.Foo) + }) +} + +func TestModel_FindOneAndReplace(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + v := struct { + Foo string `bson:"foo"` + }{} + assert.Nil(t, m.FindOneAndReplace(context.Background(), "foo", &v, bson.D{}, bson.D{ + {Key: "name", Value: "Mary"}, + })) + assert.Equal(t, "bar", v.Foo) + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + assert.NotNil(t, m.FindOneAndReplace(context.Background(), "foo", &v, bson.D{}, bson.D{ + {Key: "name", Value: "Mary"}, + })) + + m.cache = mockedCache{m.cache} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + assert.Equal(t, errMocked, m.FindOneAndReplace(context.Background(), "foo", &v, bson.D{}, bson.D{ + {Key: "name", Value: "Mary"}, + })) + }) +} + +func TestModel_FindOneAndReplaceNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + v := struct { + Foo string `bson:"foo"` + }{} + assert.Nil(t, m.FindOneAndReplaceNoCache(context.Background(), &v, bson.D{}, bson.D{ + {Key: "name", Value: "Mary"}, + })) + assert.Equal(t, "bar", v.Foo) + }) +} + +func TestModel_FindOneAndUpdate(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + v := struct { + Foo string `bson:"foo"` + }{} + assert.Nil(t, m.FindOneAndUpdate(context.Background(), "foo", &v, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "name", Value: "Mary"}}}, + })) + assert.Equal(t, "bar", v.Foo) + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + assert.NotNil(t, m.FindOneAndUpdate(context.Background(), "foo", &v, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "name", Value: "Mary"}}}, + })) + + m.cache = mockedCache{m.cache} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + assert.Equal(t, errMocked, m.FindOneAndUpdate(context.Background(), "foo", &v, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "name", Value: "Mary"}}}, + })) + }) +} + +func TestModel_FindOneAndUpdateNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + v := struct { + Foo string `bson:"foo"` + }{} + assert.Nil(t, m.FindOneAndUpdateNoCache(context.Background(), &v, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "name", Value: "Mary"}}}, + })) + assert.Equal(t, "bar", v.Foo) + }) +} + +func TestModel_GetCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(t, mt) + assert.NotNil(t, m.cache) + assert.Nil(t, m.cache.Set("foo", "bar")) + var s string + assert.Nil(t, m.cache.Get("foo", &s)) + assert.Equal(t, "bar", s) + }) +} + +func TestModel_InsertOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + resp, err := m.InsertOne(context.Background(), "foo", bson.D{ + {Key: "name", Value: "Mary"}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + var v string + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + _, err = m.InsertOne(context.Background(), "foo", bson.D{ + {Key: "name", Value: "Mary"}, + }) + assert.NotNil(t, err) + + m.cache = mockedCache{m.cache} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + _, err = m.InsertOne(context.Background(), "foo", bson.D{ + {Key: "name", Value: "Mary"}, + }) + assert.Equal(t, errMocked, err) + }) +} + +func TestModel_InsertOneNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + resp, err := m.InsertOneNoCache(context.Background(), bson.D{ + {Key: "name", Value: "Mary"}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + }) +} + +func TestModel_ReplaceOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + resp, err := m.ReplaceOne(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "foo", Value: "baz"}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + var v string + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + _, err = m.ReplaceOne(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "foo", Value: "baz"}, + }) + assert.NotNil(t, err) + + m.cache = mockedCache{m.cache} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + _, err = m.ReplaceOne(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "foo", Value: "baz"}, + }) + assert.Equal(t, errMocked, err) + }) +} + +func TestModel_ReplaceOneNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + resp, err := m.ReplaceOneNoCache(context.Background(), bson.D{}, bson.D{ + {Key: "foo", Value: "baz"}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + }) +} + +func TestModel_SetCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + m := createModel(t, mt) + assert.Nil(t, m.SetCache("foo", "bar")) + var v string + assert.Nil(t, m.GetCache("foo", &v)) + assert.Equal(t, "bar", v) + }) +} + +func TestModel_UpdateByID(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + resp, err := m.UpdateByID(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + var v string + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + _, err = m.UpdateByID(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.NotNil(t, err) + + m.cache = mockedCache{m.cache} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + _, err = m.UpdateByID(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Equal(t, errMocked, err) + }) +} + +func TestModel_UpdateByIDNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + resp, err := m.UpdateByIDNoCache(context.Background(), bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + }) +} + +func TestModel_UpdateMany(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + assert.Nil(t, m.cache.Set("bar", "baz")) + resp, err := m.UpdateMany(context.Background(), []string{"foo", "bar"}, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + var v string + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + assert.True(t, m.cache.IsNotFound(m.cache.Get("bar", &v))) + _, err = m.UpdateMany(context.Background(), []string{"foo", "bar"}, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.NotNil(t, err) + + m.cache = mockedCache{m.cache} + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + _, err = m.UpdateMany(context.Background(), []string{"foo", "bar"}, bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Equal(t, errMocked, err) + }) +} + +func TestModel_UpdateManyNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + resp, err := m.UpdateManyNoCache(context.Background(), bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + }) +} + +func TestModel_UpdateOne(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + assert.Nil(t, m.cache.Set("foo", "bar")) + resp, err := m.UpdateOne(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + var v string + assert.True(t, m.cache.IsNotFound(m.cache.Get("foo", &v))) + _, err = m.UpdateOne(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.NotNil(t, err) + + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m.cache = mockedCache{m.cache} + _, err = m.UpdateOne(context.Background(), "foo", bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Equal(t, errMocked, err) + }) +} + +func TestModel_UpdateOneNoCache(t *testing.T) { + mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer mt.Close() + + mt.Run("test", func(mt *mtest.T) { + mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{ + {Key: "value", Value: bson.D{{Key: "foo", Value: "bar"}}}, + }...)) + m := createModel(t, mt) + resp, err := m.UpdateOneNoCache(context.Background(), bson.D{}, bson.D{ + {Key: "$set", Value: bson.D{{Key: "foo", Value: "baz"}}}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + }) +} + +func createModel(t *testing.T, mt *mtest.T) *Model { + s, err := miniredis.Run() + assert.Nil(t, err) + mon.Inject(mt.Name(), mt.Client) + if atomic.AddInt32(&index, 1)%2 == 0 { + return MustNewNodeModel(mt.Name(), mt.DB.Name(), mt.Coll.Name(), redis.New(s.Addr())) + } else { + return MustNewModel(mt.Name(), mt.DB.Name(), mt.Coll.Name(), cache.CacheConf{ + cache.NodeConf{ + RedisConf: redis.RedisConf{ + Host: s.Addr(), + Type: redis.NodeType, + }, + Weight: 100, + }, + }) + } +} + +var ( + errMocked = errors.New("mocked error") + index int32 +) + +type mockedCache struct { + cache.Cache +} + +func (m mockedCache) DelCtx(_ context.Context, _ ...string) error { + return errMocked +} diff --git a/core/stores/mongo/utils_test.go b/core/stores/mongo/util_test.go similarity index 100% rename from core/stores/mongo/utils_test.go rename to core/stores/mongo/util_test.go diff --git a/core/syncx/resourcemanager.go b/core/syncx/resourcemanager.go index 84e4220c..e863ba0a 100644 --- a/core/syncx/resourcemanager.go +++ b/core/syncx/resourcemanager.go @@ -68,3 +68,10 @@ func (manager *ResourceManager) GetResource(key string, create func() (io.Closer return val.(io.Closer), nil } + +// Inject injects the resource associated with given key. +func (manager *ResourceManager) Inject(key string, resource io.Closer) { + manager.lock.Lock() + manager.resources[key] = resource + manager.lock.Unlock() +} diff --git a/core/syncx/resourcemanager_test.go b/core/syncx/resourcemanager_test.go index 261bdb8e..a62b7892 100644 --- a/core/syncx/resourcemanager_test.go +++ b/core/syncx/resourcemanager_test.go @@ -47,6 +47,8 @@ func TestResourceManager_GetResourceError(t *testing.T) { func TestResourceManager_Close(t *testing.T) { manager := NewResourceManager() + defer manager.Close() + for i := 0; i < 10; i++ { _, err := manager.GetResource("key", func() (io.Closer, error) { return nil, errors.New("fail") @@ -61,6 +63,8 @@ func TestResourceManager_Close(t *testing.T) { func TestResourceManager_UseAfterClose(t *testing.T) { manager := NewResourceManager() + defer manager.Close() + _, err := manager.GetResource("key", func() (io.Closer, error) { return nil, errors.New("fail") }) @@ -72,3 +76,18 @@ func TestResourceManager_UseAfterClose(t *testing.T) { assert.NotNil(t, err) } } + +func TestResourceManager_Inject(t *testing.T) { + manager := NewResourceManager() + defer manager.Close() + + manager.Inject("key", &dummyResource{ + age: 10, + }) + + val, err := manager.GetResource("key", func() (io.Closer, error) { + return nil, nil + }) + assert.Nil(t, err) + assert.Equal(t, 10, val.(*dummyResource).age) +} diff --git a/go.mod b/go.mod index fdb4c4b2..5c7a743e 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/alicebob/miniredis/v2 v2.17.0 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 + github.com/go-redis/redis/v8 v8.11.4 github.com/go-sql-driver/mysql v1.6.0 github.com/golang-jwt/jwt/v4 v4.2.0 github.com/golang/mock v1.6.0 @@ -19,6 +20,7 @@ require ( github.com/stretchr/testify v1.7.0 go.etcd.io/etcd/api/v3 v3.5.2 go.etcd.io/etcd/client/v3 v3.5.2 + go.mongodb.org/mongo-driver v1.9.0 go.opentelemetry.io/otel v1.3.0 go.opentelemetry.io/otel/exporters/jaeger v1.3.0 go.opentelemetry.io/otel/exporters/zipkin v1.3.0 @@ -41,7 +43,6 @@ require ( require ( github.com/fatih/color v1.10.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-redis/redis/v8 v8.11.4 github.com/google/gofuzz v1.2.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect diff --git a/go.sum b/go.sum index 720f6faf..8ed0b604 100644 --- a/go.sum +++ b/go.sum @@ -157,6 +157,7 @@ github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Px github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -196,6 +197,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -269,6 +272,7 @@ github.com/justinas/alice v1.2.0 h1:+MHSA/vccVCF4Uq37S42jwlkvI2Xzl7zTPCN5BnZNVo= github.com/justinas/alice v1.2.0/go.mod h1:fN5HRH/reO/zrUflLfTN43t3vXvKzvZIENsNEe7i7qA= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -306,6 +310,7 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -393,10 +398,17 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.0.2 h1:akYIkZ28e6A96dkWNJQu3nmCzH3YfwMPQExUYDaRv7w= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= +github.com/xdg-go/stringprep v1.0.2 h1:6iq84/ryjjeRmMJwxutI51F2GIPlP5BfTvXHeYjyhBc= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -410,6 +422,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.2 h1:4hzqQ6hIb3blLyQ8usCU4h3NghkqcsohEQ3o3Vet go.etcd.io/etcd/client/pkg/v3 v3.5.2/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v3 v3.5.2 h1:WdnejrUtQC4nCxK0/dLTMqKOB+U5TP/2Ya0BJL+1otA= go.etcd.io/etcd/client/v3 v3.5.2/go.mod h1:kOOaWFFgHygyT0WlSmL8TJiXmMysO/nNUlEsSsN6W4o= +go.mongodb.org/mongo-driver v1.9.0 h1:f3aLGJvQmBl8d9S40IL+jEyBC6hfLPbJjv9t5hEM9ck= +go.mongodb.org/mongo-driver v1.9.0/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCuAROlKEPY= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -449,6 +463,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 h1:kETrAMYZq6WVGPa8IIixL0CaEcIUNi+1WX7grUoi3y8= golang.org/x/crypto v0.0.0-20210920023735-84f357641f63/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -543,6 +558,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -598,6 +614,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 h1:nhht2DYV/Sn3qOayu8lM+cU1ii9sTLUeBQwQQfUHtrs= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -625,6 +642,7 @@ golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= diff --git a/rest/handler/timeouthandler_test.go b/rest/handler/timeouthandler_test.go index d4d2ad20..f084a681 100644 --- a/rest/handler/timeouthandler_test.go +++ b/rest/handler/timeouthandler_test.go @@ -122,7 +122,7 @@ func TestTimeoutWriteBadCode(t *testing.T) { func TestTimeoutClientClosed(t *testing.T) { timeoutHandler := TimeoutHandler(time.Minute) handler := timeoutHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(1000) + w.WriteHeader(http.StatusServiceUnavailable) })) req := httptest.NewRequest(http.MethodGet, "http://localhost", nil)