如何处理高并发任务

背景

  1. 在用户量比较高的情况下,会有很多请求过来,此时线程池处理能力已经无法满足需求,如何解决?
  • 方案:可以将当前的计算线程先保存起来,放入高并发消息队列中,等线程池中的任务较少时,再从队列拉数据去执行任务。
  1. 如果微服务框架下有服务被熔断或者降级,其他任务不能查询该服务的信息,又不能直接丢弃,如何解决?
  • 方案:将当前任务暂时存放在消息队列中,等待服务恢复之后,在从消息队列中拉取任务执行。

执行流程

  1. 当批量用户请求过来时,先把用户请求放入Redis Map中或者ConcurrentHashMap中,对用户请求去重,保证交易号幂等性;
  2. 这里创建一个线程池处理用户请求;
  • 如果当前线程池请求数目<核心线程池数目,直接让核心线程池去执行任务
  • 如果大于核心线程池数目,那么加入到线程队列中
  • 如果队列已满,那么创建新的线程去处理,如果新的线程数目超过了最大线程数目,任务将被拒绝
  • 拒绝策略是将任务提交到kafka消息队列中进行存储
  1. 等到线程池中的任务较少或者夜间用户访问较少的时候,从消息队列中拉取请求重新进行处理
  2. 如果处理失败将任务重新加入到消息队列中,等待一定的时机进行重试。


    项目框架

代码实现

  • 模拟线程处理用户请求
@Override
   public void run() {
       //业务操作
       System.out.println("多线程已经处理订单插入系统,订单号:"+ businessNo);
   }
  • 采用线程池处理以上请求
   @Autowired
    private WorkProducerService producerService;

    @Autowired
    private RedisUtils redisUtils;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }

    /**
     * 订单的缓冲队列,当线程池满了,则将订单存入到此缓冲队列
     */
    Queue<Object> msgQueue = new LinkedBlockingQueue<>();

    /**
     * 当线程池的容量满了,执行下面代码,将订单存入到缓冲队列
     */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (IS_QUEUE) {
                //订单加入到缓冲队列
                msgQueue.offer(((ThreadPoolService) r).getBusinessNo());
            } else {
                //增加并发量,订单加入到kafka消息队列
                producerService.sendMessage(((ThreadPoolService) r).getBusinessNo());
            }
            log.info("系统任务已满,把此订单交给(调度线程池)逐一处理,订单号:" + ((ThreadPoolService) r).getBusinessNo());
        }
    };


    /**
     * 创建线程池
     */
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);

    /**
     * 将任务加入订单线程池
     */
    public void addOrders(String orderId) {
        log.info("此订单准备添加到线程池,订单号:" + orderId);
        //验证当前进入的订单是否已经存在
        if (redisUtils.getSetMembers(orderId) == null) {
            redisUtils.addSetMembers(orderId, new Object());
            ThreadPoolService businessThread = new ThreadPoolService(orderId);
            threadPool.execute(businessThread);
        }
    }

    /**
     * 获取目前的活跃线程数量
     * @return
     */
    public int getActiveCount(){
        return threadPool.getActiveCount();
    }

    /**
     * 线程池的定时任务----> 称为(调度线程池)。此线程池支持定时以及周期性执行任务的需求。
     */
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);


    /**
     * 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有订单记录,则重新加入到线程池
     */
    final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //判断缓冲队列是否存在记录
            if (!msgQueue.isEmpty()) {
                //当线程池的队列容量少于workQueueSize,则开始把缓冲队列的订单加入到线程池
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    String orderId = (String) msgQueue.poll();
                    ThreadPoolService businessThread = new ThreadPoolService(orderId);
                    threadPool.execute(businessThread);
                    log.info("(调度线程池)缓冲队列出现订单业务,重新添加到线程池,订单号:" + orderId);
                }
            }
        }
    }, 0, 1, TimeUnit.SECONDS);

将处理不了的请求,发送给消息队列暂存

public boolean sendMessage(String msg) {
        try {
            String result = kafkaTemplate.send("business", msg).get().toString();
            if (result != null) {
                return true;
            }
        } catch (Exception e) {
            return false;
        }
        return false;
    }

等到线程池中的请求能够处理当前任务时,消费消息并对请求进行处理(可以设置定时任务进行拉取)

@KafkaListener(id = "test-consumer-group", topics = "business",containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        log.info("Receive Business Number :------------"+record.value());
        if(threadPoolManager.getActiveCount()< MAX_POOL_SIZE){
            threadPoolManager.addOrders(record.value().toString());
            ack.acknowledge();
        }else{
            //未被消费的消息重新发到队列
            workProducerService.sendMessage(record.value().toString());
        }
    }

源码参考

并发任务调度

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容

  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,525评论 0 5
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,084评论 1 32
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,787评论 1 19
  • 老布是一名进入监狱五十年的老犯人,他身材矮小、步履蹒跚、眼窝深陷、面目慈祥,活脱一个邻家老大爷的良民形象。 整部影...
    御风者阅读 7,209评论 0 2
  • 声明 本文章是基于 glide 3.6.1的 个各类的功能介绍 1.Glide:向外暴露单例静态接口,构建Requ...
    河里的枇杷树阅读 1,170评论 0 3