本文根据众多互联网博客内容整理后形成,引用内容的版权归原始作者所有,仅限于学习研究使用,不得用于任何商业用途。
互斥
互斥是多线程系统中用于控制访问的一个原对象(primitive object)。下面的例子给出了它最基本的用法:
std::mutex m;
int sh; //共享数据
// …
m.lock();
// 对共享数据进行操作:
sh += 1;
m.unlock();
在任何时刻,最多只能有一个线程执行到lock()和unlock()之间的区域(通常称为临界区)。当第一个线程正在临界区执行时,后续执行到m.lock()的线程将会被阻塞直到第一个进程执行到m.unlock()。这个过程比较简单,但是如何正确使用互斥并不简单。错误地使用互斥将会导致一系列严重后果。大家可以设想以下情形所导致的后果:一个线程只进行了lock()而没有执行相应unlock(); 一个线程对同一个mutex对象执行了两次lock()操作;一个线程在等待unlock()操作时被阻塞了很久;一个线程需要对两个mutex对象执行lock()操作后才能执行后续任务。可以在很多书(译者注:通常操作系统相关书籍中会讲到)中找到这些问题的答案。在这里(包括Locks section一节)所给出的都是一些入门级别的。
除了lock(),mutex还提供了try_lock()操作。线程可以借助该操作来尝试进入临界区,这样一来该线程不会在失败的情况下被阻塞。下面例子给出了try_lock()的用法:
std::mutex m;
int sh; //共享数据
// …
if (m.try_lock()) {
//操作共享数据
sh += 1;
m.unlock();
}
else {
//可能在试图进入临界区失败后执行其它代码
}
recursive_mutex是一种能够被同一线程连续锁定多次的mutex。下面是recursive_mutex的一个实例:
std::recursive_mutex m;
int sh; //共享数据
//..
void f(int i)
{
//…
m.lock();
//对共享数据进行操作
sh += 1;
if (–i>0) f(i); //注意:这里对f(i)进行了递归调用,
//将导致在m.unlock()之前多次执行m.lock()
m.unlock();
//…
}
对于这点,我曾经夸耀过并且用f()调用它自身。一般地,代码会更加微妙。这是因为代码中经常会有间接递归调用。比如f()调用g(),而g()又调用了h(),最后h()又调用了f(),这样就形成了一个间接递归。
如果我想在未来的10秒内进入到一个mutex所划定的临界区,该如果实现? timed_mutex类可以解决这个问题。事实上,关于它的使用可以被看做是关联了时间限制的try_lock()的一个特例。
std::timed_mutex m;
int sh; //共享数据
//…
if ( m.try_lock_for(std::chrono::seconds(10))) {
//对共享数据进行操作
sh += 1;
m.unlock();
}
else {
//进入临界区失败,在此执行其它代码
}
try_lock_for()的参数是一个用相对时间表示的duration。如果你不想这么做而是想等到一个固定的时间点:一个time_point,你可以使用try_lock_until():
std::timed_mutex m;
int sh; //共享数据
// …
if ( m.try_lock_until(midnight)) {
//对共享数据进行操作
sh += 1;
m.unlock();
}
else {
//进入临界区失败,在此执行其它代码
}
这里使用midnight是一个冷笑话:对于mutex级别的操作,相应的时间是毫秒级别的而不是小时。
当然地,C++0x中也有recursive_timed_mutex。
mutex可以被看做是一个资源(因为它经常被用来代表一种真实的资源),并且当它对至少两个线程可见时它才是有用的。必然地,mutex不能被复制或者移动(正如你不能复制一个硬件的输入寄存器)。
令人惊讶地,实际中经常很难做到lock()s与unlock()s的匹配。设想一下那些复杂的控制结构,错误以及异常,要做到匹配的确比较困难。如果你可以选择使用locks去管理你的互斥,这将为你和你的用户节省大量的时间,再也不用熬夜通宵彻夜无眠了。(that will save you and your users a lot of sleep??)。
std::future和std::promise
并行开发挺复杂的,特别是在试图用好线程和锁的过程中。如果要用到条件变量或std-atomics(一种无锁开发方式),那就更复杂了。C++0x提供了future和promise来简化任务线程间的返回值操作;同时为启动任务线程提供了packaged_task以方便操作。其中的关键点是允许2个任务间使用无(显式)锁的方式进行值传递;标准库帮你高效的做好这些了。基本思路很简单:当一个任务需要向父线程(启动它的线程)返回值时,它把这个值放到promise中。之后,这个返回值会出现在和此promise关联的future中。于是父线程就能读到返回值。更简单点的方法,参看async()。
标准库中提供了3种future:普通future和为复杂场合使用的shared_future和atomic_future。在本主题中,只展示了普通future,它已经完全够用了。如果我们有一个future
f,通过get()可以获得它的值:
X v = f.get(); // if necessary wait for the value to get computed
如果它的返回值还没有到达,调用线程会进行阻塞等待。要是等啊等啊,等到花儿也谢了的话,get()会抛出异常的(从标准库或等待的线程那个线程中抛出)。
如果我们不需要等待返回值(非阻塞方式),可以简单询问一下future,看返回值是否已经到达:
if (f.wait_for(0))
{
// there is a value to get()
// do something
}
else
{
// do something else
}
但是,future最主要的目的还是提供一个简单的获取返回值的方法:get()。
promise的主要目的是提供一个”put”(或”get”,随你)操作,以和future的get()对应。future和promise的名字是有历史来历的,是一个双关语。感觉有点别扭?请别怪我。
promise为future传递的结果类型有2种:传一个普通值或者抛出一个异常
try {
X res;
// compute a value for res
p.set_value(res);
}
catch (…) { // oops: couldn’t compute res
p.set_exception(std::current_exception());
}
到目前为止还不错,不过我们如何匹配future/promise对呢?一个在我的线程,另一个在别的啥线程中吗?是这样:既然future和promise可以被到处移动(不是拷贝),那么可能性就挺多的。最普遍的情况是父子线程配对形式,父线程用future获取子线程promise返回的值。在这种情况下,使用async()是很优雅的方法。
packaged_task提供了启动任务线程的简单方法。特别是它处理好了future和promise的关联关系,同时提供了包装代码以保证返回值/异常可以放到promise中,示例代码:
void comp(vector& v)
{
// package the tasks:
// (the task here is the standard
// accumulate() for an array of doubles):
packaged_task pt0{std::accumulate};
packaged_task pt1{std::accumulate};
auto f0 = pt0.get_future(); // get hold of the futures
auto f1 = pt1.get_future();
pt0(&v[0],&v[v.size()/2],0); // start the threads
pt1(&[v.size()/2],&v[size()],0);
return f0.get()+f1.get(); // get the results
}
async()
async()函数是一个简单任务的”启动”(launcher)函数。
下边是一种优于传统的线程+锁的并发编程方法示例(译注:山寨map-reduce哦):
template<class T,class V> struct Accum { // 简单的积函数对象
T* b;
T* e;
V val;
Accum(T* bb, T* ee, const V& v) : b{bb}, e{ee}, val{vv} {}
V operator() ()
{ return std::accumulate(b,e,val); }
};
void comp(vector<double>& v)
// 如果v够大,则产生很多任务 {
if (v.size()<10000)
return std::accumulate(v.begin(),v.end(),0.0);
auto f0 {async(Accum{&v[0],&v[v.size()/4],0.0})};
auto f1 {async(Accum{&v[v.size()/4],&v[v.size()/2],0.0})};
auto f2 {async(Accum{&v[v.size()/2],&v[v.size()*3/4],0.0})};
auto f3 {async(Accum{&v[v.size()*3/4],&v[v.size()],0.0})};
return f0.get()+f1.get()+f2.get()+f3.get();
}
尽管这只是一个简单的并发编程示例(留意其中的”magic number“),不过我们可没有使用线程,锁,缓冲区等概念。f*变量的类型(即async()的返回值)是”std::future”类型。future.get()表示如果有必要的话则等待相应的线程(std::thread)运行结束。async的工作是根据需要来启动新线程,而future的工作则是等待新线程运行结束。”简单性”是async/future设计中最重视的一个方面;future一般也可以和线程一起使用,不过不要使用async()来启动类似I/O操作,操作互斥体(mutex),多任务交互操作等复杂任务。async()背后的理念和range-for statement很类似:简单事儿简单做,把复杂的事情留给一般的通用机制来搞定吧。
async()可以启动一个新线程或者复用一个它认为合适的已有线程(非调用线程即可)(译注:语义上并发即可,不关心具体的调度策略。和go语义中的goroutines有点像)。后者从用户视角看更有效一些(只对简单任务而言)。
线程(thread)
线程(译注:大约是C++11中最激动人心的特性了)是一种对程序中的执行或者计算的表述。跟许多现代计算一样,C++11中的线程之间能够共享地址空间。从这点上来看,它不同于进程:进程一般不会直接跟其它进程共享数据。在过去,C++针对不同的硬件和操作系统有着不同的线程实现版本。如今,C++将线程加入到了标准件库中:一个标准线程ABI。
许多大部头书籍以及成千上万的论文都曾涉及到并发、并行以及线程。在这一条FAQ里几乎不涉及这些内容。事实上,要做到清楚地思考并发非常难。如果你想编写并发程序,请至少看一本书。不要依赖于一本手册、一个标准或者一条FAQ。
在用一个函数或者函数对象(包括lambda)构造std::thread时,一个线程便启动了。
#include <thread>
void f();
struct F {
void operator()();
};
int main()
{
std::thread t1{f}; // f() 在一个单独的线程中执行
std::thread t2{F()}; // F()() 在一个单独的线程中执行
}
然而,无论f()和F()执行任何功能,都不能给出有用的结果。这是因为程序可能会在t1执行f()之前或之后以及t2执行F()之前或之后终结。我们所期望的是能够等到两个任务都完成,这可以通过下述方法来实现:
int main()
{
std::thread t1{f}; // f() 在一个单独的线程中执行
std::thread t2{F()}; // F()()在一个单独的线程中执行
t1.join(); // 等待t1
t2.join(); // 等待t2
}
上面例子中的join()保证了在t1和t2完成后程序才会终结。这里”join”的意思是等待线程返回后再终结。
通常我们需要传递一些参数给要执行的任务。例如:
void f(vector<double>&);
struct F {
vector<double>& v;
F(vector<double>& vv) :v{vv} { }
void operator()();
};
int main(){
// f(some_vec) 在一个单独的线程中执行
std::thread t1{std::bind(f,some_vec)};
// F(some_vec)() 在一个单独的线程中执行
std::thread t2{F(some_vec)};
t1.join();
t2.join();
}
上例中的标准库函数bind会将一个函数对象作为它的参数。
通常我们需要在执行完一个任务后得到返回的结果。对于那些简单的对返回值没有概念的,我建议使用std::future。另一种方法是,我们可以给任务传递一个参数,从而这个任务可以把结果存在这个参数中。例如:
void f(vector<double>&, double* res); // 将结果存在res中
struct F {
vector<double>& v;
double* res;
F(vector<double>& vv, double* p) :v{vv}, res{p} { }
void operator()(); //将结果存在res中
};
int main()
{
double res1;
double res2;
// f(some_vec,&res1) 在一个单独的线程中执行
std::thread t1{std::bind(f,some_vec,&res1)};
// F(some_vec,&res2)() 在一个单独的线程中执行
std::thread t2{F(some_vec,&res2)};
t1.join();
t2.join();
std::cout << res1 << " " << res2 << ‘\n’;
}
但是关于错误呢?如果一个任务抛出了异常应该怎么办?如果一个任务抛出一个异常并且它没有捕获到这个异常,这个任务将会调用std::terminate()。调用这个函数一般意味着程序的结束。我们常常会为避免这个问题做诸多尝试。std::future可以将异常传送给父线程(这正是我喜欢future的原因之一)。否则,返回错误代码。
除非一个线程的任务已经完成了,当一个线程超出所在的域的时候,程序会结束。很明显,我们应该避免这一点。
没有办法来请求(也就是说尽量文雅地请求它尽可能早的退出)一个线程结束或者是强制(也就是说杀死这个线程)它结束。下面是可供我们选择的操作:
- 设计我们自己的协作的中断机制(通过使用共享数据来实现。父线程设置这个数据,子线程检查这个数据(子线程将会在该数据被设置后很快退出))。
- 使用thread::native_handle()来访问线程在操作系统中的符号
- 杀死进程(std::quick_exit())
- 杀死程序(std::terminate())
这些是委员会能够统一的所有的规则。特别地,来自POSIX的代表强烈地反对任何形式的“线程取消”。然而许多C++的资源模型都依赖于析构器。对于每种系统和每种可能的应有并没有完美的解决方案。
线程中的一个基本问题是数据竞争。也就是当在统一地址空间的两个线程独立访问一个对象时将会导致没有定义的结果。如果一个(或者两个)对对象执行写操作,而另一个(或者两个)对该对象执行读操作,两个线程将在谁先完成操作方面进行竞争。这样得到的结果不仅仅是没定义的,而且常常无法预测最后的结果。为解决这个问题,C++0x提供了一些规则和保证从而能够让程序员避免数据竞争。
- C++标准库函数不能直接或间接地访问正在被其它线程访问的对象。一种例外是该函数通过参数(包括this)来直接或间接访问这个对象。
- C++标准库函数不能直接或间接修改正在被其它线程访问的对象。一种例外是该函数通过非const参数(包括this)来直接或间接访问这个对象。
- C++标准函数库的实现需要避免在同时修改统一序列中的不同成员时的数据竞争。
除非已使用别的方式做了声明,多个线程同时访问一个流对象、流缓冲区对象,或者C库中的流可能会导致数据竞争。因此除非你能够控制,绝不要让两个线程来共享一个输出流。
你可以
- 等待一个线程一定的时间
- 通过互斥来控制对数据的访问
- 通过锁来控制对数据的访问
- 使用条件变量来等待另一个线程的行为
- 通过future来从线程中返回值
std::condition_variable 类介绍
std::condition_variable 是条件变量,更多有关条件变量的定义参考维基百科。Linux 下使用 Pthread 库中的 pthread_cond_*() 函数提供了与条件变量相关的功能, Windows 则参考 MSDN。
当 std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(封装 std::mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notification 函数来唤醒当前线程。
std::condition_variable 对象通常使用 std::unique_lock<std::mutex> 来等待,如果需要使用另外的 lockable 类型,可以使用 std::condition_variable_any 类,本文后面会讲到 std::condition_variable_any 的用法。
首先我们来看一个简单的例子:
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx; // 全局互斥锁.
std::condition_variable cv; // 全局条件变量.
bool ready = false; // 全局标志位.
void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx);
while (!ready) // 如果标志位不为 true, 则等待...
cv.wait(lck); // 当前线程被阻塞, 当全局标志位变为 true 之后,
// 线程被唤醒, 继续往下执行打印线程编号id.
std::cout << "thread " << id << '\n';
}
void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 设置全局标志位为 true.
cv.notify_all(); // 唤醒所有线程.
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto & th:threads)
th.join();
return 0;
}
执行结果如下:
concurrency ) ./ConditionVariable-basic1
10 threads ready to race...
thread 1
thread 0
thread 2
thread 3
thread 4
thread 5
thread 6
thread 7
thread 8
thread 9
好了,对条件变量有了一个基本的了解之后,我们来看看 std::condition_variable 的各个成员函数。
std::condition_variable 构造函数
default (1) | condition_variable(); |
---|---|
copy [deleted] (2) | condition_variable (const condition_variable&) = delete; |
std::condition_variable 的拷贝构造函数被禁用,只提供了默认构造函数。
std::condition_variable::wait()介绍
unconditional (1) | void wait (unique_lock<mutex>& lck); |
---|---|
predicate (2) | template <class Predicate>void wait (unique_lock<mutex>& lck, Predicate pred); |
std::condition_variable提供了两种 wait()函数。当前线程调用 wait()后将被阻塞(此时当前线程应g该获得了锁(mutex),不妨设获得锁 lck),直到另外某个线程调用 notify_*唤醒了当前线程。
在线程被阻塞时,该函数会自动调用 lck.unlock()释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。另外,一旦当前线程获得通知(notified,通常是另外某个线程调用 notify_*唤醒了当前线程),wait()函数也是自动调用 lck.lock(),使得 lck的状态和 wait函数被调用时相同。
在第二种情况下(即设置了 Predicate),只有当 pred条件为 false时调用 wait()才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred为 true时才会被解除阻塞。因此第二种情况类似以下代码:
while (!pred()) wait(lck);
请看下面例子:
#include <iostream> // std::cout
#include <thread> // std::thread, std::this_thread::yield
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
int cargo = 0;
bool shipment_available()
{
return cargo != 0;
}
// 消费者线程.
void consume(int n)
{
for (int i = 0; i < n; ++i) {
std::unique_lock <std::mutex> lck(mtx);
cv.wait(lck, shipment_available);
std::cout << cargo << '\n';
cargo = 0;
}
}
int main()
{
std::thread consumer_thread(consume, 10); // 消费者线程.
// 主线程为生产者线程, 生产 10 个物品.
for (int i = 0; i < 10; ++i) {
while (shipment_available())
std::this_thread::yield();
std::unique_lock <std::mutex> lck(mtx);
cargo = i + 1;
cv.notify_one();
}
consumer_thread.join();
return 0;
}
程序执行结果如下:
concurrency ) ./ConditionVariable-wait
1
2
3
4
5
6
7
8
9
10
std::condition_variable::wait_for()介绍
unconditional (1) | template <class Rep, class Period>cv_status wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep,Period>& rel_time); |
---|---|
predicate (2) | template <class Rep, class Period, class Predicate>bool wait_for (unique_lock<mutex>& lck, const chrono::duration<Rep, Period>& rel_time, Predicate pred); |
与 std::condition_variable::wait()类似,不过 wait_for可以指定一个时间段,在当前线程收到通知或者指定的时间 rel_time超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_for返回,剩下的处理步骤和 wait()类似。
另外,wait_for的重载版本(predicte(2))的最后一个参数 pred 表示 wait_for的预测条件,只有当 pred条件为 false时调用 wait()才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred
为 true时才会被解除阻塞,因此相当于如下代码:
return wait_until (lck, chrono::steady_clock::now() + rel_time, std::move(pred));
请看下面的例子,下面的例子中,主线程等待 th线程输入一个值,然后将 th线程从终端接收的值打印出来,在 th线程接受到值之前,主线程一直等待,每个一秒超时一次,并打印一个 ".":
#include <iostream> // std::cout
#include <thread> // std::thread
#include <chrono> // std::chrono::seconds
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status
std::condition_variable cv;
int value;
void do_read_value()
{
std::cin >> value;
cv.notify_one();
}
int main ()
{
std::cout << "Please, enter an integer (I'll be printing dots): \n";
std::thread th(do_read_value);
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
while (cv.wait_for(lck,std::chrono::seconds(1)) == std::cv_status::timeout) {
std::cout << '.';
std::cout.flush();
}
std::cout << "You entered: " << value << '\n';
th.join();
return 0;
}
std::condition_variable::wait_until 介绍
unconditional (1) | template <class Clock, class Duration>cv_status wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time); |
---|---|
predicate (2) | template <class Clock, class Duration, class Predicate>bool wait_until (unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_time,Predicate pred); |
与 std::condition_variable::wait_for 类似,但是 wait_until 可以指定一个时间点,在当前线程收到通知或者指定的时间点 abs_time 超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_until 返回,剩下的处理步骤和 wait_until() 类似。
另外,wait_until 的重载版本(predicte(2))的最后一个参数 pred 表示 wait_until 的预测条件,只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞,因此相当于如下代码:
while (!pred())
if ( wait_until(lck,abs_time) == cv_status::timeout)
return pred();
return true;
std::condition_variable::notify_one() 介绍
唤醒某个等待(wait)线程。如果当前没有等待线程,则该函数什么也不做,如果同时存在多个等待线程,则唤醒某个线程是不确定的(unspecified)。
请看下例:
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx;
std::condition_variable cv;
int cargo = 0; // shared value by producers and consumers
void consumer()
{
std::unique_lock < std::mutex > lck(mtx);
while (cargo == 0)
cv.wait(lck);
std::cout << cargo << '\n';
cargo = 0;
}
void producer(int id)
{
std::unique_lock < std::mutex > lck(mtx);
cargo = id;
cv.notify_one();
}
int main()
{
std::thread consumers[10], producers[10];
// spawn 10 consumers and 10 producers:
for (int i = 0; i < 10; ++i) {
consumers[i] = std::thread(consumer);
producers[i] = std::thread(producer, i + 1);
}
// join them back:
for (int i = 0; i < 10; ++i) {
producers[i].join();
consumers[i].join();
}
return 0;
}
std::condition_variable::notify_all() 介绍
唤醒所有的等待(wait)线程。如果当前没有等待线程,则该函数什么也不做。请看下面的例子:
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
std::mutex mtx; // 全局互斥锁.
std::condition_variable cv; // 全局条件变量.
bool ready = false; // 全局标志位.
void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx);
while (!ready) // 如果标志位不为 true, 则等待...
cv.wait(lck); // 当前线程被阻塞, 当全局标志位变为 true 之后,
// 线程被唤醒, 继续往下执行打印线程编号id.
std::cout << "thread " << id << '\n';
}
void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 设置全局标志位为 true.
cv.notify_all(); // 唤醒所有线程.
}
int main()
{
std::thread threads[10];
// spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!
for (auto & th:threads)
th.join();
return 0;
}
std::condition_variable_any介绍
与 std::condition_variable类似,只不过 std::condition_variable_any的 wait函数可以接受任何 lockable参数,而 std::condition_variable只能接受 std::unique_lock<std::mutex>类型的参数,除此以外,和 std::condition_variable几乎完全一样。
std::cv_status枚举类型介绍
cv_status::no_timeout | wait_for 或者 wait_until 没有超时,即在规定的时间段内线程收到了通知。 |
---|---|
cv_status::timeout | wait_for 或者 wait_until 超时。 |
参考资料
【c++11FAQ】互斥
【c++11FAQ】std::future和std::promise
【c++11FAQ】async()
【c++11FAQ】线程(thread)
C++11 多线程——KingsLanding
第五章 条件变量与线程同步
5.2 条件变量详解
4.3 锁类型详解