高并发之观察者模式
顾鹏pen 人气:11.Observable 类
这个类的主要作用是设计我们需要的观察值,和获取观察值的函数
public interface Observable { /** * 这里是写需要观察的变量和对于提取需要的观察函数。 */ enum Cycle{ STARTED,RUNNING,DONE,ERROR } Cycle getCycle(); void start(); void interrupt(); }
从这里可以看出我们需要观察的是线程的生命周期,因此有 STARTED,RUNNING,DONE,ERROR四个,同时还定义了获取状态的函数getCycle。
2.TaskLifecycle类
这个类的作用类似于一个响应器,也就是在我们需要observer的变量发生变化时,就会做出响应。
public interface TaskLifecycle<T> { /** * 这里实际上相当于观察者一旦观察到变化后进行的相应 * @param thread */ void onStart(Thread thread); void onRunning(Thread thread); void onFinish(Thread thread); void onError(Thread thread); class EmptyTaskLisfcycle<T> implements TaskLifecycle { @Override public void onStart(Thread thread) { } @Override public void onRunning(Thread thread) { } @Override public void onFinish(Thread thread) { } @Override public void onError(Thread thread) { } } }
从这里可以看出一个很有趣的东西,在这里设计了一个 EmptyTaskLisfcycle的类,这个类的作用就是在于可以重写 EmptyTaskLisfcycle,从而实现自定义,而且可以实现自定义。
3.Task类
这个类主要的作用是实现业务逻辑。
@FunctionalInterface public interface Task<T> { T call(); }
4. ObservableThread类
这个类是Thread的实现类,首先这个类需要实现Observable接口。
public class ObservableThread <T> extends Thread implements Observable {}
同时它还有三个成员
private final TaskLifecycle<T> lifecycle; private final Task<T> task; private Cycle cycle;
task是用于实现业务逻辑,因此一定是需要的。而生命周期的响应是可有可无的,因此提供两个构造函数,当lifecycle没有的时候,实现一个空的lifecycle
public ObservableThread(Task<T> task) { //一个emptyTaskLisfcycle的实现 this(new TaskLifecycle.EmptyTaskLisfcycle<>(), task); } public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) { super(); if (task == null) { throw new IllegalArgumentException("Ths task is required"); } this.lifecycle = lifecycle; this.task = task; }
而我们要实现一个cycle状态变化,需要一个update函数,这个函数是将各个类进行了一个交融。
private void update(Cycle cycle, T result, Exception e) { this.cycle = cycle; if (lifecycle == null) { return; } try { switch (cycle) { case STARTED: this.lifecycle.onStart(currentThread()); break; case RUNNING: this.lifecycle.onRunning(currentThread()); break; case DONE: this.lifecycle.onFinish(currentThread()); break; case ERROR: this.lifecycle.onError(currentThread()); break; } } catch (Exception ex) { ex.printStackTrace(); if (cycle == Cycle.ERROR) { throw ex; } } }
首先update 将成员变量的cycle改变,然后判断成员的cycle变成了什么,利用switch,执行onStart,onRunning,onFinish等进行回调。
最后将是Thread的run函数,run函数起到的作用便是跟踪变量,而不是执行业务逻辑,所有的业务逻辑放在Task里面。
@Override public void run() { System.out.println("Run函数执行了"); this.update(Cycle.STARTED, null, null); try { this.update(Cycle.RUNNING, null, null); T result = this.task.call(); this.update(Cycle.DONE, result, null); System.out.println("run函数结束了"); } catch (Exception e) { this.update(Cycle.ERROR, null, e); } }
5. 测试类
public class ObservableTest {
public static void main(String[] args) {
Task task = new Task() {
@Override
public Object call() {
System.out.println("Task 执行了");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finished it ");
return null;
}
};
final TaskLifecycle<String> lifecycle = new TaskLifecycle.EmptyTaskLisfcycle<String >(){
@Override
public void onFinish(Thread thread) {
System.out.println("Thread "+Thread.currentThread().getName()+" is finished");
}
};
Observable observableThread = new ObservableThread(lifecycle,task);
observableThread.start();
}
}
参考资料:
1. 深入理解JAVA虚拟机
2. Java高并发编程详解(多线程和架构设计)汪文君
加载全部内容