Java 阻塞队列 Java并发编程之阻塞队列深入详解
春风~十一载 人气:01. 什么是阻塞队列
阻塞队列是一种特殊的队列,和数据结构中普通的队列一样,也遵守先进先出的原则同时,阻塞队列是一种能保证线程安全的数据结构,并且具有以下两种特性:当队列满的时候,继续向队列中插入元素就会让队列阻塞,直到有其他线程从队列中取走元素;当队列为空的时候,继续出队列也会让队列阻塞,直到有其他线程往队列中插入元素
补充:线程阻塞的意思指代码此时不会被执行,即操作系统在此时不会把这个线程调度到CPU上去执行了
2. 阻塞队列的代码使用
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.BlockingDeque; public class Test { public static void main(String[] args) throws InterruptedException { //不能直接newBlockingDeque,因为它是一个接口,要向上转型 //LinkedBlockingDeque内部是基于链表方式来实现的 BlockingDeque<String> queue=new LinkedBlockingDeque<>(10);//此处可以指定一个具体的数字,这里的的10代表队列的最大容量 queue.put("hello"); String elem=queue.take(); System.out.println(elem); elem=queue.take(); System.out.println(elem); } }
注意: put方法带有阻塞功能,但是offer不具有,所以一般用put方法(能使用offer方法的原因是 BlockingDeque
继承了Queue
)
打印结果如上所示,当打印了hello后,队列为空,代码执行到elem=queue.take();
就不会继续往下执行了,此时线程进入阻塞等待状态,什么也不会打印了,直到有其他线程给队列中放入新的元素为止
3. 生产者消费者模型
生产者消费者模型是在服务器开发和后端开发中比较常用的编程手段,一般用于解耦合和削峰填谷。
高耦合度:两个代码模块的关联关系比较高
高内聚:一个代码模块内各个元素彼此结合的紧密
因此,我们一般追求高内聚低耦合,这样会加快执行效率,而使用生产者消费者模型就可以解耦合
(1)应用一:解耦合
我们以实际生活中的情况为例,这里有两台服务器:A服务器和B服务器,当A服务器传输数据给B时,要是直接传输的话,那么不是A向B推送数据,就是B从A中拉取数据,都是需要A和B直接交互,所以A和B存在依赖关系(A和B的耦合度比较高)。未来如果服务器需要扩展,比如加一个C服务器,让A给C传数据,那么改动就比较复杂,且会降低效率。这时我们可以加一个队列,这个队列为阻塞队列,如果A把数据写到队列里,B从中取,那么队列相当于是中转站(或者说交易场所),A相当于生产者(提供数据),B相当于消费者(接收数据),此时就构成了生产者消费者模型,这样会让代码耦合度更低,维护更方便,执行效率更高。
在计算机中,生产者充当其中一组线程,而消费者充当另一组线程,而交易场所就可以使用阻塞队列了
(2)应用二:削峰填谷
实际生活中
在河道中大坝算是一个很重要的组成部分了,如果没有大坝,大家试想一下结果:当汛期来临后上游的水很大时,下游就会涌入大量的水发生水灾让庄稼被淹没;而旱期的话下游的水会很少可能会引发旱灾。若有大坝的话,汛期时大坝把多余的水存到大坝中,关闸蓄水,让上游的水按一定速率往下流,避免突然一波大雨把下游淹了,这样下游不至于出现水灾。旱期时大坝把之前储存好的水放出来,还是让让水按一定速率往下流,避免下流太缺水,这样既可以避免汛期发生洪涝又可以避免旱期发生旱灾了。
峰:相当于汛期
谷:相当于旱期
计算机中
这样的情况在计算机中也是很典型的,尤其是在服务器开发中,网关通常会把互联网中的请求转发给业务服务器,比如一些商品服务器,用户服务器,商家服务器(存放商家的信息),直播服务器。但因为互联网过来的请求数量是多是少不可控,相当于上游的水,如果突然来了一大波请求,网关即使能扛得住,后续的很多服务器收到很多请求也就会崩溃(处理一个请求涉及到一系列的数据库操作,因为数据库相关操作效率本身比较低,这样请求多了就处理不过来了,因此就会崩溃)
所以实际情况中网关和业务服务器之间往往用一个队列来缓冲,这个队列就是阻塞队列(交易场所),用这个队列来实现生产者(网关)消费者(业务服务器)模型,把请求缓存到队列中,后面的消费者(业务服务器)按照自己固定的速率去读请求。这样当请求很多时,虽然队列服务器可能会稍微受到一定压力,但能保证业务服务器的安全。
(3)相关代码
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class TestDemo { public static void main(String[] args) { // 使用一个 BlockingQueue 作为交易场所 BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); // 此线程作为消费者 Thread customer = new Thread() { @Override public void run() { while (true) { // 取队首元素 try { Integer value = queue.take(); System.out.println("消费元素: " + value); } catch (InterruptedException e) { e.printStackTrace(); } } } }; customer.start(); // 此线程作为生产者 Thread producer = new Thread() { @Override public void run() { for (int i = 1; i <= 10000; i++) { System.out.println("生产了元素: " + i); try { queue.put(i); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; producer.start(); try { customer.join(); producer.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
打印如上(此代码是让生产者通过sleep每过1秒生产一个元素,而消费者不使用sleep,所以每当生产一个元素时,消费者都会立马消费一个元素)
4.阻塞队列和生产者消费者模型功能的实现
在学会如何使用BlockingQueue
后,那么如何自己去实现一个呢?
主要思路:
- 1.利用数组
- 2.head代表队头,tail代表队尾
- 3.head和tail重合后到底是空的还是满的判断方法:专门定义一个size记录当前队列元素个数,入队列时size加1出队列时size减1,当size为0表示空,为数组最大长度就是满的(也可以浪费一个数组空间用head和tail重合表示空,用tail+1和head重合表示满,但此方法较为麻烦,上一个方法较为直观,因此我们使用上一个方法)
public class Test2 { static class BlockingQueue { private int[] items = new int[1000]; // 此处的1000相当于队列的最大容量, 此处暂时不考虑扩容的问题. private int head = 0;//定义队头 private int tail = 0;//定义队尾 private int size = 0;//数组大小 private Object locker = new Object(); // put 用来入队列 public void put(int item) throws InterruptedException { synchronized (locker) { while (size == items.length) { // 队列已经满了,阻塞队列开始阻塞 locker.wait(); } items[tail] = item; tail++; // 如果到达末尾, 就回到起始位置. if (tail >= items.length) { tail = 0; } size++; locker.notify(); } } // take 用来出队列 public int take() throws InterruptedException { int ret = 0; synchronized (locker) { while (size == 0) { // 对于阻塞队列来说, 如果队列为空, 再尝试取元素, 就要阻塞 locker.wait(); } ret = items[head]; head++; if (head >= items.length) { head = 0; } size--; // 此处的notify 用来唤醒 put 中的 wait locker.notify(); } return ret; } } public static void main(String[] args) throws InterruptedException { BlockingQueue queue = new BlockingQueue(); // 消费者线程 Thread consumer = new Thread() { @Override public void run() { while (true) { try { int elem = queue.take(); System.out.println("消费元素: " + elem); } catch (InterruptedException e) { e.printStackTrace(); } } } }; consumer.start(); // 生产者线程 Thread producer = new Thread() { @Override public void run() { for (int i = 1; i < 10000; i++) { System.out.println("生产元素: " + i); try { queue.put(i); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; producer.start(); consumer.join(); producer.join(); } }
运行结果如上。
注意:
- 1.wait和notify的正确使用
- 2.put和take都会产生阻塞情况,但阻塞条件是对立的,wait不会同时触发(put唤醒take阻塞,take唤醒put阻塞)
加载全部内容