利用LockSupport实现简单Future

前言

上篇文章已经讲到了LockSupport提供的功能,以及如何使用LockSupport实现锁的语义,本文将介绍Future的语义以及如何利用LockSupport实现Future。

Future语义

大概应该是从Java1.5开始引入了Future接口,其他很多框架或者工具包都或多或少的引入了Future或者Promise(其他很多语言也引入了Future或者Promise的类似语义)。Future本身体现的可能是一种异步的思想。大体可以描述成“在将来的某个时候可以获取某个结果,可以获取任务的完成状态”。Future的接口定义如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}

接口的语义描述如下:

  • boolean cancel(boolean mayInterruptIfRunning); 取消该任务,mayInterruptIfRunning表示的是如何处理该任务的线程还在执行,会尝试interrupt该线程(Java里的thread.interrupt只是会将interrupted状态置位)
  • boolean isCancelled(); 任务是否已经被取消
  • V get() throws InterruptedException, ExecutionException; 该方法是个阻塞方法,线程阻塞到任务完成或者取消(可以多个线程进行阻塞等待该任务完成),如果等待的线程被Interrupt了会抛出InterruptedException异常,如果task执行异常,会抛出ExecutionException
  • V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException; 该方法是个阻塞方法,线程阻塞一定时间等待任务完成,如果任务没完成将抛出TimeoutException, 如果等待的线程被Interrupt了会抛出InterruptedException异常,如果task执行异常,会抛出ExecutionException

如何实现Future

类似于锁一样,由于Future语义中包含了多个线程同时可以调用get()方法进行阻塞等待,所以Future中需要维护一个等待线程的队列。单纯针对于Future接口来说,其中并没有指定set方法,所以Future实现中需要提供set()方法来描述任务的处理完成(可能是由于出现异常提前完成),而且Future中还需要维护是否被取消,是否完成等状态。

对于java1.5以后来说,JUC中已经提供了FutureTask的实现,如FutureTask名字描述的一样,FutureTask是一个拥有Future特性的task,该task是一个可执行的Runnable对象,所以FutureTask天然知道自己什么时候被处理完成(Runnable执行完成或者出现异常),所以FutureTask不需要透出任何set形式的方法,而且FutureTask天然了解自己是被哪个线程所执行,所以当执行cancel(boolean mayInterruptIfRunning)方法时可以中断这个执行线程。

下文中,基于LockSupport实现了一个简单的Future,该Future不是一个Runnable,只是一个helper,可以用来给某些class加上Future语义。例如在RPC实现中通常会有
ResponseFuture request(Request request) throws RpcException这种方法,这其中的ResponseFuture就可以借助以下的Future实现Future语义。

public class FutureUsingLockSupport<V> implements Future<V> {

    private final AtomicInteger state = new AtomicInteger(NEW);

    public static final int NEW = 0;
    public static final int COMPLETED = 1;
    public static final int CANCELLED = 3;
    public static final int EXCEPTIONAL = 2;

    private final ConcurrentLinkedQueue<Thread> waitThreads = new ConcurrentLinkedQueue<Thread>();

    private volatile Object result;//may be Throwable

    //取消的语义,没相应中断
    public boolean cancel(boolean mayInterruptIfRunning) {
        int s = state.get();
        if (!(s == NEW && state.compareAndSet(NEW, CANCELLED))) {
            return false;
        } else {
            //这里本该需要interrupt运行的task,但是此场景下没法获知Running的Thread
            finish();
        }
        return true;
    }

    //是否被取消
    public boolean isCancelled() {
        return (state.get() >= CANCELLED);
    }

    //是否完成
    public boolean isDone() {
        return (state.get() > NEW);
    }

    //可能阻塞,处理InterruptedException
    public V get() throws InterruptedException, ExecutionException {
        int s = state.get();
        if (s < COMPLETED) {
            s = awaitDone(false, 0L);
        }
        return report(s);
    }

    //可能阻塞及超时,处理InterruptedException
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        int s = state.get();
        if (s < COMPLETED && (s = awaitDone(true, unit.toNanos(timeout))) < COMPLETED  ) {
            throw new TimeoutException();
        }
        return report(s);
    }

    public void set(V value) {
        if (state.compareAndSet(NEW, COMPLETED)) {
            //
            result = value;

            finish();
        }
    }

    private void finish() {
        //记性unpark操作
        if (!waitThreads.isEmpty()) {
            for (Thread thread : waitThreads) {
                LockSupport.unpark(thread);
            }
        }
    }

    public void setException(Throwable throwable) {
        if (state.compareAndSet(NEW, EXCEPTIONAL)) {
            result = throwable;

            finish();
        }
    }

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        if (s == COMPLETED) {
            return (V) result;
        }

        if (s >= CANCELLED) {
            throw new CancellationException();
        }

        throw new ExecutionException((Throwable) result);
    }

    //等待一定时间
    private int awaitDone(boolean timeout, long nanos) throws InterruptedException {
        Thread currentThread = Thread.currentThread();

        long deadLine = timeout ? nanos + System.nanoTime() : 0L;

        boolean added = waitThreads.contains(currentThread);
        while (true) {
            //首先检查是否interrupt()
            if (Thread.interrupted()) {
                waitThreads.remove(currentThread);
                throw new InterruptedException();
            }

            if (state.get() >= COMPLETED) {
                //已经完成
                return state.get();
            } else if (!added) {
                added = waitThreads.add(currentThread);
            } else if (timeout) {
                long left = deadLine - System.nanoTime();
                if (left <= 0L) {
                    //超时
                    waitThreads.remove(currentThread);
                    return state.get();
                }
                LockSupport.parkNanos(left);
            } else {
                LockSupport.park();
            }
        }
    }
}

总结

本文利用LockSupport实现了一个简单的Future,可以作为Helper类帮助其他类完成Future语义。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容