亲宝软件园·资讯

展开

深入了解Android Okio的超时机制

程序员小北 人气:0

Okio是一个IO库,底层基于Java原生的输入输出流实现。但原生的输入输出流并没有提供超时的检测机制。而Okio实现了这个功能。建议读者先阅读 Android | 彻底理解 Okio 之源码篇 ,然后再阅读本篇内容会更好理解。

Timeout 类的设计

探讨超时机制,首先要了解Timeout这个类。Timeout实现了Okio的同步超时检测,这里的同步指的是“任务执行”和“超时检测”是同步的,有顺序的。同步超时不会直接中断任务执行,它首先会检查是否发生超时,然后决定是否中断任务执行。throwIfReached就是一个同步超时检测的方法。

理解 timeout 与 deadline 的区别

timeout中文意为“超时”,deadline中文意为“最后期限”,它们是有明显区别的。 Timeout类中有一系列的timeoutXxx方法,timeoutXxx是用来设置**一次操作完成的最大等待时间。若这个操作在等待时间内没有结束,则认为超时。 deadlineXxx系列方法则是用来设置一项任务完成的最大等待时间。**意味着在未来多长时间内,需要将这项任务完成,否则认为超时。它可能包含一次或多次的操作。

读取文件的例子

回顾下之前Okio读取文件例子。

public void readFile() {
    try {
        FileInputStream fis = new FileInputStream("test.txt");
        okio.Source source = Okio.source(fis);
        BufferedSource bs = Okio.buffer(source);
        source.timeout().deadline(1, TimeUnit.MILLISECONDS);
        String res = bs.readUtf8();
        System.out.println(res);
    } catch (Exception e){
        e.printStackTrace();
    }
}

在这个例子中,我们使用deadline设置了超时时间为1ms,这意味着从现在开始,读取文件的这项任务,必须在未来的1ms内完成,否则认为超时。而读取文件的这项任务,就包含了多次的文件读取操作。

摇骰子的例子

我们再来看下面这个摇骰子的程序。Dice是一个骰子类,roll方法表示摇骰子,摇出来的点数latestTotal不会超过12。rollAtFixedRate会开启一个线程,每隔一段时间调用roll方法摇一次骰子。awaitTotal方法会当骰子的点数与我们传递进去的total值一样或者超时而结束。

private class Dice {
    Random random = new Random();
    int latestTotal;

    // 摇骰子
    public synchronized void roll() {
        latestTotal = 2 + random.nextInt(6) + random.nextInt(6);
        System.out.println("Rolled " + latestTotal);
        notifyAll();
    }

    // 开启一个线程,每隔一段时间执行 roll 方法
    public void rollAtFixedRate(int period, TimeUnit timeUnit) {
      Executors.newScheduledThreadPool(0).scheduleAtFixedRate(new Runnable() {
        public void run() {
          roll();
         }
      }, 0, period, timeUnit);
    }

    // 超时检测
    public synchronized void awaitTotal(Timeout timeout, int total) throws InterruptedIOException {
      while (latestTotal != total) {
        timeout.waitUntilNotified(this);
      }
   }
}

timeout()是一个测试骰子类的方法,在主线程中运行。该程序设置每隔3s摇一次骰子,主线程设置超时时间为6s,期望摇到的点数是20。因为设置的超时是timeoutXxx系列的方法,所以这里超时的意思是“只要我摇一次骰子的时间不超过6s,那么我就不会超时,可以一直摇骰子”。因为摇出骰子的最大点数是12,而期望值是20,永远也摇不出来20这个点数,且摇一次骰子的时间是3s多,也不满足超时的时间。所以主线程就会一直处于等待状态。

public void timeout(){
    try {
        Dice dice = new Dice();
        dice.rollAtFixedRate(3, TimeUnit.SECONDS);
        Timeout timeout = new Timeout();
        timeout.timeout(6, TimeUnit.SECONDS);
        dice.awaitTotal(timeout, 20);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

现在将timeout()方法修改一下,将timeout.timeout(6, TimeUnit.SECONDS)改为timeout.deadline(6, TimeUnit.SECONDS),之前我们说过deadlineXxx设置的超时**意味着在未来多长时间内,需要将这项任务完成。**在摇骰子这里的意思就是“从现在开始,我只可以摇6s的骰子。超过这个时间你还在摇,则认为超时”。它关注的是可以摇多久的骰子,而不是摇一次骰子不能超过多久的时间。

public void timeout(){
    try {
        Dice dice = new Dice();
        dice.rollAtFixedRate(3, TimeUnit.SECONDS);
        Timeout timeout = new Timeout();
        timeout.deadline(6, TimeUnit.SECONDS);
        dice.awaitTotal(timeout, 20);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

上述程序,主线程会在6s后因超时而停止等待,结束运行。

等待直到唤醒

前面举了两个例子让大家理解Okio中timeoutdeadline的区别。在摇骰子的例子中用到了waitUntilNotified这个方法来检测超时,中文意思为“等待直到唤醒”。也就是Java多线程中经典的“等待-唤醒”机制,该机制常常用于多线程之间的通信。调用waitUntilNotified方法的线程会一直处于等待状态,除非被唤醒或者因超时而抛出异常。下面是该方法的源码。

public final void waitUntilNotified(Object monitor) throws InterruptedIOException {
    try {
      boolean hasDeadline = hasDeadline();
      long timeoutNanos = timeoutNanos();

      // 若没有设置 deadline && timeout,则一直等待直到唤醒
      if (!hasDeadline && timeoutNanos == 0L) {
        monitor.wait(); // There is no timeout: wait forever.
        return;
      }

      // Compute how long we'll wait.
      // 计算等待的时长,若同时设置了deadline 和 timeout,则 deadline 优先
      long waitNanos;
      long start = System.nanoTime();
      if (hasDeadline && timeoutNanos != 0) {
        long deadlineNanos = deadlineNanoTime() - start;
        waitNanos = Math.min(timeoutNanos, deadlineNanos);
      } else if (hasDeadline) {
        waitNanos = deadlineNanoTime() - start;
      } else {
        waitNanos = timeoutNanos;
      }

      // Attempt to wait that long. This will break out early if the monitor is notified.
      long elapsedNanos = 0L;
      if (waitNanos > 0L) {
        long waitMillis = waitNanos / 1000000L;
        // 等待 waitNanos
        monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
        // 计算从等待 waitNanos 到唤醒所用时间
        elapsedNanos = System.nanoTime() - start;
      }

      // Throw if the timeout elapsed before the monitor was notified.
      // 若等待了 waitNanos 还没唤醒,认为超时
      if (elapsedNanos >= waitNanos) {
        throw new InterruptedIOException("timeout");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Retain interrupted status.
      throw new InterruptedIOException("interrupted");
    }
}

查看waitUntilNotified的源码,我们发现该方法基于“等待-通知”机制,添加了多线程之间的超时检测功能,一个线程用来执行具体的任务,一个线程调用该方法来检测超时。在Okio中的管道就使用了waitUntilNotified这个方法。

AsyncTimeout 类的设计

AsyncTimeout内部维护一个单链表,节点的类型是AsyncTimeout,以到超时之前的剩余时间升序排序,即超时的剩余时间越大,节点就在链表越后的位置。对链表的操作,使用了synchronized关键字加类锁,保证在同一时间,只有一个线程可以对链表进行修改访问操作。

AsyncTimeout实现了Okio的异步超时检测。这里的异步指的是“任务执行”和“超时检测”是异步的,在执行任务的同时,也在进行任务的“超时检测”。你会觉得这和上面摇骰子的例子很像,一个线程执行任务,一个线程检测超时。事实上,AsyncTimeout也正是这样实现的,它内部的Watchdog线程就是用来检测超时的。当我们要对一次操作或一项任务设置超时,使用成对的enter()exit(),模板代码如下。

enter();
// do something
exit();

若上面do something的操作超时,timedOut()方法将会在Watchdog线程被回调。可以看见,这种包裹性的模板代码,灵活性很大,我们几乎可以在其中放置任何想要检测超时的一个或多个操作。

AsyncTimeout 成员变量

下面是AsyncTimeout类主要的成员变量。

private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
static @Nullable AsyncTimeout head;
private boolean inQueue;
private @Nullable AsyncTimeout next;
private long timeoutAt;

AsyncTimeout 成员方法

scheduleTimeout 有序的将超时节点加入到链表中

scheduleTimeout方法可以将一个超时节点按照超时的剩余时间有序的插入到链表当中。注意该方法使用synchronized修饰,是一个同步方法,可以保证对链表的操作是线程安全的。

private static synchronized void scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // Start the watchdog thread and create the head node when the first timeout is scheduled.
    // 若 head 节点为 null, 初始化 head 并启动 Watchdog 线程
    if (head == null) {
      head = new AsyncTimeout();
      new Watchdog().start();
    }

    // 计算 node 节点的 timeoutAt 值
    long now = System.nanoTime();
    if (timeoutNanos != 0 && hasDeadline) {
      // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
      // Math.min() is undefined for absolute values, but meaningful for relative ones.
      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
      node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
      node.timeoutAt = node.deadlineNanoTime();
    } else {
      throw new AssertionError();
    }

    // Insert the node in sorted order.
    // 返回 node 节点的超时剩余时间
    long remainingNanos = node.remainingNanos(now);
    // 从 head 节点开始遍历链表, 将 node 节点插入到合适的位置
    for (AsyncTimeout prev = head; true; prev = prev.next) {
      // 若当前遍历的节点下一个节点为 null 或者 node 节点的超时剩余时间小于下一个节点
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
        // 将 node 节点插入到链表
        node.next = prev.next;
        prev.next = node;
        // 若当前遍历的节点是 head, 唤醒 watchdog 线程
        if (prev == head) {
          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
        }
        break;
      }
    }
}

Watchdog 线程

scheduleTimeout方法中,若headnull,则会初始化head并启动Watchdog线程。Watchdog是一个守护线程,因此它会随着JVM进程的结束而结束。前面我们说过Watchdog线程是用来检测超时的,它会逐个检查链表中的超时节点是否超时,直到链表中所有节点检查完毕后结束运行。

private static final class Watchdog extends Thread {
    Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          // 超时的节点
          AsyncTimeout timedOut;
          // 加锁,同步代码块
          synchronized (AsyncTimeout.class) {
            // 等待节点超时
            timedOut = awaitTimeout();

            // Didn't find a node to interrupt. Try again.
            // 当前该节点没有超时,继续检查
            if (timedOut == null) continue;

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            // 链表中已经没有超时节点,结束运行
            if (timedOut == head) {
              head = null;
              return;
            }
          }

          // Close the timed out node.
          // timedOut 节点超时,回调 timedOut() 方法
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
}

awaitTimeout 等待节点超时

Watchdog线程中会调用awaitTimeout方法来等待检测的节点超时,若检测的节点没有超时,该方法返回null。否则返回超时的节点。

static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    // 检测的节点
    AsyncTimeout node = head.next;

    // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
    // 若链表为空
    if (node == null) {
      long startNanos = System.nanoTime();
      // Watchdog 线程等待 60s,期间会释放类锁
      AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
      // 等待 60s 后若链表还为空则返回 head,否则返回 null
      return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
              ? head  // The idle timeout elapsed.
              : null; // The situation has changed.
    }
    // node 节点超时剩余的时间
    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet. Await that.
    // node 节点超时剩余的时间 > 0,说明 node 还未超时,继续等待 waitNanos 后返回 null
    if (waitNanos > 0) {
      // Waiting is made complicated by the fact that we work in nanoseconds,
      // but the API wants (millis, nanos) in two arguments.
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
      return null;
    }

    // The head of the queue has timed out. Remove it.
    // node 节点超时了,将 node 从链表中移除并返回
    head.next = node.next;
    node.next = null;
    return node;
}

enter 进入超时检测

分析完上面三个方法后再来看enter就非常的简单了,enter内部调用了scheduleTimeout方法来添加一个超时节点到链表当中,而Watchdog线程随即会开始检测超时。

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
      return; // No timeout and no deadline? Don't bother with the queue.
    }
    // 更新 inQueue 为 true
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
}

exit 退出超时检测

前面说过,enterexit在检测超时是需要成对出现的。它们之间的代码就是需要检测超时的代码。exit方法的返回值表示enterexit中间检测的代码是否超时。

public final boolean exit() {
    if (!inQueue) return false;
    // 更新 inQueue 为 false
    inQueue = false;
    return cancelScheduledTimeout(this);
}

cancelScheduledTimeout方法会将当前的超时节点从链表中移除。为了保证对链表的操作是线程安全的,该方法也是一个同步方法。我们知道在awaitTimeout方法中,若某个节点超时了会将它从链表中移除。那么当调用cancelScheduledTimeout发现node不在链表中,则一定表明node超时了。

private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
    // Remove the node from the linked list.
    // 若 node 在链表中,将其移除。
    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }

    // The node wasn't found in the linked list: it must have timed out!
    // node 不在链表中,则 node 一定超时了,返回 true
    return true;
}

总结

本文详细讲解了Okio中超时机制的实现原理,主要是TimeoutAsyncTimeout类的源码分析与解读。相信大家已经掌握了这部分知识,现总结一下文中要点。

加载全部内容

相关教程
猜你喜欢
用户评论