搭建生产级的Netty项目

Netty是Trustin Lee在2004年开发的一款高性能的网络应用程序框架。相比于JDK自带的NIO,Netty做了相当多的增强,且隔离了jdk nio的实现细节,API也比较友好,还支持流量整形等高级特性。在我们常见的一些开源项目中已经普遍的应用到了Netty,比如Dubbo、Elasticsearch、Zookeeper等。

Netty的具体开发

提示:因代码相对较多,这里只展示其主要部分,至于项目中用到的编解码器、工具类,请直接拉到最后下载源码!也欢迎顺手给个Star~

需要的依赖
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-jmx</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
</dependency>
Client端代码
package com.example.nettydemo.client;

import com.example.nettydemo.client.codec.*;
import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture;
import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter;
import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler;
import com.example.nettydemo.common.RequestMessage;
import com.example.nettydemo.common.string.StringOperation;
import com.example.nettydemo.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import javax.net.ssl.SSLException;
import java.util.concurrent.ExecutionException;

public class Client {

    public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException {

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);

        //客户端连接服务器最大允许时间,默认为30s
        bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); //10s

        NioEventLoopGroup group = new NioEventLoopGroup();
        try {

            bootstrap.group(group);

            RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
            LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);

            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast(new FrameDecoder());
                    pipeline.addLast(new FrameEncoder());

                    pipeline.addLast(new ProtocolEncoder());
                    pipeline.addLast(new ProtocolDecoder());

                    pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
                    pipeline.addLast(new OperationToRequestMessageEncoder());

//                    pipeline.addLast(loggingHandler);

                }
            });

            //连接服务
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888);
            //因为future是异步执行,所以需要先连接上后,再进行下一步操作
            channelFuture.sync();

            long streamId = IdUtil.nextId();
            /**
             * 发送数据测试,按照定义的规则组装数据
             */
//            OrderOperation orderOperation =  new OrderOperation(1001, "你好啊,hi");
            RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi"));

            //将future放入center
            OperationResultFuture operationResultFuture = new OperationResultFuture();
            requestPendingCenter.add(streamId, operationResultFuture);

            //发送消息
            for (int i = 0; i < 10; i++) {
                channelFuture.channel().writeAndFlush(requestMessage);
            }

            //阻塞等待结果,结果来了之后会调用ResponseDispatcherHandler去set结果
//            OperationResult operationResult = operationResultFuture.get();
//            //将结果打印
//            System.out.println("返回:"+operationResult);

            channelFuture.channel().closeFuture().get();

        } finally {
            group.shutdownGracefully();
        }

    }

}

Server端代码
package com.example.nettydemo.server;

import com.example.nettydemo.server.codec.FrameDecoder;
import com.example.nettydemo.server.codec.FrameEncoder;
import com.example.nettydemo.server.codec.ProtocolDecoder;
import com.example.nettydemo.server.codec.ProtocolEncoder;
import com.example.nettydemo.server.handler.MetricsHandler;
import com.example.nettydemo.server.handler.ServerIdleCheckHandler;
import com.example.nettydemo.server.handler.ServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import lombok.extern.slf4j.Slf4j;

import javax.net.ssl.SSLException;
import java.security.cert.CertificateException;
import java.util.concurrent.ExecutionException;

/**
 * netty server 入口
 */
@Slf4j
public class Server {


    public static void main(String... args) throws InterruptedException, ExecutionException, CertificateException, SSLException {

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //设置channel模式,因为是server所以使用NioServerSocketChannel
        serverBootstrap.channel(NioServerSocketChannel.class);

        //最大的等待连接数量
        serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
        //设置是否启用 Nagle 算法:用将小的碎片数据连接成更大的报文 来提高发送效率。
        //如果需要发送一些较小的报文,则需要禁用该算法
        serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);

        //设置netty自带的log,并设置级别
        serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));

        //thread
        //用户指定线程名
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
        NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
        UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business"));

        //只能使用一个线程,因GlobalTrafficShapingHandler比较轻量级
        NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS"));

        try {
            //设置react方式
            serverBootstrap.group(bossGroup, workGroup);

            //metrics
            MetricsHandler metricsHandler = new MetricsHandler();

            //trafficShaping流量整形
            //long writeLimit 写入时控制, long readLimit 读取时控制 具体设置看业务修改
            GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024);


            //log
            LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG);
            LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO);

            //设置childHandler,按执行顺序放
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {

                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast("debugLog", debugLogHandler);
                    pipeline.addLast("tsHandler", globalTrafficShapingHandler);
                    pipeline.addLast("metricHandler", metricsHandler);
                    pipeline.addLast("idleHandler", new ServerIdleCheckHandler());

                    pipeline.addLast("frameDecoder", new FrameDecoder());
                    pipeline.addLast("frameEncoder", new FrameEncoder());
                    pipeline.addLast("protocolDecoder", new ProtocolDecoder());
                    pipeline.addLast("protocolEncoder", new ProtocolEncoder());

                    pipeline.addLast("infoLog", infoLogHandler);
                    //对flush增强,减少flush次数牺牲延迟增强吞吐量
                    pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true));
                    //为业务处理指定单独的线程池
                    pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup,
                }
            });

            //绑定端口并阻塞启动
            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();

            channelFuture.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
            businessGroup.shutdownGracefully();
            eventLoopGroupForTrafficShaping.shutdownGracefully();
        }

    }

}

最后

以上介绍了Netty的基本用法,在代码中也做了一部分的关键注释,但可能还会有许多不足,也不可能满足所有人的要求,大家可根据自己的实际需求去改造此项目。附上源码地址netty源码

持续学习,记录点滴。更多文章请访问 文章首发

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • 1.Netty 是什么? Netty 是一个基于 JAVA NIO 类库的异步通信框架,它的架构特点是:异步非阻塞...
    kingZXY2009阅读 1,612评论 2 13
  • Netty概述: 1、netty是基于Java NIO的网络应用框架,client-server框架 2、Nett...
    Java耕耘者阅读 2,284评论 0 1
  • 初春,小镇的夜,微凉,风寂寂的,似乎那暖意渐近。 饭后,两个人有些闲,出去走走。蜇伏了整整一个冬天,像冬眠复出的拖...
    蓝色山湾阅读 148评论 0 0
  • (有朋至远方来,她瘦了,我胖了,想哭) 我的前半生结束了,可是,我不喜欢这个结局。 应该是作为可恶的小三凌琳没有得...
    小猪猪同学阅读 335评论 3 4
  • 寒门出孝子,衣薄多补丁,金榜题名时,煮酒论英雄。
    7123934a4a75阅读 31评论 0 0