vuepress init
zero.gocn.vip init
3
.gitignore
vendored
@ -1 +1,4 @@
|
||||
**/.DS_Store
|
||||
.idea
|
||||
node_modules
|
||||
docs/.vuepress/dist
|
@ -9,7 +9,7 @@ go-zero针对文本的序列化和反序列化主要在三个地方使用
|
||||
本文假定读者已经定义过api文件以及修改过配置文件,如不熟悉,可参照
|
||||
|
||||
* [快速构建高并发微服务](shorturl.md)
|
||||
* [快速构建高并发微服务](bookstore.md)
|
||||
* [快速构建高并发微服务](../docs/frame/bookstore.md)
|
||||
|
||||
## 1. http api请求体的反序列化
|
||||
|
||||
|
99
docs/.vuepress/config.js
Normal file
@ -0,0 +1,99 @@
|
||||
const moment = require("moment");
|
||||
module.exports = {
|
||||
title: "GoZero",
|
||||
description: "集成各种工程实践的 WEB 和 RPC 框架",
|
||||
head: [
|
||||
["link", { rel: "icon", href: "/logo.png" }],
|
||||
[
|
||||
"meta",
|
||||
{
|
||||
name: "keywords",
|
||||
content: "Go,golang,zero,go-zero,micro service,gRPC",
|
||||
},
|
||||
],
|
||||
],
|
||||
|
||||
markdown: {
|
||||
lineNumbers: true, // 代码块显示行号
|
||||
},
|
||||
themeConfig: {
|
||||
nav: [
|
||||
{
|
||||
text: "首页",
|
||||
link: "/",
|
||||
},
|
||||
{
|
||||
text: "框架",
|
||||
link: "/frame/",
|
||||
},
|
||||
{ text: "GO中国", link: "https://gocn.vip/" },
|
||||
{
|
||||
text: "Zero",
|
||||
link: "https://github.com/tal-tech/go-zero",
|
||||
},
|
||||
],
|
||||
docsDir: "docs",
|
||||
docsBranch: "main",
|
||||
editLinks: true,
|
||||
editLinkText: "在github.com上编辑此页",
|
||||
sidebar: {
|
||||
"/summary/": [""], //这样自动生成对应文章
|
||||
"/frame/": [
|
||||
{
|
||||
title: "1 简介",
|
||||
collapsable: false, // 可选的, 默认值是 true,
|
||||
children: [
|
||||
"/frame/bookstore",
|
||||
],
|
||||
},
|
||||
{
|
||||
title: "2 核心",
|
||||
collapsable: false, // 可选的, 默认值是 true,
|
||||
children: [
|
||||
"/frame/core-logger",
|
||||
"/frame/core-bloom",
|
||||
"/frame/core-executors",
|
||||
"/frame/core-streamapi",
|
||||
"/frame/core-redis",
|
||||
],
|
||||
},
|
||||
], //这样自动生成对应文章
|
||||
"/awesome/": [
|
||||
{
|
||||
title: "扩展阅读", // 必要的
|
||||
collapsable: false, // 可选的, 默认值是 true,
|
||||
children: ["/awesome/register"],
|
||||
},
|
||||
],
|
||||
},
|
||||
sidebarDepth: 2,
|
||||
lastUpdated: "上次更新",
|
||||
serviceWorker: {
|
||||
updatePopup: {
|
||||
message: "发现新内容可用",
|
||||
buttonText: "刷新",
|
||||
},
|
||||
},
|
||||
},
|
||||
plugins: [
|
||||
[
|
||||
"@vuepress/last-updated",
|
||||
{
|
||||
transformer: (timestamp, lang) => {
|
||||
// 不要忘了安装 moment
|
||||
const moment = require("moment");
|
||||
moment.locale("zh-cn");
|
||||
return moment(timestamp).format("YYYY-MM-DD HH:mm:ss");
|
||||
},
|
||||
|
||||
dateOptions: {
|
||||
hours12: true,
|
||||
},
|
||||
},
|
||||
],
|
||||
"@vuepress/back-to-top",
|
||||
"@vuepress/active-header-links",
|
||||
"@vuepress/medium-zoom",
|
||||
"@vuepress/nprogress",
|
||||
],
|
||||
};
|
BIN
docs/.vuepress/public/logo.png
Normal file
After Width: | Height: | Size: 84 KiB |
18
docs/README.md
Normal file
@ -0,0 +1,18 @@
|
||||
---
|
||||
home: true
|
||||
heroImage: /logo.png
|
||||
actionText: 快速上手 →
|
||||
actionLink: /frame/
|
||||
heroText: Go Zero
|
||||
tagline: 集成各种工程实践的 WEB 和 RPC 框架
|
||||
sidebar: auto
|
||||
features:
|
||||
- title: 大道至简
|
||||
details: 极简的 API 描述,一键生成各端代码
|
||||
- title: 自动治理
|
||||
details: 内建级联超时控制、限流、自适应熔断、自适应降载等微服务治理能力
|
||||
- title: 工程哲学
|
||||
details: 微服务治理和工具包
|
||||
|
||||
footer: MIT Licensed | Copyright © 2020-present GoZero
|
||||
---
|
@ -33,7 +33,7 @@ Writting this bookstore service is to demonstrate the complete flow of creating
|
||||
|
||||
## 2. Architecture of shorturl microservice
|
||||
|
||||
<img src="images/bookstore-arch.png" alt="architecture" width="800" />
|
||||
<img src="../../doc/images/bookstore-arch.png" alt="architecture" width="800" />
|
||||
|
||||
## 3. goctl generated code overview
|
||||
|
||||
@ -41,15 +41,15 @@ All modules with green background are generated, and will be enabled when necess
|
||||
|
||||
* API Gateway
|
||||
|
||||
<img src="images/api-gen.png" alt="api" width="800" />
|
||||
<img src="../../doc/images/api-gen.png" alt="api" width="800" />
|
||||
|
||||
* RPC
|
||||
|
||||
<img src="images/rpc-gen.png" alt="rpc" width="800" />
|
||||
<img src="../../doc/images/rpc-gen.png" alt="rpc" width="800" />
|
||||
|
||||
* model
|
||||
|
||||
<img src="images/model-gen.png" alt="model" width="800" />
|
||||
<img src="../../doc/images/model-gen.png" alt="model" width="800" />
|
||||
|
||||
And now, let’s walk through the complete flow of quickly create a microservice with go-zero.
|
||||
|
||||
@ -603,7 +603,7 @@ Log:
|
||||
Level: error
|
||||
```
|
||||
|
||||
![Benchmark](images/bookstore-benchmark.png)
|
||||
![Benchmark](../../doc/images/bookstore-benchmark.png)
|
||||
|
||||
as shown above, in my MacBook Pro, the QPS is like 30K+.
|
||||
|
@ -1,4 +1,4 @@
|
||||
# 快速构建微服务-多RPC版
|
||||
# 1.1 快速构建微服务-多RPC版
|
||||
|
||||
[English](bookstore-en.md) | 简体中文
|
||||
|
||||
@ -33,7 +33,7 @@
|
||||
|
||||
## 2. 书店微服务架构图
|
||||
|
||||
<img src="images/bookstore-arch.png" alt="架构图" width="800" />
|
||||
<img src="../../doc/images/bookstore-arch.png" alt="架构图" width="800" />
|
||||
|
||||
## 3. goctl各层代码生成一览
|
||||
|
||||
@ -41,15 +41,15 @@
|
||||
|
||||
* API Gateway
|
||||
|
||||
<img src="images/bookstore-api.png" alt="api" width="800" />
|
||||
<img src="../../doc/images/bookstore-api.png" alt="api" width="800" />
|
||||
|
||||
* RPC
|
||||
|
||||
<img src="images/bookstore-rpc.png" alt="架构图" width="800" />
|
||||
<img src="../../doc/images/bookstore-rpc.png" alt="架构图" width="800" />
|
||||
|
||||
* model
|
||||
|
||||
<img src="images/bookstore-model.png" alt="model" width="800" />
|
||||
<img src="../../doc/images/bookstore-model.png" alt="model" width="800" />
|
||||
|
||||
下面我们来一起完整走一遍快速构建微服务的流程,Let’s `Go`!🏃♂️
|
||||
|
||||
@ -603,7 +603,7 @@ Log:
|
||||
Level: error
|
||||
```
|
||||
|
||||
![Benchmark](images/bookstore-benchmark.png)
|
||||
![Benchmark](../../doc/images/bookstore-benchmark.png)
|
||||
|
||||
可以看出在我的MacBook Pro上能达到3万+的qps。
|
||||
|
85
docs/frame/core-bloom.md
Normal file
@ -0,0 +1,85 @@
|
||||
# 2.2 bloom
|
||||
|
||||
|
||||
|
||||
go-zero微服务框架中提供了许多开箱即用的工具,好的工具不仅能提升服务的性能而且还能提升代码的鲁棒性避免出错,实现代码风格的统一方便他人阅读等等,本系列文章将分别介绍go-zero框架中工具的使用及其实现原理
|
||||
|
||||
|
||||
## 布隆过滤器[bloom](https://github.com/tal-tech/go-zero/blob/master/core/bloom/bloom.go)
|
||||
|
||||
|
||||
在做服务器开发的时候,相信大家有听过布隆过滤器,可以判断某元素在不在集合里面,因为存在一定的误判和删除复杂问题,一般的使用场景是:防止缓存击穿(防止恶意攻击)、 垃圾邮箱过滤、cache digests 、模型检测器等、判断是否存在某行数据,用以减少对磁盘访问,提高服务的访问性能。 go-zero 提供的简单的缓存封装 bloom.bloom,简单使用方式如下
|
||||
|
||||
|
||||
```go
|
||||
// 初始化 redisBitSet
|
||||
store := redis.NewRedis("redis 地址", redis.NodeType)
|
||||
// 声明一个bitSet, key="test_key"名且bits是1024位
|
||||
bitSet := newRedisBitSet(store, "test_key", 1024)
|
||||
// 判断第0位bit存不存在
|
||||
isSetBefore, err := bitSet.check([]uint{0})
|
||||
|
||||
// 对第512位设置为1
|
||||
err = bitSet.set([]uint{512})
|
||||
// 3600秒后过期
|
||||
err = bitSet.expire(3600)
|
||||
|
||||
// 删除该bitSet
|
||||
err = bitSet.del()
|
||||
```
|
||||
|
||||
|
||||
bloom 简单介绍了最基本的redis bitset 的使用。下面是真正的bloom实现。
|
||||
对元素hash 定位
|
||||
```go
|
||||
// 对元素进行hash 14次(const maps=14),每次都在元素后追加byte(0-13),然后进行hash.
|
||||
// 将locations[0-13] 进行取模,最终返回locations.
|
||||
func (f *BloomFilter) getLocations(data []byte) []uint {
|
||||
locations := make([]uint, maps)
|
||||
for i := uint(0); i < maps; i++ {
|
||||
hashValue := hash.Hash(append(data, byte(i)))
|
||||
locations[i] = uint(hashValue % uint64(f.bits))
|
||||
}
|
||||
|
||||
return locations
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
向bloom里面add 元素
|
||||
```go
|
||||
// 我们可以发现 add方法使用了getLocations和bitSet的set方法。
|
||||
// 我们将元素进行hash成长度14的uint切片,然后进行set操作存到redis的bitSet里面。
|
||||
func (f *BloomFilter) Add(data []byte) error {
|
||||
locations := f.getLocations(data)
|
||||
err := f.bitSet.set(locations)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
检查bloom里面是否有某元素
|
||||
```go
|
||||
// 我们可以发现 Exists方法使用了getLocations和bitSet的check方法
|
||||
// 我们将元素进行hash成长度14的uint切片,然后进行bitSet的check验证,存在返回true,不存在或者check失败返回false
|
||||
func (f *BloomFilter) Exists(data []byte) (bool, error) {
|
||||
locations := f.getLocations(data)
|
||||
isSet, err := f.bitSet.check(locations)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !isSet {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
本文主要介绍了go-zero框架中的 core.bloom 工具,在实际的项目中非常实用。用好工具对于提升服务性能和开发效率都有很大的帮助,希望本篇文章能给大家带来一些收获。
|
370
docs/frame/core-executors.md
Normal file
@ -0,0 +1,370 @@
|
||||
# 2.3 executors
|
||||
|
||||
在 `go-zero` 中,`executors` 充当任务池,做多任务缓冲,使用做批量处理的任务。如:`clickhouse` 大批量 `insert`,`sql batch insert`。同时也可以在 `go-queue` 也可以看到 `executors` 【在 `queue` 里面使用的是 `ChunkExecutor` ,限定任务提交字节大小】。
|
||||
|
||||
|
||||
所以当你存在以下需求,都可以使用这个组件:
|
||||
|
||||
|
||||
- 批量提交任务
|
||||
- 缓冲一部分任务,惰性提交
|
||||
- 延迟任务提交
|
||||
|
||||
|
||||
|
||||
具体解释之前,先给一个大致的概览图:
|
||||
![c42c34e8d33d48ec8a63e56feeae882a.png](https://cdn.nlark.com/yuque/0/2020/png/2623842/1601723457107-a4b762da-d737-456e-944c-374e4440dc3d.png#align=left&display=inline&height=1011&margin=%5Bobject%20Object%5D&name=c42c34e8d33d48ec8a63e56feeae882a.png&originHeight=1011&originWidth=1544&size=151855&status=done&style=none&width=1544)
|
||||
## 接口设计
|
||||
|
||||
|
||||
在 `executors` 包下,有如下几个 `executor` :
|
||||
|
||||
| Name | Margin value |
|
||||
| --- | --- |
|
||||
| `bulkexecutor` | 达到 `maxTasks` 【最大任务数】 提交 |
|
||||
| `chunkexecutor` | 达到 `maxChunkSize`【最大字节数】提交 |
|
||||
| `periodicalexecutor` | `basic executor` |
|
||||
| `delayexecutor` | 延迟执行传入的 `fn()` |
|
||||
| `lessexecutor` | |
|
||||
|
||||
|
||||
|
||||
你会看到除了有特殊功能的的 `delay`,`less` ,其余3个都是 `executor` + `container` 的组合设计:
|
||||
|
||||
|
||||
```go
|
||||
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
|
||||
// 选项模式:在 go-zero 中多处出现。在多配置下,比较好的设计思路
|
||||
// https://halls-of-valhalla.org/beta/articles/functional-options-pattern-in-go,54/
|
||||
options := newBulkOptions()
|
||||
for _, opt := range opts {
|
||||
opt(&options)
|
||||
}
|
||||
// 1. task container: [execute 真正做执行的函数] [maxTasks 执行临界点]
|
||||
container := &bulkContainer{
|
||||
execute: execute,
|
||||
maxTasks: options.cachedTasks,
|
||||
}
|
||||
// 2. 可以看出 bulkexecutor 底层依赖 periodicalexecutor
|
||||
executor := &BulkExecutor{
|
||||
executor: NewPeriodicalExecutor(options.flushInterval, container),
|
||||
container: container,
|
||||
}
|
||||
|
||||
return executor
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
而这个 `container`是个 `interface`:
|
||||
|
||||
|
||||
```go
|
||||
TaskContainer interface {
|
||||
// 把 task 加入 container
|
||||
AddTask(task interface{}) bool
|
||||
// 实际上是去执行传入的 execute func()
|
||||
Execute(tasks interface{})
|
||||
// 达到临界值,移除 container 中全部的 task,通过 channel 传递到 execute func() 执行
|
||||
RemoveAll() interface{}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
由此可见之间的依赖关系:
|
||||
|
||||
|
||||
- `bulkexecutor`:`periodicalexecutor` + `bulkContainer`
|
||||
- `chunkexecutor`:`periodicalexecutor` + `chunkContainer`
|
||||
|
||||
|
||||
|
||||
> 所以你想完成自己的 `executor`,可以实现 `container` 的这3个接口,再结合 `periodicalexecutor` 就行
|
||||
|
||||
|
||||
|
||||
所以回到👆那张图,我们的重点就放在 `periodicalexecutor`,看看它是怎么设计的?
|
||||
|
||||
|
||||
## 如何使用
|
||||
|
||||
|
||||
首先看看如何在业务中使用这个组件:
|
||||
|
||||
|
||||
现有一个定时服务,每天固定时间去执行从 `mysql` 到 `clickhouse` 的数据同步:
|
||||
|
||||
|
||||
```go
|
||||
type DailyTask struct {
|
||||
ckGroup *clickhousex.Cluster
|
||||
insertExecutor *executors.BulkExecutor
|
||||
mysqlConn sqlx.SqlConn
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
初始化 `bulkExecutor`:
|
||||
|
||||
|
||||
```go
|
||||
func (dts *DailyTask) Init() {
|
||||
// insertIntoCk() 是真正insert执行函数【需要开发者自己编写具体业务逻辑】
|
||||
dts.insertExecutor = executors.NewBulkExecutor(
|
||||
dts.insertIntoCk,
|
||||
executors.WithBulkInterval(time.Second*3), // 3s会自动刷一次container中task去执行
|
||||
executors.WithBulkTasks(10240), // container最大task数。一般设为2的幂次
|
||||
)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
> 额外介绍一下:`clickhouse` 适合大批量的插入,因为insert速度很快,大批量insert更能充分利用clickhouse
|
||||
|
||||
|
||||
|
||||
主体业务逻辑编写:
|
||||
|
||||
|
||||
```go
|
||||
func (dts *DailyTask) insertNewData(ch chan interface{}, sqlFromDb *model.Task) error {
|
||||
for item := range ch {
|
||||
if r, vok := item.(*model.Task); !vok {
|
||||
continue
|
||||
}
|
||||
err := dts.insertExecutor.Add(r)
|
||||
if err != nil {
|
||||
r.Tag = sqlFromDb.Tag
|
||||
r.TagId = sqlFromDb.Id
|
||||
r.InsertId = genInsertId()
|
||||
r.ToRedis = toRedis == constant.INCACHED
|
||||
r.UpdateWay = sqlFromDb.UpdateWay
|
||||
// 1. Add Task
|
||||
err := dts.insertExecutor.Add(r)
|
||||
if err != nil {
|
||||
logx.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
// 2. Flush Task container
|
||||
dts.insertExecutor.Flush()
|
||||
// 3. Wait All Task Finish
|
||||
dts.insertExecutor.Wait()
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
> 可能会疑惑为什么要 `Flush(), Wait()` ,后面会通过源码解析一下
|
||||
|
||||
|
||||
|
||||
使用上总体上3步:
|
||||
|
||||
|
||||
- `Add()`:加入task
|
||||
- `Flush()`:刷新 `container` 中的task
|
||||
- `Wait()`:等待全部的task执行完成
|
||||
|
||||
|
||||
|
||||
## 源码分析
|
||||
|
||||
|
||||
> 此处主要分析 `periodicalexecutor`,因为其他两个常用的 `executor` 都依赖它
|
||||
|
||||
|
||||
|
||||
### 初始化
|
||||
|
||||
|
||||
```go
|
||||
func New...(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
|
||||
executor := &PeriodicalExecutor{
|
||||
commander: make(chan interface{}, 1),
|
||||
interval: interval,
|
||||
container: container,
|
||||
confirmChan: make(chan lang.PlaceholderType),
|
||||
newTicker: func(d time.Duration) timex.Ticker {
|
||||
return timex.NewTicker(interval)
|
||||
},
|
||||
}
|
||||
...
|
||||
return executor
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
- `commander`:传递 `tasks` 的 channel
|
||||
- `container`:暂存 `Add()` 的 task
|
||||
- `confirmChan`:阻塞 `Add()` ,在开始本次的 `executeTasks()` 会放开阻塞
|
||||
- `ticker`:定时器,防止 `Add()` 阻塞时,会有一个定时执行的机会,及时释放暂存的task
|
||||
|
||||
|
||||
|
||||
### Add()
|
||||
|
||||
|
||||
初始化完,在业务逻辑的第一步就是把 task 加入 `executor`:
|
||||
|
||||
|
||||
```go
|
||||
func (pe *PeriodicalExecutor) Add(task interface{}) {
|
||||
if vals, ok := pe.addAndCheck(task); ok {
|
||||
pe.commander <- vals
|
||||
<-pe.confirmChan
|
||||
}
|
||||
}
|
||||
|
||||
func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
|
||||
pe.lock.Lock()
|
||||
defer func() {
|
||||
// 一开始为 false
|
||||
var start bool
|
||||
if !pe.guarded {
|
||||
// backgroundFlush() 会将 guarded 重新置反
|
||||
pe.guarded = true
|
||||
start = true
|
||||
}
|
||||
pe.lock.Unlock()
|
||||
// 在第一条 task 加入的时候就会执行 if 中的 backgroundFlush()。后台协程刷task
|
||||
if start {
|
||||
pe.backgroundFlush()
|
||||
}
|
||||
}()
|
||||
// 控制maxTask,>=maxTask 将container中tasks pop, return
|
||||
if pe.container.AddTask(task) {
|
||||
return pe.container.RemoveAll(), true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
`addAndCheck()` 中 `AddTask()` 就是在控制最大 tasks 数,如果超过就执行 `RemoveAll()` ,将暂存 `container` 的tasks pop,传递给 `commander` ,后面有goroutine循环读取,然后去执行 tasks。
|
||||
|
||||
|
||||
### backgroundFlush()
|
||||
|
||||
|
||||
开启一个后台协程,对 `container` 中的task,不断刷新:
|
||||
|
||||
|
||||
```go
|
||||
func (pe *PeriodicalExecutor) backgroundFlush() {
|
||||
// 封装 go func(){}
|
||||
threading.GoSafe(func() {
|
||||
ticker := pe.newTicker(pe.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
var commanded bool
|
||||
last := timex.Now()
|
||||
for {
|
||||
select {
|
||||
// 从channel拿到 []tasks
|
||||
case vals := <-pe.commander:
|
||||
commanded = true
|
||||
// 实质:wg.Add(1)
|
||||
pe.enterExecution()
|
||||
// 放开 Add() 的阻塞,而且此时暂存区也为空。才开始新的 task 加入
|
||||
pe.confirmChan <- lang.Placeholder
|
||||
// 真正的执行 task 逻辑
|
||||
pe.executeTasks(vals)
|
||||
last = timex.Now()
|
||||
case <-ticker.Chan():
|
||||
if commanded {
|
||||
// 由于select选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行
|
||||
// https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
|
||||
commanded = false
|
||||
} else if pe.Flush() {
|
||||
// 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新
|
||||
last = timex.Now()
|
||||
} else if timex.Since(last) > pe.interval*idleRound {
|
||||
// 既没到maxTask,Flush() err,并且 last->now 时间过长,会再次触发 Flush()
|
||||
// 只有这置反,才会开启一个新的 backgroundFlush() 后台协程
|
||||
pe.guarded = false
|
||||
// 再次刷新,防止漏掉
|
||||
pe.Flush()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
总体两个过程:
|
||||
|
||||
|
||||
- `commander` 接收到 `RemoveAll()` 传递来的tasks,然后做执行,并放开 `Add()` 的阻塞,得以继续 `Add()`
|
||||
- `ticker` 到时间了,如果第一步没有执行,则自动 `Flush()` ,也会去做task的执行
|
||||
|
||||
|
||||
|
||||
### Wait()
|
||||
|
||||
|
||||
在 `backgroundFlush()` ,提到一个函数:`enterExecution()`:
|
||||
|
||||
|
||||
```go
|
||||
func (pe *PeriodicalExecutor) enterExecution() {
|
||||
pe.wgBarrier.Guard(func() {
|
||||
pe.waitGroup.Add(1)
|
||||
})
|
||||
}
|
||||
|
||||
func (pe *PeriodicalExecutor) Wait() {
|
||||
pe.wgBarrier.Guard(func() {
|
||||
pe.waitGroup.Wait()
|
||||
})
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
这样列举就知道为什么之前为什么在最后要带上 `dts.insertExecutor.Wait()`,当然要等待全部的 `goroutine task` 完成。
|
||||
|
||||
|
||||
## 思考
|
||||
|
||||
|
||||
在看源码中,思考了一些其他设计上的思路,大家是否也有类似的问题:
|
||||
|
||||
|
||||
- 在分析 `executors` 中,会发现很多地方都有 `lock`
|
||||
|
||||
|
||||
|
||||
> `go test` 存在竞态,使用加锁来避免这种情况
|
||||
|
||||
|
||||
|
||||
- 在分析 `confirmChan` 发现,在此次[提交](https://github.com/tal-tech/go-zero/commit/9d9399ad1014c171cc9bd9c87f78b5d2ac238ce4)才出现,为什么会这么设计?
|
||||
|
||||
|
||||
|
||||
> 之前是:`wg.Add(1)` 是写在 `executeTasks()` ;现在是:先`wg.Add(1)`,再放开 `confirmChan` 阻塞
|
||||
> 如果 `executor func` 执行阻塞,`Add task` 还在进行,因为没有阻塞,可能很快执行到 `Executor.Wait()`,这是就会出现 `wg.Wait()` 在 `wg.Add()` 前执行,这会 `panic`
|
||||
|
||||
|
||||
|
||||
具体可以看最新版本的`TestPeriodicalExecutor_WaitFast()` ,不妨跑在此版本上,就可以重现
|
||||
|
||||
|
||||
## 总结
|
||||
|
||||
|
||||
剩余还有几个 `executors` 的分析,就留给大家去看看源码。
|
||||
|
||||
|
||||
总之,整体设计上:
|
||||
|
||||
|
||||
- 遵循面向接口设计
|
||||
- 灵活使用 `channel` ,`waitgroup` 等并发工具
|
||||
- 执行单元+存储单元的搭配使用
|
||||
|
||||
|
||||
|
||||
在 `go-zero` 中还有很多实用的组件工具,用好工具对于提升服务性能和开发效率都有很大的帮助,希望本篇文章能给大家带来一些收获。
|
256
docs/frame/core-logger.md
Normal file
@ -0,0 +1,256 @@
|
||||
# 2.1 日志
|
||||
|
||||
# 基本使用
|
||||
## Example
|
||||
|
||||
|
||||
logx 的简单使用如下:
|
||||
|
||||
|
||||
```go
|
||||
var c logx.LogConf
|
||||
// logx 根据配置初始化
|
||||
logx.MustSetup(c)
|
||||
|
||||
logx.Info("This is info!")
|
||||
logx.Infof("This is %s!", "info")
|
||||
|
||||
logx.Error("This is error!")
|
||||
logx.Errorf("this is %s!", "error")
|
||||
|
||||
logx.Close()
|
||||
```
|
||||
|
||||
|
||||
## 初始化
|
||||
|
||||
|
||||
logx 有很多可以配置项,可以参考 logx.LogConf 中的定义。目前可以使用
|
||||
|
||||
|
||||
```go
|
||||
logx.MustSetUp(c)
|
||||
```
|
||||
|
||||
|
||||
进行初始化配置,如果没有进行初始化配置,所有的配置将使用默认配置。
|
||||
|
||||
|
||||
## Level
|
||||
|
||||
|
||||
logx 支持的打印日志级别有:
|
||||
|
||||
|
||||
- info
|
||||
- error
|
||||
- server
|
||||
- fatal
|
||||
- slow
|
||||
- stat
|
||||
|
||||
|
||||
|
||||
可以使用对应的方法打印出对应级别的日志。
|
||||
|
||||
|
||||
同时为了方便调试,线上使用,可以动态调整日志打印级别,其中可以通过 **logx.SetLevel(uint32)** 进行级别设置,也可以通过配置初始化进行设置。目前支持的参数为:
|
||||
|
||||
|
||||
```go
|
||||
const (
|
||||
// 打印所有级别的日志
|
||||
InfoLevel = iotas
|
||||
// 打印 errors, slows, stacks 日志
|
||||
ErrorLevel
|
||||
// 仅打印 server 级别日志
|
||||
SevereLevel
|
||||
)
|
||||
```
|
||||
|
||||
|
||||
## 日志模式
|
||||
|
||||
|
||||
目前日志打印模式主要分为2种,一种文件输出,一种控制台输出。推荐方式,当采用 k8s,docker 等部署方式的时候,可以将日志输出到控制台,使用日志收集器收集导入至 es 进行日志分析。如果是直接部署方式,可以采用文件输出方式,logx 会自动在指定文件目录创建对应 5 个对应级别的的日志文件保存日志。
|
||||
|
||||
|
||||
```bash
|
||||
.
|
||||
├── access.log
|
||||
├── error.log
|
||||
├── severe.log
|
||||
├── slow.log
|
||||
└── stat.log
|
||||
```
|
||||
|
||||
|
||||
同时会按照自然日进行文件分割,当超过指定配置天数,会对日志文件进行自动删除,打包等操作。
|
||||
|
||||
|
||||
## 禁用日志
|
||||
|
||||
|
||||
如果不需要日志打印,可以使用 **logx.Close()** 关闭日志输出。注意,当禁用日志输出,将无法在次打开,具体可以参考 **logx.RotateLogger** 和 **logx.DailyRotateRule** 的实现。
|
||||
|
||||
|
||||
## 关闭日志
|
||||
|
||||
|
||||
因为 logx 采用异步进行日志输出,如果没有正常关闭日志,可能会造成部分日志丢失的情况。必须在程序退出的地方关闭日志输出:
|
||||
|
||||
|
||||
```go
|
||||
logx.Close()
|
||||
```
|
||||
|
||||
|
||||
框架中 rest 和 zrpc 等大部分地方已经做好了日志配置和关闭相关操作,用户可以不用关心。
|
||||
|
||||
|
||||
同时注意,当关闭日志输出之后,将无法在次打印日志了。
|
||||
|
||||
|
||||
推荐写法:
|
||||
|
||||
|
||||
```go
|
||||
import "github.com/tal-tech/go-zero/core/proc"
|
||||
|
||||
// grace close log
|
||||
proc.AddShutdownListener(func() {
|
||||
logx.Close()
|
||||
})
|
||||
```
|
||||
|
||||
|
||||
## Duration
|
||||
|
||||
|
||||
我们打印日志的时候可能需要打印耗时情况,可以使用 **logx.WithDuration(time.Duration)**, 参考如下示例:
|
||||
|
||||
|
||||
```go
|
||||
startTime := timex.Now()
|
||||
// 数据库查询
|
||||
rows, err := conn.Query(q, args...)
|
||||
duration := timex.Since(startTime)
|
||||
if duration > slowThreshold {
|
||||
logx.WithDuration(duration).Slowf("[SQL] query: slowcall - %s", stmt)
|
||||
} else {
|
||||
logx.WithDuration(duration).Infof("sql query: %s", stmt)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
会输出如下格式
|
||||
|
||||
|
||||
```json
|
||||
{"@timestamp":"2020-09-12T01:22:55.552+08","level":"info","duration":"3.0ms","content":"sql query:..."}
|
||||
{"@timestamp":"2020-09-12T01:22:55.552+08","level":"slow","duration":"500ms","content":"[SQL] query: slowcall - ..."}
|
||||
```
|
||||
|
||||
|
||||
这样就可以很容易统计出慢 sql 相关信息。
|
||||
|
||||
|
||||
## TraceLog
|
||||
|
||||
|
||||
tracingEntry 是为了链路追踪日志输出定制的。可以打印 context 中的 traceId 和 spanId 信息,配合我们的 **rest** 和 **zrpc** 很容易完成链路日志的相关打印。示例如下
|
||||
|
||||
|
||||
```go
|
||||
logx.WithContext(context.Context).Info("This is info!")
|
||||
```
|
||||
|
||||
|
||||
## SysLog
|
||||
|
||||
|
||||
应用中可能有部分采用系统 log 进行日志打印,logx 同样封装方法,很容易将 log 相关的日志收集到 logx 中来。
|
||||
|
||||
|
||||
```go
|
||||
logx.CollectSysLog()
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
# 日志配置相关
|
||||
|
||||
|
||||
**LogConf** 定义日志系统所需的基本配置
|
||||
|
||||
|
||||
完整定义如下:
|
||||
|
||||
|
||||
```go
|
||||
type LogConf struct {
|
||||
ServiceName string `json:",optional"`
|
||||
Mode string `json:",default=console,options=console|file|volume"`
|
||||
Path string `json:",default=logs"`
|
||||
Level string `json:",default=info,options=info|error|severe"`
|
||||
Compress bool `json:",optional"`
|
||||
KeepDays int `json:",optional"`
|
||||
StackCooldownMillis int `json:",default=100"`
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Mode
|
||||
|
||||
|
||||
**Mode** 定义了日志打印的方式。默认的模式是 **console**, 打印到控制台上面。
|
||||
|
||||
|
||||
目前支持的模式如下:
|
||||
|
||||
|
||||
- console
|
||||
- 打印到控制台
|
||||
- file
|
||||
- 打印到指定路径下的access.log, error.log, stat.log等文件里
|
||||
- volume
|
||||
- 为了在k8s内打印到mount进来的存储上,因为多个pod可能会覆盖相同的文件,volume模式自动识别pod并按照pod分开写各自的日志文件
|
||||
|
||||
|
||||
|
||||
## Path
|
||||
|
||||
|
||||
**Path** 定义了文件日志的输出路径,默认值为 **logs**。
|
||||
|
||||
|
||||
## Level
|
||||
|
||||
|
||||
**Level** 定义了日志打印级别,默认值为 **info**。
|
||||
目前支持的级别如下:
|
||||
|
||||
|
||||
- info
|
||||
- error
|
||||
- severe
|
||||
|
||||
|
||||
|
||||
## Compress
|
||||
|
||||
|
||||
**Compress** 定义了日志是否需要压缩,默认值为 **false**。在 Mode 为 file 模式下面,文件最后会进行打包压缩成 .gz 文件。
|
||||
|
||||
|
||||
## KeepDays
|
||||
|
||||
|
||||
**KeepDays** 定义日志最大保留天数,默认值为 0,表示不会删除旧的日志。在 Mode 为 file 模式下面,如果超过了最大保留天数,旧的日志文件将会被删除。
|
||||
|
||||
|
||||
## StackCooldownMillis
|
||||
|
||||
|
||||
**StackCooldownMillis** 定义了日志输出间隔,默认为 100 毫秒。// TODO:
|
141
docs/frame/core-redis.md
Normal file
@ -0,0 +1,141 @@
|
||||
# 2.5 redis lock
|
||||
|
||||
既然是锁,首先想到的一个作用就是:**防重复点击,在一个时间点只有一个请求产生效果**。
|
||||
|
||||
|
||||
而既然是 `redis`,就得具有排他性,同时也具有锁的一些共性:
|
||||
|
||||
|
||||
- 高性能
|
||||
- 不能出现死锁
|
||||
- 不能出现节点down掉后加锁失败
|
||||
|
||||
|
||||
|
||||
`go-zero` 中利用 redis `set key nx` 可以保证key不存在时写入成功,`px` 可以让key超时后自动删除「最坏情况也就是超时自动删除key,从而也不会出现死锁」
|
||||
|
||||
|
||||
## example
|
||||
|
||||
|
||||
```go
|
||||
redisLockKey := fmt.Sprintf("%v%v", redisTpl, headId)
|
||||
// 1. New redislock
|
||||
redisLock := redis.NewRedisLock(redisConn, redisLockKey)
|
||||
// 2. 可选操作,设置 redislock 过期时间
|
||||
redisLock.SetExpire(redisLockExpireSeconds)
|
||||
if ok, err := redisLock.Acquire(); !ok || err != nil {
|
||||
return nil, errors.New("当前有其他用户正在进行操作,请稍后重试")
|
||||
}
|
||||
defer func() {
|
||||
recover()
|
||||
// 3. 释放锁
|
||||
redisLock.Release()
|
||||
}()
|
||||
```
|
||||
|
||||
|
||||
和你在使用 `sync.Mutex` 的方式时一致的。加锁解锁,执行你的业务操作。
|
||||
|
||||
|
||||
## 获取锁
|
||||
|
||||
|
||||
```go
|
||||
lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
|
||||
return "OK"
|
||||
else
|
||||
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
|
||||
end`
|
||||
|
||||
func (rl *RedisLock) Acquire() (bool, error) {
|
||||
seconds := atomic.LoadUint32(&rl.seconds)
|
||||
// execute luascript
|
||||
resp, err := rl.store.Eval(lockCommand, []string{rl.key}, []string{
|
||||
rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance)})
|
||||
if err == red.Nil {
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error())
|
||||
return false, err
|
||||
} else if resp == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
reply, ok := resp.(string)
|
||||
if ok && reply == "OK" {
|
||||
return true, nil
|
||||
} else {
|
||||
logx.Errorf("Unknown reply when acquiring lock for %s: %v", rl.key, resp)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
先介绍几个 `redis` 的命令选项,以下是为 `set` 命令增加的选项:
|
||||
|
||||
|
||||
- `ex seconds` :设置key过期时间,单位s
|
||||
- `px milliseconds` :设置key过期时间,单位毫秒
|
||||
- `nx`:key不存在时,设置key的值
|
||||
- `xx`:key存在时,才会去设置key的值
|
||||
|
||||
|
||||
|
||||
其中 `lua script` 涉及的入参:
|
||||
|
||||
|
||||
|
||||
| **args** | **示例** | **含义** |
|
||||
| --- | --- | --- |
|
||||
| KEYS[1] | key$20201026 | redis key |
|
||||
| ARGV[1] | lmnopqrstuvwxyzABCD | 唯一标识:随机字符串 |
|
||||
| ARGV[2] | 30000 | 设置锁的过期时间 |
|
||||
|
||||
|
||||
|
||||
然后来说说代码特性:
|
||||
|
||||
|
||||
1. `Lua` 脚本保证原子性「当然,把多个操作在 Redis 中实现成一个操作,也就是单命令操作」
|
||||
1. 使用了 `set key value px milliseconds nx`
|
||||
1. `value` 具有唯一性
|
||||
1. 加锁时首先判断 `key` 的 `value` 是否和之前设置的一致,一致则修改过期时间
|
||||
|
||||
|
||||
|
||||
## 释放锁
|
||||
|
||||
|
||||
```go
|
||||
delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("DEL", KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end`
|
||||
|
||||
func (rl *RedisLock) Release() (bool, error) {
|
||||
resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if reply, ok := resp.(int64); !ok {
|
||||
return false, nil
|
||||
} else {
|
||||
return reply == 1, nil
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
释放锁的时候只需要关注一点:
|
||||
|
||||
|
||||
**不能释放别人的锁,不能释放别人的锁,不能释放别人的锁**
|
||||
|
||||
|
||||
所以需要先 `get(key) == value「key」`,为 true 才会去 `delete`
|
||||
``
|
254
docs/frame/core-streamapi.md
Normal file
@ -0,0 +1,254 @@
|
||||
# 2.4 stream api
|
||||
|
||||
之前已经有文章介绍过 `MapReduce` ,这是一个并发处理的利器。而 `fx` 是一个完备的流式处理组件。
|
||||
|
||||
|
||||
和 `MapReduce` 类似的,`fx` 也存在并发处理的函数:`Parallel(fn, options)`。但同时它也不只有并发处理。`From(chan)` ,`Map(fn)`,`Filter(fn)`,`Reduce(fn)` 等,从数据源读取成流,到处理流数据,最后聚合流数据。是不是有点像 `Java Lambda` ,如果你之前是 `Java` 开发者,看到这也就明白整个基本设计。
|
||||
|
||||
|
||||
## 整体API
|
||||
|
||||
|
||||
还是从整体上概览 `fx` 到底是怎么构建的:
|
||||
![dc500acd526d40aabfe4f53cf5bd180a_tplv-k3u1fbpfcp-zoom-1.png](https://cdn.nlark.com/yuque/0/2020/png/2623842/1601801167610-6605f862-bdbf-469a-8aba-6d9d6ecca5a7.png#align=left&display=inline&height=602&margin=%5Bobject%20Object%5D&name=dc500acd526d40aabfe4f53cf5bd180a_tplv-k3u1fbpfcp-zoom-1.png&originHeight=602&originWidth=604&size=81255&status=done&style=none&width=604)
|
||||
被标注的部分就是整个 `fx` 最重要的部分:
|
||||
|
||||
|
||||
1. 由 `From(fn)` 这类API,产生数据流 `Stream`
|
||||
2. 对`Stream` 转换,聚合,求值的API集合
|
||||
|
||||
|
||||
|
||||
所以列举出目前支持的 `Stream API`:
|
||||
|
||||
| API | 作用 |
|
||||
| --- | --- |
|
||||
| `Distinct(fn)` | fn中选定特定item类型,对其去重 |
|
||||
| `Filter(fn, option)` | fn指定具体规则,满足规则的`element`传递给下一个 `stream` |
|
||||
| `Group(fn)` | 根据fn把`stream`中的element分到不同的组中 |
|
||||
| `Head(num)` | 取出`stream`中前 num 个element ,生成一个新的`stream` |
|
||||
| `Map(fn, option)` | 将每个ele转换为另一个对应的ele, 传递给下一个 `stream` |
|
||||
| `Merge()` | 将所有`ele`合并到一个`slice`中并生成一个新`stream` |
|
||||
| `Reverse()` | 反转`stream`中的element。【使用双指针】 |
|
||||
| `Sort(fn)` | 按照 fn 排序`stream`中的element |
|
||||
| `Tail(num)` | 取出`stream`最后的 num 个element,生成一个新 `stream`。【使用双向环状链表】 |
|
||||
| `Walk(fn, option)` | 把 fn 作用在 `source` 的每个元素。生成新的 `stream` |
|
||||
|
||||
|
||||
|
||||
不再生成新的 `stream`,做最后的求值操作:
|
||||
|
||||
| API | 作用 |
|
||||
| --- | --- |
|
||||
| `ForAll(fn)` | 按照fn处理`stream`,且不再产生`stream`【求值操作】 |
|
||||
| `ForEach(fn)` | 对 `stream` 中所有 element 执行fn【求值操作】 |
|
||||
| `Parallel(fn, option)` | 将给定的fn与给定的worker数量并发应用于每个`element`【求值操作】 |
|
||||
| `Reduce(fn)` | 直接处理`stream`【求值操作】 |
|
||||
| `Done()` | 啥也不做,等待所有的操作完成 |
|
||||
|
||||
|
||||
|
||||
## 如何使用
|
||||
|
||||
|
||||
```go
|
||||
result := make(map[string]string)
|
||||
fx.From(func(source chan<- interface{}) {
|
||||
for _, item := range data {
|
||||
source <- item
|
||||
}
|
||||
}).Walk(func(item interface{}, pipe chan<- interface{}) {
|
||||
each := item.(*model.ClassData)
|
||||
|
||||
class, err := l.rpcLogic.GetClassInfo()
|
||||
if err != nil {
|
||||
l.Errorf("get class %s failed: %s", each.ClassId, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
students, err := l.rpcLogic.GetUsersInfo(class.ClassId)
|
||||
if err != nil {
|
||||
l.Errorf("get students %s failed: %s", each.ClassId, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
pipe <- &classObj{
|
||||
classId: each.ClassId
|
||||
studentIds: students
|
||||
}
|
||||
}).ForEach(func(item interface{}) {
|
||||
o := item.(*classObj)
|
||||
result[o.classId] = o.studentIds
|
||||
})
|
||||
```
|
||||
|
||||
|
||||
1. `From()` 从一个 `slice` 生成 `stream`
|
||||
2. `Walk()` 接收与一个 `stream` ,对流中每个 `ele` 转换重组,生成新的 `stream`
|
||||
3. 最后由 `求值操作` 把 `stream` 输出(`fmt.Println`),存储(`map,slice`),持久化(`db操作`)
|
||||
|
||||
|
||||
|
||||
## 简要分析
|
||||
|
||||
|
||||
`fx` 中的函数命名语义化,开发者只需要知道业务逻辑需要什么样的转换,调用与之匹配的函数即可。
|
||||
|
||||
|
||||
所以这里只简要分析几个比较典型的函数。
|
||||
|
||||
|
||||
### Walk()
|
||||
|
||||
|
||||
`Walk()` 在整个 `fx` 被多个函数当成底层实现,`Map(), Filter()` 等。
|
||||
|
||||
|
||||
所以本质就是:`Walk()` 负责并发将传进来的函数作用在 **输入流** 的每个 `ele`,并 生成新的 `stream`。
|
||||
|
||||
|
||||
跟到源码,分成两个子函数:自定义 `worker` 数,默认 `worker` 数
|
||||
|
||||
|
||||
```go
|
||||
// 自定义 workder 数
|
||||
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
|
||||
pipe := make(chan interface{}, option.workers)
|
||||
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
// channel<- 如果达到设定workers数,channel阻塞,从而达到控制并发数。
|
||||
// 简易的 goroutine pool
|
||||
pool := make(chan lang.PlaceholderType, option.workers)
|
||||
|
||||
for {
|
||||
// 每一次for循环都会开启一个goroutine。如果达到workers数,从而阻塞
|
||||
pool <- lang.Placeholder
|
||||
item, ok := <-p.source
|
||||
if !ok {
|
||||
<-pool
|
||||
break
|
||||
}
|
||||
// 使用 waitgroup 保证任务完成的完整性
|
||||
wg.Add(1)
|
||||
threading.GoSafe(func() {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
// 归还
|
||||
<-pool
|
||||
}()
|
||||
|
||||
fn(item, pipe)
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(pipe)
|
||||
}()
|
||||
|
||||
return Range(pipe)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
- 使用 `有缓冲channel` 做并发队列,限制并发数
|
||||
- `waitgroup` 保证任务完成的完整性
|
||||
|
||||
|
||||
|
||||
另外一个 `walkUnlimited()`:也使用了 `waitgroup` 做并发控制,因为没有自定义并发数限制,所以也就没有另外一个 `channel` 做并发数控制。
|
||||
|
||||
|
||||
### Tail()
|
||||
|
||||
|
||||
介绍这个主要是里面运用了 `ring` 这个双向链表,其中的简单算法还是很有意思的。
|
||||
|
||||
|
||||
```go
|
||||
func (p Stream) Tail(n int64) Stream {
|
||||
source := make(chan interface{})
|
||||
|
||||
go func() {
|
||||
ring := collection.NewRing(int(n))
|
||||
// “顺序”插入,源的顺序和ring的顺序一致
|
||||
for item := range p.source {
|
||||
ring.Add(item)
|
||||
}
|
||||
// 取出 ring 中全部的 item
|
||||
for _, item := range ring.Take() {
|
||||
source <- item
|
||||
}
|
||||
close(source)
|
||||
}()
|
||||
|
||||
return Range(source)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
至于为什么 `Tail()` 可以做到把源的后n个取出来,这个就留给大家去细品了。这里给出我的理解:
|
||||
![f93c621571074e44a2d403aa25e7db6f_tplv-k3u1fbpfcp-zoom-1.png](https://cdn.nlark.com/yuque/0/2020/png/2623842/1601801234503-b3155cae-e279-4240-b9c2-6c6580b467bc.png#align=left&display=inline&height=393&margin=%5Bobject%20Object%5D&name=f93c621571074e44a2d403aa25e7db6f_tplv-k3u1fbpfcp-zoom-1.png&originHeight=393&originWidth=828&size=82105&status=done&style=none&width=828)
|
||||
> 假设有以下这个场景,`Tail(5)`
|
||||
> - `stream size` :7
|
||||
> - `ring size`:5
|
||||
>
|
||||
这里可以使用把环状链表拉开的方式,**环转线**,此时以全部长度划分对称轴,翻转多余的元素,之后的元素就是 `Tail(5)` 需要的部分了。
|
||||
> 这里采用图的方式更清晰的表现,不过代码大家也要看看。算法要考的 ![](https://gw.alipayobjects.com/os/lib/twemoji/11.2.0/2/svg/1f528.svg#align=left&display=inline&height=18&margin=%5Bobject%20Object%5D&originHeight=150&originWidth=150&status=done&style=none&width=18)
|
||||
|
||||
|
||||
|
||||
### Stream Transform Design
|
||||
|
||||
|
||||
分析整个 `fx` ,会发现整体设计遵循一个设计模版:
|
||||
|
||||
|
||||
```go
|
||||
func (p Stream) Transform(fn func(item interface{}) interface{}) Stream {
|
||||
// make channel
|
||||
source := make(chan interface{})
|
||||
// goroutine worker
|
||||
go func() {
|
||||
// tranform
|
||||
for item := range p.source {
|
||||
...
|
||||
source <- item
|
||||
...
|
||||
}
|
||||
...
|
||||
// 关闭输入,但是依然可以从这个 stream 输出。防止内存泄漏
|
||||
close(source)
|
||||
}()
|
||||
// channel -> stream
|
||||
return Range(source)
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
- `channel` 作为流的容器
|
||||
- 开 `goroutine` 对 `source` 做转换,聚合,输送到 `channel`
|
||||
- 处理完毕,`close(outputStream)`
|
||||
- `channel -> stream`
|
||||
|
||||
|
||||
|
||||
## 总结
|
||||
|
||||
|
||||
到这就把 `fx` 基本介绍完了,如果你对其他API源码感兴趣,可以跟着上面的 API 列表挨个读一读。
|
||||
|
||||
|
||||
同时也建议大家把 `java stream` 的API大致看看,对这种 `stream call` 理解可以更加深 。
|
||||
|
||||
|
||||
同时在 `go-zero` 中还有很多实用的组件工具,用好工具对于提升服务性能和开发效率都有很大的帮助,希望本篇文章能给大家带来一些收获。
|
||||
|
||||
|
||||
## 参考资料
|
||||
|
||||
|
||||
- [go-zero](https://github.com/tal-tech/go-zero)
|
||||
- [go-zero 文档](https://www.yuque.com/tal-tech/go-zero)
|
||||
- [Java Stream 详解](https://colobu.com/2016/03/02/Java-Stream/)
|
||||
- [Java 8中Stream API](https://mp.weixin.qq.com/s/xa98C-QUHRUK0BhWLzI3XQ)
|
127
docs/frame/readme.md
Normal file
@ -0,0 +1,127 @@
|
||||
# go-zero简介
|
||||
|
||||
|
||||
|
||||
## 0. go-zero介绍
|
||||
go-zero是一个集成了各种工程实践的web和rpc框架。通过弹性设计保障了大并发服务端的稳定性,经受了充分的实战检验。
|
||||
|
||||
|
||||
go-zero 包含极简的 API 定义和生成工具 goctl,可以根据定义的 api 文件一键生成 Go, iOS, Android, Kotlin, Dart, TypeScript, JavaScript 代码,并可直接运行。
|
||||
|
||||
|
||||
使用go-zero的好处:
|
||||
|
||||
|
||||
- 轻松获得支撑千万日活服务的稳定性
|
||||
- 内建级联超时控制、限流、自适应熔断、自适应降载等微服务治理能力,无需配置和额外代码
|
||||
- 微服务治理中间件可无缝集成到其它现有框架使用
|
||||
- 极简的API描述,一键生成各端代码
|
||||
- 自动校验客户端请求参数合法性
|
||||
- 大量微服务治理和并发工具包
|
||||
|
||||
![architecture.png](https://cdn.nlark.com/yuque/0/2020/png/2623842/1601207386607-cb4ad2bb-d1e6-4f1d-83b3-50b69883b6fa.png#align=left&display=inline&height=1422&margin=%5Bobject%20Object%5D&name=architecture.png&originHeight=1422&originWidth=2230&size=341219&status=done&style=none&width=2230)
|
||||
## 1. go-zero框架背景
|
||||
18年初,晓黑板后端在经过频繁的宕机后,决定从`Java+MongoDB`的单体架构迁移到微服务架构,经过仔细思考和对比,我们决定:
|
||||
|
||||
|
||||
- 基于Go语言
|
||||
- 高效的性能
|
||||
- 简洁的语法
|
||||
- 广泛验证的工程效率
|
||||
- 极致的部署体验
|
||||
- 极低的服务端资源成本
|
||||
- 自研微服务框架
|
||||
- 有过很多微服务框架自研经验
|
||||
- 需要有更快速的问题定位能力
|
||||
- 更便捷的增加新特性
|
||||
|
||||
|
||||
|
||||
## 2. go-zero框架设计思考
|
||||
对于微服务框架的设计,我们期望保障微服务稳定性的同时,也要特别注重研发效率。所以设计之初,我们就有如下一些准则:
|
||||
|
||||
|
||||
- 保持简单
|
||||
- 高可用
|
||||
- 高并发
|
||||
- 易扩展
|
||||
- 弹性设计,面向故障编程
|
||||
- 尽可能对业务开发友好,封装复杂度
|
||||
- 尽可能约束做一件事只有一种方式
|
||||
|
||||
|
||||
|
||||
我们经历不到半年时间,彻底完成了从`Java+MongoDB`到`Golang+MySQL`为主的微服务体系迁移,并于18年8月底完全上线,稳定保障了晓黑板后续增长,确保了整个服务的高可用。
|
||||
|
||||
|
||||
## 3. go-zero项目实现和特点
|
||||
go-zero是一个集成了各种工程实践的包含web和rpc框架,有如下主要特点:
|
||||
|
||||
|
||||
- 强大的工具支持,尽可能少的代码编写
|
||||
- 极简的接口
|
||||
- 完全兼容net/http
|
||||
- 支持中间件,方便扩展
|
||||
- 高性能
|
||||
- 面向故障编程,弹性设计
|
||||
- 内建服务发现、负载均衡
|
||||
- 内建限流、熔断、降载,且自动触发,自动恢复
|
||||
- API参数自动校验
|
||||
- 超时级联控制
|
||||
- 自动缓存控制
|
||||
- 链路跟踪、统计报警等
|
||||
- 高并发支撑,稳定保障了晓黑板疫情期间每天的流量洪峰
|
||||
|
||||
|
||||
|
||||
如下图,我们从多个层面保障了整体服务的高可用:
|
||||
![resilience.jpg](https://cdn.nlark.com/yuque/0/2020/jpeg/2623842/1601207450697-941ebb46-e89a-46d8-9378-55c446f8b8f2.jpeg#align=left&display=inline&height=902&margin=%5Bobject%20Object%5D&name=resilience.jpg&originHeight=902&originWidth=2218&size=153038&status=done&style=none&width=2218)
|
||||
## 4. Installation
|
||||
在项目目录下通过如下命令安装:
|
||||
|
||||
|
||||
```shell
|
||||
GOPROXY=https://goproxy.cn/,direct go get -u github.com/tal-tech/go-zero
|
||||
```
|
||||
|
||||
|
||||
## 5. Quick Start
|
||||
|
||||
1. 安装goctl工具
|
||||
`goctl`读作`go control`,不要读成`go C-T-L`。`goctl`的意思是不要被代码控制,而是要去控制它。其中的`go`不是指`golang`。在设计`goctl`之初,我就希望通过`她`来解放我们的双手👈```shell
|
||||
GO111MODULE=on GOPROXY=https://goproxy.cn/,direct go get -u github.com/tal-tech/go-zero/tools/goctl
|
||||
```
|
||||
|
||||
确保goctl可执行
|
||||
1. 快速生成api服务```shell
|
||||
goctl api new greet
|
||||
cd greet
|
||||
go run greet.go -f etc/greet-api.yaml
|
||||
```
|
||||
|
||||
默认侦听在8888端口(可以在配置文件里修改),可以通过curl请求:```shell
|
||||
curl -i http://localhost:8888/greet/from/you
|
||||
```
|
||||
|
||||
返回如下:```http
|
||||
HTTP/1.1 200 OK
|
||||
Date: Sun, 30 Aug 2020 15:32:35 GMT
|
||||
Content-Length: 0
|
||||
```
|
||||
|
||||
编写业务代码:
|
||||
- api文件定义了服务对外暴露的路由,可参考[api规范](https://github.com/tal-tech/zero-doc/blob/main/doc/goctl.md)
|
||||
- 可以在servicecontext.go里面传递依赖给logic,比如mysql, redis等
|
||||
- 在api定义的get/post/put/delete等请求对应的logic里增加业务处理逻辑
|
||||
3. 可以根据api文件生成前端需要的Java, TypeScript, Dart, JavaScript代码```shell
|
||||
goctl api java -api greet.api -dir greet
|
||||
goctl api dart -api greet.api -dir greet
|
||||
...
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
## 6. go-zero背后的设计思考
|
||||
go-zero背后的思考,从整体上来理解微服务系统设计:
|
||||
[https://www.bilibili.com/video/BV1rD4y127PD/](https://www.bilibili.com/video/BV1rD4y127PD/)
|
BIN
docs/images/api-gen.png
Normal file
After Width: | Height: | Size: 96 KiB |
BIN
docs/images/architecture-en.png
Normal file
After Width: | Height: | Size: 286 KiB |
BIN
docs/images/architecture.png
Normal file
After Width: | Height: | Size: 333 KiB |
BIN
docs/images/balancer.png
Normal file
After Width: | Height: | Size: 110 KiB |
BIN
docs/images/benchmark.png
Normal file
After Width: | Height: | Size: 26 KiB |
BIN
docs/images/bookstore-api.png
Normal file
After Width: | Height: | Size: 104 KiB |
BIN
docs/images/bookstore-arch.png
Normal file
After Width: | Height: | Size: 159 KiB |
BIN
docs/images/bookstore-benchmark.png
Normal file
After Width: | Height: | Size: 112 KiB |
BIN
docs/images/bookstore-model.png
Normal file
After Width: | Height: | Size: 77 KiB |
BIN
docs/images/bookstore-rpc.png
Normal file
After Width: | Height: | Size: 92 KiB |
BIN
docs/images/concurrent_denpendency.png
Normal file
After Width: | Height: | Size: 61 KiB |
BIN
docs/images/datasource.png
Normal file
After Width: | Height: | Size: 72 KiB |
BIN
docs/images/fx_log.png
Normal file
After Width: | Height: | Size: 72 KiB |
BIN
docs/images/fx_middle.png
Normal file
After Width: | Height: | Size: 65 KiB |
BIN
docs/images/fx_reverse.png
Normal file
After Width: | Height: | Size: 41 KiB |
BIN
docs/images/fx_step_result.png
Normal file
After Width: | Height: | Size: 85 KiB |
BIN
docs/images/go-zero.png
Normal file
After Width: | Height: | Size: 84 KiB |
BIN
docs/images/interceptor.png
Normal file
After Width: | Height: | Size: 124 KiB |
BIN
docs/images/model-gen.png
Normal file
After Width: | Height: | Size: 86 KiB |
BIN
docs/images/mr.png
Normal file
After Width: | Height: | Size: 62 KiB |
BIN
docs/images/mr_time.png
Normal file
After Width: | Height: | Size: 104 KiB |
BIN
docs/images/panel.png
Normal file
After Width: | Height: | Size: 457 KiB |
BIN
docs/images/prom_up.png
Normal file
After Width: | Height: | Size: 109 KiB |
BIN
docs/images/prometheus.png
Normal file
After Width: | Height: | Size: 95 KiB |
BIN
docs/images/qps.png
Normal file
After Width: | Height: | Size: 121 KiB |
BIN
docs/images/qps_panel.png
Normal file
After Width: | Height: | Size: 99 KiB |
BIN
docs/images/qq.jpg
Normal file
After Width: | Height: | Size: 122 KiB |
BIN
docs/images/random_pseudo.png
Normal file
After Width: | Height: | Size: 25 KiB |
BIN
docs/images/resilience-en.png
Normal file
After Width: | Height: | Size: 170 KiB |
BIN
docs/images/resilience.jpg
Normal file
After Width: | Height: | Size: 150 KiB |
BIN
docs/images/resolver.png
Normal file
After Width: | Height: | Size: 154 KiB |
BIN
docs/images/rpc-gen.png
Normal file
After Width: | Height: | Size: 83 KiB |
BIN
docs/images/shedding_flying.jpg
Normal file
After Width: | Height: | Size: 158 KiB |
BIN
docs/images/shorturl-api.png
Normal file
After Width: | Height: | Size: 104 KiB |
BIN
docs/images/shorturl-arch.png
Normal file
After Width: | Height: | Size: 126 KiB |
BIN
docs/images/shorturl-benchmark.png
Normal file
After Width: | Height: | Size: 111 KiB |
BIN
docs/images/shorturl-model.png
Normal file
After Width: | Height: | Size: 77 KiB |
BIN
docs/images/shorturl-rpc.png
Normal file
After Width: | Height: | Size: 92 KiB |
BIN
docs/images/timewheel-run.png
Normal file
After Width: | Height: | Size: 660 KiB |
BIN
docs/images/timewheel-struct.png
Normal file
After Width: | Height: | Size: 1.1 MiB |
BIN
docs/images/trie.png
Normal file
After Width: | Height: | Size: 91 KiB |
BIN
docs/images/variables.png
Normal file
After Width: | Height: | Size: 90 KiB |
BIN
docs/images/wechat.jpg
Normal file
After Width: | Height: | Size: 134 KiB |
BIN
docs/images/zrpc.png
Normal file
After Width: | Height: | Size: 162 KiB |
26
package.json
Normal file
@ -0,0 +1,26 @@
|
||||
{
|
||||
"name": "go-zero-website",
|
||||
"version": "1.0.0",
|
||||
"description": "go zero website",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"test": "echo \"Error: no test specified\" && exit 1",
|
||||
"docs:dev": "vuepress dev docs",
|
||||
"docs:build": "vuepress build docs"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git@github.com:tal-tech/zero-doc.git"
|
||||
},
|
||||
"keywords": [],
|
||||
"author": "",
|
||||
"dependencies": {
|
||||
"@vuepress/plugin-active-header-links": "^1.2.0",
|
||||
"@vuepress/plugin-back-to-top": "^1.2.0",
|
||||
"@vuepress/plugin-medium-zoom": "^1.2.0",
|
||||
"@vuepress/plugin-nprogress": "^1.2.0",
|
||||
"@vuepress/plugin-pwa": "^1.2.0",
|
||||
"moment": "^2.24.0",
|
||||
"vuepress": "^1.7.1"
|
||||
}
|
||||
}
|
@ -97,7 +97,7 @@ go get -u github.com/tal-tech/go-zero
|
||||
|
||||
[Rapid development of microservice systems](doc/shorturl-en.md)
|
||||
|
||||
[Rapid development of microservice systems - multiple RPCs](doc/bookstore-en.md)
|
||||
[Rapid development of microservice systems - multiple RPCs](docs/frame/bookstore-en.md)
|
||||
|
||||
1. install goctl
|
||||
|
||||
@ -207,4 +207,4 @@ go get -u github.com/tal-tech/go-zero
|
||||
## 8. Documents (adding)
|
||||
|
||||
* [Rapid development of microservice systems](doc/shorturl-en.md)
|
||||
* [Rapid development of microservice systems - multiple RPCs](doc/bookstore-en.md)
|
||||
* [Rapid development of microservice systems - multiple RPCs](docs/frame/bookstore-en.md)
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
## 1. 项目地址
|
||||
|
||||
项目官网:http://zero.gocn.vip
|
||||
[https://github.com/tal-tech/go-zero](https://github.com/tal-tech/go-zero)
|
||||
|
||||
## 2. 项目文档
|
||||
@ -22,7 +23,7 @@
|
||||
|
||||
* awesome系列
|
||||
* [快速构建高并发微服务](doc/shorturl.md)
|
||||
* [快速构建高并发微服务-多RPC版](doc/bookstore.md)
|
||||
* [快速构建高并发微服务-多RPC版](docs/frame/bookstore.md)
|
||||
* [goctl使用帮助](doc/goctl.md)
|
||||
* [通过MapReduce降低服务响应时间](doc/mapreduce.md)
|
||||
* [关键字替换和敏感词过滤工具](doc/keywords.md)
|
||||
|