mina整合spring ,服务端反向发送消息到客户端 完整实例

之前的项目需要用到mina,实现的功能主要是:服务端主动发送消息到客户端,这个的服务端为外网的tomcat,客户端为内网的tomcat,由于无法知道内网tomcat 的地址,也就不能直接通过http的方式发送信息回来,最后想来想去用mina实现了这个功能。

当然,我这里的服务端是整合的了spring 的,也可以直接把服务端独立出来,不整合spring,这个都一样,区别不大。

代码和配置如下:

---------------------------

1,jar包,我这里使用的是spring4.0.5,mina2.0.7

maven部分文件如下,这个包会自动也依赖进来mina-filter-ssl-1.1.7.jar

[java] view plain copy


[java] view plain copy

                  

            org.springframework  

            spring-context-support  

            ${springframework-version}  

          

          

            org.springframework  

            spring-jdbc  

            ${springframework-version}  

          

          

            org.springframework  

            spring-orm  

            ${springframework-version}  

          

          

            org.springframework  

            spring-aop  

            ${springframework-version}  

          

          

          

            org.springframework  

            spring-web  

            ${springframework-version}  

              

                  

                    commons-logging  

                    commons-logging  




          

            org.springframework  

            spring-webmvc  

            ${springframework-version}  




          

            org.aspectj  

            aspectjrt  

            ${aspectj-version}  


          

            org.aspectj  

            aspectjweaver  

            ${aspectj-version}  







          

            org.apache.mina  

            mina-core  

2.0.7  


          

            org.apache.mina  

            mina-integration-spring  

1.1.7  


          

            org.apache.mina  

            mina-integration-beans  

2.0.8  


2,spring-ztc_app-mina.xml (spring与mina 的配置文件)

spring与mina 的配置文件,需要导入到spring 的总配置文件中,或者加入到web.xml的spring监听扫描中

[java] view plain copy


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

xsi:schemaLocation="http://www.springframework.org/schema/beans   

http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">  







  

  

              


  

  


                -->  


value="org.apache.mina.integration.beans.InetSocketAddressEditor">  







  



init-method="bind" destroy-method="unbind">  

  

  

  







  

          


                /> -->  


  





  



class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">  

  

              

  

  









3,mina服务端业务处理类

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  


import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.service.IoHandlerAdapter;  

import org.apache.mina.core.session.IdleStatus;  

import org.apache.mina.core.session.IoSession;  



/**

 * @Description: mina服务端业务处理类

 * @author whl

 * @date 2014-9-30 下午12:36:28

 *

 */  

public class ServerHandler extends IoHandlerAdapter {  



private final static Log log = LogFactory.getLog(ServerHandler.class);  



public ServerHandler() {  

// TODO Auto-generated constructor stub  

     }  

@Override  

public void exceptionCaught(IoSession session, Throwable cause)  

throws Exception {  

     }  



@Override  

public void messageReceived(IoSession session, Object message)  

throws Exception {  

log.debug("服务端收到信息-------------");  


//获取客户端发过来的key  

          String key = message.toString();  

System.out.println("message :"+message.toString());  

String carPark_id = key.substring(key.indexOf("=") + 1);  

System.out.println("carPark_id :"+carPark_id);  



//保存客户端的会话session  

          SessionMap sessionMap = SessionMap.newInstance();  

          sessionMap.addSession(carPark_id, session);  


     }  

@Override  

public void messageSent(IoSession session, Object message) throws Exception {  

log.debug("------------服务端发消息到客户端---");  

     }  

@Override  

public void sessionClosed(IoSession session) throws Exception {  

// TODO Auto-generated method stub  

log.debug("远程session关闭了一个..." + session.getRemoteAddress().toString());  

     }  

@Override  

public void sessionCreated(IoSession session) throws Exception {  

log.debug(session.getRemoteAddress().toString() +"----------------------create");  

     }  

@Override  

public void sessionIdle(IoSession session, IdleStatus status)  

throws Exception {  

log.debug(session.getServiceAddress() +"IDS");  

     }  

@Override  

public void sessionOpened(IoSession session) throws Exception {  

log.debug("连接打开:"+session.getLocalAddress());  

     }  



    }  

4,服务端缓存客户端socket连接的单例类

[html] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  


import java.util.HashMap;  

import java.util.Map;  


import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.session.IoSession;  



/**  

 * @Description: 单例工具类,保存所有mina客户端连接  

 * @author whl  

 * @date 2014-9-29 上午10:09:15  

 *  

 */  

public class SessionMap {  


private final static Loglog = LogFactory.getLog(SessionMap.class);  


private static SessionMapsessionMap = null;  


private Mapmap = new HashMap();  



    //构造私有化 单例  

    private SessionMap(){}  



    /**  

     * @Description: 获取唯一实例  

     * @author whl  

     * @date 2014-9-29 下午1:29:33  

     */  

    public static SessionMap newInstance(){  

        log.debug("SessionMap单例获取---");  

if(sessionMap == null){  

sessionMap = new SessionMap();  

        }  

        return sessionMap;  

    }  



    /**  

     * @Description: 保存session会话  

     * @author whl  

     * @date 2014-9-29 下午1:31:05  

     */  

    public void addSession(String key, IoSession session){  

log.debug("保存会话到SessionMap单例---key=" + key);  

        this.map.put(key, session);  

    }  


    /**  

     * @Description: 根据key查找缓存的session  

     * @author whl  

     * @date 2014-9-29 下午1:31:55  

     */  

    public IoSession getSession(String key){  

log.debug("获取会话从SessionMap单例---key=" + key);  

        return this.map.get(key);  

    }  


    /**  

     * @Description: 发送消息到客户端  

     * @author whl  

     * @date 2014-9-29 下午1:57:51  

     */  

    public void sendMessage(String[] keys, Object message){  

        for(String key : keys){  

IoSessionsession = getSession(key);  


log.debug("反向发送消息到客户端Session---key=" + key + "----------消息=" + message);  

if(session == null){  

                return;  

            }  

            session.write(message);  


        }  

    }  





}  

5,编码解码器,

HCoderFactory.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  


import java.nio.charset.Charset;  


import org.apache.mina.core.session.IoSession;  

import org.apache.mina.filter.codec.ProtocolCodecFactory;  

import org.apache.mina.filter.codec.ProtocolDecoder;  

import org.apache.mina.filter.codec.ProtocolEncoder;  


/**

 * @Description: 编码和解码器工厂类.

 * @author whl

 * @date 2014-9-30 下午12:34:59

 *

 */  

public class HCoderFactory implements ProtocolCodecFactory {  

private final HEncoder encoder;  

private final HDecoder decoder;  


public HCoderFactory() {  

//this(Charset.defaultCharset());  

this(Charset.forName("UTF-8"));  


    }  


public HCoderFactory(Charset charSet) {  

this.encoder = new HEncoder(charSet);  

this.decoder = new HDecoder(charSet);  

    }  


@Override  

public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {  

return decoder;  

    }  


@Override  

public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {  

return encoder;  

    }  

}  

HDecoder.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  


import java.nio.charset.Charset;  

import java.nio.charset.CharsetDecoder;  


import org.apache.mina.core.buffer.IoBuffer;  

import org.apache.mina.core.session.IoSession;  

import org.apache.mina.filter.codec.CumulativeProtocolDecoder;  

import org.apache.mina.filter.codec.ProtocolDecoderOutput;  


/**

 * @Description: 解码工具类

 * @author whl

 * @date 2014-9-30 下午12:35:22

 *

 */  

public class HDecoder extends CumulativeProtocolDecoder {  

private final Charset charset;  


public HDecoder(Charset charset) {  

this.charset = charset;  

    }  


public boolean doDecode(IoSession session, IoBuffer in,  

ProtocolDecoderOutput out)throws Exception {  


//System.out.println("-------doDecode----------");  


        CharsetDecoder cd = charset.newDecoder();  

        String mes = in.getString(cd);  

        out.write(mes);  

return true;  



/*

        if (in.remaining() > 4) {// 有数据时,读取字节判断消息长度

            in.mark();// 标记当前位置,以便reset

            int size = in.getInt();

            // 如果消息内容不够,则重置,相当于不读取size

            if (size > in.remaining()) {

                in.reset();

                return false;// 接收新数据,以拼凑成完整数据

            } else if (size != 0 && (size - 4 >= 0)) {

                byte[] bytes = new byte[size - 4];

                //int protocol = in.getInt();

                // 拿到客户端发过来的数据组装成基础包写出去

                in.get(bytes, 0, size - 4);

                //in.get(bytes, size - 4, size);


                PackageBeanFactory beanFactory = (PackageBeanFactory) session

                        .getAttribute(ServerHandler.BEAN_FACTORY);

                //out.write(beanFactory.getPackage(protocol, size, bytes));


                String mes = in.getString(cd);


                out.write(mes);

                // 如果读取内容后还粘了包,就让父类再给读取进行下次解析

                if (in.remaining() > 0) {

                    return true;

                }

            }

        }


        return false;// 处理成功,让父类进行接收下个包

*/    



    }  

}  

HEncoder.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  


import java.nio.charset.Charset;  

import java.nio.charset.CharsetDecoder;  

import java.nio.charset.CharsetEncoder;  


import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.buffer.IoBuffer;  

import org.apache.mina.core.session.IoSession;  

import org.apache.mina.filter.codec.ProtocolEncoder;  

import org.apache.mina.filter.codec.ProtocolEncoderOutput;  



/**

 * @Description: 编码工具类

 * @author whl

 * @date 2014-9-30 下午12:35:35

 *

 */  

public class HEncoder implements ProtocolEncoder {  


private final static Log log = LogFactory.getLog(HEncoder.class);  


private final Charset charset;  


public HEncoder(Charset charset) {  

this.charset = charset;  

    }  


@Override  

public void encode(IoSession session, Object message,  

ProtocolEncoderOutput out)throws Exception {  


        CharsetEncoder ce = charset.newEncoder();  

        String mes = (String) message;  

IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);  

        buffer.putString(mes,ce);  

        buffer.flip();  

        out.write(buffer);  


/*System.out.println("---------encode-------------");

        String mes = (String) message;

        byte[] data = mes.getBytes("UTF-8");

        IoBuffer buffer = IoBuffer.allocate(data.length + 4);

        buffer.putInt(data.length);

        buffer.put(data);

        buffer.flip();

        out.write(buffer);

        out.flush();*/  

    }  


@Override  

public void dispose(IoSession session) throws Exception {  

log.info("Dispose called,session is " + session);  

    }  

}  

6,客户端程序

ClentMain.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  


import java.net.InetSocketAddress;  

import java.text.SimpleDateFormat;  

import java.util.Date;  


import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.filterchain.IoFilterAdapter;  

import org.apache.mina.core.future.ConnectFuture;  

import org.apache.mina.core.session.IoSession;  

import org.apache.mina.filter.codec.ProtocolCodecFilter;  

import org.apache.mina.filter.logging.LoggingFilter;  

import org.apache.mina.transport.socket.nio.NioSocketConnector;  


/**

 * @Description: mina客户端,包含断线重连机制,空闲重连机制

 * @author whl

 * @date 2014-11-2

 */  

public class ClentMain extends Thread{  


private final static Log log = LogFactory.getLog(ClentMain.class);  


@Override  

public void run() {  


//ip  

String host ="192.168.0.38";  

//端口  

int port = 6007;  

//停车场id  

final String carPark_id = "1";  



// 创建客户端连接器.  

final NioSocketConnector connector = new NioSocketConnector();  

//设置连接超时    

connector.setConnectTimeoutMillis(30000);   

// 设置默认访问地址    

connector.setDefaultRemoteAddress(new InetSocketAddress(host, port));  

//将IoSession的主键属性注入线程映射表MDC中  

//connector.getFilterChain().addLast("mdc", new MdcInjectionFilter());    

//日志过滤器  

connector.getFilterChain().addLast("logger", new LoggingFilter());  

// 设置编码过滤器  

connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new HCoderFactory()));          

//添加处理器    

connector.setHandler(new ClintHandler());  

// 设置接收缓冲区的大小    

connector.getSessionConfig().setReceiveBufferSize(10240);  

// 设置输出缓冲区的大小    

connector.getSessionConfig().setSendBufferSize(10240);  


/**

         * 空闲重连的机制,根据需要选择相应的配置

         */  

// 读写都空闲时间:30秒    

//connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);   

// 读(接收通道)空闲时间:40秒   

//connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 40);  

// 写(发送通道)空闲时间:50秒   

//connector.getSessionConfig().setIdleTime(IdleStatus.WRITER_IDLE, 50);   


//断线重连回调拦截器    

connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {    

@Override    

public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {    

for(;;){    

try{    

Thread.sleep(3000);   

                        ConnectFuture future = connector.connect();    

future.awaitUninterruptibly();// 等待连接创建成功    

IoSession session = future.getSession();// 获取会话    

session.write("key="+carPark_id);  


if(session.isConnected()){    

log.info("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");    

//System.out.println("断线重连["+ connector.getDefaultRemoteAddress().getHostName() +":"+ connector.getDefaultRemoteAddress().getPort()+"]成功");  

break;    

                        }    

}catch(Exception ex){    

log.info("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());    

//System.out.println("重连服务器登录失败,3秒再连接一次:" + ex.getMessage());    

                    }    

                }    

            }    

        });  


//开始连接  

for (;;) {    

try {    

                ConnectFuture future = connector.connect();    

// 等待连接创建成功    

                future.awaitUninterruptibly();    

// 获取会话    

                IoSession session = future.getSession();    

//发送消息  

session.write("key=" + carPark_id);  

log.error("连接服务端" + host + ":" + port + "[成功]" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));    

break;    

}catch (Exception e) {    

//System.out.println("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage());    

log.error("连接服务端" + host + ":" + port + "失败" + ",,时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ", 连接MSG异常,请检查MSG端口、IP是否正确,MSG服务是否启动,异常内容:" + e.getMessage(), e);    

// 连接失败后,重连10次,间隔30s    

try {  

Thread.sleep(5000);  

}catch (InterruptedException e1) {  

                    e1.printStackTrace();  

log.error("连接服务端失败后,睡眠5秒发生异常!");  

                }  

            }    

        }  




// cf.getSession().write("quit");//发送消息  

//cf.getSession().close();  

//cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开  

//connector.dispose();  


    }  


}  

ClintHandler.java

[java] view plain copy

package cn.hydom.ztc.ztc_app.controller.mina;  


import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

import org.apache.mina.core.service.IoHandlerAdapter;  

import org.apache.mina.core.session.IdleStatus;  

import org.apache.mina.core.session.IoSession;  



/**

 * @Description: 客户端业务处理类

 * @author whl

 * @date 2014-11-2

 */  

public class ClintHandler extends IoHandlerAdapter {  


private final static Log log = LogFactory.getLog(ClintHandler.class);  


/**

     * 写处理服务端推送的信息的逻辑

     */  

@Override  

public void messageReceived(IoSession session, Object message)  

throws Exception {  






System.out.println("-----服务顿返回的json数据----");  

        String s = message.toString();  

System.out.println("message :" + s);  

System.out.println("message length:" + s.length());  





    }  


@Override    

public void sessionIdle(IoSession session, IdleStatus status) throws Exception {    

log.info("-客户端与服务端连接[空闲] - " + status.toString());    

System.out.println("-客户端与服务端连接[空闲] - " + status.toString());  


if(session != null){    

session.close(true);    

        }    

    }    





}  

7,测试

这里就不写实际的代码了,

1,首先部署web工程启动tomcat,让服务端先运行起来,

2,然后运行客户端

[java] view plain copy

public class ClintTest1 {  



public static void main(String[] args) throws Exception {  


ClentMain mina =new ClentMain();  

        mina.start();  


    }  

}  

3,想要服务端主动发回来信息,还得在服务端的web工程的action中写一个htpp访问的方法,方法中去sessionMap类中缓存的map中查找session,然后发送消息。

代码如下,我用的是springmvc

[java] view plain copy

/**

     * @Description: 发送消息到客户端

     * @author whl

     * @date 2014-9-29 下午1:18:54

     */  

@ResponseBody  

@RequestMapping(value="/sendMessage")  

public String sendMessage(HttpServletRequest request, String[] carPark_id){  

try{  


//  

//获取链接的参数carPark_id  

log.debug("carPark_id[].length--------- " + carPark_id.length);  

for(String id : carPark_id){  

log.debug("carPark_id --------- " + id);  

            }  

//这里用的假数据                                                carPark_id = new String[]{"1"};  

//发送的信息String jsonstr = "123";//反向发送信息log.debug("开始反向发送消息到客户端-------- ");SessionMap sessionMap = SessionMap.newInstance();sessionMap.sendMessage(carPark_id, jsonstr);//返回信息return "发送的信息为" + jsonstr;

}catch (Exception e) {log.error(e);return "出错了。。";}}

4,好了,现在重新发布工程,启动服务端和客户端,然后访问这个链接就可以了,当然springmvc需要自己配置,这也不是这里的重点。

注意:客户端发过来的carPark_id必须与服务端查找的一致,不让就找不到相应的客户端session连接,消息无法发送成功。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容