亲宝软件园·资讯

展开

dubbo与trivial超时机制的深入思考

Allen没有青春 人气:0

说在前面

trivial是根据之前设计的RPC框架而来的(还在增进当中),其中较为不同的一个点为,在客户端去掉了业务线程池,因为既然都要等待,不必要再加一层。

 

进入正题

有在网上看到这样的信息,“之前有简单提到过, dubbo默认采用了netty做为网络组件,它属于一种NIO的模式。消费端发起远程请求后,线程不会阻塞等待服务端的返回,而是马上得到一个ResponseFuture,消费端通过不断的轮询机制判断结果是否有返回。因为是通过轮询,轮询有个需要特别注要的就是避免死循环,所以为了解决这个问题就引入了超时机制,只在一定时间范围内做轮询,如果超时时间就返回超时异常”。

我认为这种说法是错误。

1.以上说法只关注结果,但是如果只关注结果的话何不阻塞等待?还需要轮询判断,耗费cpu资源?超时机制绝不是为了让轮询在一定时间内结束!

问题1:超时机制有什么作用?

2.上述说“消费端通过不断的轮询机制判断结果是否有返回”,没有指明是消费端的什么线程,但是容易让人误以为是调用者线程(下称caller)。而事实上是由一个deamon线程去扫描判断所有的caller发起的调用是否超时。

问题2:为什么不让caller自己去轮询?

 

问题1个人观点:

在正常情况下,即caller发起调用,而后只需阻塞等待服务提供方的结果即可,因为在正常情况下是能收到的。

那要是因为某些原因而收不到呢?比如,服务提供方的处理线程意外结束了,那caller岂不是要一直等下去?

所以要有超时。

dubbo中超时后重试的请求是路由到其他机器上的。咋一看合情合理,再细想大有学问(有可能是我想多了)。

除了刚刚说的因为处理线程意外结束使得caller得不到结果这种情况之外,有些人会想到另一种情况——在网络中丢失?这种情况也是不适合再发送到同一个机器的,因为有tcp的重传,这样你重试的请求若要到同一个机器,便到了协议栈同一个缓冲区,那么最先发送成功的依然是上一次的请求,再按正常情况,首次收到的依然是上一次请求的结果,相当于重试没有作用。

事实上,这只是我的猜想,对于tcp串行传输,并行传输什么的还没有去了解,这里只算是提出一个问题来思考,如果有错误还望指出!

 

问题2个人观点:

假设是由caller自己轮询(有10个),那么每个cpu时间片结束后,都会从运行态转到就绪态(同样有上下文的切换)。适合短时间轮询

假设是由超时扫描线程扫描,这10个caller直接一次进入java线程的等待状态(linux的阻塞态?),结束后由他人唤醒。适合较长时间轮询

前者每次状态切换耗费资源少,但次数多。

后者每次状态切换耗费资源多,但只有一次。

所以多短算短,多长算长呢?未经测试。

同样我并不知道dubbo是怎么考虑的,但我自己是这样想的,所以再次强调这是个人观点,可能有错误。

 

dubbo超时细节

超时扫描线程

static {
    Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); //扫描超时
    th.setDaemon(true);
    th.start();
}

 

DefaultFuture的get方法

@Override
public Object get() throws RemotingException {
    return get(timeout);
}

@Override
public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    if (!isDone()) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (!isDone()) { // wait应该在循环当中
                // 在调用的时候需要等待
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        
        if (!isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}

 

扫描线程细节

private static class RemotingInvocationTimeoutScan implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    // 扫描DefaultFuture列表
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        // 如果future未完成且超时
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            Response timeoutResponse = new Response(future.getId());
                            // 设置超时状态
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }

可以看到该线程用于扫描所有caller注册的调用信息,检查超时。值得注意的一个细节是,“Thread.sleep(30)”,也是在说明while(true)是不让出cpu的吗?

 

trivial超时细节

超时观察者watcher

private class Watcher extends Thread{
        @Override
        public void run() {
            while(!RPCClient.shutdown){//每次循环检查是否已经关闭,同样会让出cpu
                try {
                    CountDownNode head=waiterQueue.take();//阻塞获取头
                    if(System.currentTimeMillis()-head.createTime <RPCClient.timeout)
                        waiterQueue.add(head);//如果没有超时再加回到队尾
                    else{//如果超时了
                        long callerId=head.message.getCallerId();
                        long count=head.message.getCount();
                        if(countMap.get(callerId)==null
                                || countMap.get(callerId)!=count) continue;//实际上已经成功返回
                        if(head.retryNum>0){
                            head.retryNum--;
                            log.error("线程——"+callerId+" 第 "+count +" 次调用超时,即将进行第 "
                                    +(RPCClient.retryNum-head.retryNum)+" 次重试");
                            context.writeAndFlush(head.message);//重发信息
                            continue;
                        }
                        resultMap.put(callerId,"调用超时");
                        log.error("线程—— "+callerId+" 第 "+count
                                +"次调用超时,已重试 "+RPCClient.retryNum+" 次,即将返回超时提示");
                        LockSupport.unpark(waiterMap.get(callerId));
                        waiterMap.remove(callerId);
                        countMap.remove(callerId);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.info("超时观察者退出");
        }
    }

大致上是差不多的,都是要一个线程去扫描,但有一点较为不同的是,

dubbo的超时扫描线程虽然每次循环sleep(30),但即使没有caller发起调用也会一直扫描,耗费cpu资源;

而trivial则会阻塞地从阻塞队列中获取,如果没有caller发起调用则阻塞,不耗费cpu资源。

在频繁发起调用的时候两者差不多的,因为后者也不会总是进入阻塞,但在偶发调用时,或许trivial较好。当然取决于真实情况。

 

最后,如果有兴趣的话,可以了解一下这个平凡的RPC框架,https://github.com/AllenDuke/trivial。

加载全部内容

相关教程
猜你喜欢
用户评论