本文大纲如下:
- 1、写作背景
- 2、基本的UDP包收发用法
- 3、采用NIO方式处理UDP
一、背景
本篇内容,主要来源是在对公司代码重构。公司一个项目是采用UDP方式通信,在UDP的不可靠基础上,封装成可靠的通信协议。其本质是UDP+协议的方式,因今天的重点是UDP通信,所以只讲解UDP模块。由于APP有N个的通信对象,之前的代码中,也就有了N个线程监听接收的消息,N个线程发送消息。这样就会使用大量的线程,而且监听的线程一直处于阻塞状态,效率低下。在这种情况下,也就有必要对此模块进行重构了。
二、基本的UDP包收发用法
这也是公司之前的用法,比较简单粗暴,好处是开发成本低,但后期业务增加的时候,性能会有所下降
对于收发UDP包,需要localIp + localPort + remoteIp + remotePort,属于端对端的通信
1)、UDP发送数据
public static void Send(byte[] data, int offset, int length, int localPort, InetAddress remoteAddress,
int remotePort) throws Exception {
if (remoteAddress == null || remotePort <= 0) {
throw new Exception("Null remote address !!!");
}
if (data == null || offset < 0 || length <= 0) {
throw new Exception("null send data !!!");
}
// 会分配一个可用的本地端口
DatagramSocket socket = new DatagramSocket(null);
// 多个UDP socket绑定相同的端口
socket.setReuseAddress(true);
// 绑定本地端口
socket.bind(new InetSocketAddress(localPort));
// 封装成Packet
DatagramPacket packet = new DatagramPacket(data, offset, length, remoteAddress, remotePort);
socket.send(packet);
socket.close();
}
发送UDP包流程:
- 构建DatagramSocket
- 绑定本地发送端口
- 构建发送的UDP数据包
- 发送
- 关闭Socket
2)、UDP接收数据
DatagramSocket socket = new MulticastSocket(null);
socket.setReuseAddress(true);
socket.bind(new InetSocketAddress(listenPort));
protected Runnable listenLoop = new Runnable() {
@Override
public void run() {
byte[] receiveBuffer = new byte[1024];
DatagramPacket packet = new DatagramPacket(receiveBuffer, receiveBuffer.length);
while (listenRunning) {
if (socket != null && !socket.isClosed()) {
try {
socket.receive(packet);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
接收UDP包流程:
- 构建DatagramSocket
- 绑定本地发送端口
- 构建接收的UDP数据包
- socket.receive(packet);
- 关闭Socket
三、NIO重构UDP收发模块
1)、思路
NIO是同步非阻塞方式,将DatagramChannel向Selector选择器注册,使用一个Thread轮询Selector,当网卡准备数据时,就能告知用户开始处理发送或接收事件。总之,一切的数据发送和接收前,都得到Selector注册,得到了Selector的“允许”后,才能处理后续的工作。
2)、核心代码
// 发送接口
public interface Sender extends Closeable {
// 触发异步的发送请求
boolean postSendAsync() throws IOException;
void send(String message,InetSocketAddress remoteAddress);
}
// 接收接口
public interface Receiver extends Closeable {
// 触发异步的接收请求
boolean postReceiveAsync() throws IOException;
// 开始监听
void start();
}
// 用于Channel向Selector注册
public interface IoProvider extends Closeable {
boolean registerInput(DatagramChannel channel, HandleProviderCallback callback);
boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback);
void unRegisterInput(DatagramChannel channel);
void unRegisterOutput(DatagramChannel channel);
abstract class HandleProviderCallback implements Runnable {
@Override
public final void run() {
onProviderIo();
}
/**
* 可以进行接收或者发送时的回调
*
*/
protected abstract void onProviderIo();
}
}
// 实现了Sender和Receiver
class DatagramChannelAdapter implements Sender,Receiver,Closeable {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean isSending = new AtomicBoolean();
private final DatagramChannel channel;
private final IoProvider ioProvider;
private final UdpDataDispatcher dispatcher;
private final Queue<UDPSendSnapshot> queue = new ConcurrentLinkedQueue<>();
private final ReceiveUdpListener receiverUdpListener;
DatagramChannelAdapter(DatagramChannel channel, IoProvider ioProvider, ReceiveUdpListener receiverUdpListener) throws IOException {
this.channel = channel;
this.ioProvider = ioProvider;
this.receiverUdpListener = receiverUdpListener;
dispatcher = new UdpDataDispatcher(channel);
// 非阻塞模式下操作
channel.configureBlocking(false);
}
@Override
public boolean postReceiveAsync() throws IOException {
if (isClosed.get()) {
throw new IOException("Current channel is closed!");
}
// 注册能不能输入
return ioProvider.registerInput(channel, inputCallback);
}
@Override
public void start() {
try {
postReceiveAsync();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public boolean postSendAsync() throws IOException {
if (isClosed.get()) {
throw new IOException("Current channel is closed!");
}
// 当前发送的数据附加到回调中
return ioProvider.registerOutput(channel, outputCallback);
}
@Override
public void send(String message,InetSocketAddress remoteAddress) {
queue.offer(new UDPSendSnapshot(message,remoteAddress));
requestSend();
}
private void requestSend() {
if (isSending.compareAndSet(false,true) ) {
if (queue.size() <= 0){
isSending.set(false);
return;
}
try {
if (!postSendAsync()) {
isSending.set(false);
}
} catch (IOException e) {
e.printStackTrace();
CloseUtils.close(this);
}
}
}
@Override
public void close() throws IOException {
if (isClosed.compareAndSet(false, true)) {
// 解除注册回调
ioProvider.unRegisterInput(channel);
ioProvider.unRegisterOutput(channel);
// 关闭
CloseUtils.close(channel);
}
}
// 输入的数据操作
private final IoProvider.HandleProviderCallback inputCallback = new IoProvider.HandleProviderCallback() {
@Override
protected void onProviderIo() {
if (isClosed.get()) {
return;
}
System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!inputCallback");
ReceiveUdpData receiveUdp = dispatcher.receive();
try {
if (receiveUdp == null) {
throw new IOException();
}
postReceiveAsync();
receiverUdpListener.onReceiveUdpListener(receiveUdp.getBytes(),receiveUdp.getTotal(),receiveUdp.getAddress(),receiveUdp.getPort());
} catch (IOException e) {
CloseUtils.close(DatagramChannelAdapter.this);
}
}
};
// 输出的数据操作
private final IoProvider.HandleProviderCallback outputCallback = new IoProvider.HandleProviderCallback() {
@Override
protected void onProviderIo() {
if (isClosed.get() || queue.size() == 0) {
return;
}
System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!outputCallback");
synchronized (isSending) {
UDPSendSnapshot snapshot = queue.poll();
dispatcher.sendMessage(snapshot.getMessage(),snapshot.getRemoteAddress());
isSending.set(false);
}
}
};
/**
* 收到监听UDP消息之后的回调
*/
interface ReceiveUdpListener {
void onReceiveUdpListener(byte[] data, int length, InetSocketAddress address, int port);
}
}
public class IoSelectorProvider implements IoProvider {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
// 是否处于某个过程
private final AtomicBoolean inRegInput = new AtomicBoolean(false);
private final AtomicBoolean inRegOutput = new AtomicBoolean(false);
// 读和写的数据选择器
private final Selector readSelector;
private final Selector writeSelector;
private final ExecutorService dataHandlePool;
private final HashMap<SelectionKey, Runnable> inputCallbackMap = new HashMap<>();
private final HashMap<SelectionKey, Runnable> outputCallbackMap = new HashMap<>();
public IoSelectorProvider() throws IOException {
readSelector = Selector.open();
writeSelector = Selector.open();
dataHandlePool = Executors.newFixedThreadPool(4,
new Factory.NameableThreadFactory("IoProvider-Thread-"));
// 开始输出输入的监听
startRead();
startWrite();
}
private void startRead() {
Runnable runnable = new Runnable() {
@Override
public void run() {
while (!isClosed.get()) {
try {
if (readSelector.select() == 0) {
waitSelection(inRegInput);
continue;
} else if (inRegInput.get()) {
waitSelection(inRegInput);
}
Set<SelectionKey> selectionKeys = readSelector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isValid()) {
// 对应着下面的两种形式 可读
System.out.println("可读的回调");
handleSelection(selectionKey,
SelectionKey.OP_READ, inputCallbackMap, dataHandlePool, inRegInput);
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClosedSelectorException ignored) {
break;
}
}
}
};
// 启动线程
new Thread(runnable)
.start();
}
private void startWrite() {
Runnable runnable = new Runnable() {
@Override
public void run() {
while (!isClosed.get()) {
try {
if (writeSelector.select() == 0) {
waitSelection(inRegOutput);
continue;
} else if (inRegOutput.get()) {
waitSelection(inRegOutput);
}
Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isValid()) {
// 可写
if (selectionKey.isWritable()) {
System.out.println("可写的回调");
handleSelection(selectionKey,
SelectionKey.OP_WRITE, outputCallbackMap, dataHandlePool, inRegOutput);
}
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClosedSelectorException ignored) {
break;
}
}
}
};
// 启动线程
new Thread(runnable)
.start();
}
private static void handleSelection(SelectionKey key, int keyOps,
HashMap<SelectionKey, Runnable> map,
ExecutorService pool, AtomicBoolean locker) {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (locker) {
try {
// 重点
// 取消继续对keyOps的监听
key.interestOps(key.readyOps() & ~keyOps);
} catch (CancelledKeyException e) {
return;
}
}
Runnable runnable = null;
try {
runnable = map.get(key);
} catch (Exception ignored) {
}
if (runnable != null && !pool.isShutdown()) {
// 异步调度
pool.execute(runnable);
}
}
@Override
public boolean registerInput(DatagramChannel channel, HandleProviderCallback callback) {
return registerSelection(channel, readSelector, SelectionKey.OP_READ, inRegInput,
inputCallbackMap, callback) != null;
}
@Override
public boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback) {
return registerSelection(channel, writeSelector, SelectionKey.OP_WRITE, inRegOutput,
outputCallbackMap, callback) != null;
}
@Override
public void unRegisterInput(DatagramChannel channel) {
unRegisterSelection(channel, readSelector, inputCallbackMap, inRegInput);
}
@Override
public void unRegisterOutput(DatagramChannel channel) {
unRegisterSelection(channel, writeSelector, outputCallbackMap, inRegOutput);
}
private static SelectionKey registerSelection(DatagramChannel channel, Selector selector,
int registerOps, AtomicBoolean locker,
HashMap<SelectionKey, Runnable> map,
Runnable runnable) {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (locker) {
// 设置锁定状态
locker.set(true);
try {
// 唤醒当前的selector,让selector不处于select()状态
selector.wakeup();
SelectionKey key = null;
if (channel.isRegistered()) {
// 查询是否已经注册过
key = channel.keyFor(selector);
}
if (key != null) {
key.interestOps(key.readyOps() | registerOps);
}
if (key == null) {
// 注册selector得到Key
key = channel.register(selector, registerOps);
// 注册回调
map.put(key, runnable);
}
return key;
} catch (ClosedChannelException
| CancelledKeyException
| ClosedSelectorException e) {
e.printStackTrace();
return null;
} finally {
// 解除锁定状态
locker.set(false);
try {
// 通知
locker.notify();
} catch (Exception ignored) {
}
}
}
}
private static void unRegisterSelection(DatagramChannel channel, Selector selector,
Map<SelectionKey, Runnable> map,
AtomicBoolean locker) {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (locker) {
locker.set(true);
selector.wakeup();
try {
if (channel.isRegistered()) {
SelectionKey key = channel.keyFor(selector);
if (key != null) {
// 取消监听的方法
key.cancel();
map.remove(key);
}
}
} finally {
locker.set(false);
try {
locker.notifyAll();
} catch (Exception ignored) {
}
}
}
}
private static void waitSelection(final AtomicBoolean locker) {
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (locker) {
if (locker.get()) {
try {
locker.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Override
public void close() throws IOException {
if (isClosed.compareAndSet(false, true)) {
dataHandlePool.shutdown();
inputCallbackMap.clear();
outputCallbackMap.clear();
CloseUtils.close(readSelector);
}
}
}