首先说明下为什么要写这篇文章,最近在查一个前负责人留下的一个Hbase中的数据问题,是为了记录前端的行为记录,做法是在服务端开了一个接口来接收前端的请求,这个接口的请求实作是在服务端开了一个线程池来处理写入Hbase的动作,但经常会发现有些数据会丢失,也没有日志
线程池定义代码如下:
Executor executor = new ThreadPoolExecutor(32, 128, 0, TimeUnit.DAYS, new ArrayBlockingQueue(1024));
上面是线程池的构造方式,首先来了解下这个构造函数中的参数的定义
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
参数定义
- corePoolSize 在线程池中一直存在的线程数,即使是空闲的
- maximumPoolSize 线程池的最大线程数
- keepAliveTime 当线程池中的线程数大于
corePoolSize
,空闲的线程在被销毁前会在线程池中保留的最大时间 - unit keepAliveTime 的时间单位
- workQueue 当线程池中的线程数大于maximumPoolSize ,会把后续的线程放入队列中
- threadFactory 线程工厂
- RejectedExecutionHandler 线程队列的拒绝策略
- AbortPolicy(默认策略) 如果线程队列满了,则会丢弃当前任务,并抛出异常RejectedExecutionException
- DiscardPolicy 如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
- DiscardOldestPolicy 如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
- CallerRunsPolicy 如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
再结合前言和参数定义中可以知道,创建的线程池中,传的BlockingQueue是ArrayBlockingQueue
BlockingQueue
BlockingQueue是阻塞队列接口,我们介绍三个实现
- ArrayBlockingQueue(有界队列)
- LinkedBlockingDeque(可选边界)
- SynchronousQueue
SynchronousQueue 的描述
有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞;
有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞;
ArrayBlockingQueue 是一个有界队列,超过这个队列的容量,则后续的数据就不允许再添加,那这个对于我们开始说的那个问题,就会有影响了,在线程池中的线程数量超过128(max)+1024(workQueue)的大小后,就会执行RejectedExecutionHandler 的内容了
因为当前这个问题是一个会有大并发量进来记录行为日志,肯定会超过这个范围的,而且终端用户也是一个大量级的,所以对于会丢失数据,也就不足为奇了
那这个问题目前有两个想法:
- 调整BlockingQueue 为LinkedBlockingDeque
- 对于进来的数据,用消息队列进行处理(redis,mq)都可以作为参考
后续文章会继续这方面的学习