sentinel流控规则校验之源码分析
努力工作的小码农 人气:0前言:
上节给大家把sentinel流控整个执行大致过了,但涉及到最核心的流控算法还没有讲,先提前说明一下 sentinel用的流控算法是令牌桶算法,参考了Guava的RateLimiter,有读过RateLimiter源码再理解sentinel限流算法会更容易,本节依然以源码为主给大家拨开sentinel流控算法的原理
接着上节没有讲到的FlowSlot来看,先来看对应流控规则配置
FlwSlot
/***********************************************FlowSlot***********************************************/ public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { private final FlowRuleChecker checker; public FlowSlot() { this(new FlowRuleChecker()); } FlowSlot(FlowRuleChecker checker) { AssertUtil.notNull(checker, "flow checker should not be null"); this.checker = checker; } @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); } void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { checker.checkFlow(ruleProvider, resource, context, node, count, prioritized); } } public class FlowRuleChecker { public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } // 拿到当前资源对应的流控规则 Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { //通行校验 if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) { return canPassCheck(rule, context, node, acquireCount, false); } public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { String limitApp = rule.getLimitApp(); // 对应控制台配置的来源 if (limitApp == null) { return true; } if (rule.isClusterMode()) { // 集群模式校验 return passClusterCheck(rule, context, node, acquireCount, prioritized); } // 本地应用校验 return passLocalCheck(rule, context, node, acquireCount, prioritized); } private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { // 针对配置的流控模式拿到对应的node Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } // 针对配置的流控效果来作校验 return rule.getRater().canPass(selectedNode, acquireCount, prioritized); } static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) { String limitApp = rule.getLimitApp(); int strategy = rule.getStrategy(); String origin = context.getOrigin(); if (limitApp.equals(origin) && filterOrigin(origin)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { // 配置的来源和当前相同 流控模式为直接 return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { // 配置来源为default 流控模式为直接 return node.getClusterNode(); } return selectReferenceNode(rule, context, node); } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { if (strategy == RuleConstant.STRATEGY_DIRECT) { // 配置来源为other 流控模式为直接 return context.getOriginNode(); } return selectReferenceNode(rule, context, node); } return null; } static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) { // 关联资源 String refResource = rule.getRefResource(); int strategy = rule.getStrategy(); if (StringUtil.isEmpty(refResource)) { return null; } // 流控模式为关联 if (strategy == RuleConstant.STRATEGY_RELATE) { return ClusterBuilderSlot.getClusterNode(refResource); } // 流控模式为链路 if (strategy == RuleConstant.STRATEGY_CHAIN) { if (!refResource.equals(context.getName())) { return null; } return node; } // No node. return null; } }
流控类型只针对qps或线程数限制
流控模式分别有三种直接:originNode,关联ClusterNode,链路EntranceNode,针对这几个node的区别上节已经做过说明,不清楚的读者打开上节node树形结构一看便知
流控效果对应四种,具体实现由TrafficShappingController的实现类完成
- 快速失败(DefaultController):如果是限制qps会抛出异常,线程数返回false不通过
- Warm Up(WarmUpController):预热,防止流量突然暴增,导致系统负载过重,有时候系统设置的最大负载是在理想状态达到的,当系统长时间处于冷却状态 需要通过一定时间的预热才能达到最大负载,比如跟数据库建立连接
- 排队等待(RateLimiterController):当前没有足够的令牌通过,会进行睡眠等待,直接能拿到足够的令牌数
- WarmUpRateLimiterController:第二种和第三种的结合
来看TrafficShappingController具体的实现类
DefaultController
public class DefaultController implements TrafficShapingController { private static final int DEFAULT_AVG_USED_TOKENS = 0; //阀值 private double count; // 1=qps,0=ThreadNum private int grade; public DefaultController(double count, int grade) { this.count = count; this.grade = grade; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // 获取当前node的ThreadNum或QPS int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) {
// 设置当前流量为优先级和流控模式为QPS if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); // 算出拿到当前令牌数的等待时间(ms) waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); // OccupyTimeoutProperty.getOccupyTimeout = 500ms // 如果流量具有优先级,会获取未来的令牌数 if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
// 添加占用未来的QPS,会调用OccupiableBucketLeapArray.addWaiting(long time, int acquireCount) node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } // 控制的是线程数返回false return false; } return true; } private int avgUsedTokens(Node node) { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); } private void sleep(long timeMillis) { try { Thread.sleep(timeMillis); } catch (InterruptedException e) { // Ignore. } } }
在上节讲滑动窗口的时候,还有一个秒维度的窗口OccupiableBucketLeapArray没有讲解,它同样继承LeapArray,但它还有额外的概念 未来占用,在DefaultController中当前令牌数不够并且流量具有优先级,那么会提前获取未来的令牌,因为阀值固定每秒能获取的令牌数也固定,既然占用了未来的令牌数,那等到时间到了这个未来时间点,当前可获取的令牌数=阀值—之前占用的令牌
OccupiableBucketLeapArray有个FutureBucketLeapArray就是来存储占用未来的窗口数据
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> { private final FutureBucketLeapArray borrowArray; public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) { // This class is the original "CombinedBucketArray". super(sampleCount, intervalInMs); this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs); } @Override // LeapArray.currentWindow(long timeMillis)添加新窗口时会调用 public MetricBucket newEmptyBucket(long time) { MetricBucket newBucket = new MetricBucket(); // 已被之前占用,将之前占用的数据添加到当前新的窗口中 MetricBucket borrowBucket = borrowArray.getWindowValue(time); if (borrowBucket != null) { newBucket.reset(borrowBucket); } return newBucket; } @Override protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) { // Update the start time and reset value. w.resetTo(time); // 重置时当前窗口数据时,也需要考虑被之前占用的情况 MetricBucket borrowBucket = borrowArray.getWindowValue(time); if (borrowBucket != null) { w.value().reset(); w.value().addPass((int)borrowBucket.pass()); } else { w.value().reset(); } return w; } @Override public long currentWaiting() { // 获取当前时间被之前占用的qps borrowArray.currentWindow(); long currentWaiting = 0; List<MetricBucket> list = borrowArray.values(); for (MetricBucket window : list) { currentWaiting += window.pass(); } return currentWaiting; } @Override public void addWaiting(long time, int acquireCount) { // 添加到未来窗口 WindowWrap<MetricBucket> window = borrowArray.currentWindow(time); window.value().add(MetricEvent.PASS, acquireCount); } }
当新窗口添加或重置旧窗口数据都需要考虑之前占用的情况,然后把之前占用的窗口数据添加进去
RateLimiterController
跟Guava中SmoothBursty原理类似
public class RateLimiterController implements TrafficShapingController { // 最大等待时间 private final int maxQueueingTimeMs; // 阀值 private final double count; // 最新一次拿令牌的时间 private final AtomicLong latestPassedTime = new AtomicLong(-1); public RateLimiterController(int timeOut, double count) { this.maxQueueingTimeMs = timeOut; this.count = count; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // Pass when acquire count is less or equal than 0. if (acquireCount <= 0) { return true; } // Reject when count is less or equal than 0. // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. if (count <= 0) { return false; } long currentTime = TimeUtil.currentTimeMillis(); // 计算获取acquireCount需要多少毫秒 long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 预期获取令牌的时间 long expectedTime = costTime + latestPassedTime.get(); // 如果小于当前时间 则直接返回 if (expectedTime <= currentTime) { // Contention may exist here, but it's okay. latestPassedTime.set(currentTime); return true; } else { // 否则进行等待 // Calculate the time to wait. long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { return false; } else { // oldTime 为当前拿到令牌的时长 是更新后的值 long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); if (waitTime > maxQueueingTimeMs) { latestPassedTime.addAndGet(-costTime); return false; } // in race condition waitTime may <= 0 if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; } }
前面两种流控策略都比较简单,主要来看WarmUpController
WarmUpController
它参考了Guava中SmoothWarmingUp的设计,但具体策略不一样,先来看它的源码,后续讲解它们的不同
public class WarmUpController implements TrafficShapingController { // qps阈值 protected double count; // 负载因子 用来控制预热速度 默认为3 private int coldFactor; // 进入冷却状态的警戒线 protected int warningToken = 0; // 最大的令牌数 private int maxToken; // 斜率 protected double slope; // 当前令牌容量 protected AtomicLong storedTokens = new AtomicLong(0); // 上一次添加令牌时间 protected AtomicLong lastFilledTime = new AtomicLong(0); public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { construct(count, warmUpPeriodInSec, coldFactor); } public WarmUpController(double count, int warmUpPeriodInSec) { construct(count, warmUpPeriodInSec, 3); } private void construct(double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; this.coldFactor = coldFactor; // thresholdPermits = 0.5 * warmupPeriod / stableInterval. // warningToken = 100; warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1); // / maxPermits = thresholdPermits + 2 * warmupPeriod / // (stableInterval + coldInterval) // maxToken = 200 maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); // slope // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits- thresholdPermits); slope = (coldFactor - 1.0) / count / (maxToken - warningToken); } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // 当前窗口通过的qps long passQps = (long) node.passQps(); // 上一个窗口的qps long previousQps = (long) node.previousPassQps(); // 更新当前令牌数 syncToken(previousQps); // 开始计算它的斜率 // 如果进入了警戒线,开始调整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { long aboveToken = restToken - warningToken; // 消耗的速度要比warning快,但是要比慢 // current interval = restToken*slope+1/count 算出当前进入预热状态能得到最大的qps double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps + acquireCount <= warningQps) { return true; } } else { if (passQps + acquireCount <= count) { // 这里之所以用count来判断而不是storedTokens来,是因为storedTokens是每次添加都是以count倍速来添加 count自然不会大过storedTokens return true; } } return false; } protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); // 因为每一个秒维度的总窗口间隔为1s,减去当前时间的毫秒数 即可得到当前窗口的开始时间 currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { // 说明还处于当前窗口 return; } // 进入到这里代表第一次进入新的窗口 需要添加当前令牌容量 // 令牌数量的旧值 long oldValue = storedTokens.get(); // 算出新的令牌数 旧值+(根据上一个窗口的qps 算出从上次添加令牌的时间到当前时间需要添加的令牌数) long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { // 减去上个窗口的qps,然后设置storedTokens最新值 long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); } } private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 添加令牌的判断前提条件: // 当令牌的消耗程度远远低于警戒线的时候 if (oldValue < warningToken) { // 当前令牌数小于警戒值 可正常添加令牌 这里只是添加令牌 并不考虑添加令牌后大于warningToken情况,因为后面拿令牌还会重新判断 // currentTime - lastFilledTime.get() = 与上一次添加令牌间隔多少毫秒 newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { // 如果当前passQps > (count / coldFactor) 也就是说当前系统消耗令牌速度大于冷却速度 则不需要继续添加令牌 if (passQps < (int)count / coldFactor) { // 如果消耗速度小于冷却速度 则正常添加令牌 newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); } }
大致流程:
先根据当前窗口时间间隔时间添加令牌数,正常情况是以count*(last interval second) 来添加令牌数,
如果添加令牌的时候当前令牌容量已经达到警戒值,需要根据当前窗口passqps来判断 当前系统消耗令牌的速率是否大于冷却速率(也就是系统负载状态不需要再进行预热),大于冷却速率则不需要继续添加令牌
添加完令牌后减去上一个窗口的qps得到最新的令牌数, 再判断最新令牌数是否到了警戒值,到了警戒值,通过slope算出目前窗口能得到最大的qps,passQps+acquireCount 不允许大于它
我们再简单说说 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的区别
Guava在于控制获取令牌的速度,根据时间推进增加令牌,通过当前令牌容量判断获取令牌下一个时间点,如当前令牌容量超过了阀值 会进行预热 增加等待时长
Sentinel在于控制QPS,根据时间推进增加令牌,根据通过QPS减少令牌,如果QPS持续下降,当前storeTokens会持续增加,直到超过warningTokens阀值,越过阀值会根据进入预热状态后能提供最大的QPS来做限制
WarmUpRateLimiterController
看完RateLimiterController WarmUpController实现后,再来看这个就非常简单了,看名称就知道是它两的结合
public class WarmUpRateLimiterController extends WarmUpController { private final int timeoutInMs; private final AtomicLong latestPassedTime = new AtomicLong(-1); public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) { super(count, warmUpPeriodSec, coldFactor); this.timeoutInMs = timeOutMs; } @Override public boolean canPass(Node node, int acquireCount) { return canPass(node, acquireCount, false); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { long previousQps = (long) node.previousPassQps(); syncToken(previousQps); long currentTime = TimeUtil.currentTimeMillis(); long restToken = storedTokens.get(); long costTime = 0; long expectedTime = 0; if (restToken >= warningToken) { // 在预热范围拿到的token long aboveToken = restToken - warningToken; // current interval = restToken*slope+1/count double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000); } else { costTime = Math.round(1.0 * (acquireCount) / count * 1000); } // 拿到令牌的时间 expectedTime = costTime + latestPassedTime.get(); if (expectedTime <= currentTime) { latestPassedTime.set(currentTime); return true; } else { long waitTime = costTime + latestPassedTime.get() - currentTime; if (waitTime > timeoutInMs) { return false; } else { long oldTime = latestPassedTime.addAndGet(costTime); try { waitTime = oldTime - TimeUtil.currentTimeMillis(); if (waitTime > timeoutInMs) { latestPassedTime.addAndGet(-costTime); return false; } if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { } } } return false; } }
加载全部内容