线程池及线程调度

背景

文章通过接口层表象来实现一个简版且稳定的线程调度库,给予一个台阶,当你读完文章的末尾,希望你有一探RxJava欲望与信心。

目标

  1. TaskScheduler.executeMain(...); //主线程, 执行任务
  2. TaskScheduler.executeTask(...); //子线程, 线程池执行任务
  3. TaskScheduler.executeSingle(...); //子线程, 单线程执行任务
  4. TaskScheduler.create(...); //任务调度

项目

设计

  1. .func(...).func(...).func(...)...顺序流执行
  2. .observeOn(...)线程切换

效果图

        TaskScheduler.create(new Task<List<String>>() {
            @Override
            public List<String> run() {
                ...do something in io thread
                return new ArrayList<>();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map(new Function<List<String>, String>() {
                    @Override
                    public String apply(@NonNull List<String> strings) throws Exception {
                        ...do something in new thread, such as time-consuming, map conversion, etc.
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, Boolean>() {
                    @Override
                    public Boolean apply(@NonNull String s) throws Exception {
                        ...do something in io thread, such as time-consuming, map conversion, etc.
                        return true;
                    }
                })
                ...
                .observeOn(Schedulers.mainThread())
                .subscribe(new Observer<Boolean>() {
                    @Override
                    public void onNext(@NonNull Boolean result) {
                        ...do something in main thread
                    }

                    @Override
                    public void onError(Throwable e) {
                        ...do something in main thread
                    }
                });

分析

  1. 线程
  2. 线程切换
  3. 任务调度

1. 线程

public class TaskManager {
    private static TaskManager ins;

    private Handler mainHandler;
    private ExecutorService cachedThreadPool;
    private ExecutorService singleThreadExecutor;

    private TaskManager() {
        mainHandler = new Handler(Looper.getMainLooper());
        cachedThreadPool = Executors.newCachedThreadPool();
        singleThreadExecutor = Executors.newSingleThreadExecutor();
    }

    static TaskManager getIns() {
        if (ins == null) {
            synchronized (TaskManager.class) {
                if (ins == null) {
                    ins = new TaskManager();
                }
            }
        }
        return ins;
    }

    /**
     * Execute sync task in main thread
     */
    void executeMain(Runnable runnable) { mainHandler.post(runnable); }

    /**
     * Execute async task in cached thread pool
     */
    void executeTask(Runnable runnable) { cachedThreadPool.execute(runnable); }

    /**
     * Execute async task in single thread pool
     */
    void executeSingle(Runnable runnable) { singleThreadExecutor.execute(runnable); }

    /**
     * Execute async task in a new thread
     */
    void executeNew(Runnable runnable) { new Thread(runnable).start(); }
}

线程切换的方法:抛runnable到相应线程,由线程来调度执行runnable,runnable中的方法即在相应线程中执行。
如无这样的显式切换线程,代码流(无论多少次方法递归调用)将在当前线程一直执行下去。同一线程,代码总是顺序的执行。

Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());

通过这行代码可以打印出当前在那一个线程。主线程的getName是main。

        new Thread(() -> {
                // Code block 1
                Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                ...
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {
                        // Code block 2
                        Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                        ...
                    }
                });
        }).start();

这是一个通常的代码形式
Code block 1处在一个子线程中执行代码,通过new Handler(Looper.getMainLooper()).post(...)向主线程抛入一个runnable,runnable进入主线程消息队列,然后等主线程消息队列取出该runnable执行时,Code line 2处代码即在主线程中执行。
Code block 1与Code block 2在时间上并行执行。线程池同理。

public class TaskScheduler<T> {
    public static void executeMain(Runnable runnable) { TaskManager.getIns().executeMain(runnable); }

    public static void executeTask(Runnable runnable) { TaskManager.getIns().executeTask(runnable); }

    public static void executeSingle(Runnable runnable) { TaskManager.getIns().executeSingle(runnable); }

    ...
}

通过单例简单包装,实现目标1、2、3

2. 线程切换

    /**
     * Switch thread
     * scheduler 线程枚举,int类型: defaultThread、newThread、io、mainThread
     */
    public static void switchThread(@Scheduler int scheduler, final Runnable runnable) {
        if (scheduler == NEW_THREAD) {
            new Thread(() -> {
                    if (runnable != null) {
                        runnable.run();
                    }
            }).start();
            return;
        } else if (scheduler == IO) {
            TaskScheduler.executeTask(() -> {
                    if (runnable != null) {
                        runnable.run();
                    }
            });
            return;
        } else if (scheduler == MAIN_THREAD) {
            if (!isMainThread()) {
                TaskScheduler.executeMain(() -> {
                        if (runnable != null) {
                            runnable.run();
                        }
                });
                return;
            }
        }
        if (runnable != null) {
            runnable.run();
        }
    }

    public static boolean isMainThread() {
        return Looper.getMainLooper().getThread() == Thread.currentThread();
    }

3. 任务调度

3.1 开始前的准备

我们先来定义3个接口

interface.png

然后是2个对应的包装类,后面会用到

Task -> TaskEmitter
Function -> FunctionEmitter

public class Emitter {
    public int scheduler;
}
public class TaskEmitter<T> extends Emitter {
    public Task<T> task;

    public TaskEmitter(Task<T> task, @Schedulers.Scheduler int scheduler) {
        this.task = task;
        this.scheduler = scheduler;
    }
}
public class FunctionEmitter<T, R> extends Emitter {
    public Function<? super T, ? extends R> function;

    public FunctionEmitter(Function<? super T, ? extends R> function, @Schedulers.Scheduler int scheduler) {
        this.function = function;
        this.scheduler = scheduler;
    }
}

3.2 Create

开始前,我们知道一些开源库如Glide,惯用.with(...)形式,这种方式实质:静态方法 + return new Instance(),
这里我们也用这种模式来开始create(...)。

实现分三步走

Step 1: Create

    public static <T> TaskScheduler<T> create(final Task<T> task) {
        TaskScheduler<T> schedulers = new TaskScheduler<T>();
        schedulers.task = task;
        return schedulers;
    }

创建TaskScheduler实例,持有 源任务task

    public TaskObserve<T> subscribeOn(@Schedulers.Scheduler int scheduler) {
        this.subscribeScheduler = scheduler;
        return new TaskObserve<T>(new TaskEmitter<T>(task, subscribeScheduler));
    }

指定 源任务task 执行所在线程,丢弃当前TaskScheduler实例。
源任务task线程枚举 注入TaskEmitter后,返回新的实例TaskObserve,后续逻辑全由TaskObserve处理

Step 2: TaskObserve中间件

public static class TaskObserve<T> {
        private TaskEmitter taskEmitter;
        private List<FunctionEmitter> emitters;
        private int observeOnScheduler = Schedulers.defaultThread();

        TaskObserve(TaskEmitter<T> taskEmitter) {
            this.taskEmitter = taskEmitter;
            this.emitters = new ArrayList<>();
        }

        ...
}

TaskObserve: 中间件,初始和map转换时生成,包含以下成员
taskEmitter: 源任务
emitters: 转换队列,map转换时递增
observeOnScheduler: 线程枚举,observeOn观察者所在线程,可重复调用,当然只保留最后一次指定的线程

        TaskObserve(TaskObserve middle) {
            this.taskEmitter = middle.taskEmitter;
            this.observeOnScheduler = middle.observeOnScheduler;
            this.emitters = middle.emitters;
        }

        public <TR> TaskObserve<TR> map(Function<? super T, ? extends TR> f) {
            this.emitters.add(new FunctionEmitter<T, TR>(f, observeOnScheduler));
            return new TaskObserve<TR>(this);
        }

map转换时,将 转换体Function 、当前 线程枚举 observeOnScheduler注入 FunctionEmitter ,添加到 转换队列
返回新的实例TaskObserve,丢弃当前TaskObserve实例,新实例线程枚举observeOnScheduler默认为默认线程

Step 3: Subscribe,才是开始!!!

核心思想

  1. 先执行 源任务 ,返回值
  2. 递归从 转换队列 取出 FunctionEmitter (含有转换体、线程枚举),Schedulers.switchThread(...)指定线程执行,转换返回值
  3. 转换队列 执行尽,提交任务,任务结束
        public void subscribe(final Observer<T> callback) {
            // 指定源任务线程枚举
            Schedulers.switchThread(taskEmitter.scheduler, () -> {
                    try {
                        // 执行源任务
                        Object t = taskEmitter.task.run();
                        // 转换队列是否为空
                        if (assertInterrupt(t)) {
                            // 转换队列空,提交本次任务,任务结束
                            submit(t, callback);
                            return;
                        }
                        // 转换队列不为空,继续转换
                        apply(t, emitters, callback);
                    } catch (Throwable e) {
                        // 任务流抛出异常,即时中断,任务结束
                        error(e, callback);
                    }
            });
        }
        private boolean assertInterrupt(Object emitter) throws Exception {
            if (emitter == null) {
                // 转换返回值,不能为Null!!!
                throw new RuntimeException("Apply output must not be null!");
            }
            return emitters.size() <= 0;
        }

assertInterrupt判断当前转换队列,是否执行尽了

Step 3 - 1: Apply转换队列转换

        private <E, F> void apply(final E o, final List<FunctionEmitter> emitters, final Observer<F> callback) {
            // 依次从转换队列取出FunctionEmitter,然后移除
            final FunctionEmitter<E, F> f = emitters.get(0);
            emitters.remove(f);
            // 指定当前转换线程枚举
            Schedulers.switchThread(f.scheduler, () -> {
                    try {
                        // 转换,返回转换值
                        Object emitter = f.function.apply(o);
                        // 转换队列是否为空
                        if (assertInterrupt(emitter)) {
                            // 转换队列空,提交本次任务,任务结束
                            submit(emitter, callback);
                            return;
                        }
                        // 转换队列不为空,继续转换
                        apply(emitter, emitters, callback);
                    } catch (Throwable e) {
                        // 任务流抛出异常,即时中断,任务结束
                        error(e, callback);
                    }
            });
        }

Step 3 - 2: Submit提交

        private <S> void submit(final Object result, final Observer<S> callback) {
            // 指定当前转换线程枚举,即当前中间件线程枚举observeOnScheduler
            Schedulers.switchThread(observeOnScheduler, () -> {
                    try {
                        if (callback != null) {
                            // 成功,任务结束
                            callback.onNext((S) result);
                        }
                    } catch (Throwable e) {
                        error(e, callback);
                    }
            });
        }

        private <S> void error(final Throwable e, final Observer<S> callback) {
            // 指定当前转换线程枚举,即当前中间件线程枚举observeOnScheduler
            Schedulers.switchThread(observeOnScheduler, () -> {
                    if (callback != null) {
                        // 出错,任务结束
                        callback.onError(e);
                    }
            });
        }

小结:

泛型: java泛型属于类型擦除,无论T、F还是R...,最终都是Object。
设计: 这里的 任务流 实现方式为递归嵌套调用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,392评论 25 707
  • 作为一本被众人追捧,被众多知识型大咖认可的畅销书,提起想要阅读它的心,并不难, 但真正开始读下去,却也不是件容易的...
    小夭生活馆阅读 1,524评论 0 2
  • 岁末岂无雪,有意故来迟。 寒夜灯帐里,斟酌古人诗。 【2015年1月24日】 �
    d03e056874dc阅读 213评论 0 0
  • 刘 娜 焦点解决网络初级九期 驻马店 2018~05~30 坚持分享第95天 今天在天中晚报公众微信号上看...
    洋帆起航阅读 106评论 0 0