typora/note/Go/限流.md
2024-12-12 10:48:55 +08:00

473 lines
14 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

> 笔记来自微信公众号
> 常用限流算法的应用场景和实现原理
> 原创 KevinYan11 网管叨bi叨 2020-12-20 10:28
> [常用限流算法的应用场景和实现原理](https://mp.weixin.qq.com/s?__biz=MzUzNTY5MzU2MA==&mid=2247486937&idx=1&sn=d4ea6ebb38c52e8004e73f235bde9848&scene=21#wechat_redirect)
### 场景
- 在高并发业务场景下,保护系统时,常用的"三板斧"有:"熔断、降级和限流"。
- 业务代码中的逻辑限流
### 常见限流算法
- 计数器
- 滑动敞口
- 漏桶
- 令牌桶
### 计数器
- 计数器是一种比较简单粗暴的限流算法,思想是在固定时间窗口内对请求进行计数,与阀值进行比较判断是否需要限流
- 一旦到了时间临界点,将计数器清零
#### 缺陷
- 可以在清零的前一秒和后一秒两秒内发送阈值乘2的请求
- 一旦请求数量瞬间变多,还是会有崩溃的风险
![](https://blog-heysq-1255479807.cos.ap-beijing.myqcloud.com/blog/wiki/go/count.png)
#### 简单代码实现
```go
type LimitRate struct {
rate int //阀值
begin time.Time //计数开始时间
cycle time.Duration //计数周期
count int //收到的请求数
lock sync.Mutex //锁
}
func (limit *LimitRate) Allow() bool {
limit.lock.Lock()
defer limit.lock.Unlock()
// 判断收到请求数是否达到阀值
if limit.count == limit.rate-1 {
now := time.Now()
// 达到阀值后,判断是否是请求周期内
if now.Sub(limit.begin) >= limit.cycle {
limit.Reset(now)
return true
}
return false
} else {
limit.count++
return true
}
}
func (limit *LimitRate) Set(rate int, cycle time.Duration) {
limit.rate = rate
limit.begin = time.Now()
limit.cycle = cycle
limit.count = 0
}
func (limit *LimitRate) Reset(begin time.Time) {
limit.begin = begin
limit.count = 0
}
```
### 滑动窗口
- 滑动窗口算法将一个大的时间窗口分成多个小窗口,每次大窗口向后滑动一个小窗口,并保证大的窗口内流量不会超出最大值
- 比固定窗口的流量曲线更加平滑
- 滑动时间窗口我们可以把1s的时间窗口划分成10个小窗口或者想象窗口有10个时间插槽time slot, 每个time slot统计某个100ms的请求数量。每经过100ms有一个新的time slot加入窗口早于当前时间1s的time slot出窗口
- 窗口内最多维护10个time slot
#### 缺陷
- 滑动窗口算法是固定窗口的一种改进,但从根本上并没有真正解决固定窗口算法的临界突发流量问题
#### 代码实现
```go
package main
import (
"fmt"
"sync"
"time"
)
type timeSlot struct {
timestamp time.Time // 这个timeSlot的时间起点
count int // 落在这个timeSlot内的请求数
}
// 统计整个时间窗口中已经发生的请求次数
func countReq(win []*timeSlot) int {
var count int
for _, ts := range win {
count += ts.count
}
return count
}
type SlidingWindowLimiter struct {
mu sync.Mutex // 互斥锁保护其他字段
SlotDuration time.Duration // time slot的长度
WinDuration time.Duration // sliding window的长度
numSlots int // window内最多有多少个slot
windows []*timeSlot
maxReq int // 大窗口时间内允许的最大请求数
}
func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
SlotDuration: slotDuration,
WinDuration: winDuration,
numSlots: int(winDuration / slotDuration),
maxReq: maxReq,
}
}
func (l *SlidingWindowLimiter) validate() bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
// 已经过期的time slot移出时间窗
timeoutOffset := -1
for i, ts := range l.windows {
if ts.timestamp.Add(l.WinDuration).After(now) {
break
}
timeoutOffset = i
}
if timeoutOffset > -1 {
l.windows = l.windows[timeoutOffset+1:]
}
// 判断请求是否超限
var result bool
if countReq(l.windows) < l.maxReq {
result = true
}
// 记录这次的请求数
var lastSlot *timeSlot
if len(l.windows) > 0 {
lastSlot = l.windows[len(l.windows)-1]
if lastSlot.timestamp.Add(l.SlotDuration).Before(now) {
// 如果当前时间已经超过这个时间插槽的跨度,那么新建一个时间插槽
lastSlot = &timeSlot{timestamp: now, count: 1}
l.windows = append(l.windows, lastSlot)
} else {
lastSlot.count++
}
} else {
lastSlot = &timeSlot{timestamp: now, count: 1}
l.windows = append(l.windows, lastSlot)
}
return result
}
func (l *SlidingWindowLimiter) LimitTest() string {
if l.validate() {
return "Accepted"
} else {
return "Ignored"
}
}
func main() {
limiter := NewSliding(100*time.Millisecond, time.Second, 10)
for i := 0; i < 5; i++ {
fmt.Println(limiter.LimitTest())
}
time.Sleep(100 * time.Millisecond)
for i := 0; i < 5; i++ {
fmt.Println(limiter.LimitTest())
}
fmt.Println(limiter.LimitTest())
for _, v := range limiter.windows {
fmt.Println(v.timestamp, v.count)
}
fmt.Println("moments later...")
time.Sleep(time.Second)
for i := 0; i < 7; i++ {
fmt.Println(limiter.LimitTest())
}
for _, v := range limiter.windows {
fmt.Println(v.timestamp, v.count)
}
}
```
### 漏桶
#### 算法思想
- 漏桶算法是首先想象有一个木桶桶的容量是固定的。当有请求到来时先放到木桶中处理请求的worker以固定的速度从木桶中取出请求进行相应
- 如果木桶已经满了,直接返回请求频率超限的错误码或者页面
#### 使用场景
- 漏桶算法是流量最均匀的限流实现方式,一般用于流量“整形”
- 例如保护数据库的限流先把对数据库的访问加入到木桶中worker再以db能够承受的qps从木桶中取出请求去访问数据库
#### 缺陷
- 木桶流入请求的速率是不固定的,但是流出的速率是恒定的。能保护系统资源不被打满
- 面对突发流量时会有大量请求失败,不适合电商抢购和微博出现热点事件等场景的限流
#### 简单代码实现
```go
// 一个固定大小的桶,请求按照固定的速率流出
// 请求数大于桶的容量,则抛弃多余请求
type LeakyBucket struct {
rate float64 // 每秒固定流出速率
capacity float64 // 桶的容量
water float64 // 当前桶中请求量
lastLeakMs int64 // 桶上次漏水微秒数
lock sync.Mutex // 锁
}
func (leaky *LeakyBucket) Allow() bool {
leaky.lock.Lock()
defer leaky.lock.Unlock()
now := time.Now().UnixNano() / 1e6
// 计算剩余水量,两次执行时间中需要漏掉的水
leakyWater := leaky.water - (float64(now-leaky.lastLeakMs) * leaky.rate / 1000)
leaky.water = math.Max(0, leakyWater)
leaky.lastLeakMs = now
if leaky.water+1 <= leaky.capacity {
leaky.water++
return true
} else {
return false
}
}
func (leaky *LeakyBucket) Set(rate, capacity float64) {
leaky.rate = rate
leaky.capacity = capacity
leaky.water = 0
leaky.lastLeakMs = time.Now().UnixNano() / 1e6
}
```
### 令牌桶
#### 算法思想
- 倒着的漏桶
- 以恒定的速率向桶中添加令牌
- 木桶满了则不再加入令牌。服务收到请求时尝试从木桶中取出一个令牌
- 令牌桶空闲时,可以攒着最高的限额数的令牌
- 由于木桶内只要有令牌,请求就可以被处理,所以令牌桶算法可以支持突发流量
![](https://blog-heysq-1255479807.cos.ap-beijing.myqcloud.com/blog/wiki/go/lingpaitong.png)
#### 参数设置
- 木桶的容量 - 考虑业务逻辑的资源消耗和机器能承载并发处理多少业务逻辑。
- 生成令牌的速度 - 太慢的话起不到“攒”令牌应对突发流量的效果
#### 简单代码实现
```go
type TokenBucket struct {
rate int64 //固定的token放入速率, r/s
capacity int64 //桶的容量
tokens int64 //桶中当前token数量
lastTokenSec int64 //上次向桶中放令牌的时间的时间戳,单位为秒
lock sync.Mutex
}
func (bucket *TokenBucket) Take() bool {
bucket.lock.Lock()
defer bucket.lock.Unlock()
now := time.Now().Unix()
bucket.tokens = bucket.tokens + (now-bucket.lastTokenSec)*bucket.rate // 先添加令牌
if bucket.tokens > bucket.capacity {
bucket.tokens = bucket.capacity
}
bucket.lastTokenSec = now
if bucket.tokens > 0 {
// 还有令牌,领取令牌
bucket.tokens--
return true
} else {
// 没有令牌,则拒绝
return false
}
}
func (bucket *TokenBucket) Init(rate, cap int64) {
bucket.rate = rate
bucket.capacity = cap
bucket.tokens = 0
bucket.lastTokenSec = time.Now().Unix()
}
```
### 官方限流器
- `golang.org/x/time/rate`
- 基于令牌桶实现
```go
type Limiter struct {
mu sync.Mutex
limit Limit
burst int
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
```
#### 初始化
- `limiter := rate.NewLimiter(10, 100);`
- 两个参数
+ 第一个参数每秒向桶中放令牌的个数
+ 令牌桶的容量,令牌最多的个数
- 还可以用`every`方法指定向桶中放置token的间隔
```go
limit := rate.Every(100 * time.Millisecond);
limiter := rate.NewLimiter(limit, 100);
```
#### 动态调整
- `SetLimit(Limit)` 改变放入 Token 的速率
- `SetBurst(int)` 改变 Token 桶大小
#### 使用
- 三类方法供程序消费 token
- 可以同步等待token生成也可以没有token时返回token获取失败
- `Wait/WaitN`
- `Allow/AllowN`
- `Reserve/ReserveN`
#### Wait/WaitN
- Wait 相当于 WaitN(ctx, 1)
- 如果此时桶内 Token 数组不足 (小于 N) Wait 方法将会阻塞一段时间,直至 Token 满足条件
- 如果充足则直接返回
- 可以设置 context 的 Deadline 或者 Timeout来决定此次 Wait 的最长时间
#### Allow/AllowN
- Allow 实际上就是对 AllowN(time.Now(),1) 进行简化的函数
- AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true同时从桶中消费 n 个 token。反之不消费桶中的Token返回false
- 对应线上的使用场景是,如果请求速率超过限制,就直接丢弃超频后的请求
#### Reserve/ReserveN
- Reserve 相当于 ReserveN(time.Now(), 1)。
- ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 Token 是否充足,都会返回一个 *Reservation 对象
- 可以调用该对象的Delay()方法该方法返回的参数类型为time.Duration反映了需要等待的时间必须等到等待时间之后才能进行接下来的工作
- 如果不想等待可以调用Cancel()方法,该方法会将 Token 归还
#### 主要逻辑代码
```go
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
defer lim.mu.Unlock()
// Inf 一个特别大的值,产生令牌的速率最大,代表一直有令牌
if lim.limit == Inf {
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
// 不产生令牌,桶内的用光就没有了
} else if lim.limit == 0 {
var ok bool
if lim.burst >= n {
ok = true
lim.burst -= n
}
return Reservation{
ok: ok,
lim: lim,
tokens: lim.burst,
timeToAct: now,
}
}
// 运行检查看是不是需要生成令牌和limit生成令牌的时间
// now 就是传进去的时间
// last 如果为now本轮没有生成令牌否则生成新令牌了
// tokens 本轮令牌总数
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
// 扣除需要使用的令牌
tokens -= float64(n)
// Calculate the wait duration
// 根据令牌数机选需要等待时间
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
// 本轮是不是能拿到令牌
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
return r
}
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Calculate the new number of tokens, due to time that passed.
elapsed := now.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
if limit <= 0 {
return InfDuration
}
seconds := tokens / float64(limit)
return time.Duration(float64(time.Second) * seconds)
}
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
if limit <= 0 {
return 0
}
return d.Seconds() * float64(limit)
}
```