c++11多线程编程

C++11的多线程编程方式

std::thread

在c++11的标准里,线程已经由标准库提供:std::thread,它起源于POSIX thread,因此在使用std::thread来做线程编程时,编译需要带上-lpthread。

#include <iostream>
#include <utility>
#include <thread>
#include <chrono>
#include <functional>
#include <atomic>

void f1(int n)
{
    for (int i = 0; i < 5; ++i) {
        std::cout << "Thread 1 executing\n";
        ++n;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void f2(int& n)
{
    for (int i = 0; i < 5; ++i) {
        std::cout << "Thread 2 executing\n";
        ++n;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main()
{
            int n = 0;
            std::thread t1; // t1 is not a thread
            std::thread t2(f1, n + 1); // pass by value
            std::thread t3(f2, std::ref(n)); // pass by reference
            std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
            t2.join();
            t4.join();
            std::cout << "Final value of n is " << n << '\n';
}

future和promise

除了std::thread这种原始的多线程编程方式,c++11还提供了future,promise这种语义的多线程编程方式,future以及promise能很好地处理需要线程之间传递数据以及同步的情况。

// promise example
#include <iostream>       // std::cout
#include <functional>     // std::ref
#include <thread>         // std::thread
#include <future>         // std::promise, std::future

void print_int (std::future<int>& fut) {
    int x = fut.get();
    std::cout << "value: " << x << '\n';
}

int main ()
{
  std::promise<int> prom;                      // create promise

  std::future<int> fut = prom.get_future();    // engagement with future

  std::thread th1 (print_int, std::ref(fut));  // send future to new thread

  prom.set_value (10);                         // fulfill promise
                                             // (synchronizes with getting the future)
  th1.join();
  return 0;
}

packaged_task

packaged_task是用于将future和promise连接起来的模板类,旨在减少使用future和promise语义编写多线程程序时的代码的冗余。

// packaged_task example
#include <iostream>     // std::cout
#include <future>       // std::packaged_task, std::future
#include <chrono>       // std::chrono::seconds
#include <thread>       // std::thread, std::this_thread::sleep_for

// count down taking a second for each value:
int countdown (int from, int to) {
  for (int i=from; i!=to; --i) {
  std::cout << i << '\n';
  std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from-to;
}

int main ()
{
  std::packaged_task<int(int,int)> tsk (countdown);   // set up packaged_task
  std::future<int> ret = tsk.get_future();            // get future

  std::thread th (std::move(tsk),10,0);   // spawn thread to count down from 10 to 0

  // ...

  int value = ret.get();                  // wait for the task to finish and get result

  std::cout << "The countdown lasted for " << value << " seconds.\n";

  th.join();

  return 0;
}

async

async提供了最简单的并发编程方式,使用async进行并行编程无需考虑线程和锁,在调用async时,该任务应该不包含有需要锁保护的共享数据,而async会根据当前cpu的core的使用情况来决定创建多少个thread来运行该任务。

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>

template <typename RAIter>
int parallel_sum(RAIter beg, RAIter end)
{
    auto len = end - beg;
    if(len < 1000)
        return std::accumulate(beg, end, 0);

    RAIter mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                         parallel_sum<RAIter>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
 }

int main()
{
     std::vector<int> v(10000, 1);
     std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
}

c++11控制并发逻辑的方式

future和promise

这种方式主要是通过这种语义实现程序上的多线程的并发逻辑的控制,具体参考上面的程序。

锁与条件变量

锁是一个经典的概念,经常被用于控制多线程访问共享数据的逻辑,条件变量更是提供了同步多个线程的语义。条件变量的必须和锁——跟更确地是std::unique_lock<std::mutex>——配合使用,
这种配合使用的限制可以使得条件变量在一些平台上的效率得到最大的优化。

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

void worker_thread()
{
     // Wait until main() sends data
     std::unique_lock<std::mutex> lk(m);
     cv.wait(lk, []{return ready;});

     // after the wait, we own the lock.
     std::cout << "Worker thread is processing data\n";
     data += " after processing";

     // Send data back to main()
     processed = true;
     std::cout << "Worker thread signals data processing completed\n";

     // Manual unlocking is done before notifying, to avoid waking up
     // the waiting thread only to block again (see notify_one for details)
     lk.unlock();
     cv.notify_one();
 }

int main()
{
    std::thread worker(worker_thread);

    data = "Example data";
        // send data to the worker thread
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one();

    // wait for the worker
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';

    worker.join();
}

std::lock_guard和std::unique_lock类似,但是前者在构造的时候就会加锁,后者可以不用在构造时就立即加锁(默认构造不需要锁),加锁可以发生在其生命期间的任何时刻,另外后者也可以转移其所有权到别的变量。
条件变量的notify方法能用于线程的唤醒,当有notify发生时,其他处于waiting状态的线程会尝试获取锁,成功后判断需要唤醒的条件(通过wait的第二个参数给出)是否为真,如果是真就唤醒,否则释放锁继续waiting。

一个充分使用了c++11特性的线程池

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
       -> std::future<typename std::result_of<F(Args...)>::type>;
          ~ThreadPool();
           private:
           // need to keep track of threads so we can join them
           std::vector< std::thread > workers;
           // the task queue
           std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
        for(size_t i = 0;i<threads;++i)
            workers.emplace_back(
                        [this]
            {
                for(;;)
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                                             [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());                                                                                                                                                               
                        task();
                    }
                }   
             );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
                std::bind(std::forward<F>(f), std::forward<Args>(args)...)
                        );

    std::future<return_type> res = task->get_future();
    {
       std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
       if(stop)
           throw std::runtime_error("enqueue on stopped ThreadPool");

       tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
 }

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif

该线程池主要是维护了一个任务的队列,初始化好的线程不断从该队列中取出任务并执行,同时维护好队列。

一些陷阱

std::vector<std::future<std::pair<std::string, std::string> > > results;
results.emplace_back(thread_pool->enqueue([](){return std::make_pair();}));

for (auto && res: results) {
    auto & ret = res.get();
    auto t = std::thread(somefunc, std::ref(ret.first), std::ref(ret.second));
    t.detach();
}

这里的问题在于ret.first, ret.second会在不同线程里使用同一个对象的引用,其值在不同线程里都一样,这明显有悖初衷。修改的方法简单直接:使用值传递,而不是使用引用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容