Android 之Mina的集成实现

Mina框架简介:

  1. Mina是什么东西?
    Apache MINA 是一个网络应用框架,有助于用户非常方便地开发高性能、高伸缩性的网络应用。它通过Java NIO提供了一个抽象的、事件驱动的、异步的位于各种传输协议(如TCP/IP和UDP/IP)之上的API,Apache MINA 通常可被称之为:
    NIO 框架库;
    客户端/服务器框架库;
    或者一个网络socket库。
  2. MINA框架的特点有
    基于java NIO类库开发;采用非阻塞方式的异步传输;事件驱动;支持批量数据传输;支持TCP、UDP协议;控制反转的设计模式(支持Spring);采用优雅的松耦合架构;可灵活的加载过滤器机制;单元测试更容易实现;可自定义线程的数量,以提高运行于多处理器上的性能;采用回调的方式完成调用,线程的使用更容易。
  3. Mina的框架
    当远程客户首次访问采用MINA编写的程序时,IoAcceptor作为线程运行,负责接受来自客户的请求。当有客户请求连接时,创建一个IoSession,该IoSession与IoProcessor、SocketChannel以及IOService联系起来。IoProcessor也作为另外一个线程运行,定时检查客户是否有数据到来,并对客户请求进行处理,依次调用在IOService注册的各个IoFilter,最后调用IoHandler进行最终的逻辑处理,再将处理后的结果Filter后返回给客户端。
  4. Mina的现有应用
    MINA框架的应用比较广泛,应用的开源项目有Apache Directory、AsyncWeb、ApacheQpid、QuickFIX/J、Openfire、SubEthaSTMP、red5等。MINA框架当前稳定版本是1.1.6,最新的2.0版本目前已经发布了M1版本。

实现 Mina 三部曲

先来看下效果图

image.png

流程大概是这样的:
首先先启动一个本地的服务--启动成功后--通知前端可以开始连接了--前端连接成功后 -- 就准备发送消息--服务器接收到消息--回调给前端

  1. 下载jar包
    官网下载地址 http://mina.apache.org/mina-project/userguide/user-guide-toc.html
    这里需要下载 mina-core-2.0.16.jar和 slf4j-android-1.6.1-RC1.jar

  2. 先把下载的jar包复制粘贴至项目的libs目录下,然后在app下的build.gradle文件里面导入下载的jar包,由于我的项目里面用到了rxJava,所以就顺带导入rxJava的包。如果不懂rxJava语法,那么请看链接http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/0430/2815.html

    compile files('libs/mina-core-2.0.16.jar')
    compile files('libs/slf4j-android-1.6.1-RC1.jar')
    compile 'io.reactivex.rxjava2:rxjava:2.1.0'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
  1. 以上工作准备好之后,下面就开始编码之路,文末会给出demo下载地址
    3.1 客户端的核心代码:
    public void connect(final Context context) {
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {
                NioSocketConnector mSocketConnector = new NioSocketConnector();
                //设置协议封装解析处理
                mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
                //设置心跳包
                KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartBeatMessageFactory());
                //每 5 分钟发送一个心跳包
                heartFilter.setRequestInterval(5 * 60);
                //心跳包超时时间 10s
                heartFilter.setRequestTimeout(10);
                // 获取过滤器链
                DefaultIoFilterChainBuilder filterChain = mSocketConnector.getFilterChain();
                filterChain.addLast("encoder", new ProtocolCodecFilter(new FrameCodecFactory()));
                // 添加编码过滤器 处理乱码、编码问题
                filterChain.addLast("decoder", new ProtocolCodecFilter(new FrameCodecFactory()));
                mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter);
                //设置 handler 处理业务逻辑
                mSocketConnector.setHandler(new HeartBeatHandler(context));
                mSocketConnector.addListener(new HeartBeatListener(mSocketConnector));
                //配置服务器地址
                InetSocketAddress mSocketAddress = new InetSocketAddress(ConnectUtils.HOST, ConnectUtils.PORT);
                //发起连接
                ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
                mFuture.awaitUninterruptibly();
                IoSession mSession = mFuture.getSession();
                Log.d("", "======连接成功" + mSession.toString());
                e.onNext(mSession);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Object o) {
                IoSession mSession = (IoSession) o;
                Log.d("MainActivity", "======连接成功了吗====" + mSession.isConnected());
                SessionManager.getInstance().setSeesion(mSession);
                SessionManager.getInstance().writeToServer("你看见了吗\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

客户端设置setHandler监听 IoHandlerAdapter,这个可以自己写个类重载一下

/**
 * Created by huanghongfa on 2017/7/28.
 */

public class HeartBeatHandler extends IoHandlerAdapter {

    private final String TAG = "HeartBeatHandler";

    public static final String BROADCAST_ACTION = "com.commonlibrary.mina.broadcast";
    public static final String MESSAGE = "message";
    private Context mContext;

    public HeartBeatHandler(Context context) {
        this.mContext = context;

    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用exceptionCaught");
    }

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        Log.e(TAG,  "接收到服务器端消息:" + message.toString());
        if (mContext != null) {
            Intent intent = new Intent(BROADCAST_ACTION);
            intent.putExtra(MESSAGE, message.toString());
            LocalBroadcastManager.getInstance(mContext).sendBroadcast(intent);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用messageSent");
        //        session.close(true);//加上这句话实现短连接的效果,向客户端成功发送数据后断开连接
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用sessionClosed");

    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用sessionCreated");

    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用sessionIdle");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用sessionOpened");
    }
}

客户端addListener监听 IoServiceListener

/**
 * Created by huanghongfa on 2017/7/28.
 * 监听服务器断线原因
 */

public class HeartBeatListener implements IoServiceListener {

    public NioSocketConnector connector;

    public HeartBeatListener(NioSocketConnector connector) {
        this.connector = connector;
    }

    @Override
    public void serviceActivated(IoService arg0) throws Exception {
    }

    @Override
    public void serviceDeactivated(IoService arg0) throws Exception {
    }

    @Override
    public void serviceIdle(IoService arg0, IdleStatus arg1) throws Exception {
    }

    @Override
    public void sessionClosed(IoSession arg0) throws Exception {
        Log.d("", "hahahaha");
    }

    @Override
    public void sessionCreated(IoSession arg0) throws Exception {
    }

    @Override
    public void sessionDestroyed(IoSession arg0) {
        ClientConnectManager.getInstance().rePeatConnect();
    }

    /*
     * 断线重连操作
     * @param content
     */
//    public void repeatConnect(String content) {
//        // 执行到这里表示Session会话关闭了,需要进行重连,我们设置每隔3s重连一次,如果尝试重连5次都没成功的话,就认为服务器端出现问题,不再进行重连操作
//        int count = 0;// 记录尝试重连的次数
//        boolean isRepeat = false;
//        while (!isRepeat && count <= 10) {
//            try {
//                count++;// 重连次数加1
//                ConnectFuture future = connector.connect(new InetSocketAddress(
//                        ConnectUtils.HOST, ConnectUtils.PORT));
//                future.awaitUninterruptibly();// 一直阻塞住等待连接成功
//                IoSession session = future.getSession();// 获取Session对象
//                if (session.isConnected()) {
//                    isRepeat = true;
//                    // 表示重连成功
//                    System.out.println(content + ConnectUtils.stringNowTime() + " : 断线重连" + count
//                            + "次之后成功.....");
//                    SessionManager.getInstance().setSeesion(session);
//                    SessionManager.getInstance().writeToServer("重新连接的");
//                    break;
//                }
//            } catch (Exception e) {
//                if (count == ConnectUtils.REPEAT_TIME) {
//                    System.out.println(content + ConnectUtils.stringNowTime() + " : 断线重连"
//                            + ConnectUtils.REPEAT_TIME + "次之后仍然未成功,结束重连.....");
//                    break;
//                } else {
//                    System.out.println(content + ConnectUtils.stringNowTime() + " : 本次断线重连失败,3s后进行第" + (count + 1) + "次重连.....");
//                    try {
//                        Thread.sleep(3000);
//                        System.out.println(content + ConnectUtils.stringNowTime() + " : 开始第" + (count + 1) + "次重连.....");
//                    } catch (InterruptedException e1) {
//                        e1.printStackTrace();
//                    }
//                }
//            }
//        }
//    }
}

长连接心跳机制监听

/**
 * Created by huanghongfa on 2017/7/28.
 */

public class HeartBeatMessageFactory  implements KeepAliveMessageFactory {

    @Override
    public boolean isRequest(IoSession ioSession, Object o) {
        //如果是客户端主动向服务器发起的心跳包, return true, 该框架会发送 getRequest() 方法返回的心跳包内容.
        return false;
    }

    @Override
    public boolean isResponse(IoSession ioSession, Object o) {
        //如果是服务器发送过来的心跳包, return true后会在 getResponse() 方法中处理心跳包.
        return false;
    }

    @Override
    public Object getRequest(IoSession ioSession) {
        //自定义向服务器发送的心跳包内容.
        return null;
    }

    @Override
    public Object getResponse(IoSession ioSession, Object o) {
        //自定义解析服务器发送过来的心跳包.
        return null;
    }
}

3.2 服务端核心代码:

   /**
     * 启动服务
     */
    private void startService() {
        IoAcceptor acceptor;
        try {
            // 创建一个非阻塞的server端的Socket
            acceptor = new NioSocketAcceptor();
            // 设置过滤器(使用mina提供的文本换行符编解码器)
            DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
//            acceptor.getFilterChain().addLast("decoder",
//                    new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
//                            LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
            // 为接收器设置管理服务
            acceptor.setHandler(new ServiceHandler());
            acceptor.getFilterChain().addLast("encoder", new ProtocolCodecFilter(new FrameCodecFactory()));
            // 自定义的编解码器
            acceptor.getFilterChain().addLast("decoder", new ProtocolCodecFilter(new FrameCodecFactory()));
            // 设置读取数据的换从区大小
            acceptor.getSessionConfig().setReadBufferSize(2048);
            // 读写通道10秒内无操作进入空闲状态
            acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
            // 绑定端口
            acceptor.bind(new InetSocketAddress(PORT));
            Log.d(TAG, "服务器启动成功... 端口号未:" + PORT);
            mIStartConnectService.startConnect();
        } catch (Exception e) {
            Log.d(TAG, "服务器启动异常..." + e);
        }
    }

服务器的IoHandlerAdapter监听

/**
 * Created by huanghongfa on 2017/7/28.
 */

public class ServiceHandler extends IoHandlerAdapter {

    private final String TAG = "ServiceHandler";

    // 从端口接受消息,会响应此方法来对消息进行处理
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        super.messageReceived(session, message);
        Log.d(TAG, "服务器接受消息成功...");
        String msg = message.toString();
        if ("exit".equals(msg)) {
            // 如果客户端发来exit,则关闭该连接
            session.close(true);
        }
        // 向客户端发送消息
        Date date = new Date();
        session.write(date);
        Log.d(TAG, "服务器接受消息成功..." + msg);
    }

    // 向客服端发送消息后会调用此方法
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
//        session.close(true);//加上这句话实现短连接的效果,向客户端成功发送数据后断开连接
        Log.d(TAG, "服务器发送消息成功...");
    }

    // 关闭与客户端的连接时会调用此方法
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        super.sessionClosed(session);
        Log.d(TAG, "服务器与客户端断开连接...");
    }

    // 服务器与客户端创建连接
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        Log.d(TAG, "服务器与客户端创建连接...");
    }

    // 服务器与客户端连接打开
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        Log.d(TAG, "服务器与客户端连接打开...");
        super.sessionOpened(session);
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        super.sessionIdle(session, status);
        Log.d(TAG, "服务器进入空闲状态...");
        SessionManager.getInstance().writeToServer("你看见了吗");
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        super.exceptionCaught(session, cause);
        Log.d(TAG, "服务器发送异常...");
    }
}

好了以上都是些核心代码,为了避免直接复制代码后,有些类没有找到,或者是还不知道要怎么运用的,这里直接给出demo,该demo直接下来下来后就可以运行了,需要修改的可能就是AS的环境配置了。
demo下载地址

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,522评论 25 707
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,601评论 18 139
  • 2e45075fd084阅读 238评论 0 1
  • 2017-9-14,周四 1/意想不到,第一辆小黄车,座椅不能完全下调。 2/意想不到,换了一辆。 3/意想不到,...
    余俊娟阅读 209评论 0 0
  • 新荷月下立婷婷,丛簇蛙声倦鸟鸣。 风过无言摇树影,留得清静数天星。 (新韵 十一庚) 师父(静坐的孙行者)评: 这...
    瑾檀yuying阅读 476评论 15 24