C++基于消息队列的线程池实现

实现消息队列的关键因素是考量不同线程访问消息队列的同步问题。本实现涉及到几个知识点

std::lock_guard 介绍

std::lock_gurad 是 C++11 中定义的模板类。定义如下:
template <class Mutex> class lock_guard;
lock_guard 对象通常用于管理某个锁(Lock)对象,因此与 Mutex RAII 相关,方便线程对互斥量上锁,即在某个 lock_guard 对象的声明周期内,它所管理的锁对象会一直保持上锁状态;而 lock_guard 的生命周期结束之后,它所管理的锁对象会被解锁(注:类似 shared_ptr 等智能指针管理动态分配的内存资源 )。
模板参数 Mutex 代表互斥量类型,例如 std::mutex 类型,它应该是一个基本的 BasicLockable 类型,标准库中定义几种基本的 BasicLockable 类型,分别 std::mutex, std::recursive_mutex, std::timed_mutex,std::recursive_timed_mutex 以及 std::unique_lock

std::unique_lock 介绍

lock_guard 最大的缺点也是简单,没有给程序员提供足够的灵活度,因此,C++11 标准中定义了另外一个与 Mutex RAII 相关类 unique_lock,该类与 lock_guard 类相似,也很方便线程对互斥量上锁,但它提供了更好的上锁和解锁控制。
顾名思义,unique_lock 对象以独占所有权的方式( unique owership)管理 mutex 对象的上锁和解锁操作,所谓独占所有权,就是没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权。
新创建的 unique_lock 对象管理 Mutex 对象 m,并尝试调用 m.lock() 对 Mutex 对象进行上锁,如果此时另外某个 unique_lock 对象已经管理了该 Mutex 对象 m,则当前线程将会被阻塞。

std::condition介绍

当 std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(通过 std::mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notification 函数来唤醒当前线程。
std::condition_variable 提供了两种 wait() 函数。当前线程调用 wait() 后将被阻塞(此时当前线程应该获得了锁(mutex),不妨设获得锁 lck),直到另外某个线程调用 notify_* 唤醒了当前线程。
在线程被阻塞时,该函数会自动调用 lck.unlock() 释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。另外,一旦当前线程获得通知(notified,通常是另外某个线程调用 notify_* 唤醒了当前线程),wait() 函数也是自动调用 lck.lock(),使得 lck 的状态和 wait 函数被调用时相同。
在第二种情况下(即设置了 Predicate),只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞。因此第二种情况类似以下代码:
while (!pred()) wait(lck);

std::function介绍

使用std::function可以将普通函数,lambda表达式和函数对象类统一起来。它们并不是相同的类型,然而通过function模板类,可以转化为相同类型的对象(function对象),从而放入一个vector或其他容器里,方便回调。

代码实现:

#pragma once

#ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H

#include <queue>
#include <mutex>
#include <condition_variable>

template<class Type>
class CMessageQueue
{
public:
    CMessageQueue& operator = (const CMessageQueue&) = delete;
    CMessageQueue(const CMessageQueue& mq) = delete;


    CMessageQueue() :_queue(), _mutex(), _condition(){}
    virtual ~CMessageQueue(){}

    void Push(Type msg){
        std::lock_guard <std::mutex> lock(_mutex);
        _queue.push(msg);
         //当使用阻塞模式从消息队列中获取消息时,由condition在新消息到达时提醒等待线程
        _condition.notify_one();
    }
        //blocked定义访问方式是同步阻塞或者非阻塞模式
    bool Pop(Type& msg, bool isBlocked = true){
        if (isBlocked)
        {
            std::unique_lock <std::mutex> lock(_mutex);
            while (_queue.empty())
            {
                _condition.wait(lock);
                
            }
            //注意这一段必须放在if语句中,因为lock的生命域仅仅在if大括号内
            msg = std::move(_queue.front());
            _queue.pop();
            return true;
            
        }
        else
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (_queue.empty())
                return false;


            msg = std::move(_queue.front());
            _queue.pop();
            return true;
        }

    }

    int32_t Size(){
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size();
    }

    bool Empty(){
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }
private:
    std::queue<Type> _queue;//存储消息的队列
    mutable std::mutex _mutex;//同步锁
    std::condition_variable _condition;//实现同步式获取消息
};

#endif//MESSAGE_QUEUE_H

线程池可以直接在构造函数中构造线程,并传入回调函数,也可以写一个Run函数显示调用。这里我们选择了第二种,对比:

  1. 在handler函数外部做循环接受消息,当消息到达后调用hanlder处理。这种实现在上层做封装,但是会在线程中频繁的切换调用函数。这种设计无法复用一些资源,如当在handler中做数据库操作时,需要频繁的连接和断开连接,可以通过定义两个虚函数Prehandler和AfterHandler来实现。
    !!!构造函数中调用虚函数并不会能真正的调用子类的实现!!!
    虽然可以对虚函数进行实调用,但程序员编写虚函数的本意应该是实现动态联编。在构造函数中调用虚函数,函数的入口地址是在编译时静态确定的,并未实现虚调用

  2. 写一个Run函数,将这一部分实现放在run函数中,显示调用。
    《Effective C++ 》条款9:永远不要在构造函数或析构函数中调用虚函数

#ifndef THREAD_POOL_H
#define THREAD_POOL_H


#include <functional>
#include <vector>
#include <thread>

#include "MessageQueue.h"

#define MIN_THREADS 1

template<class Type>
class CThreadPool
{
    CThreadPool& operator = (const CThreadPool&) = delete;
    CThreadPool(const CThreadPool& other) = delete;

public:
    CThreadPool(int32_t threads, 
       std::function<void(Type& record, CThreadPool<Type>* pSub)> handler);
    virtual ~CThreadPool();

    void Run();
    virtual void PreHandler(){}
    virtual void AfterHandler(){}
    void Submit(Type record);


private:
    bool _shutdown;
    int32_t _threads;
    std::function<void(Type& record, CThreadPool<Type>* pSub)> _handler;
    std::vector<std::thread> _workers;
    CMessageQueue<Type> _tasks;

};



template<class Type>
CThreadPool<Type>::CThreadPool(int32_t threads, 
    std::function<void(Type& record,  CThreadPool<Type>* pSub)> handler)
    :_shutdown(false),
    _threads(threads),
    _handler(handler),
    _workers(),
    _tasks()
{

    //第一种实现方案,注意这里的虚函数调用不正确
    /*if (_threads < MIN_THREADS)
        _threads = MIN_THREADS;
    for (int32_t i = 0; i < _threads; i++)
    {
        
        _workers.emplace_back(
            [this]{
            PreHandler();
            while (!_shutdown){
                Type record;
                _tasks.Pop(record, true);
                _handler(record, this);
            }
            AfterHandler();
        }
        );
    }*/

}

//第二种实现方案
template<class Type>
void CThreadPool<Type>::Run()
{
    if (_threads < MIN_THREADS)
        _threads = MIN_THREADS;
    for (int32_t i = 0; i < _threads; i++)
    {
        _workers.emplace_back(
            [this]{
            PreHandler();
            while (!_shutdown){
                Type record;
                _tasks.Pop(record, true);
                _handler(record, this);
            }
            AfterHandler();
        }
        );
    }
}




template<class Type>
CThreadPool<Type>::~CThreadPool()
{
    for (std::thread& worker : _workers)
        worker.join();
}


template<class Type>
void CThreadPool<Type>::Submit(Type record)
{
    _tasks.Push(record);
}
#endif // !THREAD_POOL_H
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容