亲宝软件园·资讯

展开

使用JCTools实现Java并发程序 怎样使用JCTools实现Java并发程序

老K 人气:0
想了解怎样使用JCTools实现Java并发程序的相关内容吗,老K在本文为您仔细讲解使用JCTools实现Java并发程序的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:Java,并发程序,JCTools实现Java并发,下面大家一起来学习吧。

概述

在本文中,我们将介绍JCTools(Java并发工具)库。

简单地说,这提供了许多适用于多线程环境的实用数据结构。

非阻塞算法

传统上,在可变共享状态下工作的多线程代码使用锁来确保数据一致性和发布(一个线程所做的更改对另一个线程可见)。

这种方法有许多缺点:

另一种方法是使用非阻塞算法,即任何线程的故障或挂起都不会导致另一个线程的故障或挂起的算法。

如果所涉及的线程中至少有一个能够在任意时间段内取得进展,即在处理过程中不会出现死锁,则非阻塞算法是无锁的。

此外,如果保证每个线程的进程,这些算法是无等待的。

下面是一个非阻塞堆栈示例,它定义了基本状态:

public class ConcurrentStack<E> {

  AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

  private static class Node <E> {
    public E item;
    public Node<E> next;

    // standard constructor
  }
}

还有一些API方法:

public void push(E item){
  Node<E> newHead = new Node<E>(item);
  Node<E> oldHead;
  
  do {
    oldHead = top.get();
    newHead.next = oldHead;
  } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
  Node<E> oldHead;
  Node<E> newHead;
  do {
    oldHead = top.get();
    if (oldHead == null) {
      return null;
    }
    newHead = oldHead.next;
  } while (!top.compareAndSet(oldHead, newHead));
  
  return oldHead.item;
}

我们可以看到,该算法使用细粒度比较和交换(CAS)指令,并且是无锁的(即使多个线程调用top.compareAndSet()同时,它们中的一个保证会成功)但不能无等待,因为不能保证CAS最终会对任何特定线程成功。

依赖

首先,让我们将JCTools依赖项添加到pom.xml文件:

<dependency>
  <groupId>org.jctools</groupId>
  <artifactId>jctools-core</artifactId>
  <version>2.1.2</version>
</dependency>

请注意,Maven Central上提供了最新的可用版本。

JCTools队列

该库提供了许多队列以在多线程环境中使用,即一个或多个线程以线程安全的无锁方式写入队列,一个或多个线程以线程安全的无锁方式从队列中读取。

所有队列实现的通用接口是org.jctools.queues.MessagePassingQueue。

队列类型
所有队列都可以根据其生产者/消费者策略进行分类:

需要注意的是,在内部没有策略检查,也就是说,如果使用不正确,队列可能会无声地发生故障。

例如,下面的测试从两个线程填充单个生产者队列并通过,即使不能保证使用者看到来自不同生产者的数据:

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);

队列实现

总结以上分类,以下是JCTools队列列表:

原子队列

前面提到的所有队列都使用sun.misc.Unsafe. 然而,随着java9和JEP-260的出现,这个API在默认情况下变得不可访问。

因此,有其他队列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能较差)而不是sun.misc.Unsafe.

它们是从上面的队列生成的,它们的名称中间插入了单词Atomic,例如SpscChunkedAtomicArrayQueue或MpmcAtomicArrayQueue。

如果可能,建议使用“常规”队列,并且仅在sun.misc.Unsafe像Hot Java9+和JRockit一样被禁止/无效。

容量

所有JCTools队列也可能具有最大容量或未绑定。当队列已满且受容量限制时,它将停止接受新元素。

在以下示例中,我们:

请注意,为了可读性,删除了几个代码语句。

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
  IntStream.range(0, queue.capacity()).forEach(i -> {
    assertThat(queue.offer(i)).isTrue();
  });
  assertThat(queue.offer(queue.capacity())).isFalse();
  startConsuming.countDown();
  awakeProducer.await();
  assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
 IntStream.range(0, 17).boxed().collect(toSet()));

其他数据结构工具

JCTools还提供了一些非队列数据结构。

它们都列在下面:

性能测试

让我们使用JMH来比较JDK的ArrayBlockingQueue和JCTools队列的性能。JMH是Sun/Oracle JVM gurus提供的一个开源微基准框架,它保护我们不受编译器/JVM优化算法的不确定性的影响。

请注意,为了提高可读性,下面的代码段遗漏了几个语句。

public class MpmcBenchmark {

  @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
  public volatile String implementation;

  public volatile Queue<Long> queue;

  @Benchmark
  @Group(GROUP_NAME)
  @GroupThreads(PRODUCER_THREADS_NUMBER)
  public void write(Control control) {
    // noinspection StatementWithEmptyBody
    while (!control.stopMeasurement && !queue.offer(1L)) {
      // intentionally left blank
    }
  }

  @Benchmark
  @Group(GROUP_NAME)
  @GroupThreads(CONSUMER_THREADS_NUMBER)
  public void read(Control control) {
    // noinspection StatementWithEmptyBody
    while (!control.stopMeasurement && queue.poll() == null) {
      // intentionally left blank
    }
  }
}

结果:

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

我们可以看到,MpmcArrayQueue的性能略好于MpmcAtomicArrayQueue,而ArrayBlockingQueue的速度慢了两倍。

使用JCTools的缺点

使用JCTools有一个重要的缺点——不可能强制正确使用库类。例如,考虑在我们的大型成熟项目中开始使用MpscArrayQueue的情况(注意,必须有一个使用者)。

不幸的是,由于项目很大,有可能有人出现编程或配置错误,现在从多个线程读取队列。这个系统看起来像以前一样工作,但现在有可能消费者错过了一些信息。这是一个真正的问题,可能会有很大的影响,是很难调试。

理想情况下,应该可以运行具有特定系统属性的系统,该属性强制JCTools确保线程访问策略。例如,本地/测试/暂存环境(而不是生产环境)可能已启用它。遗憾的是,JCTools没有提供这样的属性。

另一个需要考虑的问题是,尽管我们确保JCTools比JDK的对应工具快得多,但这并不意味着我们的应用程序获得了与我们开始使用自定义队列实现时相同的速度。大多数应用程序不会在线程之间交换很多对象,而且大多是I/O绑定的。

结论

现在,我们对JCTools提供的实用程序类有了基本的了解,并了解了它们在重载下与JDK的对应类相比的性能。

总之,只有当我们在线程之间交换大量对象时,才有必要使用该库,即使这样,也有必要非常小心地保留线程访问策略。

以上示例的完整源代码地址:https://github.com/eugenp/tutorials/tree/master/libraries-5

JCTools git地址:https://github.com/JCTools/JCTools

加载全部内容

相关教程
猜你喜欢
用户评论