Kafka Connect For MySQL 实现增量数据同步

前言

最近公司需要搭建基于 Flink 框架的实时数仓,为了保证数仓的稳定性,决定引用消息中间件 Kafka 。Kafka 的作用在于 "削峰填谷" ,所谓的“削峰填谷”就是指缓冲上下游瞬时突发流量,使其更平滑。特别是对于那种发送能力很强的上游系统,如果没有消息引擎的保护,“脆弱”的下游系统可能会直接被压垮导致全链路服务“雪崩”。但是,一旦有了消息引擎,它能够有效地对抗上游的流量冲击,真正做到将上游的“峰”填满到“谷”中,避免了流量的震荡。消息引擎系统的另一大好处在于发送方和接收方的松耦合,这也在一定程度上简化了应用的开发,减少了系统间不必要的交互。

那么问题来了,如何将源库中的全量、增量数据导入至 Kafka 主题中呢?本文章将采用 Kafka connector 的方式连接 MySQL ,来实现端到端的数据同步。

JDBC Connector

JDBC connector 允许您通过JDBC驱动程序将任何关系型数据库中的数据导入到Kafka的主题Topic中。通过使用JDBC,这个连接器可以支持各种数据库,不需要为每个数据库定制代码。
通过定期地执行SQL查询语句并为结果集中的每一行创建输出记录来加载数据。在默认情况下,在一个数据库中的所有表都会被复制,每个表都复制到自己的输出主题。数据库那些新的或删除的表被监视并自动适应调整。当从表中复制数据时,连接器可以仅仅加载新增修改的行通过指定哪些列应当被用来发现新增或修改的数据。

测试环境

IP 操作系统 配置 服务器用途
172.21.22.186 CentOS Linux release 7.3.1611 (Core) 4vC/8Gb/400Gb 测试集群
172.21.22.187 CentOS Linux release 7.3.1611 (Core) 4vC/8Gb/400Gb 测试集群
172.21.22.188 CentOS Linux release 7.3.1611 (Core) 4vC/8Gb/400Gb 测试集群
172.21.23.238 CentOS Linux release 7.3.1611 (Core) 4vC/8Gb/400Gb MySQL/1

前置条件

主机名 JDK ZK Kafka
master jdk1.8.0_251 zookeeper-3.4.14 kafka_2.11-2.2.0
slave1 jdk1.8.0_251 zookeeper-3.4.14 kafka_2.11-2.2.0
slave2 jdk1.8.0_251 zookeeper-3.4.14 kafka_2.11-2.2.0

以下测试请在能够正常生产消费的 kafka 集群中进行

1. incrementing 自增模式

准备工作

  1. 连接 mysql 创建测试库、表
[bigdata@slave2 ~]$ mysql -h 172.21.23.238 -u root -p
[bigdata@slave2 ~]$ # 密码省略
mysql> create database test_kafka_connector;
mysql> use test_kafka_connector;

# 创建 `test_kafka_connector` 库 omneo_incrementing测试表
CREATE TABLE IF NOT EXISTS test_kafka_connector.omneo_incrementing(
    pid int(11) NOT NULL AUTO_INCREMENT,
    uuid varchar(100) NOT NULL,
    firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    birthdate varchar(20),
    postalcode varchar(20),
    city varchar(20),
    sexe varchar(20),
    status varchar(20),
    PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 创建 `kafka_connector` 库 kafka_omneo_incrementing测试表
CREATE TABLE IF NOT EXISTS kafka_connector.kafka_omneo_incrementing(
    pid int(11) NOT NULL AUTO_INCREMENT,
    uuid varchar(100) NOT NULL,
    firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    birthdate varchar(20),
    postalcode varchar(20),
    city varchar(20),
    sexe varchar(20),
    status varchar(20),
    PRIMARY KEY (pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
  1. 切换至 $KAFKA_HOME/config 目录,修改/新增 source-incrementing-mysql.properties ,内容如下:
name=mysql-a-source-omneo
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://172.21.23.238:3306/test_kafka_connector?user=root&password=1qaz@WSX
# incrementing  自增
mode=incrementing
# 自增字段  pid
incrementing.column.name=pid
# 白名单表  person
table.whitelist=omneo_incrementing
# topic前缀   mysql-kafka-
topic.prefix=mysql-kafka-
  1. 修改、新增 sink-incrementing-mysql.properties ,内容如下:
name=mysql-a-sink-omneo
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-omneo_incrementing
# 配置JDBC链接
connection.url=jdbc:mysql://172.21.23.238:3306/kafka_connector?user=root&password=1qaz@WSX
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafka_omneo_incrementing
  1. 创建 kafka topic mysql-kafka-omneo-incrementing,命令如下
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-omneo-incrementing
  1. 启动 kafka connect ,命令如下
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/source-incrementing-mysql.properties $KAFKA_HOME/config/sink-incrementing-mysql.properties

启动成功如下图所示


  1. test_kafka_connector.omneo_incrementing 插入10条测试数据
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(1,'3280983542a7-5bdb-d564-3133-276ae3ce','Nicole','Spaggiari','06/03/1942','34270','Miami','female','maried');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(2,'9089753542a7-5bdb-d564-3133-276ae3ce','Gaelle','O'Dell','23/09/2002','05274','Ottawa','unknown','en couple');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(3,'8939383542a7-5bdb-d564-3133-276ae3ce','Kevin','Spaggiari','23/05/1965','H2M1N6','Maugio','unknown','single');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(4,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier','O'Dell','30/06/1998','05274','Cocagne','male','maried');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(5,'9186073542a7-5bdb-d564-3133-276ae3ce','Gauthier','Garbuet','20/03/2000','34920','Ballancourt','female','en couple');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(6,'8983983542a7-5bdb-d564-3133-276ae3ce','Gaelle','Berrouard','20/05/1946','91610','Cocagne','female','en couple');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(7,'4518023542a7-5bdb-d564-3133-276ae3ce','Nicole','Spaggiari','30/06/1998','34920','Le Cres','male','veuve');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(8,'9721973542a7-5bdb-d564-3133-276ae3ce','Maurice','Berrouard','23/05/1965','H2M1N6','Ballancourt','male','en couple');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(9,'4037673542a7-5bdb-d564-3133-276ae3ce','Gaelle','Berrouard','30/06/1998','H2M2V5','Ballancourt','male','single');

插入成功如下图所示


  1. 启动 kafka consumer 消费 mysql-kafka-omneo_incrementing
kafka-console-consumer.sh --bootstrap-server master:9092 --topic mysql-kafka-omneo_incrementing --from-beginning

观察 consumer 控制台,发现 omneo 表中的10条数据已经生产至了 mysql-kafka_omneo 主题当中

8. 再插入一条数据

insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(10,'6633863542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'23/09/2002','H2M2V5','Maugio','female','veuve');
  1. 观察 consumer 控制台,发现 mysql-kafka_omneo 主题中 pid=10 条数据被消费到了控制台。
  1. 更新 pid=10 的 firstname="world"
INSERT INTO omneo VALUES (10,'6633863542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'23/09/2002','H2M2V5','Maugio','female','veuve') ON DUPLICATE KEY UPDATE firstname="world";
  1. 查看 mysql 中的 omneo 表发现值已经被改变
  1. consumer 控制台 pid=10 的记录值并没有被更新

incrementing 结论

在 JDBC Sink Connector 官网中指出insert.mode有且仅有两个值insert.mode=insert 只接收标准的INSERT SQL新增语句。而insert.mode=upsert 接收新增和更新,当对主键修改时也可以洞察并且输出。而insert是无法满足此要求的,因此根据实际业务使用的场景选择insert.mode。

我将 insert.mode=insert 修改为 insert.mode=upsert,再次运行修改过后的数据并没有将修改过后的最新数据生产至 kafka topic
修改字段值之前

修改字段值之后

我并没看出 insertupsert 两个参数的区别,希望测试成功的人留言指正或者微信指正,感谢!!!

2. timestamp 时间戳模式

  1. 创建 test_kafka_connector 库 omneo_timestamp 测试表
CREATE TABLE IF NOT EXISTS test_kafka_connector.omneo_timestamp(
    pid int(11) NOT NULL AUTO_INCREMENT,
    uuid varchar(100) NOT NULL,
    firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    birthdate varchar(20),
    postalcode varchar(20),
    city varchar(20),
    sexe varchar(20),
    status varchar(20),
    commenttime timestamp NOT NULL DEFAULT current_timestamp,
    PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

注意timestamp必须指定not null,否则会报错! (无法使用时间戳列进行增量查询因为时间戳字段是可为空的)

  1. 创建 kafka_connector 库 kafka_omneo_timestamp 测试表
CREATE TABLE IF NOT EXISTS kafka_connector.kafka_omneo_timestamp(
    pid int(11) NOT NULL AUTO_INCREMENT,
    uuid varchar(100) NOT NULL,
    firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    birthdate varchar(20),
    postalcode varchar(20),
    city varchar(20),
    sexe varchar(20),
    status varchar(20),
    commenttime timestamp NULL DEFAULT NULL,
    PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
  1. 切换至 $KAFKA_HOME/config 目录,修改/新增 source-timestamp-mysql.properties ,内容如下:
name=mysql-a-source-omneo
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://172.21.23.238:3306/test_kafka_connector?user=root&password=1qaz@WSX
# incrementing  自增
mode=timestamp
# 自增字段  pid
timestamp.column.name=commenttime
# 白名单表  person
table.whitelist=omneo_timestamp
# topic前缀   mysql-kafka-
topic.prefix=mysql-kafka-
  1. 修改、新增 source-timestamp-mysql.properties ,内容如下:
name=mysql-a-sink-omneo
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-omneo
# 配置JDBC链接
connection.url=jdbc:mysql://172.21.23.238:3306/kafka_connector?user=root&password=1qaz@WSX
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafka_omneo_timestamp
  1. 创建 kafka topic mysql-kafka-omneo-timestamp,命令如下
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-omneo_timestamp
  1. 启动 kafka connect ,命令如下
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/source-timestamp-mysql.properties $KAFKA_HOME/config/sink-timestamp-mysql.properties
  1. test_kafka_connector.omneo_timestamp 插入4条测试数据
insert into omneo_timestamp values(1,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'30/06/1998','05274','Cocagne','male','maried','2018-01-01 00:00:01');
insert into omneo_timestamp values(2,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'30/06/1998','05274','Cocagne','male','maried','2018-01-01 00:03:45');
insert into omneo_timestamp values(3,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'30/06/1998','05274','Cocagne','male','maried','2018-01-01 00:06:01');
insert into omneo_timestamp values(4,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'30/06/1998','05274','Cocagne','male','maried','2018-01-01 00:10:01');

插入成功如下图所示

  1. 观察 consumer 控制台,发现 mysql-kafka-omneo_timestamp 主题数据被消费到了控制台。
  1. 修改 pid 为 24firstname,并修改业务时间字段 commenttime
update omneo_timestamp set firstname = "world" ,commenttime="2018-12-20 15:55:10" where pid in(2,4);

修改完成如下所示

  1. 观察 consumer 控制台,发现 mysql-kafka-omneo_timestamp 主题数据产生了变化如下

timestamp 结论

  1. 如果修改的时间戳早于latest time ,则不会洞察到更新。例如MySQL中的now()获取当前时间就是很好的能被获取到的例子。
  2. 源表向目标表传输数据,假设有两条(或以上)的数据行拥有同样的时间戳,如果在传输第二条的过程中崩溃,恢复过后第二条将会被丢失,因为latest time已经被记录过了,他只会去找更新的下一次时间。这种方式虽然能获取到update更新,但是不够稳健。而如果使用自增字段加时间戳字段混合的方式,即使崩溃也能记录到更新的最大ID,恢复之后可以被找到不会丢失。因此我们更推荐第三种方式!timestamp+incrementing

incrementing + timestamp 混合模式

1.创建 test_kafka_connector 库 omneo_incrementing_timestamp 测试表

CREATE TABLE IF NOT EXISTS test_kafka_connector.omneo_incrementing_timestamp(
    pid int(11) NOT NULL AUTO_INCREMENT,
    uuid varchar(100) NOT NULL,
    firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    birthdate varchar(20),
    postalcode varchar(20),
    city varchar(20),
    sexe varchar(20),
    status varchar(20),
    commenttime timestamp NOT NULL DEFAULT current_timestamp,
    PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
  1. 创建 kafka_connector 库 kafka_omneo_incrementing_timestamp 测试表
CREATE TABLE IF NOT EXISTS kafka_connector.kafka_omneo_incrementing_timestamp(
    pid int(11) NOT NULL AUTO_INCREMENT,
    uuid varchar(100) NOT NULL,
    firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
    birthdate varchar(20),
    postalcode varchar(20),
    city varchar(20),
    sexe varchar(20),
    status varchar(20),
    commenttime timestamp NOT NULL DEFAULT current_timestamp,
    PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
  1. 切换至 $KAFKA_HOME/config 目录,修改/新增 source-incrementing-timestamp-mysql.properties
name=mysql-a-source-omneo_incrementing
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://172.21.23.238:3306/test_kafka_connector?user=root&password=1qaz@WSX
# incrementing  自增
mode=timestamp+incrementing
# 时间戳字段
timestamp.column.name=commenttime
# 自增字段  pid
incrementing.column.name=pid
# 白名单表  person
table.whitelist=omneo_incrementing_timestamp
# topic前缀   mysql-kafka-
topic.prefix=mysql-kafka-
  1. 修改、新增 sink-incrementing-timestamp-mysql.properties
name=mysql-a-sink-omneo
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-omneo_incrementing_timestamp
# 配置JDBC链接
connection.url=jdbc:mysql://172.21.23.238:3306/kafka_connector?user=root&password=1qaz@WSX
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafka_omneo_incrementing_timestamp
table.name.format=kafka_omneo_incrementing_timestamp
  1. 创建 kafka topic mysql-kafka-omneo-incrementing-timestamp,命令如下
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-omneo_incrementing_timestamp
  1. 启动 kafka connect ,命令如下
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/source-incrementing-timestamp-mysql.properties $KAFKA_HOME/config/sink-incrementing-timestamp-mysql.properties
  1. test_kafka_connector.omneo_incrementing_timestamp 插入4条测试数据
insert into omneo_incrementing_timestamp values(1,"0049683542a7-5bdb-d564-3133-276ae3ce","Maurice","Samuel","01/11/1977","H2M2V5","Ballancourt","male","en couple","2020-05-09 11:01:54");
insert into omneo_incrementing_timestamp values(2,"8338623542a7-5bdb-d564-3133-276ae3ce","Gauthier","Garbuet","23/05/1965","05274","Cocagne","female","maried","2020-05-09 11:01:54");
insert into omneo_incrementing_timestamp values(3,"3374573542a7-5bdb-d564-3133-276ae3ce","Maurice","Samuel","01/11/1977","H0H0H0","Ottawa","male","en couple","2020-05-09 11:01:54");
insert into omneo_incrementing_timestamp values(4,"5494133542a7-5bdb-d564-3133-276ae3ce","Nicole","Garbuet","01/11/1977","H0H0H0","Maugio","unknown","single","2020-05-09 11:01:54");
image
  1. 观察 consumer 控制台,发现 mysql-kafka-omneo_incrementing_timestamp 主题数据被消费到了控制台。
kafka-console-consumer.sh --bootstrap-server master:9092 --topic mysql-kafka-omneo_incrementing_timestamp --from-beginning
image
  1. 修改 pid 为 24firstname,并修改业务时间字段 commenttime
update omneo_incrementing_timestamp set firstname = "world" ,commenttime="2020-12-20 15:55:10" where pid in(2,4);

修改完成如下所示

image
  1. 观察 consumer 控制台,发现 mysql-kafka-omneo_incrementing_timestamp 主题,发现手动修改的两条数据并没有被消费到控制台。
image

timestamp + incrementing 混合模式结论

由以上测试结果可知,这次的测试不知道是我的粗心导致哪些细节的地方没有注意到,还是 kafka connect 的 BUG,后续我还会继续去排查问题的原因,如果有小伙伴测试成功了,希望小伙伴在留言/私信不吝赐教。

虽然本人测试失败了,但是还是推荐大家使用 canal 来完成 MySQL 数据近实时同步至 Kafka ,因为 kafka connect 是通过 select 的语句将 Mysql中的数据查询出来然后再写入 kafka ,这里会有一个瓶颈问题 当查询的数据量大于千万级别的时候,查询速度会特别的慢(没有建索引),而 canal 则是通过监控 MySQL 的 binlog 日志来完成数据同步的,大数据量的场景下建议使用 canal

请参考笔者另一篇文章:Canal + Kafka 实现 MySQL 的 binlog 近实时同步

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342