mirror of
https://github.com/zeromicro/go-zero.git
synced 2025-01-23 09:00:20 +08:00
Merge branch 'master' into master
This commit is contained in:
commit
e3032508f2
4
.github/dependabot.yml
vendored
4
.github/dependabot.yml
vendored
@ -9,3 +9,7 @@ updates:
|
||||
directory: "/" # Location of package manifests
|
||||
schedule:
|
||||
interval: "daily"
|
||||
- package-ecosystem: "gomod" # See documentation for possible values
|
||||
directory: "/tools/goctl" # Location of package manifests
|
||||
schedule:
|
||||
interval: "daily"
|
||||
|
2
.gitignore
vendored
2
.gitignore
vendored
@ -11,7 +11,7 @@
|
||||
!api
|
||||
|
||||
# ignore
|
||||
.idea
|
||||
**/.idea
|
||||
**/.DS_Store
|
||||
**/logs
|
||||
|
||||
|
@ -107,6 +107,20 @@ func MustLoad(path string, v any, opts ...Option) {
|
||||
}
|
||||
}
|
||||
|
||||
func addOrMergeFields(info map[string]fieldInfo, key, name string, fields map[string]fieldInfo) {
|
||||
if prev, ok := info[key]; ok {
|
||||
// merge fields
|
||||
for k, v := range fields {
|
||||
prev.children[k] = v
|
||||
}
|
||||
} else {
|
||||
info[key] = fieldInfo{
|
||||
name: name,
|
||||
children: fields,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func buildFieldsInfo(tp reflect.Type) map[string]fieldInfo {
|
||||
tp = mapping.Deref(tp)
|
||||
|
||||
@ -134,11 +148,12 @@ func buildStructFieldsInfo(tp reflect.Type) map[string]fieldInfo {
|
||||
if ft.Kind() == reflect.Struct {
|
||||
fields := buildFieldsInfo(ft)
|
||||
for k, v := range fields {
|
||||
info[k] = v
|
||||
addOrMergeFields(info, k, v.name, v.children)
|
||||
}
|
||||
} else {
|
||||
info[lowerCaseName] = fieldInfo{
|
||||
name: name,
|
||||
name: name,
|
||||
children: make(map[string]fieldInfo),
|
||||
}
|
||||
}
|
||||
continue
|
||||
@ -154,17 +169,7 @@ func buildStructFieldsInfo(tp reflect.Type) map[string]fieldInfo {
|
||||
fields = buildFieldsInfo(ft.Elem())
|
||||
}
|
||||
|
||||
if prev, ok := info[lowerCaseName]; ok {
|
||||
// merge fields
|
||||
for k, v := range fields {
|
||||
prev.children[k] = v
|
||||
}
|
||||
} else {
|
||||
info[lowerCaseName] = fieldInfo{
|
||||
name: name,
|
||||
children: fields,
|
||||
}
|
||||
}
|
||||
addOrMergeFields(info, lowerCaseName, name, fields)
|
||||
}
|
||||
|
||||
return info
|
||||
|
@ -420,6 +420,42 @@ func TestLoadFromYamlItemOverlay(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadFromYamlItemOverlayReverse(t *testing.T) {
|
||||
type (
|
||||
Redis struct {
|
||||
Host string
|
||||
Port int
|
||||
}
|
||||
|
||||
RedisKey struct {
|
||||
Redis
|
||||
Key string
|
||||
}
|
||||
|
||||
Server struct {
|
||||
Redis Redis
|
||||
}
|
||||
|
||||
TestConfig struct {
|
||||
Redis RedisKey
|
||||
Server
|
||||
}
|
||||
)
|
||||
|
||||
input := []byte(`Redis:
|
||||
Host: localhost
|
||||
Port: 6379
|
||||
Key: test
|
||||
`)
|
||||
|
||||
var c TestConfig
|
||||
if assert.NoError(t, LoadFromYamlBytes(input, &c)) {
|
||||
assert.Equal(t, "localhost", c.Redis.Host)
|
||||
assert.Equal(t, 6379, c.Redis.Port)
|
||||
assert.Equal(t, "test", c.Redis.Key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadFromYamlItemOverlayWithMap(t *testing.T) {
|
||||
type (
|
||||
Redis struct {
|
||||
|
@ -12,7 +12,6 @@ import (
|
||||
|
||||
// PropertyError represents a configuration error message.
|
||||
type PropertyError struct {
|
||||
error
|
||||
message string
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zeromicro/go-zero/core/fs"
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
func TestDailyRotateRuleMarkRotated(t *testing.T) {
|
||||
@ -232,6 +233,23 @@ func TestRotateLoggerWithSizeLimitRotateRuleMayCompressFileTrue(t *testing.T) {
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestRotateLoggerWithSizeLimitRotateRuleMayCompressFileFailed(t *testing.T) {
|
||||
old := os.Stdout
|
||||
os.Stdout = os.NewFile(0, os.DevNull)
|
||||
defer func() {
|
||||
os.Stdout = old
|
||||
}()
|
||||
|
||||
filename := stringx.RandId()
|
||||
logger, err := NewLogger(filename, new(SizeLimitRotateRule), true)
|
||||
defer os.Remove(filename)
|
||||
if assert.NoError(t, err) {
|
||||
assert.NotPanics(t, func() {
|
||||
logger.maybeCompressFile(stringx.RandId())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRotateLoggerWithSizeLimitRotateRuleRotate(t *testing.T) {
|
||||
filename, err := fs.TempFilenameWithText("foo")
|
||||
assert.Nil(t, err)
|
||||
|
@ -108,7 +108,7 @@ func TestNopWriter(t *testing.T) {
|
||||
w.Stack("foo")
|
||||
w.Stat("foo")
|
||||
w.Slow("foo")
|
||||
w.Close()
|
||||
_ = w.Close()
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/zeromicro/go-zero/core/errorx"
|
||||
"github.com/zeromicro/go-zero/core/lang"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -24,30 +23,30 @@ var (
|
||||
|
||||
type (
|
||||
// ForEachFunc is used to do element processing, but no output.
|
||||
ForEachFunc func(item any)
|
||||
ForEachFunc[T any] func(item T)
|
||||
// GenerateFunc is used to let callers send elements into source.
|
||||
GenerateFunc func(source chan<- any)
|
||||
GenerateFunc[T any] func(source chan<- T)
|
||||
// MapFunc is used to do element processing and write the output to writer.
|
||||
MapFunc func(item any, writer Writer)
|
||||
MapFunc[T, U any] func(item T, writer Writer[U])
|
||||
// MapperFunc is used to do element processing and write the output to writer,
|
||||
// use cancel func to cancel the processing.
|
||||
MapperFunc func(item any, writer Writer, cancel func(error))
|
||||
MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error))
|
||||
// ReducerFunc is used to reduce all the mapping output and write to writer,
|
||||
// use cancel func to cancel the processing.
|
||||
ReducerFunc func(pipe <-chan any, writer Writer, cancel func(error))
|
||||
ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error))
|
||||
// VoidReducerFunc is used to reduce all the mapping output, but no output.
|
||||
// Use cancel func to cancel the processing.
|
||||
VoidReducerFunc func(pipe <-chan any, cancel func(error))
|
||||
VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error))
|
||||
// Option defines the method to customize the mapreduce.
|
||||
Option func(opts *mapReduceOptions)
|
||||
|
||||
mapperContext struct {
|
||||
mapperContext[T, U any] struct {
|
||||
ctx context.Context
|
||||
mapper MapFunc
|
||||
source <-chan any
|
||||
mapper MapFunc[T, U]
|
||||
source <-chan T
|
||||
panicChan *onceChan
|
||||
collector chan<- any
|
||||
doneChan <-chan lang.PlaceholderType
|
||||
collector chan<- U
|
||||
doneChan <-chan struct{}
|
||||
workers int
|
||||
}
|
||||
|
||||
@ -57,8 +56,8 @@ type (
|
||||
}
|
||||
|
||||
// Writer interface wraps Write method.
|
||||
Writer interface {
|
||||
Write(v any)
|
||||
Writer[T any] interface {
|
||||
Write(v T)
|
||||
}
|
||||
)
|
||||
|
||||
@ -68,12 +67,11 @@ func Finish(fns ...func() error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
return MapReduceVoid(func(source chan<- any) {
|
||||
return MapReduceVoid(func(source chan<- func() error) {
|
||||
for _, fn := range fns {
|
||||
source <- fn
|
||||
}
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
fn := item.(func() error)
|
||||
}, func(fn func() error, writer Writer[any], cancel func(error)) {
|
||||
if err := fn(); err != nil {
|
||||
cancel(err)
|
||||
}
|
||||
@ -87,27 +85,26 @@ func FinishVoid(fns ...func()) {
|
||||
return
|
||||
}
|
||||
|
||||
ForEach(func(source chan<- any) {
|
||||
ForEach(func(source chan<- func()) {
|
||||
for _, fn := range fns {
|
||||
source <- fn
|
||||
}
|
||||
}, func(item any) {
|
||||
fn := item.(func())
|
||||
}, func(fn func()) {
|
||||
fn()
|
||||
}, WithWorkers(len(fns)))
|
||||
}
|
||||
|
||||
// ForEach maps all elements from given generate but no output.
|
||||
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
|
||||
func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option) {
|
||||
options := buildOptions(opts...)
|
||||
panicChan := &onceChan{channel: make(chan any)}
|
||||
source := buildSource(generate, panicChan)
|
||||
collector := make(chan any)
|
||||
done := make(chan lang.PlaceholderType)
|
||||
done := make(chan struct{})
|
||||
|
||||
go executeMappers(mapperContext{
|
||||
go executeMappers(mapperContext[T, any]{
|
||||
ctx: options.ctx,
|
||||
mapper: func(item any, _ Writer) {
|
||||
mapper: func(item T, _ Writer[any]) {
|
||||
mapper(item)
|
||||
},
|
||||
source: source,
|
||||
@ -131,26 +128,26 @@ func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
|
||||
|
||||
// MapReduce maps all elements generated from given generate func,
|
||||
// and reduces the output elements with given reducer.
|
||||
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
|
||||
opts ...Option) (any, error) {
|
||||
func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
|
||||
opts ...Option) (V, error) {
|
||||
panicChan := &onceChan{channel: make(chan any)}
|
||||
source := buildSource(generate, panicChan)
|
||||
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||
}
|
||||
|
||||
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
|
||||
func MapReduceChan(source <-chan any, mapper MapperFunc, reducer ReducerFunc,
|
||||
opts ...Option) (any, error) {
|
||||
func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
|
||||
opts ...Option) (V, error) {
|
||||
panicChan := &onceChan{channel: make(chan any)}
|
||||
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
|
||||
}
|
||||
|
||||
// mapReduceWithPanicChan maps all elements from source, and reduce the output elements with given reducer.
|
||||
func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper MapperFunc,
|
||||
reducer ReducerFunc, opts ...Option) (any, error) {
|
||||
func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U],
|
||||
reducer ReducerFunc[U, V], opts ...Option) (val V, err error) {
|
||||
options := buildOptions(opts...)
|
||||
// output is used to write the final result
|
||||
output := make(chan any)
|
||||
output := make(chan V)
|
||||
defer func() {
|
||||
// reducer can only write once, if more, panic
|
||||
for range output {
|
||||
@ -159,12 +156,12 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe
|
||||
}()
|
||||
|
||||
// collector is used to collect data from mapper, and consume in reducer
|
||||
collector := make(chan any, options.workers)
|
||||
collector := make(chan U, options.workers)
|
||||
// if done is closed, all mappers and reducer should stop processing
|
||||
done := make(chan lang.PlaceholderType)
|
||||
done := make(chan struct{})
|
||||
writer := newGuardedWriter(options.ctx, output, done)
|
||||
var closeOnce sync.Once
|
||||
// use atomic.Value to avoid data race
|
||||
// use atomic type to avoid data race
|
||||
var retErr errorx.AtomicError
|
||||
finish := func() {
|
||||
closeOnce.Do(func() {
|
||||
@ -195,9 +192,9 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe
|
||||
reducer(collector, writer, cancel)
|
||||
}()
|
||||
|
||||
go executeMappers(mapperContext{
|
||||
go executeMappers(mapperContext[T, U]{
|
||||
ctx: options.ctx,
|
||||
mapper: func(item any, w Writer) {
|
||||
mapper: func(item T, w Writer[U]) {
|
||||
mapper(item, w, cancel)
|
||||
},
|
||||
source: source,
|
||||
@ -210,26 +207,29 @@ func mapReduceWithPanicChan(source <-chan any, panicChan *onceChan, mapper Mappe
|
||||
select {
|
||||
case <-options.ctx.Done():
|
||||
cancel(context.DeadlineExceeded)
|
||||
return nil, context.DeadlineExceeded
|
||||
err = context.DeadlineExceeded
|
||||
case v := <-panicChan.channel:
|
||||
// drain output here, otherwise for loop panic in defer
|
||||
drain(output)
|
||||
panic(v)
|
||||
case v, ok := <-output:
|
||||
if err := retErr.Load(); err != nil {
|
||||
return nil, err
|
||||
if e := retErr.Load(); e != nil {
|
||||
err = e
|
||||
} else if ok {
|
||||
return v, nil
|
||||
val = v
|
||||
} else {
|
||||
return nil, ErrReduceNoOutput
|
||||
err = ErrReduceNoOutput
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// MapReduceVoid maps all elements generated from given generate,
|
||||
// and reduce the output elements with given reducer.
|
||||
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
|
||||
_, err := MapReduce(generate, mapper, func(input <-chan any, writer Writer, cancel func(error)) {
|
||||
func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],
|
||||
reducer VoidReducerFunc[U], opts ...Option) error {
|
||||
_, err := MapReduce(generate, mapper, func(input <-chan U, writer Writer[any], cancel func(error)) {
|
||||
reducer(input, cancel)
|
||||
}, opts...)
|
||||
if errors.Is(err, ErrReduceNoOutput) {
|
||||
@ -266,8 +266,8 @@ func buildOptions(opts ...Option) *mapReduceOptions {
|
||||
return options
|
||||
}
|
||||
|
||||
func buildSource(generate GenerateFunc, panicChan *onceChan) chan any {
|
||||
source := make(chan any)
|
||||
func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T {
|
||||
source := make(chan T)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@ -283,13 +283,13 @@ func buildSource(generate GenerateFunc, panicChan *onceChan) chan any {
|
||||
}
|
||||
|
||||
// drain drains the channel.
|
||||
func drain(channel <-chan any) {
|
||||
func drain[T any](channel <-chan T) {
|
||||
// drain the channel
|
||||
for range channel {
|
||||
}
|
||||
}
|
||||
|
||||
func executeMappers(mCtx mapperContext) {
|
||||
func executeMappers[T, U any](mCtx mapperContext[T, U]) {
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
wg.Wait()
|
||||
@ -298,7 +298,7 @@ func executeMappers(mCtx mapperContext) {
|
||||
}()
|
||||
|
||||
var failed int32
|
||||
pool := make(chan lang.PlaceholderType, mCtx.workers)
|
||||
pool := make(chan struct{}, mCtx.workers)
|
||||
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
|
||||
for atomic.LoadInt32(&failed) == 0 {
|
||||
select {
|
||||
@ -306,7 +306,7 @@ func executeMappers(mCtx mapperContext) {
|
||||
return
|
||||
case <-mCtx.doneChan:
|
||||
return
|
||||
case pool <- lang.Placeholder:
|
||||
case pool <- struct{}{}:
|
||||
item, ok := <-mCtx.source
|
||||
if !ok {
|
||||
<-pool
|
||||
@ -346,22 +346,21 @@ func once(fn func(error)) func(error) {
|
||||
}
|
||||
}
|
||||
|
||||
type guardedWriter struct {
|
||||
type guardedWriter[T any] struct {
|
||||
ctx context.Context
|
||||
channel chan<- any
|
||||
done <-chan lang.PlaceholderType
|
||||
channel chan<- T
|
||||
done <-chan struct{}
|
||||
}
|
||||
|
||||
func newGuardedWriter(ctx context.Context, channel chan<- any,
|
||||
done <-chan lang.PlaceholderType) guardedWriter {
|
||||
return guardedWriter{
|
||||
func newGuardedWriter[T any](ctx context.Context, channel chan<- T, done <-chan struct{}) guardedWriter[T] {
|
||||
return guardedWriter[T]{
|
||||
ctx: ctx,
|
||||
channel: channel,
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
|
||||
func (gw guardedWriter) Write(v any) {
|
||||
func (gw guardedWriter[T]) Write(v T) {
|
||||
select {
|
||||
case <-gw.ctx.Done():
|
||||
return
|
||||
|
@ -1,6 +1,3 @@
|
||||
//go:build go1.18
|
||||
// +build go1.18
|
||||
|
||||
package mr
|
||||
|
||||
import (
|
||||
@ -18,9 +15,9 @@ import (
|
||||
func FuzzMapReduce(f *testing.F) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
f.Add(uint(10), uint(runtime.NumCPU()))
|
||||
f.Fuzz(func(t *testing.T, num, workers uint) {
|
||||
n := int64(num)%5000 + 5000
|
||||
f.Add(int64(10), runtime.NumCPU())
|
||||
f.Fuzz(func(t *testing.T, n int64, workers int) {
|
||||
n = n%5000 + 5000
|
||||
genPanic := rand.Intn(100) == 0
|
||||
mapperPanic := rand.Intn(100) == 0
|
||||
reducerPanic := rand.Intn(100) == 0
|
||||
@ -29,34 +26,33 @@ func FuzzMapReduce(f *testing.F) {
|
||||
reducerIdx := rand.Int63n(n)
|
||||
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||
|
||||
fn := func() (any, error) {
|
||||
fn := func() (int64, error) {
|
||||
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
|
||||
|
||||
return MapReduce(func(source chan<- any) {
|
||||
return MapReduce(func(source chan<- int64) {
|
||||
for i := int64(0); i < n; i++ {
|
||||
source <- i
|
||||
if genPanic && i == genIdx {
|
||||
panic("foo")
|
||||
}
|
||||
}
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int64)
|
||||
}, func(v int64, writer Writer[int64], cancel func(error)) {
|
||||
if mapperPanic && v == mapperIdx {
|
||||
panic("bar")
|
||||
}
|
||||
writer.Write(v * v)
|
||||
}, func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int64, writer Writer[int64], cancel func(error)) {
|
||||
var idx int64
|
||||
var total int64
|
||||
for v := range pipe {
|
||||
if reducerPanic && idx == reducerIdx {
|
||||
panic("baz")
|
||||
}
|
||||
total += v.(int64)
|
||||
total += v
|
||||
idx++
|
||||
}
|
||||
writer.Write(total)
|
||||
}, WithWorkers(int(workers)%50+runtime.NumCPU()/2))
|
||||
}, WithWorkers(workers%50+runtime.NumCPU()))
|
||||
}
|
||||
|
||||
if genPanic || mapperPanic || reducerPanic {
|
||||
@ -72,7 +68,7 @@ func FuzzMapReduce(f *testing.F) {
|
||||
} else {
|
||||
val, err := fn()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, squareSum, val.(int64))
|
||||
assert.Equal(t, squareSum, val)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -54,28 +54,27 @@ func TestMapReduceRandom(t *testing.T) {
|
||||
reducerIdx := rand.Int63n(n)
|
||||
squareSum := (n - 1) * n * (2*n - 1) / 6
|
||||
|
||||
fn := func() (any, error) {
|
||||
return MapReduce(func(source chan<- any) {
|
||||
fn := func() (int64, error) {
|
||||
return MapReduce(func(source chan<- int64) {
|
||||
for i := int64(0); i < n; i++ {
|
||||
source <- i
|
||||
if genPanic && i == genIdx {
|
||||
panic("foo")
|
||||
}
|
||||
}
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int64)
|
||||
}, func(v int64, writer Writer[int64], cancel func(error)) {
|
||||
if mapperPanic && v == mapperIdx {
|
||||
panic("bar")
|
||||
}
|
||||
writer.Write(v * v)
|
||||
}, func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int64, writer Writer[int64], cancel func(error)) {
|
||||
var idx int64
|
||||
var total int64
|
||||
for v := range pipe {
|
||||
if reducerPanic && idx == reducerIdx {
|
||||
panic("baz")
|
||||
}
|
||||
total += v.(int64)
|
||||
total += v
|
||||
idx++
|
||||
}
|
||||
writer.Write(total)
|
||||
@ -95,7 +94,7 @@ func TestMapReduceRandom(t *testing.T) {
|
||||
} else {
|
||||
val, err := fn()
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, squareSum, val.(int64))
|
||||
assert.Equal(t, squareSum, val)
|
||||
}
|
||||
bar.Increment()
|
||||
})
|
||||
|
@ -3,7 +3,7 @@ package mr
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
@ -17,7 +17,7 @@ import (
|
||||
var errDummy = errors.New("dummy")
|
||||
|
||||
func init() {
|
||||
log.SetOutput(io.Discard)
|
||||
log.SetOutput(ioutil.Discard)
|
||||
}
|
||||
|
||||
func TestFinish(t *testing.T) {
|
||||
@ -91,11 +91,11 @@ func TestForEach(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var count uint32
|
||||
ForEach(func(source chan<- any) {
|
||||
ForEach(func(source chan<- int) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item any) {
|
||||
}, func(item int) {
|
||||
atomic.AddUint32(&count, 1)
|
||||
}, WithWorkers(-1))
|
||||
|
||||
@ -106,12 +106,12 @@ func TestForEach(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var count uint32
|
||||
ForEach(func(source chan<- any) {
|
||||
ForEach(func(source chan<- int) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item any) {
|
||||
if item.(int)%2 == 0 {
|
||||
}, func(item int) {
|
||||
if item%2 == 0 {
|
||||
atomic.AddUint32(&count, 1)
|
||||
}
|
||||
})
|
||||
@ -123,11 +123,11 @@ func TestForEach(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
ForEach(func(source chan<- any) {
|
||||
ForEach(func(source chan<- int) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item any) {
|
||||
}, func(item int) {
|
||||
panic("foo")
|
||||
})
|
||||
})
|
||||
@ -139,9 +139,9 @@ func TestGeneratePanic(t *testing.T) {
|
||||
|
||||
t.Run("all", func(t *testing.T) {
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
ForEach(func(source chan<- any) {
|
||||
ForEach(func(source chan<- int) {
|
||||
panic("foo")
|
||||
}, func(item any) {
|
||||
}, func(item int) {
|
||||
})
|
||||
})
|
||||
})
|
||||
@ -154,14 +154,14 @@ func TestMapperPanic(t *testing.T) {
|
||||
var run int32
|
||||
t.Run("all", func(t *testing.T) {
|
||||
assert.PanicsWithValue(t, "foo", func() {
|
||||
_, _ = MapReduce(func(source chan<- any) {
|
||||
_, _ = MapReduce(func(source chan<- int) {
|
||||
for i := 0; i < tasks; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
}, func(item int, writer Writer[int], cancel func(error)) {
|
||||
atomic.AddInt32(&run, 1)
|
||||
panic("foo")
|
||||
}, func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
|
||||
})
|
||||
})
|
||||
assert.True(t, atomic.LoadInt32(&run) < tasks/2)
|
||||
@ -173,10 +173,10 @@ func TestMapReduce(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
mapper MapperFunc
|
||||
reducer ReducerFunc
|
||||
mapper MapperFunc[int, int]
|
||||
reducer ReducerFunc[int, int]
|
||||
expectErr error
|
||||
expectValue any
|
||||
expectValue int
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
@ -185,8 +185,7 @@ func TestMapReduce(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "cancel with error",
|
||||
mapper: func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
mapper: func(v int, writer Writer[int], cancel func(error)) {
|
||||
if v%3 == 0 {
|
||||
cancel(errDummy)
|
||||
}
|
||||
@ -196,22 +195,20 @@ func TestMapReduce(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "cancel with nil",
|
||||
mapper: func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
mapper: func(v int, writer Writer[int], cancel func(error)) {
|
||||
if v%3 == 0 {
|
||||
cancel(nil)
|
||||
}
|
||||
writer.Write(v * v)
|
||||
},
|
||||
expectErr: ErrCancelWithNil,
|
||||
expectValue: nil,
|
||||
expectErr: ErrCancelWithNil,
|
||||
},
|
||||
{
|
||||
name: "cancel with more",
|
||||
reducer: func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
reducer: func(pipe <-chan int, writer Writer[int], cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
result += item
|
||||
if result > 10 {
|
||||
cancel(errDummy)
|
||||
}
|
||||
@ -226,21 +223,20 @@ func TestMapReduce(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
test.mapper = func(v int, writer Writer[int], cancel func(error)) {
|
||||
writer.Write(v * v)
|
||||
}
|
||||
}
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
test.reducer = func(pipe <-chan int, writer Writer[int], cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
result += item
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
}
|
||||
value, err := MapReduce(func(source chan<- any) {
|
||||
value, err := MapReduce(func(source chan<- int) {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
}
|
||||
@ -256,22 +252,21 @@ func TestMapReduce(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
test.mapper = func(v int, writer Writer[int], cancel func(error)) {
|
||||
writer.Write(v * v)
|
||||
}
|
||||
}
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
test.reducer = func(pipe <-chan int, writer Writer[int], cancel func(error)) {
|
||||
var result int
|
||||
for item := range pipe {
|
||||
result += item.(int)
|
||||
result += item
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
}
|
||||
|
||||
source := make(chan any)
|
||||
source := make(chan int)
|
||||
go func() {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
@ -291,13 +286,13 @@ func TestMapReduceWithReduerWriteMoreThanOnce(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
MapReduce(func(source chan<- any) {
|
||||
MapReduce(func(source chan<- int) {
|
||||
for i := 0; i < 10; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
}, func(item int, writer Writer[int], cancel func(error)) {
|
||||
writer.Write(item)
|
||||
}, func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int, writer Writer[string], cancel func(error)) {
|
||||
drain(pipe)
|
||||
writer.Write("one")
|
||||
writer.Write("two")
|
||||
@ -311,8 +306,8 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
var value uint32
|
||||
tests := []struct {
|
||||
name string
|
||||
mapper MapperFunc
|
||||
reducer VoidReducerFunc
|
||||
mapper MapperFunc[int, int]
|
||||
reducer VoidReducerFunc[int]
|
||||
expectValue uint32
|
||||
expectErr error
|
||||
}{
|
||||
@ -323,8 +318,7 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "cancel with error",
|
||||
mapper: func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
mapper: func(v int, writer Writer[int], cancel func(error)) {
|
||||
if v%3 == 0 {
|
||||
cancel(errDummy)
|
||||
}
|
||||
@ -334,8 +328,7 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "cancel with nil",
|
||||
mapper: func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
mapper: func(v int, writer Writer[int], cancel func(error)) {
|
||||
if v%3 == 0 {
|
||||
cancel(nil)
|
||||
}
|
||||
@ -345,9 +338,9 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "cancel with more",
|
||||
reducer: func(pipe <-chan any, cancel func(error)) {
|
||||
reducer: func(pipe <-chan int, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
result := atomic.AddUint32(&value, uint32(item.(int)))
|
||||
result := atomic.AddUint32(&value, uint32(item))
|
||||
if result > 10 {
|
||||
cancel(errDummy)
|
||||
}
|
||||
@ -362,19 +355,18 @@ func TestMapReduceVoid(t *testing.T) {
|
||||
atomic.StoreUint32(&value, 0)
|
||||
|
||||
if test.mapper == nil {
|
||||
test.mapper = func(item any, writer Writer, cancel func(error)) {
|
||||
v := item.(int)
|
||||
test.mapper = func(v int, writer Writer[int], cancel func(error)) {
|
||||
writer.Write(v * v)
|
||||
}
|
||||
}
|
||||
if test.reducer == nil {
|
||||
test.reducer = func(pipe <-chan any, cancel func(error)) {
|
||||
test.reducer = func(pipe <-chan int, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
atomic.AddUint32(&value, uint32(item.(int)))
|
||||
atomic.AddUint32(&value, uint32(item))
|
||||
}
|
||||
}
|
||||
}
|
||||
err := MapReduceVoid(func(source chan<- any) {
|
||||
err := MapReduceVoid(func(source chan<- int) {
|
||||
for i := 1; i < 5; i++ {
|
||||
source <- i
|
||||
}
|
||||
@ -392,18 +384,17 @@ func TestMapReduceVoidWithDelay(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var result []int
|
||||
err := MapReduceVoid(func(source chan<- any) {
|
||||
err := MapReduceVoid(func(source chan<- int) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
}, func(i int, writer Writer[int], cancel func(error)) {
|
||||
if i == 0 {
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
}
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan any, cancel func(error)) {
|
||||
}, func(pipe <-chan int, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
i := item.(int)
|
||||
i := item
|
||||
result = append(result, i)
|
||||
}
|
||||
})
|
||||
@ -417,13 +408,12 @@ func TestMapReducePanic(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- any) {
|
||||
_, _ = MapReduce(func(source chan<- int) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
}, func(i int, writer Writer[int], cancel func(error)) {
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
|
||||
for range pipe {
|
||||
panic("panic")
|
||||
}
|
||||
@ -435,17 +425,16 @@ func TestMapReducePanicOnce(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- any) {
|
||||
_, _ = MapReduce(func(source chan<- int) {
|
||||
for i := 0; i < 100; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
}, func(i int, writer Writer[int], cancel func(error)) {
|
||||
if i == 0 {
|
||||
panic("foo")
|
||||
}
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
|
||||
for range pipe {
|
||||
panic("bar")
|
||||
}
|
||||
@ -457,12 +446,12 @@ func TestMapReducePanicBothMapperAndReducer(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
assert.Panics(t, func() {
|
||||
_, _ = MapReduce(func(source chan<- any) {
|
||||
_, _ = MapReduce(func(source chan<- int) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
}, func(item int, writer Writer[int], cancel func(error)) {
|
||||
panic("foo")
|
||||
}, func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
|
||||
panic("bar")
|
||||
})
|
||||
})
|
||||
@ -472,18 +461,17 @@ func TestMapReduceVoidCancel(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
var result []int
|
||||
err := MapReduceVoid(func(source chan<- any) {
|
||||
err := MapReduceVoid(func(source chan<- int) {
|
||||
source <- 0
|
||||
source <- 1
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
}, func(i int, writer Writer[int], cancel func(error)) {
|
||||
if i == 1 {
|
||||
cancel(errors.New("anything"))
|
||||
}
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan any, cancel func(error)) {
|
||||
}, func(pipe <-chan int, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
i := item.(int)
|
||||
i := item
|
||||
result = append(result, i)
|
||||
}
|
||||
})
|
||||
@ -496,21 +484,19 @@ func TestMapReduceVoidCancelWithRemains(t *testing.T) {
|
||||
|
||||
var done int32
|
||||
var result []int
|
||||
err := MapReduceVoid(func(source chan<- any) {
|
||||
err := MapReduceVoid(func(source chan<- int) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
}, func(i int, writer Writer[int], cancel func(error)) {
|
||||
if i == defaultWorkers/2 {
|
||||
cancel(errors.New("anything"))
|
||||
}
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan any, cancel func(error)) {
|
||||
}, func(pipe <-chan int, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
i := item.(int)
|
||||
result = append(result, i)
|
||||
result = append(result, item)
|
||||
}
|
||||
})
|
||||
assert.NotNil(t, err)
|
||||
@ -522,18 +508,18 @@ func TestMapReduceWithoutReducerWrite(t *testing.T) {
|
||||
defer goleak.VerifyNone(t)
|
||||
|
||||
uids := []int{1, 2, 3}
|
||||
res, err := MapReduce(func(source chan<- any) {
|
||||
res, err := MapReduce(func(source chan<- int) {
|
||||
for _, uid := range uids {
|
||||
source <- uid
|
||||
}
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
}, func(item int, writer Writer[int], cancel func(error)) {
|
||||
writer.Write(item)
|
||||
}, func(pipe <-chan any, writer Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
|
||||
drain(pipe)
|
||||
// not calling writer.Write(...), should not panic
|
||||
})
|
||||
assert.Equal(t, ErrReduceNoOutput, err)
|
||||
assert.Nil(t, res)
|
||||
assert.Equal(t, 0, res)
|
||||
}
|
||||
|
||||
func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
||||
@ -542,15 +528,14 @@ func TestMapReduceVoidPanicInReducer(t *testing.T) {
|
||||
const message = "foo"
|
||||
assert.Panics(t, func() {
|
||||
var done int32
|
||||
_ = MapReduceVoid(func(source chan<- any) {
|
||||
_ = MapReduceVoid(func(source chan<- int) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item any, writer Writer, cancel func(error)) {
|
||||
i := item.(int)
|
||||
}, func(i int, writer Writer[int], cancel func(error)) {
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan any, cancel func(error)) {
|
||||
}, func(pipe <-chan int, cancel func(error)) {
|
||||
panic(message)
|
||||
}, WithWorkers(1))
|
||||
})
|
||||
@ -561,13 +546,12 @@ func TestForEachWithContext(t *testing.T) {
|
||||
|
||||
var done int32
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ForEach(func(source chan<- any) {
|
||||
ForEach(func(source chan<- int) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item any) {
|
||||
i := item.(int)
|
||||
}, func(i int) {
|
||||
if i == defaultWorkers/2 {
|
||||
cancel()
|
||||
}
|
||||
@ -580,20 +564,19 @@ func TestMapReduceWithContext(t *testing.T) {
|
||||
var done int32
|
||||
var result []int
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
err := MapReduceVoid(func(source chan<- any) {
|
||||
err := MapReduceVoid(func(source chan<- int) {
|
||||
for i := 0; i < defaultWorkers*2; i++ {
|
||||
source <- i
|
||||
}
|
||||
atomic.AddInt32(&done, 1)
|
||||
}, func(item any, writer Writer, c func(error)) {
|
||||
i := item.(int)
|
||||
}, func(i int, writer Writer[int], c func(error)) {
|
||||
if i == defaultWorkers/2 {
|
||||
cancel()
|
||||
}
|
||||
writer.Write(i)
|
||||
}, func(pipe <-chan any, cancel func(error)) {
|
||||
}, func(pipe <-chan int, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
i := item.(int)
|
||||
i := item
|
||||
result = append(result, i)
|
||||
}
|
||||
}, WithContext(ctx))
|
||||
@ -604,19 +587,19 @@ func TestMapReduceWithContext(t *testing.T) {
|
||||
func BenchmarkMapReduce(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
|
||||
mapper := func(v any, writer Writer, cancel func(error)) {
|
||||
writer.Write(v.(int64) * v.(int64))
|
||||
mapper := func(v int64, writer Writer[int64], cancel func(error)) {
|
||||
writer.Write(v * v)
|
||||
}
|
||||
reducer := func(input <-chan any, writer Writer, cancel func(error)) {
|
||||
reducer := func(input <-chan int64, writer Writer[int64], cancel func(error)) {
|
||||
var result int64
|
||||
for v := range input {
|
||||
result += v.(int64)
|
||||
result += v
|
||||
}
|
||||
writer.Write(result)
|
||||
}
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
MapReduce(func(input chan<- any) {
|
||||
MapReduce(func(input chan<- int64) {
|
||||
for j := 0; j < 2; j++ {
|
||||
input <- int64(j)
|
||||
}
|
||||
|
@ -58,20 +58,19 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
val, err := mr.MapReduce(func(source chan<- any) {
|
||||
val, err := mr.MapReduce(func(source chan<- int) {
|
||||
// generator
|
||||
for i := 0; i < 10; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item any, writer mr.Writer, cancel func(error)) {
|
||||
}, func(i int, writer mr.Writer[int], cancel func(error)) {
|
||||
// mapper
|
||||
i := item.(int)
|
||||
writer.Write(i * i)
|
||||
}, func(pipe <-chan any, writer mr.Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
|
||||
// reducer
|
||||
var sum int
|
||||
for i := range pipe {
|
||||
sum += i.(int)
|
||||
sum += i
|
||||
}
|
||||
writer.Write(sum)
|
||||
})
|
||||
|
@ -59,20 +59,19 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
val, err := mr.MapReduce(func(source chan<- any) {
|
||||
val, err := mr.MapReduce(func(source chan<- int) {
|
||||
// generator
|
||||
for i := 0; i < 10; i++ {
|
||||
source <- i
|
||||
}
|
||||
}, func(item any, writer mr.Writer, cancel func(error)) {
|
||||
}, func(i int, writer mr.Writer[int], cancel func(error)) {
|
||||
// mapper
|
||||
i := item.(int)
|
||||
writer.Write(i * i)
|
||||
}, func(pipe <-chan any, writer mr.Writer, cancel func(error)) {
|
||||
}, func(pipe <-chan int, writer mr.Writer[int], cancel func(error)) {
|
||||
// reducer
|
||||
var sum int
|
||||
for i := range pipe {
|
||||
sum += i.(int)
|
||||
sum += i
|
||||
}
|
||||
writer.Write(sum)
|
||||
})
|
||||
|
@ -15,5 +15,14 @@ func AddWrapUpListener(fn func()) func() {
|
||||
return fn
|
||||
}
|
||||
|
||||
// SetTimeToForceQuit does nothing on windows.
|
||||
func SetTimeToForceQuit(duration time.Duration) {
|
||||
}
|
||||
|
||||
// Shutdown does nothing on windows.
|
||||
func Shutdown() {
|
||||
}
|
||||
|
||||
// WrapUp does nothing on windows.
|
||||
func WrapUp() {
|
||||
}
|
||||
|
5
core/stores/cache/cache.go
vendored
5
core/stores/cache/cache.go
vendored
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/zeromicro/go-zero/core/errorx"
|
||||
"github.com/zeromicro/go-zero/core/hash"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
)
|
||||
|
||||
@ -62,12 +63,12 @@ func New(c ClusterConf, barrier syncx.SingleFlight, st *Stat, errNotFound error,
|
||||
}
|
||||
|
||||
if len(c) == 1 {
|
||||
return NewNode(c[0].NewRedis(), barrier, st, errNotFound, opts...)
|
||||
return NewNode(redis.MustNewRedis(c[0].RedisConf), barrier, st, errNotFound, opts...)
|
||||
}
|
||||
|
||||
dispatcher := hash.NewConsistentHash()
|
||||
for _, node := range c {
|
||||
cn := NewNode(node.NewRedis(), barrier, st, errNotFound, opts...)
|
||||
cn := NewNode(redis.MustNewRedis(node.RedisConf), barrier, st, errNotFound, opts...)
|
||||
dispatcher.AddWithWeight(cn, node.Weight)
|
||||
}
|
||||
|
||||
|
4
core/stores/cache/cache_test.go
vendored
4
core/stores/cache/cache_test.go
vendored
@ -163,12 +163,10 @@ func TestCache_SetDel(t *testing.T) {
|
||||
r1, err := miniredis.Run()
|
||||
assert.NoError(t, err)
|
||||
defer r1.Close()
|
||||
r1.SetError("mock error")
|
||||
|
||||
r2, err := miniredis.Run()
|
||||
assert.NoError(t, err)
|
||||
defer r2.Close()
|
||||
r2.SetError("mock error")
|
||||
|
||||
conf := ClusterConf{
|
||||
{
|
||||
@ -187,6 +185,8 @@ func TestCache_SetDel(t *testing.T) {
|
||||
},
|
||||
}
|
||||
c := New(conf, syncx.NewSingleFlight(), NewStat("mock"), errPlaceholder)
|
||||
r1.SetError("mock error")
|
||||
r2.SetError("mock error")
|
||||
assert.NoError(t, c.Del("a", "b", "c"))
|
||||
})
|
||||
}
|
||||
|
@ -1,14 +0,0 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
// imports the driver, don't remove this comment, golint requires.
|
||||
_ "github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/zeromicro/go-zero/core/stores/sqlx"
|
||||
)
|
||||
|
||||
const clickHouseDriverName = "clickhouse"
|
||||
|
||||
// New returns a clickhouse connection.
|
||||
func New(datasource string, opts ...sqlx.SqlOption) sqlx.SqlConn {
|
||||
return sqlx.NewSqlConn(clickHouseDriverName, datasource, opts...)
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestClickHouse(t *testing.T) {
|
||||
assert.NotNil(t, New("clickhouse"))
|
||||
}
|
@ -164,7 +164,7 @@ func NewStore(c KvConf) Store {
|
||||
// because Store and redis.Redis has different methods.
|
||||
dispatcher := hash.NewConsistentHash()
|
||||
for _, node := range c {
|
||||
cn := node.NewRedis()
|
||||
cn := redis.MustNewRedis(node.RedisConf)
|
||||
dispatcher.AddWithWeight(cn, node.Weight)
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,8 @@ var (
|
||||
ErrEmptyType = errors.New("empty redis type")
|
||||
// ErrEmptyKey is an error that indicates no redis key is set.
|
||||
ErrEmptyKey = errors.New("empty redis key")
|
||||
// ErrPing is an error that indicates ping failed.
|
||||
ErrPing = errors.New("ping redis failed")
|
||||
)
|
||||
|
||||
type (
|
||||
@ -28,6 +30,7 @@ type (
|
||||
)
|
||||
|
||||
// NewRedis returns a Redis.
|
||||
// Deprecated: use MustNewRedis or NewRedis instead.
|
||||
func (rc RedisConf) NewRedis() *Redis {
|
||||
var opts []Option
|
||||
if rc.Type == ClusterType {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@ -86,7 +87,46 @@ type (
|
||||
)
|
||||
|
||||
// New returns a Redis with given options.
|
||||
// Deprecated: use MustNewRedis or NewRedis instead.
|
||||
func New(addr string, opts ...Option) *Redis {
|
||||
return newRedis(addr, opts...)
|
||||
}
|
||||
|
||||
// MustNewRedis returns a Redis with given options.
|
||||
func MustNewRedis(conf RedisConf, opts ...Option) *Redis {
|
||||
rds, err := NewRedis(conf, opts...)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
return rds
|
||||
}
|
||||
|
||||
// NewRedis returns a Redis with given options.
|
||||
func NewRedis(conf RedisConf, opts ...Option) (*Redis, error) {
|
||||
if err := conf.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if conf.Type == ClusterType {
|
||||
opts = append([]Option{Cluster()}, opts...)
|
||||
}
|
||||
if len(conf.Pass) > 0 {
|
||||
opts = append([]Option{WithPass(conf.Pass)}, opts...)
|
||||
}
|
||||
if conf.Tls {
|
||||
opts = append([]Option{WithTLS()}, opts...)
|
||||
}
|
||||
|
||||
rds := newRedis(conf.Host, opts...)
|
||||
if !rds.Ping() {
|
||||
return nil, ErrPing
|
||||
}
|
||||
|
||||
return rds, nil
|
||||
}
|
||||
|
||||
func newRedis(addr string, opts ...Option) *Redis {
|
||||
r := &Redis{
|
||||
Addr: addr,
|
||||
Type: NodeType,
|
||||
|
@ -16,6 +16,116 @@ import (
|
||||
"github.com/zeromicro/go-zero/core/stringx"
|
||||
)
|
||||
|
||||
func TestNewRedis(t *testing.T) {
|
||||
r1, err := miniredis.Run()
|
||||
assert.NoError(t, err)
|
||||
defer r1.Close()
|
||||
|
||||
r2, err := miniredis.Run()
|
||||
assert.NoError(t, err)
|
||||
defer r2.Close()
|
||||
r2.SetError("mock")
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
RedisConf
|
||||
ok bool
|
||||
redisErr bool
|
||||
}{
|
||||
{
|
||||
name: "missing host",
|
||||
RedisConf: RedisConf{
|
||||
Host: "",
|
||||
Type: NodeType,
|
||||
Pass: "",
|
||||
},
|
||||
ok: false,
|
||||
},
|
||||
{
|
||||
name: "missing type",
|
||||
RedisConf: RedisConf{
|
||||
Host: "localhost:6379",
|
||||
Type: "",
|
||||
Pass: "",
|
||||
},
|
||||
ok: false,
|
||||
},
|
||||
{
|
||||
name: "ok",
|
||||
RedisConf: RedisConf{
|
||||
Host: r1.Addr(),
|
||||
Type: NodeType,
|
||||
Pass: "",
|
||||
},
|
||||
ok: true,
|
||||
},
|
||||
{
|
||||
name: "ok",
|
||||
RedisConf: RedisConf{
|
||||
Host: r1.Addr(),
|
||||
Type: ClusterType,
|
||||
Pass: "",
|
||||
},
|
||||
ok: true,
|
||||
},
|
||||
{
|
||||
name: "password",
|
||||
RedisConf: RedisConf{
|
||||
Host: r1.Addr(),
|
||||
Type: NodeType,
|
||||
Pass: "pw",
|
||||
},
|
||||
ok: true,
|
||||
},
|
||||
{
|
||||
name: "tls",
|
||||
RedisConf: RedisConf{
|
||||
Host: r1.Addr(),
|
||||
Type: NodeType,
|
||||
Tls: true,
|
||||
},
|
||||
ok: true,
|
||||
},
|
||||
{
|
||||
name: "node error",
|
||||
RedisConf: RedisConf{
|
||||
Host: r2.Addr(),
|
||||
Type: NodeType,
|
||||
Pass: "",
|
||||
},
|
||||
ok: true,
|
||||
redisErr: true,
|
||||
},
|
||||
{
|
||||
name: "cluster error",
|
||||
RedisConf: RedisConf{
|
||||
Host: r2.Addr(),
|
||||
Type: ClusterType,
|
||||
Pass: "",
|
||||
},
|
||||
ok: true,
|
||||
redisErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(stringx.RandId(), func(t *testing.T) {
|
||||
rds, err := NewRedis(test.RedisConf)
|
||||
if test.ok {
|
||||
if test.redisErr {
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, rds)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, rds)
|
||||
}
|
||||
} else {
|
||||
assert.Error(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedis_Decr(t *testing.T) {
|
||||
runOnRedis(t, func(client *Redis) {
|
||||
_, err := New(client.Addr, badType()).Decr("a")
|
||||
@ -1666,7 +1776,10 @@ func runOnRedis(t *testing.T, fn func(client *Redis)) {
|
||||
}
|
||||
}()
|
||||
|
||||
fn(New(s.Addr()))
|
||||
fn(MustNewRedis(RedisConf{
|
||||
Host: s.Addr(),
|
||||
Type: NodeType,
|
||||
}))
|
||||
}
|
||||
|
||||
func runOnRedisWithError(t *testing.T, fn func(client *Redis)) {
|
||||
|
@ -64,7 +64,7 @@ type (
|
||||
// query arguments into one string and do underlying query without arguments
|
||||
commonSqlConn struct {
|
||||
connProv connProvider
|
||||
onError func(error)
|
||||
onError func(context.Context, error)
|
||||
beginTx beginnable
|
||||
brk breaker.Breaker
|
||||
accept func(error) bool
|
||||
@ -98,8 +98,8 @@ func NewSqlConn(driverName, datasource string, opts ...SqlOption) SqlConn {
|
||||
connProv: func() (*sql.DB, error) {
|
||||
return getSqlConn(driverName, datasource)
|
||||
},
|
||||
onError: func(err error) {
|
||||
logInstanceError(datasource, err)
|
||||
onError: func(ctx context.Context, err error) {
|
||||
logInstanceError(ctx, datasource, err)
|
||||
},
|
||||
beginTx: begin,
|
||||
brk: breaker.NewBreaker(),
|
||||
@ -118,8 +118,8 @@ func NewSqlConnFromDB(db *sql.DB, opts ...SqlOption) SqlConn {
|
||||
connProv: func() (*sql.DB, error) {
|
||||
return db, nil
|
||||
},
|
||||
onError: func(err error) {
|
||||
logx.Errorf("Error on getting sql instance: %v", err)
|
||||
onError: func(ctx context.Context, err error) {
|
||||
logx.WithContext(ctx).Errorf("Error on getting sql instance: %v", err)
|
||||
},
|
||||
beginTx: begin,
|
||||
brk: breaker.NewBreaker(),
|
||||
@ -146,7 +146,7 @@ func (db *commonSqlConn) ExecCtx(ctx context.Context, q string, args ...any) (
|
||||
var conn *sql.DB
|
||||
conn, err = db.connProv()
|
||||
if err != nil {
|
||||
db.onError(err)
|
||||
db.onError(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -174,7 +174,7 @@ func (db *commonSqlConn) PrepareCtx(ctx context.Context, query string) (stmt Stm
|
||||
var conn *sql.DB
|
||||
conn, err = db.connProv()
|
||||
if err != nil {
|
||||
db.onError(err)
|
||||
db.onError(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -301,7 +301,7 @@ func (db *commonSqlConn) queryRows(ctx context.Context, scanner func(*sql.Rows)
|
||||
err = db.brk.DoWithAcceptable(func() error {
|
||||
conn, err := db.connProv()
|
||||
if err != nil {
|
||||
db.onError(err)
|
||||
db.onError(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -139,7 +139,7 @@ func transact(ctx context.Context, db *commonSqlConn, b beginnable,
|
||||
fn func(context.Context, Session) error) (err error) {
|
||||
conn, err := db.connProv()
|
||||
if err != nil {
|
||||
db.onError(err)
|
||||
db.onError(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -130,9 +130,9 @@ func format(query string, args ...any) (string, error) {
|
||||
return b.String(), nil
|
||||
}
|
||||
|
||||
func logInstanceError(datasource string, err error) {
|
||||
func logInstanceError(ctx context.Context, datasource string, err error) {
|
||||
datasource = desensitize(datasource)
|
||||
logx.Errorf("Error on getting sql instance of %s: %v", datasource, err)
|
||||
logx.WithContext(ctx).Errorf("Error on getting sql instance of %s: %v", datasource, err)
|
||||
}
|
||||
|
||||
func logSqlError(ctx context.Context, stmt string, err error) {
|
||||
|
@ -98,3 +98,52 @@ func (n *node) find(chars []rune) []scope {
|
||||
|
||||
return scopes
|
||||
}
|
||||
|
||||
func (n *node) longestMatch(chars []rune, start int) (used int, jump *node, matched bool) {
|
||||
cur := n
|
||||
var matchedNode *node
|
||||
|
||||
for i := start; i < len(chars); i++ {
|
||||
child, ok := cur.children[chars[i]]
|
||||
if ok {
|
||||
cur = child
|
||||
if cur.end {
|
||||
matchedNode = cur
|
||||
}
|
||||
} else {
|
||||
if matchedNode != nil {
|
||||
return matchedNode.depth, nil, true
|
||||
}
|
||||
|
||||
if n.end {
|
||||
return start, nil, true
|
||||
}
|
||||
|
||||
var jump *node
|
||||
for cur.fail != nil {
|
||||
jump, ok = cur.fail.children[chars[i]]
|
||||
if ok {
|
||||
break
|
||||
}
|
||||
cur = cur.fail
|
||||
}
|
||||
if jump != nil {
|
||||
return i + 1 - jump.depth, jump, false
|
||||
}
|
||||
|
||||
return i + 1, nil, false
|
||||
}
|
||||
}
|
||||
|
||||
// longest matched node
|
||||
if matchedNode != nil {
|
||||
return matchedNode.depth, nil, true
|
||||
}
|
||||
|
||||
// last matched node
|
||||
if n.end {
|
||||
return start, nil, true
|
||||
}
|
||||
|
||||
return len(chars), nil, false
|
||||
}
|
||||
|
@ -6,6 +6,15 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestLongestMatchGuardedCondition(t *testing.T) {
|
||||
n := new(node)
|
||||
n.end = true
|
||||
used, jump, matched := n.longestMatch([]rune(""), 0)
|
||||
assert.Equal(t, 0, used)
|
||||
assert.Nil(t, jump)
|
||||
assert.True(t, matched)
|
||||
}
|
||||
|
||||
func TestFuzzNodeCase1(t *testing.T) {
|
||||
keywords := []string{
|
||||
"cs8Zh",
|
||||
|
@ -1,6 +1,8 @@
|
||||
package stringx
|
||||
|
||||
import "strings"
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
type (
|
||||
// Replacer interface wraps the Replace method.
|
||||
@ -30,68 +32,30 @@ func NewReplacer(mapping map[string]string) Replacer {
|
||||
|
||||
// Replace replaces text with given substitutes.
|
||||
func (r *replacer) Replace(text string) string {
|
||||
var builder strings.Builder
|
||||
var start int
|
||||
chars := []rune(text)
|
||||
size := len(chars)
|
||||
var buf strings.Builder
|
||||
var nextStart int
|
||||
target := []rune(text)
|
||||
cur := r.node
|
||||
|
||||
for start < size {
|
||||
cur := r.node
|
||||
|
||||
if start > 0 {
|
||||
builder.WriteString(string(chars[:start]))
|
||||
}
|
||||
|
||||
for i := start; i < size; i++ {
|
||||
child, ok := cur.children[chars[i]]
|
||||
if ok {
|
||||
cur = child
|
||||
} else if cur == r.node {
|
||||
builder.WriteRune(chars[i])
|
||||
// cur already points to root, set start only
|
||||
start = i + 1
|
||||
continue
|
||||
for len(target) != 0 {
|
||||
used, jump, matched := cur.longestMatch(target, nextStart)
|
||||
if matched {
|
||||
replaced := r.mapping[string(target[:used])]
|
||||
target = append([]rune(replaced), target[used:]...)
|
||||
cur = r.node
|
||||
nextStart = 0
|
||||
} else {
|
||||
buf.WriteString(string(target[:used]))
|
||||
target = target[used:]
|
||||
if jump != nil {
|
||||
cur = jump
|
||||
nextStart = jump.depth
|
||||
} else {
|
||||
curDepth := cur.depth
|
||||
cur = cur.fail
|
||||
child, ok = cur.children[chars[i]]
|
||||
if !ok {
|
||||
// write this path
|
||||
builder.WriteString(string(chars[i-curDepth : i+1]))
|
||||
// go to root
|
||||
cur = r.node
|
||||
start = i + 1
|
||||
continue
|
||||
}
|
||||
|
||||
failDepth := cur.depth
|
||||
// write path before jump
|
||||
builder.WriteString(string(chars[start : start+curDepth-failDepth]))
|
||||
start += curDepth - failDepth
|
||||
cur = child
|
||||
cur = r.node
|
||||
nextStart = 0
|
||||
}
|
||||
|
||||
if cur.end {
|
||||
val := string(chars[i+1-cur.depth : i+1])
|
||||
builder.WriteString(r.mapping[val])
|
||||
builder.WriteString(string(chars[i+1:]))
|
||||
// only matching this path, all previous paths are done
|
||||
if start >= i+1-cur.depth && i+1 >= size {
|
||||
return builder.String()
|
||||
}
|
||||
|
||||
chars = []rune(builder.String())
|
||||
size = len(chars)
|
||||
builder.Reset()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !cur.end {
|
||||
builder.WriteString(string(chars[start:]))
|
||||
return builder.String()
|
||||
}
|
||||
}
|
||||
|
||||
return string(chars)
|
||||
return buf.String()
|
||||
}
|
||||
|
@ -1,5 +1,4 @@
|
||||
//go:build go1.18
|
||||
// +build go1.18
|
||||
|
||||
package stringx
|
||||
|
||||
|
@ -51,6 +51,31 @@ func TestReplacer_ReplaceMultiMatches(t *testing.T) {
|
||||
assert.Equal(t, "零一23四五一23四五", NewReplacer(mapping).Replace("零一二三四五一二三四五"))
|
||||
}
|
||||
|
||||
func TestReplacer_ReplaceLongestMatching(t *testing.T) {
|
||||
keywords := map[string]string{
|
||||
"日本": "japan",
|
||||
"日本的首都": "东京",
|
||||
}
|
||||
replacer := NewReplacer(keywords)
|
||||
assert.Equal(t, "东京在japan", replacer.Replace("日本的首都在日本"))
|
||||
}
|
||||
|
||||
func TestReplacer_ReplaceLongestOverlap(t *testing.T) {
|
||||
keywords := map[string]string{
|
||||
"456": "def",
|
||||
"abcd": "1234",
|
||||
}
|
||||
replacer := NewReplacer(keywords)
|
||||
assert.Equal(t, "123def7", replacer.Replace("abcd567"))
|
||||
}
|
||||
|
||||
func TestReplacer_ReplaceLongestLonger(t *testing.T) {
|
||||
mapping := map[string]string{
|
||||
"c": "3",
|
||||
}
|
||||
assert.Equal(t, "3d", NewReplacer(mapping).Replace("cd"))
|
||||
}
|
||||
|
||||
func TestReplacer_ReplaceJumpToFail(t *testing.T) {
|
||||
mapping := map[string]string{
|
||||
"bcdf": "1235",
|
||||
|
@ -63,12 +63,11 @@ func (s *Server) build() error {
|
||||
return err
|
||||
}
|
||||
|
||||
return mr.MapReduceVoid(func(source chan<- any) {
|
||||
return mr.MapReduceVoid(func(source chan<- Upstream) {
|
||||
for _, up := range s.upstreams {
|
||||
source <- up
|
||||
}
|
||||
}, func(item any, writer mr.Writer, cancel func(error)) {
|
||||
up := item.(Upstream)
|
||||
}, func(up Upstream, writer mr.Writer[rest.Route], cancel func(error)) {
|
||||
cli := zrpc.MustNewClient(up.Grpc)
|
||||
source, err := s.createDescriptorSource(cli, up)
|
||||
if err != nil {
|
||||
@ -109,9 +108,8 @@ func (s *Server) build() error {
|
||||
Handler: s.buildHandler(source, resolver, cli, m.RpcPath),
|
||||
})
|
||||
}
|
||||
}, func(pipe <-chan any, cancel func(error)) {
|
||||
for item := range pipe {
|
||||
route := item.(rest.Route)
|
||||
}, func(pipe <-chan rest.Route, cancel func(error)) {
|
||||
for route := range pipe {
|
||||
s.Server.AddRoute(route)
|
||||
}
|
||||
})
|
||||
|
33
go.mod
33
go.mod
@ -3,7 +3,6 @@ module github.com/zeromicro/go-zero
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.5.1
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0
|
||||
github.com/alicebob/miniredis/v2 v2.30.0
|
||||
github.com/fatih/color v1.14.1
|
||||
@ -25,19 +24,19 @@ require (
|
||||
go.etcd.io/etcd/api/v3 v3.5.7
|
||||
go.etcd.io/etcd/client/v3 v3.5.7
|
||||
go.mongodb.org/mongo-driver v1.11.1
|
||||
go.opentelemetry.io/otel v1.11.2
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.11.2
|
||||
go.opentelemetry.io/otel v1.13.0
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.13.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.2
|
||||
go.opentelemetry.io/otel/exporters/zipkin v1.11.2
|
||||
go.opentelemetry.io/otel/sdk v1.11.2
|
||||
go.opentelemetry.io/otel/trace v1.11.2
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.13.0
|
||||
go.opentelemetry.io/otel/exporters/zipkin v1.13.0
|
||||
go.opentelemetry.io/otel/sdk v1.13.0
|
||||
go.opentelemetry.io/otel/trace v1.13.0
|
||||
go.uber.org/automaxprocs v1.5.1
|
||||
go.uber.org/goleak v1.2.0
|
||||
golang.org/x/sys v0.4.0
|
||||
golang.org/x/sys v0.5.0
|
||||
golang.org/x/time v0.3.0
|
||||
google.golang.org/genproto v0.0.0-20230123190316-2c411cf9d197
|
||||
google.golang.org/grpc v1.52.0
|
||||
google.golang.org/grpc v1.53.0
|
||||
google.golang.org/protobuf v1.28.1
|
||||
gopkg.in/cheggaaa/pb.v1 v1.0.28
|
||||
gopkg.in/h2non/gock.v1 v1.1.2
|
||||
@ -49,19 +48,15 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/ClickHouse/ch-go v0.51.1 // indirect
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
|
||||
github.com/andybalholm/brotli v1.0.4 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/coreos/go-semver v0.3.1 // indirect
|
||||
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
|
||||
github.com/go-faster/city v1.0.1 // indirect
|
||||
github.com/go-faster/errors v0.6.1 // indirect
|
||||
github.com/go-logr/logr v1.2.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.5 // indirect
|
||||
@ -88,31 +83,27 @@ require (
|
||||
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/openzipkin/zipkin-go v0.4.1 // indirect
|
||||
github.com/paulmach/orb v0.8.0 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.17 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_model v0.3.0 // indirect
|
||||
github.com/prometheus/common v0.37.0 // indirect
|
||||
github.com/prometheus/procfs v0.8.0 // indirect
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
github.com/segmentio/asm v1.2.0 // indirect
|
||||
github.com/shopspring/decimal v1.3.1 // indirect
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||
github.com/xdg-go/scram v1.1.1 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.3 // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
|
||||
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 // indirect
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.7 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.13.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.13.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
||||
go.uber.org/atomic v1.10.0 // indirect
|
||||
go.uber.org/multierr v1.9.0 // indirect
|
||||
go.uber.org/zap v1.24.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2 // indirect
|
||||
golang.org/x/net v0.5.0 // indirect
|
||||
golang.org/x/oauth2 v0.3.0 // indirect
|
||||
golang.org/x/oauth2 v0.4.0 // indirect
|
||||
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 // indirect
|
||||
golang.org/x/term v0.4.0 // indirect
|
||||
golang.org/x/text v0.6.0 // indirect
|
||||
|
66
go.sum
66
go.sum
@ -33,10 +33,6 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/ClickHouse/ch-go v0.51.1 h1:qT66gldDXAzZNbgu3E/LwTROWrZejG6n8tVMRuyw/vA=
|
||||
github.com/ClickHouse/ch-go v0.51.1/go.mod h1:z+/hEezvvHvRMV/I00CaXBnxOx+td4zRe7HJpBYLwGU=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.5.1 h1:+KebkZtGJKaCilgNF0vQBrc7hOdNWnheue0bX1OVhl4=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.5.1/go.mod h1:21ga8MAMxWl6AKFJTaoT/ur/zIo8OJccxj/5bF8T9SE=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
@ -49,8 +45,6 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
|
||||
github.com/alicebob/miniredis/v2 v2.30.0 h1:uA3uhDbCxfO9+DI/DuGeAMr9qI+noVWwGPNTFuKID5M=
|
||||
github.com/alicebob/miniredis/v2 v2.30.0/go.mod h1:84TWKZlxYkfgMucPBf5SOQBYJceZeQRFIaQgNMiCX6Q=
|
||||
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
|
||||
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
@ -62,8 +56,9 @@ github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
|
||||
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
@ -106,10 +101,6 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
|
||||
github.com/fullstorydev/grpcurl v1.8.7 h1:xJWosq3BQovQ4QrdPO72OrPiWuGgEsxY8ldYsJbPrqI=
|
||||
github.com/fullstorydev/grpcurl v1.8.7/go.mod h1:pVtM4qe3CMoLaIzYS8uvTuDj2jVYmXqMUkZeijnXp/E=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
|
||||
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
|
||||
github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI=
|
||||
github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY=
|
||||
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
|
||||
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
|
||||
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
|
||||
@ -304,13 +295,8 @@ github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs=
|
||||
github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=
|
||||
github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A=
|
||||
github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM=
|
||||
github.com/paulmach/orb v0.8.0 h1:W5XAt5yNPNnhaMNEf0xNSkBMJ1LzOzdk2MRlB6EN0Vs=
|
||||
github.com/paulmach/orb v0.8.0/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A=
|
||||
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
|
||||
github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU=
|
||||
github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek=
|
||||
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
|
||||
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
@ -348,10 +334,6 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
|
||||
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
|
||||
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
|
||||
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
@ -405,24 +387,24 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
|
||||
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0=
|
||||
go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI=
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.11.2 h1:ES8/j2+aB+3/BUw51ioxa50V9btN1eew/2J7N7n1tsE=
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.11.2/go.mod h1:nwcF/DK4Hk0auZ/a5vw20uMsaJSXbzeeimhN5f9d0Lc=
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 h1:htgM8vZIF8oPSCxa341e3IZ4yr/sKxgu8KZYllByiVY=
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2/go.mod h1:rqbht/LlhVBgn5+k3M5QK96K5Xb0DvXpMJ5SFQpY6uw=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 h1:fqR1kli93643au1RKo0Uma3d2aPQKT+WBKfTSBaKbOc=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2/go.mod h1:5Qn6qvgkMsLDX+sYK64rHb1FPhpn0UtxF+ouX1uhyJE=
|
||||
go.opentelemetry.io/otel v1.13.0 h1:1ZAKnNQKwBBxFtww/GwxNUyTf0AxkZzrukO8MeXqe4Y=
|
||||
go.opentelemetry.io/otel v1.13.0/go.mod h1:FH3RtdZCzRkJYFTCsAKDy9l/XYjMdNv6QrkFFB8DvVg=
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.13.0 h1:VAMoGujbVV8Q0JNM/cEbhzUIWWBxnEqH45HP9iBKN04=
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.13.0/go.mod h1:fHwbmle6mBFJA1p2ZIhilvffCdq/dM5UTIiCOmEjS+w=
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.13.0 h1:pa05sNT/P8OsIQ8mPZKTIyiBuzS/xDGLVx+DCt0y6Vs=
|
||||
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.13.0/go.mod h1:rqbht/LlhVBgn5+k3M5QK96K5Xb0DvXpMJ5SFQpY6uw=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.13.0 h1:Any/nVxaoMq1T2w0W85d6w5COlLuCCgOYKQhJJWEMwQ=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.13.0/go.mod h1:46vAP6RWfNn7EKov73l5KBFlNxz8kYlxR1woU+bJ4ZY=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 h1:ERwKPn9Aer7Gxsc0+ZlutlH1bEEAUXAUhqm3Y45ABbk=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2/go.mod h1:jWZUM2MWhWCJ9J9xVbRx7tzK1mXKpAlze4CeulycwVY=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.2 h1:Us8tbCmuN16zAnK5TC69AtODLycKbwnskQzaB6DfFhc=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.2/go.mod h1:GZWSQQky8AgdJj50r1KJm8oiQiIPaAX7uZCFQX9GzC8=
|
||||
go.opentelemetry.io/otel/exporters/zipkin v1.11.2 h1:wGdWn04d1sEnxfO4TUF/UcQfEIu80IvqUXU1lENKyFg=
|
||||
go.opentelemetry.io/otel/exporters/zipkin v1.11.2/go.mod h1:I60/FdYilVKkuDOzenyp8LqJLryRC/Mr918G5hchvkM=
|
||||
go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNXUiU=
|
||||
go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU=
|
||||
go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0=
|
||||
go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.13.0 h1:Ntu7izEOIRHEgQNjbGc7j3eNtYMAiZfElJJ4JiiRDH4=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.13.0/go.mod h1:wZ9SAjm2sjw3vStBhlCfMZWZusyOQrwrHOFo00jyMC4=
|
||||
go.opentelemetry.io/otel/exporters/zipkin v1.13.0 h1:RqPV1VhJjrx28qOKYFPj3Mso56uaBovur3GZehF9y9s=
|
||||
go.opentelemetry.io/otel/exporters/zipkin v1.13.0/go.mod h1:x6S2VkXmdpoYUqQx9FKiMEsndal6xkcwDdV0Oi1RlLM=
|
||||
go.opentelemetry.io/otel/sdk v1.13.0 h1:BHib5g8MvdqS65yo2vV1s6Le42Hm6rrw08qU6yz5JaM=
|
||||
go.opentelemetry.io/otel/sdk v1.13.0/go.mod h1:YLKPx5+6Vx/o1TCUYYs+bpymtkmazOMT6zoRrC7AQ7I=
|
||||
go.opentelemetry.io/otel/trace v1.13.0 h1:CBgRZ6ntv+Amuj1jDsMhZtlAPT6gbyIRdaIzFhfBSdY=
|
||||
go.opentelemetry.io/otel/trace v1.13.0/go.mod h1:muCvmmO9KKpvuXSf3KKAXXB2ygNYHQ+ZfI5X08d3tds=
|
||||
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
|
||||
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
|
||||
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
|
||||
@ -521,8 +503,8 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr
|
||||
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
|
||||
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
|
||||
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
|
||||
golang.org/x/oauth2 v0.3.0 h1:6l90koy8/LaBLmLu8jpHeHexzMwEita0zFfYlggy2F8=
|
||||
golang.org/x/oauth2 v0.3.0/go.mod h1:rQrIauxkUhJ6CuwEXwymO2/eh4xz2ZWF1nBkcxS+tGk=
|
||||
golang.org/x/oauth2 v0.4.0 h1:NF0gk8LVPg1Ml7SSbGyySuoxdsXitj7TvgvuRxIMc/M=
|
||||
golang.org/x/oauth2 v0.4.0/go.mod h1:RznEsdpjGAINPTOF0UH/t+xJ75L18YO3Ho6Pyn+uRec=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@ -581,8 +563,8 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
|
||||
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg=
|
||||
@ -726,8 +708,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
|
||||
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
|
||||
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
|
||||
google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
|
||||
google.golang.org/grpc v1.52.0 h1:kd48UiU7EHsV4rnLyOJRuP/Il/UHE7gdDAQ+SZI7nZk=
|
||||
google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
|
||||
google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
|
||||
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
|
@ -9,7 +9,7 @@ import (
|
||||
"github.com/zeromicro/go-zero/core/syncx"
|
||||
)
|
||||
|
||||
// defaultHealthManager is global comboHealthManager for byone self.
|
||||
// defaultHealthManager is global comboHealthManager.
|
||||
var defaultHealthManager = newComboHealthManager()
|
||||
|
||||
type (
|
||||
|
@ -296,6 +296,8 @@ go-zero 已被许多公司用于生产部署,接入场景如在线教育、电
|
||||
>81. 广州机智云物联网科技有限公司
|
||||
>82. 厦门亿联网络技术股份有限公司
|
||||
>83. 北京麦芽田网络科技有限公司
|
||||
>84. 佛山市振联科技有限公司
|
||||
>85. 苏州智言信息科技有限公司
|
||||
|
||||
如果贵公司也已使用 go-zero,欢迎在 [登记地址](https://github.com/zeromicro/go-zero/issues/602) 登记,仅仅为了推广,不做其它用途。
|
||||
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
@ -36,7 +37,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
|
||||
`
|
||||
routesAdditionTemplate = `
|
||||
server.AddRoutes(
|
||||
{{.routes}} {{.jwt}}{{.signature}} {{.prefix}} {{.timeout}}
|
||||
{{.routes}} {{.jwt}}{{.signature}} {{.prefix}} {{.timeout}} {{.maxBytes}}
|
||||
)
|
||||
`
|
||||
timeoutThreshold = time.Millisecond
|
||||
@ -64,6 +65,7 @@ type (
|
||||
middlewares []string
|
||||
prefix string
|
||||
jwtTrans string
|
||||
maxBytes string
|
||||
}
|
||||
route struct {
|
||||
method string
|
||||
@ -127,10 +129,20 @@ rest.WithPrefix("%s"),`, g.prefix)
|
||||
return fmt.Errorf("timeout should not less than 1ms, now %v", duration)
|
||||
}
|
||||
|
||||
timeout = fmt.Sprintf("rest.WithTimeout(%d * time.Millisecond),", duration/time.Millisecond)
|
||||
timeout = fmt.Sprintf("\n rest.WithTimeout(%d * time.Millisecond),", duration/time.Millisecond)
|
||||
hasTimeout = true
|
||||
}
|
||||
|
||||
var maxBytes string
|
||||
if len(g.maxBytes) > 0 {
|
||||
_, err := strconv.ParseInt(g.maxBytes, 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("maxBytes %s parse error,it is an invalid number", g.maxBytes)
|
||||
}
|
||||
|
||||
maxBytes = fmt.Sprintf("\n rest.WithMaxBytes(%s),", g.maxBytes)
|
||||
}
|
||||
|
||||
var routes string
|
||||
if len(g.middlewares) > 0 {
|
||||
gbuilder.WriteString("\n}...,")
|
||||
@ -152,6 +164,7 @@ rest.WithPrefix("%s"),`, g.prefix)
|
||||
"signature": signature,
|
||||
"prefix": prefix,
|
||||
"timeout": timeout,
|
||||
"maxBytes": maxBytes,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -230,6 +243,7 @@ func getRoutes(api *spec.ApiSpec) ([]group, error) {
|
||||
}
|
||||
|
||||
groupedRoutes.timeout = g.GetAnnotation("timeout")
|
||||
groupedRoutes.maxBytes = g.GetAnnotation("maxBytes")
|
||||
|
||||
jwt := g.GetAnnotation("jwt")
|
||||
if len(jwt) > 0 {
|
||||
|
@ -59,12 +59,10 @@ func (s *rpcServer) Start(register RegisterFn) error {
|
||||
return err
|
||||
}
|
||||
|
||||
unaryInterceptors := s.buildUnaryInterceptors()
|
||||
unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
|
||||
streamInterceptors := s.buildStreamInterceptors()
|
||||
streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
|
||||
options := append(s.options, grpc.ChainUnaryInterceptor(unaryInterceptors...),
|
||||
grpc.ChainStreamInterceptor(streamInterceptors...))
|
||||
unaryInterceptorOption := grpc.ChainUnaryInterceptor(s.buildUnaryInterceptors()...)
|
||||
streamInterceptorOption := grpc.ChainStreamInterceptor(s.buildStreamInterceptors()...)
|
||||
|
||||
options := append(s.options, unaryInterceptorOption, streamInterceptorOption)
|
||||
server := grpc.NewServer(options...)
|
||||
register(server)
|
||||
|
||||
@ -102,7 +100,7 @@ func (s *rpcServer) buildStreamInterceptors() []grpc.StreamServerInterceptor {
|
||||
interceptors = append(interceptors, serverinterceptors.StreamBreakerInterceptor)
|
||||
}
|
||||
|
||||
return interceptors
|
||||
return append(interceptors, s.streamInterceptors...)
|
||||
}
|
||||
|
||||
func (s *rpcServer) buildUnaryInterceptors() []grpc.UnaryServerInterceptor {
|
||||
@ -124,7 +122,7 @@ func (s *rpcServer) buildUnaryInterceptors() []grpc.UnaryServerInterceptor {
|
||||
interceptors = append(interceptors, serverinterceptors.UnaryBreakerInterceptor)
|
||||
}
|
||||
|
||||
return interceptors
|
||||
return append(interceptors, s.unaryInterceptors...)
|
||||
}
|
||||
|
||||
// WithMetrics returns a func that sets metrics to a Server.
|
||||
|
@ -1,6 +1,7 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@ -21,10 +22,11 @@ func TestRpcServer(t *testing.T) {
|
||||
Breaker: true,
|
||||
}, WithMetrics(metrics), WithRpcHealth(true))
|
||||
server.SetName("mock")
|
||||
var wg sync.WaitGroup
|
||||
var wg, wgDone sync.WaitGroup
|
||||
var grpcServer *grpc.Server
|
||||
var lock sync.Mutex
|
||||
wg.Add(1)
|
||||
wgDone.Add(1)
|
||||
go func() {
|
||||
err := server.Start(func(server *grpc.Server) {
|
||||
lock.Lock()
|
||||
@ -34,6 +36,7 @@ func TestRpcServer(t *testing.T) {
|
||||
wg.Done()
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
wgDone.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
@ -42,6 +45,9 @@ func TestRpcServer(t *testing.T) {
|
||||
lock.Lock()
|
||||
grpcServer.GracefulStop()
|
||||
lock.Unlock()
|
||||
|
||||
proc.WrapUp()
|
||||
wgDone.Wait()
|
||||
}
|
||||
|
||||
func TestRpcServer_WithBadAddress(t *testing.T) {
|
||||
@ -57,4 +63,118 @@ func TestRpcServer_WithBadAddress(t *testing.T) {
|
||||
mock.RegisterDepositServiceServer(server, new(mock.DepositServer))
|
||||
})
|
||||
assert.NotNil(t, err)
|
||||
|
||||
proc.WrapUp()
|
||||
}
|
||||
|
||||
func TestRpcServer_buildUnaryInterceptor(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
r *rpcServer
|
||||
len int
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
r: &rpcServer{
|
||||
baseRpcServer: &baseRpcServer{},
|
||||
},
|
||||
len: 0,
|
||||
},
|
||||
{
|
||||
name: "custom",
|
||||
r: &rpcServer{
|
||||
baseRpcServer: &baseRpcServer{
|
||||
unaryInterceptors: []grpc.UnaryServerInterceptor{
|
||||
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (interface{}, error) {
|
||||
return nil, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
len: 1,
|
||||
},
|
||||
{
|
||||
name: "middleware",
|
||||
r: &rpcServer{
|
||||
baseRpcServer: &baseRpcServer{
|
||||
unaryInterceptors: []grpc.UnaryServerInterceptor{
|
||||
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (interface{}, error) {
|
||||
return nil, nil
|
||||
},
|
||||
},
|
||||
},
|
||||
middlewares: ServerMiddlewaresConf{
|
||||
Trace: true,
|
||||
Recover: true,
|
||||
Stat: true,
|
||||
Prometheus: true,
|
||||
Breaker: true,
|
||||
},
|
||||
},
|
||||
len: 6,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
assert.Equal(t, test.len, len(test.r.buildUnaryInterceptors()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRpcServer_buildStreamInterceptor(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
r *rpcServer
|
||||
len int
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
r: &rpcServer{
|
||||
baseRpcServer: &baseRpcServer{},
|
||||
},
|
||||
len: 0,
|
||||
},
|
||||
{
|
||||
name: "custom",
|
||||
r: &rpcServer{
|
||||
baseRpcServer: &baseRpcServer{
|
||||
streamInterceptors: []grpc.StreamServerInterceptor{
|
||||
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
return nil
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
len: 1,
|
||||
},
|
||||
{
|
||||
name: "middleware",
|
||||
r: &rpcServer{
|
||||
baseRpcServer: &baseRpcServer{
|
||||
streamInterceptors: []grpc.StreamServerInterceptor{
|
||||
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
return nil
|
||||
},
|
||||
},
|
||||
},
|
||||
middlewares: ServerMiddlewaresConf{
|
||||
Trace: true,
|
||||
Recover: true,
|
||||
Breaker: true,
|
||||
},
|
||||
},
|
||||
len: 4,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
assert.Equal(t, test.len, len(test.r.buildStreamInterceptors()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/zeromicro/go-zero/core/load"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/core/stat"
|
||||
"github.com/zeromicro/go-zero/core/stores/redis"
|
||||
"github.com/zeromicro/go-zero/zrpc/internal"
|
||||
"github.com/zeromicro/go-zero/zrpc/internal/auth"
|
||||
"github.com/zeromicro/go-zero/zrpc/internal/serverinterceptors"
|
||||
@ -120,7 +121,12 @@ func setupInterceptors(server internal.Server, c RpcServerConf, metrics *stat.Me
|
||||
}
|
||||
|
||||
if c.Auth {
|
||||
authenticator, err := auth.NewAuthenticator(c.Redis.NewRedis(), c.Redis.Key, c.StrictControl)
|
||||
rds, err := redis.NewRedis(c.Redis.RedisConf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
authenticator, err := auth.NewAuthenticator(rds, c.Redis.Key, c.StrictControl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zeromicro/go-zero/core/discov"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
@ -16,12 +17,16 @@ import (
|
||||
)
|
||||
|
||||
func TestServer_setupInterceptors(t *testing.T) {
|
||||
rds, err := miniredis.Run()
|
||||
assert.NoError(t, err)
|
||||
defer rds.Close()
|
||||
|
||||
server := new(mockedServer)
|
||||
err := setupInterceptors(server, RpcServerConf{
|
||||
conf := RpcServerConf{
|
||||
Auth: true,
|
||||
Redis: redis.RedisKeyConf{
|
||||
RedisConf: redis.RedisConf{
|
||||
Host: "any",
|
||||
Host: rds.Addr(),
|
||||
Type: redis.NodeType,
|
||||
},
|
||||
Key: "foo",
|
||||
@ -35,10 +40,15 @@ func TestServer_setupInterceptors(t *testing.T) {
|
||||
Prometheus: true,
|
||||
Breaker: true,
|
||||
},
|
||||
}, new(stat.Metrics))
|
||||
}
|
||||
err = setupInterceptors(server, conf, new(stat.Metrics))
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 3, len(server.unaryInterceptors))
|
||||
assert.Equal(t, 1, len(server.streamInterceptors))
|
||||
|
||||
rds.SetError("mock error")
|
||||
err = setupInterceptors(server, conf, new(stat.Metrics))
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestServer(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user