线程的同步化与并发问题
解决并发访问的三个主要问题:
- 未同步化的数据访问;
- 写至半途的数据;
- 重新安排的语句
首先要建立以下概念:
1.不可切割性(原子性):读写一个变量的时候无法打断,另一个线程不可能读到一个写了一半的数据
2.次序:需要一些方法来保证指定语句的次序
C++库用了多种办法来处理这些概念(这份清单从高级到低级)
1.使用future和promse,他们保证成功之后,才设定"共享状态"
2.可以使用metux和lock来保护临界区
3.可以使用条件变量,可以使一些线程等待另一个线程控制的判断式成为true
4.使用atomic模板类型来包装基础类型等类型,确保每次对变量的访问动作都是不可切割的
5.可以使用atomic的底层接口,来使:专家的放宽语句次序,或对内存访问使用manual barrier
注意:c++中的volatile不提供原子访问的属性,所以加这个关键字没用
mutex和lock
使用RAII管理锁资源
int val;
std::mutex valMutex;
//一个线程中这样
if (val >= 0) {
f(val);
}
else {
f(-val);
}
valMutex.unlock();
//另一个线程中这样
valMutex.lock();
++val;
valMutex.unlock();
注意:如果在两个锁之间抛出了异常,会导致锁被永远的锁住
正确做法:
//只要对象销毁,析构函数调用,就会自动释放锁
#include <future>
#include <mutex>
#include <iostream>
#include <string>
std::mutex printMutex; // enable synchronized output with print()
void print (const std::string& s)
{
std::lock_guard<std::mutex> l(printMutex);
for (char c : s) {
std::cout.put(c);
}
std::cout << std::endl;
}
int main()
{
auto f1 = std::async (std::launch::async,
print, "Hello from a first thread");
auto f2 = std::async (std::launch::async,
print, "Hello from a second thread");
print("Hello from the main thread");
}
递归的lock
使用Recursive Locks就可以允许递归锁
class DatabaseAccess
{
private:
std::recursive_mutex dbMutex; ... // state of database access
public:
void insertData (...) {
std::lock_guard<std::recursive_mutex> lg(dbMutex);
...
}
void insertData (...) {
std::lock_guard<std::recursive_mutex> lg(dbMutex);
...
}
void createTableAndinsertData (...) {
std::lock_guard<std::recursive_mutex> lg(dbMutex);
...
createTable(...); // OK: no deadlock }
...
};
}
尝试性地锁和带时间性的锁
- try_lock()
尝试性地锁,成功返回true,失败返回false
为了仍旧能够使用class std::lock_guard,需要额外加一个参数std::adopt_lock
std::mutex m;
while (m.try_lock() == false) {
doSomeOtherStuff();
}
std::lock_guard<std::mutex> lg(m,std::adopt_lock);
- 等待特定长度时间的lock
有两个支持等待时间的锁:
classes std::timed_mutex 时间段
std::recursive_timed_mutex 时间点
实例:
std::timed_mutex m;
if (m.try_lock_for(std::chrono::seconds(1))) {
std::lock_guard<std::timed_mutex> lg(m,std::adopt_lock);
...
}
else {
couldNotGetTheLock();
}
注意:如果改变系统时间,时间段和时间点的行为有所不同
多个锁顺序,可能导致的死锁
- 实例——银行转账
template<typename T> class Lock
{
public:
Lock(T&mutex):m_mutex(mutex){m_mutex.lock();}
~Lock(){m_mutex.unlock();}
private:
T& mutex;
};
struct BankAccount
{
BankAccount(int b):Balance(b){}
int Balance;
mutex Mutex;
};
void transferMoney(BankAccount &a,BankAccount &b,int money)
{
if(&a==&b) return ;//单线程安全了
if(&(a.Mutex)<&(b.mutex))
{
//保证每次锁定地址小的。那就可以了。多线程安全版本。但是自己写不太现实吧。要是BankAccount多的话,咋整呢?使用标准库吧。
Lock<mutex> lockA(a.Mutex);
Lock<mutex> lockB(b.Mutex);
a.Balance-=money;
b.Balance+=money;
}
else
{
//.....
}
}
- 分析
单线程:
transferMoney(a,a,int money) 完蛋了,死锁了,可以if进行判断,相同用户返回。但是多线程还是可能死锁的.
多线程:
thread1:
transferMoney(a,b,int money)
thread2:
transferMoney(b,a,int money)
之间时间片很短。怎么办?死锁。
此时如实例代码所给,我们使用标准库来解决
第一种:luck_guard加adopt参数(提供死锁检查)
std::mutex m1; std::mutex m2; ...
{
std::lock (m1, m2);
std::lock_guard<std::mutex> lockM1(m1,std::adopt_lock);
std::lock_guard<std::mutex> lockM2(m2,std::adopt_lock);
...
}
- 分析
1.全局函数lock(),会锁住它收到的所有锁(直到全部锁住,或者抛出异常),如果抛出异常,已经上锁的锁,也会全部解锁</br>
2.锁上后,应该使用class std::lock_guard,并带上一个额外的参数std::adopt_lock,来使析构函数可以自动解锁
注意:lock()提供了死锁的回避机制,也就代表,锁住的顺序不明确
第二种:try_lock(不提供死锁检查)
std::mutex m1;
std::mutex m2;
int idx = std::try_lock (m1, m2);
if (idx < 0) {
std::lock_guard<std::mutex> lockM1(m1,std::adopt_lock);
std::lock_guard<std::mutex> lockM2(m2,std::adopt_lock); ...
}
else {
std::cerr << "could not lock mutex m" << idx+1 << std::endl;
}
- 分析
<html>
1.全局函数try_lock会依次锁定所有的锁(不提供死锁回避机制,保证以实参的次序依次上锁),若成功(全部的锁都锁上了),返回-1,否则,返回第一个失败的锁的索引(所有成功的锁,会被释放)</br>
2.通常不会因为使用了try_lock或者lock就不过继给lock_guard(要保证离开作用域的时候自动解锁)
</html>
unique_lock
简介
除了lock_guard,c++标准库还提供了unique_lock,unique_lock更有弹性,更灵活,unique_lock的接口和lock_guard类似,但可以明确指出,"何时"以及"如何"锁住或解锁它所拥有的mutex(lock_guard的生命中,总是锁住mutex)。
调用成员函数,bool或者owns_lock(),可以检查当前unique_lock东西内的锁,是否锁住(同样,如果锁住unique_lock的析构函数会自动释放锁,否则析构函数什么也不做)
构造函数
1.构造函数中传递try_to_lock,表示企图锁定,但不希望阻塞
std::unique_lock<std::mutex> lock(mutex,std::try_to_lock); ...
if (lock) {
...
}
2.可以传递一个时间段或时间点给构造函数(尝试在一个时间周期内锁定)
std::unique_lock<std::timed_mutex> lock(mutex,
std::chrono::seconds(1));
...
3.可以传递一个defer_lock,表示初始化这一个锁,但是并未打算锁住
std::unique_lock<std::mutex> lock(mutex,std::defer_lock);
...
lock.lock(); // or (timed) try_lock()
(可以用来建立一个或多个锁,并且在稍后再锁住他们)
(提供release成员函数来释放锁)
与lock_guard区别
1.lock_guard保证锁在离开作用域的时候,会自动释放
mutex可以由lock_guard的构造函数申请,也可以绑定已经锁上的mutex
(lock_guard生命周期内,锁总是保持锁定状态)
lock_guard的操作函数见:表18.9 class lock_guard的操作函数 P1000
2.unique_lock和lock_guard类似,区别是unique_lock的生命周期内,并不是始终保持锁定状态
(你可以明确的控制,unique_lock所控制的锁,是占有锁定状态,还是解锁状态)
如果在调用析构函数的时候,是锁定状态,那析构函数会自动解锁,否则,析构函数什么也不干
lock()可能抛出system_error异常,夹带的差错码和mutex相同
unlock()可能抛出system_error异常,并夹带差错码operation_not_permitted(这个锁并未被锁定)
只调用一次call_once
1.缓式初始化问题:某些机能初次被某个线程使用过之后,其他线程再也不需要它.可以使用mutex来实现,但c++提供了一个特殊的解法:
使用一个std::once_flag,并且调用std::call_once
std::once_flag oc; // global flag
...
std::call_once(oc,initialize); // initialize if not initialized yet
2.单例
- 分析
1.第一实参是once_flag,第二实参是可调用对象(保证同一个once_flag下的可调用对象,只会调用一次)
2.原则上可以使用同一个once_flag调用不同的调用对象,但只要一个调用成功了,剩下的就不会再调用,哪怕调用对象不同
3.如果在调用对象内部抛出异常,这个异常会传递给call_once,
此时这次调用算是不成功,第二此调用会继续调用
条件变量
对不同线程的任务执行,有时候必须互相等待
条件变量就是用来对付这种情况:同步化线程之间的数据流依赖关系
条件变量的意图
等待flag改变后,进行一系列操作(需要轮询来检测flag的状态)
条件变量可以休眠等待标识改变,改变标志的线程会向等待的线程发送信号(这样可以避免轮询)
C++标准库在<condition_variable>里面提供了条件变量
简介
1.包含<mutex>和<condition_variable>,声明一个条件变量和一个互斥量
#include <mutex>
#include <condition_variable>
std::mutex readyMutex;
std::condition_variable readyCondVar;
2.激发条件满足的线程,在改变标志之后,要向等待的线程发送信号
readyCondVar.notify_one(); //通知一个等待线程
或
readyCondVar.notify_all(); //通知所有等待线程
3.等待条件满足的线程,必须调用:
std::unique_lock<std::mutex> l(readyMutex);
readyCondVar.wait(l);
注意:1.必须使用unique_lock,原因是,在等待条件的时候,需要解锁
2.条件变量有可能假醒,所以醒来的条件变量一定要再次检查条件(检查标志)
实例
#include <condition_variable>
#include <mutex>
#include <future>
#include <iostream>
bool readyFlag;
std::mutex readyMutex;
std::condition_variable readyCondVar;
void thread1()
{
// do something thread2 needs as preparation
std::cout << "<return>" << std::endl;
std::cin.get();
// signal that thread1 has prepared a condition
{
std::lock_guard<std::mutex> lg(readyMutex);
readyFlag = true;
} // release lock
readyCondVar.notify_one();
}
void thread2()
{
// wait until thread1 is ready (readyFlag is true)
{
std::unique_lock<std::mutex> ul(readyMutex);
readyCondVar.wait(ul, []{ return readyFlag; });
} // release lock
// do whatever shall happen after thread1 has prepared things
std::cout << "done" << std::endl;
}
int main()
{
auto f1 = std::async(std::launch::async,thread1);
auto f2 = std::async(std::launch::async,thread2);
}
-
分析:运用条件变量在包含了必要的头文件之后,要准备三样东西:
- 一个标志量(用来表现条件是否真的满足了)
- 一个mutex
- 一个条件变量
条件变量还有一个可有可无的第二实参,是一个判断式,用来在收到消息的时候,再次测试条件变量(可用于防止惊群).
流程就如实例中所示,记得一定要使用unique_lock(wait会在内部明确的对unique_lock解锁或者加锁)
#include <condition_variable>
#include <mutex>
#include <future>
#include <thread>
#include <iostream>
#include <queue>
std::queue<int> queue;
std::mutex queueMutex;
std::condition_variable queueCondVar;
void provider (int val)
{
// push different values (val til val+5 with timeouts of val milliseconds into the queue
for (int i=0; i<6; ++i) {
{
std::lock_guard<std::mutex> lg(queueMutex);
queue.push(val+i);
} // release lock
queueCondVar.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(val));
}
}
void consumer (int num)
{
// pop values if available (num identifies the consumer)
while (true) {
int val;
{
std::unique_lock<std::mutex> ul(queueMutex);
queueCondVar.wait(ul,[]{ return !queue.empty(); });
val = queue.front();
queue.pop();
} // release lock
std::cout << "consumer " << num << ": " << val << std::endl;
}
}
int main()
{
// start three providers for values 100+, 300+, and 500+
auto p1 = std::async(std::launch::async,provider,100);
auto p2 = std::async(std::launch::async,provider,300);
auto p3 = std::async(std::launch::async,provider,500);
// start two consumers printing the values
auto c1 = std::async(std::launch::async,consumer,1);
auto c2 = std::async(std::launch::async,consumer,2);
}
- 分析:
上一个例子的future可能会造成阻塞,直到某些数据到达(本例中没有这层顾虑)三个线程把数值推入队列,两个线程从中读取数据
总结条件变量
头文件<condition_variable>,包含了两个条件变量的类:
condition_variable注意点
1.因为有假醒的可能,所以,每当条件变量被唤醒,都需要重新检查条件
2.condition_variable的构造函数失败,会抛出system_error异常,并夹带错误码resource_unavailable_try_again
3.condition_variable的复制和赋值都是不允许的
4.通知都会同步化,所以并发调用notify_one()和notify_all(),没有任何问题
5.所有等待condition_variable的线程必须使用相同的mutex
6.wait()成员函数调用之前,必须用unique_lock锁住互斥量,否则结果是未定义的
7.wait_for()和wait_until()有一个不接受判断式的版本,他们的返回值是以下枚举:
std::cv_status::timeout 如果发生超时
std::cv_status::no_timeout 如果发生通知
wait_for()和wait_until()有一个接受判断式的版本(判断式作为第三实参),他们返回判断式的执行结果
8.notify_all_at_thread_exit(cv,l)在调用线程退场的时候,调用notify_all(),为了在通知"等待线程"之前完成清理工作,这个清理工作绝不该造成阻塞
class condition_variable_any
1.和condition_variable类似,就是缺少native_handle()和notify_all_at_thread_exit()成员函数
2.不一定要求非要使用unique_lock(如果不是unique_lock加锁,则所有的同步化操作必须自己来实现)