Android开发之使用Netty进行Socket编程(二) 主要介绍了Netty框架内的主要使用的类以及 在客户端基本的建立连接并接收消息 的简单示例。众所周知,在Android应用开发中,一般是发起网络请求,拿到数据后异步更新UI,那么本文就介绍在开发过程中,如何封装Netty,方便开发者请求服务端的数据并异步地更新UI。
1 基本功能
- 与服务器建立TCP连接,TCP是面向连接的协议,只能用于点对点的通讯,所以对服务器的定位是IP地址+端口号。
- 发送消息给服务器,相当于发起请求,这个消息里面应该包含 与后台协议好的校验部分 以及 提供给服务器索引数据的参数部分。
- 理想状态下,TCP连接一旦建立,在通信双方中的任何一方主动关闭连接前,TCP 连接都将被一直保持下去,所以需要定时不间断地给服务器发送一个后台定义的消息段,即心跳包,让对方知道自己“在线”。
- 接收服务器返回的数据,并且拿到这些数据异步地去更新程序页面,将获取到的JSON文本解析成POJO。
2 接口的定义
利用Java的回调机制,在子线程建立连接并发出请求,在接收服务器返回的数据后告诉主线程更新UI。还包括一些连接异常的处理都通过回调接口实现。
public interface INettyClient {
void connect(String host, int port);//1. 建立连接
void sendMessage(int mt, String msg, long delayed);//2. 发送消息
void addDataReceiveListener(OnDataReceiveListener listener);//3. 为不同的请求添加监听器
interface OnDataReceiveListener {
void onDataReceive(int mt, String json);//接收到数据时触发
}
interface OnConnectStatusListener {
void onDisconnected();//连接异常时触发
}
}
3 对服务器返回数据的处理
对数据的处理主要是在ChannelHandler
中完成(详见Android开发之使用Netty进行Socket编程(二) )。在这里我们主要继承了ChannelInboundHandlerAdapter
,并提供了回调接口供主线程更新UI。
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final String TAG = "Netty";
private INettyClient.OnConnectStatusListener statusListener;
private List<INettyClient.OnDataReceiveListener> listeners = new ArrayList<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//channelActive()方法将会在连接被建立并且准备进行通信时被调用。
Log.d(TAG, "channel active");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
//channelRead()方法是在数据被接收的时候调用。
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
//verify(String body)方法对服务器返回的数据进行校验,并取出数据部分。
//具体校验的方法需要与后台同事进行协议。
body = verify(body);
Log.d(TAG, "verify : " + body);
if (null != body)
parseJson(body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
//exceptionCaught()事件处理方法是当出现Throwable对象才会被调用,
//即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。
//在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。
ctx.close();
Log.e(TAG, "Unexpected exception from downstream : "
+ cause.getMessage());
if (statusListener != null)//连接异常时触发onDisconnected()
statusListener.onDisconnected();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
LogUtils.d(TAG, "channelReadComplete");
}
//对数据进行解析,拿出区分不同请求的 flag字段,再根据不同的flag字段去触发相对应的监听器
private void parseJson(String json) {
try {
JSONObject jObject = new JSONObject(json);
int msgType = jObject.getInt(Constant.FLAG_MT);
Log.d(TAG, "parseJson message type: " + msgType + " json: " + json);
callListeners(msgType, json);
} catch (Exception e) {
LogUtils.e(TAG, "parseJson exception: " + e.getMessage());
e.printStackTrace();
}
}
//遍历监听器List,触发拥有正确msgType 的OnDataReceiveListener,
//回调 void onDataReceive(int mt, String json);方法
private void callListeners(final int msgType , final String json) {
for (final INettyClient.OnDataReceiveListener listener : listeners)
if (listener != null)
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {//主线程中进行
listener.onDataReceive(mt, json);
}
});
}
//绑定OnDataReceiveListener
public void addDataReceiveListener(INettyClient.OnDataReceiveListener listener) {
if (!listeners.contains(listener))
listeners.add(listener);
}
//绑定OnConnectStatusListener
public void setConnectStatusListener(INettyClient.OnConnectStatusListener listener) {
statusListener = listener;
}
}```
以上,就是一个供Android客户端使用的``ChannelHandler``,可以通过实现具体的``OnDataReceiveListener ``来异步地获得服务器返回的 数据。
## 4 NettyClient的实现
以上仅仅是展示了如何处理服务器返回的数据。建立连接、发送消息以及心跳包的功能还没进行封装。在2.接口的定义 里面已经定义好了NettyClient应该具备哪些行为,现在进行具体的实现。
主要的实现思路是:
1. 构建Bootstrap,其中包括设置好ChannelHandler来处理将来接收到的数据(详见[Android开发之使用Netty进行Socket编程(二)](http://www.jianshu.com/p/db74e673e43c) )。由Boostrap建立连接。通过`` channel.writeAndFlush(constructMessage(sendMsg)).sync()``发送消息。这些工作都在子线程完成。
2. 在子线程 建立连接并向服务器发送请求,这里采用了`HanlderThread`+`Handler`的方案。通过`Looper`依次从`Handler`的队列中获取信息,逐个进行处理,保证安全,不会出现混乱。
3. 心跳包的发送通过``handleMessage(Message msg)``中的死循环进行不间断地发送。
4. `NettyClientHandler `的实现中我们已经知道,当Netty异常时会触发`statusListener.onDisconnected();`,NettyClient中,onDisconnected()方法会进行重连操作。
接收到服务器返回的消息时,会在主线程中触发``onDataReceiveListener .onDataReceive(mt, json);``。
5. 外部通过单例模式进行调用。
public class NettyClient implements INettyClient {
private final String TAG = NettyClient.class.getSimpleName();
private static NettyClient mInstance;
private Bootstrap bootstrap;
private Channel channel;
private String host;
private int port;
private HandlerThread workThread = null;
private Handler mWorkHandler = null;
private NettyClientHandler nettyClientHandler;
private final String ACTION_SEND_TYPE = "action_send_type";
private final String ACTION_SEND_MSG = "action_send_msg";
private final int MESSAGE_INIT = 0x1;
private final int MESSAGE_CONNECT = 0x2;
private final int MESSAGE_SEND = 0x3;
private Handler.Callback mWorkHandlerCallback = new Handler.Callback() {
@Override
public boolean handleMessage(Message msg) {
switch (msg.what) {
case MESSAGE_INIT: {
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
pipeline.addLast(nettyClientHandler);
}
});
break;
}
case MESSAGE_CONNECT: {
try {
if (TextUtils.isEmpty(host) || port == 0) {
Exception exception = new Exception("Netty host | port is invalid");
throw exception;
}
channel = bootstrap.connect(new InetSocketAddress(host,
port)).sync().channel();
} catch (Exception e) {
LogUtils.e(TAG, "connect failed " + e.getMessage() + " reconnect delay: " + Constant.DELAY_CONNECT);
ToastUtils.showOnOtherThread("服务器连接失败");
e.printStackTrace();
sendReconnectMessage();
}
break;
}
case MESSAGE_SEND: {
String sendMsg = msg.getData().getString(ACTION_SEND_MSG);
int mt = msg.getData().getInt(ACTION_SEND_TYPE);
try {
if (channel != null && channel.isOpen()) {
channel.writeAndFlush(constructMessage(sendMsg)).sync();
Log.d(TAG, "send succeed " + constructMessage(sendMsg));
} else {
throw new Exception("channel is null | closed");
}
} catch (Exception e) {
LogUtils.e(TAG, "send failed " + e.getMessage());
sendReconnectMessage();
e.printStackTrace();
} finally {
if (Constant.MT.HAND_SHAKE.getType() == mt)
sendMessage(mt, sendMsg, Constant.DELAY_HAND_SHAKE);
}
break;
}
}
return true;
}
};
private NettyClient() {
init();
}
public synchronized static NettyClient getInstance() {
if (mInstance == null)
mInstance = new NettyClient();
return mInstance;
}
private void init() {
workThread = new HandlerThread(NettyClient.class.getName());
workThread.start();
mWorkHandler = new Handler(workThread.getLooper(), mWorkHandlerCallback);
nettyClientHandler = new NettyClientHandler();
nettyClientHandler.setConnectStatusListener(new OnConnectStatusListener() {
@Override
public void onDisconnected() {
sendReconnectMessage();
}
});
mWorkHandler.sendEmptyMessage(MESSAGE_INIT);
}
@Override
public void connect(String host, int port) {
this.host = host;
this.port = port;
mWorkHandler.sendEmptyMessage(MESSAGE_CONNECT);
}
@Override
public void addDataReceiveListener(OnDataReceiveListener listener) {
if (nettyClientHandler != null)
nettyClientHandler.addDataReceiveListener(listener);
}
private void sendReconnectMessage() {
mWorkHandler.sendEmptyMessageDelayed(MESSAGE_CONNECT, Constant.DELAY_CONNECT);
}
@Override
public void sendMessage(int mt, String msg, long delayed) {
if (TextUtils.isEmpty(msg))
return;
Message message = new Message();
Bundle bundle = new Bundle();
message.what = MESSAGE_SEND;
bundle.putString(ACTION_SEND_MSG, msg);
bundle.putInt(ACTION_SEND_TYPE, mt);
message.setData(bundle);
mWorkHandler.sendMessageDelayed(message, delayed);
}
private String constructMessage(String json) {
String message;
//与后台协议好,如何设置校验部分,然后和json一起发给服务器
return message;
}
}
## 5 NettyClient的使用
NettyClient采用了全局单例的模式。
1. 在Activity或者Fragment中为NettyClient绑定相应的数据接收监听器:
NettyClient.getInstance().addDataReceiveListener(new INettyClient.OnDataReceiveListener() {
@Override
public void onDataReceive(int mt, String json) {
//这些逻辑运行在主线程
if (mt == Constant.MSG_TYPE) {
UserModel user=new Gson().fromJson(json, UserModel .class);
textView.setText(user.getName);
}
}
2. 在Activity或者Fragment中发起请求:
```NettyClient.getInstance().sendMessage(Constant.MSG_TYPE, "发向服务器的消息", 0);```
可以看到使用起来十分简单方便。