我是如何进行限流的

前言

以前做ERP系统,QPS这个概念离我着实有些遥远,毕竟ERP系统主要还是公司内部特定业务的一些使用,QPS并不会很高,也就不会出现流量洪峰把服务打跪的情况,但是目前本人在做的系统QPS已经接近百万级别,如何防止流量高峰把服务打挂就是要关注的重点了,当然防止服务雪崩其实有很多方法,比如Lb、Cache、扩容等等,但是以上这些操作其实都离不开一个主题那就是资源,如果不考虑资源问题这些都是很好的方法,但是如果当我们的资源已经匮乏了,这时就需要下面这种保护服务的方法了— 《限流器》,限流是微服务中必不缺少的一个环节,可以很好的起到保护下游服务,防止服务过载等作用。当然限流从功能上来说也分两种,一种是框架层的限流,像springcloud中的histrix亦或者我现在使用的polaris等等,框架层限流通过半开、全开、熔断的方式进行实现,另外一种就是今天我们要讨论的业务限流,接下来就介绍下本人在项目实战中是如何进行限流的,其实使用非常简单,但我们要知其然还要知其所以然~

概念

从目前市面上的主流限流方式来看,大致可以分为漏桶和令牌桶两种限流方式,下面先对这两类限流算法进行一下简单介绍。

漏桶

什么是漏桶了?举个通俗点的例子吧,本人开个一间澡堂子,澡堂子有一个等待大厅可以容纳10个人,晚上6点来了8个人洗澡,这时候我会安排这8个人在等待大厅等候,然后每隔一分钟就放一个人进去洗澡,当我放了3个人进去后,这时等待大厅还有5个人,此时又来了6个人要洗澡,但是现在等待大厅最多能容纳5个人,那多出来的那个人咋办?两个选择要么等着要么就先行离开待会再来,这样其实就是一个简易的漏桶系统了;
其实总结下来漏桶原则就是以任意速率接受请求,然后以恒定速率放行请求,如下图所示:

优点:无法击垮服务、能很好的保证服务的健康;
缺点:因为当流出速度固定,大规模持续突发量,无法多余处理,浪费网络带宽;

令牌桶

那什么是令牌桶了?还是上面的澡堂子场景,晚上六点来了100号大汉来洗澡,那本人就站在门口接待并且每隔1分钟放10个大汉进去等待大厅,进入大厅等待的大汉自行决定要不要立刻去洗澡,可以全部人一次性全部进去洗澡,也可以慢慢悠悠坐会儿再清洗,而我则根据等待大厅的实际空位情况进行人员放行。如此便是一个简易的令牌桶系统了;
其实总结下来令牌桶就是以恒定速率接受请求,然后以任意速率放行请求,当然上限就是桶大小,如下图:

优点:支持大的并发,有效利用网络带宽,可合理处理流量激增的场景;
缺点:如果设计不合理在流量突增情况下限流效果不明显,还是有可能出现压垮服务的情况(下面会解释这种情况);

令牌桶的原理

其实上面两种算法孰优孰劣,这里我不发表评论,因为我一贯的思想是没有最有的算法只有最优的选择,我们需要根据实际的场景进行最佳的选择,因为本人的项目是需要支持大的并发以及一些流量突增场景的,所以就选择了令牌桶算法;

一般在网上搜索令牌桶的时候,都会有一张这样的原理图:

通常看到这张图,我的第一反应是需要两个核心组件 : 一个 定时器 和两个 阻塞队列。定时器 恒定速率往令牌桶(阻塞队列2) 中放 token。用户侧请求则阻塞在阻塞队列1中排队从令牌桶(阻塞队列2)中获取token。这么做也是该图的直观反应,但是这样做不禁效率低下而且过滤复杂,不仅要维护一个 定时器 和 两个 阻塞队列,而且还耗费了一些不必要的内存。
如果我们是按照每秒往令牌桶中放token就会出现上面说到的缺点,假设我们的服务最大处理能力是150QPS,现在设置的限流器是每秒放置100个token进入令牌桶,这样看来服务会很安全,但是实际上了?来看下面的这种情况 :

如果突然有200个请求从第一秒的后0.5秒打过来,此时这200个请求毫无疑问都是会打到后端服务的,这时已经远远超过了后端服务的负载上限了,结果可想而知。

那怎样才能规避这样的问题了?不应担心,Golang 的 timer/rate使用的lazyload的方式就合理规避了上述风险,没有计时器没有阻塞队列,直到每次消费之前才根据时间差更新 token 数目,仅仅是通过计数的方式。

token 的生成和消费

1
2
3
4
5
6
7
8
// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
​ limit: r,
​ burst: b,
​ }
}

time/rate 中,NewLimiter 的第一个参数就是速率 Limit,表示一秒钟可以产生多少 token,这里强调一下,1秒产生的token只是一个概念,并不是真正的按照1秒进行生产,不然就会有上面描述的临界时间节点的问题。
这里换算一下,就可以知道生产一个 token 的实际时间间隔是多少了,根据这个生成单个token的时间间隔,就可以得到以下的两个核心数据:
1. 生成 N 个新的 token 一共需要多久。durationFromtokens
2. 给定一段时长,这段时间一共可以生成多少个 token。tokensFromDuration

好了,有了以上的两个值后,接下来的流程就比较明朗了:

  • 计算从上次取 token 的时间到当前时刻,期间一共新产生了多少 token:
    只在取 token 之前生成新的 token,也就意味着每次取 token 的间隔,实际上也是生成 token 的间隔。我们可以利用 tokensFromDuration, 轻易的算出这段时间一共产生 token 的数目。
    那么,当前 token 数目 = 新产生的 token 数目 + 之前剩余的 token 数目 - 要消费的 token 数目。
  • 如果消费后剩余 token 数目大于零,说明此时 token 桶内仍不为空,此时 token 充足,无需调用侧等待。
    如果 token 数目小于零,则需等待一段时间。
    那么这个时候,我们可以利用 durationFromtokens 将当前负值的 token 数转化为需要等待的时间。
  • 将需要等待的时间等相关结果返回给调用方。
  • 根据实际的需要等待时间决定是否放弃该请求。

可以看出,其实整个过程就是利用了 token 数可以和时间相互转化 的原理。而如果 token 数为负,则需要等待相应时间即可。

注意 如果当获取token时,令牌桶中的 token 数目已经为负值了,依然可以按照上述流程进行消费。随着负值越来越小,等待的时间将会越来越长。当然,读写共享内存的行为为了保证线程安全,更新令牌桶相关数据时都用了 mutex 加锁。

举一个实际例子来看下token桶是如何运转的吧:

  1. 初始化后,桶内 token 数为 5, 此时 A 线程请求 4 个 token。那么此时桶内 token 是充足的,因此 A 线程不需要等待,直接获取token。且此时桶内 token 数变为 1。
  2. 同时,B 线程请求 5 个 token,此时桶内 token 数为 1,所以此时 B 线程需要等待 -1 + 5 = 4 个 token 的时间,且此时桶内 token 数变为 -4。
  3. 最后是C线程也在此时请求1个token,此时桶内的token数为-4,所以此时C线程需要等待 4 + 1 = 5个token的事件,且此时桶内的token数变为 -5。后面的情况就以此类推了,是不是很简单。

通常在实际的常用手法中有两个比较重要的方法就是Allow和Wait两个:

  • Allow :只需判断需要等待的时间是否为 0 即可,如果大于 0 说明需要等待,则会立即返回 False,反之立即返回 True。
  • Wait :需要根据t := time.NewTimer(delay),等待对应的时间,如果等待里时间没ctx没有超时则ok,饭制则fail。

float 精度问题

这其实是一个语言的数值问题,并非是限流器的设计问题,就是在 token 和时间的相互转化函数 durationFromtokenstokensFromDuration 中,会涉及到 float64 的乘除运算。但凡遇到 float 的乘除,丢失精度问题就如吃饭喝水般常见。

Golang 在这里也踩了坑,以下是 tokensFromDuration 最初的实现版本

1
2
3
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}

这个操作看起来一点问题都没:每秒生成的 token 数乘于秒数。
然而,这里的问题在于,d.Seconds() 已经是小数了。两个小数相乘,会带来精度的损失。

修改后新的版本如下:

1
2
3
4
5
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}

time.Durationint64 的别名,代表纳秒。分别求出秒的整数部分和小数部分,进行相乘后再相加,这样可以得到最精确的精度。

数值溢出问题

回到上面的实现,计算从当前时间到上次取 token 的时刻,期间一共新产生了多少 token,同时也可得出当前的 token 是多少。

老实说,对于这个需求我的第一实现思路是:

1
2
3
4
5
6
7
8
9
// gapTime 表示过去的时间差
gapTime := now.Sub(limit.last)
// gapTokens 表示这段时间一共新产生了多少 token
gapTokens = tokensFromDuration(now.Sub(limit.last))

nowTokens := limit.tokens + gapTokens
if(nowTokens > limit.burst){
nowTokens = limit.burst
}

limit.tokens 表示当前剩余的 token数,limit.last 是上次取 token 的时间。limit.burst 是 token 桶的固定容量。
使用 tokensFromDuration 计算出距上次获取token后的时间间隔内新生成了多少 token,累加起来后,不超过桶的固定容量即可。

这个方法乍一看确实挑不出什么毛病,但它是不是真的没什么问题了?先来看一下在 time/rate 里面是怎么做的吧,如下:

1
2
3
4
5
6
7
8
9
10
11
12
maxElapsed := lim.limit.durationFromtokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}

delta := lim.limit.tokensFromDuration(elapsed)

tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}

发现没与上面我的实现思路是有不一样的,它并没有直接使用 now.Sub(lim.last) 来计算这个时间差内应该生产的token数,而是
用了 lim.limit.durationFromtokens(float64(lim.burst) - lim.tokens)做兜底,计算把桶填满的时间 maxElapsed。
取 elapsed 和 maxElapsed 二者的最小值。这里的实现着实有些惊讶,但好像又在情理之中;

现在回过头来再思考一下,对于我的设计思路,如果直接使用now.Sub(lim.last)这个值,当now的值非常大或者last的值非常小的时候,这个间隔时间就会变得异常的大 ,如果 lim.limit 也被设置的非常大后,直接将二者相乘,计算结果有可能就会溢出。当然如果你一定要说lim.limit.durationFromtokens(float64(lim.burst) - lim.tokens)这个值也有可能非常大也没毛病,不反驳-_-!!!。

总结,time/rate 就是使用了lim.limit.durationFromtokens(float64(lim.burst) - lim.tokens)来规避了数值溢出的问题。

使用

使用相对就比较简单了,通常在项目初始化时会构建一个limiter放入内存中

1
2
3
4
5
6
func registerGconfig(gConfig *Config) {
GConfig = gConfig
tempObtainURLLimiter := rate.NewLimiter(rate.Limit(gConfig.ObtainURLTokenPerSecond),
gConfig.ObtainURLTokenPerSecond)
ObtainURLLimiter = tempObtainURLLimiter
}

然后再业务函数中直接使用即可

1
2
3
4
5
6
if err := g.ObtainURLLimiter.Wait(ctx); err != nil {
metrics.Counter("ObtainDownloadURLWaitErrCount").Incr()
log.InfoContextf(ctx, "Vdc_Operation | DoObtainDownloadURL | LimiterWaitErr:[%+v]", err)
servererr.ErrRspObtainDownloadURL(rsp, servererr.NewWithMsg(servererr.ErrDoObtainDownloadURL, err.Error()))
return nil
}

最后为了更动态的控制速率,我们会引入服务配置平台,通过长链接和watch机制实现,在配置平台调整速率后下发配置,服务就会自动更新一个新的limiter放入内存中,这样我们就可以根据实时服务负载监控动态的对限流速率进行调整了。

来看一张使用了限流器的服务吞吐量监控图,会发现服务在流量高峰的吞吐量都是非常平稳的~ :

总结

令牌桶是非常适合互联网突发式请求场景的,请求 token 时并不是像漏桶一样严格的限制为固定的速率,而是使用一个完整的桶容量作为缓冲。只要桶中还有 token,请求就可以一直进行,也就可以处理相当于整个桶容量的流量高峰。但是当流量激增到一定程度后,就会按照设置的token放置速率进行处理,通常情况下我们会将桶容量和速率调整为同一个数值。

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=1x761d0b26y42