简析StarRocks JNI Connector及其在数据湖Reader中的应用

前言

最近在进行StarRocks与数据湖集成方面的一些工作(重点是SR 3.2与Paimon 0.6的适配),同时阅读和修改了部分代码,发现StarRocks JNI Connector是个称得上精妙的模块,为各类大数据组件适配StarRocks提供了方便的入口,从而丰富其联邦查询能力。本文尝试简单分析一下它的设计思路,并通过Paimon Reader看一下StarRocks的数据湖Reader是如何通过它实现的。

总体设计

StarRocks JNI Connector背后的思想比较简单,就是介于C++-based的BE和Java-based的大数据组件之间的抽象中间层,可以直接复用Java SDK,规避了对BE代码的侵入以及使用C++访问大数据存储的诸多不便。示意图如下。

可见,JNI Connector在Java一侧提供了统一的规约,接入方实现open() / getNext() / close()三个方法(均位于ConnectorScanner抽象类下),并提供必要的信息(如数据类型等),就可以将数据读出。在JNI Connector内部,会把要处理的数据写入C++能识别到的native memory区域(对于Java来说则是堆外内存)。BE侧通过读取这部分内存进行Scan操作。ConnectorScanner抽象类的代码如下。

public abstract class ConnectorScanner {
    private OffHeapTable offHeapTable;
    private String[] fields;
    private ColumnType[] types;
    private int tableSize;

    /**
     * Initialize the reader with parameters passed by the class constructor and allocate necessary resources.
     * Developers can call {@link ConnectorScanner#initOffHeapTableWriter(ColumnType[], String[], int)} method here
     * to allocate memory spaces.
     */
    public abstract void open() throws IOException;

    /**
     * Close the reader and release resources.
     */
    public abstract void close() throws IOException;

    /**
     * Scan original data and save it to off-heap table.
     *
     * @return The number of rows scanned.
     * The specific implementation needs to call the {@link ConnectorScanner#appendData(int, Object)} method
     * to save data to off-heap table.
     * The number of rows scanned must less than or equal to {@link ConnectorScanner#tableSize}
     */
    public abstract int getNext() throws IOException;

    /**
     * This method need be called before {@link ConnectorScanner#getNext()}
     *
     * @param requiredTypes  column types to scan
     * @param requiredFields columns names to scan
     * @param fetchSize      number of rows
     */
    protected void initOffHeapTableWriter(ColumnType[] requiredTypes, String[] requiredFields, int fetchSize) {
        this.tableSize = fetchSize;
        this.types = requiredTypes;
        this.fields = requiredFields;
    }

    protected void appendData(int index, ColumnValue value) {
        offHeapTable.appendData(index, value);
    }

    protected int getTableSize() {
        return tableSize;
    }

    public OffHeapTable getOffHeapTable() {
        return offHeapTable;
    }

    public long getNextOffHeapChunk() throws IOException {
        initOffHeapTable();
        int numRows = 0;
        try {
            numRows = getNext();
        } catch (IOException e) {
            releaseOffHeapTable();
            throw e;
        }
        return finishOffHeapTable(numRows);
    }

    private void initOffHeapTable() {
        offHeapTable = new OffHeapTable(types, fields, tableSize);
    }

    private long finishOffHeapTable(int numRows) {
        offHeapTable.setNumRows(numRows);
        return offHeapTable.getMetaNativeAddress();
    }

    protected void releaseOffHeapColumnVector(int fieldId) {
        offHeapTable.releaseOffHeapColumnVector(fieldId);
    }

    protected void releaseOffHeapTable() {
        if (offHeapTable != null) {
            offHeapTable.close();
        }
    }
}

从上述设计和代码可以看出,JNI Connector需要特别关注的点有两个:一是如何在读取时兼容不同大数据组件的存储类型(ColumnValueColumnType),二是如何保证BE侧正确而高效地访问包含外表数据的内存区域(OffHeapColumnVectorOffHeapTable)。下面分别讨论。

类型兼容性

JNI Connector设计了接口ColumnValue用来表示不同组件的不同数据类型的取值规范,目前有三种实现,分别对应Hive、Hudi和Paimon,类图如下。

可见是提供了对常见基础类型和复合类型的支持。以paimon-reader模块中的PaimonColumnValue为例,部分基础类型取值的部分代码如下。

    @Override
    public long getLong() {
        return (long) fieldData;
    }

    @Override
    public double getDouble() {
        return (double) fieldData;
    }

    @Override
    public String getString(ColumnType.TypeValue type) {
        if (type == ColumnType.TypeValue.DATE) {
            int epoch = (int) fieldData;
            LocalDate date = LocalDate.ofEpochDay(epoch);
            return PaimonScannerUtils.formatDate(date);
        } else {
            return fieldData.toString();
        }
    }

    @Override
    public String getTimestamp(ColumnType.TypeValue type) {
        if (type == ColumnType.TypeValue.DATETIME_MILLIS) {
            Timestamp ts = (Timestamp) fieldData;
            LocalDateTime dateTime = ts.toLocalDateTime();
            return PaimonScannerUtils.formatDateTime(dateTime);
        } else {
            return fieldData.toString();
        }
    }

对于复合类型(即Array、Map和Struct),则需要分别处理每个元素,如PaimonColumnValue#unpackMap()方法:

    @Override
    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
        InternalMap map = (InternalMap) fieldData;
        DataType keyType;
        DataType valueType;
        if (dataType instanceof MapType) {
            keyType = ((MapType) dataType).getKeyType();
            valueType = ((MapType) dataType).getValueType();
        } else {
            throw new UnsupportedOperationException("Unsupported type: " + dataType);
        }

        InternalArray keyArray = map.keyArray();
        toPaimonColumnValue(keys, keyArray, keyType);

        InternalArray valueArray = map.valueArray();
        toPaimonColumnValue(values, valueArray, valueType);
    }

    private void toPaimonColumnValue(List<ColumnValue> values, InternalArray array, DataType dataType) {
        for (int i = 0; i < array.size(); i++) {
            PaimonColumnValue cv = null;
            Object o = InternalRowUtils.get(array, i, dataType);
            if (o != null) {
                cv = new PaimonColumnValue(o, dataType);
            }
            values.add(cv);
        }
    }

而上文中出现的ColumnType类主要是显式声明了StarRocks支持的19种定长基础类型(INT、LONG、TIMESTAMP等等)、2种变长基础类型(VARCHAR、BINARY)和3种复合类型,以及提供了解析复合类型的helper方法,读者可自行参考代码。

堆外内存布局与访问

如果想让SR BE能够处理JNI Connector读取的表数据,就必然要求堆外内存中的数据存储方法是BE原生可以识别的。BE的C++代码中对上述三类数据的原生存储列举如下:

  • 定长基础类型的列FixedLengthColumn需要一个容器,为vector<CppType> _data
  • 变长基础类型的列BinaryColumnBase需要两个容器,分别为数据容器vector<uint8_t> _bytes,以及偏移量容器vector<uint32_t> _offsets,分别记录该列每一行的数据,以及每行数据对应的起始地址;
  • 复合类型的列则视情况而定,例如数组类型ArrayColumn需要两个容器ColumnPtr _elementsFixedLengthColumn<uint32_t>::Ptr _offsets(采用ColumnPtr是为了兼容嵌套类型),而映射类型MapColumn需要三个容器,读者可自行推测。

特别地,如果一个列可以为空,那么根据NullableColumn的定义,还需要一个额外的空标记容器FixedLengthColumn<uint8_t>::Ptr _null_column来存储该列每一行是否为空。

OffHeapColumnVector

基于以上知识,JNI Connector设计了基于堆外内存的外表列数据容器OffHeapColumnVector,解释一下它的几个关键属性,简单易懂。

    // 空标记,对应_null_column
    private long nulls;
    // 实际数据,对应_data
    private long data;
    // 偏移量,对应_offsets
    private long offsetData;
    // 初始化容量
    private int capacity;
    // 列类型
    private ColumnType type;
    // 空元素计数
    private int numNulls;
    // 已写入的元素数
    protected int elementsAppended;
    // 嵌套的OffHeapColumnVector,用于变长类型和复合类型
    private OffHeapColumnVector[] childColumns;

OffHeapColumnVector的内存分配采用了类似Spark Tungsten(之前的博客讲过)的风格,测试环境下会调用JVM Unsafe API,正式环境则会调用SR BE的Memory Tracker Native API。通过reserveInternal()方法,我们可以清楚地看到不同类型的内存分配逻辑。

    private void reserveInternal(int newCapacity) {
        int oldCapacity = (nulls == 0L) ? 0 : capacity;
        long oldOffsetSize = (nulls == 0) ? 0 : (capacity + 1) * 4L;
        long newOffsetSize = (newCapacity + 1) * 4L;
        int typeSize = type.getPrimitiveTypeValueSize();
        if (type.isUnknown()) {
            // don't do anything.
        } else if (typeSize != -1) {
            this.data = Platform.reallocateMemory(data, oldCapacity * typeSize, newCapacity * typeSize);
        } else if (type.isByteStorageType()) {
            this.offsetData = Platform.reallocateMemory(offsetData, oldOffsetSize, newOffsetSize);
            int childCapacity = newCapacity * DEFAULT_STRING_LENGTH;
            this.childColumns = new OffHeapColumnVector[1];
            this.childColumns[0] = new OffHeapColumnVector(childCapacity, new ColumnType(type.name + "#data",
                    ColumnType.TypeValue.BYTE));
        } else if (type.isArray() || type.isMap() || type.isStruct()) {
            if (type.isArray() || type.isMap()) {
                this.offsetData = Platform.reallocateMemory(offsetData, oldOffsetSize, newOffsetSize);
            }
            int size = type.childTypes.size();
            this.childColumns = new OffHeapColumnVector[size];
            for (int i = 0; i < size; i++) {
                this.childColumns[i] = new OffHeapColumnVector(newCapacity, type.childTypes.get(i));
            }
        } else {
            throw new RuntimeException("Unhandled type: " + type);
        }
        this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity);
        Platform.setMemory(nulls + oldCapacity, (byte) 0, newCapacity - oldCapacity);
        capacity = newCapacity;

        if (offsetData != 0) {
            // offsetData[0] == 0 always.
            // we have to set it explicitly otherwise it's undefined value here.
            Platform.putInt(null, offsetData, 0);
        }
    }

来看一下写入和读取定长类型的方法,以INT为例,同样是Spark Tungsten风格,如同操作C++指针。Platform类实际上就是从Spark中的同名类稍加修改而来,getInt()putInt()方法均是直接调用Unsafe API。

    public int appendInt(int v) {
        reserve(elementsAppended + 1);
        putInt(elementsAppended, v);
        return elementsAppended++;
    }

    private void putInt(int rowId, int value) {
        Platform.putInt(null, data + 4L * rowId, value);
    }

    public int getInt(int rowId) {
        return Platform.getInt(null, data + 4L * rowId);
    }

至于变长类型,则需要额外写入每行的偏移量信息,读取时根据偏移量从字节流中取出对应的区块并转化即可。

    private int appendByteArray(byte[] value, int offset, int length) {
        int copiedOffset = arrayData().appendBytes(length, value, offset);
        reserve(elementsAppended + 1);
        putArrayOffset(elementsAppended, copiedOffset, length);
        return elementsAppended++;
    }

    private void putArrayOffset(int rowId, int offset, int length) {
        Platform.putInt(null, offsetData + 4L * rowId, offset);
        Platform.putInt(null, offsetData + 4L * (rowId + 1), offset + length);
    }

    public String getUTF8String(int rowId) {
        if (isNullAt(rowId)) {
            return null;
        }
        int start = getArrayOffset(rowId);
        int end = getArrayOffset(rowId + 1);
        int size = end - start;
        byte[] bytes = arrayData().getBytes(start, size);
        return new String(bytes, StandardCharsets.UTF_8);
    }

复合类型涉及到的更多是childColumns的操作,与上面的思想相通,此处不再赘述。

OffHeapTable

顾名思义,OffHeapTable是统一管理一张表对应的所有OffHeapColumnVector的组件。它的实现也非常简洁,部分代码摘录如下。

public class OffHeapTable {
    public OffHeapColumnVector[] vectors;
    public String[] fields;
    public OffHeapColumnVector meta;
    public int numRows;
    public boolean[] released;

    public OffHeapTable(ColumnType[] types, String[] fields, int capacity) {
        this.fields = fields;
        this.vectors = new OffHeapColumnVector[types.length];
        this.released = new boolean[types.length];
        int metaSize = 0;
        for (int i = 0; i < types.length; i++) {
            vectors[i] = new OffHeapColumnVector(capacity, types[i]);
            metaSize += types[i].computeColumnSize();
            released[i] = false;
        }
        this.meta = new OffHeapColumnVector(metaSize, new ColumnType("#meta", ColumnType.TypeValue.LONG));
        this.numRows = 0;
    }

    public void appendData(int fieldId, ColumnValue o) {
        vectors[fieldId].appendValue(o);
    }

    public void releaseOffHeapColumnVector(int fieldId) {
        if (!released[fieldId]) {
            vectors[fieldId].close();
            released[fieldId] = true;
        }
    }

    public long getMetaNativeAddress() {
        meta.appendLong(numRows);
        for (OffHeapColumnVector v : vectors) {
            v.updateMeta(meta);
        }
        return meta.valuesNativeAddress();
    }
}

除了列名称、行数、释放标记等必要信息,需要特别注意的是,OffHeapTable还额外维护了一个名为meta的存储元数据的OffHeapColumnVector,里面存有各个数据容器的起始内存地址,方便快速定位。更新元数据的操作如下所示。

    public void updateMeta(OffHeapColumnVector meta) {
        if (type.isUnknown()) {
            meta.appendLong(0);
        } else if (type.isByteStorageType()) {
            meta.appendLong(nullsNativeAddress());
            meta.appendLong(arrayOffsetNativeAddress());
            meta.appendLong(arrayDataNativeAddress());
        } else if (type.isArray() || type.isMap() || type.isStruct()) {
            meta.appendLong(nullsNativeAddress());
            if (type.isArray() || type.isMap()) {
                meta.appendLong(arrayOffsetNativeAddress());
            }
            for (OffHeapColumnVector c : childColumns) {
                c.updateMeta(meta);
            }
        } else {
            meta.appendLong(nullsNativeAddress());
            meta.appendLong(valuesNativeAddress());
        }
    }

从Paimon Reader到BE Scan

介绍完类型兼容和堆外内存访问的设计,接下来就可以通过Paimon Reader中的PaimonSplitScanner看看JNI Connector是如何与BE联动的。

JNI Connector强制要求ConnectorScanner的实现类传入两个固定的构造参数,分别是读取的数据行数,以及表类型特定的参数(如列信息、谓词条件等):

    public PaimonSplitScanner(int fetchSize, Map<String, String> params) {
        this.fetchSize = fetchSize;
        this.requiredFields = params.get("required_fields").split(",");
        this.nestedFields = params.getOrDefault("nested_fields", "").split(",");
        this.splitInfo = params.get("split_info");
        this.predicateInfo = params.get("predicate_info");
        this.encodedTable = params.get("native_table");
        this.classLoader = this.getClass().getClassLoader();
    }

前面已经提到接入方需要实现open() / getNext() / close()三个方法,来看下PaimonSplitScanner#open()方法的实现。

    @Override
    public void open() throws IOException {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            table = PaimonScannerUtils.decodeStringToObject(encodedTable);
            parseRequiredTypes();
            initOffHeapTableWriter(requiredTypes, requiredFields, fetchSize);
            initReader();
        } catch (Exception e) {
            close();
            String msg = "Failed to open the paimon reader.";
            LOG.error(msg, e);
            throw new IOException(msg, e);
        }
    }

    private void initReader() throws IOException {
        ReadBuilder readBuilder = table.newReadBuilder();
        RowType rowType = table.rowType();
        List<String> fieldNames = PaimonScannerUtils.fieldNames(rowType);
        int[] projected = Arrays.stream(requiredFields).mapToInt(fieldNames::indexOf).toArray();
        readBuilder.withProjection(projected);
        List<Predicate> predicates = PaimonScannerUtils.decodeStringToObject(predicateInfo);
        readBuilder.withFilter(predicates);
        Split split = PaimonScannerUtils.decodeStringToObject(splitInfo);
        RecordReader<InternalRow> reader = readBuilder.newRead().executeFilter().createReader(split);
        iterator = new RecordReaderIterator<>(reader);
    }

可见,open()方法通过解析表名,获取列及类型信息,创建OffHeapTable实例,并通过调用Paimon SDK中的相关方法构造带有列裁剪、谓词下推等信息的RecordReader实例,最终产生实际读取Paimon数据的迭代器。getNext()方法就通过此迭代器读取数据,并转换为定义好的PaimonColumnValue实例,然后调用基类的方法将其写入各个OffHeapColumnVector,水到渠成。

// 最终会被ConnectorScanner#getNextOffHeapChunk()调用
    @Override
    public int getNext() throws IOException {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            int numRows = 0;
            while (iterator.hasNext() && numRows < fetchSize) {
                InternalRow row = iterator.next();
                if (row == null) {
                    break;
                }
                for (int i = 0; i < requiredFields.length; i++) {
                    Object fieldData = InternalRowUtils.get(row, i, logicalTypes[i]);
                    if (fieldData == null) {
                        appendData(i, null);
                    } else {
                        ColumnValue fieldValue = new PaimonColumnValue(fieldData, logicalTypes[i]);
                        appendData(i, fieldValue);
                    }
                }
                numRows++;
            }
            return numRows;
        } catch (Exception e) {
            close();
            String msg = "Failed to get the next off-heap table chunk of paimon.";
            LOG.error(msg, e);
            throw new IOException(msg, e);
        }
    }

最后一步,来看下BE是如何利用上面讲到的所有内容的。BE侧对应的C++类名为JniScanner,而在JniScanner::_init_jni_table_scanner()方法中,我们可以看到通过ScannerFactory工厂实例(Java代码略)获取对应的ConnectorScanner实例及其参数的逻辑:

Status JniScanner::_init_jni_table_scanner(JNIEnv* _jni_env, RuntimeState* runtime_state) {
    jclass scanner_factory_class = _jni_env->FindClass(_jni_scanner_factory_class.c_str());
    jmethodID scanner_factory_constructor = _jni_env->GetMethodID(scanner_factory_class, "<init>", "()V");
    jobject scanner_factory_obj = _jni_env->NewObject(scanner_factory_class, scanner_factory_constructor);
    jmethodID get_scanner_method =
            _jni_env->GetMethodID(scanner_factory_class, "getScannerClass", "()Ljava/lang/Class;");
    _jni_scanner_cls = (jclass)_jni_env->CallObjectMethod(scanner_factory_obj, get_scanner_method);
    RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to init the scanner class."));
    _jni_env->DeleteLocalRef(scanner_factory_class);
    _jni_env->DeleteLocalRef(scanner_factory_obj);

    jmethodID scanner_constructor = _jni_env->GetMethodID(_jni_scanner_cls, "<init>", "(ILjava/util/Map;)V");
    RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get a scanner class constructor."));

    jclass hashmap_class = _jni_env->FindClass("java/util/HashMap");
    jmethodID hashmap_constructor = _jni_env->GetMethodID(hashmap_class, "<init>", "(I)V");
    jobject hashmap_object = _jni_env->NewObject(hashmap_class, hashmap_constructor, _jni_scanner_params.size());
    jmethodID hashmap_put =
            _jni_env->GetMethodID(hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;");
    RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get the HashMap methods."));
}

以及初始化ConnectorScanneropen() / getNext() / close()相关方法的逻辑:

Status JniScanner::_init_jni_method(JNIEnv* _jni_env) {
    // init jmethod
    _jni_scanner_open = _jni_env->GetMethodID(_jni_scanner_cls, "open", "()V");
    RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `open` jni method"));

    _jni_scanner_get_next_chunk = _jni_env->GetMethodID(_jni_scanner_cls, "getNextOffHeapChunk", "()J");
    RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `getNextOffHeapChunk` jni method"));

    _jni_scanner_close = _jni_env->GetMethodID(_jni_scanner_cls, "close", "()V");
    RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `close` jni method"));

    _jni_scanner_release_column = _jni_env->GetMethodID(_jni_scanner_cls, "releaseOffHeapColumnVector", "(I)V");
    RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `releaseOffHeapColumnVector` jni method"));

    _jni_scanner_release_table = _jni_env->GetMethodID(_jni_scanner_cls, "releaseOffHeapTable", "()V");
    RETURN_IF_ERROR(_check_jni_exception(_jni_env, "Failed to get `releaseOffHeapTable` jni method"));
    return Status::OK();
}

BE通过JNI调用open()getNextOffHeapChunk()方法取得数据并放入native memory后,继续调用JniScanner::_fill_column()方法获取对应列的行数据和元数据(即内存地址),再根据不同类型调用不同的append方法。需要注意,由于OffHeapColumnVector固定包含空标记字段,所以这里通过内存数据还原出来的都是NullableColumn,并通过调用C++的memcpy()函数将元数据中指向的内存区域复制到实际的NullableColumn里。

Status JniScanner::_fill_column(FillColumnArgs* pargs) {
    FillColumnArgs& args = *pargs;
    if (args.must_nullable && !args.column->is_nullable()) {
        return Status::DataQualityError(fmt::format("NOT NULL column[{}] is not supported.", args.slot_name));
    }

    void* ptr = next_chunk_meta_as_ptr();
    if (ptr == nullptr) {
        // struct field mismatch.
        args.column->append_default(args.num_rows);
        return Status::OK();
    }

    if (args.column->is_nullable()) {
        // if column is nullable, we parse `null_column`,
        // and update `args.nulls` and set `data_column` to `args.column`
        bool* null_column_ptr = static_cast<bool*>(ptr);
        auto* nullable_column = down_cast<NullableColumn*>(args.column);

        NullData& null_data = nullable_column->null_column_data();
        null_data.resize(args.num_rows);
        memcpy(null_data.data(), null_column_ptr, args.num_rows);
        nullable_column->update_has_null();

        auto* data_column = nullable_column->data_column().get();
        pargs->column = data_column;
        pargs->nulls = null_data.data();
    } else {
        // otherwise we skip this chunk meta, because in Java side
        // we assume every column starts with `null_column`.
    }

    LogicalType column_type = args.slot_type.type;
    if (column_type == LogicalType::TYPE_BOOLEAN) {
        RETURN_IF_ERROR((_append_primitive_data<TYPE_BOOLEAN>(args)));
    } else if (column_type == LogicalType::TYPE_TINYINT) {
        RETURN_IF_ERROR((_append_primitive_data<TYPE_TINYINT>(args)));
    } else if (column_type == LogicalType::TYPE_SMALLINT) {
        RETURN_IF_ERROR((_append_primitive_data<TYPE_SMALLINT>(args)));
    } 
    // ...以下各种类型略去...
    else {
        return Status::InternalError(fmt::format("Type {} is not supported for off-heap table scanner", column_type));
    }
    return Status::OK();
}

这样,Paimon Reader读取的数据就转交到了BE Scan流程,接下来就可以进行后续的计算了。

The End

放松一下,准备去看双红会。

民那晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容