观察者模式类图:
需求:
多线程执行过程中,线程的状态,线程出现异常,我们无法都感知,需要一种机制能够在线程运行的过程中主动将状态推送给我们(观察者),从而观察线程的生命周期。利用观察者模式实现该需求。
观察者接口:
/**
* 观察者接口
*/
public interface LifeCycleListener {
/**
* 观察者接收到主题发送的事件通知后的回调函数
* @param event 主题发送的事件类型
*/
void onEvent(ObservableRunnable.RunnableEvent event);
}
RunnableEvent类是主题封装的事件,用于封装相关信息通知观察者,定义在主题的内部:
/**
* 事件包装类
*/
public static class RunnableEvent {
// 线程状态
private final RunnableState state;
// 线程
private final Thread thread;
// 异常
private final Throwable cause;
public RunnableEvent(RunnableState state, Thread thread, Throwable cause) {
this.state = state;
this.thread = thread;
this.cause = cause;
}
public RunnableState getState() {
return state;
}
public Thread getThread() {
return thread;
}
public Throwable getCause() {
return cause;
}
}
定义线程的三个状态:
/**
* 线程状态
* 包装在RunnableEvent类里通知给观察者
*/
public enum RunnableState {
RUNNING, ERROR, DONE;
}
主题的定义,因为是要获得线程的运行状态,所以选择继承Runnable接口,扩展该接口,实现主题通知的功能,使用时创建线程则实例化该抽象类,传给Thread对象:
/**
* 抽象的主题
* 对Runnable接口的封装
*/
public abstract class ObservableRunnable implements Runnable {
/**
* 持有观察者的引用
* 如果定义为集合,则有多个观察者
*/
private LifeCycleListener listener;
/**
* 通过构造函数将观察者传入
*/
public ObservableRunnable(final LifeCycleListener listener) {
this.listener = listener;
}
/**
* 核心方法:主题状态改变通知观察者
* @param event 事件
*/
public void notifyChange(RunnableEvent event) {
// 观察者调用接收到通知的回调函数
listener.onEvent(event);
}
/**
* 线程状态
* 包装在RunnableEvent类里通知给观察者
*/
public enum RunnableState {
RUNNING, ERROR, DONE;
}
/**
* 事件包装类
*/
public static class RunnableEvent {
// 线程状态
private final RunnableState state;
// 线程
private final Thread thread;
// 异常
private final Throwable cause;
public RunnableEvent(RunnableState state, Thread thread, Throwable cause) {
this.state = state;
this.thread = thread;
this.cause = cause;
}
public RunnableState getState() {
return state;
}
public Thread getThread() {
return thread;
}
public Throwable getCause() {
return cause;
}
}
}
观察者的实现:
/**
* 具体的观察者实现
*/
public class ThreadLifeCycleObserver implements LifeCycleListener {
/**
* 显示锁
*/
private final Object LOCK = new Object();
/**
* 并发查询多个id的信息,一个id开启一个线程,需要得知查询线程的运行状态
*/
public void concurrentQuery(List<String> ids) {
if (ids == null || ids.isEmpty())
return;
// 新建Thread,传入Runnable的子类ObservableRunnable,即具体的主题
// 这里可以理解为观察者持有主题的引用
ids.stream().forEach(id -> new Thread(new ObservableRunnable(this) {
@Override
public void run() {
try {
// 发送运行中通知
notifyChange(new RunnableEvent(RunnableState.RUNNING, Thread.currentThread(), null));
// 模拟查询过程
System.out.println("query for the id " + id);
Thread.sleep(10_000);
// 发送运行结束通知
notifyChange(new RunnableEvent(RunnableState.DONE, Thread.currentThread(), null));
} catch (Exception e) {
// 出现异常,发送异常通知
notifyChange(new RunnableEvent(RunnableState.ERROR, Thread.currentThread(), e));
}
}
}).start());
}
@Override
public void onEvent(ObservableRunnable.RunnableEvent event) {
// 获取到evnet,输出event相关的信息
// 根据业务需求自定义回调逻辑
synchronized (LOCK) {
System.out.println("The runnable [" + Thread.currentThread().getName() + "] data changed and state is [" + event.getState() + "]");
if (event.getCause() != null) {
System.out.println("The runnable [" + Thread.currentThread().getName() + "] process failed");
event.getCause().printStackTrace();
}
}
}
}
客户端测试:
public class ThreadLifeCycleClient {
public static void main(String[] args) {
// new一个观察者并发查询多个id相关的信息
// 观察者能得到查询线程的状态通知
new ThreadLifeCycleObserver().concurrentQuery(Arrays.asList("1", "2"));
}
}
运行结果:
-
正常运行场景的通知
- 异常场景的通知
/**
* 并发查询多个id的信息,一个id开启一个线程,需要得知查询线程的运行状态
*/
public void concurrentQuery(List<String> ids) {
if (ids == null || ids.isEmpty())
return;
// 新建Thread,传入Runnable的子类ObservableRunnable,即具体的主题
// 这里可以理解为观察者持有主题的引用
ids.stream().forEach(id -> new Thread(new ObservableRunnable(this) {
@Override
public void run() {
try {
// 发送运行中通知
notifyChange(new RunnableEvent(RunnableState.RUNNING, Thread.currentThread(), null));
// 模拟查询过程
System.out.println("query for the id " + id);
Thread.sleep(10_000);
// 模拟异常场景
if (id.equals("1")) {
int res = 10 / 0;
}
// 发送运行结束通知
notifyChange(new RunnableEvent(RunnableState.DONE, Thread.currentThread(), null));
} catch (Exception e) {
// 出现异常,发送异常通知
notifyChange(new RunnableEvent(RunnableState.ERROR, Thread.currentThread(), e));
}
}
}).start());
}