Java阻塞队列
码出地球 人气:0Java常用阻塞队列
ArrayBlockingQueue
内部由一个固定长度的数组来实现阻塞队列
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); /** 定长数组 */ this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
提供了两个入队操作方法,offer()和put()
offer方法不会阻塞,但有返回值,如果队列满了,那么直接返回false,否则插入数据并返回true。
/** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * is full. This method is generally preferable to method {@link #add}, * which can fail to insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
put()会在队列满了的时候会阻塞生产者线程,知道有消费者线程消费后将其唤醒。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 出队后唤醒生产者线程 return x; }
LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。
需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
使用 BlockingQueue 实现生产者消费者问题
public class ProducerConsumer { private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5); private static class Producer extends Thread { @Override public void run() { try { queue.put("product"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.print("produce.."); } } private static class Consumer extends Thread { String product = queue.take(); System.out.print("consume.."); } public static void main(String[] args) { for (int i = 0; i < 2; i++) { Producer producer = new Producer(); producer.start(); for (int i = 0; i < 5; i++) { Consumer consumer = new Consumer(); consumer.start(); for (int i = 0; i < 3; i++) { output: produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
加载全部内容