Flink-1.10 源码笔记 checkpint -- 3

Flink 源码笔记 --- checkpoint

接上篇内容

Flink-1.10 源码笔记 checkpint - 2

Snapshot确认消息发送

在AsyncCheckpointRunnable.run()中调用了reportCompletedSnapshotStates方法负责报告 snapshot,在这里taskEnvironment调用了getTaskStateManager 获取TaskStateManager(任务状态管理器),TaskStateManager是TaskStateManagerImpl类型,后调用reportTaskStateSnapshots进行上报状态

private void reportCompletedSnapshotStates(
        TaskStateSnapshot acknowledgedTaskStateSnapshot,
        TaskStateSnapshot localTaskStateSnapshot,
        long asyncDurationMillis) {  //快照持续时间

        boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
        boolean hasLocalState = localTaskStateSnapshot.hasState();

        Preconditions.checkState(hasAckState || !hasLocalState,
            "Found cached state but no corresponding primary state is reported to the job " +
                "manager. This indicates a problem.");

        // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
        // to stateless tasks on restore. This enables simple job modifications that only concern
        // stateless without the need to assign them uids to match their (always empty) states.
        //todo 上报任务  快照状态
        // 当任务上触发检查点或保存点时,它将为其拥有的所有流操作符实例创建快照。然后,通过此接口报告来自任务的所有操作符快照。
        // 典型的实现将把报告的状态信息分派并转发给相关方,比如检查点协调器或本地状态存储。
        taskEnvironment.getTaskStateManager().reportTaskStateSnapshots( //改方法调用的 TaskStateManagerImpl实现了的方法
            checkpointMetaData,
            checkpointMetrics,
            hasAckState ? acknowledgedTaskStateSnapshot : null,
            hasLocalState ? localTaskStateSnapshot : null);

        LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
            taskName, checkpointMetaData.getCheckpointId(), asyncDurationMillis);

        LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
            taskName, checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
    }

进入TaskStateManagerImpl类,该类实现了TaskStateManager接口

public class TaskStateManagerImpl implements TaskStateManager

现在查看reportTaskStateSnapshots方法,在该方法中,会先将快照状态保存在本地,然后调用checkpointResponder的acknowledgeCheckpoint方法,发送消息

checkpointResponder是RpcCheckpointResponder类型

    /**
     * @param checkpointMetaData 检查点请求的元数据。
     * @param checkpointMetrics  检查点的任务级别度量。   --指标监控
     * @param acknowledgedState  报告的状态表示向作业管理器应答。
     * @param localState         报告的状态为本地恢复。
     */ 
    @Override
    public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {

        long checkpointId = checkpointMetaData.getCheckpointId();

        //保存本地快照状态
        localStateStore.storeLocalState(checkpointId, localState);

        //发送快照成功消息
        checkpointResponder.acknowledgeCheckpoint( //调用RpcCheckpointResponder类中的方法
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
    }

进入到acknowledgeCheckpoint方法中主要调用了checkpointCoordinatorGateway的acknowledgeCheckpoint方法,该方法调用的是JobMaster的acknowledgeCheckpoint方法

    @Override
    public void acknowledgeCheckpoint(
            JobID jobID,
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            CheckpointMetrics checkpointMetrics,
            TaskStateSnapshot subtaskState) {
        //调用了同名方法
        // JobMaster  --> JobMasterGateway  --> JobMasterOperatorEventGateway   以此实现   JobMasterOperatorEventGateway为父接口
        // jobMaster 是 任务管理器负责执行单个任务的jobGraph
        // RpcCheckpointResponder 本对象 在jobManager选举成功的时候,建立和jobManager的联系时候创建的
        checkpointCoordinatorGateway.acknowledgeCheckpoint(
            jobID,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            subtaskState);
    }

JobMaster继承了FencedRpcEndpoint,实现了JobMasterGateway和JobMasterService接口,

JobMasterGateway接口实现了JobMasterOperatorEventGateway接口

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService

进入checkpointCoordinatorGateway.acknowledgeCheckpoint方法

    /**
     * @param jobID                     正在运行的作业的作业ID
     * @param executionAttemptID        正在运行的任务的执行尝试ID
     * @param checkpointId              此检查点的元数据
     * @param checkpointMetrics         这个检查点的度量
     * @param checkpointState           检查点的状态句柄
     */
    @Override
    public void acknowledgeCheckpoint(
            final JobID jobID,
            final ExecutionAttemptID executionAttemptID,
            final long checkpointId,
            final CheckpointMetrics checkpointMetrics,
            final TaskStateSnapshot checkpointState) {
        //todo schedulerNG 该对象, 负责调度Flink作业
        //调用schedulerBase实现类的方法
        schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
    }

进入schedulerNG的acknowledgeCheckpoint方法

在该方法中, 会获取checkpointCoordinator(checkpoint协调 器),AcknowledgeCheckpoint对象,然后获取获取taskManager位置,最后调用checkpointCoordinator的receiveAcknowledgeMessage方法

@Override
    public void acknowledgeCheckpoint(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long checkpointId, final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) {
        // 确保在主线程中运行
        mainThreadExecutor.assertRunningInMainThread();

        //从执行图中 获取checkpoint协调器
        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        /*
        该对象用于
        此消息从{@link org.apache.flink.runtime.taskexecutor}发送到{@link org.apache.flink.runtime.jobmaster},以指示单个任务的检查点已经完成。
         */
        final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
            jobID,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            checkpointState);

        //获取taskManager位置
        final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

        if (checkpointCoordinator != null) {
            ioExecutor.execute(() -> {
                try {
                    //该方法 接收确认AcknowledgeCheckpoint消息,并返回该消息是否与某个挂起的检查点相关联。
                    checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
                } catch (Throwable t) {
                    log.warn("Error while processing checkpoint acknowledgement message", t);
                }
            });
        } else {
            String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
            if (executionGraph.getState() == JobStatus.RUNNING) {
                log.error(errorMessage, jobGraph.getJobID());
            } else {
                log.debug(errorMessage, jobGraph.getJobID());
            }
        }
    }

现在进入checkpointCoordinator的receiveAcknowledgeMessage方法

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
        if (shutdown || message == null) {
            return false;
        }

        if (!job.equals(message.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, taskManagerLocationInfo, message);
            return false;
        }

        //获取 检查点id
        final long checkpointId = message.getCheckpointId();

        //对此处代码添加线程锁, 同一时刻为单线程跑
        synchronized (lock) {
            // we need to check inside the lock for being shutdown as well, otherwise we
            // get races and invalid error log messages
            // 确保没有关闭
            if (shutdown) {
                return false;
            }

            //获取正在进行的checkpoint
            final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

            if (checkpoint != null && !checkpoint.isDiscarded()) {
                //acknowledgeTask  使用给定的执行尝试id和给定的子任务状态来识别任务
                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                    case SUCCESS: //成功
                        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                        //确认成功,并接收到了所有operator快照成功的确认
                        if (checkpoint.areTasksFullyAcknowledged()) {
                            //调用改方法
                            completePendingCheckpoint(checkpoint);
                        }
                        break;
                    case DUPLICATE: // 复制,副本
                        LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                        break;
                    case UNKNOWN: //未知的
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
                                "because the task's execution attempt id was unknown. Discarding " +
                                "the state handle to avoid lingering state.", message.getCheckpointId(),
                            message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                        break;
                    case DISCARDED: // 废弃的
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, " +
                                "because the pending checkpoint had been discarded. Discarding the " +
                                "state handle tp avoid lingering state.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                }

                return true;
            } else if (checkpoint != null) {
                // this should not happen
                throw new IllegalStateException(
                    "Received message for discarded but non-removed checkpoint " + checkpointId);
            } else {
                boolean wasPendingCheckpoint;

                // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                if (recentPendingCheckpoints.contains(checkpointId)) {
                    wasPendingCheckpoint = true;
                    LOG.warn("Received late message for now expired checkpoint attempt {} from task " +
                        "{} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                } else {
                    LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    wasPendingCheckpoint = false;
                }

                // try to discard the state so that we don't have lingering state lying around
                discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                return wasPendingCheckpoint;
            }
        }
    }

在进入该方法后,会先确认正在运行得checkpoint不等于null,并且不能是被丢弃的才会进入逻辑,

首先看一下switch中调用的方法,调用的checkpoint的acknowledgeTask方法,

该方法主要根据任务的状态,返回一个操作结果,

public TaskAcknowledgeResult acknowledgeTask(
            ExecutionAttemptID executionAttemptId,
            TaskStateSnapshot operatorSubtaskStates,
            CheckpointMetrics metrics) {

        synchronized (lock) {
            //如果弃用 ,返回弃用
            if (discarded) {
                return TaskAcknowledgeResult.DISCARDED;
            }

            // 从notYetAcknowledgedTasks集合中移除已确认的task
            // notYetAcknowledgedTasks保存了所有未确认的task
            final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);

            if (vertex == null) {
                // 如果notYetAcknowledgedTasks没有该task
                // 但是它在acknowledgedTasks(已确认的task)集合中
                // 返回重复确认DUPLICATE
                if (acknowledgedTasks.contains(executionAttemptId)) {
                    return TaskAcknowledgeResult.DUPLICATE;
                } else {
                    // 其他情况返回未知
                    return TaskAcknowledgeResult.UNKNOWN;
                }
            } else {
                //如果 不等于null  将其添加至已确认的task集合
                acknowledgedTasks.add(executionAttemptId);
            }

            List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();
            int subtaskIndex = vertex.getParallelSubtaskIndex();
            long ackTimestamp = System.currentTimeMillis();

            long stateSize = 0L;

            // 这段代码为保存各个operator的snapshot状态
            if (operatorSubtaskStates != null) {
                for (OperatorID operatorID : operatorIDs) {

                    //返回给定操作符id的子任务状态(如果不包含则返回null)。
                    OperatorSubtaskState operatorSubtaskState =
                        operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);

                    // if no real operatorSubtaskState was reported, we insert an empty state
                    //如果没获取到 operatorSubtaskSate的状态 给一个空状态
                    if (operatorSubtaskState == null) {
                        operatorSubtaskState = new OperatorSubtaskState();
                    }

                    //获取改 operator的状态
                    OperatorState operatorState = operatorStates.get(operatorID);

                    if (operatorState == null) {
                        //如果 operator有状态, 那么给添加一个默认的状态
                        operatorState = new OperatorState(
                            operatorID,
                            vertex.getTotalNumberOfParallelSubtasks(),
                            vertex.getMaxParallelism());
                        //添加的状态后 进行put
                        operatorStates.put(operatorID, operatorState);
                    }
                    //对 operator添加状态    平行任务的索引 operator子任务状态
                    operatorState.putState(subtaskIndex, operatorSubtaskState);
                    stateSize += operatorSubtaskState.getStateSize();
                }
            }

            ++numAcknowledgedTasks; //多少个确认的任务

            // publish the checkpoint statistics
            // to prevent null-pointers from concurrent modification, copy reference onto stack
            // 这段代码为汇报所有子任务checkpoint状态
            final PendingCheckpointStats statsCallback = this.statsCallback;
            if (statsCallback != null) {
                // Do this in millis because the web frontend works with them
                long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000;
                long checkpointStartDelayMillis = metrics.getCheckpointStartDelayNanos() / 1_000_000;

                SubtaskStateStats subtaskStateStats = new SubtaskStateStats(
                    subtaskIndex,
                    ackTimestamp,
                    stateSize,
                    metrics.getSyncDurationMillis(),
                    metrics.getAsyncDurationMillis(),
                    alignmentDurationMillis,
                    checkpointStartDelayMillis);

                statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats);
            }
            // 最后返回执行成功
            return TaskAcknowledgeResult.SUCCESS;
        }
    }

在switch语句中匹配到SUCCESS 会调用 completePendingCheckpoint方法, 在进入方法前会先判断checkpoint.areTasksFullyAcknowledged()方法,通过代码可以看出,所有任务都被确认,并且checkpoint不是被弃用的才会君如completePendingCheckpoint方法

    boolean areTasksFullyAcknowledged() {
        //    为确认的任务 =空                     不是废弃的
        return notYetAcknowledgedTasks.isEmpty() && !discarded;
    }

现在跟着进入completePendingCheckpoint方法

该方法, 会先将operator的状态注册到 注册表中,然后回调用完成checkpoint的逻辑,到这里checkpoint的过程就已经结束了

private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        final long checkpointId = pendingCheckpoint.getCheckpointId();
        final CompletedCheckpoint completedCheckpoint;

        // As a first step to complete the checkpoint, we register its state with the registry
        // 作为完成检查点的第一步,我们在注册表中注册检查点的状态
        // 注册所有operator的state到sharedStateRegistry
        Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
        sharedStateRegistry.registerAll(operatorStates.values());

        try {
            try {
                // 调用完成pendingCheckpoint的逻辑,
                completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
                // 重置失败checkpoint的计数
                failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
            } catch (Exception e1) {
                // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
                // 如果我们未能完成挂起检查点,则终止当前挂起检查点
                if (!pendingCheckpoint.isDiscarded()) {
                    abortPendingCheckpoint(
                        pendingCheckpoint,
                        new CheckpointException(
                            CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
                }

                throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }

            // the pending checkpoint must be discarded after the finalization
            // 检查状态,调用finalizeCheckpoint方法后pendingCheckpoint必须为discarded状态
            Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

            try {
                //存储完成的checkpoint
                completedCheckpointStore.addCheckpoint(completedCheckpoint);
            } catch (Exception exception) {
                // we failed to store the completed checkpoint. Let's clean up
                // 未能存储完成的检查点,那么进行清理
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            completedCheckpoint.discardOnFailedStoring();
                        } catch (Throwable t) {
                            LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                        }
                    }
                });

                throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
            }
        } finally {
            // 从正在进行中checkpoint集合中移除此checkpoint
            pendingCheckpoints.remove(checkpointId);

            // 恢复暂停周期性触发
            resumePeriodicTriggering();
        }

        // 保存最近的checkpoint ID
        rememberRecentCheckpointId(checkpointId);

        // drop those pending checkpoints that are at prior to the completed one
        // 在完成检查点之前删除那些挂起的检查点
        // 挂掉所有id小于checkpointId的checkpoint操作(被挂掉的checkpoint不能是强制的)
        dropSubsumedCheckpoints(checkpointId);

        // record the time when this was completed, to calculate
        // the 'min delay between checkpoints'
        // 保存此次checkpoint完成时间
        lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();

        //Completed checkpoint 2 for job d36ca92c353c5fc9794d42fdff834b5d (9013 bytes in 106 ms).
        // flink web监控 输出
        LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

        if (LOG.isDebugEnabled()) {
            StringBuilder builder = new StringBuilder();
            builder.append("Checkpoint state: ");
            for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
                builder.append(state);
                builder.append(", ");
            }
            // Remove last two chars ", "
            builder.setLength(builder.length() - 2);

            LOG.debug(builder.toString());
        }

        // send the "notify complete" call to all vertices, coordinators, etc.
        sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
    }

最后我们看一下PendingCheckpoint如何生成CompletedCheckpoint的过程,首先会调用pendingCheckpoint.finalizeCheckpoint方法,在方法中会存储checkpoint的元数据等操作,最终将pendingCheckpoint变成CompletedCheckpoint状态,在这之后会将pendingCheckpoint标记为弃用的状态,返回完成的checkpoint

    public CompletedCheckpoint finalizeCheckpoint() throws IOException {

        synchronized (lock) {

            checkState(!isDiscarded(), "checkpoint is discarded");
            checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet");

            // make sure we fulfill the promise with an exception if something fails
            try {
                // write out the metadata
                //创建一个 checkpointMetadata对象       --改对象封装 检查点或保存的元数据
                final CheckpointMetadata savepoint = new CheckpointMetadata(checkpointId, operatorStates.values(), masterStates);

                final CompletedCheckpointStorageLocation finalizedLocation;

                // 保存checkpoint数据到文件系统
                // createMetadataOutputStream 创建将检查点元数据持久化到的输出流
                try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
                    Checkpoints.storeCheckpointMetadata(savepoint, out);
                    //在写入所有元数据后关闭流并完成检查点位置
                    finalizedLocation = out.closeAndFinalizeCheckpoint();
                }

                // 创建一个 CompletedCheckpoint对象
                // CompletedCheckpoint描述在所有需要的任务确认之后的检查点(带有它们的状态),它被认为是成功的。
                // CompletedCheckpoint类包含检查点的所有元数据,即、检查点ID、时间戳以及检查点的所有状态的句柄
                CompletedCheckpoint completed = new CompletedCheckpoint(
                        jobId,
                        checkpointId,
                        checkpointTimestamp,
                        System.currentTimeMillis(),
                        operatorStates,
                        masterStates,
                        props,
                        finalizedLocation);
                // completableFuture任务完成,返回completedCheckpoint
                onCompletionPromise.complete(completed);

                // to prevent null-pointers from concurrent modification, copy reference onto stack
                // 设置completedCheckpoint的discardCallback
                PendingCheckpointStats statsCallback = this.statsCallback;
                if (statsCallback != null) {
                    // Finalize the statsCallback and give the completed checkpoint a
                    // callback for discards.
                    CompletedCheckpointStats.DiscardCallback discardCallback =
                            statsCallback.reportCompletedCheckpoint(finalizedLocation.getExternalPointer());
                    //设置当丢弃此检查点时用于跟踪的回调
                    completed.setDiscardCallback(discardCallback);
                }

                // mark this pending checkpoint as disposed, but do NOT drop the state
                // 标记自己为disposed状态
                dispose(false);

                return completed;
            }
            catch (Throwable t) {
                onCompletionPromise.completeExceptionally(t);
                ExceptionUtils.rethrowIOException(t);
                return null; // silence the compiler
            }
        }
    }

完成了pendingCheckpoint向completedCheckpoint转换后,会调用failureManager.handleCheckpointSuccess方法

这个方法主要是重置checkpoint失败的计数

    public void handleCheckpointSuccess(@SuppressWarnings("unused") long checkpointId) {
        clearCount();
    }

checkpoint往期内容

Flink-1.10 源码笔记 checkpint - 1

Flink-1.10 源码笔记 checkpint - 2

如有错误,欢迎指正!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容