Netty使用kryo序列化传输对象

参考文章:
https://blog.csdn.net/eguid_1/article/details/79316403
https://blog.csdn.net/top_code/article/details/50901623

通常我们习惯将编码(Encode)称为序列化(serialization),它将对象序列化为字节数组,用于网络传输、数据持久化或其他用途。
同样的,解码(Decoder)称为反序列化(deserialization),它把从网络、磁盘等读取到的字节数组还原成原始对象,以方便后续业务逻辑操作。

kryo是个高效的java序列化/反序列化库,目前Twitter、Apache、Storm、Hive等等都在使用该技术。
kryo序列化技术的性能足够好,比它更高效的序列化工具就只有google的protobuf了,protobuf有个缺点就是传输的每一个类结构都需要相对应的proto文件,如果类结构发生了变化,需要重新生成proto文件;protobuf的优点是和平台无关扩展性好,支持java,C++,Python三种语言。kyro速度快,序列化后体积小,性能仅次于protobuf,跨语言支持复杂。
常见序列化框架性能对比图:


序列化性能对比-耗时
序列化性能对比-体积
  1. 实现Kryo序列化工具类。
    序列化接口Serializer :
package learn.netty.serial.kryo;

/**
 * 序列化工具接口
 *
 * @author stone
 * @date 2019/7/31 9:25
 */
public  interface Serializer {
    /**
     * 序列化
     * @param obj
     */
    byte[] serialize(Object obj);


    /**
     * 反序列化
     * @param bytes 字节数组
     * @return
     */
    <T> T deserialize(byte[] bytes);
}

KryoSerializer 序列化工具类

package learn.netty.serial.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.BeanSerializer;
import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
 * 基于kryo的序列化/反序列化工具
 * @author stone
 * @date 2019/7/31 9:30
 */
public class KryoSerializer implements Serializer {
    private final Class<?> ct;
    // kryo 是非线程安全类
    final ThreadLocal<Kryo> kryoLocal = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.register(ct, new BeanSerializer(kryo, ct));
            return kryo;
        }
    };

    public KryoSerializer(Class<?> ct) {
        this.ct = ct;
    }

    public Class<?> getCt() {
        return ct;
    }

    private Kryo getKryo() {
        return kryoLocal.get();
    }



    @Override
    public byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Output output = new Output(bos);
        try {
            Kryo kryo = getKryo();
            kryo.writeObjectOrNull(output, obj, obj.getClass());
            output.flush();
            return bos.toByteArray();
        } finally {
            IOUtils.closeQuietly(output);
            IOUtils.closeQuietly(bos);
        }
    }

    @Override
    public <T> T deserialize(byte[] bytes) {
        if (bytes == null)
            return null;
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        Input input = new Input(bais);
        try {
            Kryo kryo = getKryo();
            return (T) kryo.readObjectOrNull(input, ct);
        } finally {
            IOUtils.closeQuietly(input);
            IOUtils.closeQuietly(bais);
        }
    }
    
}

SerializerFactory:

package learn.netty.serial.kryo;

/**
 * 序列化工具类工厂实现
 * @author stone
 * @date 2019/7/31 11:21
 */
public class SerializerFactory {
    public static Serializer getSerializer(Class<?> cls) {
        return new KryoSerializer(cls);
    }
}
  1. 实现编码器Encoder,继承自MessageToByteEncoder
package learn.netty.serial.kryo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 自定义kryo编码器(将传输对象变为byte数组)
 *
 * @author stone
 * @date 2019/7/30 14:16
 */
public class KryoMsgEncoder extends MessageToByteEncoder<LsCar>{
    private Serializer serializer = SerializerFactory.getSerializer(LsCar.class);

    @Override
    protected void encode(ChannelHandlerContext ctx, LsCar msg, ByteBuf out) throws Exception {
        // 1. 将对象转换为byte
        byte[] body = serializer.serialize(msg);
        // 2. 读取消息的长度
        int dataLength = body.length;
        // 3. 先将消息长度写入,也就是消息头
        out.writeInt(dataLength);
        out.writeBytes(body);
    }
}

  1. 实现解码器Decoder,继承ByteToMessageDecoder类
package learn.netty.serial.kryo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 自定义解码器(将字节数组变为对象)
 * @author stone
 * @date 2019/7/30 14:32
 */
public class KryoMsgDecoder extends ByteToMessageDecoder {
    private static final int HEAD_LENGTH = 4; // 表示数据流(头部是消息长度)头部的字节数
    private Serializer serializer = SerializerFactory.getSerializer(LsCar.class);

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < HEAD_LENGTH) {
            return;
        }
        // 标记当前readIndex的位置
        in.markReaderIndex();
        // 读取传送过来的消息长度,ByteBuf的 readInt() 方法会让它的readIndex+4
        int dataLength = in.readInt();
        if (dataLength <= 0) {// 如果读到的消息长度不大于0,这是不应该出现的情况,关闭连接
            ctx.close();
        }
        if (in.readableBytes() < dataLength) { // 说明是不完整的报文,重置readIndex
            in.resetReaderIndex();
            return;
        }

        // 至此,读取到一条完整报文
        byte[] body = new byte[dataLength];
        in.readBytes(body);

        // 将bytes数组转换为我们需要的对象
        LsCar msg = serializer.deserialize(body);
        out.add(msg);
    }
}
  1. 服务端代码
package learn.netty.serial.kryo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @author stone
 * @date 2019/7/30 14:49
 */
public class KryoTransferServer {
    private final int port;

    public KryoTransferServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new KryoMsgEncoder());
                            ch.pipeline().addLast(new KryoMsgDecoder());
                            ch.pipeline().addLast(new KryoServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置tcp缓冲区
                    .option(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,同步等待绑定成功
            ChannelFuture f = b.bind(port).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new KryoTransferServer(8889).run();
    }

}
  1. 服务端业务处理类
package learn.netty.serial.kryo;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;


/**
 * @author stone
 * @date 2019/7/30 14:54
 */
public class KryoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {LsCar car = new LsCar();
            car.setName("Server");
            car.setBrand("qirui");
            car.setPrice(24.5f);
            car.setSpeed(196);

            System.out.println("Server write msg: " + car);
            ChannelFuture f = ctx.writeAndFlush(car);
            f.addListener(ChannelFutureListener.CLOSE);
        } finally {
            ReferenceCountUtil.release(msg);
        }


    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

  1. 客户端代码
package learn.netty.serial.kryo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author stone
 * @date 2019/7/30 14:59
 */
public class KryoTransferClient {
    private String host;
    private int port;
    private LsCar message;

    public KryoTransferClient(String host, int port, LsCar message) {
        this.host = host;
        this.port = port;
        this.message = message;
    }

    public void send() throws InterruptedException {
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 配置启动辅助类
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new KryoMsgEncoder());
                            ch.pipeline().addLast(new KryoMsgDecoder());
                            ch.pipeline().addLast(new KryoClientHandler(message));
                        }
                    });
            // 异步连接服务器,同步等待连接成功
            ChannelFuture f = b.connect(host, port).sync();
            // 等待连接关闭
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LsCar msg = new LsCar();
        msg.setName("Client");
        msg.setBrand("changcheng");
        msg.setSpeed(100);
        msg.setPrice(12.5f);
        KryoTransferClient client = new KryoTransferClient("127.0.0.1", 8889, msg);
        client.send();
    }
}
  1. 客户端服务类
package learn.netty.serial.kryo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * @author stone
 * @date 2019/7/30 15:04
 */
public class KryoClientHandler extends ChannelInboundHandlerAdapter {
    private final LsCar message;

    public KryoClientHandler(LsCar message) {
        this.message = message;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("client send message: " + message);
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            LsCar body = (LsCar) msg;
            System.out.println("client receive msg: " + body);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容