Netty多客户端通信机制

上篇文章讲解了客户端与服务端通信示例,本篇来讲解下多客户端之间是如何通信的,我们以一个聊天室的程序为例。
具体需求:
客户端1、2、3(通过remoteAddress来标识),当客户端1上线后,发送一条消息给服务端,当客户端2上线后,通知客户端1:“客户端2已经上线”,当客户端3上线后,通知客户端1和客户端2:“客户端3已经上线”。

按照Netty服务构建步骤进行,可以参见Netty构建服务的基本步骤文章来了解构建过程以及具体说明。

  1. 首先构建聊天室服务端入口,多个客户端通信都是经由服务端作为传输和通信载体。
    示例代码:
public class MyCatServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup =  new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyChatServerInitializer());

            ChannelFuture channelFuture = bootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  1. 创建MyChatClientInitializer,添加编解码处理器ChannelPipeline.
    示例代码:
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        // 添加基于\r \n界定符的解码器
        channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 添加字符串解码器
        channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 添加字符串编码器
        channelPipeline.addLast(new MyChatServerHandler()); // 自定义处理器
    }
}
  1. 创建自定义处理器MyChatServerHandler, 这里涉及到一个重要的组件ChannelGroup,它是线程安全的,ChannelGroup存储了已连接的Channel,Channel关闭会自动从ChannelGroup中移除,无需担心Channel生命周期。同时,可以对这些Channel做各种批量操作,可以以广播的形式发送一条消息给所有的Channels,调用它的writeAndFlush方法来实现。
    ChannelGroup可以进一步理解为设计模式中的发布-订阅模型,其底层是通过ConcurrentHashMap进行存储所有Channel的。
    示例代码:
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {

    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 这里要区分下是否是自己发的消息
        Channel channel = ctx.channel();
        // 这里使用了Java8的lambda表达式
        channelGroup.forEach(ch -> {
            if (ch == channel) { // 两个channel对象地址相同
                System.out.println("服务器端转发聊天消息:【自己】发送的消息, 内容:" + msg + "\n");
                ch.writeAndFlush("【自己】发送的消息, 内容:" + msg + "\n");
            } else {
                System.out.println("服务器端转发聊天消息:"+ ch.remoteAddress() + "发送的消息,内容:" + msg + "\n");
                ch.writeAndFlush(ch.remoteAddress() + "发送的消息,内容:" + msg + "\n");
            }
        });
    }

    // -----------以下覆写的方法是ChannelInboundHandlerAdapter中的方法---------------
    @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " 加入了\n");

        // 先写入到客户端,最后再将自己添加到ChannelGroup中
        channelGroup.add(channel);
    }

    @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " 离开了\n");

        // 这里channelGroup会自动进行调用,所以这行代码不写也是可以的。
        channelGroup.remove(channel);
    }

    /**
     * 只要有客户端连接就会执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上线了\n");
    }

    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 下线了\n");
    }

    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}
  1. 构建客户端入口,连接服务端8899端口,示例中通过控制台输入形式给服务端发送消息。
    示例代码:
public class MyCatClient {

    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyChatClientInitalizer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            // channelFuture.channel().closeFuture().sync();

            // 从控制台不断的读取输入
            boolean running = true;
            try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){
                while (running) {
                    channelFuture.channel().writeAndFlush(br.readLine() + "\r\n");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
  1. 创建客户端MyChatClientInitializer,跟服务端基本类似的处理器。
    示例代码:
public class MyChatClientInitalizer extends ChannelInitializer<SocketChannel> {

    @Override protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new MyChatClientHandler());
    }
}
  1. 创建自定义处理器MyChatClientHandler, 很简单,只是输出一条消息。
    示例代码:
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

以上几个步骤就完成了聊天室程序的编码工作,下面运行服务端和客户端程序,运行结果如下:

MyChatClient(1) MyChatClient(2) MyChatClient(3) MyChatServer
Run None None /127.0.0.1:51049 上线了
[服务器] - /127.0.0.1:51055 加入了 Run None /127.0.0.1:51055 上线了
[服务器] - /127.0.0.1:51114 加入了 [服务器] - /127.0.0.1:51114 加入了 Run /127.0.0.1:51114 上线了

None:表示未运行 Run:表示运行

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • netty常用API学习 netty简介 Netty是基于Java NIO的网络应用框架. Netty是一个NIO...
    花丶小伟阅读 5,988评论 0 20
  • RPC框架远程调用的实现方式在原理上是比较简单的,即将调用的方法(接口名、方法名、参数类型、参数)序列化之后发送到...
    谜碌小孩阅读 3,082评论 0 13
  • Netty心跳基本检测机制 首先,了解下为什么需要心跳?假设客户端(如手机,PAD)与服务器端已经建立了长连接,客...
    东升的思考阅读 1,460评论 0 2
  • 早上没敢请假去参加今天一天的落地会,昨晚睡觉突然想起月底啦,考勤还没做,计划今天把考勤做好发给上海。十点多看群里已...
    王莎莎2017阅读 604评论 0 1