最近接了一个任务,需要我解析一个大文件(700MB)内的报文信息(约十万条)。这个任务有两个难点,一个是报文的解析,一个是对大文件、大数据量的处理。由于多数公司有着自己的一套自定义报文协议,因此报文的解析也不同。因此,这篇文章我们就来谈谈对大文件、大数据量的处理。
我最开始的处理方式就是一次性读入,单线程处理,把文件数据分成一条条数据段然后一条条解析。然后果然没让我失望地报了堆内存溢出的错误。经过一番艰苦的斗争,总结了一些能避免内存溢出的方法。废话不说,直接开始。
1、分页读入
如果用户传来多大的文件我们都直接读,这是十分危险的,因为我们无法控制用户传来的文件大小,一旦用户传来了一个特别大的文件,我们就可能直接内存溢出了。
再联想我们逛淘宝的时候,不也是显示十几个宝贝,然后要我们往下拉才会显示下一页的宝贝嘛。一个手机屏幕装不下上百个商品,同样的,一个人也不能一口气看完几千个数据。因此我们可以有节制的读入一定量能处理的数据,等用户要读下一页内容了再处理下一页数据。
说完了分页是什么,那就来说一下分页的大体思路吧。例如,一次读入20MB数据,我们知道,最后一个报文大概率会被截断。如下图所示,蓝色部分为几个完整的报文数据,而灰色部分表示最后一个不完整的报文。我们的做法就是丢弃掉最后一个不完整的报文,将蓝色部分作为page1。然后page2从page1的结尾处开始,再读入20MB数据……
具体的操作在第二部分一起讲了吧。
2、使用位置信息代替二进制内容
在我的任务中,我读入的是二进制文件,所以使用字节数组byte[]进行存储。一开始我将整个数组分片后存成List<byte[]>,List中的每个元素都是一个待解析的报文数据。后来发现,一开始读入文件的时候,已经存了一个byte[],现在存的List<byte[]>,内容上是重复的。反思一下,我们只是需要记录这个byte[]的分页情况,因此可以用类似(起始地址,长度)的结构,记录每个报文的二进制数据在总byte[]中的位置,把一串报文二进制数据换成几个记录位置的int信息,能减少不少内存空间。
我们结合前两点思路,来理一下所需要的代码吧。
① 一个记录报文二进制信息的类:TcpPkg
public class TcpPkg{
/**
*@param startIndex: 页内起址
*@param length: 数据长度
*@param event: 解析后的报文数据
*/
private int startIndex;
private int length;
private Event event;
//根据需要写一些构造函数、get、set方法
public int getStartIndex(){
return startIndex;
}
}
② 一个记录一个分页信息的类:Page
public class Page{
/**
*@param startIndex: 起址位置
*@param length: 页长度
*@param tcpPkgs: 该页存放的报文信息
*/
private int startIndex;
private int length;
private List<TcpPkg> tcpPkgs;
//根据需要写一些构造函数、get、set方法
public Page(){
tcpPkgs = new ArrayList<>();
}
public Page(startIndex, length, tcpPkgs){
this.startIndex = startIndex;
this.length = length;
this.tcpPkgs = tcpPkgs;
}
public int getStartIndex(){
return startIndex;
}
public int getLength(){
return length;
}
}
③ 用一个Map维护所有的页面信息:pageMap
Map<Integer, Page> pageMap = new HashMap<>();
④ 分享一个可能会用到的方法paging:根据传入的页号,进行按报文为单位的分页。
我是在一个主要处理数据的类里,把Map<Integer, Page> pageMap设为类的成员变量。
/**
* 计算要解析的页号的位置、分片位置
* @param pageIndex: 用户当前想看的页号
*/
public void paging(int pageIndex) throws IOException{
int pagedNum = pageMap.size(); //已分页的数目,因为分页肯定是按序分的,所以Map里包含几个项目就是分了多少页。
if (pageIndex > pagedNum){
//如果传入的页数比以分页的数目大,则从已分页的位置开始往后分页
for(int i = pagedNum+1; i <= pageIndex; i++){
int pageStart;
if(i == 1){
pageStart = 0; //第一页从第0字节开始
}else{
//如果pageMap内有分好的页,找最后一项的结束位置
pageStart = pageMap.get(i-1).getStartIndex()+pageMap.get(i-1).getLength();
}
byte[] buffer = getFileBytes(filePath, pageStart, pageLength); //读入这一页的byte[]
// !!!大家根据自己的分片函数,获得这一页的tcpPkgs
Vector<TcpPkg> tcpPkgs = 自己的分片函数();
TcpPkg lastTcpPkg = tcpPkgs.get(tcpPkgs.size()-1);
int currentPageLength = lastTcpPkg.getStartIndex();
tcpPkgs.remove(singleTcpPkgs.size()-1);
pageMap.put(i, new Page(pageStart, currentPageLength, tcpPkgs)); //将分片情况存入pageMap
}
}
}
public static byte[] getFileBytes(String filePath, int startIndex, int length) throws IOException{
FileInputStream stream = new FileInputStream(filePath);
stream.skip(startIndex); //跳过之前的字节数
byte[] buffer = new byte[length];
stream.read(buffer);
stream.close();
return buffer;
}
3、多线程处理
① 要用多线程处理,我们就要先把这堆报文均匀分配给每个线程。
public List<Lists<T>> splitList(List<T> list, int pageSize){
int listSize = list.size();
int page = (listSize + (pageSize - 1)) / pageSize;
List<List<T>> listArray = new ArrayList<List<T>>();
for(int i = 0; i < page; i++){
List<T> subList = new ArrayList<T>();
for(int j = 0; j < listSize; j++){
int pageIndex = ((j + 1) + (pageSize - 1)) / pageSize;
if (pageIndex == (i + 1)){
subList.add(list.get(j));
}
if ((j + 1) == ((j + 1) * pageSize)){
break;
}
}
listArray.add(subList);
}
return listArray;
}
② 构造一个线程处理类
public class ParsingThread implements Runnable{
private byte[] buffer;
private List<TcpPkgs> tcpPkgs;
/**
*@param buffer: 当前页的二进制内容
*@param tcpPkgs: 当前线程分配到的报文二进制信息列表
*/
public ParsingThread(byte[] buffer, List<TcpPkg> tcpPkgs){
this.buffer = buffer;
this.tcpPkgs = tcpPkgs;
}
@Override
public void run(){
for(int i = 0; i < tcpPkgs.size(); i++){
TcpPkg tcpPkg = tcpPkgs.get(i);
TODO:根据tcpPkg和buffer,解析对应字段
存到tcpPkgs.get(i)的Event当中
}
}
public List<TcpPkg> getTcpPkgs(){return tcpPkgs;}
}
③ 编写主线程。
主线程主要分三个步骤:<1>切割数据,为多线程做准备; <2>启动多线程,让它们去执行解析程序;<3>汇总所有线程的解析结果
我一开始编写多线程的时候,觉得最后的数据汇总是个特别困难的事情。我曾经用Future和CallableTask、CyclicBarrier和await(),但都没能解决我的问题...可能是我没用对,而且我任务里的实际状况有点麻烦吧。
最后,为了汇总所有的线程结果,我的思路是:
(1)在自定义的ParsingThread中,写个getXXX方法,用来获取最后的解析结果。
(2)用Thread thread = new Thread(parsingThread)和thread.start()来启动线程。
(3)用thread.join(),让所有线程执行完毕后等在那里.
(4)等所有线程都执行完毕后,再通过ParsingThread的getXXX()方法获取每个线程的结果。
大致的代码如下所示:
int batchSize = 1000; //设置线程个数,因为我数据比较多,设了1000个,按需设置10、100个都是可以的
List<List <TcpPkgs>> parsingList = splitList(tcpPkgs, batchSize);
Vector<Thread> threads = new Vector<>();
Vector<ParsingThread> pthreads = new Vector<>();
//启动所有线程,让线程自己去解析
for(int i = 0; i < parsingList.size(); i++){
List<TcpPkg> tcpPkgList = parsingList.get(i);
ParsingThread pthread = new ParsingThread(buffer,tcpPkgList);
pthreads.add(pThread);
Thread thread = new Thread(pThread);
threads.add(thread);
thread.start();
}
//等待所有线程执行完毕
for(Thread thread: threads){
try{
thread.join();
} catch (InterruptedException e){
e.printStackTrace();
}
}
//然后可以汇总数据啦
List<TcpPkg> parsingPkgList = new ArrayList<>();
for(ParsingThread pthread : pthreads){
List<TcpPkg> parsingPkgs_pthread = pthread.getTcpPkgs();
parsingPkgList.addAll(parsingPkgs_pthread);
}
for(Thread thread: threads){
thread.interrupt();
}
最后存的parsingPkgList就是我们汇总的数据啦!
结尾
本篇文章其实只是提供了一个思路,并不是完整的代码,毕竟大家在实际应用的时候也会对查到的代码做很多调整。
我在完成这个任务的时候,很难找到一篇能解决我问题的文章。因此,在自己磕磕绊绊地摸索着解决了这个问题后,很希望这一点微不足道的小经验能帮上遇到类似问题的有缘人。最后,祝大家都能解决问题,身体健康,保持开心!