1. 背景
今日需要实现一个局域网的wifi数据传输的功能。不可避免的,要进行TCP socket的操作。由于逻辑比较简单,就使用最小化的Socket通信即可。
2. 目标
实现一个TCP模块,供业务层调用。业务层把要发送的数据和发送的目标给到TCP模块,TCP模块完成传输,并将传输状态和传输结果反馈给业务层
3. 需求分析
需要一个类来封装所有的TCP操作,我们定义为ChannelTransport
接口:
- 启动网络连接
startTcpService() - 关闭网络连接
stopTcpService() - 网络联通
onConnected() - 网络连接失败
onConnectFail() - 网络联通的情况下,一端close,另外一端收到-1
onConnectEnd() - 断网
onConnectException() - 发送数据
sendByte(Byte[] datas) - 收到数据
onRead(Byte[] datas)
4. TCP模块封装
4.1 接口层 IfSocket
接口层主要就是一接口定义:如打开socket,关闭socket,发送数据,接收数据,连接状态监听,数据监听
public interface IfSocket {
public void start();
public void sendTo(byte[] var1);
public void receive();
public void stop();
public void setConnectEventListener(SocketConnectEventListener connectEventListener);
public void setReadStreamListener(OnStreamListener onReadStreamListener);
public static interface SocketConnectEventListener {
/**
* 用于Socket主线程,socket连接成功
*/
public void onConnected();
/**
* 用于Socket主线程,socket连接失败
*/
public void onConnectFail();
/**
* 用于IOReadThread,socket 传输过程中收到-1结束符,标志对方socket close或者关闭输入
*/
public void onConnectEnd();
/**
* 用于IOReadThread和IOWriteThread,socket 传输过程中的Io exception
*/
public void onConnectException();
}
/**
* 用于IO Thread ,一次socket传输接收到的数据
* @author xuqiang
*
*/
public static interface OnStreamListener {
public void onRead(byte[] var1);
public void onSent();
}
}
4.2 Socket端的具体实现
几个注意点
- start要分server和client两种情况
- IO线程用线程池实现TcpWriteIORunnable TcpReadIORunnable
- 设计一个心跳包线程TcpWriteAliveRunable,在当前没有send数据的情况下,循环send心跳包
public class TcpSocket implements IfSocket {
boolean isServer = true; //是不是Server
String ipAddress; //Server的IP,给client用于connect的
protected ExecutorService mThreadPool; //线程池,用于新建receive和send线程
protected ScheduledExecutorService mScheduledThreadpool; //Timer线程池,用于发送心跳包
protected int mState; //当前的状态
protected Socket mSocket;
protected ServerSocket mServerSocket;
protected SocketConnectEventListener mConnectEventListener;
protected OnStreamListener mOnStreamListener;
private InputStream mInStream;
private OutputStream mOutStream;
public static final byte[] SEND_TAG = new byte[] { -5, -17, -13, -19 }; //数据头部,用于数据校验
public static final byte[] SEND_ALIVE_TAG = new byte[] { -25, -31, -37, -43 }; //心跳包
protected TcpWriteAliveRunable mTcpWriteAliveRunable; //心跳包的task
public TcpSocket(boolean isServer, String ipAddress) {
super();
this.isServer = isServer;
this.ipAddress = ipAddress;
}
@Override
public void setConnectEventListener(
SocketConnectEventListener connectEventListener) {
this.mConnectEventListener = connectEventListener;
}
@Override
public void setReadStreamListener(OnStreamListener onReadStreamListener) {
this.mOnStreamListener = onReadStreamListener;
}
@Override
public void start() {
this.mThreadPool = Executors.newCachedThreadPool();
this.mScheduledThreadpool = Executors.newScheduledThreadPool(1);
this.mTcpWriteAliveRunable = new TcpWriteAliveRunable(
mOutStream, mConnectEventListener);
try {
if (isServer) {
mServerSocket = new ServerSocket(TcpVar.PORT);
this.mSocket = this.mServerSocket.accept();
} else {
this.mSocket = new Socket(ipAddress, TcpVar.PORT);
}
mState = TcpVar.STATE_CONNECTED;
mConnectEventListener.onConnected();
Dbg.i(TcpVar.TAG, " create socket sucess");
mSocket.setSoTimeout(20000); // 加入超时
mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
} catch (Exception e) {
mState = TcpVar.STATE_CONNECT_FAIL;
mConnectEventListener.onConnectFail();
Dbg.w(TcpVar.TAG, " create socket failed", e);
}
}
@Override
public void receive() {
if (mState != TcpVar.STATE_CONNECTED) {
return;
}
try {
mInStream = new BufferedInputStream(this.mSocket.getInputStream());
} catch (IOException e) {
mInStream = null;
}
mThreadPool.execute(new TcpReadIORunnable(mInStream,
mConnectEventListener, mOnStreamListener));
}
@Override
public void sendTo(byte[] var1) {
if (mState != TcpVar.STATE_CONNECTED) {
return;
}
try {
mOutStream = new BufferedOutputStream(
this.mSocket.getOutputStream());
} catch (IOException e) {
mOutStream = null;
}
try {
//发送时阻塞当前线程,心跳包暂停发送,发送完毕后,心跳包重新发送
mScheduledThreadpool.shutdownNow();
mThreadPool.submit(new TcpWriteIORunnable(mOutStream,
mConnectEventListener, mOnStreamListener,var1)).get();
mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
};
}
@Override
public void stop() {
mThreadPool.shutdownNow();
mScheduledThreadpool.shutdownNow();
try {
mSocket.close();
mServerSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.3 TcpWriteIoRunable的实现
数据格式很简单
- SEND_TAG
- data_length
- data
public class TcpWriteIORunnable implements Runnable {
OutputStream mOutStream;
SocketConnectEventListener mConnectEventListener;
OnStreamListener mOnStreamListener;
byte[] data;
public TcpWriteIORunnable(OutputStream mOutStream,
SocketConnectEventListener mConnectEventListener,
OnStreamListener mOnStreamListener, byte[] datas) {
this.mOutStream = mOutStream;
this.mConnectEventListener = mConnectEventListener;
this.mOnStreamListener = mOnStreamListener;
this.data = data;
}
@Override
public void run() {
try {
mOutStream.write(TcpSocket.SEND_TAG);
mOutStream.write(Util.int2bytes(this.data.length));
mOutStream.write(this.data);
mOutStream.flush();
mOnStreamListener.onSent();
} catch (Exception e) {
mConnectEventListener.onConnectException();
}
}
}
4.4 TcpWriteAliveRunable的实现
心跳包的设计非常简单,就是循环发送SEND_ALIVE_TAG
mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
public class TcpWriteAliveRunable implements Runnable {
OutputStream mOutStream;
SocketConnectEventListener mConnectEventListener;
public TcpWriteAliveRunable(OutputStream mOutStream,
SocketConnectEventListener mConnectEventListener) {
super();
this.mOutStream = mOutStream;
this.mConnectEventListener = mConnectEventListener;
}
@Override
public void run() {
try{
mOutStream.write(TcpSocket.SEND_ALIVE_TAG);
}
catch (Exception e) {
mConnectEventListener.onConnectException();
}
}
}
4.5 TcpReadIORunnable的实现
Read线程的流程主要分三步:
- 校验SEND_TAG。校验的过程中我们是一个字节一个字节的校验
- 第二步还是在读取数据长度
- 第三步就是读取真正的数据了。有三种策略读数据:
- 一个byte一个byte的读,这样效率较低
- mmInStream.read(len)。但是InputStream.read(len)有个问题就是,他可能实际读取的长度是小于len的。这个len是数据读取的最大值,所以也不能直接使用;
- 我的算法是:mmInStream.read(len),每次记录已经read的数据量,然后通过len-readBytes得到还剩下的数据长度,然后依次循环读取,直到数据量读满len或者read==-1(断网)为止。
public class TcpReadIORunnable implements Runnable {
private boolean isStoped = false;
InputStream mInStream;
SocketConnectEventListener mConnectEventListener;
OnStreamListener mOnReadStreamListener;
public TcpReadIORunnable(InputStream mInStream,
SocketConnectEventListener mConnectEventListener,
OnStreamListener mOnReadStreamListener) {
this.mInStream = mInStream;
this.mConnectEventListener = mConnectEventListener;
this.mOnReadStreamListener = mOnReadStreamListener;
}
@Override
public void run() {
int i = 0;
ByteBuffer errorByteBuffer = ByteBuffer.allocate(1024 * 16);
while (!this.isStoped) {
try {
// 1.判断起始标记 start
int t = this.mInStream.read();
if (t == -1) {
Dbg.e(TcpVar.TAG, "read stream is -1!!!!!!!"); // 网络一旦断了,或者一端关闭,则出循环,结束io线程
mConnectEventListener.onConnectEnd();
break;
}
Dbg.d(TcpVar.TAG, "mmInStream.read() one sucess ");
byte b = (byte) (t & 0xFF);
if (TcpSocket.SEND_TAG[i] != b) {
errorByteBuffer.put(b);
Dbg.e(TcpVar.TAG,
"!read byte error i:"
+ i
+ " b:"
+ EncrypUtil
.byteArrayToHexStr(new byte[] { b })
+ " tag:"
+ EncrypUtil
.byteArrayToHexStr(new byte[] { TcpSocket.SEND_TAG[i] }));
i = 0;
continue;
}
i++;
if (i != TcpSocket.SEND_TAG.length) {
continue;//继续读下一个数据,直到SEND_TAG读完
}
i = 0;//到此处全部SEND_TAG全部读完
//下面是数据的打印,用于调试
if (errorByteBuffer.position() != 0) {
byte[] dst = new byte[errorByteBuffer.position()];
errorByteBuffer.position(0);
errorByteBuffer.get(dst, 0, dst.length);
errorByteBuffer.clear();
Dbg.e(TcpVar.TAG,
"!read byte error data:"
+ EncrypUtil.byteArrayToHexStr(dst));
}
errorByteBuffer.clear();
// 2.读取包长度
byte[] len = new byte[4];
for (int j = 0; j < len.length; j++) {
len[j] = (byte) (this.mInStream.read() & 0xFF);
}
// mmInStream.read(len);
int length = Util.bytes2int(len);
// Dbg.d("read length:"+length);
byte[] data = new byte[length];
Dbg.e(TcpVar.TAG, "start read data,length = " + length);
// 3. 读取数据
int readBytes = 0;
while (readBytes < data.length) {
int read = mInStream.read(data, readBytes, data.length
- readBytes);
if (read == -1) {
break;
}
readBytes += read;
}
mOnReadStreamListener.onRead(data);
Dbg.d("read byte end!!!!!!!");
} catch (Exception e) {
Dbg.e("WifiTransferService",
"Fail to read bytes from input stream of Wifiiothread "
+ e.getMessage(), e.getMessage());
mConnectEventListener.onConnectException();
return;
}
}
}
}
5.5 Android业务层调用的注意事项。
- 将ChannelTransport中使用TcpSocket做相应的操作,并且实现OnStreamListener和SocketConnectEventListener,即可。
- Socket的开启/关闭/发送/接收,以及OnStreamListener和SocketConnectEventListener的回调都是在不同的线程中工作的。为了保证线程同步问题,我们需要使用一个HandlerThread,并将所有的callback让HandlerThread去处理;然后使用ChannelTransport去extends Handler或者再新建一个Handler与这个HandlerThread对应起来。