Java多线程 Worker Thread Java多线程之Worker Thread模式
冬日毛毛雨 人气:0一.Worker Thread模式
Worker
的意思是工作的人,在Worker Thread
模式中,工人线程Worker thread
会逐个取回工作并进行处理,当所有工作全部完成后,工人线程会等待新的工作到来。
Worker Thread
模式也被成为Background Thread
(背景线程)模式,另外,如果从保存多个工人线程的场所这一点看,我们也可以称这种模式为Thread Pool
模式。
二 .Worker Thread模式中的角色
1.Client(委托者)
创建表示工作请求的Request
并将其传递给Channel
。在示例程序中,ClientThread
相当于该角色。
2.Channel(通信线路)
Channel
角色接受来自于Client
的Request
,并将其传递给Worker
。在示例程序中,Channel
相当于该角色。
3.Worker(工人)
Worker
角色从Channel
中获取Request
,并进行工作,当一项工作完成后,它会继续去获取另外的Request
,在示例程序中,WorkerThread
相当于该角色。
4.Request(请求)
Request
角色是表示工作的角色,Request
角色中保存了进行工作所必须的信息,在示例程序中,Request
相当于该角色。
三.Worker Thread使用场景
想象一个场景,一个工厂在生产玩具,在一个车间里,有几个工人,每次生产部件准备好车间外的人就将部件放到车间的一个桌子上,工人每次做完一个玩具就从桌子上取部件。在这里,注意到,部件并不是直接交给工人的,另外一点,工人并不是做完一个部件就回家换个新人,后者在现实有点滑稽,但是在程序中却对应一个典型的线程使用方法:线程池。
所谓线程池,就是对线程的复用,当线程执行完任务之后就继续取其他任务执行,而不是销毁启动新线程执行其他任务。因为线程的启动对于系统性能开销比较大,所以这样对于系统性能的提高很有好处。
四.Worker Thread模式程序示例
首先是请求,即玩具的部件
public class Request { private final String name; private final int number; public Request(String name, int number) { this.name = name; this.number = number; } public void execute(){ System.out.println(Thread.currentThread().getName()+" executed "+this); } @Override public String toString() { return "Request=> " + "No." + number + " Name." + name; } }
也就是拥有name
和number
并且execute
的时候打印出字段的一个简单类。
ClientThread
,负责将请求放入RequestQueue
中,即将部件放到桌子上。
public class ClientThread extends Thread { private static final Random random = new Random(System.currentTimeMillis()); private final Channel channel; public ClientThread(String name, Channel channel) { super(name); this.channel = channel; } @Override public void run() { try { for (int i = 0; true; i++) { Request request = new Request(getName(),i); this.channel.put(request); Thread.sleep(random.nextInt(1_000)); } } catch (Exception e) { } } }
Channel类,可以当做车间
public class Channel { private final static int MAX_REQUEST = 100; private final Request[] requestQueue; private final WorkerThread[] workerPool; private int head; private int tail; private int count; public Channel(int workers) { this.requestQueue = new Request[MAX_REQUEST]; this.head = 0; this.tail = 0; this.count = 0; this.workerPool = new WorkerThread[workers]; this.init(); } private void init() { for (int i = 0; i < workerPool.length; i++) { workerPool[i] = new WorkerThread("Worker-" + i, this); } } /** * push switch to start all of worker to work */ public void startWorker() { Arrays.asList(workerPool).forEach(WorkerThread::start); // List<WorkerThread> workerThreads = Arrays.asList(workerPool); // // workerThreads.stream().forEach(WorkerThread::start); } public synchronized void put(Request request) { while (count >= requestQueue.length) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.requestQueue[tail] = request; this.tail = (tail + 1) % requestQueue.length; this.count++; this.notifyAll(); } public synchronized Request take() { while (count <= 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Request request = this.requestQueue[head]; this.head = (this.head + 1) % this.requestQueue.length; this.count--; this.notifyAll(); return request; } }
Requestqueue
可以当做桌子,是一个数量有限的请求队列。threadPool
是一个工人线程的数组,这就是一个线程池。在这里提供了putRequest
和takeRequest
方法,分别是往请求队列放入请求和取出请,这里使用了上一篇博文讲到的生产者消费者模式 java
多线程设计模式之消费者生产者模式。确保了WorkerThread
和ClientThread
之间可以友好合作。
工人线程:
public class WorkerThread extends Thread { private static final Random random = new Random(System.currentTimeMillis()); private final Channel channel; public WorkerThread(String name, Channel channel) { super(name); this.channel = channel; } @Override public void run() { while (true) { channel.take().execute(); try { Thread.sleep(random.nextInt(1_000)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
这里就是一个不断从请求队列中取出请求然后执行请求的过程,保证了工人线程的复用,并不会执行完一个请求任务就销毁。
最后是Main:
public class WorkerClient { public static void main(String[] args) { final Channel channel = new Channel(5); channel.startWorker(); new ClientThread("Alex", channel).start(); new ClientThread("Jack", channel).start(); new ClientThread("William", channel).start(); } }
结果:
Worker-4 executed Request=> No.0 Name.Alex
Worker-2 executed Request=> No.0 Name.Jack
Worker-3 executed Request=> No.0 Name.William
Worker-4 executed Request=> No.1 Name.Jack
Worker-0 executed Request=> No.1 Name.William
Worker-3 executed Request=> No.2 Name.Jack
Worker-2 executed Request=> No.1 Name.Alex
Worker-4 executed Request=> No.2 Name.William
Worker-1 executed Request=> No.3 Name.Jack
Worker-3 executed Request=> No.2 Name.Alex
Worker-4 executed Request=> No.3 Name.William
Worker-0 executed Request=> No.4 Name.Jack
Worker-0 executed Request=> No.3 Name.Alex
Worker-1 executed Request=> No.5 Name.Jack
Worker-3 executed Request=> No.4 Name.William
Worker-1 executed Request=> No.6 Name.Jack
Worker-2 executed Request=> No.4 Name.Alex
Worker-3 executed Request=> No.7 Name.Jack
Worker-0 executed Request=> No.5 Name.William
Worker-1 executed Request=> No.5 Name.Alex
Worker-4 executed Request=> No.8 Name.Jack
Worker-2 executed Request=> No.6 Name.Alex
Worker-0 executed Request=> No.7 Name.Alex
Worker-4 executed Request=> No.8 Name.Alex
Worker-2 executed Request=> No.6 Name.William
省略...
可以看出线程执行任务的线程就是WorkerThread1,2,3,4,5
五个,它们不断执行来自ClientThread Alex
,Jack
,William
的请求任务。
加载全部内容