Flink 使用之配置与调优

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

Flink读取配置文件的方式

参见:Flink 获取配置的途径

Flink常用参数配置

注意:Flink配置项名称随版本的升级变化较大。本文的配置项名称以Flink 1.19 1.20版本为准。如果读者使用其他版本的Flink,请先查询官网对应的配置项名称,以免使用错误的配置无法生效。

端口地址

jobmanger.rpc.address: JobManager的地址。Yarn模式会自动配置该选项。
jobmanager.rpc.port: JobManager的端口号。
jobmanager.bind-host: JobManager绑定的host。在Yarn模式下如果配置为localhost会被忽略,默认设置为0.0.0.0。
taskmanager.bing-host: 同上。针对TaskManager绑定的host。
rest.port Flink rest接口和 web ui的端口号。
rest.bind-address: rest接口绑定的地址,如果要支持多网访问,需要配置为0.0.0.0。
history.web.port 基于web的history server的端口号。
jobmanager.archive.fs.dir 将已完成的任务归档存储的目录。
historyserver.archive.fs.dir history server的归档目录。该配置必须包含jobmanager.archive.fs.dir配置的目录,以便history server能够读取到已完成的任务信息。

资源配置

JobManager内存配置:

jobmanager.memory.process.size: JobManager的进程总内存大小。

注意,内存的配置建议使用进程总内存大小的配置方式。Flink会自动调整内部各部分内存的大小。

jobmanager.memory.flink.size:JobManager中可供Flink使用的内存。包含head和off-heap内存。不包括JVM metaspace和JVM overhead占用的内存。
jobmanager.memory.heap.size:JobManager的堆内存大小。
jobmanager.memory.off-heap.size:off-heap占用的内存大小。
jobmanager.memory.jvm-metaspace.size:metaspace占用的内存大小。

TaskManager内存配置:

taskmanager.memory.process.size:TaskManager的进程总内存大小。
taskmanager.memory.flink.size:TaskManager的进程总内存大小中可供Flink使用的内存。包含head和off-heap内存。不包括JVM metaspace和JVM overhead占用的内存。
taskmanager.memory.framework.off-heap.batch-shuffle.size:批模式shuffle内存限制。
taskmanager.memory.framework.heap.size:Flink框架的heap内存。Task slot无法使用该部分内存。
taskmanager.memory.framework.off-heap.size:Flink框架使用的off-heap内存大小。Task slot无法使用该部分内存。
taskmanager.memory.jvm-metaspace.size:JVM metaspace内存大小。
taskmanager.memory.managed.size:Memory manager管理的内存大小。可供作业排序,哈希表,中间结果缓存和rocksdb状态后端使用。
taskmanager.memory.network.fraction:用于网络缓存部分的内存占比。
taskmanager.memory.process.size:用于网络缓存部分的内存大小。
taskmanager.memory.task.heap.size:作业可使用的heap内存大小。
taskmanager.memory.task.off-heap.size:作业可使用的off-heap内存大小。

taskmanager.numberOfTaskSlots: slot数量。在yarn模式使用的时候会受到yarn.scheduler.maximum-allocation-vcores值的影响。此处指定的slot数量如果超过yarn的maximum-allocation-vcores,flink启动会报错。在Yarn模式,flink启动的task manager个数可以参照如下计算公式:

num_of_tm = ceil(parallelism / slot)

即并行度除以slot个数,结果向上取整。

parallelism.default: 任务默认并行度,如果任务未指定并行度,将采用此设置。

Flink内存模型图

Flink内存模型.png

参见:
配置 Flink 进程的内存 | Apache Flink

状态后端

state.backend.type:状态后端的类型。可以使用hashmap或者rocksdb
execution.checkpointing.dir:保存检查点目录。需要配置为分布式文件系统,所有的集群节点都能够访问到。例如:hdfs:///flink-checkpoints
execution.checkpointing.savepoint-dir save point的目录。类似于execution.checkpointing.dir
execution.checkpointing.num-retained 保留最近检查点的数量。
execution.checkpointing.incremental: 是否开启增量checkpoint。开启可减少每次checkpoint写入的数据量,减少checkpoint的耗时。
execution.checkpointing.interval:checkpoint时间间隔。
execution.checkpointing.min-pause:两次相邻checkpoint的最小间隔时间。该配置项的目的是防止系统checkpoint运行时间过长,占据太多的时间比例。
execution.checkpointing.externalized-checkpoint-retention:checkpoint的保留机制。有如下三个配置值:

  • "DELETE_ON_CANCELLATION": Checkpoint state is only kept when the owning job fails. It is deleted if the job is cancelled.
  • "RETAIN_ON_CANCELLATION": Checkpoint state is kept when the owning job is cancelled or fails.
  • "NO_EXTERNALIZED_CHECKPOINTS": Externalized checkpoints are disabled.
    execution.checkpointing.max-concurrent-checkpoints:最多同时有多少个checkpoint同时进行。
    execution.checkpointing.mode:checkpoint模式。支持如下两种模式:
  • "EXACTLY_ONCE"
  • "AT_LEAST_ONCE"

execution.checkpointing.timeout:checkpoint的超时时间。
execution.checkpointing.tolerable-failed-checkpoints:容许连续失败的checkpoint操作次数。
execution.checkpointing.unaligned:是否启用unaligned checkpoint。

心跳

heartbeat.interval:心跳时间间隔。
heartbeat.timeout:心跳超时时间。

网络

taskmanager.network.netty.client.connectTimeoutSec:taskmanager的客户端连接超时的时间,默认为120s。
taskmanager.network.netty.sendReceiveBufferSize:netty的发送和接收的缓冲区大小。
taskmanager.network.netty.client.tcp.keepCount:连接保持数量。
taskmanager.network.netty.client.tcp.keepIdleSec:连接空间多少秒之后,开始发送keepalive probe。
taskmanager.network.netty.client.tcp.keepIntervalSec:keepalive probe发送的间隔时间。

Flink HA(Job Manager)的配置

Standalone模式使用Zookeeper协助HA模式的JobManager选主。

high-availability.type: zookeeper 使用zookeeper负责HA实现。
zookeeper.sasl.disable: true 是否使用SASL方式连接Zookeeper。默认启用。
high-availability.zookeeper.path.root: /flink flink信息在zookeeper存储节点的名称
high-availability.zookeeper.quorum: zk1,zk2,zk3 zookeeper集群节点的地址和端口
high-availability.storageDir: hdfs://nameservice/flink/ha/ JobManager元数据在文件系统储存的位置。

注意:需要确定集群使用的Flink版本和Zookeeper版本是否兼容。例如Flink1.15.x放弃了对Zookeeper 3.4的支持。

Yarn模式建议配置Yarn的Application Master重试次数。例如修改flink-conf.yaml,增加:

yarn.application-attempts: 10

注意:该配置值受到Yarn配置项yarn.resourcemanager.am.max-attempts的限制,实际允许的重试次数为min(yarn.application-attempts, yarn.resourcemanager.am.max-attempts)

详细参见:Flink HA 部署

Flink metrics 监控相关配置

下面例子中为Prometheus监控的相关配置。Flink定期将监控数据往push gateway。

metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: http://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS

Prometheus具体部署和使用方式参见:Flink 配置Prometheus监控

缓冲区超时配置

该配置项可以平衡吞吐量和延迟。设置值的解释如下:

  • -1:直到缓冲区填满才发送,最大化吞吐量
  • 0:一有数据就立刻发送,最小化延迟
  • 其他值:在配置时间后发送,如果在配置时间内缓冲区填满,则立刻发送

设置方法:

env.setBufferTimeout(xxx)

提交应用时参数配置

注意:per-job模式或者application模式可以为某个作业单独指定JM和TM的资源消耗。资源的消耗情况应该以能扛住高峰时段的数据处理压力为准。可提前对集群进行压测,记录极限情况的资源使用量。

JobManager内存

yarn-session:-jm 2048
yarn-cluster:-yjm 2048

TaskManager内存

yarn-session:-tm 2048
yarn-cluster:-ytm 2048

每个TaskManager 的slot个数

yarn-session:-s 8
yarn-cluster:-ys 8

通过系统变量方式配置

还可以在提交作业的时候使用-D参数配置。支持的参数如下:

-Dyarn.application.queue=test \ 指定yarn队列
-Djobmanager.memory.process.size=2048mb \ 指定JM的总进程大小
-Dtaskmanager.memory.process.size=2048mb \ 指定每个TM的总进程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个TM的slot数

Kafka相关调优配置

linger.ms/batch.size 这两个配置项配合使用,可以在吞吐量和延迟中得到最佳的平衡点。batch.size是kafka producer发送数据的批量大小,当数据量达到batch size的时候,会将这批数据发送出去,避免了数据一条一条的发送,频繁建立和断开网络连接。但是如果数据量比较小,导致迟迟不能达到batch.size,为了保证延迟不会过大,kafka不能无限等待数据量达到batch.size的时候才发送。为了解决这个问题,引入了linger.ms配置项。当数据在缓存中的时间超过linger.ms时,无论缓存中数据是否达到批量大小,都会被强制发送出去。
ack 数据源是否需要kafka得到确认。all表示需要收到所有ISR节点的确认信息,1表示只需要收到kafka leader的确认信息,0表示不需要任何确认信息。该配置项需要对数据精准性和延迟吞吐量做出权衡。

Kafka topic分区数和Flink并行度的关系

Flink Kafka source的并行度需要和kafka topic的分区数一致。最大化利用kafka多分区topic的并行读取能力。由于一个Kafka分区只能被一个消费者消费,因此一定要确保Flink Kafka source的并行度不要大于Kafka分区数,否则有些计算资源会空闲。如果并行度和分区数相同配置后,消费数据的速度仍然跟不上生产数据的速度,需要加大Kafka的分区数。

同理,如果Sink端也是Kafka,sink的并行度尽量和Kafka分区数一致。

Yarn相关调优配置

yarn.scheduler.maximum-allocation-vcores
yarn.scheduler.minimum-allocation-vcores

Flink单个task manager的slot数量必须介于这两个值之间。

yarn.scheduler.maximum-allocation-mb
yarn.scheduler.minimum-allocation-mb

Flink的job manager 和task manager内存不得超过container最大分配内存大小。

yarn.nodemanager.resource.cpu-vcores yarn的虚拟CPU内核数,建议设置为物理CPU核心数的2-3倍,如果设置过少,会导致CPU资源无法被充分利用,跑任务的时候CPU占用率不高。

更多配置参见:Flink 使用之 Yarn 资源问题排查

数据倾斜问题

几个解决套路:

  • 尽量使用大的并行度
  • 避免使用Map作为分区字段数据类型,同样避免使用String
  • 避免使用windowAll等无法并发操作的算子
  • 使用合适的分区方法

分区方法有如下几种:

  • shuffle:随机分区
  • rebalance:轮询
  • rescale:每个源的数据分散给下游,类似于一对多,这点和rebalance不同
  • broadcast:广播
  • 自定义分区,一个例子如下所示:
stream.partitionCustom(new Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    key.hashCode % numPartitions
  }
}, s => s.charAt(0).toString)

这个例子取首字母作为key,使用key的hashCode和分区数取余后的值作为分区编号。

并行度配置

并行度一般配置为CPU核心数的2-3倍。

并行度配置的几个层次如下所示。从上到下作用范围依次增大,但是上面的配置可以覆盖下面的配置。

  • 算子层次。算子的setParallelism方法
  • 执行环境层次。envsetParallelism方法
  • 客户端层次。提交任务时候的-p参数
  • 系统层次。flink-conf.yaml配置文件,parallelism.default配置项。

JVM参数配置

以增加GC日志为例,修改"conf/flink-conf.yaml配置文件的env.java.opts.all参数,增加:

-Xloggc:<LOG_DIR>/gc_log.log
-XX:+PrintGCDetails 
-XX:-OmitStackTraceInFastThrow 
-XX:+PrintGCTimeStamps 
-XX:+PrintGCDateStamps 
-XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=10 
-XX:GCLogFileSize=50M

其中<LOG_DIR>变量的值是JobManager或者TaskManager的日志路径。如果是on Yarn 模式运行,该变量指向的是container的log目录。

除了使用env.java.opts.all对Flink所有的进程JVM参数统一配置外,还可以使用如下参数单独配置Flink中的角色:

  • env.java.opts.client: 配置Flink Client
  • env.java.opts.historyserver: 配置History server
  • env.java.opts.jobmanager: 配置Job manager
  • env.java.opts.taskmanager: 配置Task manager

参考链接:配置参数 | Apache Flink

Yarn模式TaskManager驻留时间

Flink Yarn模式的TaskManager生命周期时随着任务的。有时候为了调试,发现任务停止(或者异常终止)后没多久TaskManager就被自动销毁。来不及去页面观察问题和阅读日志等。为了解决这个问题,我们可以修改TaskManager的驻留时间。可修改如下配置:

  • resourcemanager.taskmanager-timeout: 默认为30000

参考链接:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#resourcemanager-taskmanager-timeout

冗余的TaskManager数量

前面章节提到TaskManager数量时根据如下公式计算的:

num_of_tm = ceil(parallelism / slot)

这个公式计算出的TaskManager是刚好够用的。如果任何一个TaskManager出现故障退出,Flink需要重新启动一个新的TaskManager代替。

我们可以配置冗余的TaskManager数量。意思是除了上面公式计算出的最小TaskManager数量外,Flink还会额外启动一些TaskManager。这样当TaskManager出现故障之时,不用等待新的TaskManager启动,冗余的TaskManager会立刻取代故障的TaskManager开始工作,缩短了故障恢复的等待时间。配置方法如下:

  • slotmanager.redundant-taskmanager-num: 冗余的task manager数量。

Checkpoint

Checkpoint周期性进行。如果checkpoint操作耗时比checkpoint间隔时间还长,在上一个checkpoint未完成的时候,即便到了下一个checkpoint触发时间,新的checkpoint操作不会立即开始。只有在前一个checkpoint完成之后下一个checkpoint才能开始。这种情况下checkpoint会连续进行,严重影响系统性能。

为了避免这种情况,可以指定checkpoint之间的最小时间间隔。方法如下:

StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

注意:可以在应用中通过配置CheckpointConfig,可以允许多个checkpoint过程同步执行。

除此之外,checkpoint还支持更多的配置:

// 开启Checkpoint,设置间隔时间
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
// 启用EXACTLY_ONCE模式,使用Unaligned Checkpoint,保证数据精准一次投送,但会略微增大延迟
// 启用AT_LEAST_ONCE模式,barrier不会对齐,投送数据可能会重复,但是延迟很低
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小间隔时间,上面已介绍过
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(10))
// 超时时间。如果超过这个时间checkpoint操作仍未完成,checkpoint会被废弃
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 作业取消后checkpoint仍然保留(需要人工清理)
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

使用RocksDB backend

RocksDB支持增量checkpoint,相比全量checkpoint而言耗时更短。

启用增量checkpoint方法,在flink-conf.yaml中增加:

execution.checkpointing.incremental: true

也可以在代码中设置:

Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
env.configure(config);

RocksDB调优参见: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA

调整SSTable的block和block cache

state.backend.rocksdb.block.blocksize
state.backend.rocksdb.block.cache-size

经过实践这两个参数值对checkpoint性能影响较大。

使用全局参数

可以通过全局参数的方式,将参数从JobManager传递给各个TaskManager。

在JobManager中注册全局参数(ParameterTool是可序列化的):

env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

TaskManager中,通过Rich函数中使用如下方式获取全局参数:

ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

使用分布式缓存

上一节介绍了如何为各个TaskManager传递参数。这里的分布式缓存则用于向各个TaskManager分发文件。

注意:AsyncFunction不支持分布式缓存,直接使用会抛出异常。

首先,需要在JobManager注册需要分发的文件。注册的文件由JobManager发送给各个TaskManager,保存在TM运行环境的临时目录中。

val env = ExecutionEnvironment.getExecutionEnvironment

env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")

然后TaskManager使用的时候,通过Rich函数拉取文件:

getRuntimeContext.getDistributedCache.getFile("b.txt")

反压分析

性能瓶颈测试方法

测试反压可以快速的定位流处理系统的性能瓶颈所在。先在Kafka中积攒一批数据,然后在使用Flink消费,就好比水库泄洪,很容易找到下游性能薄弱的环节。

反压的可能原因

反压的原因可能会有:

  • 短时间的负载高峰,超过流处理系统极限
  • 下游sink负载变大,数据无法及时输出
  • GC压力过大,停顿时间太长
  • 某个算子作业过于复杂,执行耗时较长
  • 集群网络波动,上游传递给下游的网络通道受阻

定位方式

定位之前禁用掉OperatorChain,这样原本chain到一起的多个算子会分开,方便我们更精细的定位性能瓶颈。

  • 看页面:查看Flink Web UI中对应算子的Back Pressure页面,如果各个SubTask显示的结果为High,说明该算子存在反压情况。
  • 看监控:查看算子的inPoolUsage监控项。如果数值过高,说明存在反压。

找到反压算子之后,我们可以使用Flame Graph火焰图,来分析每个方法调用的耗时,从而找到耗时较长的方法。

开启火焰图的方法:

flink-conf.yaml中配置。

参数 默认值 含义
rest.flamegraph.enabled false 是否开启火焰图
rest.flamegraph.cleanup-interval 10min 统计信息的缓存清除时间
rest.flamegraph.delay-between-samples 50 ms 构建 FlameGraph 的单个堆栈跟踪样本之间的延迟
rest.flamegraph.num-samples 100 构建flamegraph的采样数
rest.flamegraph.refresh-interval 1 min 火焰图刷新的时间间隔
rest.flamegraph.stack-depth 100 创建FlameGraphs 的堆栈跟踪的最大深度

一些通用的方法:

  • 优化反压算子的业务逻辑代码
  • 调用外部系统使用AsyncFunction
  • 增大TM的内存资源
  • 增大反压算子的并行度
  • 减少反压上游算子的并行度

分区空闲检测

Barrier对齐环节(一个算子有多个input)需要收集齐各个input的watermark才放行数据,如果某一个input的数据量很少,导致该input迟迟收不到watermark,则整个数据链路会被阻塞。

为了解决这个问题,Flink提供了分区空闲检测功能,如果某个input在一段时间内没有收到数据,会被标记为空闲。在barrier对齐环节,这个input上面的watermark会被忽略。

配置分区空闲判定时间的方法如下:

SourceFunction.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofMinutes(2))
                        .withIdleness(Duration.ofMinutes(5))
);

作业提交方式

参考链接:YARN | Apache Flink

Flink作业提交的方式分为application模式,per-job模式和session模式。

per-job模式

在Yarn创建一个Flink集群,然后在提交任务客户端所在机器本地运行作业jar的main方法,提交生成的JobGraph到Flink集群的JobManager。如果附带--detached参数,提交的作业被accept的时候,客户端会停止运行(命令行不用一直开着,生产环境必须。开发测试时可不带--detached参数,通过命令行查看运行日志)。

flink run -t yarn-per-job --detached /path/to/job.jar

session模式

首先启动Flink Yarn Session,它是一个常驻与Yarn的Flink集群。启动成功后,无论是否有作业执行,或者作业是否执行完毕,该session始终保持运行。启动yarn session的方法如下:

export HADOOP_CLASSPATH=`hadoop classpath`
./bin/yarn-session.sh --detached

yarn-session支持的相关参数解释:

  • -d/--detached: Detach模式,session启动成功后client停止运行。不用保持控制台一直开启。
  • -nm: Application名称
  • -jm: Job Manager 容器的内存
  • -tm: Task Manager 容器的内存
  • -t: 传送文件至集群,使用相对路径。程序中读取文件仍使用相对路径
  • -qu: 指定使用的Yarn队列

提交作业到Yarn session:

flink run -t yarn-session \
  -Dyarn.application.id=application_XXXX_YY \
  /path/to/job.jar

停止Flink Yarn session可以通过Yarn UI的kill按钮。当然也可以通过如下方式:

echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX

注意:一个Flink Yarn Session可以同时跑多个Flink作业。

application模式

和per-job模式类似,提交一次任务会在Yarn运行一个Flink集群。不同之处为作业jar包的main方法在Yarn集群的JobManager上运行,而不是提交作业的client端运行。作业执行完毕后,Flink on yarn集群会被关闭。

flink run-application -t yarn-application /path/to/job.jar

application模式的好处是Flink yarn集群可以直接从HDFS上查找并下载作业jar以及所需依赖,避免了从client机器上传。

flink run-application -t yarn-application \
    -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
    hdfs://myhdfs/jars/my-application.jar

其中yarn.provided.lib.dirs为Flink作业所需依赖包的地址。

实际生产中推荐使用此模式。每个作业都使用单独的Flink集群,它们消耗的资源是互相隔离的,一个作业的崩溃不会影响到其他作业。

Flink on Yarn 模式配置JDK

Flink 1.10版本以上支持使用JDK 11。从Flink 1.15开始逐渐放弃JDK 8的支持。因JDK版本造成的问题不再提供修复方案,建议升级JDK。

Flink以standalone模式运行的话配置JDK 11以上版本较为简单,为Flink配置java home可满足要求。然而问题在于使用on Yarn模式。Yarn稳定起见本身在JDK 8环境运行。如何让JDK 8版本的Yarn启动JDK 11以上版本的Flink呢?

我们可以使用如下两个参数:

  • containerized.master.env.: 指定on Yarn模式下Flink JobManager的环境变量。
  • containerized.taskmanager.env.: 指定on Yarn模式下Flink TaskManager的环境变量。

我们可以通过上述方式,为JobManager和TaskManager指定JAVA_HOME环境变量。例如JDK 11位于/opt/jdk-11.0.22+7(需要每个Yarn NodeManager节点上面都安装的有),修改flink-conf.yaml文件,增加:

containerized.master.env.JAVA_HOME: /opt/jdk-11.0.22+7
containerized.taskmanager.env.JAVA_HOME: /opt/jdk-11.0.22+7

最后重启Yarn session,或者提交新任务,即可生效。

参考文献:配置参数 | Apache Flink

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345