NIO学习_Client

NIO客户端

写这个程序的初衷是在儒猿的分布式小文件系统所受到的启发,当时老师设计和我的想法有点出入,我按照自己实现的思路实现了一套逻辑,分布式小文件系统客户端考虑的要点是高效的传输文件,在这个基础上就要求我们必须维持和DataNode的长连接,减少和DataNode连接损耗,在此,我们就需要在第一个请求建立连接后,将连接缓存在客户端,方便下次直接使用。

先说说主要的一些集合容器的作用:

  • waitingConnectHosts 等待连接Host的缓存
  • connections 所有的连接集合,这里主要是方便使用Host获取到SelectionKey进行感知状态变换
  • connectState 所有连接状态缓存,当一个请求尝试连接的时候,如果在该集合中发现连接状态是SUCCESS,那么就会直接获取连接进行文件传输
  • waitingRequests 等待发送的请求队列,当客户端进行请求提交的时候,请求首先会进入该队列进行缓存
  • toSendRequests 当时机恰当的时候,会将waitingRequests的请求拉取缓存到toSendRequests中,请求在该队列是在客户端最后一个缓存队列,之后就是发送了
  • unfinishedResponses 未完成的响应,因为TCP的拆包问题,在一个Read事件中可能无法将一个请求进行完整的解析,这就要求我们将未读完的请求缓存起来,等待下次Read事件进行追加读取,完成整个响应解析。
  • finishedResponses 已完成响应缓存,在该缓存中存储是已经完成响应但未被客户端获取的
  • callBackMap 回调缓存,当传入回调函数的时候会进入其中

核心类是NetworkManager,这个是管理所有连接的管理器,在这个类里面负责请求的解析,缓存,响应解析,回调函数调用

主要的完整流程如下:


image.png

这里主要说下要点:

  1. 在这里处理了粘包和拆包的问题,主要定义好该次请求的数据长度,例如文件上传请求,请求ID + 请求类型 + 文件名长度 + 文件长度 + 文件名 + 文件内容,如果请求ID为UUID,请求类型为int类型代表,文件名和文件长度都用int代表,那么该次请求总的大小为32 + 4 + 4 + 4 + fileNameLength + fileSize

  2. 响应的解析也是按照服务端定义的响应类型处理的

  3. 存在一个问题,所有的请求最后实例化为ByteBuffer,在高并发情况下,内存占用是个问题

NetworkManager
package org.zymf.nio.example3.client;

import org.zymf.nio.example3.constant.Constant;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @author zhuyuemufeng
 * @version 1.0
 * @description: 通讯管理器
 * @date 2021-07-20 21:29
 */
public class NetworkManager {

    private Selector selector;

    private ClientHandleThread moniter;

    // 等待建立连接的机器
    private ConcurrentLinkedQueue<Host> waitingConnectHosts;

    // 所有的连接
    private Map<Host, SelectionKey> connections;

    // 每个数据节点的连接状态
    private Map<Host, Integer> connectState;

    //等待发送的请求
    private Map<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> waitingRequests;

    // 马上准备要发送的网络请求
    private Map<Host,  ConcurrentLinkedQueue<NetWorkClientRequest>> toSendRequests;

    // 未完成解析的响应
    private Map<Host, NetWorkClientRespond> unfinishedResponses;

    // 已经完成请求的响应
    private Map<Host, Map<String, NetWorkClientRespond>> finishedResponses;

    private Map<String,NetWorkRespondCallBack> callBackMap;

    public NetworkManager() {
        try {
            this.selector = Selector.open();
            this.moniter = new ClientHandleThread();
            this.waitingConnectHosts = new ConcurrentLinkedQueue<>();
            this.connections = new ConcurrentHashMap<>();
            this.connectState = new ConcurrentHashMap<>();
            this.waitingRequests = new ConcurrentHashMap<>();
            toSendRequests = new ConcurrentHashMap<>();
            this.finishedResponses = new ConcurrentHashMap<>();
            this.unfinishedResponses = new ConcurrentHashMap<>();
            this.callBackMap = new ConcurrentHashMap<>();
            moniter.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** 
     * @description: 尝试连接 
     * @param: * @param: host 
     * @return: void 
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:53
     */
    public void tryConnect(Host host) throws Exception {
        if (!connectState.containsKey(host) || connectState.get(host) == Constant.FAIL_CONNECT){
            waitingConnectHosts.offer(host);
            connectState.put(host, Constant.WAITING_CONNECT);
        }
    }

    /** 
     * @description: 验证是否完成连接 
     * @param: * @param: host
     * @param: sync 同步等待完成
     * @return: int 
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:53
     */
    public int finishConnect(Host host,boolean sync) throws Exception {
        boolean containsKey = connectState.containsKey(host);
        if (!containsKey){
            throw new RuntimeException("该连接不存在");
        }
        int status = connectState.get(host);
        if (Constant.WAITING_CONNECT == status && sync){
            while (true){
                if (Constant.WAITING_CONNECT != connectState.get(host)){
                    return connectState.get(host);
                }
                Thread.sleep(200);
            }
        }
        return status;
    }

    /** 
     * @description: 发送请求 
     * @param: * @param: request 
     * @return: void 
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:52
     */
    public void sendRequest(NetWorkClientRequest request) {
        Host host = request.getHost();
        waitingRequests.get(host).offer(request);
    }

    /**
     * @description: 发送回调请求
     * @param: * @param: request
     * @return: void
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:52
     */
    public void sendCallBackRequest(NetWorkClientRequest request) {
        Host host = request.getHost();
        waitingRequests.get(host).offer(request);
        callBackMap.put(request.getRequestId(),request.getNetWorkRespondCallBack());
    }

    /** 
     * @description: 同步返回响应结果,如果还没返回就进行等待
     * @param: * @param: request 
     * @return: org.zymf.nio.example3.client.NetWorkClientRespond 
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:52
     */
    public NetWorkClientRespond waitResponseSync(NetWorkClientRequest request) throws Exception {
        Host host = request.getHost();
        Map<String, NetWorkClientRespond> respondMap = finishedResponses.get(host);
        NetWorkClientRespond respond = null;
        while ((respond = respondMap.get(request.getRequestId())) == null){
            Thread.sleep(200);
        }
        return respond;
    }

    class ClientHandleThread extends Thread {

        @Override
        public void run() {
            while (true) {
                //连接注册,状态更新
                registerConnect();
                //准备请求,改变连接关注事件
                prepareSendRequest();
                //事件监听
                poll();
            }
        }

        /**
         * @description: 注册连接
         * @param: * @param:
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:33
         */
        private void registerConnect() {
            Host host = null;
            while ((host = waitingConnectHosts.poll()) != null) {
                try {
                    SocketChannel channel = SocketChannel.open();
                    channel.configureBlocking(false);
                    channel.connect(new InetSocketAddress(host.getIp(), host.getPort()));
                    channel.register(selector, SelectionKey.OP_CONNECT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * @description: 准备请求
         * @param: * @param:
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:22
         */
        private void prepareSendRequest(){
            for (Map.Entry<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> node : waitingRequests.entrySet()) {
                //该连接已成功连接,并且有请求任务
                int count = 0;
                if (!node.getValue().isEmpty() && Constant.SUCCESS_CONNECT == connectState.get(node.getKey())){
                    System.out.println(">>>>>>>>>>>>准备请求");
                    ConcurrentLinkedQueue<NetWorkClientRequest> value = node.getValue();
                    ConcurrentLinkedQueue<NetWorkClientRequest> toSend = toSendRequests.get(node.getKey());
                    NetWorkClientRequest request = null;
                    while (count < Constant.MAX_SEND_REQUEST_SIZE && (request = value.poll()) != null){
                        count++;
                        System.out.println(">>>>>>>>>>>>加入toSendRequests请求池");
                        toSend.offer(request);
                    }
                    if (count != 0){
                        SelectionKey key = connections.get(node.getKey());
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                }
            }
        }

        /**
         * @description: 请求/响应读写
         * @param: * @param:
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:33
         */
        private void poll() {
            try {
                int select = selector.select(Constant.POLL_BLOCK_MAX_TIME);
                if (select > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        SocketChannel channel = (SocketChannel) key.channel();
                        if (key.isConnectable()) {
                            System.out.println(">>>>>>>>>>>>触发Connect操作");
                            finishConnect(key,channel);
                        }
                        if (key.isWritable()){
                            System.out.println(">>>>>>>>>>>>触发Write操作");
                            sendRequest(key,channel);
                        }
                        if (key.isReadable()){
                            System.out.println(">>>>>>>>>>>>触发Read操作");
                            readResponse(key, channel);
                        }
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * @description: 完成连接
         * @param: * @param: key
         * @param: channel
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:11
         */
        private void finishConnect(SelectionKey key,SocketChannel channel) {
            Host host = null;
            try {
                host = getHostByChannel(channel);
                if (channel.finishConnect()) {
                    System.out.println(host + ">>>>>>>>>>>>完成连接操作");
                    //修改连接状态
                    connectState.put(host,Constant.SUCCESS_CONNECT);
                    System.out.println(host + ">>>>>>>>>>>>完成连接状态重置");
                    //初始化请求队列
                    waitingRequests.put(host, new ConcurrentLinkedQueue<>());
                    System.out.println(host + ">>>>>>>>>>>>完成初始化请求队列");
                    //初始化发送请求队列
                    toSendRequests.put(host, new ConcurrentLinkedQueue<>());
                    System.out.println(host + ">>>>>>>>>>>>完成初始化发送请求队列");
                    //初始化响应集合
                    finishedResponses.put(host,new ConcurrentHashMap<>());
                    System.out.println(host + ">>>>>>>>>>>>完成初始化响应集合");
                    //增加连接映射关系
                    connections.put(host,key);
                    System.out.println(host + ">>>>>>>>>>>>完成加入连接集合");
                }
            } catch (Exception e) {
                e.printStackTrace();
                connectState.put(host, Constant.FAIL_CONNECT);
            }
        }

        /**
         * @description: 请求发送
         * @param: * @param: key
         * @param: channel
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:34
         */
        private void sendRequest(SelectionKey key, SocketChannel channel) {
            try {
                Host host = getHostByChannel(channel);
                ConcurrentLinkedQueue<NetWorkClientRequest> netWorkClientRequests = toSendRequests.get(host);
                NetWorkClientRequest request = null;
                while ((request = netWorkClientRequests.poll()) != null){
                    ByteBuffer buffer = request.getBuffer();
                    buffer.flip();
                    while (buffer.hasRemaining()){
                        channel.write(buffer);
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
            //取消关注写事件
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }

        /**
         * @description: 响应解析
         * @param: * @param: key
         * @param: channel
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:40
         */
        private void readResponse(SelectionKey key, SocketChannel channel) {
            try {
                Host host = getHostByChannel(channel);
                NetWorkClientRespond respond = unfinishedResponses.get(host);
                if (respond == null){
                    respond = new NetWorkClientRespond();
                    unfinishedResponses.put(host,respond);
                }
                //读取请求ID
                if (respond.getRequestBuffer().hasRemaining()){
                    tryBestRead(channel,respond.getRequestBuffer());
                    if (respond.getRequestBuffer().hasRemaining()){
                        return;
                    }
                }
                if (respond.getRequestId() == null){
                    respond.getRequestBuffer().flip();
                    respond.setRequestId(new String(respond.getRequestBuffer().array()));
                }

                //读取响应状态
                if (respond.getStatusBuffer().hasRemaining()){
                    tryBestRead(channel,respond.getStatusBuffer());
                    if (respond.getStatusBuffer().hasRemaining()){
                        return;
                    }
                }
                if (respond.getStatus() == null){
                    respond.getStatusBuffer().flip();
                    respond.setStatus(respond.getStatusBuffer().getInt(0));
                }

                //读取响应长度
                if (respond.getContentLengthBuffer().hasRemaining()){
                    tryBestRead(channel,respond.getContentLengthBuffer());
                    if (respond.getContentLengthBuffer().hasRemaining()){
                        return;
                    }
                }
                if (respond.getContentLength() == null){
                    respond.getContentLengthBuffer().flip();
                    respond.setContentLength(respond.getContentLengthBuffer().getInt(0));
                }

                //读取响应内容
                if (respond.getByteBuffer() == null){
                    respond.setByteBuffer(ByteBuffer.allocate(respond.getContentLength()));
                }
                if (respond.getByteBuffer().hasRemaining()){
                    tryBestRead(channel,respond.getByteBuffer());
                }
                if (!respond.getByteBuffer().hasRemaining()){
                    respond.getByteBuffer().flip();
                    respond.setFinished(true);
                    unfinishedResponses.remove(host);
                    if (callBackMap.containsKey(respond.getRequestId())){
                        callBackMap.get(respond.getRequestId()).process(respond);
                    }else {
                        finishedResponses.get(host).put(respond.getRequestId(),respond);
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }

        /** 
         * @description: 尝试读满数据 
         * @param: * @param: channel
         * @param: buffer 
         * @return: void 
         * @author zhuyuemufeng
         * @date: 2021-07-21 8:51
         */
        private void tryBestRead(SocketChannel channel,ByteBuffer buffer) throws Exception {
            int count = 0;
            while ((count = channel.read(buffer)) > 0){}
        }

        /**
         * @description: 从channel中获取Host
         * @param: * @param: channel
         * @return: org.zymf.nio.example3.client.Host
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:46
         */
        private Host getHostByChannel(SocketChannel channel){
            InetSocketAddress remoteAddress = null;
            Host host = null;
            try {
                remoteAddress = (InetSocketAddress) channel.getRemoteAddress();
                host = new Host(remoteAddress.getHostName(), remoteAddress.getPort());
            }catch (Exception e){
                throw new RuntimeException(e);
            }
            return host;
        }
    }
}
NetWorkClientRequest
package org.zymf.nio.example3.client;

import java.nio.ByteBuffer;

/**
 * @author zhuyuemufeng
 * @version 1.0
 * @description: 客户端发送请求
 * @date 2021-07-20 20:52
 */
public class NetWorkClientRequest {

    private String requestId;

    private Host host;

    private long sendTime;

    private ByteBuffer buffer;

    private boolean sync;

    private NetWorkRespondCallBack netWorkRespondCallBack;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Host getHost() {
        return host;
    }

    public void setHost(Host host) {
        this.host = host;
    }

    public long getSendTime() {
        return sendTime;
    }

    public void setSendTime(long sendTime) {
        this.sendTime = sendTime;
    }

    public ByteBuffer getBuffer() {
        return buffer;
    }

    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    public boolean isSync() {
        return sync;
    }

    public void setSync(boolean sync) {
        this.sync = sync;
    }

    public NetWorkRespondCallBack getNetWorkRespondCallBack() {
        return netWorkRespondCallBack;
    }

    public void setNetWorkRespondCallBack(NetWorkRespondCallBack netWorkRespondCallBack) {
        this.netWorkRespondCallBack = netWorkRespondCallBack;
    }
}
testLoadFileDemo
        Long start = System.currentTimeMillis();
        NetworkManager manager = new NetworkManager();
        List<NetWorkClientRequest> sendRequest = new ArrayList<>();
        for (int i = 1; i < 301; i++) {
            Host host = new Host("localhost", 7899);
            //尝试连接
            manager.tryConnect(host);
            //等待连接完成
            manager.finishConnect(host, true);
            String requestId = UUID.randomUUID().toString().replace("-", "");
            ByteBuffer fileUpload = RequestBufferBuilder.createFileUpload(new File("E:\\oss\\netty\\img\\" + i + ".jpg")
                    , i + ".jpg",
                    requestId);
            NetWorkClientRequest netWorkClientRequest = new NetWorkClientRequest();
            netWorkClientRequest.setBuffer(fileUpload);
            netWorkClientRequest.setHost(host);
            netWorkClientRequest.setRequestId(requestId);
            netWorkClientRequest.setSendTime(System.currentTimeMillis());
            //发送请求
            manager.sendRequest(netWorkClientRequest);
            sendRequest.add(netWorkClientRequest);
        }
        for (NetWorkClientRequest request : sendRequest) {
            //等待响应返回
            NetWorkClientRespond respond = manager.waitResponseSync(request);
            System.out.println(respond.getStatus());
           /*  System.out.println(respond.getRequestId());
            System.out.println(new String(respond.getByteBuffer().array()));*/
        }
        long l = System.currentTimeMillis() - start;
        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>: " + l);
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容