java之NIO处理UDP收发

本文大纲如下:

  • 1、写作背景
  • 2、基本的UDP包收发用法
  • 3、采用NIO方式处理UDP

一、背景

本篇内容,主要来源是在对公司代码重构。公司一个项目是采用UDP方式通信,在UDP的不可靠基础上,封装成可靠的通信协议。其本质是UDP+协议的方式,因今天的重点是UDP通信,所以只讲解UDP模块。由于APP有N个的通信对象,之前的代码中,也就有了N个线程监听接收的消息,N个线程发送消息。这样就会使用大量的线程,而且监听的线程一直处于阻塞状态,效率低下。在这种情况下,也就有必要对此模块进行重构了。

二、基本的UDP包收发用法

这也是公司之前的用法,比较简单粗暴,好处是开发成本低,但后期业务增加的时候,性能会有所下降

对于收发UDP包,需要localIp + localPort + remoteIp + remotePort,属于端对端的通信

1)、UDP发送数据
   public static void Send(byte[] data, int offset, int length, int localPort, InetAddress remoteAddress, 
                                              int remotePort) throws Exception {
        if (remoteAddress == null || remotePort <= 0) {
            throw new Exception("Null remote address !!!");
        }

        if (data == null || offset < 0 || length <= 0) {
            throw new Exception("null send data !!!");
        }

        // 会分配一个可用的本地端口
        DatagramSocket socket = new DatagramSocket(null);
       // 多个UDP socket绑定相同的端口
        socket.setReuseAddress(true);
       // 绑定本地端口
        socket.bind(new InetSocketAddress(localPort));

        // 封装成Packet
        DatagramPacket packet = new DatagramPacket(data, offset, length, remoteAddress, remotePort);
        socket.send(packet);

        socket.close();
    }

发送UDP包流程:

  • 构建DatagramSocket
  • 绑定本地发送端口
  • 构建发送的UDP数据包
  • 发送
  • 关闭Socket
2)、UDP接收数据

DatagramSocket socket = new MulticastSocket(null);
socket.setReuseAddress(true);
socket.bind(new InetSocketAddress(listenPort));

protected Runnable listenLoop = new Runnable() {
        @Override
        public void run() {
            byte[] receiveBuffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(receiveBuffer, receiveBuffer.length);

            while (listenRunning) {
                if (socket != null && !socket.isClosed()) {
                    try {
                        socket.receive(packet);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

接收UDP包流程:

  • 构建DatagramSocket
  • 绑定本地发送端口
  • 构建接收的UDP数据包
  • socket.receive(packet);
  • 关闭Socket

三、NIO重构UDP收发模块

1)、思路

NIO是同步非阻塞方式,将DatagramChannel向Selector选择器注册,使用一个Thread轮询Selector,当网卡准备数据时,就能告知用户开始处理发送或接收事件。总之,一切的数据发送和接收前,都得到Selector注册,得到了Selector的“允许”后,才能处理后续的工作。


接收和发送

2)、核心代码

// 发送接口
public interface Sender extends Closeable {

    // 触发异步的发送请求
    boolean postSendAsync() throws IOException;

    void send(String message,InetSocketAddress remoteAddress);
}
// 接收接口
public interface Receiver extends Closeable {

    // 触发异步的接收请求
    boolean postReceiveAsync() throws IOException;

    // 开始监听
    void start();
}
// 用于Channel向Selector注册
public interface IoProvider extends Closeable {

    boolean registerInput(DatagramChannel channel, HandleProviderCallback callback);

    boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback);

    void unRegisterInput(DatagramChannel channel);

    void unRegisterOutput(DatagramChannel channel);


    abstract class HandleProviderCallback implements Runnable {

        @Override
        public final void run() {
            onProviderIo();
        }

        /**
         * 可以进行接收或者发送时的回调
         *
         */
        protected abstract void onProviderIo();

    }

}

// 实现了Sender和Receiver
class DatagramChannelAdapter implements Sender,Receiver,Closeable {

    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicBoolean isSending = new AtomicBoolean();
    private final DatagramChannel channel;
    private final IoProvider ioProvider;
    private final UdpDataDispatcher dispatcher;

    private final Queue<UDPSendSnapshot> queue = new ConcurrentLinkedQueue<>();
    private final ReceiveUdpListener receiverUdpListener;

    DatagramChannelAdapter(DatagramChannel channel, IoProvider ioProvider, ReceiveUdpListener receiverUdpListener) throws IOException {
        this.channel = channel;
        this.ioProvider = ioProvider;
        this.receiverUdpListener = receiverUdpListener;
        dispatcher = new UdpDataDispatcher(channel);

        // 非阻塞模式下操作
        channel.configureBlocking(false);
    }

    @Override
    public boolean postReceiveAsync() throws IOException {
        if (isClosed.get()) {
            throw new IOException("Current channel is closed!");
        }

        // 注册能不能输入
        return ioProvider.registerInput(channel, inputCallback);
    }

    @Override
    public void start() {
        try {
            postReceiveAsync();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean postSendAsync() throws IOException {
        if (isClosed.get()) {
            throw new IOException("Current channel is closed!");
        }

        // 当前发送的数据附加到回调中
        return ioProvider.registerOutput(channel, outputCallback);
    }

    @Override
    public void send(String message,InetSocketAddress remoteAddress) {
        queue.offer(new UDPSendSnapshot(message,remoteAddress));
        requestSend();
    }

    private void requestSend() {
        if (isSending.compareAndSet(false,true) ) {
            if (queue.size() <= 0){
                isSending.set(false);
                return;
            }
            try {
                if (!postSendAsync()) {
                    isSending.set(false);
                }
            } catch (IOException e) {
                e.printStackTrace();
                CloseUtils.close(this);
            }
        }

    }

    @Override
    public void close() throws IOException {
        if (isClosed.compareAndSet(false, true)) {
            // 解除注册回调
            ioProvider.unRegisterInput(channel);
            ioProvider.unRegisterOutput(channel);
            // 关闭
            CloseUtils.close(channel);
        }
    }

    // 输入的数据操作
    private final IoProvider.HandleProviderCallback inputCallback = new IoProvider.HandleProviderCallback() {
        @Override
        protected void onProviderIo() {


            if (isClosed.get()) {
                return;
            }
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!inputCallback");
            ReceiveUdpData receiveUdp = dispatcher.receive();

            try {
                if (receiveUdp == null) {
                    throw new IOException();
                }
                postReceiveAsync();
                receiverUdpListener.onReceiveUdpListener(receiveUdp.getBytes(),receiveUdp.getTotal(),receiveUdp.getAddress(),receiveUdp.getPort());
            } catch (IOException e) {
                CloseUtils.close(DatagramChannelAdapter.this);
            }
        }
    };

    // 输出的数据操作
    private final IoProvider.HandleProviderCallback outputCallback = new IoProvider.HandleProviderCallback() {
        @Override
        protected void onProviderIo() {
            if (isClosed.get() || queue.size() == 0) {
                return;
            }
            System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!outputCallback");
            synchronized (isSending) {
                UDPSendSnapshot snapshot = queue.poll();
                dispatcher.sendMessage(snapshot.getMessage(),snapshot.getRemoteAddress());
                isSending.set(false);
            }
        }
    };


    /**
     * 收到监听UDP消息之后的回调
     */
    interface ReceiveUdpListener {
        void onReceiveUdpListener(byte[] data, int length, InetSocketAddress address, int port);
    }
}
public class IoSelectorProvider implements IoProvider {

    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    // 是否处于某个过程
    private final AtomicBoolean inRegInput = new AtomicBoolean(false);
    private final AtomicBoolean inRegOutput = new AtomicBoolean(false);

    // 读和写的数据选择器
    private final Selector readSelector;
    private final Selector writeSelector;
    private final ExecutorService dataHandlePool;

    private final HashMap<SelectionKey, Runnable> inputCallbackMap = new HashMap<>();
    private final HashMap<SelectionKey, Runnable> outputCallbackMap = new HashMap<>();


    public IoSelectorProvider() throws IOException {
        readSelector = Selector.open();
        writeSelector = Selector.open();
        dataHandlePool = Executors.newFixedThreadPool(4,
                new Factory.NameableThreadFactory("IoProvider-Thread-"));

        // 开始输出输入的监听
        startRead();
        startWrite();
    }

    private void startRead() {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (!isClosed.get()) {
                    try {
                        if (readSelector.select() == 0) {
                            waitSelection(inRegInput);
                            continue;
                        } else if (inRegInput.get()) {
                            waitSelection(inRegInput);
                        }

                        Set<SelectionKey> selectionKeys = readSelector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            if (selectionKey.isValid()) {
                                // 对应着下面的两种形式 可读
                                System.out.println("可读的回调");
                                handleSelection(selectionKey,
                                        SelectionKey.OP_READ, inputCallbackMap, dataHandlePool, inRegInput);

                            }
                            iterator.remove();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (ClosedSelectorException ignored) {
                        break;
                    }
                }
            }
        };

        // 启动线程
        new Thread(runnable)
                .start();
    }

    private void startWrite() {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (!isClosed.get()) {
                    try {
                        if (writeSelector.select() == 0) {
                            waitSelection(inRegOutput);
                            continue;
                        } else if (inRegOutput.get()) {
                            waitSelection(inRegOutput);
                        }

                        Set<SelectionKey> selectionKeys = writeSelector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            if (selectionKey.isValid()) {

                                // 可写
                                if (selectionKey.isWritable()) {
                                    System.out.println("可写的回调");
                                    handleSelection(selectionKey,
                                            SelectionKey.OP_WRITE, outputCallbackMap, dataHandlePool, inRegOutput);
                                }

                            }
                            iterator.remove();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (ClosedSelectorException ignored) {
                        break;
                    }
                }
            }
        };

        // 启动线程
        new Thread(runnable)
                .start();
    }


    private static void handleSelection(SelectionKey key, int keyOps,
                                        HashMap<SelectionKey, Runnable> map,
                                        ExecutorService pool, AtomicBoolean locker) {
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (locker) {
            try {
                // 重点
                // 取消继续对keyOps的监听
                key.interestOps(key.readyOps() & ~keyOps);
            } catch (CancelledKeyException e) {
                return;
            }
        }

        Runnable runnable = null;
        try {
            runnable = map.get(key);
        } catch (Exception ignored) {

        }

        if (runnable != null && !pool.isShutdown()) {
            // 异步调度
            pool.execute(runnable);
        }
    }


    @Override
    public boolean registerInput(DatagramChannel channel, HandleProviderCallback callback) {
        return registerSelection(channel, readSelector, SelectionKey.OP_READ, inRegInput,
                inputCallbackMap, callback) != null;
    }

    @Override
    public boolean registerOutput(DatagramChannel channel, HandleProviderCallback callback) {
        return registerSelection(channel, writeSelector, SelectionKey.OP_WRITE, inRegOutput,
                outputCallbackMap, callback) != null;
    }

    @Override
    public void unRegisterInput(DatagramChannel channel) {
        unRegisterSelection(channel, readSelector, inputCallbackMap, inRegInput);
    }

    @Override
    public void unRegisterOutput(DatagramChannel channel) {
        unRegisterSelection(channel, writeSelector, outputCallbackMap, inRegOutput);
    }

    private static SelectionKey registerSelection(DatagramChannel channel, Selector selector,
                                                  int registerOps, AtomicBoolean locker,
                                                  HashMap<SelectionKey, Runnable> map,
                                                  Runnable runnable) {

        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (locker) {
            // 设置锁定状态
            locker.set(true);

            try {
                // 唤醒当前的selector,让selector不处于select()状态
                selector.wakeup();

                SelectionKey key = null;
                if (channel.isRegistered()) {
                    // 查询是否已经注册过
                    key = channel.keyFor(selector);

                }

                if (key != null) {
                    key.interestOps(key.readyOps() | registerOps);
                }

                if (key == null) {
                    // 注册selector得到Key
                    key = channel.register(selector, registerOps);
                    // 注册回调
                    map.put(key, runnable);
                }

                return key;
            } catch (ClosedChannelException
                    | CancelledKeyException
                    | ClosedSelectorException e) {
                e.printStackTrace();
                return null;
            } finally {
                // 解除锁定状态
                locker.set(false);
                try {
                    // 通知
                    locker.notify();
                } catch (Exception ignored) {
                }
            }
        }
    }

    private static void unRegisterSelection(DatagramChannel channel, Selector selector,
                                            Map<SelectionKey, Runnable> map,
                                            AtomicBoolean locker) {
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (locker) {
            locker.set(true);
            selector.wakeup();
            try {
                if (channel.isRegistered()) {
                    SelectionKey key = channel.keyFor(selector);
                    if (key != null) {
                        // 取消监听的方法
                        key.cancel();
                        map.remove(key);
                    }
                }
            } finally {
                locker.set(false);
                try {
                    locker.notifyAll();
                } catch (Exception ignored) {
                }
            }
        }
    }

    private static void waitSelection(final AtomicBoolean locker) {
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (locker) {
            if (locker.get()) {
                try {
                    locker.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        if (isClosed.compareAndSet(false, true)) {
            dataHandlePool.shutdown();

            inputCallbackMap.clear();
            outputCallbackMap.clear();

            CloseUtils.close(readSelector);
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容