主要参考:Advanced Operating Systems-Multi-threading in C++ from Giuseppe Massari and Federico Terraneo
介绍
多任务处理器允许我们同时运行多个任务。操作系统会为不同的进程分配独立的地址空间。
多线程允许一个进程在共享的地址空间里执行多个任务。
线程
一个线程是一个轻量的任务。
每个线程拥有独立的栈和context。
取决于具体的实现,线程至核心的安排由OS或者language runtime来负责。
C++对线程的支持
新建线程
void myThread() {
for (;;) {
std::cout << "world" << std::endl;
}
}
int main() {
std::thread t(myThread);
for(;;) {
std::cout << "hello " << std::endl;
}
}
std::thread
的构造函数可以以一个可调用对象和一系列参数为参数来启动一个线程执行这个可调用对象。
除了上面例子里的函数(myThread
)外,仿函数(functor)也是线程常用的可调用对象。
仿函数是一个定义和实现了operator()
成员函数的类。与普通的函数相比,可以赋予其一些类的性质,如继承、多态等。
std::thread::join()
等待线程结束,调用后thread变为unjoinable。
std::thread::detach()
将线程与thread对象脱离,调用后thread变为unjoinalbe。
bool std::thread::joinable()
返回线程是否可加入。
同步
static int sharedVariable = 0;
void myThread() {
for (int i=0; i<1000000; i++) sharedVariable++;
}
int main() {
std::thread t(myThread);
for (int i=0; i<1000000; i++) sharedVariable--;
t.join();
std::cout<<"sharedVariable="<<sharedVariable<<std::endl;
}
上面的程序会遇到数据竞争的问题,因为++
和--
都不是元操作(atomic operation),实际上我们需要拿到数据、递增/递减、放回数据三步,而两个线程可能会在对方没有完成三步的时候就插入,导致结果不可预测。
为了避免竞争,我们需要在线程进入关键段(critical section)的时候阻止并行。为此,我们引入互斥锁。
互斥锁
在我们进入一个关键段的时候,线程检查互斥锁是否是锁住的:
- 如果锁住,线程阻塞
- 如果没有,则进入关键段
std::mutex
有两个成员函数lock
和unlock
。
然而,对互斥锁使用不当可能导致死锁(deadlock):
- 原因1:忘记
unlock
一个mutex
解决方案:使用scoped locklocak_guard<mutex>
,会在析构的时候自动释放互斥锁。std::mutex myMutex; void muFunctions(int value) { { std::lock_guard<std::mutex> lck(myMutex); //... } }
- 原因2:同一个互斥锁被嵌套的函数使用
解决方案:使用recursive_mutex
,允许同一个线程多次使用同一个互斥锁。std::recursive_mutex myMutex; void func2() { std::lock_guard<recursive_mutex> lck(myMutex); //do some thing } void func1() { std::lock_guard<recursive_mutex> lck(myMutex); //do some thing func2(); }
- 原因3:多个线程用不同的顺序调用互斥锁
解决方案:使用lock(..)
函数取代mutex::lock()
成员函数,该函数会自动判断上锁的顺序。mutex myMutex1, myMutex2; void func2() { lock(myMutex1, myMutex2); //do something myMutex1.unlock(); myMutex2.unlock(); } void func1() { lock(myMutex2, myMutex1); //do something myMutex1.unlock(); myMutex2.unlock(); }
条件变量
有的时候,线程之间有依赖关系,这种时候需要一些线程等待其他线程完成特定的操作。
std::condition_variable
条件变量,有三个成员函数:
-
wait(unique_lock<mutex> &)
:阻塞当前线程,直到另一个线程将其唤醒。在wait(...)
的过程中,互斥锁是解锁的状态。 -
notify_one()
:唤醒一个等待线程。 -
notify_all()
:唤醒所有等待线程。
using namespace std;
string shared;
mutex myMutex;
condition_variable myCv;
void myThread() {
unique_lock<mutex> lck(myMutex);
while (shared.empty()) myCv.wait(lck);
cout << shared << endl;
}
int main() {
thread t(myThread);
string s;
cin >> s;
{
unique_lock<mutex> lck(myMutex);
shared = s;
myCv.notify_one();
}
t.join();
}
另外有一个比较小的点:为什么wait()
通常放在循环中调用,是为了保证condition_variable
被唤醒的时候条件仍然会被判断一次。
设计模式
Producer/Consumer
一个消费者线程需要生产者线程提供数据。
为了让两个线程的操作解耦,我们设计一个队列用来缓存数据。
#include <list>
#include <mutex>
#include <condition_variable>
template<typename T>
class SynchronizedQueue {
public:
SynchronizedQueue();
void put(const T&);
T get();
private:
SynchronizedQueue(const SynchronizedQueue&);
SynchronizedQueue &operator=(const SynchronizedQueue&);
std::list<T> queue;
std::mutex myMutex;
std::condition_variable myCv;
};
template<typename T>
void SynchronizedQueue<T>::put (const T& data) {
std::unique_lock<std::mutex> lck(myMutex);
queue.push_backdata();
myCv.notify_one();
}
template<typename T>
T SynchronizedQueue<T>::get() {
std::unique_lock<std::mutex> lck(myMutex);
while(queue.empty())
myCv.wait(lck);
T result = queue.front();
queue.pop_front();
return result;
}
Active Object
目标是实例化一个任务对象。
通常来说,其他线程无法通过显式的方法与一个线程函数通信,数据常常是通过全局变量在线程之间交流。
这种设计模式让我们能够在一个对象里封装一个线程,从而获得一个拥有可调用方法的线程。
设计一个类,拥有一个thread
成员变量和一个run()
成员函数。
//active_object.hpp
#include <atomic>
#include <thread>
class ActiveObject {
public:
ActiveObject();
~ActiveObject();
private:
virtual void run();
ActiveObject(const ActiveObject&);
ActiveObject& operator=(const ActiveObject&);
protected:
std::thread t;
std::atomic<bool> quit;
};
//active_object.cpp
#include "active_object.hpp"
#include <functional>
ActiveObject::ActiveObject() :
t(std::bind(&ActiveObject::run, this)), quit(false) {}
void ActiveObject::run() {
while(!quit.load()) {
// do something
}
}
ActiveObject::~ActiveObject() {
if(quit.load()) return;
quit.store(true);
t.join();
}
其中std::bind可以用于基于函数和部分/全部参数构建一个新的可调用对象。
Reactor
Reactor的目标在于让任务的产生和执行解耦。会有一个任务队列,同时有一个执行线程负责一次执行队列里的任务(FIFO,当然也可以设计其他的执行顺序)。Reactor本身可以继承自Active object,同时维护一个Synchronized Queue作为成员变量。
这样我们拥有了一个线程,它能够在执行的过程中不断地接受新的任务,同时避免了线程频繁的构建和析构所浪费的资源。
ThreadPool
Reactor的局限在于任务是顺序完成的,而线程池Thread Pool则允许我们让多个线程监听同一个任务队列。
一个比较不错的实现可以参考这里:https://blog.csdn.net/MOU_IT/article/details/88712090
通常来说,一个线程池需要有以下几个元素:
- 管理器(创建线程、启动/停止/添加任务)
- 任务队列
- 任务接口(任务抽象)
- 工作线程
其他概念
还有一些其他的与多线程息息相关的概念:
atomic原子类型
常见的比如用std::atomic<bool>或者std::atomic_bool取代bool类型变量。
原子类型主要涉及以下几个问题(参考):
tearing: a read or write involves multiple bus cycles, and a thread switch occurs in the middle of the operation; this can produce incorrect values.
cache coherence: a write from one thread updates its processor's cache, but does not update global memory; a read from a different thread reads global memory, and doesn't see the updated value in the other processor's cache.
compiler optimization: the compiler shuffles the order of reads and writes under the assumption that the values are not accessed from another thread, resulting in chaos.
Using std::atomic<bool> ensures that all three of these issues are managed correctly. Not using std::atomic<bool> leaves you guessing, with, at best, non-portable code.
future和promise
在线程池里常常会用到异步读取线程运行的结果。