Java 7 Fork/Join 框架使用

最近有个业务是批量导出cognos报表, 由于未开发此功能, 人工导出需要大量的时间消耗, 奔着珍惜时间的使命写了一个导出工具类, 至此在导出的过程中用到了并发请求数, 比如:每秒并发10次,20次等. 工作中使用的Java8并发语法, 在此之前先介绍一下Java7 Fork/Join的框架使用方式.

之前使用的此框架是一个查询SQL时, 当时一个SQL有28个子语句通过left join 拼接而成, 查询速度为20s,经常卡死, 最后写成并发,28个子语句,分成3批次,每次10个SQL,并行查询,最后通过Java算法拼接成List,从20S变为1.4S左右,性能大大提升.

下面就开始今天的内容:

简介

从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。
这种思想和MapReduce很像(input --> split --> map --> reduce --> output)

主要有两步:
第一、任务切分;
第二、结果合并

刚刚我介绍的SQL其实就是这样的原理.

API 介绍

ForkJoinPool 池子

ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

ForkJoinTask 任务

ForkJoinTask代表运行在ForkJoinPool中的任务。

主要方法:

fork() 在当前线程运行的线程池中安排一个异步执行。简单的理解就是再创建一个子任务。
join() 当任务完成的时候返回计算结果。
invoke() 开始执行任务,如果必要,等待计算完成。
子类:

RecursiveAction 一个递归无结果的ForkJoinTask(没有返回值)
RecursiveTask 一个递归有结果的ForkJoinTask(有返回值)

例子

private static final ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(100),
        new ThreadFactoryBuilder().setNameFormat("Reports-%d").setDaemon(true).build(),
        new ThreadPoolExecutor.AbortPolicy());

这里先创建了一个多线程任务,意思为:
这里核心线程数5
最大线程数5
blockingQueue 最大size 100, 解释: workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择: ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

reject策略 java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy,意思是由调用线程处理该任务

另外的策略

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

方法调用,模拟一下并发执行SQL拼接表的过程

public Page<ReportVo> getRepairTaskReport() {
        List<String>ids = Lists.newArrayList("1","2","3","4","5","6","7","8","9");
        //多线程查询列数据
        Set<String> setIds = new HashSet<>();
        for (String id : ids) {
            setIds.add(id);
        }

        //整理要查询的列,以后可以做成由前端指定查询哪些column
        List<T> columns = new ArrayList<>();
        columns.add("ID1相关");
        columns.add("ID2相关");
        ...
        columns.add("ID9相关");


        Map<T, Future<List<Object[]>>> futureMap = new HashMap<>();
        Map<T, Callable<List<Object[]>>> columnCallableMap = getColumnCallableMap(appId, staff, columns, params, queryConditionVo, departIdSet);
        for (Map.Entry<T, Callable<List<Object[]>>> entry : columnCallableMap.entrySet()) {
            futureMap.put(entry.getKey(), executorService.submit(entry.getValue()));
        }

        //合并报表
        for (RepairTaskReportColumn column : columns) {
            try {
                //列数据
                List<Object[]> columnDataList = futureMap.get(column).get();
                for (Object[] objects : columnDataList) {
                    String departId = objects[0].toString();
                    String columnData = objects[1].toString();
                    //匹配行数据
                    for (String reportVo : ids) {
                        if (departId.equals(reportVo.getDepartId())) {
                            switch (column) {
                                case "ID1相关":
                                    // 并行返回ID1相关数据
                                    break;
                                case "ID2相关":
                                      // 并行返回ID2相关数据
                                    break;
                                ...
                                 //    
                            }
                        }
                    }

                }
            } catch (Exception e) {
                e.printStackTrace();
      
            }
        }
        return "最终结果";
    }


    /**
     * 根据请求的column数据,生成Callable
     *
     * @param columns
     * @param queryConditionVo
     * @return
     */
    private Map<T, Callable<List<Object[]>>> getColumnCallableMap(String ID, List<T> columns, Set<String> departIdSet) {
        Map<T, Callable<List<Object[]>>> columnMap = new HashMap<>();
        for (T column : columns) {
            switch (column) {
                case "ID1相关SQL查询":
                    columnMap.put("ID1相关", getSQL(ID,T,departIdSet));
                    break;
                case  "ID2相关SQL查询":
                    columnMap.put("ID2相关", getSQL(ID,T,departIdSet));
                    break;
                ....
                default:
                    break;

            }
        }

        return columnMap;
    }

    
        private Callable<List<Object[]>> getSQL(final String ID,  final T column, final Set<String> departIdSet) {
            return new Callable<List<Object[]>>() {
                @Override
                public List<Object[]> call() throws Exception {
                    //todo sql query
                    //返回格式:object[0]为departId,object[1]为需要的数据
                    String sql = "select * from table xxxxx";
                    List<Object[]> result = dao.getResult(sql);
                    return result;
                }
            };
        }

以上代码为伪代码,实现的逻辑其实很简单.
大致逻辑如下
我有一条SQL,为N个left join 拼接而成,那么我现在就是吧N个left 拆分成N个小SQL,并发执行,那么执行时间缩短为N倍, 然后通过N个SQL查询出的结果,通过相同的属性 再次拼接成业务正确的数据

就是这样的一个图:


image.png
image.png

哈哈, 大致就是这样,通过并发执行任务,人工点击的8八小时缩短为10分钟! 是不是很秀呢

下次讲解JDK8中并发执行的例子,更为简洁

欢迎小伙伴们留言哦

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容