具有优先级的线程池

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/russle/article/details/83218137

问题由来:
多线程接收kafka的消息,有时消息几乎同时达到,先简单处理后提交给线程池再次处理,结果出现当先到达的消息msgA和后到达的消息msgB到达时间相差很小时,例如10毫秒,几乎同时提交到线程池,因为提交的线程池的时间相差更小有时几乎是完全相同的时间,导致偶然消息处理乱序,消息msgB被先处理了。

解决思路:
所有提交给线程池的Runnable带有优先级,虽然几乎同时提交到线程池,但是线程执行任务时从自己的任务队列中找当前优先级最高的先处理。基于上面的情况,可以根据消息的到达时间进行优先级比较,先到达的优先级高。

具体实现
分为两个类,第一个可比较优先级的Runnable,第二个线程池的创建时使用PriorityBlockingQueue而不是LinkedBlockingQueue。

Runnable代码

package com.yq;

import lombok.extern.slf4j.Slf4j;
import java.util.Random;

@Slf4j
public class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
    private long rts;
    private String name;
    PrioritizedRunnable(long rts, String name) {
        this.rts = rts;
        this.name = name;
    }
    public long getRts() {
        return rts;
    }
    public String getName() {
        return name;
    }
    @Override
    public int compareTo(PrioritizedRunnable secondOne) {
        // 时间越小越优先
       log.info("compareTo. this.name={}, secondOne.name={}", this.getName(), secondOne.getName());
        if (this.getRts() < secondOne.getRts()) {
            return -1;
        }else if(this.getRts()> secondOne.getRts()){
            return 1;
        } else {
            return 0;
        }
    }
    @Override
    public void run() {
        Random random = new Random();
        log.info("rts={}, name={}", rts, name);
        try {
            int sleepRandom = random.nextInt(200);
            Thread.sleep(sleepRandom);
        } catch (Exception ex) {
            log.info("sleep exception", ex);
        }
    }
}

具体调用的示例代码

package com.yq;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class PriorityDemo {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                Long.MAX_VALUE, /* timeout */
                TimeUnit.NANOSECONDS,
                new PriorityBlockingQueue<Runnable>(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        PrioritizedRunnable p1 = new PrioritizedRunnable(1234, "name1-1");
        PrioritizedRunnable p2 = new PrioritizedRunnable(1500, "name4-2");
        PrioritizedRunnable p3 = new PrioritizedRunnable(1590, "name5-3");
        PrioritizedRunnable p4 = new PrioritizedRunnable(1490, "name3-4");
        PrioritizedRunnable p5 = new PrioritizedRunnable(1290, "name2-5");

        executor.execute(p4);
        executor.execute(p1);
        executor.execute(p2);
        executor.execute(p3);
        executor.execute(p5);
        log.info("submit 5 Runnable");
        Thread.sleep(30*1000);
        ruleExecutor.shutdown();
        log.info("done!");
    }
}

备注:提交到线程池使用execute方法,不要使用submit要不然回报

java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
    at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357) ~[na:1.8.0_161]
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489) ~[na:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) ~[na:1.8.0_161]
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_161]

运行结果
可以看到当线程池执行任务,虽然p4先提交,但是因为p1优先级更高,所以先执行p1 (23:22:33.283 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable - rts=1234, name=name1-1), 然后是p4.

23:22:33.283 [main] INFO com.yq.PrioritizedRunnable - compareTo. this.name=name5-3, o.name=name4-2
23:22:33.283 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable - rts=1234, name=name1-1
23:22:33.283 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable - rts=1490, name=name3-4
23:22:33.288 [main] INFO com.yq.PrioritizedRunnable - compareTo. this.name=name2-5, o.name=name4-2
23:22:33.288 [main] INFO com.yq.PriorityDemo - submit 5 Runnable
23:22:33.440 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable - compareTo. this.name=name4-2, o.name=name5-3
23:22:33.440 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable - rts=1290, name=name2-5
23:22:33.451 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable - rts=1500, name=name4-2
23:22:33.497 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable - rts=1590, name=name5-3
23:23:03.289 [main] INFO com.yq.PriorityDemo - done!

Process finished with exit code 0

作者:russle
来源:CSDN
原文:https://blog.csdn.net/russle/article/details/83218137
版权声明:本文为博主原创文章,转载请附上博文链接!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容