何为热点
热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制,比如:
- 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
- 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
版本
本文基于 1.8.0
如何使用
1.pom 中引入如下
1 | <dependency> |
2.定义 ParamFlowRule
1 | private static void loadRules() { |
这里的配置属性后文讲源码的时候都会看到,所以要重点关注一下
- Rule 本身可以定义一个限流阈值,每个热点参数也可以定义自己的限流阈值
- 还可以为限流阀值设置一个单位时间
3.调用
1 | try { |
完整 demo 参考:传送门
之前有用过 Sentinel 的同学的话其实很好理解。配置方面的话 Rule 属性有些不同,调用方面,需要添加上本次调用相关的参数
举个例子,我们配置了对商品 ID = 1 的限流规则,每次请求商品接口之前调用 Sentinel 的限流 API,指定 Resource
并传入当前要访问的商品 ID。
如果 Sentinel 能找到 Resource 对应的 Rule,则根据 Rule 进行限流。Rule 中如果找到 arg
对应的热点参数配置,则使用热点参数的阈值进行限流。找不到的话,则使用 Rule 中的阈值。
实现原理
Sentinel 整体采用了责任链的设计模式(类似 Servlet Filter),每次调用 SphU.entry
时,都会经历一系列功能插槽(slot chain)。不同的 Slot 职责不同,有的是负责收集信息,有的是负责根据不同的算法策略进行熔断限流操作,关于整体流程大家可以阅读下 官网 中对 Sentinel 工作流程的介绍。
ParamFlowSlot
关于热点参数限流的逻辑在 com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
中
1 | public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { |
ParamFlowSlot 中代码不多,也没做什么事。参考注释的话应该很好理解。咱们直接挑干的讲,来看下 ParamFlowChecker 中是如何实现限流的
ParamFlowChecker 数据结构
热点参数限流使用的算法为令牌桶算法,首先来看一下数据结构是如何存储的
1 | public class ParameterMetric { |
Sentinel 中 Resource 代表当前要访问的资源(方法或者api接口),一个 Resource 可以对应多个 Rule,这些 Rule 可以是相同的 class。
现在再来看 ParameterMetric 的结构,每个 Resource 对应一个 ParameterMetric 对象,上述 CacheMap<Object, AtomicLong>
的 Key 代表热点参数的值,Value 则是对应的计数器。
所以这里数据结构的关系是这样的
- 一个 Resource 有一个 ParameterMetric
- 一个 ParameterMetric 统计了多个 Rule 所需要的限流指标数据
- 每个 Rule 又可以配置多个热点参数
CacheMap 的默认实现,包装了
com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
使用该类的主要原因是为了实现热点参数的 LRU
详细解释一下,这三个变量
- ruleTimeCounters :记录令牌桶的最后添加时间,用于 QPS 限流
- ruleTokenCounter :记录令牌桶的令牌数量,用于 QPS 限流
- threadCountMap :用于线程级别限流,这个其实和令牌桶算法没有关系了,线程限流只是在 Rule 中定义了最大线程数,请求时判断一下当前的线程数是否大于最大线程,具体的应用在
ParamFlowChecker#passSingleValueCheck
实际使用 ParameterMetric 时,使用 ParameterMetricStorage 获取 Resource 对应的 ParameterMetric
1 | public final class ParameterMetricStorage { |
ParamFlowChecker 执行逻辑
ParamFlowChecker 中 QPS 级限流支持两种策略
- CONTROL_BEHAVIOR_RATE_LIMITER :请求速率限制,对应的方法
ParamFlowChecker#passThrottleLocalCheck
- DEFAULT :只要桶中还有令牌,就可以通过,对应的方法
ParamFlowChecker#passDefaultLocalCheck
接下来我们将以 passDefaultLocalCheck 为例,进行分析。但是在这之前,先来捋一下,从 ParamFlowSlot#checkFlow
到 ParamFlowChecker#passDefaultLocalCheck
这中间都经历了什么,详见👇
1 | // 伪代码,忽略了一些参数传递 |
上面提到了一个集群限流,和上一篇中说到的集群限流实现原理是一样的,选出一台 Server 来做限流决策,所有客户端的限流请求都咨询 Server,由 Server 来决定。由于不是本文重点,就不多说了。
ParamFlowChecker 限流核心代码
铺垫了这么多,终于迎来了我们的主角 ParamFlowChecker#passDefaultLocalCheck
,该方法中实现了简单的令牌桶算法,用于热点参数限流
1 | static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, |
令牌桶算法核心思想如下图所示,结合这个图咱们再来理解理解代码
核心逻辑在 while 循环中,咱们直接挑干的讲
先回顾一下上面说过 tokenCounters 和 timeCounters,在默认限流实现中,这两个参数分别代表最后添加令牌时间,令牌剩余数量
while 逻辑:
- 首先如果当前 value 对应的令牌桶为空,则执行初始化
计算当前时间到上次添加 token 时间经历了多久,即
passTime = currentTime - lastAddTokenTime.get()
用于判断是否需要添加 token2.1)
if (pass > rule 中设定的限流单位时间)
,则使用原子操作为令牌桶补充 token(具体补充 token 的逻辑详见上面代码注释)2.2)
else 不需要补充 token
,使用原子操作扣减令牌
可以看到关于 token 的操作全是使用原子操作(CAS),保证了线程安全。如果原子操作更新失败,则会继续执行。
速率限制的实现
再顺便叨咕下上面说过CONTROL_BEHAVIOR_RATE_LIMITER
速率限制策略是如何实现的,只简单说说思路,具体细节大家可以自己看下源码
该策略中,仅使用 timeCounters,该参数存储的数据变成了 lastPassTime
(最后通过时间),所以这个实现和令牌桶也没啥关系了
新的请求到来时,首先根据 Rule 中定义时间范围,count 计算 costTime
,代表每隔多久才能通过一个请求1
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
只有 lastPassTime + costTime <= currentTime
,请求才有可能成功通过,lastPassTime + costTime 过大会导致限流。
最后
如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助