1. Environment
// Netty implementation; download: https://github.com/netty/netty-incubator-transport-io_uring
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>0.0.26.Final-SNAPSHOT</version>
<classifier>linux-x86_64</classifier>
</dependency>
// Kernel version
linux-6.2.1, download: https://mirrors.edge.kernel.org/pub/linux/kernel/v6.x/linux-6.2.1.tar.gz
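Before wiring the transport in, it is worth probing at runtime whether io_uring is actually usable on the target kernel. A minimal sketch; it assumes the incubator exposes IOUring.isAvailable()/unavailabilityCause() alongside the IOUring.ensureAvailability() call that appears later in this article:
import io.netty.incubator.channel.uring.IOUring;

public final class IOUringCheck {
    public static void main(String[] args) {
        // isAvailable() is assumed to return false rather than throw when support is missing
        if (IOUring.isAvailable()) {
            System.out.println("io_uring transport is usable");
        } else {
            // typical causes: kernel too old, or the native library failed to load
            System.out.println("io_uring not available: " + IOUring.unavailabilityCause());
        }
    }
}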
2. Java usage example
// Start from the official example
public class EchoIOUringServer {
private static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
public static void main(String []args) {
//Usage mirrors the Epoll transport; the new piece here is IOUringEventLoopGroup
EventLoopGroup bossGroup = new IOUringEventLoopGroup(1);
EventLoopGroup workerGroup = new IOUringEventLoopGroup(1);
final EchoIOUringServerHandler serverHandler = new EchoIOUringServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//The channel is likewise the newly implemented IOUringServerSocketChannel
.channel(IOUringServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
...
}
The code above gives us two key entry points: IOUringEventLoopGroup and IOUringServerSocketChannel.
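For completeness, the client side plugs in the same way; a hedged sketch, assuming IOUringSocketChannel (the class the server later instantiates for accepted connections) can be used directly with a plain Bootstrap:
EventLoopGroup group = new IOUringEventLoopGroup(1);
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(IOUringSocketChannel.class)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) {
             ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
         }
     });
    // Connect to the echo server from the example above
    ChannelFuture f = b.connect("127.0.0.1", PORT).sync();
    f.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully();
}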
3. Source code walkthrough
public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
...
@Override
//When the event loop group is created, each event loop is built through newChild; if this is unfamiliar, review the Netty basics first
protected EventLoop newChild(Executor executor, Object... args) {
if (args.length != 4) {
throw new IllegalArgumentException("Illegal amount of extra arguments");
}
//Read ringSize, which bounds the number of entries in the io_uring submission queue; the kernel rounds it up to a power of two via roundup_pow_of_two (covered later)
int ringSize = ObjectUtil.checkPositiveOrZero((Integer) args[0], "ringSize");
if (ringSize == 0) {
ringSize = Native.DEFAULT_RING_SIZE;
}
int iosqeAsyncThreshold = ObjectUtil.checkPositiveOrZero((Integer) args[1], "iosqeAsyncThreshold");
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
return new IOUringEventLoop(this, executor, ringSize, iosqeAsyncThreshold,
rejectedExecutionHandler, taskQueueFactory);
}
}
//The last line in EchoIOUringServer is b.bind(PORT).sync(); it enters this method
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
//which eventually lands here
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
//doBind0 calls bind on the Channel held by regFuture (this matters later)
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
....
}
final ChannelFuture initAndRegister() {
Channel channel = null;
...
//Build the channel; thanks to the .channel(IOUringServerSocketChannel.class) wiring this is effectively new IOUringServerSocketChannel()
channel = channelFactory.newChannel();
//register matters: it triggers registration of the accept-side interest
ChannelFuture regFuture = config().group().register(channel);
}
}
//Which brings us to the core class
public final class IOUringServerSocketChannel extends AbstractIOUringServerChannel implements ServerSocketChannel {
private final IOUringServerSocketChannelConfig config;
public IOUringServerSocketChannel() {
//First create a socket object; at its core it is just an fd
//backed by newSocketStreamFd(boolean) in Netty's Socket class
super(LinuxSocket.newSocketStream(), false);
}
//When a new connection arrives, this method creates the child channel
@Override
Channel newChildChannel(int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) {
...
return new IOUringSocketChannel(this, new LinuxSocket(fd), address);
}
//doBind0 from earlier ends up calling this method
@Override
public void doBind(SocketAddress localAddress) throws Exception {
super.doBind(localAddress);
if (IOUring.isTcpFastOpenServerSideAvailable()) {
int fastOpen = config().getTcpFastopen();
if (fastOpen > 0) {
socket.setTcpFastOpen(fastOpen);
}
}
socket.listen(config.getBacklog());
active = true;
}
}
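The backlog handed to socket.listen(config.getBacklog()) is ordinary Netty channel configuration, so it can be tuned from the bootstrap; a small sketch (1024 is just an example value):
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(IOUringServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 1024) // read back as config.getBacklog() in doBind above
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) {
         ch.pipeline().addLast(new EchoIOUringServerHandler());
     }
 });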
//The superclass of IOUringServerSocketChannel
abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel {
private final ByteBuffer acceptedAddressMemory;
private final ByteBuffer acceptedAddressLengthMemory;
private final long acceptedAddressMemoryAddress;
private final long acceptedAddressLengthMemoryAddress;
protected AbstractIOUringServerChannel(LinuxSocket socket, boolean active) {
super(null, socket, active);
//Allocate a buffer the size of struct sockaddr_storage to hold the peer address of an accepted connection
acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
//Grab the native memory address of that buffer so JNI can use it directly
acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
//sockaddr_storage is the most general form and fits every address family (IPv4, IPv6, ...), but each family has a different length
//so a second buffer is allocated to carry that length; a long (8 bytes) is used here
acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
// Needs to be initialized to the size of acceptedAddressMemory.
// See https://man7.org/linux/man-pages/man2/accept.2.html
//The kernel reads at most SIZEOF_SOCKADDR_STORAGE bytes of address data when accepting, so that is the initial value
acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
//Record the native address of the length buffer
acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
}
}
The server fd is now in place. When config().group().register(channel) runs, execution enters io.netty.incubator.channel.uring.AbstractIOUringStreamChannel#doRegister, which internally calls schedulePollRdHup.
//Defined in AbstractIOUringChannel
final void schedulePollRdHup() {
assert (ioState & POLL_RDHUP_SCHEDULED) == 0;
//The current object is the IOUringServerSocketChannel, so fd() here is the server fd created above
IOUringSubmissionQueue submissionQueue = submissionQueue();
//Interestingly, the event added here is not accept but addPollRdHup
submissionQueue.addPollRdHup(fd().intValue());
ioState |= POLL_RDHUP_SCHEDULED;
}
boolean addPollRdHup(int fd) {
return addPoll(fd, Native.POLLRDHUP);
}
//The SQE ultimately added is IORING_OP_POLL_ADD, which watches the server fd for events, including incoming connections
private boolean addPoll(int fd, int pollMask) {
return enqueueSqe(Native.IORING_OP_POLL_ADD, 0, pollMask, fd, 0, 0, 0, (short) pollMask);
}
The server's poll interest is now set up. Next we return to how IOUringEventLoop is constructed, to show where IOUringSubmissionQueue comes from.
//Every thread in the group is an IOUringEventLoop, so every thread owns its own RingBuffer
public final class IOUringEventLoop extends SingleThreadEventLoop {
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<>(4096);
private final RingBuffer ringBuffer;
private final FileDescriptor eventfd;
// The maximum number of bytes for an InetAddress / Inet6Address
private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
private final IOUringCompletionQueueCallback callback = IOUringEventLoop.this::handle;
private final Runnable submitIOTask = () -> getRingBuffer().ioUringSubmissionQueue().submit();
IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, int iosqeAsyncThreshold,
RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
// Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
//Check that io_uring is usable; a static block tries to build a ring buffer, so the probe runs only once, when IOUring is first loaded
IOUring.ensureAvailability();
//Create this thread's ring buffer
ringBuffer = Native.createRingBuffer(ringSize, iosqeAsyncThreshold);
//When the loop has no work it goes to sleep; this eventfd is the fallback other threads use to wake it up
eventfd = Native.newBlockingEventFd();
logger.trace("New EventLoop: {}", this.toString());
}
//The core method that creates the io_uring
static RingBuffer createRingBuffer(int ringSize, int iosqeAsyncThreshold) {
//Native call that creates the io_uring; index 0 of the returned array describes the submission queue, index 1 the completion queue
long[][] values = ioUringSetup(ringSize);
assert values.length == 2;
long[] submissionQueueArgs = values[0];
assert submissionQueueArgs.length == 11;
//Build the submission and completion queue objects from those values; the real work is not here but in ioUringSetup
IOUringSubmissionQueue submissionQueue = new IOUringSubmissionQueue(...);
long[] completionQueueArgs = values[1];
assert completionQueueArgs.length == 9;
IOUringCompletionQueue completionQueue = new IOUringCompletionQueue(...);
return new RingBuffer(submissionQueue, completionQueue);
}
Following the native call above, we drop into the C code.
{"ioUringSetup", "(I)[[J", (void *) netty_io_uring_setup};
static jobjectArray netty_io_uring_setup(JNIEnv *env, jclass clazz, jint entries) {
struct io_uring_params p;
memset(&p, 0, sizeof(p));
//First create an outer array of length 2, matching the first dimension of the long[][] above
jobjectArray array = (*env)->NewObjectArray(env, 2, longArrayClass, NULL);
//Then the array at index 0, of length 11: the submission queue info
jlongArray submissionArray = (*env)->NewLongArray(env, 11);
//And the completion queue info
jlongArray completionArray = (*env)->NewLongArray(env, 9);
//Create the io_uring with the requested number of entries
//the resulting parameters are written back into p; p can also carry setup flags into the kernel
int ring_fd = sys_io_uring_setup((int)entries, &p);
struct io_uring io_uring_ring;
//This step is crucial: it is why events submitted from Java are visible to the kernel
int ret = setup_io_uring(ring_fd, &io_uring_ring, &p);
//Fill the arrays from the values returned by setup; they map one-to-one onto the corresponding Java class fields
jlong submissionArrayElements[] = {
(jlong)io_uring_ring.sq.khead,
(jlong)io_uring_ring.sq.ktail,
(jlong)io_uring_ring.sq.kring_mask,
(jlong)io_uring_ring.sq.kring_entries,
(jlong)io_uring_ring.sq.kflags,
(jlong)io_uring_ring.sq.kdropped,
(jlong)io_uring_ring.sq.array,
(jlong)io_uring_ring.sq.sqes,
(jlong)io_uring_ring.sq.ring_sz,
(jlong)io_uring_ring.sq.ring_ptr,
(jlong)ring_fd
};
(*env)->SetLongArrayRegion(env, submissionArray, 0, 11, submissionArrayElements);
jlong completionArrayElements[] = {
(jlong)io_uring_ring.cq.khead,
(jlong)io_uring_ring.cq.ktail,
(jlong)io_uring_ring.cq.kring_mask,
(jlong)io_uring_ring.cq.kring_entries,
(jlong)io_uring_ring.cq.koverflow,
(jlong)io_uring_ring.cq.cqes,
(jlong)io_uring_ring.cq.ring_sz,
(jlong)io_uring_ring.cq.ring_ptr,
(jlong)ring_fd
};
(*env)->SetLongArrayRegion(env, completionArray, 0, 9, completionArrayElements);
(*env)->SetObjectArrayElement(env, array, 0, submissionArray);
(*env)->SetObjectArrayElement(env, array, 1, completionArray);
return array;
}
First, into the kernel to look at the implementation behind sys_io_uring_setup.
long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
struct io_uring_params p;
int i;
//For safety, copy the user-space params into the kernel first
if (copy_from_user(&p, params, sizeof(p)))
return -EFAULT;
for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
if (p.resv[i])
return -EINVAL;
}
//Validate the flags; Netty passes 0, so nothing applies here
if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN))
return -EINVAL;
//Create the io_uring
return io_uring_create(entries, &p, params);
}
static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
struct io_uring_params __user *params)
{
struct io_ring_ctx *ctx;
struct file *file;
int ret;
//The submission queue entry count is derived from the requested ringSize; as mentioned earlier it is rounded up to a power of two, and this is where that happens
p->sq_entries = roundup_pow_of_two(entries);
if (p->flags & IORING_SETUP_CQSIZE) {
...//Computes the completion queue size; skipped here because the flags passed in are 0
} else {
//Defaults to twice the submission queue size; Netty hides this knob, which makes things a bit less flexible
p->cq_entries = 2 * p->sq_entries;
}
//Allocates the ctx object and initializes its internal queues; skipped, it does not affect the main flow
ctx = io_ring_ctx_alloc(p);
//Adds up the SQ and CQ sizes and allocates the ring arrays according to each queue's entry layout
ret = io_allocate_scq_urings(ctx, p);
//io_uring has several modes; one of them spawns a kernel thread that drains the submission queue on your behalf
//that requires IORING_SETUP_SQPOLL in flags (ctx->flags = p->flags), which Netty does not set, so the application submits by itself
//this is one reason the ring can be used without locking on the Netty side: everything runs on the owning event loop thread
ret = io_sq_offload_create(ctx, p);
if (ret)
goto err;
//Fill p with the field offsets; note these are offsets, not concrete addresses
memset(&p->sq_off, 0, sizeof(p->sq_off));
p->sq_off.head = offsetof(struct io_rings, sq.head);
p->sq_off.tail = offsetof(struct io_rings, sq.tail);
p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
p->sq_off.flags = offsetof(struct io_rings, sq_flags);
p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
memset(&p->cq_off, 0, sizeof(p->cq_off));
p->cq_off.head = offsetof(struct io_rings, cq.head);
p->cq_off.tail = offsetof(struct io_rings, cq.tail);
p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
p->cq_off.cqes = offsetof(struct io_rings, cqes);
p->cq_off.flags = offsetof(struct io_rings, cq_flags);
p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
IORING_FEAT_LINKED_FILE;
//Copy p back into the user-supplied params
if (copy_to_user(params, p, sizeof(*p))) {
ret = -EFAULT;
goto err;
}
//Create the backing file object
file = io_uring_get_file(ctx);
//Install it into the process fd table so the io_uring can be driven through a plain fd; everything is a file
ret = io_uring_install_fd(ctx, file);
trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
return ret;
}
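A quick worked example of the sizing above, as a Java sketch of roundup_pow_of_two (the real computation lives in the kernel; the helper name here is only illustrative):
static int roundUpPowOfTwo(int entries) {
    // smallest power of two >= entries, e.g. 100 -> 128, 4096 -> 4096
    return entries <= 1 ? 1 : Integer.highestOneBit(entries - 1) << 1;
}
// With a requested ringSize of 100: sq_entries = 128, and since Netty passes flags = 0
// (no IORING_SETUP_CQSIZE), cq_entries = 2 * 128 = 256.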
The io_uring now exists. Next, the memory mapping between the JVM and the kernel.
//Back to Netty's native code
//the configuration step that runs right after setup
static int setup_io_uring(int ring_fd, struct io_uring *io_uring_ring,
struct io_uring_params *p) {
return io_uring_mmap(ring_fd, p, &io_uring_ring->sq, &io_uring_ring->cq);
}
static int io_uring_mmap(int fd, struct io_uring_params *p, struct io_uring_sq *sq, struct io_uring_cq *cq) {
size_t size;
int ret;
//Compute both ring sizes from the returned p
sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
//mmap the rings into this process; the key ingredients are the fd and the offsets mentioned above
sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
if (sq->ring_ptr == MAP_FAILED) {
return -errno;
}
//Concrete pointers are then the mmap'd base address plus the offsets returned at setup time (the fd installed by io_uring_install_fd is what makes this mmap work; expanded below)
sq->khead = sq->ring_ptr + p->sq_off.head;
sq->ktail = sq->ring_ptr + p->sq_off.tail;
sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
sq->kflags = sq->ring_ptr + p->sq_off.flags;
sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
sq->array = sq->ring_ptr + p->sq_off.array;
size = p->sq_entries * sizeof(struct io_uring_sqe);
//Same for the completion queue; this is why the JNI code above reads sq/cq rather than the kernel-returned p (p only holds field offsets, the actual addresses are computed here)
sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
if (sq->sqes == MAP_FAILED) {
ret = -errno;
goto err;
}
cq->khead = cq->ring_ptr + p->cq_off.head;
cq->ktail = cq->ring_ptr + p->cq_off.tail;
cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
cq->cqes = cq->ring_ptr + p->cq_off.cqes;
//Netty's side ends here; on to the kernel
return 0;
}
//In Linux everything is a file: the io_uring fd above is backed by a file whose file_operations are shown below. When mmap is given an fd, it dispatches through that fd's file_operations
//namely:
static const struct file_operations io_uring_fops = {
.release = io_uring_release,
.mmap = io_uring_mmap,
.get_unmapped_area = io_uring_nommu_get_unmapped_area,
.mmap_capabilities = io_uring_nommu_mmap_capabilities,
.poll = io_uring_poll,
.show_fdinfo = io_uring_show_fdinfo,
};
//mmap first calls get_unmapped_area and then the mmap callback
static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
unsigned long addr, unsigned long len,
unsigned long pgoff, unsigned long flags)
{
void *ptr;
ptr = io_uring_validate_mmap_request(file, pgoff, len);
return (unsigned long) ptr;
}
static void *io_uring_validate_mmap_request(struct file *file,
loff_t pgoff, size_t sz)
{
//The last mmap argument above, IORING_OFF_SQ_RING or IORING_OFF_SQES, arrives here as pgoff
struct io_ring_ctx *ctx = file->private_data;
loff_t offset = pgoff << PAGE_SHIFT;
struct page *page;
void *ptr;
//Match on the offset to pick the corresponding region
switch (offset) {
case IORING_OFF_SQ_RING:
case IORING_OFF_CQ_RING:
ptr = ctx->rings; //此处直接返回的rings的首地址,而不是传统的分配内存
break;
case IORING_OFF_SQES:
ptr = ctx->sq_sqes;
break;
default:
return ERR_PTR(-EINVAL);
}
//and simply return that address
return ptr;
}
static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
{
//Require the mapping to be shared; the pages themselves were resolved above
return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
}
That explains why the ring queues can be manipulated from Java, and io_uring creation is complete. Next, how Netty drives it.
//As mentioned earlier, addPoll eventually calls this method
boolean enqueueSqe(byte op, int flags, int rwFlags, int fd,
long bufferAddress, int length, long offset, short data) {
int pending = tail - head;
boolean submit = pending == ringEntries;
//Check whether the submission queue is full; ringEntries is its capacity, and since the ring is reused, it is full when the tail catches up with the head
if (submit) {
//If full, submit right away to drain the ring
int submitted = submit();
if (submitted == 0) {
// We have a problem, could not submit to make more room in the ring
throw new RuntimeException("SQ ring full and no submissions accepted");
}
}
//Otherwise compute the next SQE address from the array base (captured when IOUringSubmissionQueue was created) plus the masked tail index
long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
//and write each field at that SQE address
setData(sqe, op, flags, rwFlags, fd, bufferAddress, length, offset, data);
return submit;
}
private void setData(long sqe, byte op, int flags, int rwFlags, int fd, long bufferAddress, int length,
long offset, short data) {
//set sqe(submission queue) properties
//every field is written straight into native memory at the SQE base address
PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, op);
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) flags);
....
}
Interestingly, writing the SQE does not publish the new tail to the kernel, so the kernel does not yet see the latest tail. As noted earlier, submission is also done by Netty itself, so the kernel has no need to know in between (there is no kernel thread reading the queue). The event is now queued; so when does it get executed? (So far only the poll interest on the server fd has been registered.)
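To make the index math concrete, here is a tiny standalone model of enqueueSqe's bookkeeping (a sketch; the names follow the Netty code above, and the constant 64 is the size of a plain struct io_uring_sqe):
static void ringIndexExample() {
    int ringEntries = 8, ringMask = ringEntries - 1; // the kernel rounds to a power of two, so mask = size - 1
    int head = 6, tail = 9;                          // both only ever grow; they are never wrapped themselves
    int pending = tail - head;                       // 3 entries written but not yet submitted
    int slot = tail & ringMask;                      // 9 & 7 = 1: the next SQE lands at array index 1 (wrap-around)
    long offset = (long) slot * 64;                  // byte offset into the mmap'd SQE array
    System.out.println(pending + " pending, next SQE at byte offset " + offset);
    // The kernel sees none of this until submit(): putIntOrdered(kTailAddress, tail) publishes the tail
    // right before io_uring_enter, and getIntVolatile(kHeadAddress) afterwards picks up how far it consumed.
}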
//IOUringEventLoop's run method, executed by the event loop thread once Netty starts
protected void run() {
//First grab the submission and completion queues
final IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
final IOUringSubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
//Queue a read on this loop's eventfd so the loop can always be woken up
// Let's add the eventfd related events before starting to do any real work.
addEventFdRead(submissionQueue);
//and submit it
final int initialFlushResult = submissionQueue.submit();
if (initialFlushResult != 1) {
throw new AssertionError("Failed to submit EventFdRead. Result: " + initialFlushResult);
}
for (;;) {
try {
try {
if (!hasTasks()) {
//If the completion queue is empty, submit and wait; the earlier submit only enqueues and returns whether or not anything has completed
if (!completionQueue.hasCompletions()) {
submissionQueue.submitAndWait();
}
}
} finally {
if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
pendingWakeup = true;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
....
// Avoid blocking for as long as possible - loop until available work exhausted
boolean maybeMoreWork = true;
do {
try {
//Being woken usually means there is work to do, though not necessarily, since the wait above is conditional (e.g. hasTasks)
maybeMoreWork = completionQueue.process(callback) != 0 | runAllTasks();
} catch (Throwable t) {
handleLoopException(t);
}
....
} while (maybeMoreWork);
}
}
The two key pieces of this method are 1. submit and 2. process.
int submit() {
int submit = tail - head;
//both extra arguments to submit are 0 here
return submit > 0 ? submit(submit, 0, 0) : 0;
}
int submitAndWait() {
int submit = tail - head;
if (submit > 0) {
//If entries are pending, submit them; minComplete = 1 means wait for at least one completion
return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
}
assert submit == 0;
//Otherwise nothing is pending: submit 0 entries but still wait for at least one completion
int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
if (ret < 0) {
throw new RuntimeException("ioUringEnter syscall returned " + ret);
}
return ret; // should be 0
}
private int submit(int toSubmit, int minComplete, int flags) {
//This is where the tail is published to the kernel
PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier
int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier
if (ret != toSubmit) {
if (ret < 0) {
throw new RuntimeException("ioUringEnter syscall returned " + ret);
}
logger.warn("Not all submissions succeeded");
}
return ret;
}
The core call is ioUringEnter.
{"ioUringEnter", "(IIII)I", (void *) netty_io_uring_enter}
static jint netty_io_uring_enter(JNIEnv *env, jclass class1, jint ring_fd, jint to_submit,
jint min_complete, jint flags) {
int result;
int err;
do {
result = sys_io_uring_enter(ring_fd, to_submit, min_complete, flags, NULL);
if (result >= 0) {
return result;
}
} while ((err = errno) == EINTR); //EINTR from the kernel is swallowed and retried here
return -err;
}
//Into the kernel
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
u32, min_complete, u32, flags, const void __user *, argp,
size_t, argsz)
{
struct io_ring_ctx *ctx;
struct fd f;
long ret;
f = fdget(fd);
ctx = f.file->private_data;
ret = 0;
//If the kernel SQ thread is enabled, it handles the submission queue
if (ctx->flags & IORING_SETUP_SQPOLL) {
io_cqring_overflow_flush(ctx);
//if asked to wake it, just wake the SQ thread
if (flags & IORING_ENTER_SQ_WAKEUP)
wake_up(&ctx->sq_data->wait);
if (flags & IORING_ENTER_SQ_WAIT) {
//if IORING_ENTER_SQ_WAIT is set, wait for space in the SQ
ret = io_sqpoll_wait_sq(ctx);
if (ret)
goto out;
}
//ret is still 0 here, so report the number the caller asked to submit
ret = to_submit;
} else if (to_submit) { //the calling thread itself submits the requested number of entries
ret = io_uring_add_tctx_node(ctx);
if (unlikely(ret))
goto out;
//the kernel takes a lock around submission-queue processing
mutex_lock(&ctx->uring_lock);
//submit the requested number of SQEs
ret = io_submit_sqes(ctx, to_submit);
//a mismatch means something went wrong
if (ret != to_submit) {
mutex_unlock(&ctx->uring_lock);
goto out;
}
if (flags & IORING_ENTER_GETEVENTS) {
if (ctx->syscall_iopoll)
goto iopoll_locked;
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
(void)io_run_local_work_locked(ctx);
}
mutex_unlock(&ctx->uring_lock);
}
//If the call only wants to reap completions, this branch handles it; Netty uses that flag too
if (flags & IORING_ENTER_GETEVENTS) {
int ret2;
//assume this is not the case here
if (ctx->syscall_iopoll) {
} else {
const sigset_t __user *sig;
struct __kernel_timespec __user *ts;
ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
if (likely(!ret2)) {
min_complete = min(min_complete,
ctx->cq_entries);
//wait on the completion queue, yielding the CPU and parking the current thread
ret2 = io_cqring_wait(ctx, min_complete, sig,
argsz, ts);
}
}
out:
fdput(f);
return ret;
}
Only io_submit_sqes and io_cqring_wait matter here, since those are the paths Netty exercises.
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
__must_hold(&ctx->uring_lock)
{
unsigned int entries = io_sqring_entries(ctx);
unsigned int left;
int ret;
if (unlikely(!entries))
return 0;
//Loop count = min of the requested to_submit, the SQ capacity, and the entries currently available
ret = left = min3(nr, ctx->sq_entries, entries);
do {
const struct io_uring_sqe *sqe;
struct io_kiocb *req;
if (unlikely(!io_alloc_req_refill(ctx)))
break;
//allocate a request from the ctx
req = io_alloc_req(ctx);
//fetch the SQE at the ring head
sqe = io_get_sqe(ctx);
if (unlikely(!sqe)) {
io_req_add_to_cache(req, ctx);
break;
}
//bail out of the loop on failure
if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
!(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
left--;
break;
}
} while (--left); //left is decremented once per iteration
//io_get_sqe advances ctx->cached_sq_head++ rather than the shared head
//so once the batch is done the shared head must be published, which is what smp_store_release(&rings->sq.head, ctx->cached_sq_head); does
io_commit_sqring(ctx);
return ret;
}
static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
const struct io_uring_sqe *sqe)
__must_hold(&ctx->uring_lock)
{
struct io_submit_link *link = &ctx->submit_state.link;
int ret;
//Initialize the req: it was only allocated above, here the SQE's fields are copied into it
ret = io_init_req(ctx, req, sqe);
//If the submit state currently has a linked chain, the request is appended to it; linked submission is not used in this flow
//so we fall through
if (unlikely(link->head)) {
}
//and ultimately this runs
io_queue_sqe(req);
return 0;
}
static inline void io_queue_sqe(struct io_kiocb *req)
__must_hold(&req->ctx->uring_lock)
{
int ret;
//Issue the request
ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
}
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
{
//Remember the op Netty passed when enqueuing, e.g. IORING_OP_POLL_ADD? That is req->opcode here
const struct io_op_def *def = &io_op_defs[req->opcode];
//call that op's issue function
ret = def->issue(req, issue_flags);
if (ret == IOU_OK) {
//if the request completed, post the completion
io_req_complete_post(req, issue_flags);
} else if (ret != IOU_ISSUE_SKIP_COMPLETE)
return ret;
}
//This table defines every opcode; two relevant entries:
const struct io_op_def io_op_defs[] = {
[IORING_OP_POLL_ADD] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.audit_skip = 1,
.name = "POLL_ADD",
.prep = io_poll_add_prep,
.issue = io_poll_add,
},
[IORING_OP_ACCEPT] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
.poll_exclusive = 1,
.ioprio = 1, /* used for flags */
.name = "ACCEPT",
#if defined(CONFIG_NET)
.prep = io_accept_prep,
.issue = io_accept,
#else
.prep = io_eopnotsupp_prep,
#endif
},
}
int io_poll_add(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll);
struct io_poll_table ipt;
int ret;
ipt.pt._qproc = io_poll_queue_proc;
if (req->ctx->flags & (IORING_SETUP_SQPOLL|IORING_SETUP_SINGLE_ISSUER))
req->flags |= REQ_F_HASH_LOCKED;
ret = __io_arm_poll_handler(req, poll, &ipt, poll->events, issue_flags);
if (ret > 0) {
io_req_set_res(req, ipt.result_mask, 0);
return IOU_OK;
}
return ret ?: IOU_ISSUE_SKIP_COMPLETE;
}
static int __io_arm_poll_handler(struct io_kiocb *req,
struct io_poll *poll,
struct io_poll_table *ipt, __poll_t mask,
unsigned issue_flags)
{
...
//Ultimately calls vfs_poll, which in turn calls req->file's poll method and so reaches the socket's poll implementation; not explored further here
mask = vfs_poll(req->file, &ipt->pt) & poll->events;
}
Like io_poll_add, io_accept ends up in the networking accept path, operating on the fd carried by the req, i.e. the server fd. Back to what happens when a request completes.
void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
...
struct io_ring_ctx *ctx = req->ctx;
mutex_lock(&ctx->uring_lock);
//post the completion while holding the lock
__io_req_complete_post(req);
mutex_unlock(&ctx->uring_lock);
}
static void __io_req_complete_post(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
//Take the completion queue lock
io_cq_lock(ctx);
//unless the CQE is flagged to be skipped (it is not by default)
if (!(req->flags & REQ_F_CQE_SKIP))
//copy the req's result into the CQ
io_fill_cqe_req(ctx, req);
...
io_cq_unlock_post(ctx);
}
static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
struct io_kiocb *req)
{
//If this fails the completion queue is probably full, so fall through to the overflow path
if (likely(__io_fill_cqe_req(ctx, req)))
return true;
return io_req_cqe_overflow(req);
}
static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
struct io_kiocb *req)
{
struct io_uring_cqe *cqe;
//Grab a free CQE slot from the completion ring
cqe = io_get_cqe(ctx);
if (unlikely(!cqe))
return false;
//Copy the req's cqe into that slot; req->cqe.fd = READ_ONCE(sqe->fd); and req->cqe.user_data = READ_ONCE(sqe->user_data);
//were filled in during io_init_req, while res is set by each issue handler via io_req_set_res, e.g. in the poll path
memcpy(cqe, &req->cqe, sizeof(*cqe));
return true;
}
void io_cq_unlock_post(struct io_ring_ctx *ctx)
__releases(ctx->completion_lock)
{
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
io_commit_cqring_flush(ctx);
//remember io_cqring_wait from earlier? This is where its sleeper gets woken
io_cqring_wake(ctx);
}
So an IORING_OP_POLL_ADD watching the server fd now sits in the SQ. When a connection arrives, submitAndWait wakes up and execution moves on to process.
private final IOUringCompletionQueueCallback callback = IOUringEventLoop.this::handle;
int process(IOUringCompletionQueueCallback callback) {
int tail = PlatformDependent.getIntVolatile(kTailAddress);
int i = 0;
while (ringHead != tail) {
//Compute the address of the CQE at the current head
long cqeAddress = completionQueueArrayAddress + (ringHead & ringMask) * CQE_SIZE;
//and read its fields: user data, res, and flags
long udata = PlatformDependent.getLong(cqeAddress + CQE_USER_DATA_FIELD);
int res = PlatformDependent.getInt(cqeAddress + CQE_RES_FIELD);
int flags = PlatformDependent.getInt(cqeAddress + CQE_FLAGS_FIELD);
//advance the head
ringHead++;
//and publish the consumed head back to the kernel
PlatformDependent.putIntOrdered(kHeadAddress, ringHead);
i++;
//decode the user data
decode(res, flags, udata, callback);
}
return i;
}
//At submit time udata packed the fd, op, and data; unpack them here and invoke the callback
static void decode(int res, int flags, long udata, IOUringCompletionQueueCallback callback) {
int fd = (int) (udata & 0xFFFFFFFFL);
byte op = (byte) ((udata >>>= 32) & 0xFFL);
short data = (short) (udata >>> 16);
//the callback is IOUringEventLoop.this::handle defined above, so that is what runs
callback.handle(fd, res, flags, op, data);
}
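The packing that decode undoes is the one used at submit time; a sketch reconstructed purely from the decode shown above (Netty wraps this in its own helper, the method name here is only illustrative):
static long encodeUserData(int fd, byte op, short data) {
    return ((long) (data & 0xFFFF) << 48) // extra short, e.g. the poll mask
         | ((long) (op & 0xFF) << 32)     // opcode, e.g. Native.IORING_OP_POLL_ADD
         | (fd & 0xFFFFFFFFL);            // file descriptor in the low 32 bits
}
// So the POLLRDHUP registration from earlier effectively stored encodeUserData(serverFd, IORING_OP_POLL_ADD, (short) POLLRDHUP),
// which is why handle(...) below sees op == IORING_OP_POLL_ADD and data == POLLRDHUP for the server channel.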
private void handle(int fd, int res, int flags, byte op, short data) {
// Remaining events should be channel-specific
final AbstractIOUringChannel channel = channels.get(fd);
if (channel == null) {
return;
}
if (op == Native.IORING_OP_RECV || op == Native.IORING_OP_ACCEPT || op == Native.IORING_OP_RECVMSG ||
op == Native.IORING_OP_READ) {
//read path
handleRead(channel, res, data);
} else if (op == Native.IORING_OP_WRITEV ||
op == Native.IORING_OP_SEND || op == Native.IORING_OP_SENDMSG || op == Native.IORING_OP_WRITE) {
handleWrite(channel, res, data); //write path
} else if (op == Native.IORING_OP_POLL_ADD) {
//The poll registered during register via addPoll was IORING_OP_POLL_ADD, so an incoming connection ends up here
handlePollAdd(channel, res, data);
} else if (op == Native.IORING_OP_POLL_REMOVE) {
} else if (op == Native.IORING_OP_CONNECT) {
handleConnect(channel, res);
}
channel.ioUringUnsafe().processDelayedClose();
}
private void handlePollAdd(AbstractIOUringChannel channel, int res, int pollMask) {
if ((pollMask & Native.POLLOUT) != 0) {
channel.ioUringUnsafe().pollOut(res);
}
if ((pollMask & Native.POLLIN) != 0) {
channel.ioUringUnsafe().pollIn(res);
}
//data was POLLRDHUP when the poll was added, so this branch runs
if ((pollMask & Native.POLLRDHUP) != 0) {
channel.ioUringUnsafe().pollRdHup(res);
}
}
final void pollRdHup(int res) {
ioState &= ~POLL_RDHUP_SCHEDULED;
if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
return;
}
// Mark that we received a POLLRDHUP and so need to continue reading until all the input is drained.
recvBufAllocHandle().rdHupReceived();
if (isActive()) {
//which ends up in scheduleRead0
scheduleFirstReadIfNeeded();
} else {
// Just to be safe make sure the input marked as closed.
shutdownInput(true);
}
}
protected int scheduleRead0() {
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.attemptedBytesRead(1);
IOUringSubmissionQueue submissionQueue = submissionQueue();
//This registers the accept SQE, which eventually reaches io_accept described earlier
submissionQueue.addAccept(fd().intValue(),
acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, (short) 0);
return 1;
}
//When the accept completes, this runs
private void handleRead(AbstractIOUringChannel channel, int res, int data) {
channel.ioUringUnsafe().readComplete(res, data);
}
//and finally this method
protected void readComplete0(int res, int data, int outstanding) {
final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) unsafe()
.recvBufAllocHandle();
final ChannelPipeline pipeline = pipeline();
allocHandle.lastBytesRead(res);
if (res >= 0) {
allocHandle.incMessagesRead(1);
try {
Channel channel = newChildChannel(
res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
//Inside the pipeline this fires register again for the child channel, which runs schedulePollRdHup to register its interest
//the difference from the server channel lies in which scheduleRead0 runs from handlePollAdd
pipeline.fireChannelRead(channel);
if (allocHandle.continueReading()) { //decide here whether another accept should be registered
//scheduleRead goes back into scheduleRead0 to queue the next accept; why not queue accept directly the first time? Because UDP shares this path and only needs to receive data, not accept
scheduleRead();
} else {
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
} catch (Throwable cause) {
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
}
} else {
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
// Check if we did fail because there was nothing to accept atm.
if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
// Something bad happened. Convert to an exception.
pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
}
}
}
//Read registration on the accepted (child) channel
protected int scheduleRead0() {
assert readBuffer == null;
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
ByteBuf byteBuf = allocHandle.allocate(alloc());
IOUringSubmissionQueue submissionQueue = submissionQueue();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
readBuffer = byteBuf;
//Here a receive is registered; when is the buffer address consumed? In the .prep function of the io_op_defs entry
//e.g. for recv it is io_recvmsg_prep, where sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr)); picks it up
submissionQueue.addRecv(socket.intValue(), byteBuf.memoryAddress(),
byteBuf.writerIndex(), byteBuf.capacity(), (short) 0);
return 1;
}
That completes the path from Netty down into the kernel. For readers curious about what the kernel thread mode does, here is a brief look.
//With IORING_SETUP_SQPOLL, creating the ring also spawns a kernel thread dedicated to io_sq_thread
create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
static int io_sq_thread(void *data)
{
...
mutex_lock(&sqd->lock);
while (1) {
bool cap_entries, sqt_spin = false;
if (io_sqd_events_pending(sqd) || signal_pending(current)) {
if (io_sqd_handle_event(sqd))
break;
timeout = jiffies + sqd->sq_thread_idle;
}
cap_entries = !list_is_singular(&sqd->ctx_list);
//Iterate over every ctx attached to sqd; so far each thread only touched its own ctx, but this kernel thread services all of them
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
//ultimately running this
int ret = __io_sq_thread(ctx, cap_entries);
if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
sqt_spin = true;
}
....
}
...
do_exit(0);
}
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
unsigned int to_submit;
int ret = 0;
to_submit = io_sqring_entries(ctx);
if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;
if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
const struct cred *creds = NULL;
if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
!(ctx->flags & IORING_SETUP_R_DISABLED))
//Everything else can be ignored; the key is this call, the same io_submit_sqes that Netty drives manually, which should look familiar by now
ret = io_submit_sqes(ctx, to_submit);
mutex_unlock(&ctx->uring_lock);
}
return ret;
}
That wraps up io_uring. In the author's view it is a grand unifier, folding every kind of I/O operation behind one interface. Reading the source also suggests that this Netty implementation is best suited to workloads that batch their I/O.