亲宝软件园·资讯

展开

sentinel流控规则校验之源码分析

努力工作的小码农 人气:0

前言:

  上节给大家把sentinel流控整个执行大致过了,但涉及到最核心的流控算法还没有讲,先提前说明一下 sentinel用的流控算法是令牌桶算法,参考了Guava的RateLimiter,有读过RateLimiter源码再理解sentinel限流算法会更容易,本节依然以源码为主给大家拨开sentinel流控算法的原理

接着上节没有讲到的FlowSlot来看,先来看对应流控规则配置

 

 FlwSlot

/***********************************************FlowSlot***********************************************/
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    public FlowSlot() {
        this(new FlowRuleChecker());
    }

    FlowSlot(FlowRuleChecker checker) {
        AssertUtil.notNull(checker, "flow checker should not be null");
        this.checker = checker;
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }
}

public class FlowRuleChecker {

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        // 拿到当前资源对应的流控规则
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                //通行校验
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node,
                                                    int acquireCount) {
        return canPassCheck(rule, context, node, acquireCount, false);
    }

    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                    boolean prioritized) {
        String limitApp = rule.getLimitApp();
        // 对应控制台配置的来源
        if (limitApp == null) {
            return true;
        }
        if (rule.isClusterMode()) {
            // 集群模式校验
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        // 本地应用校验
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        // 针对配置的流控模式拿到对应的node                                  
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        // 针对配置的流控效果来作校验
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
    
    static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();

        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // 配置的来源和当前相同 流控模式为直接
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // 配置来源为default 流控模式为直接
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // 配置来源为other 流控模式为直接
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        }

        return null;
    }

    static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        // 关联资源
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }
        // 流控模式为关联
        if (strategy == RuleConstant.STRATEGY_RELATE) {
            return ClusterBuilderSlot.getClusterNode(refResource);
        }
        // 流控模式为链路
        if (strategy == RuleConstant.STRATEGY_CHAIN) {
            if (!refResource.equals(context.getName())) {
                return null;
            }
            return node;
        }
        // No node.
        return null;
    }
}

流控类型只针对qps或线程数限制

流控模式分别有三种直接:originNode,关联ClusterNode,链路EntranceNode,针对这几个node的区别上节已经做过说明,不清楚的读者打开上节node树形结构一看便知

流控效果对应四种,具体实现由TrafficShappingController的实现类完成

  1. 快速失败(DefaultController):如果是限制qps会抛出异常,线程数返回false不通过
  2. Warm Up(WarmUpController):预热,防止流量突然暴增,导致系统负载过重,有时候系统设置的最大负载是在理想状态达到的,当系统长时间处于冷却状态 需要通过一定时间的预热才能达到最大负载,比如跟数据库建立连接
  3. 排队等待(RateLimiterController):当前没有足够的令牌通过,会进行睡眠等待,直接能拿到足够的令牌数
  4. WarmUpRateLimiterController:第二种和第三种的结合

来看TrafficShappingController具体的实现类

DefaultController

public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;

    //阀值
    private double count;
    
    // 1=qps,0=ThreadNum
    private int grade;

    public DefaultController(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 获取当前node的ThreadNum或QPS
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) { 
        // 设置当前流量为优先级和流控模式为QPS if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); // 算出拿到当前令牌数的等待时间(ms) waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); // OccupyTimeoutProperty.getOccupyTimeout = 500ms // 如果流量具有优先级,会获取未来的令牌数 if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
            // 添加占用未来的QPS,会调用OccupiableBucketLeapArray.addWaiting(long time, int acquireCount) node.addWaitingRequest(currentTime + waitInMs, acquireCount); node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } // 控制的是线程数返回false return false; } return true; } private int avgUsedTokens(Node node) { if (node == null) { return DEFAULT_AVG_USED_TOKENS; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps()); } private void sleep(long timeMillis) { try { Thread.sleep(timeMillis); } catch (InterruptedException e) { // Ignore. } } }

在上节讲滑动窗口的时候,还有一个秒维度的窗口OccupiableBucketLeapArray没有讲解,它同样继承LeapArray,但它还有额外的概念 未来占用,在DefaultController中当前令牌数不够并且流量具有优先级,那么会提前获取未来的令牌,因为阀值固定每秒能获取的令牌数也固定,既然占用了未来的令牌数,那等到时间到了这个未来时间点,当前可获取的令牌数=阀值—之前占用的令牌

OccupiableBucketLeapArray有个FutureBucketLeapArray就是来存储占用未来的窗口数据

public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    private final FutureBucketLeapArray borrowArray;

    public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        // This class is the original "CombinedBucketArray".
        super(sampleCount, intervalInMs);
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }

    @Override
    // LeapArray.currentWindow(long timeMillis)添加新窗口时会调用
    public MetricBucket newEmptyBucket(long time) {
        MetricBucket newBucket = new MetricBucket();
        // 已被之前占用,将之前占用的数据添加到当前新的窗口中
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            newBucket.reset(borrowBucket);
        }

        return newBucket;
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // Update the start time and reset value.
        w.resetTo(time);
        // 重置时当前窗口数据时,也需要考虑被之前占用的情况
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            w.value().reset();
            w.value().addPass((int)borrowBucket.pass());
        } else {
            w.value().reset();
        }

        return w;
    }

    @Override
    public long currentWaiting() {
        // 获取当前时间被之前占用的qps
        borrowArray.currentWindow();
        long currentWaiting = 0;
        List<MetricBucket> list = borrowArray.values();

        for (MetricBucket window : list) {
            currentWaiting += window.pass();
        }
        return currentWaiting;
    }

    @Override
    public void addWaiting(long time, int acquireCount) {
        // 添加到未来窗口
        WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
        window.value().add(MetricEvent.PASS, acquireCount);
    }
}

当新窗口添加或重置旧窗口数据都需要考虑之前占用的情况,然后把之前占用的窗口数据添加进去

RateLimiterController

跟Guava中SmoothBursty原理类似

public class RateLimiterController implements TrafficShapingController {

    // 最大等待时间
    private final int maxQueueingTimeMs;
    // 阀值
    private final double count;

    // 最新一次拿令牌的时间
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public RateLimiterController(int timeOut, double count) {
        this.maxQueueingTimeMs = timeOut;
        this.count = count;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();
        // 计算获取acquireCount需要多少毫秒
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // 预期获取令牌的时间
        long expectedTime = costTime + latestPassedTime.get();

        // 如果小于当前时间 则直接返回
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // 否则进行等待
            // Calculate the time to wait.
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
                // oldTime 为当前拿到令牌的时长 是更新后的值
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

}

前面两种流控策略都比较简单,主要来看WarmUpController

WarmUpController

它参考了Guava中SmoothWarmingUp的设计,但具体策略不一样,先来看它的源码,后续讲解它们的不同

public class WarmUpController implements TrafficShapingController {

    // qps阈值
    protected double count;
    // 负载因子 用来控制预热速度 默认为3
    private int coldFactor;
    // 进入冷却状态的警戒线
    protected int warningToken = 0;
    // 最大的令牌数
    private int maxToken;
    // 斜率
    protected double slope;

    // 当前令牌容量
    protected AtomicLong storedTokens = new AtomicLong(0);
    // 上一次添加令牌时间
    protected AtomicLong lastFilledTime = new AtomicLong(0);

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;

        this.coldFactor = coldFactor;

        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // warningToken = 100;
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // / maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
        // maxToken = 200
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits- thresholdPermits);
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 当前窗口通过的qps
        long passQps = (long) node.passQps();

        // 上一个窗口的qps
        long previousQps = (long) node.previousPassQps();
        // 更新当前令牌数
        syncToken(previousQps);

        // 开始计算它的斜率
        // 如果进入了警戒线,开始调整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比warning快,但是要比慢
            // current interval = restToken*slope+1/count 算出当前进入预热状态能得到最大的qps
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                // 这里之所以用count来判断而不是storedTokens来,是因为storedTokens是每次添加都是以count倍速来添加 count自然不会大过storedTokens
                return true;
            }
        }

        return false;
    }

    protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        // 因为每一个秒维度的总窗口间隔为1s,减去当前时间的毫秒数 即可得到当前窗口的开始时间
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            // 说明还处于当前窗口
            return;
        }
        // 进入到这里代表第一次进入新的窗口 需要添加当前令牌容量
        // 令牌数量的旧值
        long oldValue = storedTokens.get();
        // 算出新的令牌数 旧值+(根据上一个窗口的qps 算出从上次添加令牌的时间到当前时间需要添加的令牌数)
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // 减去上个窗口的qps,然后设置storedTokens最新值
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }

    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // 添加令牌的判断前提条件:
        // 当令牌的消耗程度远远低于警戒线的时候
        if (oldValue < warningToken) {
            // 当前令牌数小于警戒值 可正常添加令牌 这里只是添加令牌 并不考虑添加令牌后大于warningToken情况,因为后面拿令牌还会重新判断
            // currentTime - lastFilledTime.get() = 与上一次添加令牌间隔多少毫秒
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            // 如果当前passQps > (count / coldFactor) 也就是说当前系统消耗令牌速度大于冷却速度 则不需要继续添加令牌
            if (passQps < (int)count / coldFactor) {
                // 如果消耗速度小于冷却速度 则正常添加令牌
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

}

大致流程:
先根据当前窗口时间间隔时间添加令牌数,正常情况是以count*(last interval second) 来添加令牌数,
如果添加令牌的时候当前令牌容量已经达到警戒值,需要根据当前窗口passqps来判断 当前系统消耗令牌的速率是否大于冷却速率(也就是系统负载状态不需要再进行预热),大于冷却速率则不需要继续添加令牌
添加完令牌后减去上一个窗口的qps得到最新的令牌数, 再判断最新令牌数是否到了警戒值,到了警戒值,通过slope算出目前窗口能得到最大的qps,passQps+acquireCount 不允许大于它


我们再简单说说 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的区别

Guava在于控制获取令牌的速度,根据时间推进增加令牌,通过当前令牌容量判断获取令牌下一个时间点,如当前令牌容量超过了阀值 会进行预热 增加等待时长

Sentinel在于控制QPS,根据时间推进增加令牌,根据通过QPS减少令牌,如果QPS持续下降,当前storeTokens会持续增加,直到超过warningTokens阀值,越过阀值会根据进入预热状态后能提供最大的QPS来做限制

WarmUpRateLimiterController

看完RateLimiterController WarmUpController实现后,再来看这个就非常简单了,看名称就知道是它两的结合

public class WarmUpRateLimiterController extends WarmUpController {

    private final int timeoutInMs;
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
        super(count, warmUpPeriodSec, coldFactor);
        this.timeoutInMs = timeOutMs;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        long currentTime = TimeUtil.currentTimeMillis();

        long restToken = storedTokens.get();
        long costTime = 0;
        long expectedTime = 0;
        if (restToken >= warningToken) {
            // 在预热范围拿到的token
            long aboveToken = restToken - warningToken;

            // current interval = restToken*slope+1/count
            double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
        } else {
            costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        }
        // 拿到令牌的时间
        expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            latestPassedTime.set(currentTime);
            return true;
        } else {
            long waitTime = costTime + latestPassedTime.get() - currentTime;
            if (waitTime > timeoutInMs) {
                return false;
            } else {
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > timeoutInMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }
}

 

加载全部内容

相关教程
猜你喜欢
用户评论