Android开发之使用Netty进行Socket编程(三)

Android开发之使用Netty进行Socket编程(二) 主要介绍了Netty框架内的主要使用的类以及 在客户端基本的建立连接并接收消息 的简单示例。众所周知,在Android应用开发中,一般是发起网络请求,拿到数据后异步更新UI,那么本文就介绍在开发过程中,如何封装Netty,方便开发者请求服务端的数据并异步地更新UI。

1 基本功能

  1. 与服务器建立TCP连接,TCP是面向连接的协议,只能用于点对点的通讯,所以对服务器的定位是IP地址+端口号。
  2. 发送消息给服务器,相当于发起请求,这个消息里面应该包含 与后台协议好的校验部分 以及 提供给服务器索引数据的参数部分
  3. 理想状态下,TCP连接一旦建立,在通信双方中的任何一方主动关闭连接前,TCP 连接都将被一直保持下去,所以需要定时不间断地给服务器发送一个后台定义的消息段,即心跳包,让对方知道自己“在线”。
  4. 接收服务器返回的数据,并且拿到这些数据异步地去更新程序页面,将获取到的JSON文本解析成POJO。

2 接口的定义

利用Java的回调机制,在子线程建立连接并发出请求,在接收服务器返回的数据后告诉主线程更新UI。还包括一些连接异常的处理都通过回调接口实现。

public interface INettyClient {
    void connect(String host, int port);//1. 建立连接
    void sendMessage(int mt, String msg, long delayed);//2. 发送消息
    void addDataReceiveListener(OnDataReceiveListener listener);//3. 为不同的请求添加监听器

    interface OnDataReceiveListener {
        void onDataReceive(int mt, String json);//接收到数据时触发
    }

    interface OnConnectStatusListener {
        void onDisconnected();//连接异常时触发
    }
}

3 对服务器返回数据的处理

对数据的处理主要是在ChannelHandler中完成(详见Android开发之使用Netty进行Socket编程(二) )。在这里我们主要继承了ChannelInboundHandlerAdapter,并提供了回调接口供主线程更新UI。

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final String TAG = "Netty";
private INettyClient.OnConnectStatusListener statusListener;
private List<INettyClient.OnDataReceiveListener> listeners = new ArrayList<>();

 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//channelActive()方法将会在连接被建立并且准备进行通信时被调用。
        Log.d(TAG, "channel active");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
//channelRead()方法是在数据被接收的时候调用。
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
//verify(String body)方法对服务器返回的数据进行校验,并取出数据部分。
//具体校验的方法需要与后台同事进行协议。
        body = verify(body);

        Log.d(TAG, "verify : " + body);
        if (null != body)
            parseJson(body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
//exceptionCaught()事件处理方法是当出现Throwable对象才会被调用,
//即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。
//在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。
        ctx.close();
        Log.e(TAG, "Unexpected exception from downstream : "
                + cause.getMessage());
        if (statusListener != null)//连接异常时触发onDisconnected()
            statusListener.onDisconnected();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        LogUtils.d(TAG, "channelReadComplete");
    }

//对数据进行解析,拿出区分不同请求的 flag字段,再根据不同的flag字段去触发相对应的监听器
    private void parseJson(String json) {
        try {
            JSONObject jObject = new JSONObject(json);
            int msgType = jObject.getInt(Constant.FLAG_MT);
            Log.d(TAG, "parseJson message type: " + msgType + "  json: " + json);
            callListeners(msgType, json);
        } catch (Exception e) {
            LogUtils.e(TAG, "parseJson exception: " + e.getMessage());
            e.printStackTrace();
        }
    }

//遍历监听器List,触发拥有正确msgType 的OnDataReceiveListener,
//回调 void onDataReceive(int mt, String json);方法
     private void callListeners(final int msgType , final String json) {
        for (final INettyClient.OnDataReceiveListener listener : listeners)
            if (listener != null)
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {//主线程中进行
                        listener.onDataReceive(mt, json);
                    }
                });
    }

//绑定OnDataReceiveListener 
    public void addDataReceiveListener(INettyClient.OnDataReceiveListener listener) {
        if (!listeners.contains(listener))
            listeners.add(listener);
    }
//绑定OnConnectStatusListener
    public void setConnectStatusListener(INettyClient.OnConnectStatusListener listener) {   
         statusListener = listener;
    }
}```
以上,就是一个供Android客户端使用的``ChannelHandler``,可以通过实现具体的``OnDataReceiveListener ``来异步地获得服务器返回的 数据。

## 4 NettyClient的实现
以上仅仅是展示了如何处理服务器返回的数据。建立连接、发送消息以及心跳包的功能还没进行封装。在2.接口的定义 里面已经定义好了NettyClient应该具备哪些行为,现在进行具体的实现。
主要的实现思路是:
1. 构建Bootstrap,其中包括设置好ChannelHandler来处理将来接收到的数据(详见[Android开发之使用Netty进行Socket编程(二)](http://www.jianshu.com/p/db74e673e43c) )。由Boostrap建立连接。通过``   channel.writeAndFlush(constructMessage(sendMsg)).sync()``发送消息。这些工作都在子线程完成。
2. 在子线程 建立连接并向服务器发送请求,这里采用了`HanlderThread`+`Handler`的方案。通过`Looper`依次从`Handler`的队列中获取信息,逐个进行处理,保证安全,不会出现混乱。
3. 心跳包的发送通过``handleMessage(Message msg)``中的死循环进行不间断地发送。
4. `NettyClientHandler `的实现中我们已经知道,当Netty异常时会触发`statusListener.onDisconnected();`,NettyClient中,onDisconnected()方法会进行重连操作。
接收到服务器返回的消息时,会在主线程中触发``onDataReceiveListener .onDataReceive(mt, json);``。
5. 外部通过单例模式进行调用。

public class NettyClient implements INettyClient {
private final String TAG = NettyClient.class.getSimpleName();
private static NettyClient mInstance;
private Bootstrap bootstrap;
private Channel channel;
private String host;
private int port;
private HandlerThread workThread = null;
private Handler mWorkHandler = null;
private NettyClientHandler nettyClientHandler;

private final String ACTION_SEND_TYPE = "action_send_type";
private final String ACTION_SEND_MSG = "action_send_msg";
private final int MESSAGE_INIT = 0x1;
private final int MESSAGE_CONNECT = 0x2;
private final int MESSAGE_SEND = 0x3;
private Handler.Callback mWorkHandlerCallback = new Handler.Callback() {

    @Override
    public boolean handleMessage(Message msg) {
        switch (msg.what) {
            case MESSAGE_INIT: {
                NioEventLoopGroup  group = new NioEventLoopGroup();
                bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(group);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                        pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
                        pipeline.addLast(nettyClientHandler);
                    }
                });
                break;
            }
            case MESSAGE_CONNECT: {
                try {
                    if (TextUtils.isEmpty(host) || port == 0) {
                        Exception exception = new Exception("Netty host | port is invalid");
                        throw exception;
                    }
                    channel = bootstrap.connect(new InetSocketAddress(host,
                            port)).sync().channel();
                } catch (Exception e) {
                    LogUtils.e(TAG, "connect failed  " + e.getMessage() + "  reconnect delay: " + Constant.DELAY_CONNECT);
                    ToastUtils.showOnOtherThread("服务器连接失败");
                    e.printStackTrace();
                    sendReconnectMessage();
                }
                break;
            }
            case MESSAGE_SEND: {
                String sendMsg = msg.getData().getString(ACTION_SEND_MSG);
                int mt = msg.getData().getInt(ACTION_SEND_TYPE);
                try {
                    if (channel != null && channel.isOpen()) {
                        channel.writeAndFlush(constructMessage(sendMsg)).sync();
                        Log.d(TAG, "send succeed " + constructMessage(sendMsg));
                    } else {
                        throw new Exception("channel is null | closed");
                    }
                } catch (Exception e) {
                    LogUtils.e(TAG, "send failed " + e.getMessage());
                    sendReconnectMessage();
                    e.printStackTrace();
                } finally {
                    if (Constant.MT.HAND_SHAKE.getType() == mt)
                        sendMessage(mt, sendMsg, Constant.DELAY_HAND_SHAKE);
                }

                break;
            }
        }
        return true;
    }
};

private NettyClient() {
    init();
}

public synchronized static NettyClient getInstance() {
    if (mInstance == null)
        mInstance = new NettyClient();
    return mInstance;
}

private void init() {
    workThread = new HandlerThread(NettyClient.class.getName());
    workThread.start();
    mWorkHandler = new Handler(workThread.getLooper(), mWorkHandlerCallback);
    nettyClientHandler = new NettyClientHandler();
    nettyClientHandler.setConnectStatusListener(new OnConnectStatusListener() {
        @Override
        public void onDisconnected() {
            sendReconnectMessage();
        }
    });
    mWorkHandler.sendEmptyMessage(MESSAGE_INIT);
}

@Override
public void connect(String host, int port) {
    this.host = host;
    this.port = port;
    mWorkHandler.sendEmptyMessage(MESSAGE_CONNECT);
}

@Override
public void addDataReceiveListener(OnDataReceiveListener listener) {
    if (nettyClientHandler != null)
        nettyClientHandler.addDataReceiveListener(listener);
}

private void sendReconnectMessage() {
   mWorkHandler.sendEmptyMessageDelayed(MESSAGE_CONNECT, Constant.DELAY_CONNECT);
}

@Override
public void sendMessage(int mt, String msg, long delayed) {
    if (TextUtils.isEmpty(msg))
        return;
    Message message = new Message();
    Bundle bundle = new Bundle();
    message.what = MESSAGE_SEND;
    bundle.putString(ACTION_SEND_MSG, msg);
    bundle.putInt(ACTION_SEND_TYPE, mt);
    message.setData(bundle);
    mWorkHandler.sendMessageDelayed(message, delayed);
}

private String constructMessage(String json) {
    String message;
    //与后台协议好,如何设置校验部分,然后和json一起发给服务器
    return message;
}

}


## 5 NettyClient的使用
NettyClient采用了全局单例的模式。
1. 在Activity或者Fragment中为NettyClient绑定相应的数据接收监听器:
  NettyClient.getInstance().addDataReceiveListener(new INettyClient.OnDataReceiveListener() {
        @Override
        public void onDataReceive(int mt, String json) {

//这些逻辑运行在主线程
if (mt == Constant.MSG_TYPE) {
UserModel user=new Gson().fromJson(json, UserModel .class);
textView.setText(user.getName);
}
}

2. 在Activity或者Fragment中发起请求:
```NettyClient.getInstance().sendMessage(Constant.MSG_TYPE, "发向服务器的消息", 0);```
可以看到使用起来十分简单方便。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容