kafka 生产调优-kafka常用配置参数

1、硬件选择

1.1  场景预估

100万日活,每人每天100条日志,每天总共的日志条数是100万 * 100条 = 1亿条。

1亿/24小时/60分/60秒 = 1150条/每秒钟。

每条日志大小:0.5k - 2k(取1k)。

1150条/每秒钟 * 1k ≈ 1m/s 。

高峰期每秒钟:1150条 * 20倍 = 23000条。

每秒多少数据量:20MB/s。

1.2 服务器台数选择

服务器台数= 2 * (生产者峰值生产速率 * 副本 / 100) + 1

            = 2 * (20m/s * 2 / 100) + 1

        = 3台

建议3台服务器。

1.3 硬盘选择

kafka底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。

建议选择普通的机械硬盘。

每天总数据量:1亿条 * 1k ≈ 100g

100g * 副本2 * 保存时间3天 / 0.7 ≈ 1T

建议三台服务器硬盘总大小,大于等于1T。

1.4 内存选择

Kafka内存组成:堆内存 + 页缓存

1.4.1 堆缓存

Kafka堆内存建议每个节点:10g ~ 15g

在kafka-server-start.sh中修改

if["x$KAFKA_HEAP_OPTS"="x"];then

exportKAFKA_HEAP_OPTS="-Xmx10G -Xms10G"

fi

查看Kafka进程号

jps

2321 Kafka

根据Kafka进程号,查看Kafka的GC情况

$ jstat-gc23211s10

S0C    S1C    S0U    S1U      EC       EU        OC         OU       MC     MU    CCSC   CCSU   YGC     YGCT    FGC    FGCT     GCT  

0.07168.00.07168.0103424.060416.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.060416.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.060416.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.060416.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.060416.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.061440.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.061440.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.061440.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.061440.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

0.07168.00.07168.0103424.061440.01986560.0148433.552092.046656.16780.06202.2130.53100.0000.531

参数说明:

S0C:第一个幸存区的大小;      S1C:第二个幸存区的大小

S0U:第一个幸存区的使用大小;  S1U:第二个幸存区的使用大小

EC:伊甸园区的大小;           EU:伊甸园区的使用大小

OC:老年代大小;               OU:老年代使用大小

MC:方法区大小;               MU:方法区使用大小

CCSC:压缩类空间大小;           CCSU:压缩类空间使用大小

YGC:年轻代垃圾回收次数;      YGCT:年轻代垃圾回收消耗时间

FGC:老年代垃圾回收次数;       FGCT:老年代垃圾回收消耗时间

GCT:垃圾回收消耗总时间

据Kafka进程号,查看Kafka的堆内存

jmap-heap2321

Attaching to process ID2321, please wait...

Debugger attached successfully.

Server compiler detected.

JVM version is25.212-b10

using thread-local object allocation.

Garbage-First (G1) GC with8thread(s)

Heap Configuration:

MinHeapFreeRatio=40

MaxHeapFreeRatio=70

MaxHeapSize=2147483648(2048.0MB)

NewSize=1363144(1.2999954223632812MB)

MaxNewSize=1287651328(1228.0MB)

OldSize=5452592(5.1999969482421875MB)

NewRatio=2

SurvivorRatio=8

MetaspaceSize=21807104(20.796875MB)

CompressedClassSpaceSize=1073741824(1024.0MB)

MaxMetaspaceSize=17592186044415MB

G1HeapRegionSize=1048576(1.0MB)

Heap Usage:

G1 Heap:

regions=2048

capacity=2147483648(2048.0MB)

used=246367744(234.95458984375MB)

free=1901115904(1813.04541015625MB)

11.472392082214355% used

G1 Young Generation:

Eden Space:

regions=83

capacity=105906176(101.0MB)

used=87031808(83.0MB)

free=18874368(18.0MB)

82.17821782178218% used

Survivor Space:

regions=7

capacity=7340032(7.0MB)

used=7340032(7.0MB)

free=0(0.0MB)

100.0% used

G1 Old Generation:

regions=147

capacity=2034237440(1940.0MB)

used=151995904(144.95458984375MB)

free=1882241536(1795.04541015625MB)

7.471886074420103% used

13364interned Strings occupying1449608bytes.

1.4.2 页缓存

页缓存是Linux系统服务器的内存。我们只需要保证1个segment(1g)中25%的数据在内存中就好。

每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如10个分区,页缓存大小=(10 * 1g * 25%)/ 3 ≈ 1g

建议服务器内存大于等于11G。

1.5 CPU

num.io.threads = 8  负责写磁盘的线程数,整个参数值要占总核数的50%。

num.replica.fetchers = 1 副本拉取线程数,这个参数占总核数的50%的1/3。

num.network.threads = 3  数据传输线程数,这个参数占总核数的50%的2/3。

建议32个cpu core。

1.6 网络

num.io.threads = 8  负责写磁盘的线程数,整个参数值要占总核数的50%。

num.replica.fetchers = 1 副本拉取线程数,这个参数占总核数的50%的1/3。

num.network.threads = 3  数据传输线程数,这个参数占总核数的50%的2/3。

建议32个cpu core。

2、生产者

3.1.1 Updating Broker Configs

From Kafka version 1.1 onwards, some of the broker configs can be updated without restarting the broker. See the Dynamic Update Mode column in Broker Configs for the update mode of each broker config.

read-only: Requires a broker restart for update

per-broker: May be updated dynamically for each broker

cluster-wide: May be updated dynamically as a cluster-wide default.

May also be updated as a per-broker value for testing.

2.1常用参数

参数名称描述

bootstrap.servers生产者连接集群所需的broker地址清单。例如node1:9092,node2:9092,node3:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。

key.serializer和value.serializer指定发送消息的key和value的序列化类型。一定要写全类名。

buffer.memoryRecordAccumulator缓冲区总大小,默认32m。

batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。

linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。

acks0:生产者发送过来的数据,不需要等数据落盘应答。  1:生产者发送过来的数据,Leader收到数据后应答。  -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是  1-5的数字。

retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。  如果设置了重试,还想保证消息的有序性,需要设置  MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

retry.backoff.ms两次重试之间的时间间隔,默认是100ms。

enable.idempotence是否开启幂等性,默认true,开启幂等性。

compression.type生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。    支持压缩类型:none、gzip、snappy、lz4和zstd。

2.2 生产者提高吞吐量

参数名称描述

buffer.memoryRecordAccumulator缓冲区总大小,默认32m。

batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。

linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。

compression.type生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。    支持压缩类型:none、gzip、snappy、lz4和zstd。

2.3 可靠性参数配置

参数名称描述

acks0:生产者发送过来的数据,不需要等数据落盘应答。  1:生产者发送过来的数据,Leader收到数据后应答。  -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

2.4 生产数据去重

参数名称描述

enable.idempotence是否开启幂等性,默认true,表示开启幂等性。

Kafka的事务一共有如下5个API

// 1初始化事务

void initTransactions();

// 2开启事务

void beginTransaction() throws ProducerFencedException;

// 3在事务内提交已经消费的偏移量(主要用于消费者)

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,

                              String consumerGroupId) throws ProducerFencedException;

// 4提交事务

void commitTransaction() throws ProducerFencedException;

// 5放弃事务(类似于回滚事务的操作)

void abortTransaction() throws ProducerFencedException;

2.5 数据有序

单分区内,有序(有条件的,不能乱序);多分区,分区与分区间无序

2.6 数据乱序

参数名称描述

enable.idempotence是否开启幂等性,默认true,表示开启幂等性。

max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是  1-5的数字。  、

3、Kafka Broker

核心参数

参数名称描述

replica.lag.time.max.msISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s。

auto.leader.rebalance.enable默认是true。 自动Leader Partition 平衡。建议关闭。

leader.imbalance.per.broker.percentage默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。

leader.imbalance.check.interval.seconds默认值300秒。检查leader负载是否平衡的间隔时间。

log.segment.bytesKafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。

log.index.interval.bytes默认4kb,kafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。

log.retention.hoursKafka中数据保存的时间,默认7天。

log.retention.minutesKafka中数据保存的时间,分钟级别,默认关闭。

log.retention.msKafka中数据保存的时间,毫秒级别,默认关闭。

log.retention.check.interval.ms检查数据是否保存超时的间隔,默认是5分钟。

log.retention.bytes默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。

log.cleanup.policy默认是delete,表示所有数据启用删除策略;  如果设置值为compact,表示所有数据启用压缩策略。

num.io.threads默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%。

num.replica.fetchers默认是1。副本拉取线程数,这个参数占总核数的50%的1/3

num.network.threads默认是3。数据传输线程数,这个参数占总核数的50%的2/3 。

log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。

log.flush.interval.ms每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

还有服役新节点,退役旧节点,增加分区,增加副本因子,手动调整分区副本存储,leader partition负载均衡,参考之前kafka文章

[kafka入门到精通] https://blog.csdn.net/weixin_38598961/article/details/130111837?spm=1001.2014.3001.5502 kafka入门到精通

3.1 自动创建主题

如果broker端配置参数auto.create.topics.enable设置为true(默认值是true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。生产环境建议将该参数设置为false。

4、消费者

4.1 常用参数

参数名称描述

bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。

key.deserializer和value.deserializer指定接收消息的key和value的反序列化类型。一定要写全类名。

group.id标记消费者所属的消费者组。

enable.auto.commit默认值为true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。  latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。

offsets.topic.num.partitions__consumer_offsets的分区数,默认是50个分区。不建议修改。

heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。  该条目的值必须小于session.timeout.ms ,也不应该高于 session.timeout.ms的1/3。不建议修改。

session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

fetch.min.bytes默认1个字节。消费者获取服务器端一批消息最小的字节数。

fetch.max.wait.ms默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。

fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条。

4.2 再均衡常用参数

参数名称描述

heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。  该条目的值必须小于 session.timeout.ms,也不应该高于 session.timeout.ms 的1/3。

session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

partition.assignment.strategy消费者分区分配策略,默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky

4.3 制定offset消费数据

kafkaConsumer.seek(topic, 1000);

4.4 制定时间消费

HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();

timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);

kafkaConsumer.offsetsForTimes(timestampToSearch);

4.5 消费者提高吞吐量-常用参数

参数名称描述

fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records一次poll拉取数据返回消息的最大条数,默认是500条

5、kafka总体

5.1 如何提高吞吐量

5.1.1 提高生产者吞吐量

buffer.memory:发送消息的缓冲区大小,默认值是32m,可以增加到64m。

batch.size:默认是16k。如果batch设置太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

linger.ms,这个值默认是0,意思就是消息必须立即被发送。一般设置一个5-100毫秒。如果linger.ms设置的太小,会导致频繁网络请求,吞吐量下降;如果linger.ms太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

compression.type:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的CPU开销。

5.1.2 增加分区

5.1.3消费者提高吞吐量

调整fetch.max.bytes大小,默认是50m。

调整fetch.max.bytes大小,默认是50m。

5.1.4 增加下游消费者处理能力

5.2 数据精准一次

5.2.1 生产者角度

acks设置为-1 (acks=-1)

幂等性(enable.idempotence = true) + 事务 。

5.2.2  Broker角度

分区副本大于等于2 (--replication-factor 2)

ISR里应答的最小副本数量大于等于2 (min.insync.replicas = 2)。

5.2.3 消费者角度

事务 + 手动提交offset (enable.auto.commit = false)。

消费者输出的目的地必须支持事务(MySQL、Kafka)

5.3 合理设置分区数

创建一个只有1个分区的topic。

测试这个topic的producer吞吐量和consumer吞吐量。

假设他们的值分别是Tp和Tc,单位可以是MB/s。

然后假设总的目标吞吐量是Tt,那么分区数 = Tt / min(Tp,Tc)。

例如:producer吞吐量 = 20m/s;consumer吞吐量 = 50m/s,期望吞吐量100m/s;

分区数 = 100 / 20 = 5分区

分区数一般设置为:3-10个

分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数。

5.4 单挑日志大于1M

参数名称描述

message.max.bytes默认1m,broker端接收每个批次消息最大值。

max.request.size默认1m,生产者发往broker每个请求消息最大值。针对topic级别设置消息体的大小。

replica.fetch.max.bytes默认1m,副本同步数据,每个批次消息最大值。

fetch.max.bytes默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

5.5 服务器挂了

在生产环境中,如果某个Kafka节点挂掉。

正常处理办法:

先尝试重新启动一下,如果能启动正常,那直接解决。

如果重启不行,考虑增加内存、增加CPU、网络带宽。

如果将kafka整个节点误删除,如果副本数大于等于2,可以按照服役新节点的方式重新服役一个新节点,并执行负载均衡。

5.6 压力测试

使用kafka官方提供的脚本

生产者压测:kafka-producer-perf-test.sh

消生产者压测:kafka-producer-perf-test.sh

5.6.1 Kafka Producer 压力测试

创建一个test topic,设置为3个分区3个副本

bin/kafka-topics.sh --bootstrap-server node1:9092 --create --replication-factor 3 --partitions 3 --topic test

在kafka的bin目录下执行测试脚本

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=16384 linger.ms=0

参数说明:

record-size是一条信息有多大,单位是字节,本次测试设置为1k。

num-records是总共发送多少条信息,本次测试设置为100万条。

throughput 是每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据,可测出生产者最大吞吐量。本次实验设置为每秒钟1万条。

producer-props 后面可以配置生产者相关参数,batch.size配置为16k

调整batch.size大小

batch.size默认值是16k。本次实验batch.size设置为32k。

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=32768 linger.ms=0

batch.size默认值是16k。本次实验batch.size设置为4k

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=4096 linger.ms=0

调整linger.ms时间

linger.ms默认是0ms。本次实验linger.ms设置为50ms。

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=4096 linger.ms=50

调整压缩方式

默认的压缩方式是none。本次实验compression.type设置为snappy。

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=4096 linger.ms=50 compression.type=snappy

默认的压缩方式是none。本次实验compression.type设置为zstd

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=4096 linger.ms=50 compression.type=zstd

默认的压缩方式是none。本次实验compression.type设置为gzip

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=4096 linger.ms=50 compression.type=gzip

默认的压缩方式是none。本次实验compression.type设置为lz4

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=4096 linger.ms=50 compression.type=lz4

默认生产者端缓存大小32m。本次实验buffer.memory设置为64m。

bin/kafka-producer-perf-test.sh  --topic test --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 batch.size=4096 linger.ms=50 buffer.memory=67108864

测试结果

测试:

1、batch.size=16384 linger.ms=0      9.76 MB/sec

2、batch.size=32768 linger.ms=0    9.76 MB/sec

3、batch.size=4096 linger.ms=0      3.81 MB/sec

4、batch.size=4096 linger.ms=50   3.83 MB/sec

5、batch.size=4096 linger.ms=50 compression.type=snappy  3.77 MB/sec

6、batch.size=4096 linger.ms=50 compression.type=zstd    5.68 MB/sec

7、batch.size=4096 linger.ms=50 compression.type=gzip      5.90 MB/sec

8、batch.size=4096 linger.ms=50 compression.type=lz4      3.72 MB/sec

9、batch.size=4096 linger.ms=50 buffer.memory=67108864  3.76 MB/sec

5.6.2 kafka Consumer 压力测试

修改/opt/soft/kafka/config/consumer.properties文件中的一次拉取条数为500

max.poll.records=500

消费100万条日志进行压测

#--messages 总共要消费的消息个数。本次实验100万条。

bin/kafka-consumer-perf-test.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic test  --messages 1000000  --consumer.config config/consumer.properties

一次拉取条数为2000

max.poll.records=2000

再次执行2的命令

调整fetch.max.bytes大小为100m fetch.max.bytes=104857600 再次执行2命令

测试结果

消费者  一次处理500条  81.2066m/s

消费者  一次处理2000条  138.0992m/s

消费者  一次处理2000条  fetch.max.bytes=104857600  145.2033m/s

###

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容