Flink SQL in Practice: The CDC Connector

Overview: The business data handled by our real-time team used to be pushed by the business side into RabbitMQ, relayed by Flink into Kafka, and only then used for real-time computation. Because the new business logic changes frequently, the pushed data occasionally drifted from the source of truth, so the team decided to consume the business data directly from the MySQL binlog. This post collects my recent notes on the CDC connector.

Preface

CDC stands for Change Data Capture. With CDC we can capture committed changes (INSERT, UPDATE, DELETE, and so on) from a database and deliver them downstream for further processing. I will cover its usage in the following areas:

  • Required dependencies
  • Reading the binlog directly with the flink cdc connector, implemented with the DataStream API
  • Reading the binlog directly with the flink cdc connector, implemented with SQL
  • Basic Canal configuration to ship data to Kafka
  • Real-time computation with Flink SQL on the Kafka data
  • Converting the Canal data stream with the changelog-json format
  1. Add the dependency required by the CDC connector
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${cdc.version}</version>
</dependency>
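
The examples that follow also rely on a few extra modules: fastjson for the custom deserializer below, the Kafka connector and JSON/canal-json formats for the SQL jobs, and the changelog-json format for the last section. A rough sketch of the additional Maven coordinates is given here; the exact artifact names (Scala suffix) and versions depend on your Flink and CDC versions, so treat them as assumptions rather than a definitive list.

<!-- Assumed extra dependencies; adjust Scala suffixes and versions to your environment -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-format-changelog-json</artifactId>
    <version>${cdc.version}</version>
</dependency>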
  2. Binlog capture and real-time computation with the DataStream API
package com.dpf.flink;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Properties;

public class BinlogStreamExample {
    public static void main(String[] args) throws Exception {
        Properties extraProps = new Properties();
        extraProps.setProperty("AllowPublicKeyRetrieval", "true");
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("renren_fast") // monitor all tables under inventory database
            .username("root")
            .password("12345678")
            .debeziumProperties(extraProps)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().registerKryoType(SourceRecord.class);
        env.setParallelism(1);
        final DataStreamSource<String> source = env.addSource(sourceFunction);
        source.print();

        env.execute();
    }

    public static String extractBeforeData(Struct value, Schema schema) {
        final Struct before = value.getStruct("before");
        final List<Field> fields = before.schema().fields();
        JSONObject jsonObject = new JSONObject();
        for (Field field : fields) {
            jsonObject.put(field.name(), before.get(field));
        }
        return jsonObject.toJSONString();
    }

    public static String extractAfterData(Struct value, Schema schema) {
        final Struct after = value.getStruct("after");
        final List<Field> fields = after.schema().fields();
        JSONObject jsonObject = new JSONObject();
        for (Field field : fields) {
            jsonObject.put(field.name(), after.get(field));
        }
        return jsonObject.toJSONString();
    }

    public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            final Operation op = Envelope.operationFor(sourceRecord);
            final String source = sourceRecord.topic();
            Struct value = (Struct) sourceRecord.value();
            final Schema schema = sourceRecord.valueSchema();
            if (op != Operation.CREATE && op != Operation.READ) {
                if (op == Operation.DELETE) {
                    String data = extractBeforeData(value, schema);
                    String record = new JSONObject()
                        .fluentPut("source", source)
                        .fluentPut("data", data)
                        .fluentPut("op", RowKind.DELETE.shortString())
                        .toJSONString();
                    collector.collect(record);

                } else {
                    String beforeData = extractBeforeData(value, schema);
                    String beforeRecord = new JSONObject()
                        .fluentPut("source", source)
                        .fluentPut("data", beforeData)
                        .fluentPut("op", RowKind.UPDATE_BEFORE.shortString())
                        .toJSONString();
                    collector.collect(beforeRecord);
                    String afterData = extractAfterData(value, schema);
                    String afterRecord = new JSONObject()
                        .fluentPut("source", source)
                        .fluentPut("data", afterData)
                        .fluentPut("op", RowKind.UPDATE_AFTER.shortString())
                        .toJSONString();
                    collector.collect(afterRecord);

                }
            } else {
                String data = extractAfterData(value, schema);

                String record = new JSONObject()
                    .fluentPut("source", source)
                    .fluentPut("data", data)
                    .fluentPut("op", RowKind.INSERT.shortString())
                    .toJSONString();
                collector.collect(record);
            }
        }
    }
}

The connector's default deserializers are StringDebeziumDeserializationSchema and RowDataDebeziumDeserializeSchema, which clearly do not meet our needs, so I wrote the custom JsonDebeziumDeserializationSchema above to turn the raw Debezium records into JSON; downstream, the records of each table can then be mapped onto a matching POJO. Also note that the AllowPublicKeyRetrieval property must be set to true; this is a MySQL authentication-related setting.
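
As a sketch of that POJO step (it is not part of the connector itself), the JSON envelope emitted by JsonDebeziumDeserializationSchema can be mapped with fastjson onto a hypothetical TbUser class mirroring the tb_user table used later in this post:

// Hypothetical POJO mirroring the tb_user table (illustration only).
public class TbUser {
    public Integer user_id;
    public String username;
    public String mobile;
    public String password;
    public String create_time;
}

// Inside BinlogStreamExample#main, after `source` is created: each record looks like
// {"source": ..., "data": "<row json>", "op": ...}, so parse the envelope first and
// then parse the "data" payload into the POJO.
source.map(json -> {
        JSONObject envelope = JSONObject.parseObject(json);
        return JSONObject.parseObject(envelope.getString("data"), TbUser.class);
    })
    .returns(TbUser.class) // help Flink's type extraction with the lambda
    .print();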

  3. Binlog capture and real-time computation with Flink SQL
package com.dpf.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class BinlogTableExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        tableEnvironment.executeSql("" +
            " CREATE TABLE mysql_binlog ( " +
            "  user_id INT, " +
            "  username STRING, " +
            "  mobile STRING, " +
            "  password STRING, " +
            "  create_time STRING " +
            " ) WITH ( " +
            "  'connector' = 'mysql-cdc', " +
            "  'hostname' = 'localhost', " +
            "  'port' = '3306', " +
            "  'username' = 'root', " +
            "  'password' = '12345678', " +
            "  'database-name' = 'renren_fast', " +
            "  'table-name' = 'tb_user' " +
            " )" +
            "");

        tableEnvironment.executeSql("" +
            "CREATE TABLE kafka_binlog ( " +
            "  user_id INT, " +
            "  user_name STRING, " +
            "  mobile STRING, " +
            "  password STRING, " +
            "  create_time STRING, " +
            "  PRIMARY KEY (user_id) NOT ENFORCED" +
            ") WITH ( " +
            "  'connector' = 'upsert-kafka', " +
            "  'topic' = 'mysql_binlog', " +
            "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
            "  'key.format' = 'json', " +
            "  'value.format' = 'json' " +
            ")" +
            "");

        tableEnvironment.executeSql("insert into kafka_binlog select * from mysql_binlog");
    }
}

This approach is clearly much more concise. Note that when the data is relayed to Kafka, the Kafka connector must use upsert semantics, because the CDC connector produces a changelog (retract) stream; if it is written to an append-only Kafka table instead, the job fails with:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafka_binlog' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mysql_binlog]], fields=[user_id, username, mobile, password, create_time])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:389)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:310)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:348)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:337)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:336)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
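
If you want to see that changelog for yourself, a minimal sketch (appended to BinlogTableExample, with org.apache.flink.table.api.Table and org.apache.flink.types.Row imported and a throws Exception added to main) converts the query into a retract stream and prints it:

// Each element is printed as (true, row) for an add and (false, row) for a retraction,
// which is exactly the kind of stream an append-only Kafka sink cannot consume.
Table result = tableEnvironment.sqlQuery("select * from mysql_binlog");
tableEnvironment.toRetractStream(result, Row.class).print();
env.execute();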
  4. Canal installation deserves a detailed write-up of its own later; for now, here is roughly what a binlog message produced by Canal looks like:
{
  "data": [
    {
      "user_id": "2",
      "username": "aaa",
      "mobile": "ddd",
      "password": "ddd",
      "create_time": "2018-03-23 22:37:41"
    }
  ],
  "database": "renren_fast",
  "es": 1624418195000,
  "id": 2,
  "isDdl": false,
  "mysqlType": {
    "user_id": "bigint",
    "username": "varchar(50)",
    "mobile": "varchar(20)",
    "password": "varchar(64)",
    "create_time": "datetime"
  },
  "old": [
    {
      "password": "ccc"
    }
  ],
  "pkNames": [
    "user_id"
  ],
  "sql": "",
  "sqlType": {
    "user_id": -5,
    "username": 12,
    "mobile": 12,
    "password": 12,
    "create_time": 93
  },
  "table": "tb_user",
  "ts": 1624418196154,
  "type": "UPDATE"
}
  5. Consuming the JSON-format binlog data in Kafka with Flink SQL
package com.dpf.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reads the canal-json data from Kafka, parses it, and writes it to the Kafka DWD layer as JSON.
 */
public class KafkaTranslateJson {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        tableEnvironment.executeSql("" +
            "CREATE TABLE ods_binlog ( " +
            "  user_id INT, " +
            "  username STRING, " +
            "  mobile STRING, " +
            "  password STRING, " +
            "  create_time STRING " +
            ") WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'ods_binlog', " +
            "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
            "  'properties.enable.auto.commit' = 'false', " +
            "  'properties.session.timeout.ms' = '90000', " +
            "  'properties.request.timeout.ms' = '325000', " +
            "  'value.format' = 'canal-json' " +
            ")" +
            "");

        tableEnvironment.executeSql("" +
            "CREATE TABLE kafka_binlog ( " +
            "  user_id INT, " +
            "  user_name STRING, " +
            "  mobile STRING, " +
            "  password STRING, " +
            "  create_time STRING, " +
            "  PRIMARY KEY (user_id) NOT ENFORCED" +
            ") WITH ( " +
            "  'connector' = 'upsert-kafka', " +
            "  'topic' = 'mysql_binlog', " +
            "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
            "  'key.format' = 'json', " +
            "  'value.format' = 'json' " +
            ")" +
            "");

        tableEnvironment.executeSql("insert into kafka_binlog select * from ods_binlog");
    }
}

Here I extended the built-in canal-json format so that records can be filtered by the canal type field; the source changes are as follows:

  • Modify org.apache.flink.formats.json.canal.CanalJsonDecodingFormat
static enum ReadableMetadata {
        DATABASE("database", (DataType)DataTypes.STRING().nullable(), DataTypes.FIELD("database", DataTypes.STRING()), new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            public Object convert(GenericRowData row, int pos) {
                return row.getString(pos);
            }
        }),
        TABLE("table", (DataType)DataTypes.STRING().nullable(), DataTypes.FIELD("table", DataTypes.STRING()), new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            public Object convert(GenericRowData row, int pos) {
                return row.getString(pos);
            }
        }),
        // Modified metadata: add the binlog-type key
        BINLOG_TYPE("binlog-type", (DataType)DataTypes.STRING().nullable(), DataTypes.FIELD("type", DataTypes.STRING()), new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            public Object convert(GenericRowData row, int pos) {
                return row.getString(pos);
            }
        }),
        SQL_TYPE("sql-type", (DataType)DataTypes.MAP((DataType)DataTypes.STRING().nullable(), (DataType)DataTypes.INT().nullable()).nullable(), DataTypes.FIELD("sqlType", DataTypes.MAP((DataType)DataTypes.STRING().nullable(), (DataType)DataTypes.INT().nullable())), new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            public Object convert(GenericRowData row, int pos) {
                return row.getMap(pos);
            }
        }),
        PK_NAMES("pk-names", (DataType)DataTypes.ARRAY(DataTypes.STRING()).nullable(), DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())), new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            public Object convert(GenericRowData row, int pos) {
                return row.getArray(pos);
            }
        }),
        INGESTION_TIMESTAMP("ingestion-timestamp", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), DataTypes.FIELD("ts", DataTypes.BIGINT()), new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            public Object convert(GenericRowData row, int pos) {
                return row.isNullAt(pos) ? null : TimestampData.fromEpochMillis(row.getLong(pos));
            }
        }),
        EVENT_TIMESTAMP("event-timestamp", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), DataTypes.FIELD("es", DataTypes.BIGINT()), new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            public Object convert(GenericRowData row, int pos) {
                return row.isNullAt(pos) ? null : TimestampData.fromEpochMillis(row.getLong(pos));
            }
        });

        final String key;
        final DataType dataType;
        final Field requiredJsonField;
        final MetadataConverter converter;

        private ReadableMetadata(String key, DataType dataType, Field requiredJsonField, MetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.requiredJsonField = requiredJsonField;
            this.converter = converter;
        }
    }

Without this change, the following error is thrown:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Invalid metadata key 'value.binlog-type' in column 'binlog_type' of table 'default_catalog.default_database.ods_binlog'. The DynamicTableSource class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' supports the following metadata keys for reading:
value.database
value.table
value.sql-type
value.pk-names
value.ingestion-timestamp
value.event-timestamp
topic
partition
headers
leader-epoch
offset
timestamp
timestamp-type
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$6(DynamicSourceUtils.java:403)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateAndApplyMetadata(DynamicSourceUtils.java:395)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:158)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
  • Modify org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
    private static RowType createJsonRowType(
            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
        // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
        DataType root =
                DataTypes.ROW(
                        DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
                        DataTypes.FIELD("old", DataTypes.ARRAY(physicalDataType)),
                        DataTypes.FIELD("type", DataTypes.STRING()),
                        ReadableMetadata.DATABASE.requiredJsonField,
                        ReadableMetadata.TABLE.requiredJsonField);
        // append fields that are required for reading metadata in the root
        final List<DataTypes.Field> rootMetadataFields =
                readableMetadata.stream()
                        .filter(m -> m != ReadableMetadata.DATABASE && m != ReadableMetadata.TABLE && m != ReadableMetadata.BINLOG_TYPE)
                        .map(m -> m.requiredJsonField)
                        .distinct()
                        .collect(Collectors.toList());
        return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
    }

Without this change, the following error is thrown:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [type]
    at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272)
    at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157)
    at org.apache.flink.table.types.utils.DataTypeUtils.appendRowFields(DataTypeUtils.java:181)
    at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.createJsonRowType(CanalJsonDeserializationSchema.java:370)
    at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.<init>(CanalJsonDeserializationSchema.java:111)
    at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.<init>(CanalJsonDeserializationSchema.java:61)
    at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema$Builder.build(CanalJsonDeserializationSchema.java:188)
    at org.apache.flink.formats.json.canal.CanalJsonDecodingFormat.createRuntimeDecoder(CanalJsonDecodingFormat.java:104)
    at org.apache.flink.formats.json.canal.CanalJsonDecodingFormat.createRuntimeDecoder(CanalJsonDecodingFormat.java:46)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:427)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:199)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
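
With both changes in place, the new key can be declared as a metadata column and used to filter on the canal type; the sketch below reuses the ods_binlog and kafka_binlog tables from above, and the DELETE filter is only one possible condition:

tableEnvironment.executeSql("" +
    "CREATE TABLE ods_binlog ( " +
    "  user_id INT, " +
    "  username STRING, " +
    "  mobile STRING, " +
    "  password STRING, " +
    "  create_time STRING, " +
    "  binlog_type STRING METADATA FROM 'value.binlog-type' VIRTUAL " +
    ") WITH ( " +
    "  'connector' = 'kafka', " +
    "  'topic' = 'ods_binlog', " +
    "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
    "  'value.format' = 'canal-json' " +
    ")");

// Drop canal DELETE events before writing to the upsert-kafka sink defined earlier.
tableEnvironment.executeSql("" +
    "insert into kafka_binlog " +
    "select user_id, username, mobile, password, create_time " +
    "from ods_binlog " +
    "where binlog_type <> 'DELETE'");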
  6. Converting the Canal data stream with the changelog-json format
    Format repository: https://github.com/ververica/flink-cdc-connectors/tree/master/flink-format-changelog-json
    Example:
package com.dpf.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reads the canal-json data from Kafka, parses it, and writes it to the Kafka DWD layer in changelog-json format.
 */
public class KafkaTranslateChangelogJson {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        tableEnvironment.executeSql("" +
            "CREATE TABLE ods_binlog ( " +
            "  user_id INT, " +
            "  username STRING, " +
            "  mobile STRING, " +
            "  password STRING, " +
            "  create_time STRING " +
            ") WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'ods_binlog', " +
            "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
            "  'properties.enable.auto.commit' = 'false', " +
            "  'properties.session.timeout.ms' = '90000', " +
            "  'properties.request.timeout.ms' = '325000', " +
            "  'value.format' = 'canal-json' " +
            ")" +
            "");

        tableEnvironment.executeSql("" +
            "CREATE TABLE dwd_binlog ( " +
            "  user_id INT, " +
            "  user_name STRING, " +
            "  mobile STRING, " +
            "  password STRING, " +
            "  create_time STRING " +
            ") WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'dwd_binlog', " +
            "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
            "  'format' = 'changelog-json'" +
            ")" +
            "");

        tableEnvironment.executeSql("insert into dwd_binlog select * from ods_binlog");
    }
}

Output:

{
    "data": {
        "user_id": 8,
        "user_name": "gg",
        "mobile": "gg",
        "password": "gg",
        "create_time": "2020-03-23 22:37:41"
    },
    "op": "+I"
}

That concludes this summary of Flink CDC. It only covers the basics, but I hope it helps with your work.
