概述
接受异步消息的主动对象。
Java的java.lang.Thread类的实例就是一种主动对象。
主动对象可以从外部接收和处理异步消息并根据需要返回处理结果。
Active Object模式中的主动对象会通过自己特有的线程在合适的时机处理从外部接收到的异步消息。
Active Object模式有时也称Actor模式。
示例程序
使用的类
- Main 测试程序行为的类
- MakerClientThread 发出“生成字符串”请求的线程
- DisplayClientThread 发出“显示字符串”请求的线程
- ActiveObject 定义“主动对象”的接口(API)的接口
- ActiveObjectFactory 创建“主动对象”的类
- Proxy 将方法调用转换为MethodRequest对象的类(实现了ActiveObject的接口)
- SchedulerThread 调用execute 方法处理 MethodRequest 对象的类
- ActivationQueue 按顺序保存MethodRequest对象的类
- MethodRequest 表示请求的抽象类
- MakeStringRequest makeString方法(生成字符串)对应的类。MethodRequest类的子类
- DisplayStringRequest displayString方法(显示字符串)对应的类。MethodRequest类的子类
- Result 表示执行结果的抽象类
- FutureResult 在Future 模式中表示执行结果的类
- RealResult 表示实际的执行结果的类
- Servant 执行实际处理的类(实现了ActiveObject接口)
Main 类
import activeobject.ActiveObject;
import activeobject.ActiveObjectFactory;
public class Main {
public static void main(String[] args) {
ActiveObject activeObject = ActiveObjectFactory.createActiveObject();
new MakerClientThread("Alice", activeObject).start();
new MakerClientThread("Bobby", activeObject).start();
new DisplayClientThread("Chris", acvtiveObject).start();
}
}
MakerClientThread 类
import activeobject.ActiveObject;
import activeobject.Result;
public class MakerClientThread extends Thread {
private final ActiveObject activeObject;
private final char fillchar;
public MakerClientThread(String name, ActiveObject activeObject){
super(name);
this.activeObject = activeObject;
this.fillchar = name.charAt(0);
}
public void run(){
try {
for(int i = 0; true; i++){
Result<String> result = activeObject.makeString(i, fillchar);
Thread.sleep(10);
String value = result.getResultValue();
System.out.println(Thread.currentThread().getName() + ": value = " + value);
}
} catch (InterruptedException e){}
}
}
DisplayClientThread 类
import activeobject.ActiveObject;
import activeobject.Result;
public class DisplayClientThread extends Thread {
private final ActiveObject activeObject;
public DisplayClientThread(String name, ActiveObject activeObject){
super(name);
this.activeObject = activeObject;
}
public void run() {
try {
for( int i = 0; true; i++){
String string = Thread.currentThread().getName() + " " + i;
activeObject.displayString(string);
Thread.sleep(200);
}
} catch(InterruptedException e) {}
}
}
ActiveObject 接口
package activeobject;
public interface ActiveObject {
public abstract Result<String> makeString (int count, char fillchar);
public abstract void displayString(String string);
}
ActiveObjectFactory 类
package activeobject;
public class ActiveObjectFactory {
public static ActiveObject createActiveObject() {
Servant servant = new Servant();
ActivationQueue queue = new ActivationQueue();
SchedulerThread scheduler = new SchedulerThread(queue);
Proxy proxy = new Proxy(scheduler, servant);
scheduler.start();
return proxy;
}
}
Proxy 类
package activeobject;
class Proxy implements ActiveObject {
private final SchedulerThread scheduler;
private final Servant servant;
public Proxy(SchedulerThread scheduler, Servant servant) {
this.scheduler = scheduler;
this.servant = servant;
}
public Result<String> makeString(int count, char fillchar){
FutureResult<String> future = new FutureResult<String>();
scheduler.invoke(new MakeStringRequest<servant, future, count, fillchar));
return future;
}
public void displayString(String string){
scheduler.invoke(new DisplayStringRequest(servant, string));
}
}
ScheduelrThread 类
package activeobject;
class SchedulerThread extends Thread {
private final ActivationQueue queue;
private SchedulerThread(ActivationQueue queue){
this.queue = queue;
}
public void invoke(MethodRequest request){
queue.putRequest(request);
}
public void run(){
while (true) {
MethodRequest request = queue.takeRequest();
request.execute();
}
}
}
ActivationQueue 类
package activeobject;
class ActivationQueue {
private static final int MAX_METHOD_REQUEST = 100;
private final MethodRequest[] requestQueue;
private int tail;
private int head;
private int count;
public ActivationQueue(){
this.requestQueue = new MethodRequest[MAX_METHOD_REQUEST];
this.head = 0;
this.tail = 0;
this.count = 0;
}
public synchronized void putRequest(MethodRequest request){
while(count >= requestQueue.length){
try{
wait();
} catch (InterruptedException e) {}
}
requestQueue[tail] = request;
tail = (tail + 1) % requestQueue.length;
count++;
notifyAll();
}
public synchronized MethodRequest takeRequest() {
while (count <= 0 ){
try{
wait();
} catch (InterruptedException e){}
}
MethodRequest request = requestQueue[head];
head = (head + 1) % requestQueue.length;
count--;
notifyAll();
return request;
}
}
主动对象方 MethodRequest类`
package activeobject;
abstract class MethodRequest<T> {
protected final Servant servant;
protected final FutureResult<T> future;
protected MethodRequest(Servant servant, FutureResult<T> future){
this.servant = servant;
this.future = future;
}
public abstract void execute():
}
主动对象方 MakeStringRequest 类
package activeobject;
class MakeStringRequest extends MethodRequest<String> {
private final int count;
private final char fillchar;
public MakeStringRequest(Servant servant, FutureResult<String> future, int Count, char fillchar) {
super(servant, future);
this.count = count;
this.fillchar = fillchar;
}
public void execute() {
Result<String> result = servant.makeString(Count, fillchar);
future.setResult(result);
}
}
主动对象方 DisplayStringRequest 类
package activeobject;
class DisplayStringRequest extends MethodRequest<Object> {
private final String string;
public DisplayStringRequest(Servant servant, String string){
super(servant, null);
this.string =string;
}
public void execute() {
servant.displayString(string);
}
}
主动对象方 Result 类
package activeobject;
public abstract class Result<T> {
public abstract T getResultValue();
}
主动对象方 FutureResult 类
package activeobject;
class FutureResult<T> extends Result<T> {
private Result<T> result;
private boolean ready = false;
public synchronized void setResult(Result<T> result) {
this.result = result;
this.ready = true;
notifyAll();
}
public synchronized T getResultValue() {
while ( !ready ) {
try {
wait();
} catch (InterruptedException e) {}
}
return result.getResultValue();
}
}
主动对象方 Servant 类
package activeobject;
class Servant implements ActiveObject {
public Result<String> makeString(int count , char fillchar) {
char[] buffer = new char[Count];
for (int i = 0 ; i < count; i++) {
buffer[i] = fillchar;
try { Thread.sleep(100); } catch (InterruptedException e) {}
}
return new RealResult<String>(new String(buffer));
}
public void displayString(String string){
try {
System.out.print("displayString: " + string);
} catch(InterruptedException e) {}
}
}
ActiveObject 模式中的角色
- Client(委托者)
Client角色调用ActiveObject对象的方法来委托处理,它能够调用的只有ActiveObject 角色提供的方法。调用这些方法后,程序控制权会立即返回。 - ActiveObject(主动对象)
ActiveObject对象定义了主动对象向Client角色提供的接口(API)。 - Proxy(代理人)
Proxy角色负责将方法调用转换为MethodRequest角色的对象。转换后的MethodReques角色会传递给Scheduler角色。 - Scheduler
Scheduler角色负责将Proxy角色传递来的MethodRequest角色传递给ActivationQueue角色,以及从ActivationQueue角色取出并执行MethodRequest角色这两项工作。 - MethodRequest
MethodRequest角色是与来自Client角色的请求对应的角色。MethodRequest定义了负责执行处理的Servant角色,以及负责设置返回值的Future角色和负责执行请求的方法(execute)。 - ConcreteMethodRequest
ConcreteMethodRequest角色是使用MethodRequest角色与具体的方法相对应的角色。 - Servant(仆人)
Servant角色负责实际地处理请求。
调用Servant角色的是Scheduler角色的线程。Scheduler角色会从ActivationQueue角色取出一个MethodRequest角色并执行它。此时,Scheduler角色调用的是Servant角色的方法。
Servant角色实现了ActiveObject角色定义的接口(API)。
Proxy角色会将请求转换为MethodRequest角色,而Servant角色则会实际地执行该请求。Scheduler角色介于Proxy角色和Servant角色之间,负责管理按照什么顺序执行请求。
ActivationQueue(主动队列)
ActivationQueue 角色是保存MethodRequest角色的类。
调用putRequest方法的是Client角色的线程,而调用takeRequest方法的是Scheduler角色的线程。这里使用了Producer-Consumer模式。VirtualResult(虚拟结果)
VirtualResult角色与Future角色、RealResult角色共同构成了Future模式。
Client角色在获取处理结果时会调用VirtualResult角色的getResultValue方法。Future(期货)
Future角色时Client角色在获取处理结果时实时调用的角色。当处理结果还没出来的时候,它会使用Guarded Suspension模式让Client角色的线程等待结果出来。RealResult(真实结果)
RealResult角色是表示处理结果的角色。Servant角色会创建一个RealResult角色作为处理结果。然后调用Future角色的setRealResult方法将其设置到Future角色中。