Android最小局域网单对单TCP实现

1. 背景

今日需要实现一个局域网的wifi数据传输的功能。不可避免的,要进行TCP socket的操作。由于逻辑比较简单,就使用最小化的Socket通信即可。

2. 目标

实现一个TCP模块,供业务层调用。业务层把要发送的数据和发送的目标给到TCP模块,TCP模块完成传输,并将传输状态和传输结果反馈给业务层

3. 需求分析

需要一个类来封装所有的TCP操作,我们定义为ChannelTransport

接口:

  1. 启动网络连接
    startTcpService()
  2. 关闭网络连接
    stopTcpService()
  3. 网络联通
    onConnected()
  4. 网络连接失败
    onConnectFail()
  5. 网络联通的情况下,一端close,另外一端收到-1
    onConnectEnd()
  6. 断网
    onConnectException()
  7. 发送数据
    sendByte(Byte[] datas)
  8. 收到数据
    onRead(Byte[] datas)

4. TCP模块封装

4.1 接口层 IfSocket

接口层主要就是一接口定义:如打开socket,关闭socket,发送数据,接收数据,连接状态监听,数据监听

public interface IfSocket {
    
    public void start();
    public void sendTo(byte[] var1);
    public void receive();
    public void stop();
    public void setConnectEventListener(SocketConnectEventListener connectEventListener);
    public void setReadStreamListener(OnStreamListener onReadStreamListener);

    public static interface SocketConnectEventListener {
        /**
         * 用于Socket主线程,socket连接成功
         */
        public void onConnected();
        /**
         * 用于Socket主线程,socket连接失败
         */
        public void onConnectFail();
        
        /**
         * 用于IOReadThread,socket 传输过程中收到-1结束符,标志对方socket close或者关闭输入
         */
        public void onConnectEnd(); 
        
        /**
         *  用于IOReadThread和IOWriteThread,socket 传输过程中的Io exception
         */
        public void onConnectException();
    }

    
/**
 * 用于IO Thread ,一次socket传输接收到的数据
 * @author xuqiang
 *
 */
    public static interface OnStreamListener {
        public void onRead(byte[] var1);
        public void onSent();
    }


}

4.2 Socket端的具体实现

几个注意点

  1. start要分server和client两种情况
  2. IO线程用线程池实现TcpWriteIORunnable TcpReadIORunnable
  3. 设计一个心跳包线程TcpWriteAliveRunable,在当前没有send数据的情况下,循环send心跳包
public class TcpSocket implements IfSocket {
    boolean isServer = true; //是不是Server
    String ipAddress;                  //Server的IP,给client用于connect的
    protected ExecutorService mThreadPool; //线程池,用于新建receive和send线程
    protected ScheduledExecutorService mScheduledThreadpool; //Timer线程池,用于发送心跳包
    protected int mState; //当前的状态
    protected Socket mSocket; 
    protected ServerSocket mServerSocket;
    protected SocketConnectEventListener mConnectEventListener;
    protected OnStreamListener mOnStreamListener;
    private InputStream mInStream;
    private OutputStream mOutStream;
    public static final byte[] SEND_TAG = new byte[] { -5, -17, -13, -19 }; //数据头部,用于数据校验
    public static final byte[] SEND_ALIVE_TAG = new byte[] { -25, -31, -37, -43 }; //心跳包
    protected TcpWriteAliveRunable mTcpWriteAliveRunable;  //心跳包的task

    public TcpSocket(boolean isServer, String ipAddress) {
        super();
        this.isServer = isServer;
        this.ipAddress = ipAddress;
    }

    @Override
    public void setConnectEventListener(
            SocketConnectEventListener connectEventListener) {
        this.mConnectEventListener = connectEventListener;
    }

    @Override
    public void setReadStreamListener(OnStreamListener onReadStreamListener) {
        this.mOnStreamListener = onReadStreamListener;
    }

    @Override
    public void start() {
        this.mThreadPool = Executors.newCachedThreadPool();
        this.mScheduledThreadpool = Executors.newScheduledThreadPool(1);
        this.mTcpWriteAliveRunable = new TcpWriteAliveRunable(
                mOutStream, mConnectEventListener);
        try {
            if (isServer) {
                mServerSocket = new ServerSocket(TcpVar.PORT);
                this.mSocket = this.mServerSocket.accept();
            } else {
                this.mSocket = new Socket(ipAddress, TcpVar.PORT);
            }
            mState = TcpVar.STATE_CONNECTED;
            mConnectEventListener.onConnected();
            Dbg.i(TcpVar.TAG, " create socket sucess");
            mSocket.setSoTimeout(20000); // 加入超时
            mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
        } catch (Exception e) {
            mState = TcpVar.STATE_CONNECT_FAIL;
            mConnectEventListener.onConnectFail();
            Dbg.w(TcpVar.TAG, " create socket failed", e);
        }

    }

    @Override
    public void receive() {
        if (mState != TcpVar.STATE_CONNECTED) {
            return;
        }
        try {
            mInStream = new BufferedInputStream(this.mSocket.getInputStream());
        } catch (IOException e) {
            mInStream = null;
        }
        mThreadPool.execute(new TcpReadIORunnable(mInStream,
                mConnectEventListener, mOnStreamListener));
    }

    @Override
    public void sendTo(byte[] var1) {
        if (mState != TcpVar.STATE_CONNECTED) {
            return;
        }
        try {
            mOutStream = new BufferedOutputStream(
                    this.mSocket.getOutputStream());
        } catch (IOException e) {
            mOutStream = null;
        }

        try {
            //发送时阻塞当前线程,心跳包暂停发送,发送完毕后,心跳包重新发送
            mScheduledThreadpool.shutdownNow();
            mThreadPool.submit(new TcpWriteIORunnable(mOutStream,
                    mConnectEventListener, mOnStreamListener,var1)).get();
            mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        };

    }

    @Override
    public void stop() {
        mThreadPool.shutdownNow();
        mScheduledThreadpool.shutdownNow();
        try {
            mSocket.close();
            mServerSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        

    }

}

4.3 TcpWriteIoRunable的实现

数据格式很简单

  1. SEND_TAG
  2. data_length
  3. data
public class TcpWriteIORunnable implements Runnable {
    OutputStream mOutStream;
    SocketConnectEventListener mConnectEventListener;
    OnStreamListener mOnStreamListener;
    byte[] data;

    public TcpWriteIORunnable(OutputStream mOutStream,
            SocketConnectEventListener mConnectEventListener,
            OnStreamListener mOnStreamListener, byte[] datas) {
        this.mOutStream = mOutStream;
        this.mConnectEventListener = mConnectEventListener;
        this.mOnStreamListener = mOnStreamListener;
        this.data = data;
    }

    @Override
    public void run() {
        try {
            mOutStream.write(TcpSocket.SEND_TAG);
            mOutStream.write(Util.int2bytes(this.data.length));
            mOutStream.write(this.data);
            mOutStream.flush();
            mOnStreamListener.onSent();
        } catch (Exception e) {
            mConnectEventListener.onConnectException();
        }

    }
}

4.4 TcpWriteAliveRunable的实现

心跳包的设计非常简单,就是循环发送SEND_ALIVE_TAG

mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);

public class TcpWriteAliveRunable implements Runnable {
    OutputStream mOutStream;
    SocketConnectEventListener mConnectEventListener;
    
    public TcpWriteAliveRunable(OutputStream mOutStream,
            SocketConnectEventListener mConnectEventListener) {
        super();
        this.mOutStream = mOutStream;
        this.mConnectEventListener = mConnectEventListener;
    }

    @Override
    public void run() {
        try{
            mOutStream.write(TcpSocket.SEND_ALIVE_TAG);
        }
         catch (Exception e) {
                mConnectEventListener.onConnectException();
            }
    }

}

4.5 TcpReadIORunnable的实现

Read线程的流程主要分三步:

  1. 校验SEND_TAG。校验的过程中我们是一个字节一个字节的校验
  2. 第二步还是在读取数据长度
  3. 第三步就是读取真正的数据了。有三种策略读数据:
    1. 一个byte一个byte的读,这样效率较低
    2. mmInStream.read(len)。但是InputStream.read(len)有个问题就是,他可能实际读取的长度是小于len的。这个len是数据读取的最大值,所以也不能直接使用;
    3. 我的算法是:mmInStream.read(len),每次记录已经read的数据量,然后通过len-readBytes得到还剩下的数据长度,然后依次循环读取,直到数据量读满len或者read==-1(断网)为止。
public class TcpReadIORunnable implements Runnable {

    private boolean isStoped = false;
    InputStream mInStream;
    SocketConnectEventListener mConnectEventListener;
    OnStreamListener mOnReadStreamListener;

    public TcpReadIORunnable(InputStream mInStream,
            SocketConnectEventListener mConnectEventListener,
            OnStreamListener mOnReadStreamListener) {
        this.mInStream = mInStream;
        this.mConnectEventListener = mConnectEventListener;
        this.mOnReadStreamListener = mOnReadStreamListener;
    }

    @Override
    public void run() {
        int i = 0;
        ByteBuffer errorByteBuffer = ByteBuffer.allocate(1024 * 16);
        while (!this.isStoped) {
            try {
                // 1.判断起始标记 start
                int t = this.mInStream.read();
                if (t == -1) {
                    Dbg.e(TcpVar.TAG, "read stream is -1!!!!!!!"); // 网络一旦断了,或者一端关闭,则出循环,结束io线程
                    mConnectEventListener.onConnectEnd();
                    break;
                }
                Dbg.d(TcpVar.TAG, "mmInStream.read() one sucess ");
                byte b = (byte) (t & 0xFF);
                if (TcpSocket.SEND_TAG[i] != b) {
                    errorByteBuffer.put(b);
                    Dbg.e(TcpVar.TAG,
                            "!read byte error i:"
                                    + i
                                    + "  b:"
                                    + EncrypUtil
                                            .byteArrayToHexStr(new byte[] { b })
                                    + "  tag:"
                                    + EncrypUtil
                                            .byteArrayToHexStr(new byte[] { TcpSocket.SEND_TAG[i] }));
                    i = 0;
                    continue;
                }
                i++;
                if (i != TcpSocket.SEND_TAG.length) {
                    continue;//继续读下一个数据,直到SEND_TAG读完
                }
                i = 0;//到此处全部SEND_TAG全部读完
                //下面是数据的打印,用于调试
                if (errorByteBuffer.position() != 0) {
                    byte[] dst = new byte[errorByteBuffer.position()];
                    errorByteBuffer.position(0);
                    errorByteBuffer.get(dst, 0, dst.length);
                    errorByteBuffer.clear();
                    Dbg.e(TcpVar.TAG,
                            "!read byte error data:"
                                    + EncrypUtil.byteArrayToHexStr(dst));
                }

                errorByteBuffer.clear();
                // 2.读取包长度
                byte[] len = new byte[4];
                for (int j = 0; j < len.length; j++) {
                    len[j] = (byte) (this.mInStream.read() & 0xFF);
                }

                // mmInStream.read(len);
                int length = Util.bytes2int(len);
                // Dbg.d("read length:"+length);
                byte[] data = new byte[length];
                Dbg.e(TcpVar.TAG, "start read data,length =  " + length);
                // 3. 读取数据

                int readBytes = 0;
                while (readBytes < data.length) {
                    int read = mInStream.read(data, readBytes, data.length
                            - readBytes);
                    if (read == -1) {
                        break;
                    }
                    readBytes += read;
                }

                mOnReadStreamListener.onRead(data);
                Dbg.d("read byte end!!!!!!!");
            } catch (Exception e) {
                Dbg.e("WifiTransferService",
                        "Fail to read bytes from input stream of Wifiiothread "
                                + e.getMessage(), e.getMessage());
                mConnectEventListener.onConnectException();
                return;
            }

        }
    }


}

5.5 Android业务层调用的注意事项。

  1. 将ChannelTransport中使用TcpSocket做相应的操作,并且实现OnStreamListener和SocketConnectEventListener,即可。
  2. Socket的开启/关闭/发送/接收,以及OnStreamListener和SocketConnectEventListener的回调都是在不同的线程中工作的。为了保证线程同步问题,我们需要使用一个HandlerThread,并将所有的callback让HandlerThread去处理;然后使用ChannelTransport去extends Handler或者再新建一个Handler与这个HandlerThread对应起来。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • 大纲 一.Socket简介 二.BSD Socket编程准备 1.地址 2.端口 3.网络字节序 4.半相关与全相...
    VD2012阅读 2,268评论 0 5
  • (一)Java部分 1、列举出JAVA中6个比较常用的包【天威诚信面试题】 【参考答案】 java.lang;ja...
    独云阅读 7,066评论 0 62
  • 之前去过一些公司做笔试题,排序算是比较基础的知识了,当时要求用JavaScript写出快速排序,当时不会就用Jav...
    BrianAguilar阅读 514评论 0 1
  • 周六上午两个孩子出门学画画,整好我去听武校长教育讲座。 听完后真是受益匪浅,明白了平时自己怎么都想不透的问题!也学...
    要改掉坏脾气的妈妈阅读 234评论 0 0
  • 转眼间,已经到了2017年的深秋季节了!决定回忆一下往昔!记录一下过往!小女子出生于1994.12.02。印象深刻...
    f75637cd56f2阅读 385评论 0 0