本文翻译自Introduction to the Java NIO Selector | Baeldung
1. 概述
本文将探讨Java NIO’s Selector组件的介绍。
Selector提供了一种机制,用于监视一个或多个 NIO 通道,识别一个或多个通道何时可用于数据传输。
这样就可以实现单个线程可以管理多个通道,从而管理多个网络连接。
2. 为什么使用Selector
通过Selector,我们可以使用一个而不是多个线程来管理多个通道。线程之间的上下文切换对于操作系统来说是昂贵的,并且每个线程都会占用内存。
因此,我们使用的线程越少越好。值得一提的是,现代操作系统和 CPU 在多任务处理方面变得越来越好,因此多线程的开销随着时间的推移不断减少。
在这里,我们将讨论如何使用Selector通过单个线程处理多个通道。
另请注意,Selector不仅可以帮你读数据,还可以监听传入的网络连接并通过慢速通道写数据。
3. 设置
要使用Selector,我们不需要任何特殊设置。我们需要的所有类都在核心java.nio包中,我们只需导入我们需要的类即可。
之后,我们可以使用Selector对象注册多个通道。当任何通道上发生 I/O 活动时,Selector会通知我们。这就是我们如何在单个线程上读取大量数据源的方法。
我们向Selector注册的任何通道都必须是SelectableChannel的子类,它是一种特殊类型的通道,可以置于非阻塞模式。
4. 创建Selector
可以通过调用Selector类的静态方法open来创Selector,该方法将使用系统默认的Selector提供程序来创建新的Selector:
Selector selector = Selector.open();
5. 注册SelectableChannel
为了让Selector监视通道,我们必须向Selector注册这些通道。我们通过调用SelectableChannel的register方法来做到这一点。
但在通道注册到Selector之前,它必须处于非阻塞模式:
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
我们不能将FileChannel与Selector一起使用,因为FileChannel无法像我们处理套接字通道那样切换到非阻塞模式。
register方法的第一个参数是我们之前创建的Selector对象,第二个参数定义了一个兴趣集,用来监听channel中我们感兴趣的事件。
我们可以监听四种不同的事件,每个事件都由SelectionKey类中的常量表示:
- Connect–当客户端尝试连接到服务器时。由SelectionKey.OP_CONNECT表示
- Accept–当服务器接受来自客户端的连接时。由SelectionKey.OP_ACCEPT表示
- Read–当服务器准备好从通道读取时。由SelectionKey.OP_READ表示
- Write–当服务器准备好写入通道时。由SelectionKey.OP_WRITE表示
返回的对象SelectionKey表示SelectableChannel向Selector的注册信息。我们将在下一节中进一步讨论它。
6. SelectionKey对象
正如我们在上一节中看到的,当我们向Selector注册channel 时,我们会得到一个SelectionKey对象。该对象保存表示通道注册的数据。
它包含一些重要的属性,我们必须充分理解这些属性才能在channel上使用selector。我们将在下面的小节中讨论这些属性。
6.1. 兴趣集
兴趣集定义了我们希望Selector此通道上要注意的事件集合。它是一个整数值,我们可以通过以下方式获取这些信息。
首先,我们通过SelectionKey的interestOps方法返回事件集,通过前面的章节,我们知道了SelectionKey中有事件常量。
然后,我们对这两个值进行与运算,我们会得到一个布尔值,这个值表征了该事件是否正在被监听:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
6.2. 就绪集
就绪集定义了通道已准备好的事件集合。它也是一个整数值,我们可以通过以下方式获取这些信息。
我们通过SelectionKey的readOps方法返回就绪集,我们像在兴趣集的那样将该值与事件常量相与后,我们会得到一个布尔值,这个值表征了通道是否准备好接收特定值。
另一种更为简单的方法是使用SelectionKey的便捷方法来实现相同的目的:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();
6.3.通道
从SelectionKey对象获取到正在被观察的channel非常简单。我们只需调用channel方法:
Channel channel = key.channel();
6.4. Selector
就像获取通道那样,我们也非常容易从SelectionKey 中获取到Selector对象。
Selector selector = key.selector();
6.5. 附加对象
有时我们可能想要为通道提供自定义 ID 或附加我们可能想要跟踪的任何类型的 Java 对象,这时我们可以将一个对象附加到SelectionKey
附加对象是一种方便的方法。以下是从SelectionKey附加和获取对象的方法:
key.attach(Object);
Object object = key.attachment();
或者,我们可以选择在通道注册期间附加一个对象。我们将其作为第三个参数添加到通道的register方法中,如下所示:
SelectionKey key = channel.register(
selector, SelectionKey.OP_ACCEPT, object);
7. Channel Key Selection
到目前为止,我们已经了解了如何创建Selector、向其注册Channel以及检查表征Channel向Selector注册过的SelectionKey对象的属性。
这只是过程的一半,现在我们必须执行一个连续的过程来选择我们之前看到的就绪集。我们使用Selector的select方法进行选择,如下所示:
int channels = selector.select();
此方法会阻塞,直到至少一个通道准备好执行操作。返回的整数表示通道已准备好进行操作的key的数量。
接下来,我们通常检索选定的key集合进行处理:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
我们获得的集合是SelectionKey对象,每个key代表一个已注册的通道,准备进行操作。
之后,我们通常会迭代这个集合,对于每个key,我们获取通道并执行我们感兴趣的集合中出现的任何操作。
在通道的生命周期内,它可能会被选择多次,因为它的key出现在不同事件的就绪集中。这就是为什么我们必须有一个连续的循环来捕获和处理通道事件。
8. 完整示例
为了巩固我们在前面几节中获得的知识,我们将构建一个完整的客户端-服务器示例。
为了便于测试我们的代码,我们将构建一个回显服务器和一个回显客户端。在这种设置中,客户端连接到服务器并开始向其发送消息。服务器回显每个客户端发送的消息。
当服务器遇到特定消息(例如end )时,会将其解释为通信结束并关闭与客户端的连接。
8.1. 服务端
EchoServer.java:
public class EchoServer {
private static final String POISON_PILL = "POISON_PILL";
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 5454));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}
private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel client = (SocketChannel) key.channel();
int r = client.read(buffer);
if (r == -1 || new String(buffer.array()).trim().equals(POISON_PILL)) {
client.close();
System.out.println("Not accepting client messages anymore");
}
else {
buffer.flip();
client.write(buffer);
buffer.clear();
}
}
private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
public static Process start() throws IOException, InterruptedException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = EchoServer.class.getCanonicalName();
ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);
return builder.start();
}
}
这里, 我们通过调用静态方法open创建一个Selector对象。然后,我们还通过调用ServerSocketChannel的静态方法open来创建一个其通道实例。
这是因为ServerSocketChannel是可选的并且适合面向流的监听套接字。
然后我们将其绑定到我们选择的端口。请记住我们之前说过,在将可选通道注册到Selector之前,我们必须首先将其设置为非阻塞模式。接下来我们执行此操作,然后将通道注册到Selector。
我们现阶段不需要该通道的SelectionKey实例,因此我们不会关注到它。
Java NIO 使用面向缓冲区的模型而不是面向流的模型。因此,套接字通信通常是通过写入和读取缓冲区来进行的。
因此,我们创建一个新的ByteBuffer,服务器将对其进行写入和读取。我们将其初始化为 256 字节,根据我们计划来回传输的数据量,我们可以修改成其他大小的数值字节数。
最后,我们执行选择过程。我们选择准备好的通道,检索它们的选择键,迭代这些键,并执行每个通道准备好的操作。
我们在无限循环中执行此操作,因为无论是否有活动,服务器通常都需要保持运行。
ServerSocketChannel可以处理的唯一操作是ACCEPT操作。当我们接受来自客户端的连接时,我们获得一个可以进行读写操作的SocketChannel对象。我们将其设置为非阻塞模式并将其注册为Selector的 READ 操作。
在后续选择中,新通道将变为可读状态。我们检索它并将其内容读入缓冲区。确实,作为回显服务器,我们必须将此内容写回客户端。
当我们想要写入正在读取的缓冲区时,我们必须调用Flip()方法**。
最后,我们通过调用Flip方法将缓冲区设置为写入模式,然后简单地写入它。
定义start ()方法是为了在单元测试期间可以将 echo 服务器作为单独的进程启动。
8.2. 客户端
EchoClient.java:
public class EchoClient {
private static SocketChannel client;
private static ByteBuffer buffer;
private static EchoClient instance;
public static EchoClient start() {
if (instance == null)
instance = new EchoClient();
return instance;
}
public static void stop() throws IOException {
client.close();
buffer = null;
}
private EchoClient() {
try {
client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
buffer = ByteBuffer.allocate(256);
} catch (IOException e) {
e.printStackTrace();
}
}
public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
System.out.println("response=" + response);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}
}
客户端比服务器简单。
我们使用单例模式在start静态方法中实例化它,在start方法中调用私有构造函数。
在私有构造函数中,我们在绑定服务器通道的同一端口上打开一个连接,并且仍在同一主机上。
然后我们创建一个可以写入和读取的缓冲区。
最后,我们有一个sendMessage方法,它读取将我们传递给它的任何字符串包装到字节缓冲区中,该缓冲区通过通道传输到服务器。
然后我们从客户端通道读取以获取服务器发送的消息。我们将此作为我们消息的回声返回。
8.3. 测试
在名为EchoTest.java的类中,我们将创建一个测试用例,用于启动服务器、向服务器发送消息,并且仅在从服务器接收回相同消息时才通过。作为最后一步,测试用例在完成之前停止服务器。
我们现在可以运行测试:
public class EchoTest {
Process server;
EchoClient client;
@Before
public void setup() throws IOException, InterruptedException {
server = EchoServer.start();
client = EchoClient.start();
}
@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
String resp1 = client.sendMessage("hello");
String resp2 = client.sendMessage("world");
assertEquals("hello", resp1);
assertEquals("world", resp2);
}
@After
public void teardown() throws IOException {
server.destroy();
EchoClient.stop();
}
}
9. Selector.wakeup()
正如我们之前看到的,调用selector.select()会阻塞当前线程,直到监视的通道之一变为可操作状态。我们可以通过从另一个线程调用selector.wakeup()来覆盖它。
结果是阻塞线程立即返回,而不是继续等待,无论通道是否已就绪。
我们可以使用CountDownLatch并跟踪代码执行步骤来演示这一点:
@Test
public void whenWakeUpCalledOnSelector_thenBlockedThreadReturns() {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel channel = pipe.source();
channel.configureBlocking(false);
channel.register(selector, OP_READ);
List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
invocationStepsTracker.add(">> Count down");
latch.countDown();
try {
invocationStepsTracker.add(">> Start select");
selector.select();
invocationStepsTracker.add(">> End select");
} catch (IOException e) {
e.printStackTrace();
}
}).start();
invocationStepsTracker.add(">> Start await");
latch.await();
invocationStepsTracker.add(">> End await");
invocationStepsTracker.add(">> Wakeup thread");
selector.wakeup();
//clean up
channel.close();
assertThat(invocationStepsTracker)
.containsExactly(
">> Start await",
">> Count down",
">> Start select",
">> End await",
">> Wakeup thread",
">> End select"
);
}
在这个例子中,我们使用Java NIO的Pipe类打开一个通道用于测试目的。我们在线程安全列表中跟踪代码执行步骤。通过分析这几个步骤,我们可以看到selector.wakeup()是如何释放被selector.select()阻塞的线程的。
10. 结论
在本文中,我们介绍了 Java NIO Selector 组件的基本用法。
我的GitHub 项目中提供了本文的完整源代码和所有代码片段。(注:推荐阅读里面的其他相关文章
)