【第6篇】Netty对Executor实现机制分析

ThreadPerTaskExecutor

  • ThreadPerTaskExecutor每一个任务的执行器(代理和命令模式)线程解耦(执行线程和创建线程)
public final class ThreadPerTaskExecutor implements Executor {
   //定义一个私有的线程工厂
    private final ThreadFactory threadFactory;
   //定义一个ThreadPerTaskExecutor构造方法
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

  //这个是一个命令模式,这个execute方法的作用是:在将来的某个时间执行给定的命令。 该命令可以在Executor实现的判断下在新线程,池化线程或调用线程中执行
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

ExecutorService

  • ExecutorService 继承了Executor接口,增加了对自身生命周期管理的方法,同时提供了一个Future给命令者去获取命令的执行结果
public interface ExecutorService extends Executor {

    /**
     * 启动一个有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。 如果已经关闭,调用没有其他影响。此方法不会等待先前提交的任务完成执行。 使用awaitTermination来做到这一点。
     */
    void shutdown();

    /**
     * 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。此方法不等待主动执行任务终止。 使用awaitTermination来做到这一点。除尽力尝试停止处理主动执行任务之外,没有任何保证。 例如,典型的实现将通过Thread.interrupt取消,因此任何无法响应中断的任务都可能永远不会终止。
     */
    List<Runnable> shutdownNow();

    /**
     * 如果此执行程序已关闭,则返回true。
     */
    boolean isShutdown();

    /**
     * 如果关闭后所有任务都已完成,则返回true。 请注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为真。
     */
    boolean isTerminated();

    /**
     * 阻止所有任务在关闭请求之后完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交值返回任务以执行并返回表示任务的挂起结果的Future。 Future的get方法将在成功完成后返回任务的结果。
如果您想立即阻止等待任务,可以使用结构形式为result = exec.submit(aCallable).get();
注意:Executors类包含一组方法,这些方法可以将一些其他常见的类似闭包的对象(例如,java.security.PrivilegedAction)转换为Callable形式,以便可以提交它们。
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交Runnable任务以执行并返回表示该任务的Future。 Future的get方法将在成功完成后返回给定的结果。
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交Runnable任务以执行并返回表示该任务的Future。 Future的get方法将在成功完成后返回null。
     */
    Future<?> submit(Runnable task);

    /*
执行给定的任务,返回完成所有状态和结果的Futures列表。 对于返回列表的每个元素,Future.isDone都为true。 请注意,已完成的任务可能正常终止或通过抛出异常终止。 如果在此操作正在进行时修改了给定集合,则此方法的结果是不确定的。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     *执行给定的任务,返回一个Futures列表,其中包含所有完成或超时到期时的状态和结果,以先发生者为准。 对于返回列表的每个元素,Future.isDone都为true。 返回时,未完成的任务将被取消。 请注意,已完成的任务可能正常终止或通过抛出异常终止。 如果在此操作正在进行时修改了给定集合,则此方法的结果是不确定的。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 执行给定的任务,返回已成功完成的任务的结果(即,不抛出异常),如果有的话。 在正常或特殊退货时,未完成的任务将被取消。 如果在此操作正在进行时修改了给定集合,则此方法的结果是不确定的。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 执行给定的任务,返回已成功完成的任务的结果(即,不抛出异常),如果在给定的超时之前已经执行了任何操作。 在正常或特殊退货时,未完成的任务将被取消。 如果在此操作正在进行时修改了给定集合,则此方法的结果是不确定的。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • invokeAny 被哪些类调用。


    invokeAny

ScheduledExecutorService

  • ScheduledExecutorService 继承了ExecutorService接口,增加了对定时任务的支持。
//创建和执行在给定延迟后启用的一次性操作。
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//创建并执行一个周期性操作,该操作在给定的初始延迟后首先启用,随后在给定的时间段内启用; 即执行将在initialDelay之后开始,然后是initialDelay + period,然后是initialDelay + 2 * period,依此类推。 如果任务的任何执行遇到异常,则后续执行被禁止。 否则,任务将仅通过取消或终止执行者来终止。 如果此任务的执行时间超过其周期,则后续执行可能会延迟,但不会同时执行。
public ScheduledFuture<?> scheduleAtFixedRate(@org.jetbrains.annotations.NotNull Runnable command, long initialDelay,  long period,TimeUnit unit)

EventExecutorGroup

  • EventExecutorGroup 继承了ScheduledExecutorService接口,对原来的ExecutorService的关闭接口提供了增强,提供了优雅的关闭接口。从接口名称上可以看出它是对多个EventExecutor的集合,提供了对多个EventExecutor的迭代访问接口。
EventExecutorGroup

SingleThreadEventExcutor

  • SingleThreadEventExcutor(单线程)实现了ScheduledExecutorService接口,支持执行定时任务。得有个地方存放定时任务信息。类中的实现是delayedTaskQueue,它是一个PriorityQueue,也是一个BlockingQueue。不过它里面的元素不是按照先来后到的顺序存取的,而是按照各个元素的优先级判断的SingleThreadEventExecutor类中有一个实例变量Thread,它引用的就是当前Executor所拥有的那个thread对象

SingleThreadEventExcutor类用到AtomicIntegerFieldUpdater基于反射的实用程序,可以对指定类的指定volatile int字段进行原子更新。 此类设计用于原子数据结构,其中同一节点的多个字段独立地受原子更新的影响。
请注意,此类中compareAndSet方法的保证比其他原子类弱。 因为此类无法确保该字段的所有使用都适用于原子访问的目的,所以它只能保证在compareAndSet的其他调用和同一更新程序上设置的原子性。

 @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
       // 这行是判断是否在循环事件里面,点进去会跳到AbstractEventExecutor的inEventLoop方法
        boolean inEventLoop = inEventLoop();
        //如果inEventLoop为true就把任务添加一个任务队列里
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();  //启动线程
            addTask(task);//添加任务队列
            //判断是否关闭和移除任务
            if (isShutdown() && removeTask(task)) {
                reject();//调用拒绝方法
            }
        }
        //不是addTaskWakesUp 并且是wakesUpForTask
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop); //调用wakeup方法
        }
    }
  • startThread方法代码
//启动线程方法
   private void startThread() {
        if (state == ST_NOT_STARTED) {
            //这行代码是AtomicIntegerFieldUpdaterImpl#Unsafe的compareAndSwapInt方法
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

private void doStartThread() {
        assert thread == null; //断言一个线程变量,初始化一个null
        //Executor执行者
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread(); //获取当前线程
                if (interrupted) {
                    //终止线程(暴力处理)
                    thread.interrupt();
                }

                boolean success = false;
              //更新内部时间戳,该时间戳指示最近执行提交的任务的时间。 runAllTasks()和runAllTasks(long)自动更新此时间戳,因此通常不需要调用此方法。 但是,如果使用takeTask()或pollTask()手动执行任务,则必须在任务执行循环结束时调用此方法以进行准确的静默期检查。
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                     //死循环
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // 检查在循环结束时是否调用了confirmShutdown()。
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // 运行所有剩余的任务并关闭挂钩。
                        for (;;) {
                              //确认关闭
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();//清除
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();//释放
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }
                            //
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
  • 上面Thread内部run方法执行的是SingleThreadEventExecutor.this.run(),而这个run方法是一个抽象方法,留给了子类去实现了。不过可以肯定的是子类的run方法是不断的去tasksQueue中取出task去执行。现在重点分析下finally块中的代码。
    1、首先更改状态为正在关闭状态。
    2、如果子类中的run方法中的loop执行成功了,就得先调用confirmShutdown,确认任务队列中的任务是否都已经被执行了。
    3、然后还得再次确认下任务队列中是否已被执行完毕,因为在关闭的过程中外部也是能添加任务的。
    4、最终执行清理工作,更改状态为已关闭,释放信号量。
    5、如果这个时候还是有任务没执行完,那也只能是无奈了,记个log吧
    6、更新整个关闭过程为success
  • confirmShutdown代码
protected boolean confirmShutdown() {
    // 如果state状态 state < ST_SHUTTING_DOWN则直接return false
    if (!isShuttingDown()) {
        return false;
    }
    // 这个方法必须从内部调用,从修饰符 protected也可以看出
    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }
    // 取消所有的定时任务
    cancelDelayedTasks();
    
    if (gracefulShutdownStartTime == 0) {
        // 标记shutdown处理的开始时间
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }
    // 运行tasksQueue或者shutdownHooks中的所有Runnable都处理完成
    if (runAllTasks() || runShutdownHooks()) {
        //分析了下源码,isShutdown()这个只能是在外部线程调用了shutdown()接口的时候才会有可能成为true
        //但是现在这个方法已经@Deprecated,所以这个if块是不会进入的
        if (isShutdown()) {
            // shutdown 成功,没有更多的runnable需要执行
            return true;
        }

        // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
        wakeup(true);
        return false;
    }

    final long nanoTime = ScheduledFutureTask.nanoTime();
    // runAllTasks() 或者runAllTasks() + runShutdownHooks()方法执行时间操作了最大限制
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }
    // 现在时间与上个任务执行完成的时间差小于quietPeriod时间,继续检测
    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // Check if any tasks were added to the queue every 100ms.
        // TODO: Change the behavior of takeTask() so that it returns on timeout.
        wakeup(true);
        try {
            //内部线程sleep 100ms
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignore
        }

        return false;
    }

    // No tasks were added for last quiet period - hopefully safe to shut down.
    // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
    return true;
}

这个WAKEUP_TASK什么也不做,为啥取名wakeup呢

private final Semaphore threadLock = new Semaphore(0);  

//threadLock的内部permits设置为0,也就是说acquire()永远获取不到permit,会一直被阻塞着。
//那有什么用呢?另一种实现wait()/notify()。
SingleThreadEventExcutor

为什么需要AtomicInteger原子操作类

  • AtomicInteger 原子性类,对于Java中的运算操作,例如:自增或自减,若没有进行额外的同步操作,在多线程环境下就是线程不安全的。num++解析为num=num+1,明显,这个操作不具备原子性,多线程并发共享这个变量时必然会出现问题
  • num ++ 的原子性问题,num++的操作实际上分三个步骤"读-改-写"

int num = 10;
num =num++;//10

  • 临时变量读-改-写

int temp = num;
num = num +1;
num = temp;

  • CAS
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342