Mina框架简介:
- Mina是什么东西?
Apache MINA 是一个网络应用框架,有助于用户非常方便地开发高性能、高伸缩性的网络应用。它通过Java NIO提供了一个抽象的、事件驱动的、异步的位于各种传输协议(如TCP/IP和UDP/IP)之上的API,Apache MINA 通常可被称之为:
NIO 框架库;
客户端/服务器框架库;
或者一个网络socket库。 - MINA框架的特点有
基于java NIO类库开发;采用非阻塞方式的异步传输;事件驱动;支持批量数据传输;支持TCP、UDP协议;控制反转的设计模式(支持Spring);采用优雅的松耦合架构;可灵活的加载过滤器机制;单元测试更容易实现;可自定义线程的数量,以提高运行于多处理器上的性能;采用回调的方式完成调用,线程的使用更容易。 - Mina的框架
当远程客户首次访问采用MINA编写的程序时,IoAcceptor作为线程运行,负责接受来自客户的请求。当有客户请求连接时,创建一个IoSession,该IoSession与IoProcessor、SocketChannel以及IOService联系起来。IoProcessor也作为另外一个线程运行,定时检查客户是否有数据到来,并对客户请求进行处理,依次调用在IOService注册的各个IoFilter,最后调用IoHandler进行最终的逻辑处理,再将处理后的结果Filter后返回给客户端。 - Mina的现有应用
MINA框架的应用比较广泛,应用的开源项目有Apache Directory、AsyncWeb、ApacheQpid、QuickFIX/J、Openfire、SubEthaSTMP、red5等。MINA框架当前稳定版本是1.1.6,最新的2.0版本目前已经发布了M1版本。
实现 Mina 三部曲
先来看下效果图
流程大概是这样的:
首先先启动一个本地的服务--启动成功后--通知前端可以开始连接了--前端连接成功后 -- 就准备发送消息--服务器接收到消息--回调给前端
下载jar包
官网下载地址 http://mina.apache.org/mina-project/userguide/user-guide-toc.html
这里需要下载 mina-core-2.0.16.jar和 slf4j-android-1.6.1-RC1.jar先把下载的jar包复制粘贴至项目的libs目录下,然后在app下的build.gradle文件里面导入下载的jar包,由于我的项目里面用到了rxJava,所以就顺带导入rxJava的包。如果不懂rxJava语法,那么请看链接http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/0430/2815.html
compile files('libs/mina-core-2.0.16.jar')
compile files('libs/slf4j-android-1.6.1-RC1.jar')
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
- 以上工作准备好之后,下面就开始编码之路,文末会给出demo下载地址
3.1 客户端的核心代码:
public void connect(final Context context) {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {
NioSocketConnector mSocketConnector = new NioSocketConnector();
//设置协议封装解析处理
mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
//设置心跳包
KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartBeatMessageFactory());
//每 5 分钟发送一个心跳包
heartFilter.setRequestInterval(5 * 60);
//心跳包超时时间 10s
heartFilter.setRequestTimeout(10);
// 获取过滤器链
DefaultIoFilterChainBuilder filterChain = mSocketConnector.getFilterChain();
filterChain.addLast("encoder", new ProtocolCodecFilter(new FrameCodecFactory()));
// 添加编码过滤器 处理乱码、编码问题
filterChain.addLast("decoder", new ProtocolCodecFilter(new FrameCodecFactory()));
mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter);
//设置 handler 处理业务逻辑
mSocketConnector.setHandler(new HeartBeatHandler(context));
mSocketConnector.addListener(new HeartBeatListener(mSocketConnector));
//配置服务器地址
InetSocketAddress mSocketAddress = new InetSocketAddress(ConnectUtils.HOST, ConnectUtils.PORT);
//发起连接
ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
mFuture.awaitUninterruptibly();
IoSession mSession = mFuture.getSession();
Log.d("", "======连接成功" + mSession.toString());
e.onNext(mSession);
e.onComplete();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
IoSession mSession = (IoSession) o;
Log.d("MainActivity", "======连接成功了吗====" + mSession.isConnected());
SessionManager.getInstance().setSeesion(mSession);
SessionManager.getInstance().writeToServer("你看见了吗\n");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
客户端设置setHandler监听 IoHandlerAdapter,这个可以自己写个类重载一下
/**
* Created by huanghongfa on 2017/7/28.
*/
public class HeartBeatHandler extends IoHandlerAdapter {
private final String TAG = "HeartBeatHandler";
public static final String BROADCAST_ACTION = "com.commonlibrary.mina.broadcast";
public static final String MESSAGE = "message";
private Context mContext;
public HeartBeatHandler(Context context) {
this.mContext = context;
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用exceptionCaught");
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
Log.e(TAG, "接收到服务器端消息:" + message.toString());
if (mContext != null) {
Intent intent = new Intent(BROADCAST_ACTION);
intent.putExtra(MESSAGE, message.toString());
LocalBroadcastManager.getInstance(mContext).sendBroadcast(intent);
}
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用messageSent");
// session.close(true);//加上这句话实现短连接的效果,向客户端成功发送数据后断开连接
}
@Override
public void sessionClosed(IoSession session) throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用sessionClosed");
}
@Override
public void sessionCreated(IoSession session) throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用sessionCreated");
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用sessionIdle");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
Log.d(TAG, ConnectUtils.stringNowTime() + " : 客户端调用sessionOpened");
}
}
客户端addListener监听 IoServiceListener
/**
* Created by huanghongfa on 2017/7/28.
* 监听服务器断线原因
*/
public class HeartBeatListener implements IoServiceListener {
public NioSocketConnector connector;
public HeartBeatListener(NioSocketConnector connector) {
this.connector = connector;
}
@Override
public void serviceActivated(IoService arg0) throws Exception {
}
@Override
public void serviceDeactivated(IoService arg0) throws Exception {
}
@Override
public void serviceIdle(IoService arg0, IdleStatus arg1) throws Exception {
}
@Override
public void sessionClosed(IoSession arg0) throws Exception {
Log.d("", "hahahaha");
}
@Override
public void sessionCreated(IoSession arg0) throws Exception {
}
@Override
public void sessionDestroyed(IoSession arg0) {
ClientConnectManager.getInstance().rePeatConnect();
}
/*
* 断线重连操作
* @param content
*/
// public void repeatConnect(String content) {
// // 执行到这里表示Session会话关闭了,需要进行重连,我们设置每隔3s重连一次,如果尝试重连5次都没成功的话,就认为服务器端出现问题,不再进行重连操作
// int count = 0;// 记录尝试重连的次数
// boolean isRepeat = false;
// while (!isRepeat && count <= 10) {
// try {
// count++;// 重连次数加1
// ConnectFuture future = connector.connect(new InetSocketAddress(
// ConnectUtils.HOST, ConnectUtils.PORT));
// future.awaitUninterruptibly();// 一直阻塞住等待连接成功
// IoSession session = future.getSession();// 获取Session对象
// if (session.isConnected()) {
// isRepeat = true;
// // 表示重连成功
// System.out.println(content + ConnectUtils.stringNowTime() + " : 断线重连" + count
// + "次之后成功.....");
// SessionManager.getInstance().setSeesion(session);
// SessionManager.getInstance().writeToServer("重新连接的");
// break;
// }
// } catch (Exception e) {
// if (count == ConnectUtils.REPEAT_TIME) {
// System.out.println(content + ConnectUtils.stringNowTime() + " : 断线重连"
// + ConnectUtils.REPEAT_TIME + "次之后仍然未成功,结束重连.....");
// break;
// } else {
// System.out.println(content + ConnectUtils.stringNowTime() + " : 本次断线重连失败,3s后进行第" + (count + 1) + "次重连.....");
// try {
// Thread.sleep(3000);
// System.out.println(content + ConnectUtils.stringNowTime() + " : 开始第" + (count + 1) + "次重连.....");
// } catch (InterruptedException e1) {
// e1.printStackTrace();
// }
// }
// }
// }
// }
}
长连接心跳机制监听
/**
* Created by huanghongfa on 2017/7/28.
*/
public class HeartBeatMessageFactory implements KeepAliveMessageFactory {
@Override
public boolean isRequest(IoSession ioSession, Object o) {
//如果是客户端主动向服务器发起的心跳包, return true, 该框架会发送 getRequest() 方法返回的心跳包内容.
return false;
}
@Override
public boolean isResponse(IoSession ioSession, Object o) {
//如果是服务器发送过来的心跳包, return true后会在 getResponse() 方法中处理心跳包.
return false;
}
@Override
public Object getRequest(IoSession ioSession) {
//自定义向服务器发送的心跳包内容.
return null;
}
@Override
public Object getResponse(IoSession ioSession, Object o) {
//自定义解析服务器发送过来的心跳包.
return null;
}
}
3.2 服务端核心代码:
/**
* 启动服务
*/
private void startService() {
IoAcceptor acceptor;
try {
// 创建一个非阻塞的server端的Socket
acceptor = new NioSocketAcceptor();
// 设置过滤器(使用mina提供的文本换行符编解码器)
DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
// acceptor.getFilterChain().addLast("decoder",
// new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
// LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
// 为接收器设置管理服务
acceptor.setHandler(new ServiceHandler());
acceptor.getFilterChain().addLast("encoder", new ProtocolCodecFilter(new FrameCodecFactory()));
// 自定义的编解码器
acceptor.getFilterChain().addLast("decoder", new ProtocolCodecFilter(new FrameCodecFactory()));
// 设置读取数据的换从区大小
acceptor.getSessionConfig().setReadBufferSize(2048);
// 读写通道10秒内无操作进入空闲状态
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
// 绑定端口
acceptor.bind(new InetSocketAddress(PORT));
Log.d(TAG, "服务器启动成功... 端口号未:" + PORT);
mIStartConnectService.startConnect();
} catch (Exception e) {
Log.d(TAG, "服务器启动异常..." + e);
}
}
服务器的IoHandlerAdapter监听
/**
* Created by huanghongfa on 2017/7/28.
*/
public class ServiceHandler extends IoHandlerAdapter {
private final String TAG = "ServiceHandler";
// 从端口接受消息,会响应此方法来对消息进行处理
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
super.messageReceived(session, message);
Log.d(TAG, "服务器接受消息成功...");
String msg = message.toString();
if ("exit".equals(msg)) {
// 如果客户端发来exit,则关闭该连接
session.close(true);
}
// 向客户端发送消息
Date date = new Date();
session.write(date);
Log.d(TAG, "服务器接受消息成功..." + msg);
}
// 向客服端发送消息后会调用此方法
@Override
public void messageSent(IoSession session, Object message) throws Exception {
super.messageSent(session, message);
// session.close(true);//加上这句话实现短连接的效果,向客户端成功发送数据后断开连接
Log.d(TAG, "服务器发送消息成功...");
}
// 关闭与客户端的连接时会调用此方法
@Override
public void sessionClosed(IoSession session) throws Exception {
super.sessionClosed(session);
Log.d(TAG, "服务器与客户端断开连接...");
}
// 服务器与客户端创建连接
@Override
public void sessionCreated(IoSession session) throws Exception {
super.sessionCreated(session);
Log.d(TAG, "服务器与客户端创建连接...");
}
// 服务器与客户端连接打开
@Override
public void sessionOpened(IoSession session) throws Exception {
Log.d(TAG, "服务器与客户端连接打开...");
super.sessionOpened(session);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
super.sessionIdle(session, status);
Log.d(TAG, "服务器进入空闲状态...");
SessionManager.getInstance().writeToServer("你看见了吗");
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
super.exceptionCaught(session, cause);
Log.d(TAG, "服务器发送异常...");
}
}
好了以上都是些核心代码,为了避免直接复制代码后,有些类没有找到,或者是还不知道要怎么运用的,这里直接给出demo,该demo直接下来下来后就可以运行了,需要修改的可能就是AS的环境配置了。
demo下载地址