使用多线程和分页处理大数据量数据

        最近接了一个任务,需要我解析一个大文件(700MB)内的报文信息(约十万条)。这个任务有两个难点,一个是报文的解析,一个是对大文件、大数据量的处理。由于多数公司有着自己的一套自定义报文协议,因此报文的解析也不同。因此,这篇文章我们就来谈谈对大文件、大数据量的处理。
        我最开始的处理方式就是一次性读入,单线程处理,把文件数据分成一条条数据段然后一条条解析。然后果然没让我失望地报了堆内存溢出的错误。经过一番艰苦的斗争,总结了一些能避免内存溢出的方法。废话不说,直接开始。

1、分页读入

        如果用户传来多大的文件我们都直接读,这是十分危险的,因为我们无法控制用户传来的文件大小,一旦用户传来了一个特别大的文件,我们就可能直接内存溢出了。
        再联想我们逛淘宝的时候,不也是显示十几个宝贝,然后要我们往下拉才会显示下一页的宝贝嘛。一个手机屏幕装不下上百个商品,同样的,一个人也不能一口气看完几千个数据。因此我们可以有节制的读入一定量能处理的数据,等用户要读下一页内容了再处理下一页数据。
        说完了分页是什么,那就来说一下分页的大体思路吧。例如,一次读入20MB数据,我们知道,最后一个报文大概率会被截断。如下图所示,蓝色部分为几个完整的报文数据,而灰色部分表示最后一个不完整的报文。我们的做法就是丢弃掉最后一个不完整的报文,将蓝色部分作为page1。然后page2从page1的结尾处开始,再读入20MB数据……


        具体的操作在第二部分一起讲了吧。

2、使用位置信息代替二进制内容

        在我的任务中,我读入的是二进制文件,所以使用字节数组byte[]进行存储。一开始我将整个数组分片后存成List<byte[]>,List中的每个元素都是一个待解析的报文数据。后来发现,一开始读入文件的时候,已经存了一个byte[],现在存的List<byte[]>,内容上是重复的。反思一下,我们只是需要记录这个byte[]的分页情况,因此可以用类似(起始地址,长度)的结构,记录每个报文的二进制数据在总byte[]中的位置,把一串报文二进制数据换成几个记录位置的int信息,能减少不少内存空间。
       我们结合前两点思路,来理一下所需要的代码吧。
① 一个记录报文二进制信息的类:TcpPkg

public class TcpPkg{
    /**
    *@param startIndex: 页内起址
    *@param length: 数据长度
    *@param event: 解析后的报文数据
    */
    private int startIndex;
    private int length;
    private Event event;
    //根据需要写一些构造函数、get、set方法
    public int getStartIndex(){
        return startIndex;
    }
}

② 一个记录一个分页信息的类:Page

public class Page{
    /**
    *@param startIndex: 起址位置
    *@param length: 页长度
    *@param tcpPkgs: 该页存放的报文信息
    */
    private int startIndex;
    private int length;
    private List<TcpPkg> tcpPkgs;
    //根据需要写一些构造函数、get、set方法
    public Page(){
        tcpPkgs = new ArrayList<>();
    }
    public Page(startIndex, length, tcpPkgs){
        this.startIndex = startIndex;
        this.length = length;
        this.tcpPkgs = tcpPkgs;
    }
    public int getStartIndex(){
        return startIndex;
    }
    public int getLength(){
        return length;
    }
}

③ 用一个Map维护所有的页面信息:pageMap

Map<Integer, Page> pageMap = new HashMap<>();

④ 分享一个可能会用到的方法paging:根据传入的页号,进行按报文为单位的分页。
        我是在一个主要处理数据的类里,把Map<Integer, Page> pageMap设为类的成员变量。

/**
* 计算要解析的页号的位置、分片位置
* @param pageIndex: 用户当前想看的页号
*/
public void paging(int pageIndex) throws IOException{
    int pagedNum = pageMap.size(); //已分页的数目,因为分页肯定是按序分的,所以Map里包含几个项目就是分了多少页。
    if (pageIndex > pagedNum){
        //如果传入的页数比以分页的数目大,则从已分页的位置开始往后分页
        for(int i = pagedNum+1; i <= pageIndex; i++){
            int pageStart;
            if(i == 1){
                pageStart = 0; //第一页从第0字节开始
            }else{
                //如果pageMap内有分好的页,找最后一项的结束位置
                pageStart = pageMap.get(i-1).getStartIndex()+pageMap.get(i-1).getLength();
            }
            byte[] buffer = getFileBytes(filePath, pageStart, pageLength); //读入这一页的byte[]
           // !!!大家根据自己的分片函数,获得这一页的tcpPkgs
            Vector<TcpPkg> tcpPkgs = 自己的分片函数();
            TcpPkg lastTcpPkg = tcpPkgs.get(tcpPkgs.size()-1);
            int currentPageLength = lastTcpPkg.getStartIndex();
            tcpPkgs.remove(singleTcpPkgs.size()-1);
            pageMap.put(i, new Page(pageStart, currentPageLength, tcpPkgs)); //将分片情况存入pageMap
        }
    }
}
public static byte[] getFileBytes(String filePath, int startIndex, int length) throws IOException{
    FileInputStream stream = new FileInputStream(filePath);
    stream.skip(startIndex); //跳过之前的字节数
    byte[] buffer = new byte[length];
    stream.read(buffer);
    stream.close();
    return buffer;
}

3、多线程处理

① 要用多线程处理,我们就要先把这堆报文均匀分配给每个线程。

public List<Lists<T>> splitList(List<T> list, int pageSize){
    int listSize = list.size();
    int page = (listSize + (pageSize - 1)) / pageSize;
    List<List<T>> listArray = new ArrayList<List<T>>();
    for(int i = 0; i < page; i++){
        List<T> subList = new ArrayList<T>();
        for(int j = 0; j < listSize; j++){
             int pageIndex = ((j + 1) + (pageSize - 1)) / pageSize;
             if (pageIndex == (i + 1)){
                 subList.add(list.get(j));
             }
             if ((j + 1) == ((j + 1) * pageSize)){
                 break;
             }
         }
         listArray.add(subList);
    }
    return listArray;
}

② 构造一个线程处理类

public class ParsingThread implements Runnable{
    private byte[] buffer;
    private List<TcpPkgs> tcpPkgs;
    /**
    *@param buffer: 当前页的二进制内容
    *@param tcpPkgs: 当前线程分配到的报文二进制信息列表
    */
    public ParsingThread(byte[] buffer, List<TcpPkg> tcpPkgs){
        this.buffer = buffer;
        this.tcpPkgs = tcpPkgs;
    }
    @Override
    public void run(){
        for(int i = 0; i < tcpPkgs.size(); i++){
            TcpPkg tcpPkg = tcpPkgs.get(i);
            TODO:根据tcpPkg和buffer,解析对应字段
            存到tcpPkgs.get(i)的Event当中
        }
    }
    public List<TcpPkg> getTcpPkgs(){return tcpPkgs;}
}

③ 编写主线程。
        主线程主要分三个步骤:<1>切割数据,为多线程做准备; <2>启动多线程,让它们去执行解析程序;<3>汇总所有线程的解析结果
        我一开始编写多线程的时候,觉得最后的数据汇总是个特别困难的事情。我曾经用Future和CallableTask、CyclicBarrier和await(),但都没能解决我的问题...可能是我没用对,而且我任务里的实际状况有点麻烦吧。
        最后,为了汇总所有的线程结果,我的思路是:
(1)在自定义的ParsingThread中,写个getXXX方法,用来获取最后的解析结果。
(2)用Thread thread = new Thread(parsingThread)和thread.start()来启动线程。
(3)用thread.join(),让所有线程执行完毕后等在那里.
(4)等所有线程都执行完毕后,再通过ParsingThread的getXXX()方法获取每个线程的结果。
        大致的代码如下所示:

int batchSize = 1000;  //设置线程个数,因为我数据比较多,设了1000个,按需设置10、100个都是可以的
List<List <TcpPkgs>> parsingList = splitList(tcpPkgs, batchSize);
Vector<Thread> threads = new Vector<>();
Vector<ParsingThread> pthreads = new Vector<>();
//启动所有线程,让线程自己去解析
for(int i = 0; i < parsingList.size(); i++){
    List<TcpPkg> tcpPkgList = parsingList.get(i);
    ParsingThread pthread = new ParsingThread(buffer,tcpPkgList);
    pthreads.add(pThread);
    Thread thread = new Thread(pThread);
    threads.add(thread);
    thread.start();
}
//等待所有线程执行完毕
for(Thread thread: threads){
    try{
        thread.join();
    } catch (InterruptedException e){
        e.printStackTrace();
    }
}
//然后可以汇总数据啦
List<TcpPkg> parsingPkgList = new ArrayList<>();
for(ParsingThread pthread : pthreads){
    List<TcpPkg> parsingPkgs_pthread = pthread.getTcpPkgs();
    parsingPkgList.addAll(parsingPkgs_pthread);
}
for(Thread thread: threads){
    thread.interrupt();
}

       最后存的parsingPkgList就是我们汇总的数据啦!

结尾

       本篇文章其实只是提供了一个思路,并不是完整的代码,毕竟大家在实际应用的时候也会对查到的代码做很多调整。
       我在完成这个任务的时候,很难找到一篇能解决我问题的文章。因此,在自己磕磕绊绊地摸索着解决了这个问题后,很希望这一点微不足道的小经验能帮上遇到类似问题的有缘人。最后,祝大家都能解决问题,身体健康,保持开心!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容

  • 1.进程与线程的概念,以及为什么要有进程线程,其中有什么区别,他们各自又是怎么同步的 基本概念:进程是对运行时程序...
    某WAP阅读 645评论 0 0
  • 对于我们开发的网站,如果网站的访问量非常大的话,那么我们就需要考虑相关的并发访问问题了。而并发问题是绝大部分的程序...
    木有鱼丸啦阅读 910评论 0 1
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 8,520评论 28 53
  • 信任包括信任自己和信任他人 很多时候,很多事情,失败、遗憾、错过,源于不自信,不信任他人 觉得自己做不成,别人做不...
    吴氵晃阅读 6,181评论 4 8
  • 怎么对待生活,它也会怎么对你 人都是哭着来到这个美丽的人间。每个人从来到尘寰到升入天堂,整个生命的历程都是一本书,...
    静静在等你阅读 4,956评论 1 6