Kafka

概念:
Kafka作为一个分布式的流平台具有三个关键能力:
1.发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统
2.以容错的方式存储消息(流)
3.在消息流发生时处理它们

优势:
1.构建实时的流数据管道,可靠地获取系统和应用程序之间的数据
2.构建实时流的应用程序,对数据流进行转换或反应

首先几个概念:
1.kafka作为一个集群运行在一个或多个服务器上。
2.kafka集群存储的消息是以topic为类别记录的。
3.每个消息(也叫记录record,我习惯叫消息)是由一个key,一个value和时间戳构成。

kafka有四个核心API:
1.应用程序使用 Producer API 发布消息到1个或多个topic(主题)。
2.应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
3.应用程序使用StreamsAPI充当一个流处理器,从1个或多个topic消费输入流,
并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
4.ConnectorAPI允许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。

下载安装:
我是在自己的mac安装的,下面给出安装命令:
brew install kafka (会自动安装Zookeeper)

启动:
brew services start zookeeper
brew services start kafka

if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

使用kafka:
1.创建一个名为 dog 的Topic, 只有一个分区和一个备份
kafka-topics --create --zookeeper localhost:9092 --replication-factor 1 --partitions 1 --topic dog
2.查看创建的topic信息
kafka-topics --list --zookeeper localhost:9092
3.发送消息
kafka-console-producer --broker-list localhost:9092 --topic dog
4.消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic dog

生产消息 和 消费消息 见下图:


9AC323F5-A495-4A5F-BE2C-86990F4C15EC.png

kafka在linux的集群搭建

1.下载: http://kafka.apache.org/downloads.html
2.安装:

#解压到目录: cd  usr/local/kafka
tar  -zxvf  kafka_2.12-2.5.0.tgz
# 创建logs目录
mkdir  logs
[root@hadoop2 kafka]# pwd
/usr/local/kafka/kafka
[root@hadoop2 kafka]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs

## 修改配置文件: vim  /config/server.properties ##

# broker的全局唯一编号,不能重复
broker.id=2

# 处理网络请求的线程数量
num.network.threads=3

# 用来处理磁盘IO的线程数量
num.io.threads=8

# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600

log.dirs=/usr/local/kafka/kafka/logs

# topic在当前broker上的分区个数
num.partitions=1

listeners=PLAINTEXT://hadoop2:9092
advertised.listeners=PLAINTEXT://hadoop2:9092

# 配置连接Zookeeper集群地址
zookeeper.connect=hadoop2:2181,hadoop3:2181,hadoop4:2181
# 配置环境变量
## sudo  vim /etc/profile
kafka_home
export KAFKA_HOME=/usr/local/kafka/kafka
export PATH=$PATH:$KAFKA_HOME/bin

## source /etc/profile
## 分发到各台机器(假设当前在hadoop2这台机器)

[root@hadoop2 kafka]# ls
kafka
[root@hadoop2 kafka]# pwd
/usr/local/kafka
[root@hadoop2 kafka]# scp -r  kafka  root@hadoop3:/usr/local/kafka

# 修改每台机器的broker.id 
## vim  /config/server.properties
broker.id = 2  (hadoop2)
broker.id = 3   (hadoop3)
broker.id = 4   (hadoop4)
# 启动集群
## 依次在hadoop2、hadoop3、hadoop4 启动kafka
[root@hadoop2 kafka]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@hadoop2 kafka]# pwd
/usr/local/kafka/kafka
[root@hadoop2 kafka]# bin/kafka-server-start.sh config/server.properties &

# 启动集群方式2
[root@hadoop2 kafka]# bin/kafka-server-start.sh -daemon config/server.properties 
[root@hadoop2 kafka]# jps
9462 Kafka
9480 Jps
3419 QuorumPeerMain


# 关闭集群
[root@hadoop2 kafka]# bin/kafka-server-stop.sh stop

Kafka命令行操作

# 查看当前服务器中的所有topic
[root@hadoop2 kafka]# bin/kafka-topics.sh  --zookeeper hadoop2:2181 --list
# 创建topic
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop4:2181 \ --create --replication-factor 3 --partitions 1 --topic first
# 报错:
[root@hadoop2 kafka]# bin/kafka-topics.sh zookeeper hadoop4:2181 --list
Exception in thread "main" java.lang.IllegalArgumentException: Only one of --bootstrap-server or --zookeeper must be specified
    at kafka.admin.TopicCommand$TopicCommandOptions.checkArgs(TopicCommand.scala:702)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:52)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

## 注意: broker.id 要与对应的zookeepr的myid保持一致
主机名          host(ip)      zookeeper    myid
kafka-2        hadoop2        server.2      2 
kafka-3        hadoop3        server.3      3
kafka-4        hadoop4        server.4      4
  • 尝试正确的创建topic命令
# 在hadoop4机器上创建topic
[root@hadoop4 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop4:2181 --replication-factor 3 --partitions 6 --topic test
Created topic test.
## 选项说明
--topic : 定义topic名
--partitions: 定义分区数
--replication-factor: 定义副本数


# 查看创建的topic(创建成功后hadoop2、hadoop3都可查看)
[root@hadoop4 kafka] bin/kafka-topics.sh --describe --zookeeper  hadoop4:2181 --topic test

>   Topic: test     PartitionCount: 6           ReplicationFactor: 3    Configs: 
>   Topic: test     Partition: 0    Leader: 2   Replicas: 2,3,4         Isr: 2
>   Topic: test     Partition: 1    Leader: 3   Replicas: 3,4,2         Isr: 3,4,2
>   Topic: test     Partition: 2    Leader: 4   Replicas: 4,2,3         Isr: 4,2,3
>   Topic: test     Partition: 3    Leader: 2   Replicas: 2,4,3         Isr: 2
>   Topic: test     Partition: 4    Leader: 3   Replicas: 3,2,4         Isr: 3,2,4
>   Topic: test     Partition: 5    Leader: 4   Replicas: 4,3,2         Isr: 4,3,2

# 删除topic  
[root@hadoop2 kafka]# bin/kafka-topics.sh --zookeeper hadoop4:2181  --delete --topic test

# 在hadoop2上启动消费者
[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server  hadoop2:9092 --topic test

# 在hadoop4启动生产者
[root@hadoop2 kafka]# bin/kafka-console-producer.sh --broker-list  hadoop2:9092 --topic test

调整准则:
一般来说,若是集群较小(小于6个brokers),则配置2 x broker数的partition数。在这里主要考虑的是之后的扩展。若是集群扩展了一倍(例如12个),则不用担心会有partition不足的现象发生
一般来说,若是集群较大(大于12个),则配置1 x broker 数的partition数。因为这里不需要再考虑集群的扩展情况,与broker数相同的partition数已经足够应付常规场景。若有必要,则再手动调整
考虑最高峰吞吐需要的并行consumer数,调整partition的数目。若是应用场景需要有20个(同一个consumer group中的)consumer并行消费,则据此设置为20个partition
考虑producer所需的吞吐,调整partition数目(如果producer的吞吐非常高,或是在接下来两年内都比较高,则增加partition的数目)

kafka工作流程分析

32402AB8-6275-42DC-96D2-026C327B8AD2.png
  • 写入方式:producer 采用推(push)模式将消息发布到 broker,每条消息都被追加(append)到分 区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)

  • 分区: 消息发送时都被发送到一个 topic,其本质就是一个目录,而 topic 是由一些 Partition Logs(分区日志)组成,其组织结构如下图所示


    522069A3-BD57-465D-8AF5-91594CC5DA62.png

我们可以看到,每个 Partition 中的消息都是有序的,生产的消息被不断追加到 Partition log 上,其中的每一个消息都被赋予了一个唯一的 offset 值。

  • 分区的原因
    1>方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了
    2>可以提高并发,因为可以以 Partition 为单位读写了

  • 分区原则
    1>指定了 patition,则直接使用
    2>未指定 patition 但指定 key,通过对 key 的 value 进行 hash 出一个 patition
    3>patition 和 key 都未指定,使用轮询选出一个 patition

  • 副本
    同一个 partition 可能会有多个 replication(对应 server.properties 配置中的 default.replication.factor=N)。没有 replication 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入 replication 之 后,同一个 partition 可能会有多个 replication,而这时需要在这些 replication 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replication 作为 follower 从 leader 中复制数据

写入流程

12A28A8B-BC04-4FF8-84AE-46A5CB6B653A.png

1>producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
2>producer 将消息发送给该 leader
3>leader 将消息写入本地 log
4>followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
5>leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK

保存消息

  • 存储方式
    物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配 置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文 件)

  • 存储策略
    无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
    1>基于时间:log.retention.hours=168
    2>基于大小:log.retention.bytes=1073741824
    需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,所
    以这里删除过期文件与提高 Kafka 性能无关

注意:
ACK的确认方式有:
0:  不需要确认, 速度块
1: 生产者需要Leader的确认
all: 生产者需要Leder、Follower的确认

小试牛刀

  • 创建一个Maven项目 , pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafkademo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients !-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>

        <!-- 日志包 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>

    </dependencies>

</project>
  • 在resources目录下创建一个log4j.properties文件
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  • 代码
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) {

        Properties properties = new Properties();

        // 配置信息
        // 1.kafka集群
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop2:9092");

        // 2.应答机制
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");

        // 3.重试次数
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "0");

        // 4.批量大小
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");  // 16k

        // 提交延时
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");

        // 缓存
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");  // 32M

        // KV的序列化类
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i=10; i<20; i++){
            producer.send(new ProducerRecord<String, String>("test", Integer.valueOf(i).toString()), (recordMetadata, e) -> {
                    if (e == null) {
                        System.out.println(recordMetadata.partition() + "-----" + recordMetadata.offset());
                    } else {
                        System.out.println("发送失败");
                    }
            });
        }

        // 关闭资源
        producer.close();
    }
}

运行程序

  • 开启消费者
[root@hadoop4 kafka]# bin/kafka-console-consumer.sh --bootstrap-server  hadoop4:9092   --topic  test
0
1
2
3
4
5
6
7
8
9

注意:如果topic有多(3)个分区,那么则是按照分区顺序输出的,有可能是输出 0 3 6 9 1 4 7 2 5 8, 但是每个分区内部是有序的

创建多分区多副本

[root@hadoop2 kafka]# bin/kafka-topics.sh --create --zookeeper  hadoop2:2181 --replication-factor 3 --partitions  3 --topic  secondTest
Created topic secondTest.

# 查看创建多分区多副本的topic
[root@hadoop4 kafka]# bin/kafka-topics.sh --describe --zookeeper  hadoop4:2181 --topic  secondTest
Topic: secondTest   PartitionCount: 3   ReplicationFactor: 3    Configs: 
Topic: secondTest   Partition: 0        Leader: 2 Replicas: 2,3,4   Isr: 2,3,4
Topic: secondTest   Partition: 1        Leader: 3 Replicas: 3,4,2   Isr: 3,4,2
Topic: secondTest   Partition: 2        Leader: 4 Replicas: 4,2,3   Isr: 4,2,3
[root@hadoop2 kafka]# 
  • 自定义分区
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;

/**
  自定义分区
 */
public class CustomerPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//        return 0;
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        int partitionNum = partitionInfoList.size();
        Random random = new Random();
        return random.nextInt(partitionNum);  // [0 partitionNum) 中的一个随机数
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

# 生产者(Producer)代码 参考上例
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "CustomerPartitioner");

控制台输出:

分区=0-----偏移量=2
分区=0-----偏移量=3
分区=1-----偏移量=2
分区=1-----偏移量=3
分区=1-----偏移量=4
分区=1-----偏移量=5
分区=1-----偏移量=6
分区=1-----偏移量=7
分区=1-----偏移量=8
分区=2-----偏移量=6

kafka消费者输出:

[root@hadoop2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server  hadoop2:9092 --topic secondTest
1
5
0
2
3
6
7
8
9
4

由上可知, 1,5 在分区0中,0,2,3,6,7,8,9在分区1中, 4在分区2中 , 每个分区维护着一套自己的offset, 每个分区内部有序

消费者(Consumer)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // kafka集群
        properties.setProperty("bootstrap.servers", "hadoop2:9092");
        // 消费者组id
        properties.setProperty("group.id", "secondTest");
        // 设置自动提交offset
        properties.setProperty("enable.auto.commit", "true");
        // 延时提交
        properties.setProperty("auto.commit.interval.ms", "1000");
        // KV的反序列化
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // 指定topic
        // topic = "third" 这个topic是没有创建的,开启了log4j后会提示
        // WARN [org.apache.kafka.clients.NetworkClient] - Error while fetching metadata with correlation id 2 : {third=LEADER_NOT_AVAILABLE}"
        kafkaConsumer.subscribe(Arrays.asList("test", "secondTest","third"));
        // 如果想只消费某一个指定的topic
//        kafkaConsumer.subscribe(Collections.singletonList("test"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record: consumerRecords) {
                System.out.println(
                        "topic=" + record.topic() + "---" +
                        "patition=" + record.partition() + "---" +
                                "key=" + record.key() + "---" +
                                "value=" + record.value() + "---" +
                                "headers=" + record.headers().toString()
                ) ;
            }
        }
    }
}

# 生产测试数据
[root@hadoop2 kafka]# bin/kafka-console-producer.sh --broker-list  hadoop4:9092 --topic secondTest
>wudy  haha
>still water run deep
>

# console输出
topic=secondTest---patition=2---key=null---value=wudy  haha---headers=RecordHeaders(headers = [], isReadOnly = false)
topic=secondTest---patition=0---key=null---value=still water run deep---headers=RecordHeaders(headers = [], isReadOnly = false)
  • kafka如何重复消费?

1.使用低级API直接指定到哪个offset
2.设置auto.offset.reset=earliest, 并更换组名

properties.setProperty("group.id", "wudyGroup"); // 原来 == secondTest
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  1. kafka seek方法
# 可能存在的报错

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition secondTest-0
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
    at Consumer.main(Consumer.java:51)
# 解决方式1
// 指定消费某一个partition分区的数据
        kafkaConsumer.assign(Collections.singletonList(new TopicPartition("secondTest", 0)));
        // 指定获取某个topic的指定分区的指定offset的数据
        kafkaConsumer.seek(new TopicPartition("secondTest", 0), 4);
# 解决方式2
// 如果想只消费某一个指定的topi
 kafkaConsumer.subscribe(Collections.singletonList("secondTest"));
 kafkaConsumer.poll(0);
 // 指定获取某个topic的指定分区的指定offset的数
  kafkaConsumer.seek(new TopicPartition("secondTest", 0), 4);

拦截器

Producer拦截器(interceptor)主要用于实现clients端的定制化控制逻辑
对于Producer而言,interceptor使得用户在消息发送前以及producer回掉逻辑前有机会对消息做一些定制化需求,比如修改消息
同时Producer允许用户指定多个interceptor按序作用于同一消息从而形成一个拦截链(org.apache.kafka.clients.producer.ProducerInterceptor)

  • 1.configure(configs)
    获取配置信息和初始化数据时调用

  • 2.onSend(ProducerRecord)
    该方法封装进kafkaProducer.send方法中,即它运行在用户主线程中,Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区

  • 3.onAcknowledgement(RecordMetadata, Exception):
    该方法会在消息被应答或消息发送失败时调用,通常都是在producer回掉逻辑触发之前。onAcknowledgement运行在producer的IO线程中,不要在该方法中放很重的逻辑,否则会拖慢Producer的消息发送效率

  • 4.close
    关闭interceptor, 主要用于执行一些资源清理工作。
    interceptor可能被运行在多个线程中,因此在具体实现时需要用户自行确保线程安全,倘若指定了多个interceptor,则producer将按照指定顺序调用她们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志再向上传递

参考文章:https://mp.weixin.qq.com/s?__biz=Mzg2MjEwMjI1Mg==&mid=2247498512&idx=3&sn=d3f2fc7124fbfa556ecdfdd789417788&chksm=ce0e4c93f979c5859e0bb8bbebca80d389041894c4bbbf3c16af66040ab57c987afa6a03bb05&mpshare=1&scene=23&srcid=0813E6fZRaGThkrOfoStJoBs&sharer_sharetime=1597325081562&sharer_shareid=f770d25bc57f1c2f9159f85750f854dc%23rd

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容