本文作为,其他文章链接使用,单独参考意义不大。
Master-Worker
框架如下,首先实现的Master
线程,主要用作分配任务,和返回结果集。
/**
* Created by Joker on 2015/3/9.
*/
public class Master {
//任务队列
protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
//线程队列
protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
//子任务处理结果集
protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
public Master(Worker worker, int workerCount) {
worker.setWorkQueue(workQueue);
worker.setResultMap(resultMap);
if (workerCount > 5)
workerCount = 5;
/*根据workerCount,创建指定数量的工作线程Worker*/
for (int i = 0; i < workerCount; i++) {
threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
}
}
/**
* 检查子任务是否全部执行完毕
*/
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
/**
* 提交任务
*/
public void submit(Object object) {
workQueue.add(object);
}
/**
* 获取子任务结果集
*
* @return 结果集
*/
public Map<String, Object> getResultMap() {
return resultMap;
}
/**
* 执行所有工作线程worker,处理任务。
*/
public void execute() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
}
}
Worker
作为子线程,实现在Handle
方法中实现具体业务逻辑,并加处理结果放到结果集中。
/**
* Created by Joker on 2015/3/9.
*/
public class Worker implements Runnable {
//子任务队列,用于过去子任务
protected Queue<Object> workQueue;
//结果集
protected Map<String, Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
/**
* 执行具体业务逻辑
* 模拟立方和计算
*
* @param object
* @return
*/
public Object handle(Object object) {
Integer i = (Integer) object;
return i * i * i;
}
@Override
public void run() {
while (true) {
//获取任务
Object work = workQueue.poll();
if (work == null)
break;
resultMap.put(Integer.toString(work.hashCode()), this.handle(work));
}
}
}
需要调用的地方,进行立方和计算。
/**
* 计算立方和1~100的立方和
*/
public void executeCubic() {
//创建三个Worker线程,执行任务
Master master = new Master(new Worker(), 3);
for (int i = 0; i < 100; i++) {
master.submit(i);
}
//执行任务
master.execute();
//计算结果,初始化
int result = 0;
//计算结果集
Map<String, Object> resultMap = master.getResultMap();
while (resultMap.size() > 0 || !master.isComplete()) {
String key = null;
Integer i = null;
for (String k : resultMap.keySet()) {
key = k;
break;
}
if (key != null) {
i = (Integer) resultMap.get(key);
//从结果集中移除倍计算过的key
resultMap.remove(key);
}
if (i != null) {
result += i;
}
}
}
提交100个任务后由3个Worker
线程进行计算,Master
并不等待所有Worker
线程执行完毕,就开始访问子结果集,进行相加计算,直到子结果集中的所有数据处理完毕,并且3个活跃Worker
线程全部停止,才给出最终的立方总和。result
最终结果便是1~100立方和。