使用Mina开发文件传输协议

前言

使用Mina大概也有半年了,一直忙于开发而忘了总结,项目里的业务系统只需要发送文字消息,而TCP底层是不区分文字还是文件的,所有的应用层报文最终都要转换为字节流,有了上次开发语音通话的经验,这次来尝试一下文件的传输,希望这篇文章对初学者有所帮助!

本文将提供部分Mina框架的原理,使用Mina开发文件传输协议的思路、解决方案和遇到的难题。如所述的内容有错误还望指出!

Mina框架的JAR开发包可以到官方网站下载:
http://mina.apache.org/

为什么使用Mina框架

Mina提供基于Java NIO的Reactor网络模型API,并且封装了会话层、表示层,减轻开发者开发编码器、解码器的负担,即便不需要NIO的特性也能很大程序的提高开发效率。通过Mina内置的API可以方便地解决粘包缺包问题,方便的将字节报文转换为应用层消息报文,通过IDLE事件可以轻松开发心跳协议,支持并发地处理应用层报文,满足于文件的并发随机读写需求。

Java NIO相比于传统的BIO(阻塞IO)模型,不需要为每一个连接创建一个线程,而是通过一个或一组线程监听多路复用器Selector中的事件,然后由具体的事件决定哪些线程去处理。多路复用器的事件变化是由底层驱动的,因此开发者可以通过轮询事件的变化来编程。

借用一张Mina框架图

Mina框架使用固定的一个单一线程池创建或监听连接事件,IoConnector和IoAcceptor都有一个Selector对象,该对象上注册了OP_CONNECT或者OP_ACCEPT事件,由IoConnector和IoAcceptor各自的单一线程池轮询。

当连接通道创建完毕,它会被封装为一个IoSession接口,然后分配给某个Processor线程执行该会话的读写。每个Processor都只维护一个Selector对象,IoSession相关事件就是被注册到这个Selector对象上。Processor线程通过select轮询注册在该Selector上的事件。

因此某个IoSession的读写一定是按顺序由某个Processor线程执行的,因为每个IoSession只注册到一个Selector上,因为一个Selector对应一个Processor线程,所以每个IoSession的读写也是固定在一个Processor线程上执行的。这一点是至关重要的,因为使用单个线程顺序读写通道的数据才可以保证数据发送和接收的有序性。如果用多个线程并发读写通道,并不能保证数据发送和接收的顺序。

虽然底层字节流数据的顺序一定要保证有序性,但是封装成应用层报文对象后并不一定要顺序处理,Mina提供了ExecutorFilter提供并发处理应用层报文的能力。

Mina框架底层具体是如何读写的?请阅读另一篇文章 《Mina框架会话读写源码分析》

文件传输思路

文件的传输实际就是文件报文的设计问题。设计好的文件报文,由编码器转换为底层字节流发送,接收端通过解码器接收字节流并解决粘包、缺包问题,最后转换为文件报文对象由具体的消息处理器处理。

文件协议的设计既要保证文件传输的完整性,还要考虑如并发、断点等额外功能需求,由要兼顾性能负载方面的需求。

本例实现了并发随机读取文件传输和并发随机写入文件。

应用层报文设计

文件协议.png

RequestSendFileMessage:请求发送文件,包含文件名、文件MD5、文件长度、业务编号、压缩后的文件长度、文件分段长度。

        setType(TYPE_REQUEST_SEND_FILE);
        JSONObject json=new JSONObject();
        json.put("fileName", fileTask.fileName);
        json.put("md5", fileTask.md5);
        json.put("size", fileTask.size);
        json.put("id", fileTask.id);
        json.put("zippedFileSize", fileTask.zippedFileSize);
        json.put("fileSegmentSize", fileTask.fileSegmentSize);
        setBody(json.toString().getBytes(Charset.forName("UTF-8")));

AcceptReceiveFileMessage:同意接收文件报文,包含业务编号。

        setType(TYPE_ACCEPT_RECEIVE_FILE);
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", id);
        setBody(jsonObject.toString());

RefuseReceiveFileMessage:拒绝接收文件报文,包含业务编号、错误代号、错误描述。

        setType(TYPE_REFUSE_RECEIVE_FILE);
        JSONObject json = new JSONObject();
        json.put("id", id);
        json.put("errorCode", errorCode);
        json.put("des", des);
        setBody(json.toString());

SendFilePartMessage:发送文件分段报文,包含业务编号、分段编号、文件分段字节流.

        setType(TYPE_SEND_FILE_PART);
        JSONObject json = new JSONObject();
        json.put("id", id);
        json.put("partId", partId);
        byte[] jsonBody = json.toString().getBytes("UTF-8");
        ByteBuffer buffer = ByteBuffer.allocate(jsonBody.length + 2 + data.length); 
        buffer.putShort((short) jsonBody.length);
        buffer.put(jsonBody);
        buffer.put(data);
        setBody(buffer.array());

ReceiveFileFinishMessage:通知对方已完成文件的接收和是否完成对文件的MD5校验。

        setType(TYPE_RECEIVE_FILE_PART);
        JSONObject json = new JSONObject();
        json.put("id", id);
        json.put("success", success);
        setBody(json.toString());

流程分析

发送者先将原文件压缩,计算出压缩后的文件的长度,除以分段长度得到分段总数,一并发送给接收端:

准备发送文件.png

一方A请求发送文件,另一方B判断是否可以接收文件,然后通知A发送文件:

文件请求及回复.png

A收到B回复开发文件分段,先计算出文件要分为多少个应用报文,然后由多线程随机读取文件发送:

发送文件分段.png

B依次收到SendFilePartMessage报文,采用并发随机写入的方式将文件分段写入本地硬盘,当文件的同步分段计数器数值等于总分段数,说明任务完成,通知发送端校验结果:

接收文件分段.png

发送者收到 ReceiveFileFinishMessage,将本地的文件任务删除。

实现分析

压缩/解压文件

本例中使用了GZIPInputStream/GZIPInputStream压缩与解压文件,可以节约一定的网络传输流量:

public static void zipFile(String source, String target) throws IOException {
        FileInputStream fin = null;
        FileOutputStream fout = null;
        GZIPOutputStream gzout = null;
        try {
            fin = new FileInputStream(source);
            fout = new FileOutputStream(target);
            gzout = new GZIPOutputStream(fout);
            byte[] buf = new byte[1024];
            int num;
            while ((num = fin.read(buf)) != -1) {
                gzout.write(buf, 0, num);
            }
        } finally {
            if (gzout != null)
                gzout.close();
            if (fout != null)
                fout.close();
            if (fin != null)
                fin.close();
        }
    }

public static void unZipFile(String source, String target) throws IOException {
        FileInputStream fin = null;
        GZIPInputStream gzin = null;
        FileOutputStream fout = null;
        try {
            fin = new FileInputStream(source);
            gzin = new GZIPInputStream(fin);
            fout = new FileOutputStream(target);
            byte[] buf = new byte[1024];
            int num;
            while ((num = gzin.read(buf, 0, buf.length)) != -1) {
                fout.write(buf, 0, num);
            }
        } finally {
            if (fout != null)
                fout.close();
            if (gzin != null)
                gzin.close();
            if (fin != null)
                fin.close();
        }
    }

文件的并发随机读写

利用多线程并发读写文件需要用到RandomAccessFile,因为FileInputStream/FileReader、FileOutputStream/FileWriter均不支持在文件内部使用搜寻方法,它们的读写是流式进行的。RandomAccessFile内部有一个指针标记了当前读写文件的位置,通过使用seek方法可以将该指针定位到文件的任意位置实现随机读写。

读取指定位置的文件分段:

                RandomAccessFile randomAccessFile = new RandomAccessFile(
                        fileTask.zippedFilePath, "rw");
                byte[] buffer = new byte[fileTask.fileSegmentSize];
                int availableSize;
                randomAccessFile.seek(pardIdLocal * fileTask.fileSegmentSize);
                availableSize = randomAccessFile.read(buffer);
                randomAccessFile.close();

写入指定位置的文件分段:

                RandomAccessFile randomAccessFile = new RandomAccessFile(
                        fileTask.zippedFilePath, "rw");
                long beginIndex = fileTask.fileSegmentSize * filePart.partId;
                randomAccessFile.seek(beginIndex);
                // System.out.println("file length = "+randomAccessFile.length()+" , beginIndex = "+beginIndex);
                randomAccessFile.write(filePart.data);
                randomAccessFile.close();

编码器与解码器

了解TCP协议原理的同学都知道TCP在实际的网络传输过程中是有可能会被分片的,即便是我们通过API显示发送了一个报文,在底层缓冲区中该报文也可能会被拆分重组再发送。

假设发送端的报文有2K个字节,那么接收端处理的过程中有可能分多次获取这2K个字节,每次获取的数据都是不确定的。

Mina中可以通过自定义解码器和编码器实现上传应用报文对象到底层字节流的相关转换。

所以通过TCP协议传输数据必须有明确的应用层报文规定以避免实际网络传输中的缺包粘包问题。

本例中的应用层报文格式: |两个字节的魔数|一个字节的消息类型定义 |两个字节的消息数据长度|若干字节的消息数据|

消息基类:

public class SocketMessage implements Serializable{

    public SocketMessage(){
    }
    
    public SocketMessage(SocketMessage msg){
        setType(msg.getType());
        setBody(msg.getBody());
    }

    private static final short MAX_BODY_LENGTH = 1400;
    public static final byte HEADER1 = 0x5c;
    public static final byte HEADER2 = 0x74;
    private byte type;
    
    //定义消息类型
    public static final byte TYPE_HEART_BEAT = -1;
    public static final byte TYPE_SEND_TEXT = 0;
    public static final byte TYPE_REQUEST_SEND_FILE = 1;
    public static final byte TYPE_ACCEPT_RECEIVE_FILE = 2;
    public static final byte TYPE_REFUSE_RECEIVE_FILE = 3;
    public static final byte TYPE_SEND_FILE_PART = 4;
    public static final byte TYPE_RECEIVE_FILE_PART = 5;

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    private byte[] body;


    public byte[] getBody() {
        return body;
    }
    
    public String getJSONBody() {
        return new String(body,Charset.forName("UTF-8"));
    }

    public void setBody(byte[] bytes){
        body = bytes;
    }
    
    /**
     * 设置消息体,一般用json解析
     */
    public void setBody(String str) {
        this.body = str.getBytes(Charset.forName("UTF-8"));
    }
}

编码器:

public class BaseEncoder extends ProtocolEncoderAdapter{

    @Override
    public void encode(IoSession session, Object obj, ProtocolEncoderOutput output)
            throws Exception {
        SocketMessage msg = (SocketMessage) obj;
        IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
        buffer.order(ByteOrder.BIG_ENDIAN);
        buffer.put(SocketMessage.HEADER1);
        buffer.put(SocketMessage.HEADER2);
        buffer.put(msg.getType());
        if(msg.getType()!=SocketMessage.TYPE_HEART_BEAT){
            byte[] body = msg.getBody();
            short bodyLength = (short) body.length;
            buffer.putShort(bodyLength);
            buffer.put(body);
        }
        buffer.flip();      
        output.write(buffer); 
    }
}

解码器:

public class BaseDecoder extends CumulativeProtocolDecoder{
    @Override
    protected boolean doDecode(IoSession session,IoBuffer in,
                               ProtocolDecoderOutput out) throws Exception {
        if(in.remaining()>=3){
            in.mark(); // 标记当前位置,方便reset
            byte[] header = new byte[2];
            in.get(header, 0, header.length);
            if(header[0]== SocketMessage.HEADER1 && header[1]== SocketMessage.HEADER2){
                byte type=in.get();
                //System.out.println("类型 :"+type);
                if(type==SocketMessage.TYPE_HEART_BEAT){
                    SocketMessage msg = new SocketMessage();
                    msg.setType(SocketMessage.TYPE_HEART_BEAT);
                    out.write(msg);
                    if(in.remaining()>=3){
                        return true;
                    }
                }else{
                    if(in.remaining()>=2){
                        short bodyLength = in.getShort();
                        if(in.remaining()>=bodyLength){
                            byte[] body = new byte[bodyLength];
                            in.get(body, 0, bodyLength);
                            SocketMessage msg = new SocketMessage();
                            msg.setType(type);
                            msg.setBody(body);
                            out.write(msg);
                            if(in.remaining()>=3){
                                //再来一遍
                                return true; 
                            }
                        }else{
                            //长度不够
                            in.reset();
                        }
                    }else{
                        //长度不够
                        in.reset();
                    }
                }
            }else{
                System.err.println("HEADER[0] = "+header[0]+" , HEADER[1] = "+header[1]);
                throw new IllegalArgumentException("错误的HEADER");
            }
        }
        return false;
    }
}

文件分段长度的设定

假设设定文件分段长度为 1KB,那么发送端的每个随机读取线程都将占用1KB的内存。同理服务端处理文件分段接收的线程池的每个线程也会占用1KB的缓冲区。

虽然应用层报文在底层会被分片发送,但还是要注意,如果设置这个值过大,当执行文件任务的会话增多,很容易内存溢出。

本例中设计报文数据长度最大不能超过2^32K,使用固定长度的线程池FixedThreadPool执行文件的并发写入,即使session数量很多但是缓冲区的数量是固定的。在使用CachedThreadPool要注意内存问题。

发送端与接收端的分段同步

发送端和接收端都维护了一个FileTask对象,这个对象代表一个文件任务:

public class FileTask {
    public boolean running = true;
    public long id;
    public String zippedFilePath;
    public long zippedFileSize;
    public String fileName;
    public String filePath;
    public String md5;
    public long startTime;
    public long startTime2; //网络计时
    public long size;
    public int fileSegmentSize;
    public AtomicInteger partId = new AtomicInteger(0);
}

其中的partId 是一个原子整型同步计数器,其原理是Compare And Swipe。发送端每发送一个SendFilePartMessage都将partId +1,同理接收端每接收一个SendFilePartMessage并将文件分段写入后也执行partId +1。

这种粗略的方式只能用于统计文件所有分段是否发送完成,如果要做断点传送还需要维护一个partId数组,服务端记录那些partId已成功接收哪些没有,在恢复文件传输时将该数组发送给发送端,这样发送端就知道接下来哪些分段需要发送。

测试

在项目中创建了两个项目,一个客户端用于发送文件,一个服务端处理文件的接收。
客户端:

14:47:05 已建立连接
14:47:05 session:1 已建立
开始压缩原文件
压缩结束 , 耗时:11478 ms
原文件大小:215089914 压缩后大小:174372279
请求发送文件 G:\test2.pdf id 1496990826746
总计5677个分段
发送完毕
发送文件用时:2.447 s
平均速度:69589.46821043624 kbps
对方接收并校验成功,总计用时(压缩+发送文件+解压):21.057 s
移除FileTask 1496990826746

服务端:

已绑定 8890
14:47:05 session:1 已建立
收到接收文件请求 : test2.pdf id 1496990826746
预计占用空间 : 389462193 Bytes 
同意接收文件:test2.pdf 临时存放路径:G:\server_download\1496990838242_zipped_test2.pdf 目标路径 G:\server_download\1496990838242_test2.pdf
文件 G:\server_download\1496990838242_zipped_test2.pdf 接收完毕 文件总计174372279字节
开始解压
解压结束 , 耗时:3498 ms
原文件大小:215089914 压缩文件大小:174372279
原MD5:594a131a0166f3c6e4043f77eb6c0a2b
下载文件md5:594a131a0166f3c6e4043f77eb6c0a2b
校验通过
移除FileTask 1496990826746

因为是本地测试所以速度很快,放到实际的网络中会遇到延时、断线异常等各种问题,待完善。

项目地址:MinaExample

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容

  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,160评论 11 349
  • 非原创文章,网络收集,如遇原作者,请私聊会标明出处! 1--11 tcp协议中三次握手和四次挥手建立TCP需要三次...
    Juinjonn阅读 2,153评论 0 28
  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 5,840评论 0 13
  • 一直时不时地收到一些用户的询问,「简书什么时候公测啊?」,所以觉得有必要向一直等待的用户汇报一下简书近期的进展。 ...
    简书阅读 1,702评论 0 7