项目介绍
-
简单介绍: 我在实际的工作之中做过的项目阐述这个项目。
netty client,netty server,other server主要围绕这三个服务之间的逻辑处理。
逻辑图:
- 主要以netty server为例讲解
项目中主要解决了一下问题:
- 在netty server的核心业务处理类中处理无法注入bean的问题
- 提供netty连接通道channel在分布式下的共享服务思路·后续更新具体实现方案
netty和springboot的整合
- pom.xml 文件
在SpringBoot项目里添加netty的依赖,注意不要用netty5的依赖,因为已经废弃了
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.netty.vr</groupId>
<artifactId>centre</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>centre</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.61</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.20.1</version>
<configuration>
<!-- 不指定单元测试 -->
<skipTests>true</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>
- yml配置文件
将端口和IP写入application.yml文件里,本机测试,用127.0.0.1就ok。即使在服务器上部署也建议127.0.0.1,然后用nginx配置域名代理
netty:
port: 8888
url: 127.0.0.1
- netty服务启动引导类
package com.daoyin.vr.centre.component;
import com.daoyin.vr.centre.netty.MyWebSocketChannelHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
* netty程序的入口,负责启动应用
* @author Mr培
*/
@Component
public class NettyServer {
public void start(InetSocketAddress address){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workGroup);
server.channel(NioServerSocketChannel.class);
server.childHandler(new MyWebSocketChannelHandler());
System.out.println("服务端开启等待客户端连接....");
Channel ch = server.bind(address).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally{
//优雅的退出程序
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
- 启动main方法,CommandLineRunner #run()
这里主要是通过CommandLineRunner 接口的run方法,实现在项目启动后执行的功能,SpringBoot提供的一种简单的实现方案就是添加一个model并实现CommandLineRunner接口,实现功能的代码放在实现的run方法中。
当然还有其他的启动方法,比如
- 利用 ApplicationListener 上下文监听器
- 注解@PostConstruct
- 利用监听器启动
这里以CommandLineRunner为例
package com.daoyin.vr.centre;
import com.daoyin.vr.centre.component.NettyServer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.net.InetSocketAddress;
/**
* 实现 CommandLineRunner 执行额外任务
* @author Mr培
*/
@SpringBootApplication
public class CentreApplication implements CommandLineRunner {
@Value("${netty.port}")
private int port;
@Value("${netty.url}")
private String url;
/**
* final修饰 + 构造器 ≌ @Autowired 注解
* */
private final NettyServer server;
public CentreApplication(NettyServer server) {
this.server = server;
}
public static void main(String[] args) {
SpringApplication.run(CentreApplication.class, args);
}
/**
* 服务开始则启动netty server
* */
@Override
public void run(String... args){
InetSocketAddress address = new InetSocketAddress(url,port);
server.start(address);
}
}
- 存储整个工程的全局配置
可根据个人的不同业务逻辑自行实现业务方法
package com.daoyin.vr.centre.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
/**
* 存储整个工程的全局配置
* @author rp
*
*/
public class NettyConfig {
/**
* 存储每一个客户端接入进来时的channel对象
*/
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 储存每一个客户端接入进来时的channel键值对 对象
* */
public static ConcurrentHashMap<String, ChannelId> cannelMap = new ConcurrentHashMap<>();
/**
* vr设备状态储存
* 0 未占用 1 占用
* */
public static ConcurrentHashMap<String, Integer> vrStatusMap = new ConcurrentHashMap<>();
/**
* 添加设备和状态
* */
public static void addStatus(String key,Integer status){
vrStatusMap.put(key,status);
}
/**
* 查询设备状态
* */
public static Integer findStatus(String key){
return vrStatusMap.get(key);
}
/**
* 储存netty客户端channel管道
* */
public static void addChannel(String key,Channel channel){
group.add(channel);
cannelMap.put(key,channel.id());
}
/**
* 查询客户端channel管道
* */
public static Channel findChannel(String key){
return group.find(cannelMap.get(key));
}
/**
* 移除客户端channel管道
* */
public static void removeChannel(Channel channel){
NettyConfig.group.remove(channel);
}
/**
* 根据设备标识找到客户端channel并发送消息
* */
public static void send(String key,String value){
findChannel(key).writeAndFlush(new TextWebSocketFrame(value));
}
/**
* 根据客户端channel直接发送消息
* */
public static void send(Channel channel,String value){
channel.writeAndFlush(new TextWebSocketFrame(value));
}
/**
* 根据设备标识移除客户端channel
* */
public static void removeChannel(String key,Channel channel){
group.remove(channel);
cannelMap.remove(key);
}
/**
* 群发消息
* */
public static void sendAll(String value){
group.writeAndFlush(new TextWebSocketFrame(value));
}
}
- 初始化连接时候的各个组件
package com.daoyin.vr.centre.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* 初始化连接时候的各个组件
* @author Mr培
*
*/
public class MyWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel){
channel.pipeline().addLast("http-codec", new HttpServerCodec());
channel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
channel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
channel.pipeline().addLast("handler", new MyWebSocketHandler());
}
}
- 接收/处理/响应客户端websocket请求的核心业务处理类
服务端业务处理handler
package com.daoyin.vr.centre.netty;
import com.daoyin.vr.centre.service.VrChannelService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.*;
/**
* 接收/处理/响应客户端websocket请求的核心业务处理类
* @author rp
*/
@Component
public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketServerHandshaker handShaker;
private static MyWebSocketHandler myWebSocketHandler;
@Autowired
private VrChannelService vrChannelService;
/**
* 解决无法注入问题
* */
@PostConstruct
public void init() {
myWebSocketHandler = this;
}
private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
/**
* 客户端与服务端创建连接的时候调用
* 使用单线程池在第一次连接成功发送消息到客户端·不要问为什么要用线程发送消息
* */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(()->NettyConfig.send(ctx.channel(),"OK"));
executorService.shutdown();
executorService.awaitTermination(0L, TimeUnit.MILLISECONDS);
System.out.println("客户端与服务端连接开启...");
}
/**
* 客户端与服务端断开连接的时候调用
* */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.removeChannel(ctx.channel());
myWebSocketHandler.vrChannelService.updateStatusByChannelId(2,ctx.channel().id().toString());
System.out.println("客户端与服务端连接关闭...");
}
/**
* 服务端接收客户端发送过来的数据结束之后调用
* */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* 工程出现异常的时候调用
* */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
NettyConfig.send(ctx.channel(),"EXCEPTION");
cause.printStackTrace();
ctx.close();
}
/**
* 服务端处理客户端websocket请求的核心方法
* */
@Override
protected void channelRead0(ChannelHandlerContext context, Object msg) throws Exception {
//处理客户端向服务端发起http握手请求的业务
if (msg instanceof FullHttpRequest) {
handHttpRequest(context, (FullHttpRequest)msg);
}else if (msg instanceof WebSocketFrame) {
//处理websocket连接业务
handWebsocketFrame(context, (WebSocketFrame)msg);
}
}
/**
* 处理客户端与服务端之前的websocket业务
* @param ctx
* @param frame
*/
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
//判断是否是关闭websocket的指令
if (frame instanceof CloseWebSocketFrame) {
handShaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
}
//判断是否是ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
//判断是否是二进制消息,如果是二进制消息,抛出异常
if( ! (frame instanceof TextWebSocketFrame) ){
System.out.println("目前我们不支持二进制消息·可能客户端断开连接");
return;
}
//获取客户端向服务端发送的消息
String vrId = ((TextWebSocketFrame) frame).text();
System.out.println("接收到客户端消息---->> " + vrId);
if (vrId.contains(":0")){
vrId = vrId.split(":")[0];
}else{
NettyConfig.addChannel(vrId,ctx.channel());
NettyConfig.send(ctx.channel(),"OK");
}
NettyConfig.addStatus(vrId,0);
//将信息存储mysql数据库
myWebSocketHandler.vrChannelService.insertVrMysql(vrId,0,ctx.channel().id().toString());
}
/**
* 处理客户端向服务端发起http握手请求的业务
* @param ctx
* @param req
*/
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
if (!req.decoderResult().isSuccess() || !("websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
WEB_SOCKET_URL, null, false);
handShaker = wsFactory.newHandshaker(req);
if (handShaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}else{
handShaker.handshake(ctx.channel(), req);
}
}
/**
* 服务端向客户端响应消息
* @param ctx
* @param req
* @param res
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res){
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
//服务端向客户端发送数据
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* netty 5 的方法
* @Override
* protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
* //处理客户端向服务端发起http握手请求的业务
* if (msg instanceof FullHttpRequest) {
* handHttpRequest(context, (FullHttpRequest)msg);
* }else if (msg instanceof WebSocketFrame) {
* //处理websocket连接业务
* handWebsocketFrame(context, (WebSocketFrame)msg);
* }
* }
*/
}
netty连接通道channel在分布式下的共享服务思路
首先来说channel是无法被序列化储存的,所以对于一些想要把channel序列化存储后再取出来,给客户端发送消息是不现实的,即使将channel序列化存储后取出来也不再是原来的channel了。
- 思路一: 用 rabbitmq 来通知每个netty server
以两个netty server为例,把每个netty server 当作rabbitmq的服务中心。
例如: 当client1连接到server1,client2连接server2,这时有其他的服务连接到server1,需要sercer2通知client2时,就可以通过server1发送通知告诉server2,server2再发送消息给client2。这样就实现了netty的分布式的部署。
- 思路二: netty注册中心服务
这种方案其实跟第一种没多大区别,思路都是一样的。只是说将rabbitmq换成了netty server来进行消息的互通,发送消息还是通过当前的server发送到连接的client。
其他
- 若想了解其他的启动方式可以参考
springboot整合netty的多种方式 - 了解其他的优秀文章
Netty服务端Channel的创建与初始化
深入理解 Netty-Channel架构体系