ClickHouse内存管理之MemoryTracker(附源码分析)

先说总结

ClickHouse数据库中有很多不同level的MemoryTracker,包括线程级别、查询级别、用户级别、server级别,这些MemoryTracker会通过parent指针组织成一个树形结构,把内存申请释放信息层层反馈上去。

MemoryTrack中还有额外的峰值信息(peak)统计,内存上限检查,一旦某个查询线程的申请内存请求在上层(查询级别、用户级别、server级别)MemoryTracker遇到超过限制错误,查询线程就会抛出OOM(Out Of Memory)异常导致查询退出。同时查询线程的MemoryTracker每申请一定量的内存都会统计出当前的工作栈,非常方便排查内存OOM(Out Of Memory)的原因。

1.ThreadGroup

ClickHouse的MPP计算引擎中每个查询的主线程都会有一个ThreadGroup对象,每个MPP引擎worker线程在启动时必须要attach到ThreadGroup上,在线程退出时detach,这保证了整个资源追踪链路的完整传递。

ThreadGroup对象的代码定义如下:

/** Thread group is a collection of threads dedicated to single task
  * (query or other process like background merge).
  *
  * ProfileEvents (counters) from a thread are propagated to thread group.
  *
  * Create via CurrentThread::initializeQuery (for queries) or directly (for various background tasks).
  * Use via CurrentThread::getGroup.
  */
class ThreadGroupStatus
{
public:
    mutable std::mutex mutex;

    ProfileEvents::Counters performance_counters{VariableContext::Process};
    MemoryTracker memory_tracker{VariableContext::Process};

    Context * query_context = nullptr;
    Context * global_context = nullptr;

    InternalTextLogsQueueWeakPtr logs_queue_ptr;
    std::function<void()> fatal_error_callback;

    std::vector<UInt64> thread_ids;

    /// The first thread created this thread group
    UInt64 master_thread_id = 0;

    LogsLevel client_logs_level = LogsLevel::none;

    String query;
    UInt64 normalized_query_hash;
};

using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
void PipelineExecutor::executeImpl(size_t num_threads)
{
    OpenTelemetrySpanHolder span("PipelineExecutor::executeImpl()");

    initializeExecution(num_threads);

    using ThreadsData = std::vector<ThreadFromGlobalPool>;
    ThreadsData threads;
    threads.reserve(num_threads);

    bool finished_flag = false;

    SCOPE_EXIT(
        if (!finished_flag)
        {
            finish();

            for (auto & thread : threads)
                if (thread.joinable())
                    thread.join();
        }
    );

    if (num_threads > 1)
    {
        auto thread_group = CurrentThread::getGroup();

        for (size_t i = 0; i < num_threads; ++i)
        {
            // 把lambda表达式作为子线程的参数,由于lambda表达式可以方便的捕获作用域中的变量,故可以作为子线程的参数
            // 线程池中的thread真正的执行体, emplace_back到threads中之后的调度由操作系统控制
            // 注意GlobalThreadPool基于std::thread实现
            threads.emplace_back([this, thread_group, thread_num = i, num_threads]
            {
                /// ThreadStatus thread_status;

                setThreadName("QueryPipelineEx");

                if (thread_group)
                    CurrentThread::attachTo(thread_group);

                SCOPE_EXIT(
                        if (thread_group)
                            CurrentThread::detachQueryIfNotDetached();
                );

                try
                {
                    executeSingleThread(thread_num, num_threads);
                }
                catch (...)
                {
                    /// In case of exception from executor itself, stop other threads.
                    finish();
                    executor_contexts[thread_num]->exception = std::current_exception();
                }
            });
        }

#if defined(OS_LINUX)
        {
            /// Wait for async tasks.
            std::unique_lock lock(task_queue_mutex);
            while (auto task = async_task_queue.wait(lock))
            {
                auto * node = static_cast<ExecutingGraph::Node *>(task.data);
                executor_contexts[task.thread_num]->async_tasks.push(node);
                executor_contexts[task.thread_num]->has_async_tasks = true;
                ++num_waiting_async_tasks;

                if (threads_queue.has(task.thread_num))
                {
                    threads_queue.pop(task.thread_num);
                    wakeUpExecutor(task.thread_num);
                }
            }
        }
#endif

        for (auto & thread : threads)
            if (thread.joinable())
                thread.join();
    }
    else
        executeSingleThread(0, num_threads);

    finished_flag = true;
}

可以看到这个关键的代码行,顾名思义,是一个thread汇聚到一个thread的集合里。

 CurrentThread::attachTo(thread_group);

2.Attach to ThreadGroup

具体Attach过程:

void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
{
    if (thread_state == ThreadState::AttachedToQuery)
    {
        if (check_detached)
            throw Exception("Can't attach query to the thread, it is already attached", ErrorCodes::LOGICAL_ERROR);
        return;
    }

    if (!thread_group_)
        throw Exception("Attempt to attach to nullptr thread group", ErrorCodes::LOGICAL_ERROR);

    setupState(thread_group_);
}

void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
{
    assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);

    /// Attach or init current thread to thread group and copy useful information from it
    thread_group = thread_group_;

    performance_counters.setParent(&thread_group->performance_counters);
    memory_tracker.setParent(&thread_group->memory_tracker);

    {
        std::lock_guard lock(thread_group->mutex);

        /// NOTE: thread may be attached multiple times if it is reused from a thread pool.
        thread_group->thread_ids.emplace_back(thread_id);

        logs_queue_ptr = thread_group->logs_queue_ptr;
        fatal_error_callback = thread_group->fatal_error_callback;
        query_context = thread_group->query_context;

        if (!global_context)
            global_context = thread_group->global_context;
    }

    if (query_context)
    {
        applyQuerySettings();

        // Generate new span for thread manually here, because we can't depend
        // on OpenTelemetrySpanHolder due to link order issues.
        thread_trace_context = query_context->query_trace_context;
        if (thread_trace_context.trace_id)
        {
            thread_trace_context.span_id = thread_local_rng();
        }
    }
    else
    {
        thread_trace_context.trace_id = 0;
    }

    initPerformanceCounters();

    thread_state = ThreadState::AttachedToQuery;
}

这里的两行代码表示了当前thread和threadGroups的一个关系。

从这里也不难猜出threadGroups应该是有一个总池子,thread有一个自己的池子。

这里将threadGroups的池子设置为所有thread线程的总池子。

那么,这说明,threadGroups的一些设置,对于单个thread的执行,是有约束作用的。

image

我们看看在哪些地方有用到了attchgroup。

image

3.重载new\delete

如何把CurrentThread::MemoryTracker hook到系统的内存申请、释放上去?ClickHouse首先是重载了c++的new_delete operator,其次针对需要使用malloc的一些场景封装了特殊的Allocator同步内存申请释放。

文件所在路径: src/Common/new_delete.cpp

/// new
void * operator new(std::size_t size)
{   
    //这里我们一会儿重点看一下
    Memory::trackMemory(size);
    return Memory::newImpl(size);
}

void * operator new[](std::size_t size)
{
    Memory::trackMemory(size);
    return Memory::newImpl(size);
}

void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
    if (likely(Memory::trackMemoryNoExcept(size)))
        return Memory::newNoExept(size);
    return nullptr;
}

void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{
    if (likely(Memory::trackMemoryNoExcept(size)))
        return Memory::newNoExept(size);
    return nullptr;
}

/// delete

/// C++17 std 21.6.2.1 (11)
/// If a function without a size parameter is defined, the program should also define the corresponding function with a size parameter.
/// If a function with a size parameter is defined, the program shall also define the corresponding version without the size parameter.

/// cppreference:
/// It's unspecified whether size-aware or size-unaware version is called when deleting objects of
/// incomplete type and arrays of non-class and trivially-destructible class types.

void operator delete(void * ptr) noexcept
{
    Memory::untrackMemory(ptr);
    Memory::deleteImpl(ptr);
}

void operator delete[](void * ptr) noexcept
{
    Memory::untrackMemory(ptr);
    Memory::deleteImpl(ptr);
}

void operator delete(void * ptr, std::size_t size) noexcept
{
    Memory::untrackMemory(ptr, size);
    Memory::deleteSized(ptr, size);
}

void operator delete[](void * ptr, std::size_t size) noexcept
{
    Memory::untrackMemory(ptr, size);
    Memory::deleteSized(ptr, size);
}

重点看上面代码段中标注的一行。

inline ALWAYS_INLINE void trackMemory(std::size_t size)
{
    std::size_t actual_size = size;

#if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 5
    /// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function
    /// @note je_mallocx() != je_malloc(). It's expected they don't differ much in allocation logic.
    if (likely(size != 0))
        actual_size = nallocx(size, 0);
#endif

    CurrentMemoryTracker::alloc(actual_size);
}

接下来看看alloc的实现,这个alloc并不是真正的alloc,是从限制中分配的alloc。

也就是说,真正的trick在这里,还记得刚才提到的池子么?

这个池子如果分配到了上限,那么,程序就提示报错了。

namespace CurrentMemoryTracker
{

using DB::current_thread;

void alloc(Int64 size)
{
    if (auto * memory_tracker = getMemoryTracker())
    {
        if (current_thread)
        {
            current_thread->untracked_memory += size;
            if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
            {
                /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
                /// more. It could be useful to enlarge Exception message in rethrow logic.
                Int64 tmp = current_thread->untracked_memory;
                current_thread->untracked_memory = 0;
                memory_tracker->alloc(tmp);
            }
        }
        /// total_memory_tracker only, ignore untracked_memory
        else
        {
            memory_tracker->alloc(size);
        }
    }
}
void MemoryTracker::alloc(Int64 size)
{
    if (size < 0)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Negative size ({}) is passed to MemoryTracker. It is a bug.", size);

    if (BlockerInThread::isBlocked(level))
    {
        /// Since the BlockerInThread should respect the level, we should go to the next parent.
        if (auto * loaded_next = parent.load(std::memory_order_relaxed))
            loaded_next->alloc(size);
        return;
    }

    /** Using memory_order_relaxed means that if allocations are done simultaneously,
      *  we allow exception about memory limit exceeded to be thrown only on next allocation.
      * So, we allow over-allocations.
      */
    Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

    auto metric_loaded = metric.load(std::memory_order_relaxed);
    if (metric_loaded != CurrentMetrics::end())
        CurrentMetrics::add(metric_loaded, size);

    //这里有一个hard_limit,也就是内存的上限,如果超过这个上限,那么,要强制报错了。
    Int64 current_hard_limit = hard_limit.load(std::memory_order_relaxed);
    Int64 current_profiler_limit = profiler_limit.load(std::memory_order_relaxed);

    /// Cap the limit to the total_memory_tracker, since it may include some drift
    /// for user-level memory tracker.
    ///
    /// And since total_memory_tracker is reset to the process resident
    /// memory peridically (in AsynchronousMetrics::update()), any limit can be
    /// capped to it, to avoid possible drift.
    if (unlikely(current_hard_limit
        && will_be > current_hard_limit
        && level == VariableContext::User))
    {
        Int64 total_amount = total_memory_tracker.get();
        if (amount > total_amount)
        {
            set(total_amount);
            will_be = size + total_amount;
        }
    }

#ifdef MEMORY_TRACKER_DEBUG_CHECKS
    if (unlikely(_memory_tracker_always_throw_logical_error_on_allocation))
    {
        _memory_tracker_always_throw_logical_error_on_allocation = false;
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Memory tracker: allocations not allowed.");
    }
#endif

    std::bernoulli_distribution fault(fault_probability);
    if (unlikely(fault_probability && fault(thread_local_rng)) && memoryTrackerCanThrow(level, true))
    {
        /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
        BlockerInThread untrack_lock(VariableContext::Global);

        ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
        const auto * description = description_ptr.load(std::memory_order_relaxed);
        amount.fetch_sub(size, std::memory_order_relaxed);
        throw DB::Exception(DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
                            "Memory tracker{}{}: fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
                            description ? " " : "", description ? description : "",
                            formatReadableSizeWithBinarySuffix(will_be),
                            size, formatReadableSizeWithBinarySuffix(current_hard_limit));
    }

    if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
    {
        BlockerInThread untrack_lock(VariableContext::Global);
        DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
        setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
    }

    std::bernoulli_distribution sample(sample_probability);
    if (unlikely(sample_probability && sample(thread_local_rng)))
    {
        BlockerInThread untrack_lock(VariableContext::Global);
        DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
    }

    //触发内存超限条件
    if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false))
    {
        /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
        BlockerInThread untrack_lock(VariableContext::Global);

        ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
        const auto * description = description_ptr.load(std::memory_order_relaxed);
        amount.fetch_sub(size, std::memory_order_relaxed);
        throw DB::Exception(DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
                            "Memory limit{}{} exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
                            description ? " " : "", description ? description : "",
                            formatReadableSizeWithBinarySuffix(will_be),
                            size, formatReadableSizeWithBinarySuffix(current_hard_limit));
    }

    updatePeak(will_be);

    //everything goes ok until here , normal way will be executing..
    if (auto * loaded_next = parent.load(std::memory_order_relaxed))
        loaded_next->alloc(size);
}

为了解决内存追踪的性能问题,每个线程的内存申请释放会在thread local变量上进行积攒,最后以大块内存的形式同步给MemoryTracker。

/** Tracks memory consumption.
  * It throws an exception if amount of consumed memory become greater than certain limit.
  * The same memory tracker could be simultaneously used in different threads.
  */
class MemoryTracker
{
    std::atomic<Int64> amount {0};
    std::atomic<Int64> peak {0};
    std::atomic<Int64> hard_limit {0};
    std::atomic<Int64> profiler_limit {0};

    Int64 profiler_step = 0;

    /// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
    double fault_probability = 0;

    /// To randomly sample allocations and deallocations in trace_log.
    double sample_probability = 0;

    /// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy).
    /// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker.
    std::atomic<MemoryTracker *> parent {};

    /// You could specify custom metric to track memory usage.
    CurrentMetrics::Metric metric = CurrentMetrics::end();

    /// This description will be used as prefix into log messages (if isn't nullptr)
    std::atomic<const char *> description_ptr = nullptr;
    ......
}

4. MemoryTrack 层级

上面所有论述讲述的是thread级别的memoryTracker设置。

下面看看User 级别的memoryTracker设置和Query级别的设置。

server参数设置为settings.max_memory_usage_for_user

我们来深入看一把ClickHouse打标user和query memoryTrack的逻辑。

首先,在执行Query的时候,ClickHouse server会对Query进行分析,交由ProcessList类对executeQuery时做了一部分检查工作。

包括一系列的阈值检查。比如是否超过某个user的最大并发执行数。

image

接下来ProcessList对象会设置memoryTracker。如下图所示。

image

标红的两段代码一个设置了single user级别的multiple queries的memory限制。一个设置了single query级别的memory限制。

执行顺序决定了memoryTrack的层级。

每一个线程执行,在runImp时,就已经生成了一个thread_status。

这个status后续会和clickhouse server从global线程池中分配的线程做一个绑定。这也是为什么文章开头的地方起名attach的缘故。

image
void CurrentThread::attachTo(const ThreadGroupStatusPtr & thread_group)
{
    if (unlikely(!current_thread))
        return;
    current_thread->attachQuery(thread_group, true);
    current_thread->deleter = CurrentThread::defaultThreadDeleter;
}

void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached)
{
    if (thread_state == ThreadState::AttachedToQuery)
    {
        if (check_detached)
            throw Exception("Can't attach query to the thread, it is already attached", ErrorCodes::LOGICAL_ERROR);
        return;
    }

    if (!thread_group_)
        throw Exception("Attempt to attach to nullptr thread group", ErrorCodes::LOGICAL_ERROR);

    setupState(thread_group_);
}

void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
{
    assertState({ThreadState::DetachedFromQuery}, __PRETTY_FUNCTION__);

    /// Attach or init current thread to thread group and copy useful information from it
    thread_group = thread_group_;

    performance_counters.setParent(&thread_group->performance_counters);
    //注意,这里设置了thread的memoryTrack的parent是thread_group。
    memory_tracker.setParent(&thread_group->memory_tracker);

    {
        std::lock_guard lock(thread_group->mutex);

        /// NOTE: thread may be attached multiple times if it is reused from a thread pool.
        thread_group->thread_ids.emplace_back(thread_id);

        logs_queue_ptr = thread_group->logs_queue_ptr;
        fatal_error_callback = thread_group->fatal_error_callback;
        query_context = thread_group->query_context;

        if (!global_context)
            global_context = thread_group->global_context;
    }

    if (query_context)
    {
        applyQuerySettings();

        // Generate new span for thread manually here, because we can't depend
        // on OpenTelemetrySpanHolder due to link order issues.
        thread_trace_context = query_context->query_trace_context;
        if (thread_trace_context.trace_id)
        {
            thread_trace_context.span_id = thread_local_rng();
        }
    }
    else
    {
        thread_trace_context.trace_id = 0;
    }

    initPerformanceCounters();

    thread_state = ThreadState::AttachedToQuery;
}

从上述代码中可以看到thread在初始化之后,已经设置了thread的memoryTrack的parent是thread_group。

接下来,我们回到ProcessList看看执行完对象逻辑后,memoryTracker的层级变为什么。

image

如上图示,针对threadGroup设置层级为 for query -> for user -> total,刚才提到thread的parent已经是threadGroup。所以对于thread来说,memoryTracker的层级是:

for thread -> for query -> for user -> total

image

至此,我们基本理顺了memoryTracker的层级关系。

但是我们到现在为止并没有看到哪里有设置Setparent(totalMemoryTracker)的地方。

image

这里,不管后来的parent如何设置,新加入的节点的parent一定是total_memory_tracker。

clickhouse server 在启动的时候设置了 total_memory_tracker。

image

5.整体MemoryTracker流程

从上面的1、2、3、4点我们可以了解到。

对于一个Query来说。

有3个memory限制的关卡。分别对应参数为:

 max_memory_usage,  "Maximum memory usage for processing of single query. Zero means unlimited."
 max_memory_usage_for_user, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited."

上面两个是对于user级别的限定。

    <!-- Maximum memory usage (resident set size) for server process.
         Zero value or unset means default. Default is "max_server_memory_usage_to_ram_ratio" of available physical RAM.
         If the value is larger than "max_server_memory_usage_to_ram_ratio" of available physical RAM, it will be cut down.

         The constraint is checked on query execution time.
         If a query tries to allocate memory and the current memory usage plus allocation is greater
          than specified threshold, exception will be thrown.

         It is not practical to set this constraint to small values like just a few gigabytes,
          because memory allocator will keep this amount of memory in caches and the server will deny service of queries.
      -->
    <max_server_memory_usage>0</max_server_memory_usage>

上面这一个,是总的内存限定。这个值并没有在setting.h中有默认值,这个如果在配置文件中不指定的话,就是0,不设上限。

所以这个设置应该是: max_memory_usage < max_memory_usage_for_user < max_server_memory_usage。

参考

云数据库ClickHouse资源隔离-弹性资源队列
https://developer.aliyun.com/article/780376?utm_content=g_1000223973

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容