异步任务执行的设计模式

参考:java的设计模式

异步执行方法回调的设计模式:异步方法调用是在等待任务结果时不阻塞调用线程的模式。该模式提供了多个独立的任务并行处理和取得任务结果或者等待所有任务结束。

  • 总览图如下


    image.png
  • 下面为代码示例,首先是执行器接口
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsyncExecutor.java
 * @Description: 执行器executor的三个关联的对象,1:传入的参数线程task,2:传入的保存结果状态的callback
 * 3:返回值result。它也是整个模式的核心部分
 * @version: v1.0.0
 */
public interface AsyncExecutor {

//   开始执行任务,未持有callback则说明客户端不需要对返回结果做额外判断。返回异步结果
      <T> AsyncResult<T> startProcess(Callable<T> task);

//       开始执行任务,持有callback则说明客户端自定义实现额外判断。返回异步结果
      <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);

//    结束异步任务,如果必要时阻塞当前的线程并返回结果结束任务
      <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
}
  • 异步执行返回结果接口
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsyncResult.java
 * @Description: executor执行器执行的返回结果。它应该提供执行状态、任务返回值、结果挂起
 * @version: v1.0.0
 */
public interface AsyncResult<T> {

//  线程任务是否完成
    boolean isCompleted();
//  获取任务的返回值
    T getValue() throws ExecutionException;

//    阻塞当前线程,直到异步任务完成,如果执行中断,抛出异常
    void await() throws InterruptedException;
}
  • 保存执行器executor执行结果(task任务状态,返回值),客户端可以进行自定义处理
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsynCallback.java
 * @Description: 保存执行器executor执行结果(task任务状态,返回值),可以由客户端进行自定义处理
 * @version: v1.0.0
 */
public interface AsynCallback<T> {

    //客户端实现,对executor执行结果后做自定义处理
    void onComplete(T val,Optional<Exception> ex);
}
  • 执行器的具体实现
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: ThreadAsyncExecutor.java
 * @Description: 
 * @version: v1.0.0
 */
public class ThreadAsyncExecutor implements AsyncExecutor {

//      为区别线程,为每个线程命名
      private final AtomicInteger idx = new AtomicInteger(0);

      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task) {
        return startProcess(task, null);
      }

      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
//      CompletableResult作为executor的返回结果,它会对callback传递参数让callback自行处理
        CompletableResult<T> result = new CompletableResult<>(callback);
//      启动一个线程去处理任务线程,并将任务线程的返回结果设置到result中
        new Thread(() -> {
          try {
            result.setValue(task.call());
          } catch (Exception ex) {
            result.setException(ex);
          }
        } , "executor-" + idx.incrementAndGet()).start();
        return result;
      }
//      结束任务,如果当前任务没有完成则让出cpu让其他任务使用。如果执行结束返回结果
      @Override
      public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
        if (!asyncResult.isCompleted()) {
          asyncResult.await();
        }
        return asyncResult.getValue();
      }

      /**
       * Simple implementation of async result that allows completing it successfully with a value or exceptionally with an
       * exception. A really simplified version from its real life cousins FutureTask and CompletableFuture.
       *
       * @see java.util.concurrent.FutureTask
       * @see java.util.concurrent.CompletableFuture
       */
//   执行器executor的三个关联的对象,1:传入的参数线程task,2:传入的保存结果状态的callback,3:返回值result
//    异步执行的结果封装,持有callback对象(该对象可由客户端重写),这里是将执行的结果保存到callback中的value|exception
      private static class CompletableResult<T> implements AsyncResult<T> {
//      几种执行的状态
        static final int RUNNING = 1;
        static final int FAILED = 2;
        static final int COMPLETED = 3;
//      对象锁
        final Object lock;
//      Optional封装callback
        final Optional<AsyncCallback<T>> callback;
//      初始状态
        volatile int state = RUNNING;
//             执行结果
        T value;
//             执行异常情况
        Exception exception;

        CompletableResult(AsyncCallback<T> callback) {
          this.lock = new Object();
          this.callback = Optional.ofNullable(callback);
        }

        /**
         * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
         * completion.
         * 封装任务的返回结果
         * @param value
         *          value of the evaluated task
         */
        void setValue(T value) {
          this.value = value;
          this.state = COMPLETED;
          this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
          synchronized (lock) {
            lock.notifyAll();
          }
        }

        /**
         * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
         * completion.
         * 设置异常
         * @param exception
         *          exception of the failed task
         */
        void setException(Exception exception) {
          this.exception = exception;
          this.state = FAILED;
          this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
          synchronized (lock) {
            lock.notifyAll();
          }
        }
//      是否运行状态
        @Override
        public boolean isCompleted() {
          return state > RUNNING;
        }
//      取得任务结果
        @Override
        public T getValue() throws ExecutionException {
          if (state == COMPLETED) {
            return value;
          } else if (state == FAILED) {
            throw new ExecutionException(exception);
          } else {
            throw new IllegalStateException("Execution not completed yet");
          }
        }
//      未完成时不参与竞争
        @Override
        public void await() throws InterruptedException {
          synchronized (lock) {
            while (!isCompleted()) {
              lock.wait();
            }
          }
        }
      }
    }
  • 测试部分
public class App {
      public static void main(String[] args) throws Exception {
        // 新建一个executor执行器
        AsyncExecutor executor = new ThreadAsyncExecutor();

        // 开始执行一些任务
        AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
        AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
        AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
        AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
        AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));

        // emulate processing in the current thread while async tasks are running in their own threads
        Thread.sleep(350); // Oh boy I'm working hard here
        log("Some hard work done");

        // wait for completion of the tasks
        Integer result1 = executor.endProcess(asyncResult1);
        String result2 = executor.endProcess(asyncResult2);
        Long result3 = executor.endProcess(asyncResult3);
//      下面的执行结果挂起
        asyncResult4.await();
        asyncResult5.await();

        // 打印线程结果
        log("Result 1: " + result1);
        log("Result 2: " + result2);
        log("Result 3: " + result3);
      }

      /**
       * Creates a callable that lazily evaluates to given value with artificial delay.
       * 创建一个任务
       * @param value
       *          value to evaluate
       * @param delayMillis
       *          artificial delay in milliseconds
       * @return new callable for lazy evaluation
       */
      private static <T> Callable<T> lazyval(T value, long delayMillis) {
        return () -> {
          Thread.sleep(delayMillis);
          log("Task completed with: " + value);
          return value;
        };
      }

      /**
       * 客户端自定义callback
       */
      private static <T> AsyncCallback<T> callback(String name) {
//        返回一个callback重写 void onComplete(T value, Optional<Exception> ex)的实现类对象
        return (value, ex) -> {
          if (ex.isPresent()) {
            log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
          } else {
            log(name + ": " + value);
          }
        };
      }
//      日志方法
      private static void log(String msg) {
        System.out.println(msg);
      }
    }
Task completed with: test
Some hard work done
Task completed with: 20
Callback result 4: 20
Task completed with: 10
Task completed with: callback
Callback result 5: callback
Task completed with: 50
Result 1: 10
Result 2: test
Result 3: 50
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容