12. io_uring: From Netty to the Kernel

1. Environment

// Netty incubator implementation, download: https://github.com/netty/netty-incubator-transport-io_uring
<dependency>
    <groupId>io.netty.incubator</groupId>
    <artifactId>netty-incubator-transport-native-io_uring</artifactId>
    <version>0.0.26.Final-SNAPSHOT</version>
    <classifier>linux-x86_64</classifier>
</dependency>
// Kernel version
linux-6.2.1, download: https://mirrors.edge.kernel.org/pub/linux/kernel/v6.x/linux-6.2.1.tar.gz
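Since the transport needs both the native library and a recent kernel, it is worth probing availability at startup. A minimal sketch, assuming the incubator exposes IOUring.isAvailable() alongside the IOUring.ensureAvailability() we will meet later in the walkthrough:

// Fall back to NIO when the native io_uring transport cannot be loaded.
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;

final class TransportGuard {
    static EventLoopGroup newGroup(int threads) {
        return IOUring.isAvailable()
                ? new IOUringEventLoopGroup(threads)
                : new NioEventLoopGroup(threads);
    }
    private TransportGuard() { }
}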

2. Java Usage Example

// Starting from the official example
public class EchoIOUringServer {
    private static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
    public static void main(String []args) {
        // Used the same way as with Epoll, except the group is the newly implemented IOUringEventLoopGroup
        EventLoopGroup bossGroup = new IOUringEventLoopGroup(1);
        EventLoopGroup workerGroup = new IOUringEventLoopGroup(1);
        final EchoIOUringServerHandler serverHandler = new EchoIOUringServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                     // Likewise, its channel is the newly implemented IOUringServerSocketChannel
                    .channel(IOUringServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            //p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast(serverHandler);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
      ...
}

The code above yields two key entry points: IOUringEventLoopGroup and IOUringServerSocketChannel.
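For completeness, the client side of the same transport is symmetrical. A hedged sketch, not from the official examples, assuming the incubator's IOUringSocketChannel (which we will meet later in newChildChannel) can be used directly with Bootstrap:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringSocketChannel;

public final class EchoIOUringClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new IOUringEventLoopGroup(1);
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(IOUringSocketChannel.class) // client channel of the same transport
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     // add application handlers here
                 }
             });
            b.connect("127.0.0.1", 8080).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}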

3. Source-Code Walkthrough

public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
     ...
    @Override
    // When the group is created, each event-loop executor is built via newChild; readers unfamiliar with this should brush up on Netty basics first
    protected EventLoop newChild(Executor executor, Object... args) {
        if (args.length != 4) {
            throw new IllegalArgumentException("Illegal amount of extra arguments");
        }
        // Read ringSize, which caps how many requests the io_uring submission ring holds; the kernel rounds it up to a power of two via roundup_pow_of_two (covered later)
        int ringSize = ObjectUtil.checkPositiveOrZero((Integer) args[0], "ringSize");
        if (ringSize == 0) {
            ringSize = Native.DEFAULT_RING_SIZE;
        }

        int iosqeAsyncThreshold =  ObjectUtil.checkPositiveOrZero((Integer) args[1], "iosqeAsyncThreshold");
        RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
        EventLoopTaskQueueFactory taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
        return new IOUringEventLoop(this, executor, ringSize, iosqeAsyncThreshold,
                rejectedExecutionHandler, taskQueueFactory);
    }
}
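Since the kernel silently rounds ringSize up to a power of two (and, as io_uring_create below shows, sizes the completion queue at twice that), it can be useful to predict the effective ring size up front. A small Java sketch of the kernel's roundup_pow_of_two; the helper name is ours, not Netty's:

// Predict the effective sq ring size for a requested ringSize,
// mirroring the kernel's roundup_pow_of_two (e.g. 100 -> 128).
static int roundupPowOfTwo(int v) {
    if (v <= 1) {
        return 1;
    }
    // next power of two >= v: shift 1 past the highest bit of (v - 1)
    return 1 << (32 - Integer.numberOfLeadingZeros(v - 1));
}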

// The last line shown in EchoIOUringServer is b.bind(PORT).sync(); it enters this class
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    // Ultimately lands here
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // doBind0 calls bind on the Channel held by regFuture (this matters later)
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        }
        ....
    }
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        ...
        // Build the channel; thanks to .channel(IOUringServerSocketChannel.class) this amounts to new IOUringServerSocketChannel()
        channel = channelFactory.newChannel();
        // register is the important part: it triggers registration of the server's poll events
        ChannelFuture regFuture = config().group().register(channel);
    }
}
// Which brings us to the core class
public final class IOUringServerSocketChannel extends AbstractIOUringServerChannel implements ServerSocketChannel {
    private final IOUringServerSocketChannelConfig config;

    public IOUringServerSocketChannel() {
        // First create a socket object; at its core it is just an fd
        // via the newSocketStreamFd(boolean var0) method of Netty's own Socket class
        super(LinuxSocket.newSocketStream(), false);
    }
    // When a new connection arrives, this method creates the child channel for it
    @Override
    Channel newChildChannel(int fd, long acceptedAddressMemoryAddress, long acceptedAddressLengthMemoryAddress) {
        ...
        return new IOUringSocketChannel(this, new LinuxSocket(fd), address);
    }
   // Called from doBind0 mentioned above
    @Override
    public void doBind(SocketAddress localAddress) throws Exception {
        super.doBind(localAddress);
        if (IOUring.isTcpFastOpenServerSideAvailable()) {
            int fastOpen = config().getTcpFastopen();
            if (fastOpen > 0) {
                socket.setTcpFastOpen(fastOpen);
            }
        }
        socket.listen(config.getBacklog());
        active = true;
    }
}
// The superclass of the channel above
abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel {
    private final ByteBuffer acceptedAddressMemory;
    private final ByteBuffer acceptedAddressLengthMemory;

    private final long acceptedAddressMemoryAddress;
    private final long acceptedAddressLengthMemoryAddress;
    protected AbstractIOUringServerChannel(LinuxSocket socket, boolean active) {
        super(null, socket, active);
        // Allocate memory the size of a struct sockaddr_storage to hold the accepted connection's address
        acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
        // Resolve the buffer's native memory address so JNI can use it directly
        acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
        // sockaddr_storage is the modern structure, large enough for every address family (IPv4, IPv6, ...), but each family's actual length differs,
        // so a second slot is allocated to hold that length; a Long (8 bytes) is used here
        acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
        // Needs to be initialized to the size of acceptedAddressMemory.
        // See https://man7.org/linux/man-pages/man2/accept.2.html
        // The kernel reads this as the available size, SIZEOF_SOCKADDR_STORAGE, and writes back the actual length
        acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
        // Resolve the native address of the length slot
        acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
    }
}
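To make the value-result pattern concrete: once an accept completes, the kernel has written a sockaddr into acceptedAddressMemory and stored its real length in the length slot. A hypothetical sketch of decoding an IPv4 peer from that buffer, assuming Linux's struct sockaddr_in layout; Netty's real decoding lives in its SockaddrIn helper:

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// struct sockaddr_in layout: u16 sin_family; u16 sin_port (network order);
// u32 sin_addr (network order); 8 bytes of padding.
static InetSocketAddress decodeIPv4(ByteBuffer acceptedAddressMemory) throws Exception {
    ByteBuffer buf = acceptedAddressMemory.duplicate().order(ByteOrder.nativeOrder());
    short family = buf.getShort(0);                  // AF_INET == 2 on Linux
    if (family != 2) {
        throw new IllegalStateException("not an IPv4 sockaddr: family=" + family);
    }
    int port = ((buf.get(2) & 0xFF) << 8) | (buf.get(3) & 0xFF); // sin_port, network order
    byte[] addr = { buf.get(4), buf.get(5), buf.get(6), buf.get(7) }; // sin_addr
    return new InetSocketAddress(InetAddress.getByAddress(addr), port);
}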

At this point the server fd has been built. When config().group().register(channel) fires, control enters io.netty.incubator.channel.uring.AbstractIOUringStreamChannel#doRegister, which in turn calls schedulePollRdHup.

    // This method is defined in AbstractIOUringChannel
    final void schedulePollRdHup() {
        assert (ioState & POLL_RDHUP_SCHEDULED) == 0;
        // The current object is the IOUringServerSocketChannel, so this fd is the server fd created earlier
        IOUringSubmissionQueue submissionQueue = submissionQueue();
        // Interestingly, the event added here is not accept but addPollRdHup
        submissionQueue.addPollRdHup(fd().intValue());
        ioState |= POLL_RDHUP_SCHEDULED;
    }
    boolean addPollRdHup(int fd) {
        return addPoll(fd, Native.POLLRDHUP);
    }
    // The sqe eventually enqueued is IORING_OP_POLL_ADD, which watches the server fd for any event, including incoming connections
    private boolean addPoll(int fd, int pollMask) {
        return enqueueSqe(Native.IORING_OP_POLL_ADD, 0, pollMask, fd, 0, 0, 0, (short) pollMask);
    }

The server's listen event is now in place. Next we complete the picture of IOUringEventLoop's construction, which explains where IOUringSubmissionQueue comes from.

// Each thread in the group is an IOUringEventLoop, which means every thread owns its own RingBuffer
public final class IOUringEventLoop extends SingleThreadEventLoop {
    private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<>(4096);
    private final RingBuffer ringBuffer;
    private final FileDescriptor eventfd;
    // The maximum number of bytes for an InetAddress / Inet6Address
    private final byte[] inet4AddressArray = new byte[SockaddrIn.IPV4_ADDRESS_LENGTH];
    private final byte[] inet6AddressArray = new byte[SockaddrIn.IPV6_ADDRESS_LENGTH];
    private final IOUringCompletionQueueCallback callback = IOUringEventLoop.this::handle;
    private final Runnable submitIOTask = () -> getRingBuffer().ioUringSubmissionQueue().submit();
    IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, int iosqeAsyncThreshold,
                     RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray
        // Check that io_uring is usable; a static block tries to build a ring buffer, so the probe runs only once, when IOUring is first loaded
        IOUring.ensureAvailability();
        // Create this thread's ringBuffer
        ringBuffer = Native.createRingBuffer(ringSize, iosqeAsyncThreshold);
        // When the loop has nothing to do it sleeps; this eventfd is the safety net that lets other threads wake it up
        eventfd = Native.newBlockingEventFd();
        logger.trace("New EventLoop: {}", this.toString());
    }
 // The core method for creating the io_uring
static RingBuffer createRingBuffer(int ringSize, int iosqeAsyncThreshold) {
        // Call the native method to set up the io_uring; in the returned array, index 0 holds the submission queue's parameters and index 1 the completion queue's
        long[][] values = ioUringSetup(ringSize);
        assert values.length == 2;
        long[] submissionQueueArgs = values[0];
        assert submissionQueueArgs.length == 11;
        // Build the corresponding submission and completion queues from the returned values; the real work is not here but in ioUringSetup
        IOUringSubmissionQueue submissionQueue = new IOUringSubmissionQueue(...);
        long[] completionQueueArgs = values[1];
        assert completionQueueArgs.length == 9;
        IOUringCompletionQueue completionQueue = new IOUringCompletionQueue(...);
        return new RingBuffer(submissionQueue, completionQueue);
    }
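The eleven and nine slots of the returned long[][] map one-to-one onto the arrays filled by the JNI code shown next. A sketch naming the indices (the constant names are ours; Netty's constructors consume the values positionally):

// Positional slots of the long[][] returned by ioUringSetup.
final class RingSetupSlots {
    // values[0]: submission queue
    static final int SQ_KHEAD = 0;          // address of the kernel-visible head
    static final int SQ_KTAIL = 1;          // address of the kernel-visible tail
    static final int SQ_KRING_MASK = 2;     // address of ring_mask (entries - 1)
    static final int SQ_KRING_ENTRIES = 3;  // address of ring_entries
    static final int SQ_KFLAGS = 4;
    static final int SQ_KDROPPED = 5;
    static final int SQ_ARRAY = 6;          // address of the sqe index array
    static final int SQ_SQES = 7;           // address of the sqe array itself
    static final int SQ_RING_SZ = 8;        // byte size of the sq ring mapping
    static final int SQ_RING_PTR = 9;       // mmap base of the sq ring
    static final int SQ_RING_FD = 10;       // the io_uring fd
    // values[1]: completion queue
    static final int CQ_KHEAD = 0;
    static final int CQ_KTAIL = 1;
    static final int CQ_KRING_MASK = 2;
    static final int CQ_KRING_ENTRIES = 3;
    static final int CQ_KOVERFLOW = 4;
    static final int CQ_CQES = 5;           // address of the cqe array
    static final int CQ_RING_SZ = 6;
    static final int CQ_RING_PTR = 7;
    static final int CQ_RING_FD = 8;        // same io_uring fd again
}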

Following the native call above, we now enter the C code.

{"ioUringSetup", "(I)[[J", (void *) netty_io_uring_setup};
static jobjectArray netty_io_uring_setup(JNIEnv *env, jclass clazz, jint entries) {
    struct io_uring_params p;
    memset(&p, 0, sizeof(p));
    // First create an array of length 2, matching the first dimension of the long[][] above
    jobjectArray array = (*env)->NewObjectArray(env, 2, longArrayClass, NULL);
    // Then the array at index 0, of length 11: the submission queue's parameters
    jlongArray submissionArray = (*env)->NewLongArray(env, 11);
    // Completion queue parameters
    jlongArray completionArray = (*env)->NewLongArray(env, 9);
    // Create the io_uring with the requested ring size;
    // the resulting parameters are written back into p, and p can also carry settings into the kernel
    int ring_fd = sys_io_uring_setup((int)entries, &p);
    struct io_uring io_uring_ring;
    // This step is crucial: it is why events submitted from the Java side are visible to the kernel
    int ret = setup_io_uring(ring_fd, &io_uring_ring, &p);
    // Fill the arrays from the setup results; the reader can match these against the parameters of the corresponding Java classes
    jlong submissionArrayElements[] = {
        (jlong)io_uring_ring.sq.khead,
        (jlong)io_uring_ring.sq.ktail,
        (jlong)io_uring_ring.sq.kring_mask,
        (jlong)io_uring_ring.sq.kring_entries,
        (jlong)io_uring_ring.sq.kflags,
        (jlong)io_uring_ring.sq.kdropped,
        (jlong)io_uring_ring.sq.array,
        (jlong)io_uring_ring.sq.sqes,
        (jlong)io_uring_ring.sq.ring_sz,
        (jlong)io_uring_ring.sq.ring_ptr,
        (jlong)ring_fd
    };
    (*env)->SetLongArrayRegion(env, submissionArray, 0, 11, submissionArrayElements);

    jlong completionArrayElements[] = {
        (jlong)io_uring_ring.cq.khead,
        (jlong)io_uring_ring.cq.ktail,
        (jlong)io_uring_ring.cq.kring_mask,
        (jlong)io_uring_ring.cq.kring_entries,
        (jlong)io_uring_ring.cq.koverflow,
        (jlong)io_uring_ring.cq.cqes,
        (jlong)io_uring_ring.cq.ring_sz,
        (jlong)io_uring_ring.cq.ring_ptr,
        (jlong)ring_fd
    };
    (*env)->SetLongArrayRegion(env, completionArray, 0, 9, completionArrayElements);

    (*env)->SetObjectArrayElement(env, array, 0, submissionArray);
    (*env)->SetObjectArrayElement(env, array, 1, completionArray);
    return array;
}

First, into the kernel to look at the implementation behind sys_io_uring_setup.

long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
    struct io_uring_params p;
    int i;
        // For safety, first copy the user's params into kernel space
    if (copy_from_user(&p, params, sizeof(p)))
        return -EFAULT;
    for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
        if (p.resv[i])
            return -EINVAL;
    }
        // Validate the flags; they are all 0 in our case, so this check passes trivially
    if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
            IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
            IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
            IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
            IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
            IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
            IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN))
        return -EINVAL;
        // Create the io_uring
    return io_uring_create(entries, &p, params);
}

static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
                  struct io_uring_params __user *params)
{
    struct io_ring_ctx *ctx;
    struct file *file;
    int ret;
        // Derive the submission queue entry count from the requested ringSize;
        // as mentioned earlier, it is rounded up to a power of two right here
    p->sq_entries = roundup_pow_of_two(entries);
    if (p->flags & IORING_SETUP_CQSIZE) {
        ... // computes the completion-queue size; skipped, since our flags are 0
    } else {
                // Defaults to twice the submission queue size; note that Netty hides this configuration, making the transport less flexible
        p->cq_entries = 2 * p->sq_entries;
    }
        // Allocate the ctx object and initialize its internal queues; skipped here, it does not affect the main flow
    ctx = io_ring_ctx_alloc(p);
    // Sum the sq and cq sizes and allocate the ring arrays according to each queue's entry structure
    ret = io_allocate_scq_urings(ctx, p);
    // io_uring has several modes; one of them spawns a kernel thread that drains the submission queue for you.
        // That requires IORING_SETUP_SQPOLL in flags, and since ctx->flags = p->flags is 0 here, it is not used: the application submits by itself.
        // This is also one reason the ring is used without locking on the Netty side: a single thread does all the processing
    ret = io_sq_offload_create(ctx, p);
    if (ret)
        goto err;
        // Fill p with the field offsets; note these are offsets, not concrete addresses
    memset(&p->sq_off, 0, sizeof(p->sq_off));
    p->sq_off.head = offsetof(struct io_rings, sq.head);
    p->sq_off.tail = offsetof(struct io_rings, sq.tail);
    p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
    p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
    p->sq_off.flags = offsetof(struct io_rings, sq_flags);
    p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
    p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;

    memset(&p->cq_off, 0, sizeof(p->cq_off));
    p->cq_off.head = offsetof(struct io_rings, cq.head);
    p->cq_off.tail = offsetof(struct io_rings, cq.tail);
    p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
    p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
    p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
    p->cq_off.cqes = offsetof(struct io_rings, cqes);
    p->cq_off.flags = offsetof(struct io_rings, cq_flags);

    p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
            IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
            IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
            IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
            IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
            IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
            IORING_FEAT_LINKED_FILE;
        // Copy p back into the user-supplied params
    if (copy_to_user(params, p, sizeof(*p))) {
        ret = -EFAULT;
        goto err;
    }
        // Obtain a file object for the ring
    file = io_uring_get_file(ctx);
        // Install it into the process's fd table so the io_uring can be driven through its fd; everything is a file
        ret = io_uring_install_fd(ctx, file);
    trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
    return ret;
}

The io_uring is now created. Next, the memory mapping between the JVM and the kernel.

 // Back to Netty's native code
// The setup method run right after creation
static int setup_io_uring(int ring_fd, struct io_uring *io_uring_ring,
                    struct io_uring_params *p) {
    return io_uring_mmap(ring_fd, p, &io_uring_ring->sq, &io_uring_ring->cq);
}
static int io_uring_mmap(int fd, struct io_uring_params *p, struct io_uring_sq *sq, struct io_uring_cq *cq) {
    size_t size;
    int ret;
    // Compute both ring sizes from the returned p
    sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
    cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);

    // mmap the ring into this process; the key is the ring fd plus the fixed IORING_OFF_* offsets
    sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
    if (sq->ring_ptr == MAP_FAILED) {
        return -errno;
    }
    // Adding the offsets provided at creation to the mmap'd base yields the concrete addresses; how the fd's mmap resolves in the kernel (via io_uring_fops) is expanded below
    sq->khead = sq->ring_ptr + p->sq_off.head;
    sq->ktail = sq->ring_ptr + p->sq_off.tail;
    sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
    sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
    sq->kflags = sq->ring_ptr + p->sq_off.flags;
    sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
    sq->array = sq->ring_ptr + p->sq_off.array;

    size = p->sq_entries * sizeof(struct io_uring_sqe);
    // The sqe array and the completion queue work the same way; this is why the JNI code earlier reads the sq/cq structs rather than the kernel-returned p (p carries only field offsets, the concrete address arithmetic happens here)
    sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
    if (sq->sqes == MAP_FAILED) {
        ret = -errno;
        goto err;
    }

    cq->khead = cq->ring_ptr + p->cq_off.head;
    cq->ktail = cq->ring_ptr + p->cq_off.tail;
    cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
    cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
    cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
    cq->cqes = cq->ring_ptr + p->cq_off.cqes;
    // Netty's native side ends here; into the kernel
    return 0;
}
// On Linux everything is a file, and the io_uring fd we just created is one too. When mmap is passed an fd, the kernel dispatches to that fd's file_operations,
// which for io_uring is the following
static const struct file_operations io_uring_fops = {
    .release    = io_uring_release,
    .mmap       = io_uring_mmap,
    .get_unmapped_area = io_uring_nommu_get_unmapped_area,
    .mmap_capabilities = io_uring_nommu_mmap_capabilities,
    .poll       = io_uring_poll,
    .show_fdinfo    = io_uring_show_fdinfo,
};
// mmap first calls get_unmapped_area and then the mmap handler
static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
    unsigned long addr, unsigned long len,
    unsigned long pgoff, unsigned long flags)
{
    void *ptr;
    ptr = io_uring_validate_mmap_request(file, pgoff, len);
    return (unsigned long) ptr;
}

static void *io_uring_validate_mmap_request(struct file *file,
                        loff_t pgoff, size_t sz)
{
        // The IORING_OFF_SQ_RING / IORING_OFF_SQES values passed as mmap's last argument above arrive here as pgoff
    struct io_ring_ctx *ctx = file->private_data;
    loff_t offset = pgoff << PAGE_SHIFT;
    struct page *page;
    void *ptr;
        // Match on the offset to pick the corresponding kernel memory
    switch (offset) {
    case IORING_OFF_SQ_RING:
    case IORING_OFF_CQ_RING:
        ptr = ctx->rings; // returns the rings' existing base address directly, rather than allocating fresh memory
        break;
    case IORING_OFF_SQES:
        ptr = ctx->sq_sqes;
        break;
    default:
        return ERR_PTR(-EINVAL);
    }
        // just return that address
    return ptr;
}

static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
{
        // the mapping must be shared (VM_SHARED | VM_MAYSHARE), otherwise reject it
    return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
}

This explains why the ring queues can be manipulated directly from Java, and completes io_uring's creation. Next, how Netty uses it.

    // As described earlier, addPoll ultimately calls this method
    boolean enqueueSqe(byte op, int flags, int rwFlags, int fd,
                       long bufferAddress, int length, long offset, short data) {
        int pending = tail - head;
        boolean submit = pending == ringEntries;
        // Check whether the submission queue is full; ringEntries is the capacity, and the point of the ring is reuse: it is full when tail has run a whole lap ahead of head
        if (submit) {
            // If full, submit immediately to drain the ring and make room
            int submitted = submit();
            if (submitted == 0) {
                // We have a problem, could not submit to make more room in the ring
                throw new RuntimeException("SQ ring full and no submissions accepted");
            }
        }
        // Otherwise compute the next slot: the sqe array base (captured when IOUringSubmissionQueue was built) plus (tail & ringMask) * SQE_SIZE, post-incrementing tail
        long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
        // and write each field at that slot address
        setData(sqe, op, flags, rwFlags, fd, bufferAddress, length, offset, data);
        return submit;
    }

   private void setData(long sqe, byte op, int flags, int rwFlags, int fd, long bufferAddress, int length,
                         long offset, short data) {
        //set sqe(submission queue) properties
        // every field is written straight into native memory relative to the sqe base address
        PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, op);
        PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) flags);
        ....
    }

Interestingly, setData writes the entry but does not publish the new tail to the kernel, so the kernel cannot yet see it. As noted earlier, submission is also driven by Netty itself and no kernel thread polls the queue, so there is no one to notify at this point. The event is now enqueued; but when does it actually run? (So far only the poll event on the server fd has been registered.)
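What submit later does with the tail is the classic single-producer publication idiom: slots are filled with plain stores, then the tail alone is release-stored (putIntOrdered) so that a consumer acquire-loading the tail is guaranteed to see fully written entries; indices run freely and wrap via the mask. A minimal, self-contained Java sketch of that idiom (not Netty code), assuming a power-of-two capacity as the kernel guarantees:

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class SpscRingSketch {
    private static final VarHandle TAIL;
    static {
        try {
            TAIL = MethodHandles.lookup()
                    .findVarHandle(SpscRingSketch.class, "tail", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    private final long[] entries;
    private final int mask;          // capacity is a power of two
    private volatile int head;       // advanced by the consumer
    private int tail;                // free-running; int wraparound is harmless

    SpscRingSketch(int capacity) {
        entries = new long[capacity];
        mask = capacity - 1;
    }

    boolean offer(long sqe) {
        int t = tail;
        if (t - head == entries.length) {
            return false;             // full: tail is a whole lap ahead of head
        }
        entries[t & mask] = sqe;      // plain store into the slot
        TAIL.setRelease(this, t + 1); // publish, like putIntOrdered(kTailAddress)
        return true;
    }
}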

// IOUringEventLoop's run method, executed automatically by the event-loop thread once Netty starts
protected void run() {
        // First grab the completion and submission queues
        final IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
        final IOUringSubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
        // Enqueue a read on this loop's eventfd first, so the loop can always be woken up
        // Let's add the eventfd related events before starting to do any real work.
        addEventFdRead(submissionQueue);
        // Submit it
        final int initialFlushResult = submissionQueue.submit();
        if (initialFlushResult != 1) {
            throw new AssertionError("Failed to submit EventFdRead. Result: " + initialFlushResult);
        }
        for (;;) {
            try {
             
                try {
                    if (!hasTasks()) {
                        // If the completion queue is empty, submit and block for a completion; the earlier submit only submits and returns whether or not anything completed
                        if (!completionQueue.hasCompletions()) {
                            submissionQueue.submitAndWait();
                        }
                    }
                } finally {
                    if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
                        pendingWakeup = true;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            ....
            // Avoid blocking for as long as possible - loop until available work exhausted
            boolean maybeMoreWork = true;
            do {
                try {
                   // Being woken usually means there are completions to process, though not always, since the wait above is conditional (e.g. on hasTasks)
                    maybeMoreWork = completionQueue.process(callback) != 0 | runAllTasks();
                } catch (Throwable t) {
                    handleLoopException(t);
                }
               ....
            } while (maybeMoreWork);
        }
    }

The core of this method lies in two calls: submit and process.

    int submit() {
        int submit = tail - head;
        // both minComplete and flags are 0: submit without waiting
        return submit > 0 ? submit(submit, 0, 0) : 0;
    }

    int submitAndWait() {
        int submit = tail - head;
        if (submit > 0) {
            // If entries are pending, submit them; a minComplete of 1 blocks until at least one completes
            return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
        }
        assert submit == 0;
        // Otherwise nothing is pending: submit 0 with minComplete 1, i.e. just wait for a completion
        int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
        if (ret < 0) {
            throw new RuntimeException("ioUringEnter syscall returned " + ret);
        }
        return ret; // should be 0
    }

    private int submit(int toSubmit, int minComplete, int flags) {
        // This is where the tail value is published to the kernel
        PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier
        int ret = Native.ioUringEnter(ringFd, toSubmit, minComplete, flags);
        head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier
        if (ret != toSubmit) {
            if (ret < 0) {
                throw new RuntimeException("ioUringEnter syscall returned " + ret);
            }
            logger.warn("Not all submissions succeeded");
        }
        return ret;
    }

The core is the ioUringEnter native function.

{"ioUringEnter", "(IIII)I", (void *) netty_io_uring_enter}
static jint netty_io_uring_enter(JNIEnv *env, jclass class1, jint ring_fd, jint to_submit,
                                 jint min_complete, jint flags) {
    int result;
    int err;
    do {
        result = sys_io_uring_enter(ring_fd, to_submit, min_complete, flags, NULL);
        if (result >= 0) {
            return result;
        }
    } while ((err = errno) == EINTR); // EINTR from the kernel is swallowed and the call retried
    return -err;
}
// Into the kernel
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
        u32, min_complete, u32, flags, const void __user *, argp,
        size_t, argsz)
{
    struct io_ring_ctx *ctx;
    struct fd f;
    long ret;
    f = fdget(fd);
    ctx = f.file->private_data;
    ret = 0;
        // If the kernel polling thread is enabled, it drains the submission queue instead of us
    if (ctx->flags & IORING_SETUP_SQPOLL) {
        io_cqring_overflow_flush(ctx);
            // For a wakeup request, just wake the kernel thread
        if (flags & IORING_ENTER_SQ_WAKEUP)
            wake_up(&ctx->sq_data->wait);
        if (flags & IORING_ENTER_SQ_WAIT) {
                        // Otherwise wait for submission-queue space
            ret = io_sqpoll_wait_sq(ctx);
            if (ret)
                goto out;
        }
                // If ret is still 0 everything went fine, so report back the user's submission count
        ret = to_submit;
    } else if (to_submit) { // the calling thread submits the requested number of entries itself
        ret = io_uring_add_tctx_node(ctx);
        if (unlikely(ret))
            goto out;
                // The kernel locks around all submission-queue processing
        mutex_lock(&ctx->uring_lock);
                // Submit up to to_submit entries
        ret = io_submit_sqes(ctx, to_submit);
                // A short count signals a problem
        if (ret != to_submit) {
            mutex_unlock(&ctx->uring_lock);
            goto out;
        }
        if (flags & IORING_ENTER_GETEVENTS) {
            if (ctx->syscall_iopoll)
                goto iopoll_locked;
            if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
                (void)io_run_local_work_locked(ctx);
        }
        mutex_unlock(&ctx->uring_lock);
    }
        // If the caller only wants completions (nothing to submit), this branch runs; Netty passes this flag too
    if (flags & IORING_ENTER_GETEVENTS) {
        int ret2;
                // assume iopoll is not in play here
        if (ctx->syscall_iopoll) {
            
        } else {
            const sigset_t __user *sig;
            struct __kernel_timespec __user *ts;
            ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
            if (likely(!ret2)) {
                min_complete = min(min_complete,
                           ctx->cq_entries);
                                // By default wait on the completion queue, yielding the CPU and parking the thread
                ret2 = io_cqring_wait(ctx, min_complete, sig,
                              argsz, ts);
            }
        }
out:
    fdput(f);
    return ret;
}

Only io_submit_sqes and io_cqring_wait matter here, since those are the paths Netty exercises.

int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
    __must_hold(&ctx->uring_lock)
{
    unsigned int entries = io_sqring_entries(ctx);
    unsigned int left;
    int ret;

    if (unlikely(!entries))
        return 0;
    // The loop count is the minimum of the requested to_submit, the ring capacity, and the entries actually present
    ret = left = min3(nr, ctx->sq_entries, entries);
    do {
        const struct io_uring_sqe *sqe;
        struct io_kiocb *req;

        if (unlikely(!io_alloc_req_refill(ctx)))
            break;
                // Allocate a req from the ctx
        req = io_alloc_req(ctx);
                // Fetch the sqe at the ring head
        sqe = io_get_sqe(ctx);
        if (unlikely(!sqe)) {
            io_req_add_to_cache(req, ctx);
            break;
        }
                // On failure, leave the loop
        if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
            !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
            left--;
            break;
        }
    } while (--left); // decremented once per entry
        // io_get_sqe advances ctx->cached_sq_head++ rather than the shared head,
        // so the shared head must be published at the end: smp_store_release(&rings->sq.head, ctx->cached_sq_head);
        io_commit_sqring(ctx);
    return ret;
}

static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
             const struct io_uring_sqe *sqe)
    __must_hold(&ctx->uring_lock)
{
    struct io_submit_link *link = &ctx->submit_state.link;
    int ret;
        // Initialize the req: earlier it was merely allocated, here the sqe's fields are copied into it
    ret = io_init_req(ctx, req, sqe);
        // If a linked-sqe chain is in progress (link->head is set), the request is appended to that chain
        // instead of being issued right away; otherwise it falls through to io_queue_sqe below
    if (unlikely(link->head)) {
    } 
        // Ultimately this function runs it
    io_queue_sqe(req);
    return 0;
}

static inline void io_queue_sqe(struct io_kiocb *req)
    __must_hold(&req->ctx->uring_lock)
{
    int ret;
        // Issue the submitted request
    ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
}
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
{
    // Remember the event type Netty enqueued earlier (e.g. IORING_OP_POLL_ADD)? That is this opcode
    const struct io_op_def *def = &io_op_defs[req->opcode];
     // Invoke the op definition's issue function
     ret = def->issue(req, issue_flags);
      if (ret == IOU_OK) {
                // When the req completed inline, post its completion
        io_req_complete_post(req, issue_flags);
    } else if (ret != IOU_ISSUE_SKIP_COMPLETE)
        return ret;
}
// This table defines every opcode; two relevant entries, for example
const struct io_op_def io_op_defs[] = {
      [IORING_OP_POLL_ADD] = {
        .needs_file     = 1,
        .unbound_nonreg_file    = 1,
        .audit_skip     = 1,
        .name           = "POLL_ADD",
        .prep           = io_poll_add_prep,
        .issue          = io_poll_add,
      },
      [IORING_OP_ACCEPT] = {
        .needs_file     = 1,
        .unbound_nonreg_file    = 1,
        .pollin         = 1,
        .poll_exclusive     = 1,
        .ioprio         = 1,    /* used for flags */
        .name           = "ACCEPT",
        .prep           = io_accept_prep,
        .issue          = io_accept,
    },
}

int io_poll_add(struct io_kiocb *req, unsigned int issue_flags)
{
    struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll);
    struct io_poll_table ipt;
    int ret;
    ipt.pt._qproc = io_poll_queue_proc;
    if (req->ctx->flags & (IORING_SETUP_SQPOLL|IORING_SETUP_SINGLE_ISSUER))
        req->flags |= REQ_F_HASH_LOCKED;
    ret = __io_arm_poll_handler(req, poll, &ipt, poll->events, issue_flags);
    if (ret > 0) {
        io_req_set_res(req, ipt.result_mask, 0);
        return IOU_OK;
    }
    return ret ?: IOU_ISSUE_SKIP_COMPLETE;
}

static int __io_arm_poll_handler(struct io_kiocb *req,
                 struct io_poll *poll,
                 struct io_poll_table *ipt, __poll_t mask,
                 unsigned issue_flags)
{
        ...
        // Ultimately calls vfs_poll, which invokes req->file's poll method and lands in the socket's poll implementation; not pursued further here
    mask = vfs_poll(req->file, &ipt->pt) & poll->events;
}

Just as io_poll_add does, io_accept lands in the networking stack's accept path, operating on the fd carried in the req, i.e. the server fd. Back now to what happens when a request completes.

void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{              
                ...
        struct io_ring_ctx *ctx = req->ctx;
        mutex_lock(&ctx->uring_lock);
                // Run once the lock is held
        __io_req_complete_post(req);
        mutex_unlock(&ctx->uring_lock);
}

static void __io_req_complete_post(struct io_kiocb *req)
{
    struct io_ring_ctx *ctx = req->ctx;
        // Take the completion-queue lock
    io_cq_lock(ctx);
        // Unless CQE posting is skipped for this req (the default is not to skip)
    if (!(req->flags & REQ_F_CQE_SKIP))
                // copy the req's result into the completion queue
        io_fill_cqe_req(ctx, req);
        ...
    io_cq_unlock_post(ctx);
}

static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
                   struct io_kiocb *req)
{
        // If this fails the completion queue may be full, so fall through to the overflow path
    if (likely(__io_fill_cqe_req(ctx, req)))
        return true;
    return io_req_cqe_overflow(req);
}

static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
                     struct io_kiocb *req)
{
    struct io_uring_cqe *cqe;
        // Grab a free cqe slot from the completion ring
    cqe = io_get_cqe(ctx);
    if (unlikely(!cqe))
        return false;
        // Copy req->cqe into the slot just obtained; req->cqe.fd = READ_ONCE(sqe->fd); and req->cqe.user_data = READ_ONCE(sqe->user_data);
        // were both filled during io_init_req, while res is set by each issue handler via io_req_set_res, e.g. in the poll handler
    memcpy(cqe, &req->cqe, sizeof(*cqe));
    return true;
}

void io_cq_unlock_post(struct io_ring_ctx *ctx)
    __releases(ctx->completion_lock)
{
    io_commit_cqring(ctx);
    spin_unlock(&ctx->completion_lock);
    io_commit_cqring_flush(ctx);
        // Remember io_cqring_wait, the sleep mentioned earlier? This is where it gets woken
    io_cqring_wake(ctx);
}

At this point an IORING_OP_POLL_ADD event watching the server fd sits in the sq. When a connection arrives, submitAndWait is woken up, and execution continues into process.

private final IOUringCompletionQueueCallback callback = IOUringEventLoop.this::handle;

 int process(IOUringCompletionQueueCallback callback) {
        int tail = PlatformDependent.getIntVolatile(kTailAddress);
        int i = 0;
        while (ringHead != tail) {
            // Compute the address of the entry at the current head of the completion ring
            long cqeAddress = completionQueueArrayAddress + (ringHead & ringMask) * CQE_SIZE;
            // Read that cqe's fields: user_data, res and flags
            long udata = PlatformDependent.getLong(cqeAddress + CQE_USER_DATA_FIELD);
            int res = PlatformDependent.getInt(cqeAddress + CQE_RES_FIELD);
            int flags = PlatformDependent.getInt(cqeAddress + CQE_FLAGS_FIELD);
            // Advance head
            ringHead++;
            // and publish the consumed head back to the kernel
            PlatformDependent.putIntOrdered(kHeadAddress, ringHead);

            i++;
            // Decode udata
            decode(res, flags, udata, callback);
        }
        return i;
    }
     // At submission time udata packed the fd, op and data; unpack them here and invoke the callback
    static void decode(int res, int flags, long udata, IOUringCompletionQueueCallback callback) {
        int fd = (int) (udata & 0xFFFFFFFFL);
        byte op = (byte) ((udata >>>= 32) & 0xFFL);
        short data = (short) (udata >>> 16);
        // callback is IOUringEventLoop.this::handle defined above, so handle runs
        callback.handle(fd, res, flags, op, data);
    }
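The inverse packing happens at submission time: enqueueSqe's fd, op and data parameters are folded into the sqe's user_data. A sketch consistent with the shifts in decode above; Netty keeps an equivalent internal helper for this:

// Pack fd, op and data into one 64-bit user_data word:
// low 32 bits fd, bits 32-39 op, bits 48-63 data.
static long encodeUserData(int fd, byte op, short data) {
    return ((long) (data & 0xFFFF) << 48)
         | ((long) (op & 0xFF) << 32)
         | (fd & 0xFFFFFFFFL);
}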

private void handle(int fd, int res, int flags, byte op, short data) {
        
            // Remaining events should be channel-specific
            final AbstractIOUringChannel channel = channels.get(fd);
            if (channel == null) {
                return;
            }
            if (op == Native.IORING_OP_RECV || op == Native.IORING_OP_ACCEPT || op == Native.IORING_OP_RECVMSG ||
               op == Native.IORING_OP_READ) {
                // read-path handling
                handleRead(channel, res, data);
            } else if (op == Native.IORING_OP_WRITEV ||
                    op == Native.IORING_OP_SEND || op == Native.IORING_OP_SENDMSG || op == Native.IORING_OP_WRITE) {
                handleWrite(channel, res, data); // write-path handling
            } else if (op == Native.IORING_OP_POLL_ADD) {
                // The event added by addPoll during register was IORING_OP_POLL_ADD, so an incoming connection ends up here
                handlePollAdd(channel, res, data);
            } else if (op == Native.IORING_OP_POLL_REMOVE) {
              
            } else if (op == Native.IORING_OP_CONNECT) {
                handleConnect(channel, res);
            }
            channel.ioUringUnsafe().processDelayedClose();
        }
  
    private void handlePollAdd(AbstractIOUringChannel channel, int res, int pollMask) {
        if ((pollMask & Native.POLLOUT) != 0) {
            channel.ioUringUnsafe().pollOut(res);
        }
        if ((pollMask & Native.POLLIN) != 0) {
            channel.ioUringUnsafe().pollIn(res);
        }
        // Since the poll mask submitted back then was POLLRDHUP, this branch runs
        if ((pollMask & Native.POLLRDHUP) != 0) {
            channel.ioUringUnsafe().pollRdHup(res);
        }
    }

      final void pollRdHup(int res) {
            ioState &= ~POLL_RDHUP_SCHEDULED;
            if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
                return;
            }

            // Mark that we received a POLLRDHUP and so need to continue reading until all the input is drained.
            recvBufAllocHandle().rdHupReceived();

            if (isActive()) {
                // Ends up in scheduleRead0
                scheduleFirstReadIfNeeded();
            } else {
                // Just to be safe make sure the input marked as closed.
                shutdownInput(true);
            }
        }
    
        protected int scheduleRead0() {
            final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            allocHandle.attemptedBytesRead(1);

            IOUringSubmissionQueue submissionQueue = submissionQueue();
            // This registers the accept event, which eventually reaches io_accept described earlier
            submissionQueue.addAccept(fd().intValue(),
                    acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, (short) 0);
            return 1;
        }
         // When the accept completion fires, this runs
        private void handleRead(AbstractIOUringChannel channel, int res, int data) {
            channel.ioUringUnsafe().readComplete(res, data);
        }
        // and finally this method
        protected void readComplete0(int res, int data, int outstanding) {
            final IOUringRecvByteAllocatorHandle allocHandle =
                    (IOUringRecvByteAllocatorHandle) unsafe()
                            .recvBufAllocHandle();
            final ChannelPipeline pipeline = pipeline();
            allocHandle.lastBytesRead(res);

            if (res >= 0) {
                allocHandle.incMessagesRead(1);
                try {
                    Channel channel = newChildChannel(
                            res, acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
                    // Firing the child down the pipeline re-triggers the register path seen earlier, and schedulePollRdHup registers its events;
                    // the difference from the server is which scheduleRead0 runs out of handlePollAdd
                    pipeline.fireChannelRead(channel);
                    if (allocHandle.continueReading()) { // decide whether to re-arm the accept event
                        // scheduleRead returns to scheduleRead0 and re-registers accept; why not register accept directly the first time? Because UDP exists too, and it only receives data, never accepts
                        scheduleRead();
                    } else {
                        allocHandle.readComplete();
                        pipeline.fireChannelReadComplete();
                    }
                } catch (Throwable cause) {
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
                    pipeline.fireExceptionCaught(cause);
                }
            } else {
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                // Check if we did fail because there was nothing to accept atm.
                if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
                    // Something bad happened. Convert to an exception.
                    pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
                }
            }
        }
        // The child (accepted socket) variant
        protected int scheduleRead0() {
            assert readBuffer == null;

            final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
            ByteBuf byteBuf = allocHandle.allocate(alloc());
            IOUringSubmissionQueue submissionQueue = submissionQueue();
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());

            readBuffer = byteBuf;
            // A recv event is registered here; when is the buffer address consumed? In the .prep function of the io_op_defs entry:
            // recvmsg's io_recvmsg_prep, for instance, does sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
            submissionQueue.addRecv(socket.intValue(), byteBuf.memoryAddress(),
                                    byteBuf.writerIndex(), byteBuf.capacity(), (short) 0);
            return 1;
        }
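One loose end from section 2: the EchoIOUringServerHandler passed to childHandler was never shown. A minimal sketch in the spirit of Netty's classic echo example; the @Sharable annotation matters because a single instance is installed into every child pipeline:

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

@Sharable
public class EchoIOUringServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);  // echo the buffer back; ownership passes on, no release needed
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();     // flush once the read burst ends
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}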

That completes the path from Netty into the kernel. For readers wondering what the kernel-thread (SQPOLL) model is for, here is a brief look.

// When a ring is created with SQPOLL, a kernel thread is spawned to run io_sq_thread
 create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);


static int io_sq_thread(void *data)
{
        ...
    mutex_lock(&sqd->lock);
    while (1) {
        bool cap_entries, sqt_spin = false;

        if (io_sqd_events_pending(sqd) || signal_pending(current)) {
            if (io_sqd_handle_event(sqd))
                break;
            timeout = jiffies + sqd->sq_thread_idle;
        }

        cap_entries = !list_is_singular(&sqd->ctx_list);
                // Iterate over every ctx attached to sqd; so far each thread manipulated only its own ctx, but this kernel thread drains them all
        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
                        // ultimately calling this
            int ret = __io_sq_thread(ctx, cap_entries);
            if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
                sqt_spin = true;
        }
             ....
    }
        ...
    do_exit(0);
}
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
    unsigned int to_submit;
    int ret = 0;
    to_submit = io_sqring_entries(ctx);
    if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
        to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

    if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
        const struct cred *creds = NULL;

        if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
            !(ctx->flags & IORING_SETUP_R_DISABLED))
                        // Everything else can be ignored; note this call: the same io_submit_sqes that Netty drives manually
            ret = io_submit_sqes(ctx, to_submit);
        mutex_unlock(&ctx->uring_lock);
    }
    return ret;
}

This concludes io_uring from end to end. In the author's view it is a grand unifier, folding all I/O operations behind a single interface. From the source it is also clear that this Netty implementation pays off mainly for workloads with batched I/O.
