chore: remove mgo related packages (#2799)

This commit is contained in:
Kevin Wan 2023-01-16 23:13:59 +08:00 committed by GitHub
parent 6d129e0264
commit 2bfecf9354
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1 additions and 3299 deletions

View File

@ -1,44 +0,0 @@
package jsontype
import (
"encoding/json"
"time"
"github.com/globalsign/mgo/bson"
)
// MilliTime represents time.Time that works better with mongodb.
type MilliTime struct {
time.Time
}
// MarshalJSON marshals mt to json bytes.
func (mt MilliTime) MarshalJSON() ([]byte, error) {
return json.Marshal(mt.Milli())
}
// UnmarshalJSON unmarshals data into mt.
func (mt *MilliTime) UnmarshalJSON(data []byte) error {
var milli int64
if err := json.Unmarshal(data, &milli); err != nil {
return err
}
mt.Time = time.Unix(0, milli*int64(time.Millisecond))
return nil
}
// GetBSON returns BSON base on mt.
func (mt MilliTime) GetBSON() (interface{}, error) {
return mt.Time, nil
}
// SetBSON sets raw into mt.
func (mt *MilliTime) SetBSON(raw bson.Raw) error {
return raw.Unmarshal(&mt.Time)
}
// Milli returns milliseconds for mt.
func (mt MilliTime) Milli() int64 {
return mt.UnixNano() / int64(time.Millisecond)
}

View File

@ -1,126 +0,0 @@
package jsontype
import (
"strconv"
"testing"
"time"
"github.com/globalsign/mgo/bson"
"github.com/stretchr/testify/assert"
)
func TestMilliTime_GetBSON(t *testing.T) {
tests := []struct {
name string
tm time.Time
}{
{
name: "now",
tm: time.Now(),
},
{
name: "future",
tm: time.Now().Add(time.Hour),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := MilliTime{test.tm}.GetBSON()
assert.Nil(t, err)
assert.Equal(t, test.tm, got)
})
}
}
func TestMilliTime_MarshalJSON(t *testing.T) {
tests := []struct {
name string
tm time.Time
}{
{
name: "now",
tm: time.Now(),
},
{
name: "future",
tm: time.Now().Add(time.Hour),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
b, err := MilliTime{test.tm}.MarshalJSON()
assert.Nil(t, err)
assert.Equal(t, strconv.FormatInt(test.tm.UnixNano()/1e6, 10), string(b))
})
}
}
func TestMilliTime_Milli(t *testing.T) {
tests := []struct {
name string
tm time.Time
}{
{
name: "now",
tm: time.Now(),
},
{
name: "future",
tm: time.Now().Add(time.Hour),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
n := MilliTime{test.tm}.Milli()
assert.Equal(t, test.tm.UnixNano()/1e6, n)
})
}
}
func TestMilliTime_UnmarshalJSON(t *testing.T) {
tests := []struct {
name string
tm time.Time
}{
{
name: "now",
tm: time.Now(),
},
{
name: "future",
tm: time.Now().Add(time.Hour),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var mt MilliTime
s := strconv.FormatInt(test.tm.UnixNano()/1e6, 10)
err := mt.UnmarshalJSON([]byte(s))
assert.Nil(t, err)
s1, err := mt.MarshalJSON()
assert.Nil(t, err)
assert.Equal(t, s, string(s1))
})
}
}
func TestUnmarshalWithError(t *testing.T) {
var mt MilliTime
assert.NotNil(t, mt.UnmarshalJSON([]byte("hello")))
}
func TestSetBSON(t *testing.T) {
data, err := bson.Marshal(time.Now())
assert.Nil(t, err)
var raw bson.Raw
assert.Nil(t, bson.Unmarshal(data, &raw))
var mt MilliTime
assert.Nil(t, mt.SetBSON(raw))
assert.NotNil(t, mt.SetBSON(bson.Raw{}))
}

View File

@ -1,92 +0,0 @@
package mongo
import (
"time"
"github.com/globalsign/mgo"
"github.com/zeromicro/go-zero/core/executors"
"github.com/zeromicro/go-zero/core/logx"
)
const (
flushInterval = time.Second
maxBulkRows = 1000
)
type (
// ResultHandler is a handler that used to handle results.
ResultHandler func(*mgo.BulkResult, error)
// A BulkInserter is used to insert bulk of mongo records.
BulkInserter struct {
executor *executors.PeriodicalExecutor
inserter *dbInserter
}
)
// NewBulkInserter returns a BulkInserter.
func NewBulkInserter(session *mgo.Session, dbName string, collectionNamer func() string) *BulkInserter {
inserter := &dbInserter{
session: session,
dbName: dbName,
collectionNamer: collectionNamer,
}
return &BulkInserter{
executor: executors.NewPeriodicalExecutor(flushInterval, inserter),
inserter: inserter,
}
}
// Flush flushes the inserter, writes all pending records.
func (bi *BulkInserter) Flush() {
bi.executor.Flush()
}
// Insert inserts doc.
func (bi *BulkInserter) Insert(doc interface{}) {
bi.executor.Add(doc)
}
// SetResultHandler sets the result handler.
func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
bi.executor.Sync(func() {
bi.inserter.resultHandler = handler
})
}
type dbInserter struct {
session *mgo.Session
dbName string
collectionNamer func() string
documents []interface{}
resultHandler ResultHandler
}
func (in *dbInserter) AddTask(doc interface{}) bool {
in.documents = append(in.documents, doc)
return len(in.documents) >= maxBulkRows
}
func (in *dbInserter) Execute(objs interface{}) {
docs := objs.([]interface{})
if len(docs) == 0 {
return
}
bulk := in.session.DB(in.dbName).C(in.collectionNamer()).Bulk()
bulk.Insert(docs...)
bulk.Unordered()
result, err := bulk.Run()
if in.resultHandler != nil {
in.resultHandler(result, err)
} else if err != nil {
logx.Error(err)
}
}
func (in *dbInserter) RemoveAll() interface{} {
documents := in.documents
in.documents = nil
return documents
}

View File

@ -1,242 +0,0 @@
package mongo
import (
"encoding/json"
"time"
"github.com/globalsign/mgo"
"github.com/zeromicro/go-zero/core/breaker"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/mongo/internal"
"github.com/zeromicro/go-zero/core/timex"
)
const defaultSlowThreshold = time.Millisecond * 500
// ErrNotFound is an alias of mgo.ErrNotFound.
var ErrNotFound = mgo.ErrNotFound
type (
// Collection interface represents a mongo connection.
Collection interface {
Find(query interface{}) Query
FindId(id interface{}) Query
Insert(docs ...interface{}) error
Pipe(pipeline interface{}) Pipe
Remove(selector interface{}) error
RemoveAll(selector interface{}) (*mgo.ChangeInfo, error)
RemoveId(id interface{}) error
Update(selector, update interface{}) error
UpdateId(id, update interface{}) error
Upsert(selector, update interface{}) (*mgo.ChangeInfo, error)
}
decoratedCollection struct {
name string
collection internal.MgoCollection
brk breaker.Breaker
}
keepablePromise struct {
promise breaker.Promise
log func(error)
}
)
func newCollection(collection *mgo.Collection, brk breaker.Breaker) Collection {
return &decoratedCollection{
name: collection.FullName,
collection: collection,
brk: brk,
}
}
func (c *decoratedCollection) Find(query interface{}) Query {
promise, err := c.brk.Allow()
if err != nil {
return rejectedQuery{}
}
startTime := timex.Now()
return promisedQuery{
Query: c.collection.Find(query),
promise: keepablePromise{
promise: promise,
log: func(err error) {
duration := timex.Since(startTime)
c.logDuration("find", duration, err, query)
},
},
}
}
func (c *decoratedCollection) FindId(id interface{}) Query {
promise, err := c.brk.Allow()
if err != nil {
return rejectedQuery{}
}
startTime := timex.Now()
return promisedQuery{
Query: c.collection.FindId(id),
promise: keepablePromise{
promise: promise,
log: func(err error) {
duration := timex.Since(startTime)
c.logDuration("findId", duration, err, id)
},
},
}
}
func (c *decoratedCollection) Insert(docs ...interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("insert", duration, err, docs...)
}()
return c.collection.Insert(docs...)
}, acceptable)
}
func (c *decoratedCollection) Pipe(pipeline interface{}) Pipe {
promise, err := c.brk.Allow()
if err != nil {
return rejectedPipe{}
}
startTime := timex.Now()
return promisedPipe{
Pipe: c.collection.Pipe(pipeline),
promise: keepablePromise{
promise: promise,
log: func(err error) {
duration := timex.Since(startTime)
c.logDuration("pipe", duration, err, pipeline)
},
},
}
}
func (c *decoratedCollection) Remove(selector interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("remove", duration, err, selector)
}()
return c.collection.Remove(selector)
}, acceptable)
}
func (c *decoratedCollection) RemoveAll(selector interface{}) (info *mgo.ChangeInfo, err error) {
err = c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("removeAll", duration, err, selector)
}()
info, err = c.collection.RemoveAll(selector)
return err
}, acceptable)
return
}
func (c *decoratedCollection) RemoveId(id interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("removeId", duration, err, id)
}()
return c.collection.RemoveId(id)
}, acceptable)
}
func (c *decoratedCollection) Update(selector, update interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("update", duration, err, selector, update)
}()
return c.collection.Update(selector, update)
}, acceptable)
}
func (c *decoratedCollection) UpdateId(id, update interface{}) (err error) {
return c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("updateId", duration, err, id, update)
}()
return c.collection.UpdateId(id, update)
}, acceptable)
}
func (c *decoratedCollection) Upsert(selector, update interface{}) (info *mgo.ChangeInfo, err error) {
err = c.brk.DoWithAcceptable(func() error {
startTime := timex.Now()
defer func() {
duration := timex.Since(startTime)
c.logDuration("upsert", duration, err, selector, update)
}()
info, err = c.collection.Upsert(selector, update)
return err
}, acceptable)
return
}
func (c *decoratedCollection) logDuration(method string, duration time.Duration, err error, docs ...interface{}) {
content, e := json.Marshal(docs)
if e != nil {
logx.Error(err)
} else if err != nil {
if duration > slowThreshold.Load() {
logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - fail(%s) - %s",
c.name, method, err.Error(), string(content))
} else {
logx.WithDuration(duration).Infof("mongo(%s) - %s - fail(%s) - %s",
c.name, method, err.Error(), string(content))
}
} else {
if duration > slowThreshold.Load() {
logx.WithDuration(duration).Slowf("[MONGO] mongo(%s) - slowcall - %s - ok - %s",
c.name, method, string(content))
} else {
logx.WithDuration(duration).Infof("mongo(%s) - %s - ok - %s", c.name, method, string(content))
}
}
}
func (p keepablePromise) accept(err error) error {
p.promise.Accept()
p.log(err)
return err
}
func (p keepablePromise) keep(err error) error {
if acceptable(err) {
p.promise.Accept()
} else {
p.promise.Reject(err.Error())
}
p.log(err)
return err
}
func acceptable(err error) bool {
return err == nil || err == mgo.ErrNotFound
}

View File

@ -1,345 +0,0 @@
package mongo
import (
"errors"
"strings"
"testing"
"time"
"github.com/globalsign/mgo"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/breaker"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/mongo/internal"
"github.com/zeromicro/go-zero/core/stringx"
)
var errDummy = errors.New("dummy")
func TestKeepPromise_accept(t *testing.T) {
p := new(mockPromise)
kp := keepablePromise{
promise: p,
log: func(error) {},
}
assert.Nil(t, kp.accept(nil))
assert.Equal(t, mgo.ErrNotFound, kp.accept(mgo.ErrNotFound))
}
func TestKeepPromise_keep(t *testing.T) {
tests := []struct {
err error
accepted bool
reason string
}{
{
err: nil,
accepted: true,
reason: "",
},
{
err: mgo.ErrNotFound,
accepted: true,
reason: "",
},
{
err: errors.New("any"),
accepted: false,
reason: "any",
},
}
for _, test := range tests {
t.Run(stringx.RandId(), func(t *testing.T) {
p := new(mockPromise)
kp := keepablePromise{
promise: p,
log: func(error) {},
}
assert.Equal(t, test.err, kp.keep(test.err))
assert.Equal(t, test.accepted, p.accepted)
assert.Equal(t, test.reason, p.reason)
})
}
}
func TestNewCollection(t *testing.T) {
col := newCollection(&mgo.Collection{
Database: nil,
Name: "foo",
FullName: "bar",
}, breaker.GetBreaker("localhost"))
assert.Equal(t, "bar", col.(*decoratedCollection).name)
}
func TestCollectionFind(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
var query mgo.Query
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().Find(gomock.Any()).Return(&query)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
actual := c.Find(nil)
switch v := actual.(type) {
case promisedQuery:
assert.Equal(t, &query, v.Query)
assert.Equal(t, errDummy, v.promise.keep(errDummy))
default:
t.Fail()
}
c.brk = new(dropBreaker)
actual = c.Find(nil)
assert.Equal(t, rejectedQuery{}, actual)
}
func TestCollectionFindId(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
var query mgo.Query
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().FindId(gomock.Any()).Return(&query)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
actual := c.FindId(nil)
switch v := actual.(type) {
case promisedQuery:
assert.Equal(t, &query, v.Query)
assert.Equal(t, errDummy, v.promise.keep(errDummy))
default:
t.Fail()
}
c.brk = new(dropBreaker)
actual = c.FindId(nil)
assert.Equal(t, rejectedQuery{}, actual)
}
func TestCollectionInsert(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().Insert(nil, nil).Return(errDummy)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
err := c.Insert(nil, nil)
assert.Equal(t, errDummy, err)
c.brk = new(dropBreaker)
err = c.Insert(nil, nil)
assert.Equal(t, errDummy, err)
}
func TestCollectionPipe(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
var pipe mgo.Pipe
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().Pipe(gomock.Any()).Return(&pipe)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
actual := c.Pipe(nil)
switch v := actual.(type) {
case promisedPipe:
assert.Equal(t, &pipe, v.Pipe)
assert.Equal(t, errDummy, v.promise.keep(errDummy))
default:
t.Fail()
}
c.brk = new(dropBreaker)
actual = c.Pipe(nil)
assert.Equal(t, rejectedPipe{}, actual)
}
func TestCollectionRemove(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().Remove(gomock.Any()).Return(errDummy)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
err := c.Remove(nil)
assert.Equal(t, errDummy, err)
c.brk = new(dropBreaker)
err = c.Remove(nil)
assert.Equal(t, errDummy, err)
}
func TestCollectionRemoveAll(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().RemoveAll(gomock.Any()).Return(nil, errDummy)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
_, err := c.RemoveAll(nil)
assert.Equal(t, errDummy, err)
c.brk = new(dropBreaker)
_, err = c.RemoveAll(nil)
assert.Equal(t, errDummy, err)
}
func TestCollectionRemoveId(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().RemoveId(gomock.Any()).Return(errDummy)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
err := c.RemoveId(nil)
assert.Equal(t, errDummy, err)
c.brk = new(dropBreaker)
err = c.RemoveId(nil)
assert.Equal(t, errDummy, err)
}
func TestCollectionUpdate(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().Update(gomock.Any(), gomock.Any()).Return(errDummy)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
err := c.Update(nil, nil)
assert.Equal(t, errDummy, err)
c.brk = new(dropBreaker)
err = c.Update(nil, nil)
assert.Equal(t, errDummy, err)
}
func TestCollectionUpdateId(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().UpdateId(gomock.Any(), gomock.Any()).Return(errDummy)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
err := c.UpdateId(nil, nil)
assert.Equal(t, errDummy, err)
c.brk = new(dropBreaker)
err = c.UpdateId(nil, nil)
assert.Equal(t, errDummy, err)
}
func TestCollectionUpsert(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
col := internal.NewMockMgoCollection(ctrl)
col.EXPECT().Upsert(gomock.Any(), gomock.Any()).Return(nil, errDummy)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
_, err := c.Upsert(nil, nil)
assert.Equal(t, errDummy, err)
c.brk = new(dropBreaker)
_, err = c.Upsert(nil, nil)
assert.Equal(t, errDummy, err)
}
func Test_logDuration(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
col := internal.NewMockMgoCollection(ctrl)
c := decoratedCollection{
collection: col,
brk: breaker.NewBreaker(),
}
var buf strings.Builder
w := logx.NewWriter(&buf)
o := logx.Reset()
logx.SetWriter(w)
defer func() {
logx.Reset()
logx.SetWriter(o)
}()
buf.Reset()
c.logDuration("foo", time.Millisecond, nil, "bar")
assert.Contains(t, buf.String(), "foo")
assert.Contains(t, buf.String(), "bar")
buf.Reset()
c.logDuration("foo", time.Millisecond, errors.New("bar"), make(chan int))
assert.Contains(t, buf.String(), "bar")
buf.Reset()
c.logDuration("foo", slowThreshold.Load()+time.Millisecond, errors.New("bar"))
assert.Contains(t, buf.String(), "bar")
assert.Contains(t, buf.String(), "slowcall")
buf.Reset()
c.logDuration("foo", slowThreshold.Load()+time.Millisecond, nil)
assert.Contains(t, buf.String(), "foo")
assert.Contains(t, buf.String(), "slowcall")
}
type mockPromise struct {
accepted bool
reason string
}
func (p *mockPromise) Accept() {
p.accepted = true
}
func (p *mockPromise) Reject(reason string) {
p.reason = reason
}
type dropBreaker struct{}
func (d *dropBreaker) Name() string {
return "dummy"
}
func (d *dropBreaker) Allow() (breaker.Promise, error) {
return nil, errDummy
}
func (d *dropBreaker) Do(req func() error) error {
return nil
}
func (d *dropBreaker) DoWithAcceptable(req func() error, acceptable breaker.Acceptable) error {
return errDummy
}
func (d *dropBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
return nil
}
func (d *dropBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
acceptable breaker.Acceptable) error {
return nil
}

View File

@ -1,19 +0,0 @@
//go:generate mockgen -package internal -destination collection_mock.go -source collection.go
package internal
import "github.com/globalsign/mgo"
// MgoCollection interface represents a mgo collection.
type MgoCollection interface {
Find(query interface{}) *mgo.Query
FindId(id interface{}) *mgo.Query
Insert(docs ...interface{}) error
Pipe(pipeline interface{}) *mgo.Pipe
Remove(selector interface{}) error
RemoveAll(selector interface{}) (*mgo.ChangeInfo, error)
RemoveId(id interface{}) error
Update(selector, update interface{}) error
UpdateId(id, update interface{}) error
Upsert(selector, update interface{}) (*mgo.ChangeInfo, error)
}

View File

@ -1,181 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: collection.go
// Package internal is a generated GoMock package.
package internal
import (
reflect "reflect"
mgo "github.com/globalsign/mgo"
gomock "github.com/golang/mock/gomock"
)
// MockMgoCollection is a mock of MgoCollection interface
type MockMgoCollection struct {
ctrl *gomock.Controller
recorder *MockMgoCollectionMockRecorder
}
// MockMgoCollectionMockRecorder is the mock recorder for MockMgoCollection
type MockMgoCollectionMockRecorder struct {
mock *MockMgoCollection
}
// NewMockMgoCollection creates a new mock instance
func NewMockMgoCollection(ctrl *gomock.Controller) *MockMgoCollection {
mock := &MockMgoCollection{ctrl: ctrl}
mock.recorder = &MockMgoCollectionMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockMgoCollection) EXPECT() *MockMgoCollectionMockRecorder {
return m.recorder
}
// Find mocks base method
func (m *MockMgoCollection) Find(query interface{}) *mgo.Query {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Find", query)
ret0, _ := ret[0].(*mgo.Query)
return ret0
}
// Find indicates an expected call of Find
func (mr *MockMgoCollectionMockRecorder) Find(query interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Find", reflect.TypeOf((*MockMgoCollection)(nil).Find), query)
}
// FindId mocks base method
func (m *MockMgoCollection) FindId(id interface{}) *mgo.Query {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindId", id)
ret0, _ := ret[0].(*mgo.Query)
return ret0
}
// FindId indicates an expected call of FindId
func (mr *MockMgoCollectionMockRecorder) FindId(id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindId", reflect.TypeOf((*MockMgoCollection)(nil).FindId), id)
}
// Insert mocks base method
func (m *MockMgoCollection) Insert(docs ...interface{}) error {
m.ctrl.T.Helper()
varargs := []interface{}{}
for _, a := range docs {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Insert", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// Insert indicates an expected call of Insert
func (mr *MockMgoCollectionMockRecorder) Insert(docs ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockMgoCollection)(nil).Insert), docs...)
}
// Pipe mocks base method
func (m *MockMgoCollection) Pipe(pipeline interface{}) *mgo.Pipe {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Pipe", pipeline)
ret0, _ := ret[0].(*mgo.Pipe)
return ret0
}
// Pipe indicates an expected call of Pipe
func (mr *MockMgoCollectionMockRecorder) Pipe(pipeline interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pipe", reflect.TypeOf((*MockMgoCollection)(nil).Pipe), pipeline)
}
// Remove mocks base method
func (m *MockMgoCollection) Remove(selector interface{}) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Remove", selector)
ret0, _ := ret[0].(error)
return ret0
}
// Remove indicates an expected call of Remove
func (mr *MockMgoCollectionMockRecorder) Remove(selector interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockMgoCollection)(nil).Remove), selector)
}
// RemoveAll mocks base method
func (m *MockMgoCollection) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RemoveAll", selector)
ret0, _ := ret[0].(*mgo.ChangeInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RemoveAll indicates an expected call of RemoveAll
func (mr *MockMgoCollectionMockRecorder) RemoveAll(selector interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveAll", reflect.TypeOf((*MockMgoCollection)(nil).RemoveAll), selector)
}
// RemoveId mocks base method
func (m *MockMgoCollection) RemoveId(id interface{}) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RemoveId", id)
ret0, _ := ret[0].(error)
return ret0
}
// RemoveId indicates an expected call of RemoveId
func (mr *MockMgoCollectionMockRecorder) RemoveId(id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveId", reflect.TypeOf((*MockMgoCollection)(nil).RemoveId), id)
}
// Update mocks base method
func (m *MockMgoCollection) Update(selector, update interface{}) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Update", selector, update)
ret0, _ := ret[0].(error)
return ret0
}
// Update indicates an expected call of Update
func (mr *MockMgoCollectionMockRecorder) Update(selector, update interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockMgoCollection)(nil).Update), selector, update)
}
// UpdateId mocks base method
func (m *MockMgoCollection) UpdateId(id, update interface{}) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateId", id, update)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateId indicates an expected call of UpdateId
func (mr *MockMgoCollectionMockRecorder) UpdateId(id, update interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateId", reflect.TypeOf((*MockMgoCollection)(nil).UpdateId), id, update)
}
// Upsert mocks base method
func (m *MockMgoCollection) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Upsert", selector, update)
ret0, _ := ret[0].(*mgo.ChangeInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Upsert indicates an expected call of Upsert
func (mr *MockMgoCollectionMockRecorder) Upsert(selector, update interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Upsert", reflect.TypeOf((*MockMgoCollection)(nil).Upsert), selector, update)
}

View File

@ -1,99 +0,0 @@
//go:generate mockgen -package mongo -destination iter_mock.go -source iter.go Iter
package mongo
import (
"github.com/globalsign/mgo/bson"
"github.com/zeromicro/go-zero/core/breaker"
)
type (
// Iter interface represents a mongo iter.
Iter interface {
All(result interface{}) error
Close() error
Done() bool
Err() error
For(result interface{}, f func() error) error
Next(result interface{}) bool
State() (int64, []bson.Raw)
Timeout() bool
}
// A ClosableIter is a closable mongo iter.
ClosableIter struct {
Iter
Cleanup func()
}
promisedIter struct {
Iter
promise keepablePromise
}
rejectedIter struct{}
)
func (i promisedIter) All(result interface{}) error {
return i.promise.keep(i.Iter.All(result))
}
func (i promisedIter) Close() error {
return i.promise.keep(i.Iter.Close())
}
func (i promisedIter) Err() error {
return i.Iter.Err()
}
func (i promisedIter) For(result interface{}, f func() error) error {
var ferr error
err := i.Iter.For(result, func() error {
ferr = f()
return ferr
})
if ferr == err {
return i.promise.accept(err)
}
return i.promise.keep(err)
}
// Close closes a mongo iter.
func (it *ClosableIter) Close() error {
err := it.Iter.Close()
it.Cleanup()
return err
}
func (i rejectedIter) All(result interface{}) error {
return breaker.ErrServiceUnavailable
}
func (i rejectedIter) Close() error {
return breaker.ErrServiceUnavailable
}
func (i rejectedIter) Done() bool {
return false
}
func (i rejectedIter) Err() error {
return breaker.ErrServiceUnavailable
}
func (i rejectedIter) For(result interface{}, f func() error) error {
return breaker.ErrServiceUnavailable
}
func (i rejectedIter) Next(result interface{}) bool {
return false
}
func (i rejectedIter) State() (int64, []bson.Raw) {
return 0, nil
}
func (i rejectedIter) Timeout() bool {
return false
}

View File

@ -1,148 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: iter.go
// Package mongo is a generated GoMock package.
package mongo
import (
reflect "reflect"
bson "github.com/globalsign/mgo/bson"
gomock "github.com/golang/mock/gomock"
)
// MockIter is a mock of Iter interface
type MockIter struct {
ctrl *gomock.Controller
recorder *MockIterMockRecorder
}
// MockIterMockRecorder is the mock recorder for MockIter
type MockIterMockRecorder struct {
mock *MockIter
}
// NewMockIter creates a new mock instance
func NewMockIter(ctrl *gomock.Controller) *MockIter {
mock := &MockIter{ctrl: ctrl}
mock.recorder = &MockIterMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockIter) EXPECT() *MockIterMockRecorder {
return m.recorder
}
// All mocks base method
func (m *MockIter) All(result interface{}) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "All", result)
ret0, _ := ret[0].(error)
return ret0
}
// All indicates an expected call of All
func (mr *MockIterMockRecorder) All(result interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "All", reflect.TypeOf((*MockIter)(nil).All), result)
}
// Close mocks base method
func (m *MockIter) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close
func (mr *MockIterMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockIter)(nil).Close))
}
// Done mocks base method
func (m *MockIter) Done() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Done")
ret0, _ := ret[0].(bool)
return ret0
}
// Done indicates an expected call of Done
func (mr *MockIterMockRecorder) Done() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockIter)(nil).Done))
}
// Err mocks base method
func (m *MockIter) Err() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Err")
ret0, _ := ret[0].(error)
return ret0
}
// Err indicates an expected call of Err
func (mr *MockIterMockRecorder) Err() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockIter)(nil).Err))
}
// For mocks base method
func (m *MockIter) For(result interface{}, f func() error) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "For", result, f)
ret0, _ := ret[0].(error)
return ret0
}
// For indicates an expected call of For
func (mr *MockIterMockRecorder) For(result, f interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "For", reflect.TypeOf((*MockIter)(nil).For), result, f)
}
// Next mocks base method
func (m *MockIter) Next(result interface{}) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Next", result)
ret0, _ := ret[0].(bool)
return ret0
}
// Next indicates an expected call of Next
func (mr *MockIterMockRecorder) Next(result interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockIter)(nil).Next), result)
}
// State mocks base method
func (m *MockIter) State() (int64, []bson.Raw) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "State")
ret0, _ := ret[0].(int64)
ret1, _ := ret[1].([]bson.Raw)
return ret0, ret1
}
// State indicates an expected call of State
func (mr *MockIterMockRecorder) State() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "State", reflect.TypeOf((*MockIter)(nil).State))
}
// Timeout mocks base method
func (m *MockIter) Timeout() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Timeout")
ret0, _ := ret[0].(bool)
return ret0
}
// Timeout indicates an expected call of Timeout
func (mr *MockIterMockRecorder) Timeout() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Timeout", reflect.TypeOf((*MockIter)(nil).Timeout))
}

View File

@ -1,264 +0,0 @@
package mongo
import (
"errors"
"testing"
"github.com/globalsign/mgo"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/breaker"
"github.com/zeromicro/go-zero/core/stringx"
"github.com/zeromicro/go-zero/core/syncx"
)
func TestClosableIter_Close(t *testing.T) {
errs := []error{
nil,
mgo.ErrNotFound,
}
for _, err := range errs {
t.Run(stringx.RandId(), func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
cleaned := syncx.NewAtomicBool()
iter := NewMockIter(ctrl)
iter.EXPECT().Close().Return(err)
ci := ClosableIter{
Iter: iter,
Cleanup: func() {
cleaned.Set(true)
},
}
assert.Equal(t, err, ci.Close())
assert.True(t, cleaned.True())
})
}
}
func TestPromisedIter_AllAndClose(t *testing.T) {
tests := []struct {
err error
accepted bool
reason string
}{
{
err: nil,
accepted: true,
reason: "",
},
{
err: mgo.ErrNotFound,
accepted: true,
reason: "",
},
{
err: errors.New("any"),
accepted: false,
reason: "any",
},
}
for _, test := range tests {
t.Run(stringx.RandId(), func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
iter := NewMockIter(ctrl)
iter.EXPECT().All(gomock.Any()).Return(test.err)
promise := new(mockPromise)
pi := promisedIter{
Iter: iter,
promise: keepablePromise{
promise: promise,
log: func(error) {},
},
}
assert.Equal(t, test.err, pi.All(nil))
assert.Equal(t, test.accepted, promise.accepted)
assert.Equal(t, test.reason, promise.reason)
})
}
for _, test := range tests {
t.Run(stringx.RandId(), func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
iter := NewMockIter(ctrl)
iter.EXPECT().Close().Return(test.err)
promise := new(mockPromise)
pi := promisedIter{
Iter: iter,
promise: keepablePromise{
promise: promise,
log: func(error) {},
},
}
assert.Equal(t, test.err, pi.Close())
assert.Equal(t, test.accepted, promise.accepted)
assert.Equal(t, test.reason, promise.reason)
})
}
}
func TestPromisedIter_Err(t *testing.T) {
errs := []error{
nil,
mgo.ErrNotFound,
}
for _, err := range errs {
t.Run(stringx.RandId(), func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
iter := NewMockIter(ctrl)
iter.EXPECT().Err().Return(err)
promise := new(mockPromise)
pi := promisedIter{
Iter: iter,
promise: keepablePromise{
promise: promise,
log: func(error) {},
},
}
assert.Equal(t, err, pi.Err())
})
}
}
func TestPromisedIter_For(t *testing.T) {
tests := []struct {
err error
accepted bool
reason string
}{
{
err: nil,
accepted: true,
reason: "",
},
{
err: mgo.ErrNotFound,
accepted: true,
reason: "",
},
{
err: errors.New("any"),
accepted: false,
reason: "any",
},
}
for _, test := range tests {
t.Run(stringx.RandId(), func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
iter := NewMockIter(ctrl)
iter.EXPECT().For(gomock.Any(), gomock.Any()).Return(test.err)
promise := new(mockPromise)
pi := promisedIter{
Iter: iter,
promise: keepablePromise{
promise: promise,
log: func(error) {},
},
}
assert.Equal(t, test.err, pi.For(nil, nil))
assert.Equal(t, test.accepted, promise.accepted)
assert.Equal(t, test.reason, promise.reason)
})
}
}
func TestRejectedIter_All(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedIter).All(nil))
}
func TestRejectedIter_Close(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedIter).Close())
}
func TestRejectedIter_Done(t *testing.T) {
assert.False(t, new(rejectedIter).Done())
}
func TestRejectedIter_Err(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedIter).Err())
}
func TestRejectedIter_For(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedIter).For(nil, nil))
}
func TestRejectedIter_Next(t *testing.T) {
assert.False(t, new(rejectedIter).Next(nil))
}
func TestRejectedIter_State(t *testing.T) {
n, raw := new(rejectedIter).State()
assert.Equal(t, int64(0), n)
assert.Nil(t, raw)
}
func TestRejectedIter_Timeout(t *testing.T) {
assert.False(t, new(rejectedIter).Timeout())
}
func TestIter_Done(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
iter := NewMockIter(ctrl)
iter.EXPECT().Done().Return(true)
ci := ClosableIter{
Iter: iter,
Cleanup: nil,
}
assert.True(t, ci.Done())
}
func TestIter_Next(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
iter := NewMockIter(ctrl)
iter.EXPECT().Next(gomock.Any()).Return(true)
ci := ClosableIter{
Iter: iter,
Cleanup: nil,
}
assert.True(t, ci.Next(nil))
}
func TestIter_State(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
iter := NewMockIter(ctrl)
iter.EXPECT().State().Return(int64(1), nil)
ci := ClosableIter{
Iter: iter,
Cleanup: nil,
}
n, raw := ci.State()
assert.Equal(t, int64(1), n)
assert.Nil(t, raw)
}
func TestIter_Timeout(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
iter := NewMockIter(ctrl)
iter.EXPECT().Timeout().Return(true)
ci := ClosableIter{
Iter: iter,
Cleanup: nil,
}
assert.True(t, ci.Timeout())
}

View File

@ -1,177 +0,0 @@
package mongo
import (
"log"
"time"
"github.com/globalsign/mgo"
"github.com/zeromicro/go-zero/core/breaker"
)
// A Model is a mongo model.
type Model struct {
session *concurrentSession
db *mgo.Database
collection string
brk breaker.Breaker
opts []Option
}
// MustNewModel returns a Model, exits on errors.
func MustNewModel(url, collection string, opts ...Option) *Model {
model, err := NewModel(url, collection, opts...)
if err != nil {
log.Fatal(err)
}
return model
}
// NewModel returns a Model.
func NewModel(url, collection string, opts ...Option) (*Model, error) {
session, err := getConcurrentSession(url)
if err != nil {
return nil, err
}
return &Model{
session: session,
// If name is empty, the database name provided in the dialed URL is used instead
db: session.DB(""),
collection: collection,
brk: breaker.GetBreaker(url),
opts: opts,
}, nil
}
// Find finds a record with given query.
func (mm *Model) Find(query interface{}) (Query, error) {
return mm.query(func(c Collection) Query {
return c.Find(query)
})
}
// FindId finds a record with given id.
func (mm *Model) FindId(id interface{}) (Query, error) {
return mm.query(func(c Collection) Query {
return c.FindId(id)
})
}
// GetCollection returns a Collection with given session.
func (mm *Model) GetCollection(session *mgo.Session) Collection {
return newCollection(mm.db.C(mm.collection).With(session), mm.brk)
}
// Insert inserts docs into mm.
func (mm *Model) Insert(docs ...interface{}) error {
return mm.execute(func(c Collection) error {
return c.Insert(docs...)
})
}
// Pipe returns a Pipe with given pipeline.
func (mm *Model) Pipe(pipeline interface{}) (Pipe, error) {
return mm.pipe(func(c Collection) Pipe {
return c.Pipe(pipeline)
})
}
// PutSession returns the given session.
func (mm *Model) PutSession(session *mgo.Session) {
mm.session.putSession(session)
}
// Remove removes the records with given selector.
func (mm *Model) Remove(selector interface{}) error {
return mm.execute(func(c Collection) error {
return c.Remove(selector)
})
}
// RemoveAll removes all with given selector and returns a mgo.ChangeInfo.
func (mm *Model) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
return c.RemoveAll(selector)
})
}
// RemoveId removes a record with given id.
func (mm *Model) RemoveId(id interface{}) error {
return mm.execute(func(c Collection) error {
return c.RemoveId(id)
})
}
// TakeSession gets a session.
func (mm *Model) TakeSession() (*mgo.Session, error) {
return mm.session.takeSession(mm.opts...)
}
// Update updates a record with given selector.
func (mm *Model) Update(selector, update interface{}) error {
return mm.execute(func(c Collection) error {
return c.Update(selector, update)
})
}
// UpdateId updates a record with given id.
func (mm *Model) UpdateId(id, update interface{}) error {
return mm.execute(func(c Collection) error {
return c.UpdateId(id, update)
})
}
// Upsert upserts a record with given selector, and returns a mgo.ChangeInfo.
func (mm *Model) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
return mm.change(func(c Collection) (*mgo.ChangeInfo, error) {
return c.Upsert(selector, update)
})
}
func (mm *Model) change(fn func(c Collection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
session, err := mm.TakeSession()
if err != nil {
return nil, err
}
defer mm.PutSession(session)
return fn(mm.GetCollection(session))
}
func (mm *Model) execute(fn func(c Collection) error) error {
session, err := mm.TakeSession()
if err != nil {
return err
}
defer mm.PutSession(session)
return fn(mm.GetCollection(session))
}
func (mm *Model) pipe(fn func(c Collection) Pipe) (Pipe, error) {
session, err := mm.TakeSession()
if err != nil {
return nil, err
}
defer mm.PutSession(session)
return fn(mm.GetCollection(session)), nil
}
func (mm *Model) query(fn func(c Collection) Query) (Query, error) {
session, err := mm.TakeSession()
if err != nil {
return nil, err
}
defer mm.PutSession(session)
return fn(mm.GetCollection(session)), nil
}
// WithTimeout customizes an operation with given timeout.
func WithTimeout(timeout time.Duration) Option {
return func(opts *options) {
opts.timeout = timeout
}
}

View File

@ -1,14 +0,0 @@
package mongo
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestWithTimeout(t *testing.T) {
o := defaultOptions()
WithTimeout(time.Second)(o)
assert.Equal(t, time.Second, o.timeout)
}

View File

@ -1,29 +0,0 @@
package mongo
import (
"time"
"github.com/zeromicro/go-zero/core/syncx"
)
var slowThreshold = syncx.ForAtomicDuration(defaultSlowThreshold)
type (
options struct {
timeout time.Duration
}
// Option defines the method to customize a mongo model.
Option func(opts *options)
)
// SetSlowThreshold sets the slow threshold.
func SetSlowThreshold(threshold time.Duration) {
slowThreshold.Set(threshold)
}
func defaultOptions() *options {
return &options{
timeout: defaultTimeout,
}
}

View File

@ -1,14 +0,0 @@
package mongo
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestSetSlowThreshold(t *testing.T) {
assert.Equal(t, defaultSlowThreshold, slowThreshold.Load())
SetSlowThreshold(time.Second)
assert.Equal(t, time.Second, slowThreshold.Load())
}

View File

@ -1,100 +0,0 @@
package mongo
import (
"time"
"github.com/globalsign/mgo"
"github.com/zeromicro/go-zero/core/breaker"
)
type (
// Pipe interface represents a mongo pipe.
Pipe interface {
All(result interface{}) error
AllowDiskUse() Pipe
Batch(n int) Pipe
Collation(collation *mgo.Collation) Pipe
Explain(result interface{}) error
Iter() Iter
One(result interface{}) error
SetMaxTime(d time.Duration) Pipe
}
promisedPipe struct {
*mgo.Pipe
promise keepablePromise
}
rejectedPipe struct{}
)
func (p promisedPipe) All(result interface{}) error {
return p.promise.keep(p.Pipe.All(result))
}
func (p promisedPipe) AllowDiskUse() Pipe {
p.Pipe.AllowDiskUse()
return p
}
func (p promisedPipe) Batch(n int) Pipe {
p.Pipe.Batch(n)
return p
}
func (p promisedPipe) Collation(collation *mgo.Collation) Pipe {
p.Pipe.Collation(collation)
return p
}
func (p promisedPipe) Explain(result interface{}) error {
return p.promise.keep(p.Pipe.Explain(result))
}
func (p promisedPipe) Iter() Iter {
return promisedIter{
Iter: p.Pipe.Iter(),
promise: p.promise,
}
}
func (p promisedPipe) One(result interface{}) error {
return p.promise.keep(p.Pipe.One(result))
}
func (p promisedPipe) SetMaxTime(d time.Duration) Pipe {
p.Pipe.SetMaxTime(d)
return p
}
func (p rejectedPipe) All(result interface{}) error {
return breaker.ErrServiceUnavailable
}
func (p rejectedPipe) AllowDiskUse() Pipe {
return p
}
func (p rejectedPipe) Batch(n int) Pipe {
return p
}
func (p rejectedPipe) Collation(collation *mgo.Collation) Pipe {
return p
}
func (p rejectedPipe) Explain(result interface{}) error {
return breaker.ErrServiceUnavailable
}
func (p rejectedPipe) Iter() Iter {
return rejectedIter{}
}
func (p rejectedPipe) One(result interface{}) error {
return breaker.ErrServiceUnavailable
}
func (p rejectedPipe) SetMaxTime(d time.Duration) Pipe {
return p
}

View File

@ -1,44 +0,0 @@
package mongo
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/breaker"
)
func TestRejectedPipe_All(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedPipe).All(nil))
}
func TestRejectedPipe_AllowDiskUse(t *testing.T) {
var p rejectedPipe
assert.Equal(t, p, p.AllowDiskUse())
}
func TestRejectedPipe_Batch(t *testing.T) {
var p rejectedPipe
assert.Equal(t, p, p.Batch(1))
}
func TestRejectedPipe_Collation(t *testing.T) {
var p rejectedPipe
assert.Equal(t, p, p.Collation(nil))
}
func TestRejectedPipe_Explain(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedPipe).Explain(nil))
}
func TestRejectedPipe_Iter(t *testing.T) {
assert.EqualValues(t, rejectedIter{}, new(rejectedPipe).Iter())
}
func TestRejectedPipe_One(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedPipe).One(nil))
}
func TestRejectedPipe_SetMaxTime(t *testing.T) {
var p rejectedPipe
assert.Equal(t, p, p.SetMaxTime(0))
}

View File

@ -1,285 +0,0 @@
package mongo
import (
"time"
"github.com/globalsign/mgo"
"github.com/zeromicro/go-zero/core/breaker"
)
type (
// Query interface represents a mongo query.
Query interface {
All(result interface{}) error
Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error)
Batch(n int) Query
Collation(collation *mgo.Collation) Query
Comment(comment string) Query
Count() (int, error)
Distinct(key string, result interface{}) error
Explain(result interface{}) error
For(result interface{}, f func() error) error
Hint(indexKey ...string) Query
Iter() Iter
Limit(n int) Query
LogReplay() Query
MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error)
One(result interface{}) error
Prefetch(p float64) Query
Select(selector interface{}) Query
SetMaxScan(n int) Query
SetMaxTime(d time.Duration) Query
Skip(n int) Query
Snapshot() Query
Sort(fields ...string) Query
Tail(timeout time.Duration) Iter
}
promisedQuery struct {
*mgo.Query
promise keepablePromise
}
rejectedQuery struct{}
)
func (q promisedQuery) All(result interface{}) error {
return q.promise.keep(q.Query.All(result))
}
func (q promisedQuery) Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error) {
info, err := q.Query.Apply(change, result)
return info, q.promise.keep(err)
}
func (q promisedQuery) Batch(n int) Query {
return promisedQuery{
Query: q.Query.Batch(n),
promise: q.promise,
}
}
func (q promisedQuery) Collation(collation *mgo.Collation) Query {
return promisedQuery{
Query: q.Query.Collation(collation),
promise: q.promise,
}
}
func (q promisedQuery) Comment(comment string) Query {
return promisedQuery{
Query: q.Query.Comment(comment),
promise: q.promise,
}
}
func (q promisedQuery) Count() (int, error) {
v, err := q.Query.Count()
return v, q.promise.keep(err)
}
func (q promisedQuery) Distinct(key string, result interface{}) error {
return q.promise.keep(q.Query.Distinct(key, result))
}
func (q promisedQuery) Explain(result interface{}) error {
return q.promise.keep(q.Query.Explain(result))
}
func (q promisedQuery) For(result interface{}, f func() error) error {
var ferr error
err := q.Query.For(result, func() error {
ferr = f()
return ferr
})
if ferr == err {
return q.promise.accept(err)
}
return q.promise.keep(err)
}
func (q promisedQuery) Hint(indexKey ...string) Query {
return promisedQuery{
Query: q.Query.Hint(indexKey...),
promise: q.promise,
}
}
func (q promisedQuery) Iter() Iter {
return promisedIter{
Iter: q.Query.Iter(),
promise: q.promise,
}
}
func (q promisedQuery) Limit(n int) Query {
return promisedQuery{
Query: q.Query.Limit(n),
promise: q.promise,
}
}
func (q promisedQuery) LogReplay() Query {
return promisedQuery{
Query: q.Query.LogReplay(),
promise: q.promise,
}
}
func (q promisedQuery) MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error) {
info, err := q.Query.MapReduce(job, result)
return info, q.promise.keep(err)
}
func (q promisedQuery) One(result interface{}) error {
return q.promise.keep(q.Query.One(result))
}
func (q promisedQuery) Prefetch(p float64) Query {
return promisedQuery{
Query: q.Query.Prefetch(p),
promise: q.promise,
}
}
func (q promisedQuery) Select(selector interface{}) Query {
return promisedQuery{
Query: q.Query.Select(selector),
promise: q.promise,
}
}
func (q promisedQuery) SetMaxScan(n int) Query {
return promisedQuery{
Query: q.Query.SetMaxScan(n),
promise: q.promise,
}
}
func (q promisedQuery) SetMaxTime(d time.Duration) Query {
return promisedQuery{
Query: q.Query.SetMaxTime(d),
promise: q.promise,
}
}
func (q promisedQuery) Skip(n int) Query {
return promisedQuery{
Query: q.Query.Skip(n),
promise: q.promise,
}
}
func (q promisedQuery) Snapshot() Query {
return promisedQuery{
Query: q.Query.Snapshot(),
promise: q.promise,
}
}
func (q promisedQuery) Sort(fields ...string) Query {
return promisedQuery{
Query: q.Query.Sort(fields...),
promise: q.promise,
}
}
func (q promisedQuery) Tail(timeout time.Duration) Iter {
return promisedIter{
Iter: q.Query.Tail(timeout),
promise: q.promise,
}
}
func (q rejectedQuery) All(result interface{}) error {
return breaker.ErrServiceUnavailable
}
func (q rejectedQuery) Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error) {
return nil, breaker.ErrServiceUnavailable
}
func (q rejectedQuery) Batch(n int) Query {
return q
}
func (q rejectedQuery) Collation(collation *mgo.Collation) Query {
return q
}
func (q rejectedQuery) Comment(comment string) Query {
return q
}
func (q rejectedQuery) Count() (int, error) {
return 0, breaker.ErrServiceUnavailable
}
func (q rejectedQuery) Distinct(key string, result interface{}) error {
return breaker.ErrServiceUnavailable
}
func (q rejectedQuery) Explain(result interface{}) error {
return breaker.ErrServiceUnavailable
}
func (q rejectedQuery) For(result interface{}, f func() error) error {
return breaker.ErrServiceUnavailable
}
func (q rejectedQuery) Hint(indexKey ...string) Query {
return q
}
func (q rejectedQuery) Iter() Iter {
return rejectedIter{}
}
func (q rejectedQuery) Limit(n int) Query {
return q
}
func (q rejectedQuery) LogReplay() Query {
return q
}
func (q rejectedQuery) MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error) {
return nil, breaker.ErrServiceUnavailable
}
func (q rejectedQuery) One(result interface{}) error {
return breaker.ErrServiceUnavailable
}
func (q rejectedQuery) Prefetch(p float64) Query {
return q
}
func (q rejectedQuery) Select(selector interface{}) Query {
return q
}
func (q rejectedQuery) SetMaxScan(n int) Query {
return q
}
func (q rejectedQuery) SetMaxTime(d time.Duration) Query {
return q
}
func (q rejectedQuery) Skip(n int) Query {
return q
}
func (q rejectedQuery) Snapshot() Query {
return q
}
func (q rejectedQuery) Sort(fields ...string) Query {
return q
}
func (q rejectedQuery) Tail(timeout time.Duration) Iter {
return rejectedIter{}
}

View File

@ -1,120 +0,0 @@
package mongo
import (
"testing"
"github.com/globalsign/mgo"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/breaker"
)
func Test_rejectedQuery_All(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedQuery).All(nil))
}
func Test_rejectedQuery_Apply(t *testing.T) {
info, err := new(rejectedQuery).Apply(mgo.Change{}, nil)
assert.Equal(t, breaker.ErrServiceUnavailable, err)
assert.Nil(t, info)
}
func Test_rejectedQuery_Batch(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Batch(1))
}
func Test_rejectedQuery_Collation(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Collation(nil))
}
func Test_rejectedQuery_Comment(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Comment(""))
}
func Test_rejectedQuery_Count(t *testing.T) {
n, err := new(rejectedQuery).Count()
assert.Equal(t, breaker.ErrServiceUnavailable, err)
assert.Equal(t, 0, n)
}
func Test_rejectedQuery_Distinct(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedQuery).Distinct("", nil))
}
func Test_rejectedQuery_Explain(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedQuery).Explain(nil))
}
func Test_rejectedQuery_For(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedQuery).For(nil, nil))
}
func Test_rejectedQuery_Hint(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Hint())
}
func Test_rejectedQuery_Iter(t *testing.T) {
assert.EqualValues(t, rejectedIter{}, new(rejectedQuery).Iter())
}
func Test_rejectedQuery_Limit(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Limit(1))
}
func Test_rejectedQuery_LogReplay(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.LogReplay())
}
func Test_rejectedQuery_MapReduce(t *testing.T) {
info, err := new(rejectedQuery).MapReduce(nil, nil)
assert.Equal(t, breaker.ErrServiceUnavailable, err)
assert.Nil(t, info)
}
func Test_rejectedQuery_One(t *testing.T) {
assert.Equal(t, breaker.ErrServiceUnavailable, new(rejectedQuery).One(nil))
}
func Test_rejectedQuery_Prefetch(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Prefetch(1))
}
func Test_rejectedQuery_Select(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Select(nil))
}
func Test_rejectedQuery_SetMaxScan(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.SetMaxScan(0))
}
func Test_rejectedQuery_SetMaxTime(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.SetMaxTime(0))
}
func Test_rejectedQuery_Skip(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Skip(0))
}
func Test_rejectedQuery_Snapshot(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Snapshot())
}
func Test_rejectedQuery_Sort(t *testing.T) {
var q rejectedQuery
assert.Equal(t, q, q.Sort())
}
func Test_rejectedQuery_Tail(t *testing.T) {
assert.EqualValues(t, rejectedIter{}, new(rejectedQuery).Tail(0))
}

View File

@ -1,70 +0,0 @@
package mongo
import (
"io"
"time"
"github.com/globalsign/mgo"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
)
const (
defaultConcurrency = 50
defaultTimeout = time.Second
)
var sessionManager = syncx.NewResourceManager()
type concurrentSession struct {
*mgo.Session
limit syncx.TimeoutLimit
}
func (cs *concurrentSession) Close() error {
cs.Session.Close()
return nil
}
func getConcurrentSession(url string) (*concurrentSession, error) {
val, err := sessionManager.GetResource(url, func() (io.Closer, error) {
mgoSession, err := mgo.Dial(url)
if err != nil {
return nil, err
}
concurrentSess := &concurrentSession{
Session: mgoSession,
limit: syncx.NewTimeoutLimit(defaultConcurrency),
}
return concurrentSess, nil
})
if err != nil {
return nil, err
}
return val.(*concurrentSession), nil
}
func (cs *concurrentSession) putSession(session *mgo.Session) {
if err := cs.limit.Return(); err != nil {
logx.Error(err)
}
// anyway, we need to close the session
session.Close()
}
func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
o := defaultOptions()
for _, opt := range opts {
opt(o)
}
if err := cs.limit.Borrow(o.timeout); err != nil {
return nil, err
}
return cs.Copy(), nil
}

View File

@ -1,10 +0,0 @@
package mongo
import "strings"
const mongoAddrSep = ","
// FormatAddr formats mongo hosts to a string.
func FormatAddr(hosts []string) string {
return strings.Join(hosts, mongoAddrSep)
}

View File

@ -1,35 +0,0 @@
package mongo
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestFormatAddrs(t *testing.T) {
tests := []struct {
addrs []string
expect string
}{
{
addrs: []string{"a", "b"},
expect: "a,b",
},
{
addrs: []string{"a", "b", "c"},
expect: "a,b,c",
},
{
addrs: []string{},
expect: "",
},
{
addrs: nil,
expect: "",
},
}
for _, test := range tests {
assert.Equal(t, test.expect, FormatAddr(test.addrs))
}
}

View File

@ -1,199 +0,0 @@
package mongoc
import (
"github.com/globalsign/mgo"
"github.com/zeromicro/go-zero/core/stores/cache"
"github.com/zeromicro/go-zero/core/stores/mongo"
"github.com/zeromicro/go-zero/core/syncx"
)
var (
// ErrNotFound is an alias of mgo.ErrNotFound.
ErrNotFound = mgo.ErrNotFound
// can't use one SingleFlight per conn, because multiple conns may share the same cache key.
singleFlight = syncx.NewSingleFlight()
stats = cache.NewStat("mongoc")
)
type (
// QueryOption defines the method to customize a mongo query.
QueryOption func(query mongo.Query) mongo.Query
// CachedCollection interface represents a mongo collection with cache.
CachedCollection interface {
Count(query interface{}) (int, error)
DelCache(keys ...string) error
FindAllNoCache(v, query interface{}, opts ...QueryOption) error
FindOne(v interface{}, key string, query interface{}) error
FindOneNoCache(v, query interface{}) error
FindOneId(v interface{}, key string, id interface{}) error
FindOneIdNoCache(v, id interface{}) error
GetCache(key string, v interface{}) error
Insert(docs ...interface{}) error
Pipe(pipeline interface{}) mongo.Pipe
Remove(selector interface{}, keys ...string) error
RemoveNoCache(selector interface{}) error
RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error)
RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error)
RemoveId(id interface{}, keys ...string) error
RemoveIdNoCache(id interface{}) error
SetCache(key string, v interface{}) error
Update(selector, update interface{}, keys ...string) error
UpdateNoCache(selector, update interface{}) error
UpdateId(id, update interface{}, keys ...string) error
UpdateIdNoCache(id, update interface{}) error
Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error)
UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error)
}
cachedCollection struct {
collection mongo.Collection
cache cache.Cache
}
)
func newCollection(collection mongo.Collection, c cache.Cache) CachedCollection {
return &cachedCollection{
collection: collection,
cache: c,
}
}
func (c *cachedCollection) Count(query interface{}) (int, error) {
return c.collection.Find(query).Count()
}
func (c *cachedCollection) DelCache(keys ...string) error {
return c.cache.Del(keys...)
}
func (c *cachedCollection) FindAllNoCache(v, query interface{}, opts ...QueryOption) error {
q := c.collection.Find(query)
for _, opt := range opts {
q = opt(q)
}
return q.All(v)
}
func (c *cachedCollection) FindOne(v interface{}, key string, query interface{}) error {
return c.cache.Take(v, key, func(v interface{}) error {
q := c.collection.Find(query)
return q.One(v)
})
}
func (c *cachedCollection) FindOneNoCache(v, query interface{}) error {
q := c.collection.Find(query)
return q.One(v)
}
func (c *cachedCollection) FindOneId(v interface{}, key string, id interface{}) error {
return c.cache.Take(v, key, func(v interface{}) error {
q := c.collection.FindId(id)
return q.One(v)
})
}
func (c *cachedCollection) FindOneIdNoCache(v, id interface{}) error {
q := c.collection.FindId(id)
return q.One(v)
}
func (c *cachedCollection) GetCache(key string, v interface{}) error {
return c.cache.Get(key, v)
}
func (c *cachedCollection) Insert(docs ...interface{}) error {
return c.collection.Insert(docs...)
}
func (c *cachedCollection) Pipe(pipeline interface{}) mongo.Pipe {
return c.collection.Pipe(pipeline)
}
func (c *cachedCollection) Remove(selector interface{}, keys ...string) error {
if err := c.RemoveNoCache(selector); err != nil {
return err
}
return c.DelCache(keys...)
}
func (c *cachedCollection) RemoveNoCache(selector interface{}) error {
return c.collection.Remove(selector)
}
func (c *cachedCollection) RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error) {
info, err := c.RemoveAllNoCache(selector)
if err != nil {
return nil, err
}
if err := c.DelCache(keys...); err != nil {
return nil, err
}
return info, nil
}
func (c *cachedCollection) RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error) {
return c.collection.RemoveAll(selector)
}
func (c *cachedCollection) RemoveId(id interface{}, keys ...string) error {
if err := c.RemoveIdNoCache(id); err != nil {
return err
}
return c.DelCache(keys...)
}
func (c *cachedCollection) RemoveIdNoCache(id interface{}) error {
return c.collection.RemoveId(id)
}
func (c *cachedCollection) SetCache(key string, v interface{}) error {
return c.cache.Set(key, v)
}
func (c *cachedCollection) Update(selector, update interface{}, keys ...string) error {
if err := c.UpdateNoCache(selector, update); err != nil {
return err
}
return c.DelCache(keys...)
}
func (c *cachedCollection) UpdateNoCache(selector, update interface{}) error {
return c.collection.Update(selector, update)
}
func (c *cachedCollection) UpdateId(id, update interface{}, keys ...string) error {
if err := c.UpdateIdNoCache(id, update); err != nil {
return err
}
return c.DelCache(keys...)
}
func (c *cachedCollection) UpdateIdNoCache(id, update interface{}) error {
return c.collection.UpdateId(id, update)
}
func (c *cachedCollection) Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error) {
info, err := c.UpsertNoCache(selector, update)
if err != nil {
return nil, err
}
if err := c.DelCache(keys...); err != nil {
return nil, err
}
return info, nil
}
func (c *cachedCollection) UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error) {
return c.collection.Upsert(selector, update)
}

View File

@ -1,365 +0,0 @@
package mongoc
import (
"encoding/json"
"errors"
"io"
"log"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/stores/cache"
"github.com/zeromicro/go-zero/core/stores/mongo"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/redis/redistest"
)
const dummyCount = 10
func init() {
stat.SetReporter(nil)
}
func TestCollection_Count(t *testing.T) {
resetStats()
r, clean, err := redistest.CreateRedis()
assert.Nil(t, err)
defer clean()
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
c := newCollection(dummyConn{}, cach)
val, err := c.Count("any")
assert.Nil(t, err)
assert.Equal(t, dummyCount, val)
var value string
assert.Nil(t, r.Set("any", `"foo"`))
assert.Nil(t, c.GetCache("any", &value))
assert.Equal(t, "foo", value)
assert.Nil(t, c.DelCache("any"))
assert.Nil(t, c.SetCache("any", "bar"))
assert.Nil(t, c.FindAllNoCache(&value, "any", func(query mongo.Query) mongo.Query {
return query
}))
assert.Nil(t, c.FindOne(&value, "any", "foo"))
assert.Equal(t, "bar", value)
assert.Nil(t, c.DelCache("any"))
c = newCollection(dummyConn{val: `"bar"`}, cach)
assert.Nil(t, c.FindOne(&value, "any", "foo"))
assert.Equal(t, "bar", value)
assert.Nil(t, c.FindOneNoCache(&value, "foo"))
assert.Equal(t, "bar", value)
assert.Nil(t, c.FindOneId(&value, "anyone", "foo"))
assert.Equal(t, "bar", value)
assert.Nil(t, c.FindOneIdNoCache(&value, "foo"))
assert.Equal(t, "bar", value)
assert.Nil(t, c.Insert("foo"))
assert.Nil(t, c.Pipe("foo"))
assert.Nil(t, c.Remove("any"))
assert.Nil(t, c.RemoveId("any"))
_, err = c.RemoveAll("any")
assert.Nil(t, err)
assert.Nil(t, c.Update("foo", "bar"))
assert.Nil(t, c.UpdateId("foo", "bar"))
_, err = c.Upsert("foo", "bar")
assert.Nil(t, err)
c = newCollection(dummyConn{
val: `"bar"`,
removeErr: errors.New("any"),
}, cach)
assert.NotNil(t, c.Remove("any"))
_, err = c.RemoveAll("any", "bar")
assert.NotNil(t, err)
assert.NotNil(t, c.RemoveId("any"))
c = newCollection(dummyConn{
val: `"bar"`,
updateErr: errors.New("any"),
}, cach)
assert.NotNil(t, c.Update("foo", "bar"))
assert.NotNil(t, c.UpdateId("foo", "bar"))
_, err = c.Upsert("foo", "bar")
assert.NotNil(t, err)
}
func TestStat(t *testing.T) {
resetStats()
r, clean, err := redistest.CreateRedis()
assert.Nil(t, err)
defer clean()
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
c := newCollection(dummyConn{}, cach).(*cachedCollection)
for i := 0; i < 10; i++ {
var str string
if err = c.cache.Take(&str, "name", func(v interface{}) error {
*v.(*string) = "zero"
return nil
}); err != nil {
t.Error(err)
}
}
assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
}
func TestStatCacheFails(t *testing.T) {
resetStats()
log.SetOutput(io.Discard)
defer log.SetOutput(os.Stdout)
r := redis.New("localhost:59999")
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
c := newCollection(dummyConn{}, cach)
for i := 0; i < 20; i++ {
var str string
err := c.FindOne(&str, "name", bson.M{})
assert.NotNil(t, err)
}
assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
}
func TestStatDbFails(t *testing.T) {
resetStats()
r, clean, err := redistest.CreateRedis()
assert.Nil(t, err)
defer clean()
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
c := newCollection(dummyConn{}, cach).(*cachedCollection)
for i := 0; i < 20; i++ {
var str string
err := c.cache.Take(&str, "name", func(v interface{}) error {
return errors.New("db failed")
})
assert.NotNil(t, err)
}
assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
}
func TestStatFromMemory(t *testing.T) {
resetStats()
r, clean, err := redistest.CreateRedis()
assert.Nil(t, err)
defer clean()
cach := cache.NewNode(r, singleFlight, stats, mgo.ErrNotFound)
c := newCollection(dummyConn{}, cach).(*cachedCollection)
var all sync.WaitGroup
var wait sync.WaitGroup
all.Add(10)
wait.Add(4)
go func() {
var str string
if err := c.cache.Take(&str, "name", func(v interface{}) error {
*v.(*string) = "zero"
return nil
}); err != nil {
t.Error(err)
}
wait.Wait()
runtime.Gosched()
all.Done()
}()
for i := 0; i < 4; i++ {
go func() {
var str string
wait.Done()
if err := c.cache.Take(&str, "name", func(v interface{}) error {
*v.(*string) = "zero"
return nil
}); err != nil {
t.Error(err)
}
all.Done()
}()
}
for i := 0; i < 5; i++ {
go func() {
var str string
if err := c.cache.Take(&str, "name", func(v interface{}) error {
*v.(*string) = "zero"
return nil
}); err != nil {
t.Error(err)
}
all.Done()
}()
}
all.Wait()
assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
}
func resetStats() {
atomic.StoreUint64(&stats.Total, 0)
atomic.StoreUint64(&stats.Hit, 0)
atomic.StoreUint64(&stats.Miss, 0)
atomic.StoreUint64(&stats.DbFails, 0)
}
type dummyConn struct {
val string
removeErr error
updateErr error
}
func (c dummyConn) Find(query interface{}) mongo.Query {
return dummyQuery{val: c.val}
}
func (c dummyConn) FindId(id interface{}) mongo.Query {
return dummyQuery{val: c.val}
}
func (c dummyConn) Insert(docs ...interface{}) error {
return nil
}
func (c dummyConn) Remove(selector interface{}) error {
return c.removeErr
}
func (dummyConn) Pipe(pipeline interface{}) mongo.Pipe {
return nil
}
func (c dummyConn) RemoveAll(selector interface{}) (*mgo.ChangeInfo, error) {
return nil, c.removeErr
}
func (c dummyConn) RemoveId(id interface{}) error {
return c.removeErr
}
func (c dummyConn) Update(selector, update interface{}) error {
return c.updateErr
}
func (c dummyConn) UpdateId(id, update interface{}) error {
return c.updateErr
}
func (c dummyConn) Upsert(selector, update interface{}) (*mgo.ChangeInfo, error) {
return nil, c.updateErr
}
type dummyQuery struct {
val string
}
func (d dummyQuery) All(result interface{}) error {
return nil
}
func (d dummyQuery) Apply(change mgo.Change, result interface{}) (*mgo.ChangeInfo, error) {
return nil, nil
}
func (d dummyQuery) Count() (int, error) {
return dummyCount, nil
}
func (d dummyQuery) Distinct(key string, result interface{}) error {
return nil
}
func (d dummyQuery) Explain(result interface{}) error {
return nil
}
func (d dummyQuery) For(result interface{}, f func() error) error {
return nil
}
func (d dummyQuery) MapReduce(job *mgo.MapReduce, result interface{}) (*mgo.MapReduceInfo, error) {
return nil, nil
}
func (d dummyQuery) One(result interface{}) error {
return json.Unmarshal([]byte(d.val), result)
}
func (d dummyQuery) Batch(n int) mongo.Query {
return d
}
func (d dummyQuery) Collation(collation *mgo.Collation) mongo.Query {
return d
}
func (d dummyQuery) Comment(comment string) mongo.Query {
return d
}
func (d dummyQuery) Hint(indexKey ...string) mongo.Query {
return d
}
func (d dummyQuery) Iter() mongo.Iter {
return &mgo.Iter{}
}
func (d dummyQuery) Limit(n int) mongo.Query {
return d
}
func (d dummyQuery) LogReplay() mongo.Query {
return d
}
func (d dummyQuery) Prefetch(p float64) mongo.Query {
return d
}
func (d dummyQuery) Select(selector interface{}) mongo.Query {
return d
}
func (d dummyQuery) SetMaxScan(n int) mongo.Query {
return d
}
func (d dummyQuery) SetMaxTime(duration time.Duration) mongo.Query {
return d
}
func (d dummyQuery) Skip(n int) mongo.Query {
return d
}
func (d dummyQuery) Snapshot() mongo.Query {
return d
}
func (d dummyQuery) Sort(fields ...string) mongo.Query {
return d
}
func (d dummyQuery) Tail(timeout time.Duration) mongo.Iter {
return &mgo.Iter{}
}

View File

@ -1,273 +0,0 @@
package mongoc
import (
"log"
"github.com/globalsign/mgo"
"github.com/zeromicro/go-zero/core/stores/cache"
"github.com/zeromicro/go-zero/core/stores/mongo"
"github.com/zeromicro/go-zero/core/stores/redis"
)
// A Model is a mongo model that built with cache capability.
type Model struct {
*mongo.Model
cache cache.Cache
generateCollection func(*mgo.Session) CachedCollection
}
// MustNewNodeModel returns a Model with a cache node, exists on errors.
func MustNewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) *Model {
model, err := NewNodeModel(url, collection, rds, opts...)
if err != nil {
log.Fatal(err)
}
return model
}
// MustNewModel returns a Model with a cache cluster, exists on errors.
func MustNewModel(url, collection string, c cache.CacheConf, opts ...cache.Option) *Model {
model, err := NewModel(url, collection, c, opts...)
if err != nil {
log.Fatal(err)
}
return model
}
// NewModel returns a Model with a cache cluster.
func NewModel(url, collection string, conf cache.CacheConf, opts ...cache.Option) (*Model, error) {
c := cache.New(conf, singleFlight, stats, mgo.ErrNotFound, opts...)
return NewModelWithCache(url, collection, c)
}
// NewModelWithCache returns a Model with a custom cache.
func NewModelWithCache(url, collection string, c cache.Cache) (*Model, error) {
return createModel(url, collection, c, func(collection mongo.Collection) CachedCollection {
return newCollection(collection, c)
})
}
// NewNodeModel returns a Model with a cache node.
func NewNodeModel(url, collection string, rds *redis.Redis, opts ...cache.Option) (*Model, error) {
c := cache.NewNode(rds, singleFlight, stats, mgo.ErrNotFound, opts...)
return NewModelWithCache(url, collection, c)
}
// Count returns the count of given query.
func (mm *Model) Count(query interface{}) (int, error) {
return mm.executeInt(func(c CachedCollection) (int, error) {
return c.Count(query)
})
}
// DelCache deletes the cache with given keys.
func (mm *Model) DelCache(keys ...string) error {
return mm.cache.Del(keys...)
}
// GetCache unmarshal the cache into v with given key.
func (mm *Model) GetCache(key string, v interface{}) error {
return mm.cache.Get(key, v)
}
// GetCollection returns a cache collection.
func (mm *Model) GetCollection(session *mgo.Session) CachedCollection {
return mm.generateCollection(session)
}
// FindAllNoCache finds all records without cache.
func (mm *Model) FindAllNoCache(v, query interface{}, opts ...QueryOption) error {
return mm.execute(func(c CachedCollection) error {
return c.FindAllNoCache(v, query, opts...)
})
}
// FindOne unmarshals a record into v with given key and query.
func (mm *Model) FindOne(v interface{}, key string, query interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.FindOne(v, key, query)
})
}
// FindOneNoCache unmarshals a record into v with query, without cache.
func (mm *Model) FindOneNoCache(v, query interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.FindOneNoCache(v, query)
})
}
// FindOneId unmarshals a record into v with query.
func (mm *Model) FindOneId(v interface{}, key string, id interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.FindOneId(v, key, id)
})
}
// FindOneIdNoCache unmarshals a record into v with query, without cache.
func (mm *Model) FindOneIdNoCache(v, id interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.FindOneIdNoCache(v, id)
})
}
// Insert inserts docs.
func (mm *Model) Insert(docs ...interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.Insert(docs...)
})
}
// Pipe returns a mongo pipe with given pipeline.
func (mm *Model) Pipe(pipeline interface{}) (mongo.Pipe, error) {
return mm.pipe(func(c CachedCollection) mongo.Pipe {
return c.Pipe(pipeline)
})
}
// Remove removes a record with given selector, and remove it from cache with given keys.
func (mm *Model) Remove(selector interface{}, keys ...string) error {
return mm.execute(func(c CachedCollection) error {
return c.Remove(selector, keys...)
})
}
// RemoveNoCache removes a record with given selector.
func (mm *Model) RemoveNoCache(selector interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.RemoveNoCache(selector)
})
}
// RemoveAll removes all records with given selector, and removes cache with given keys.
func (mm *Model) RemoveAll(selector interface{}, keys ...string) (*mgo.ChangeInfo, error) {
return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) {
return c.RemoveAll(selector, keys...)
})
}
// RemoveAllNoCache removes all records with given selector, and returns a mgo.ChangeInfo.
func (mm *Model) RemoveAllNoCache(selector interface{}) (*mgo.ChangeInfo, error) {
return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) {
return c.RemoveAllNoCache(selector)
})
}
// RemoveId removes a record with given id, and removes cache with given keys.
func (mm *Model) RemoveId(id interface{}, keys ...string) error {
return mm.execute(func(c CachedCollection) error {
return c.RemoveId(id, keys...)
})
}
// RemoveIdNoCache removes a record with given id.
func (mm *Model) RemoveIdNoCache(id interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.RemoveIdNoCache(id)
})
}
// SetCache sets the cache with given key and value.
func (mm *Model) SetCache(key string, v interface{}) error {
return mm.cache.Set(key, v)
}
// Update updates the record with given selector, and delete cache with given keys.
func (mm *Model) Update(selector, update interface{}, keys ...string) error {
return mm.execute(func(c CachedCollection) error {
return c.Update(selector, update, keys...)
})
}
// UpdateNoCache updates the record with given selector.
func (mm *Model) UpdateNoCache(selector, update interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.UpdateNoCache(selector, update)
})
}
// UpdateId updates the record with given id, and delete cache with given keys.
func (mm *Model) UpdateId(id, update interface{}, keys ...string) error {
return mm.execute(func(c CachedCollection) error {
return c.UpdateId(id, update, keys...)
})
}
// UpdateIdNoCache updates the record with given id.
func (mm *Model) UpdateIdNoCache(id, update interface{}) error {
return mm.execute(func(c CachedCollection) error {
return c.UpdateIdNoCache(id, update)
})
}
// Upsert upserts a record with given selector, and delete cache with given keys.
func (mm *Model) Upsert(selector, update interface{}, keys ...string) (*mgo.ChangeInfo, error) {
return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) {
return c.Upsert(selector, update, keys...)
})
}
// UpsertNoCache upserts a record with given selector.
func (mm *Model) UpsertNoCache(selector, update interface{}) (*mgo.ChangeInfo, error) {
return mm.change(func(c CachedCollection) (*mgo.ChangeInfo, error) {
return c.UpsertNoCache(selector, update)
})
}
func (mm *Model) change(fn func(c CachedCollection) (*mgo.ChangeInfo, error)) (*mgo.ChangeInfo, error) {
session, err := mm.TakeSession()
if err != nil {
return nil, err
}
defer mm.PutSession(session)
return fn(mm.GetCollection(session))
}
func (mm *Model) execute(fn func(c CachedCollection) error) error {
session, err := mm.TakeSession()
if err != nil {
return err
}
defer mm.PutSession(session)
return fn(mm.GetCollection(session))
}
func (mm *Model) executeInt(fn func(c CachedCollection) (int, error)) (int, error) {
session, err := mm.TakeSession()
if err != nil {
return 0, err
}
defer mm.PutSession(session)
return fn(mm.GetCollection(session))
}
func (mm *Model) pipe(fn func(c CachedCollection) mongo.Pipe) (mongo.Pipe, error) {
session, err := mm.TakeSession()
if err != nil {
return nil, err
}
defer mm.PutSession(session)
return fn(mm.GetCollection(session)), nil
}
func createModel(url, collection string, c cache.Cache,
create func(mongo.Collection) CachedCollection) (*Model, error) {
model, err := mongo.NewModel(url, collection)
if err != nil {
return nil, err
}
return &Model{
Model: model,
cache: c,
generateCollection: func(session *mgo.Session) CachedCollection {
collection := model.GetCollection(session)
return create(collection)
},
}, nil
}

3
go.mod
View File

@ -7,8 +7,8 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/alicebob/miniredis/v2 v2.30.0
github.com/fatih/color v1.13.0
github.com/felixge/fgprof v0.9.3
github.com/fullstorydev/grpcurl v1.8.7
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.7.0
github.com/golang-jwt/jwt/v4 v4.4.3
@ -49,7 +49,6 @@ require (
)
require (
github.com/felixge/fgprof v0.9.3
github.com/google/gofuzz v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect

2
go.sum
View File

@ -465,8 +465,6 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4
github.com/fullstorydev/grpcurl v1.8.7 h1:xJWosq3BQovQ4QrdPO72OrPiWuGgEsxY8ldYsJbPrqI=
github.com/fullstorydev/grpcurl v1.8.7/go.mod h1:pVtM4qe3CMoLaIzYS8uvTuDj2jVYmXqMUkZeijnXp/E=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 h1:DujepqpGd1hyOd7aW59XpK7Qymp8iy83xq74fLr21is=
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=