【翻译】Java NIO Selector简介

本文翻译自Introduction to the Java NIO Selector | Baeldung

1. 概述

本文将探讨Java NIO’s Selector组件的介绍。

Selector提供了一种机制,用于监视一个或多个 NIO 通道,识别一个或多个通道何时可用于数据传输。

这样就可以实现单个线程可以管理多个通道,从而管理多个网络连接。

2. 为什么使用Selector

通过Selector,我们可以使用一个而不是多个线程来管理多个通道。线程之间的上下文切换对于操作系统来说是昂贵的,并且每个线程都会占用内存

因此,我们使用的线程越少越好。值得一提的是,现代操作系统和 CPU 在多任务处理方面变得越来越好,因此多线程的开销随着时间的推移不断减少。

在这里,我们将讨论如何使用Selector通过单个线程处理多个通道。

另请注意,Selector不仅可以帮你读数据,还可以监听传入的网络连接并通过慢速通道写数据。

3. 设置

要使用Selector,我们不需要任何特殊设置。我们需要的所有类都在核心java.nio包中,我们只需导入我们需要的类即可。

之后,我们可以使用Selector对象注册多个通道。当任何通道上发生 I/O 活动时,Selector会通知我们。这就是我们如何在单个线程上读取大量数据源的方法。

我们向Selector注册的任何通道都必须是SelectableChannel的子类,它是一种特殊类型的通道,可以置于非阻塞模式。

4. 创建Selector

可以通过调用Selector类的静态方法open来创Selector,该方法将使用系统默认的Selector提供程序来创建新的Selector:

Selector selector = Selector.open();

5. 注册SelectableChannel

为了让Selector监视通道,我们必须向Selector注册这些通道。我们通过调用SelectableChannel的register方法来做到这一点。

但在通道注册到Selector之前,它必须处于非阻塞模式:

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

我们不能将FileChannel与Selector一起使用,因为FileChannel无法像我们处理套接字通道那样切换到非阻塞模式。

register方法的第一个参数是我们之前创建的Selector对象,第二个参数定义了一个兴趣集,用来监听channel中我们感兴趣的事件。

我们可以监听四种不同的事件,每个事件都由SelectionKey类中的常量表示:

  • Connect当客户端尝试连接到服务器时。由SelectionKey.OP_CONNECT表示
  • Accept当服务器接受来自客户端的连接时。由SelectionKey.OP_ACCEPT表示
  • Read当服务器准备好从通道读取时。由SelectionKey.OP_READ表示
  • Write当服务器准备好写入通道时。由SelectionKey.OP_WRITE表示

返回的对象SelectionKey表示SelectableChannel向Selector的注册信息。我们将在下一节中进一步讨论它。

6. SelectionKey对象

正如我们在上一节中看到的,当我们向Selector注册channel 时,我们会得到一个SelectionKey对象。该对象保存表示通道注册的数据。

它包含一些重要的属性,我们必须充分理解这些属性才能在channel上使用selector。我们将在下面的小节中讨论这些属性。

6.1. 兴趣集

兴趣集定义了我们希望Selector此通道上要注意的事件集合。它是一个整数值,我们可以通过以下方式获取这些信息。

首先,我们通过SelectionKeyinterestOps方法返回事件集,通过前面的章节,我们知道了SelectionKey中有事件常量。

然后,我们对这两个值进行与运算,我们会得到一个布尔值,这个值表征了该事件是否正在被监听:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

6.2. 就绪集

就绪集定义了通道已准备好的事件集合。它也是一个整数值,我们可以通过以下方式获取这些信息。

我们通过SelectionKeyreadOps方法返回就绪集,我们像在兴趣集的那样将该值与事件常量相与后,我们会得到一个布尔值,这个值表征了通道是否准备好接收特定值。

另一种更为简单的方法是使用SelectionKey的便捷方法来实现相同的目的:

selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();

6.3.通道

从SelectionKey对象获取到正在被观察的channel非常简单。我们只需调用channel方法:

Channel channel = key.channel();

6.4. Selector

就像获取通道那样,我们也非常容易从SelectionKey 中获取到Selector对象。

Selector selector = key.selector();

6.5. 附加对象

有时我们可能想要为通道提供自定义 ID 或附加我们可能想要跟踪的任何类型的 Java 对象,这时我们可以将一个对象附加到SelectionKey

附加对象是一种方便的方法。以下是从SelectionKey附加和获取对象的方法:

key.attach(Object);

Object object = key.attachment();

或者,我们可以选择在通道注册期间附加一个对象。我们将其作为第三个参数添加到通道的register方法中,如下所示:

SelectionKey key = channel.register(
  selector, SelectionKey.OP_ACCEPT, object);

7. Channel Key Selection

到目前为止,我们已经了解了如何创建Selector、向其注册Channel以及检查表征Channel向Selector注册过的SelectionKey对象的属性。

这只是过程的一半,现在我们必须执行一个连续的过程来选择我们之前看到的就绪集。我们使用Selector的select方法进行选择,如下所示:

int channels = selector.select();

此方法会阻塞,直到至少一个通道准备好执行操作。返回的整数表示通道已准备好进行操作的key的数量。

接下来,我们通常检索选定的key集合进行处理:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

我们获得的集合是SelectionKey对象,每个key代表一个已注册的通道,准备进行操作。

之后,我们通常会迭代这个集合,对于每个key,我们获取通道并执行我们感兴趣的集合中出现的任何操作。

在通道的生命周期内,它可能会被选择多次,因为它的key出现在不同事件的就绪集中。这就是为什么我们必须有一个连续的循环来捕获和处理通道事件。

8. 完整示例

为了巩固我们在前面几节中获得的知识,我们将构建一个完整的客户端-服务器示例。

为了便于测试我们的代码,我们将构建一个回显服务器和一个回显客户端。在这种设置中,客户端连接到服务器并开始向其发送消息。服务器回显每个客户端发送的消息。

当服务器遇到特定消息(例如end )时,会将其解释为通信结束并关闭与客户端的连接。

8.1. 服务端

EchoServer.java:

public class EchoServer {

    private static final String POISON_PILL = "POISON_PILL";

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buffer = ByteBuffer.allocate(256);

        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }

                if (key.isReadable()) {
                    answerWithEcho(buffer, key);
                }
                iter.remove();
            }
        }
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
      throws IOException {
 
        SocketChannel client = (SocketChannel) key.channel();
        int r = client.read(buffer);
        if (r == -1 || new String(buffer.array()).trim().equals(POISON_PILL)) {
            client.close();
            System.out.println("Not accepting client messages anymore");
        }
        else {
            buffer.flip();
            client.write(buffer);
            buffer.clear();
        }
    }

    private static void register(Selector selector, ServerSocketChannel serverSocket)
      throws IOException {
 
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    public static Process start() throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = EchoServer.class.getCanonicalName();

        ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);

        return builder.start();
    }
}

这里, 我们通过调用静态方法open创建一个Selector对象。然后,我们还通过调用ServerSocketChannel的静态方法open来创建一个其通道实例。

这是因为ServerSocketChannel是可选的并且适合面向流的监听套接字

然后我们将其绑定到我们选择的端口。请记住我们之前说过,在将可选通道注册到Selector之前,我们必须首先将其设置为非阻塞模式。接下来我们执行此操作,然后将通道注册到Selector。

我们现阶段不需要该通道的SelectionKey实例,因此我们不会关注到它。

Java NIO 使用面向缓冲区的模型而不是面向流的模型。因此,套接字通信通常是通过写入和读取缓冲区来进行的。

因此,我们创建一个新的ByteBuffer,服务器将对其进行写入和读取。我们将其初始化为 256 字节,根据我们计划来回传输的数据量,我们可以修改成其他大小的数值字节数。

最后,我们执行选择过程。我们选择准备好的通道,检索它们的选择键,迭代这些键,并执行每个通道准备好的操作。

我们在无限循环中执行此操作,因为无论是否有活动,服务器通常都需要保持运行。

ServerSocketChannel可以处理的唯一操作是ACCEPT操作。当我们接受来自客户端的连接时,我们获得一个可以进行读写操作的SocketChannel对象。我们将其设置为非阻塞模式并将其注册为Selector的 READ 操作。

在后续选择中,新通道将变为可读状态。我们检索它并将其内容读入缓冲区。确实,作为回显服务器,我们必须将此内容写回客户端。

当我们想要写入正在读取的缓冲区时,我们必须调用Flip()方法**。

最后,我们通过调用Flip方法将缓冲区设置为写入模式,然后简单地写入它。

定义start ()方法是为了在单元测试期间可以将 echo 服务器作为单独的进程启动。

8.2. 客户端

EchoClient.java:

public class EchoClient {
    private static SocketChannel client;
    private static ByteBuffer buffer;
    private static EchoClient instance;

    public static EchoClient start() {
        if (instance == null)
            instance = new EchoClient();

        return instance;
    }

    public static void stop() throws IOException {
        client.close();
        buffer = null;
    }

    private EchoClient() {
        try {
            client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            buffer = ByteBuffer.allocate(256);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String sendMessage(String msg) {
        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        try {
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            System.out.println("response=" + response);
            buffer.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;

    }
}

客户端比服务器简单。

我们使用单例模式在start静态方法中实例化它,在start方法中调用私有构造函数。

在私有构造函数中,我们在绑定服务器通道的同一端口上打开一个连接,并且仍在同一主机上。

然后我们创建一个可以写入和读取的缓冲区。

最后,我们有一个sendMessage方法,它读取将我们传递给它的任何字符串包装到字节缓冲区中,该缓冲区通过通道传输到服务器。

然后我们从客户端通道读取以获取服务器发送的消息。我们将此作为我们消息的回声返回。

8.3. 测试

在名为EchoTest.java的类中,我们将创建一个测试用例,用于启动服务器、向服务器发送消息,并且仅在从服务器接收回相同消息时才通过。作为最后一步,测试用例在完成之前停止服务器。

我们现在可以运行测试:

public class EchoTest {

    Process server;
    EchoClient client;

    @Before
    public void setup() throws IOException, InterruptedException {
        server = EchoServer.start();
        client = EchoClient.start();
    }

    @Test
    public void givenServerClient_whenServerEchosMessage_thenCorrect() {
        String resp1 = client.sendMessage("hello");
        String resp2 = client.sendMessage("world");
        assertEquals("hello", resp1);
        assertEquals("world", resp2);
    }

    @After
    public void teardown() throws IOException {
        server.destroy();
        EchoClient.stop();
    }
}

9. Selector.wakeup()

正如我们之前看到的,调用selector.select()会阻塞当前线程,直到监视的通道之一变为可操作状态。我们可以通过从另一个线程调用selector.wakeup()来覆盖它。

结果是阻塞线程立即返回,而不是继续等待,无论通道是否已就绪

我们可以使用CountDownLatch并跟踪代码执行步骤来演示这一点:

@Test
public void whenWakeUpCalledOnSelector_thenBlockedThreadReturns() {
    Pipe pipe = Pipe.open();
    Selector selector = Selector.open();
    SelectableChannel channel = pipe.source();
    channel.configureBlocking(false);
    channel.register(selector, OP_READ);

    List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

    CountDownLatch latch = new CountDownLatch(1);

    new Thread(() -> {
        invocationStepsTracker.add(">> Count down");
        latch.countDown();
        try {
            invocationStepsTracker.add(">> Start select");
            selector.select();
            invocationStepsTracker.add(">> End select");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();

    invocationStepsTracker.add(">> Start await");
    latch.await();
    invocationStepsTracker.add(">> End await");

    invocationStepsTracker.add(">> Wakeup thread");
    selector.wakeup();
    //clean up
    channel.close();

    assertThat(invocationStepsTracker)
      .containsExactly(
        ">> Start await",
        ">> Count down",
        ">> Start select",
        ">> End await",
        ">> Wakeup thread",
        ">> End select"
    );
}

在这个例子中,我们使用Java NIO的Pipe类打开一个通道用于测试目的。我们在线程安全列表中跟踪代码执行步骤。通过分析这几个步骤,我们可以看到selector.wakeup()是如何释放被selector.select()阻塞的线程的。

10. 结论

在本文中,我们介绍了 Java NIO Selector 组件的基本用法。

我的GitHub 项目中提供了本文的完整源代码和所有代码片段。(注:推荐阅读里面的其他相关文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容