事情经过
由于公司需要进行公众号迁移,需要对线上的openId进行清洗,由于数据量巨大,并且依赖了微信的外部接口,所以决定用多线程进行处理。
代码如下:
val exec = {
new ThreadPoolExecutor(20, 20, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue[Runnable](100),
new ThreadPoolExecutor.CallerRunsPolicy()
)
}
MemberDataSource.mysqlData.withConnection(conn => {
val stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(Integer.MIN_VALUE)
//and id>2107380
val rs = stmt.executeQuery("select * from test where status=0 ")
val arrayBuffer = new ArrayBuffer[String](100)
while (rs.next()) {
val row = ResultSetMapper.material[MemberChannelList].from(rs)
arrayBuffer += row.openId
if (arrayBuffer.length == 100) {
exec.execute(()=> logger.info(s"${arrayBuffer.toList}"))
arrayBuffer.clear
}
}
})
主要思路为:模仿去年切非玛时的方法,使用数据库游标将数据查出,封装一个长度为100的数组,然后使用多线程对微信进行请求获取结果。
但是在测试的时候发现,每个线程请求的数据发生了错乱,每个线程请求的数据有重复,这样造成了数据的重复操作,于是立刻停止。
例如:线程11和线程12请求的数据有相同的,于是怀疑是多线程导致的线程错乱的问题。
于是将问题原因锁定在arrayBuffer上,认为它不是线程安全的,所以导致了线程请求的数据出现重复,于是决定改成ConcurrentLinkList,但是依然请求的结果如此,折腾了一晚上,不管怎么修改,依然线程之间有重复数据,跟孙政两人修改代码到天明,依然没有找到问题,两个人都准备放弃了,决定就单线程的跑完算了,慢点就慢点,在早上准备放弃的时候,抱着最后一丝希望,请教远在深圳的祥哥,祥哥快速定位问题,并且写出了正确的代码
if(arrayBuffer.length == 99) {
val asList = arrayBuffer.toList
exec.execute ( openIdInsertMethod(asList) )
arrayBuffer.clear
}
一开始拿到代码,因为已经一晚上没睡,还没明白到底跟自己的有什么不同,只是盲目的粘贴上去,但是神奇的发现问题解决了,请求的参数里面再没出现之前的线程错乱问题
当时感觉特别不可思议,立刻比对之前的代码
发现代码唯一的不同在于
修改前:
exec.execute ( openIdInsertMethod(arrayBuffer.toList) )
修改后:
val asList = arrayBuffer.toList
exec.execute ( openIdInsertMethod(asList) )
一个是在线程池里面toList,一个是在外面定义一个变量去toList
终于明白原因:
在一个线程中开启另外一个新线程,则新开线程称为该线程的子线程,子线程初始优先级与父线程相同。不过主线程先启动占用了cpu资源,因此主线程总是优于子线程。然而,其实设置了优先级,也无法保障线程的执行次序。只不过,优先级高的线程获取CPU资源的概率较大,优先级低的并非没机会执行。
所以arrayBuffer.clear有可能先执行,那么exec.execute执行的arrayBuffer就是一个空的数组,所以就出现了多个线程出现了重复数据的原因,所以我们要保证的是exec.execute每次执行完arrayBuffer后再进行clear即可。而不是一开始定位的保证arrayBuffer的安全性。所以将toList操作放在外面去执行后,多线程数据就正常了。
在此感谢远在深圳的祥哥,立刻定位到问题,不然我们这次数据清洗可能真的要跑30多个小时。 再次感谢🙏🙏🙏