C++11 最重要的新特性: 多线程
2 个好处
[1] 可写 `平台无关的 多线程程序, 移植性 提高`
[2] `并发` 提高性能
1 概述
1 并发
(1) 含义
`任务切换`
(2) 2 种方式
[1] `多进程`
`进程间通信: 信号 / 套接字 / 文件 / 管道`
[2] `多线程`
2 并发 优势
(1) 关注点分离 ( SOC )`
线程间 交互
(2) ·算法/数据 并行 -> 提高性能`
[1] 数据相同 / 算法拆分
[2] 计算相同 / 数据拆分
3 C++11 多线程 特点
(1) `线程感知` 的 `内存模型`
(2) 直接支持 `原子操作`
(3) `C++ 线程库 性能` `接近 直接使用底层 API 时`
4 多线程入门
(2) 多线程
#include <iostream>
#include <thread> //(1)
void hello() //(2)
{
std::cout<<"Hello Concurrent World\n";
}
int main()
{
std::thread t(hello); //(3)
t.join(); //(4)
}
1) 头文件 <thread>
2) `线程 初始函数`: 新线程执行入口
[1] `主 线程: main()`
[2] `其他 线程: std::thread 对象 ctor 中 指定`
3) std:thread 对象创建 完成 -> 开始执行新线程
4) join(): 让 caller 等待 新线程 完成
2 管理线程: thread/join/detach/RAAI/std::ref/std::bind/move ownership
线索
1 启动线程, 等待它结束 or 放后台运行
2 给 线程函数 传参
3 transfer ownership of thread
from current associated std::thread object to another
4 确定线程数, 识别特殊线程
#1 基本 线程管理
1.1 启动线程
`线程 在 std::thread 对象创建 时 启动`
1 thread 对象
(1) 一旦构造, 立即开始执行 `线程函数`
(2) 不可 copy, 可 move
copy/赋值 deleted
原因
线程对象 `copy/赋值` 可能 `弄丢已 join 的线程`
(3) 可能 `不 表示/关联 任何线程` => safe to destroy & not joinable
after 4 场景之一
————————————————————————————————————————————————
[1] 默认构造
=> 无 线程 ( 函数 ) 执行
————————————————————————————————————————————————
[2] move from
=> 线程 move 给 other thread 对象去管理
————————————————————————————————————————————————
[3] detach
=> 线程 可能仍在执行
————————————————————————————————————————————————
[4] join
=> 线程 执行完成
————————————————————————————————————————————————
2 线程函数
[1] 不与 caller 通信时, 若 抛出异常
std::terminate 被调用
以终止线程函数
[2] return value 或 异常 可 传给 caller
2种方式
——————————————————————————
1] std::promise
2] 共享变量 (可能需 同步)
——————————————————————————
3 std::thread ctor 的 args: 可调用对象
(1) 入口点为 function
void f();
std::thread my_thread(f)
(2) 入口点为 `可调用对象`
可调用对象 copied into thread's storage
并 `invoked` from there
原始可调用对象 可被立即销毁
问题: `函数对象 临时无名对象 作 std::thread ctor's arg`
会被 C++ 编译器 解释为 `函数声明`
std::thread my_thread( F() );
3 种 解决
————————————————————————————————————————————————————
[1] 函数对象对象 外再加一层小括号
std::thread my_thread( (F() ) );
————————————————————————————————————————————————————
[2] 花括号
std::thread my_thread{ F() };
————————————————————————————————————————————————————
[3] lambda 表达式 启动线程
std::thread my_thread(
[]{ do_something(); }
);
————————————————————————————————————————————————————
4 join 还是 detach ?
(1) `必须用 + 何时用`
原因
`std::thread 对象 销毁` 前线程 `没被 joined 或 detached`
线程对象
——————————————————————————————————————
[1] joinable
——————————————————————————————————————
[2] dtor 调 std::terminate() 结束程序
——————————————————————————————————————
(2) `detached 线程` 要在 `程序结束前 结束`
原因
main return 时, 正在执行的 detached 线程
——————————————————————————————————————
[1] 被暂停
——————————————————————————————————————
[2] 其 thread-local objects 销毁
——————————————————————————————————————
(3) 用 join 还是 detach ?
————————————————————————————————————————————
[1] join
————————————————————————————————————————————
[2] 除非 你想 `更灵活`
并用 `同步` 机制 去 `等待` 线程完成
此时 `detach`
————————————————————————————————————————————
5 std::terminate() in <exception>
被 C++ runtime 调用
——————————————————————————————————————————————
3 种 case
——————————————————————————————————————————
[1] std::thread 初始函数 抛出异常
——————————————————————————————————————————
[2] joinable 线程对象 `被销毁` 或 `被赋值`
——————————————————————————————————————————
[3] `异常` 被抛出却未被捕获
——————————————————————————————————————————————
6 lifetime 问题: 多(比单)线程 更易发生
`正在运行的 detached 子线程`
access `已被 destroyed 的 object`
=> undefined behavior
|
| 如
|/
caller
——————————————————————————————————————————————————
[1] local 变量 ptr/ref -> 传给 callee 线程 以 访问
——————————————————————————————————————————————————
[2] 已 return
——————————————————————————————————————————————————
[3] 相比 caller 线程, `callee 线程 lifetime 更长`
=> `ptr/ref 悬挂` ( dangling )
|
|/
潜在的 访问 隐患
——————————————————————————————————————————————————
|
| 解决
|/
[1] 线程函数 `自包含`
+
[2] `copy data into thread`
1.2 `等待 线程完成`
1 怎样 `等待 线程完成`
关联的 std::thread object 上调 join()
2 join() 做了啥
清理 所关联线程的 内存
调 1 次 join 后
线程对象
[1] 不再关联 (已完成的)线程
[2] not joinable <=> joinable() == false
3 等待过程中 控制线程
——————————————————————
[1] 检查线程是否完成
1] cv
2] futures 机制
——————————————————————
[2] 等待特定时间
——————————————————————
1.3 异常下的 等待
1 `call join() 的位置 要精挑细选`
(0) 问题
call join() 前 抛出异常
join() 调用 被跳过
`lifetime problem`
(1) 解决 1
让 join() 不被跳过
use try/catch, `catch 中 也调 join()`
缺点
[1] try/catch 冗长
[2] scope 易出错
(2) 解决 2
RAII
本质含义
`让 资源管理对象 own 资源, 在 析构时 释放资源
=> `资源的 lifetime 管理` 就和 `资源管理对象 lifetime 管理` 统一起来
=> 只要 `不泄漏 资源管理对象, 就能保证 不泄漏资源`
至于是 `让 ctor 自己 创建资源`,
`还是 把资源创建好 再交给 ctor 保管`,
没有优劣之分
可用 `static Factory Method 创建资源对象`
`client 就不用管 里边怎么实现了`
实现
线程对象
|
| 设
|/
保护类 thread_guard
[1] explicit ctor: 左值引用 para = std::thread& t_
|
| `线程对象` 作 arg 引用传递 构造 `线程保护对象`
|/
[2] init. list 初始化 / 绑定
|
|/
[3] 左值引用 成员 = std::thread& t
=> `线程 的 ownership 没有转移`
=> `caller 可能 已对线程 调 join()`
=> [4] thread_guard `dtor 中 调 join() 前 必须 先判线程是否可 joinable`
=> 线程 `caller 函数 执行 }`
即 ret 指令 `弹栈 逆序销毁` local 对象 时
先 `销毁 保护对象`
`调 保护对象 dtor 中 可能 调 join()`
[5] copy 和 赋值 delete
因 `资源管理对象` 可能 `outlive 其管理的 `线程对象` 的 `scope`
//`RAII 等待线程完成
class thread_guard
{
private:
std::thread& t; // [3]
public:
explicit thread_guard(std::thread& t_) // [1]
:t(t_){} // [2]
~thread_guard()
{
// [4] 先 test joinable(), 再 join()
if( t.joinable() )
{
t.join();
}
}
// [5] copy 和 赋值 delete
thread_guard(thread_guard const&)=delete;
thread_guard& operator=(thread_guard const&)=delete;
};
struct func; // defined in list2.1
void f()
{
int state = 0;
func f(state);
std::thread t(f);
// `新线程对象` 作 arg 引用传递 构造 `线程保护对象`
thread_guard g(t);
do_something_in_current_thread();
}
(3) 解决 3
若 不必 等待线程结束
`detach & copy data into thread`
可避免 `异常安全` 问题
`detach` 会 `打破 线程 与 std::thread 对象` 间 `关联`
只要 detach 线程 先于 main 函数 退出,
1.4 后台 / background 运行线程
1 detach 线程: std::thread instance上调 detach()`
`detached 线程` 特点
[1] 在后台(typically 长时间)运行
称 `守护线程`
应用
1> 监控文件系统
2> 实现发后即忘/fire and forget task:
只管消息发送, 不管消息接收
[2] 无法 获得 关联它的 std::thread object
=> 无法直接 communicate with it
it can `no longer be joined
=> 无法等待它完成`
[3] `ownership 和 control 被传给 C++ runtime 库`
保证了 线程关联资源
在线程结束时 被正确回收
#2 传参 给 线程函数
2.1 thread ctor 接口
——————————————————————————————————————————————————————————————————————————————————————————
线程函数 | thread ctor 的 paraList
——————————————————————————————————————————————————————————————————————————————————————————
非成员函数 | [1] callable obj [2] 相应 `函数调用运算符` 的 para...
——————————————————————————————————————————————————————————————————————————————————————————
成员函数 | [1] `成员函数指针` [2] 相应 对象的指针 [3] 相应 `成员函数` 的 para...
——————————————————————————————————————————————————————————————————————————————————————————
2.2 内部机制
1 `默认下`
std::thread Ctor 实参
[1] copied
副本 在新线程 context 中 `隐式转换 为 相应参数类型`
[2] 除非 用 `std::ref 等`
void f(std::string const& s);
std::thread t(f, "hello");
如
实参 `字符串 字面量`
[0] 是 ptr: char const*
[1] copied
[2] 副本 converted 为 std::string
[3] 表面上 引用传递
实际是 ptr copy 作 arg 调 string Ctor
问题
caller 函数
在 thread ctor 的 `ptr/ref 型 arg` 的
`副本 转换为 相应参数前 退出`
`所指对象销毁`
`副本指针 悬挂`
=> undefined behavior
解决
[1] 先 强转 arg, 再 传给 thread ctor`
[2] ptr 用 具有 `移动语义` 的 对象 管理
std::thread t(f, std::string(buffer) );
2 std::ref() / std::bind wrap 想 `引用传递` 的 `arg`
|
| 想
|/
更新数据
[1] std::ref()
void f(Data& data);
Data data;
std::thread t(f, std::ref(data) );
[2] std::bind <functional>
std::thread ctor 与 std::bind 机制相同
X x;
std::thread t(&X::f, &x);
#3 转移 线程 ownership
1 为什么要引出 move of std::thread
应对 2 种场景
线程 ownership 转交给
——————————————
[1] caller
[2] 其他 函数
——————————————
(1) `ownership moved out of a function`
`形似 pass by value, 实际却是 move 语义`
std::thread caller()
{
void callee();
return std::thread(callee);
}
(2) `ownership moved into a function`
void g(std::thread t);
void f()
{
void thread_func();
g( std::thread(thread_func) );
}
2 move 机制
—————————————————————————————————————————
[1] 隐含自动 move
左 或 右运算对象 至少一侧为 右值
—————————————————————————————————————————
[2] 显式 调 std::move()
—————————————————————————————————————————
3 thread_guard 与 scoped_thread
————————————————————————————————————————————————————————————————————————————
是否转移线程 ownship | 否 | 是
————————————————————————————————————————————————————————————————————————————
成员 | 左值引用 std::thread& t | 值 std::thread t
————————————————————————————————————————————————————————————————————————————
args 传递方式 | 左值引用 传递 | 值传递 + 移动语义
————————————————————————————————————————————————————————————————————————————
dtor 中 call join 前 | |
是否 check joinable | 是 | 否
————————————————————————————————————————————————————————————————————————————
ctor 中 | |
是否 check joinable | 否: 啥也不干 | 是
————————————————————————————————————————————————————————————————————————————
#4 runtime 时 选择 动态数量的线程 : 用 线程 groups 来 divide work
std::thread::hardware_concurrency()
硬件支持的并发线程数 的 hint
[1] 线程 vector
std::vector<std::thread> threads(n_threads);
[2] 启动 n_threads 个子线程
for i = 0 ~ n_threads-1
threads[i] = std::thread(
f, block_start, block_end, std::ref(results[i] )
);
[3] wait 子线程
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join) );
#5 识别线程:目的 `把 data 或 behavior 与 线程 关联起来`
1 线程 identifier 是 std::thread::id type
std::thread::id master_thread;
void f()
{
if( std::this_thread::get_id() == master_thread )
{
do_master_thread_work();
}
do_common_work();
}
2 线程ID 可作 关联容器 中 `key`
3 在线程间 共享 data
2个目标
[1] 避免潜在问题
[2] 最大化收益
`并发 好处`
线程间 `共享数据` 容易、直接
1.1 线程间 共享 数据
双向链表 delete node == update next + update prev
只 update next 时, `invariant 被 打破了`
若此时另一线程 正 access this node & 没做保护处理
=> 该 `race condition` 会 引起 bug
1 问题: race condition 竞争条件
`结果取决于 多个线程` 上 `操作` 执行的 `相对顺序`
多个线程 竞争地去执行 各自的 `操作`
|
|/
抢票
data race
对象 并发修改 对象
=> undefined behavior
2 解决
(1) `用 mutex wrap 数据结构`
以保证 `只有 真正执行修改的那个线程 可看到 invariants 被打破的状态`
(2) lock-free 编程
修改 数据结构的设计 及其 invariant
`分离 变化` 成 `n 个 不可分割的 更小变化`
每个小变化 保护 其 invariant
(3) STM / software transactional(事务) memory
数据库更新
update 数据结构 == process a transaction/事务
`store` series of `数据修改和读` to a `事务log`
commit it in a step
`commit 失败, 事务重启`
2.1 mutex 机制
1 `mut`ually `ex`clusive 互斥
保护 共享数据
access 前 lock 其关联的 `mutex`
access 后 unlock
2 mutex 并不是 silver bullet 银弹
更重要的是
[1] `设计代码结构` 来 `保护数据` 2.2节
[2] `接口中内在地避免 race conditions` 2.3节
3 mutex 问题
[1] 死锁
[2] 保护范围 要多大
2.2 mutexes 使用
1 mutex 创建/lock/unlock
创建
上/解 锁
成员函数 lock()/unlock()
2 对 mutex 不推荐 手工 调 lock()
原因: mutex 是一种 资源
与 对 thread 不推荐手工调 join() 一样
———————————————————————————————————————————————
资源 | 资源管理类 / RAII 类
———————————————————————————————————————————————
thread 对象 | thread_guard 或 scoped_thread
———————————————————————————————————————————————
mutex 对象 | std::lock_guard
———————————————————————————————————————————————
|
|
| 重构
|
| [1] 函数 作 public 成员
|
| [2] 被保护数据 + mutex 作 private 成员
|/
2.3 Structuring code 以 保护 共享数据
1 问题
`迷途( stray ) ptr/ref` 和 `后门`
若 成员函数 将 被保护数据 的 `ref/ptr 传出` lock 的 scope
|
| 如
|/
——————————————————————————————————————————————————————————————————————————————————
[1] `隐晦地 传出`
成员函数 para 为 `( 用户策略 ) 函数对象`
|
| 函数调用运算符 operator() 的 para 为
|/
ref / ptr 参数
——————————————————————————————————————————————————————————————————————————————————
[2] 显而易见 的 传出
return
——————————————————————————————————————————————————————————————————————————————————
=> 留了 `后门 ( backdoor )`
=> 任何能 访问 ptr/ref 的 code 可
`绕过(bypass) mutex 的 保护` access `被保护数据`
|
|/
不用 lock mutex
(1) code 结构
——————————————————————————————————————————————————————————
[1] ProtectedData + mutex 作 其 管理类(DataWrapper) 成员
——————————————————————————————————————————————————————————
[2] 成员函数 para 为 `函数对象`
接受 `用户(恶意 malicious)策略函数`
|
| [3]
|/
——————————————————————————————————————————————
1] `引用参数` => 可 `取 被保护数据 的 ptr`
2] 绕过 mutex 保护 access 被保护数据
——————————————————————————————————————————————————————————
(2) 根因
没有把 access 数据 的 所有代码段 标记为 `互斥`
`漏掉了 保护 传出的 被保护数据 的 ref`
(3) 更深的 陷阱 ( pitfall )
接口函数间 调用顺序( 接口设计 ) 导致 race conditions — 2.3节
2.4 在接口本身中 发现 ( Spotting ) race conditions
1 双向 list
删除 1个 node
线程安全
阻止 前中后共 3个节点 的 并发 accesses
问题
单独保护 每个节点
仍有 race conditions
解决
`单个 mutex` 保护 `整个 数据结构(list)`
2 线程安全 的 栈
5个操作 时
(1) 问题
调用顺序 为 empty() -> top() -> pop() 时, `not 线程安全`
[1] 线程1 empty() top() 间 线程2 pop()
empty() 判非空 -> pop() 使栈 空 -> top()
原因: 接口设计导致
[2] 线程1 top() pop() 间 线程2 top()
`2个线程 本打算 分别 处理 顶值和次顶值, 实际 都 处理 顶值`
次顶值 没处理 就被移除了
(2) 解决
( 有 隐患 )
|\
|
[1] mutex + 联合调 top 和 pop
若 stack 对象 copy ctor 可能 抛出异常
Herb Sutter 给出 solution
[2] pop() = std::stack 的 empty() + top() + pop()
————————————————————————————————————————————————
1] `构造 栈元素类型 新值` + pass by reference
+
2] 赋值 给 构造的新值 = `引用 实参/形参`
————————————————————————————————————————————————
缺点
1] 构造 代价太高 for some type
2] popped value type 必须 `可 赋值`
许多 用户定义类型 不支持 assignment
[3] 用 copy ctor 或 move ctor 不抛出异常 的类型
C++11 的 `右值引用`
使 更多类型 move ctor 不会抛出异常, 虽然 copy ctor 会
缺点
不通用
用户自定义类型
1] copy ctor 可能 抛出异常
2] 没有 move ctor
[4] `return ptr` 指向 popped item
`ptr 可安全 copy, 不会 抛出异常`
缺点
要内存管理, 简单类型时 开销大
用 std::shared_ptr
[5] 解决 [2] + [3] 或 [4]
接口简化
例 [2] + [4]
5个操作 变 3个 好处
empty empty
top
pop pop: 2 个 重载版本 [2] [4]
push push
swap
mutex 保护 std::stack 上 `完整 3 操作` empty() -> top() -> pop()
——————————————————————————————————————————————————————————————————————————————————————————————————
| [4] | [2]
——————————————————————————————————————————————————————————————————————————————————————————————————
函数原型 | std::shared_ptr<T> pop() | void pop(T& value)
——————————————————————————————————————————————————————————————————————————————————————————————————
para | 空 | 模板参数 T `引用类型`
——————————————————————————————————————————————————————————————————————————————————————————————————
实现关键 | 似 placement new + args 转发 | 1] pop 的 caller 负责 构造
取出 top 元素 | pop() 负责 构造 | 2] 赋值 给 构造的新值 = `引用 实参/形参` Note: 只是 引用参数, 不是 取出/top 栈中 元素的引用
怎么办 ? | std::shared_ptr<T> const res( | value = stk.top(); // T 可 赋值
用于`构造 栈元素类型 新值`| std::make_shared<T>( stk.top() ) ); |
——————————————————————————————————————————————————————————————————————————————————————————————————
记住 解决 [4] 易 => 解决 [2] 等
——————————————————————————————————————————————————————————————————————————————————————————————————
template< class T, class... Args >
shared_ptr<T> make_shared( Args&&... args ) // <memory>
T 非 数组
|\
| Note
|
构造 T 型 object
wrap 进 std::shared_ptr
就像
::new (pv) T(std::forward<Args>(args)...) // placement new + args 转发
|
| internal
|/
internal void*
// 解决 [4]: 记住 解决 [4] => 解决 [2] 等
template<typename T>
std::shared_ptr<T> threadsafe_stack::pop()
{
std::lock_guard<std::mutex> lock(m);
if( stk.empty() )
throw empty_stack();
std::shared_ptr<T> const res(
std::make_shared<T>( stk.top() ) );
stk.pop();
return res;
}
3 lock 粒度/范围
————————————————————————————————————————————————————————————————————
[1] 大 + 1个 mutex | 并发增益小
————————————————————————————————————————————————————————————————————
[2] 小 + 1个 mutex | 没有完全保护期望的操作 => race condition
————————————————————————————————————————————————————————————————————
[3] 更小 + 多个 mutex | 死锁: 两线程相互等待, 结果谁也不往前走
————————————————————————————————————————————————————————————————————
2.5 死锁 解决
1 死锁: 2个线程, 2个mutex
2线程 都要 lock 2个 mutex 才能执行
2个线程 各锁 1个mutex 时, 都在 等对方锁住的 mutex 被释放
2个线程都在死等
场景: 无锁 case 下的 死锁
`2 个 thread func` 在 对方 `线程对象` 上 调 join()
|
| std::thread t1;
|/ std::thread t2(f2, std::ref(t1) );
para 为 `线程对象 ref` t1 = std::thread(f1, std::ref(t2) );
void f1(std::thread& t_) { t_.join(); }
死锁: 都等 对方结束 void f2(std::thread& t_) { /* 休眠 5秒 */ t_.join(); }
2.6 避免死锁 的方法
思想
线程1 可能等 线程2 时, 线程2 就不要 等线程 1 了
1 避免 嵌套锁
2 当 持有1个锁 时, 避免调 `用户代码`
|
|/
可能 获得 另一个 锁
=> 可能会 嵌套锁
3 每个线程 以 相同顺序 获得多个锁
总在 mutex1 前 锁住 mutex2
[1] 适用于: 不同 mutexes for 不同 purposes
|
| [2] 问题: 不能 解决 的 死锁
|/
2个 mutex 保护 同 class 2个 instance
swap
|
| 解决
|/
std::lock() + std::lock_guard + std::adopt_lock 作 第 2 参数
| |
| |/
1次 lock 多个 mutex 告诉 std::lock_guard Ctor
[1] 其 第1参数 mutex 已上锁
[2] 构造时只需 接收 mutex 上 mutex 的 ownership
| [3] 问题: 不能 解决 的 死锁
|
| 多个锁 被 `分开 获得`
|/
4 锁层次
[1] application 分 多层
[2] 某层 已拥有 某 mutex 上的锁
就 `不允许 上一层 lock 另一 mutex`
2.7 std::unique_lock 实现更灵活的 locking
1 思想
松弛 invariants
并不总是拥有 mutex
代价
内部 要存/更新 1个 flag 来标识是否拥有 mutex
2 std::defer_lock 作 第2参数, 告诉 std::unique_lock Ctor, 构造时 `mutex unlocked`
3 std::unique_lock 对象
2种方法 加锁
1) 其 上调用 lock()
2) 传给 std::lock()
因为 std::unique_lock 提供 3 个成员函数
lock() try_lock() unlock()
4 std::lock_guard & std::unique_lock
同 都可
RAII 锁管理
异
1) 成员函数
前者 只有 ctor & dtor: ctor 中 加锁 + dtor 中 解锁
后者 还有 3 个成员函数 lock() try_lock() unlock() => 更灵活
2) 是否可 `管理 mutex 的 lifetime`
否/是
=>前者 最好将 mutex 设为 global 变量
swap 的 std::unique_lock 版本
std::unique_lock + std::defer_lock 作 第2参数 + std::lock(uniLkm1, uniLkm2);
|
|/
告诉 std::unique_lock Ctor, 构造时 `mutex unlocked`
2.8 转移 mutex ownership between scope
1 mutex 的 ownership 可在 std::unique_lock 间 move
std::unique_lock: movable 但 not copyable
2 std::unique_lock 3 种灵活应用
(1) 函数 lock mutex, transfer 该 mutex 的 ownership 给其 caller 的 lock
(2) lock 不直接返回, 而是作 gateway class 的 mem,
所有 access to data 都通过 gateway 类
gateway
[1] destory 时, releases lock
[2] movable
(3) std::unique_lock 可在 `销毁前 释放 其 管理的 mutex`
|
|
|/
unlock() 中
=> 提高性能
2.9 合适粒度下 上锁
`真正 访问 共享数据 时 再 lock` mutex
3.1 `初始化期间` 保护 共享数据
1 延迟 初始化
共享数据 只在 `初始化 并发 access 阶段` 才用 mutex 保护
|
| 如: 打开 数据库连接
|/
构造代价
(1) 单检测: `资源 (指针)` 是否 已初始化
1) 思想
资源 使用前检测
若 没初始化
先 初始化, 再 解锁, 再 使用
|
|/
spr.reset(new R);
2) code 结构
—————————————————————————————————————————————————
[1] 加锁 => 所有线程 串行
[2] 检测
[3] 初始化
[4] 解锁
[5] 使用 资源指针
—————————————————————————————————————————————————
加/解 锁
std::unique_lock + 其 成员函数 unlock()
—————————————————————————————————————————————————
3) 问题
第1次检测 前 加锁 => 检测处 `串行` => `并发性差`
|
| (半)解决
|/
(2) 双检测锁 ( Double-Checked Locking )
1) 思想
`第2次检查 前 才加锁`
2) code 结构
—————————————————————————————————————————————————
[1] 第1次检测
[2] 加锁
[3] 第2次检测
—————————————————————————————————————————————————
加/解锁
std::lock_guard + scope 结束 Dtor 解锁
—————————————————————————————————————————————————
3) 问题
1条 new 语句 分解为 3 条底层语句
——————————-————
[1] 分配 内存
[2] 构造
[3] 指针赋值
————————————————
compiler 可能重排执行顺序为 [1][3][2]
线程 1 资源 `构造未完成 -> 资源 ptr 就 已赋值`
线程 2 检测 资源 ptr 非空
`访问 初始化未完成` 的 资源
data race 型 race condition
undefined behavior
|
| 解决
|/
(3) std::call_once(std::once_flag, 资源初始化函数, func_args)
[1] 多线程 `并发进入` std::call_once 函数 + `同步`
但 `只有1个线程 ( active 线程 ) 真正执行` 第 2 arg: (资源初始化) `函数`
其他线程 ( passive/被动 线程 ) 进入 std::call_once 后
`等待` active 线程 完成初始化 + 返回 后, 再返回
|
|/
同步
[2] Note
当前线程 从 std::call_once 返回时,
资源初始化 已完成, 但可能由 另一线程 完成
[3] std::call_once 参数
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args ) // <mutex>
——————————————————————————————————————————————————————
1] 第1参数 std::once_flag
——————————————————————————————————————————————————————
2] 其余参数 与 std::thread Ctor
——————————————————————————————————————————————————
1> 形式 同 `右值引用 args`
——————————————————————————————————————————————————
2> 目的 不同 `避免 copy, 而不是 实现 move 语义`
因为 不必转移到 另一线程 去执行
——————————————————————————————————————————————————————
[4] std::once_flag
功能相当于 std::mutex
同 `不能 copy`
2 `static local 变量` 初始化 时 race condition
(1) static 局部变量 初始化
[1] 可能 真正在多个线程上 进行(C++11 前编译器) => problematic race condition
[2] 只能 真正在1个线程上 进行 (C++11 compiler 解决)
=> race condition 只是说 `哪个线程 去 初始化`
(2) 应用
`单例`
需要 `单个 全局 instance` 时
实现 `像 std::call_once 一样` 的 功能
3.2 很少更新 的 数据结构 的 保护
boost 库 boost::shared_mutex
第4章 同步
并发操作 cv & future
& async & packaged_task & promise
多线程 2个要点
[1] 保护 `共享数据`
第 3 章
[2] 线程 `同步`
线程 需 `等待` 另一线程 完成某任务后, 再去完成 自己的任务
实现
cv & futures
|
| cv: 类 std::condition_variable
|/
条件变量/condition variables
#1 循环等待 事件/条件: cv
##1.1 最简单的 方法
(1) code 结构 // 等待线程
|—— ——> `标志变量 flag` = true /false 表示条件 是/否满足
加锁 | - - - - -
| |
check 条件 是否满足 |
| => 周期性 ( 即 循环 )
否 | +
| 等待
解锁 - - - - - |\
|
sleep 固定时间 // Note: sleep 期间, other 线程 可获得锁 并 `modify 条件 使满足`
是
函数退出 => 自动解锁
(2) 问题
sleep 多久 难确定
太短: 浪费太多处理时间
太长: 事件早已发生, 等待线程还在 sleep
1] 丢帧 快节奏游戏 中
2] 过多消耗 时间片 实时应用 中
(3) 解决
cv
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100) );
lk.lock();
}
}
##1.2 cv class
1 `同步` 基础
可用于 同时 `block/阻塞 多个线程`
until 通知线程
1] 修改 a `shared variable`
|
| 用于表示
|/
条件
2] notifies cv
——————————————————————————————
通知线程 modify & notify cv
|
shared variable
|
等待线程 check & wait on cv
——————————————————————————————
2 2 个问题
(1) 问题1
精确调度事件
=> 可能 `notify called on destroyed cv`
解决1
notify under lock 可能必要
(2) 问题2
notify 发送时, 等待线程 没有 waiting on cv
=> `错过 notification`
原因
————————————————
shared variable
————————————————
1] `atomic`
————————————————
2] 未加锁
————————————————
Note
atomic copy delete
=>
不能用 cv 上 predicated wait 版本
即 不能为
cv.wait(lk, [] { return proceed; } ); // std::atomic<bool> proceed(false);
解决
1) 用 cv 上 non-predicated wait 版本
while ( !proceed ){ cv.wait(lk); }
2) shared variable 即使是 atomic
修改 也要 加锁
3 cv 类
[1] copy deleted
[2] notify_one & notify_all
[3] wait 2个重载
1] non-predicated 版本 (如) while ( !proceed ){ cv.wait(lk); }
非循环
`释放` lock
`阻塞` current thread & 加到 cv 上的 `等待线程 队列`
notified 或 spuriously wake up
`解阻塞`
`relock` + `wait exits`
2] predicated wait 版本 (如) cv.wait(lk, [] { return proceed; } );
循环 + 条件
|
|/
满足 = p() == true 时, 返回
————————————————————————————————————————————
与 non-predicated 版本
————————————————————————————————————————
区别
仅多了1步
relock 后, (循环) 检测 条件是否满足
————————————————————————————————————————
联系
视为 循环调 non-predicated 版本
————————————————————————————————————————————
template<typename Pred>
void std::condition_variable::wait(unique_lock<mutex>& uniLk, Pred pred)
{
while ( !pred() )
wait(uniLk); // 视为 non-predicated 版本
}
//等价于
while ( !pred() )
{
wait(uniLk);
}
4 等待线程
(1) notify_one
通知线程 + 多个等待线程
没必要 持有 the same mutex
否则
`悲观锁`
hurry up and wait
原因
awakened 后 可能无法 获得 mutex ( 被 通知线程 占着 )
|
| 解决
|/
pthreads 实现
1] `识别` 悲观锁场景
2] `notify 内` 把 等待线程
从 `cv 上的 队列` 转移到 `mutex 上的 队列`
而 `不 唤醒它`
(2) notify_all
`惊群现象`
事件发生
`唤醒所有` 所有等待线程
但 `只能有1个线程 获得事件处理权`
`其他线程 重新 陷入等待`
5 spurious wakeup / 假唤醒
notify_one/notify_all 之外的因素导致的
`wait 被唤醒, 但 条件不满足`
|
| 解决
|/
predicate wait 版本
[1] 只影响 non-predicated wait
[2] 不影响 predicated wait
6 应用
//------ App
threadsafe_queue<T> que;
void producer()
{
while( more_data_to_prepare() )
{
T const data = prepare_data();
que.push(data); // [1] 真正 push 前 加锁: que.push() 中 internal std::queue 真正 push 前
}
}
void consumer()
{
while(true)
{
T data;
que.wait_and_pop(data);
process(data); // [2] process 前 解锁: wait_and_pop() scope 结束 + 自动解锁
if( is_last_chunk(data) )
break;
}
}
// 记住这 1个 足够 => others
template<typename T>
std::shared_ptr<T>
threadsafe_queue::wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
// 捕获 this 目的: 可在 lambda 中用 当前类的 成员
cv.wait(lk,
[this]{ return !que.empty();} );
std::shared_ptr<T> res(
std::make_shared<T>( que.front() ) );
que.pop();
return res;
}
#2 等待 `1次性事件`: future
线程 以某种方式
获得 future -> 表示事件
场景: 等待航班
##2.0 std::future
`等待 被 异步设定 的 value`
template< class T > class future; (1)
template< class T > class future<T&>; (2)
template<> class future<void>; (3)
1 异步 机制
准备好
异步操作 - - - -> result + 修改 shared state
| /\ |
| 提供 / hold | link
|/ / |
std::future 对象 —— —— —— —— —— —— ——
| |\
| 给 | query / wait for / extract result: 阻塞, 直到 result 准备好
|/ |
creator
std::async / std::packaged_task / std::promise
future shared_future async packaged_task promise
除 async 为 为 function template 外, 其余为 class template
##2.1 std::async
`异步 runs a function` ( 可能 in a new thread)
返回 std::future
future 上调 get(), 当前线程 阻塞, 直到 future ready, 然后 return result
传参
与 std::thread 传参方式相同
##2.2 std::packaged_task
std::packaged_task<> obj 绑定 future to callable object
task.get_future() 返回 future 对象
##2.3 std::promise
1 针对2种场景
[1] task 不能被表达为 函数调用
[2] 结果 来自多个地方
2 机制
// 3大成员函数
[1] get_future()
返回 promised 结果 关联的 future
[2] set_value() : 主动线程
会 make state/future ready
[3] fut.get() : 被动线程
第5章 C++内存模型 和 原子类型操作
C++ `原子类型` 如何用于
[1] 线程间 `同步`
[2] `lock-free` 数据结构
#1 C++11新标准: 有 `多线程意识` 的 `内存模型`
##1.1 `两个线程 access 间 强迫顺序`
2种办法
(1) mutex
(2) 原子操作 的 同步特性: 迫使
##1.2 修改顺序
1 对象 写过程中 不允许 写或读
但每次写/读 由 哪个线程来做, 没有规定
内核 线程调度 决定
2 coder 还是 编译器 保证
(1) 非原子类型: 由 coder 在代码中保证
(2) 原子类型: 编译器保证
#2 C++中 原子 操作/类型
`原子: 不可分割, 要么不执行, 要么执行完`
#2.1 标准原子类型 <atomic>
`内存顺序参数`, 以指定 `内存顺序语义`
#3 同步操作 和 迫使顺序
思想
`用 原子操作 在 非原子操作间 迫使顺序`
=> 线程间 `同步/强加顺序`
1 写/读 线程
写线程1
填/populate 数据结构 -> data ready -> 原子变量 set flag
读线程2:
直到 flag 被 set, 才 读
#include <vector>
#include <atomic>
#include <iostream>
std::vector<int> data;
std::atomic<bool> data_ready(false);// [1] flag
void reader_thread()
{
while( !data_ready.load() ) // [2]
{
std::this_thread::sleep(std::milliseconds(1));
}
std::cout<<”The answer=”<< data[0] <<”\n”;
}
void writer_thread()
{
data.push_back(42);
data_ready = true; // [3]
}
2 synchronizes-with 关系 ( => happens-before ) + sequenced-before
线程间 (操作间) synchronizes-with ( 在...之前 ) 关系
=> `线程间 (操作间) happens-before` 关系
+ `线程内 sequenced-before` 关系
=> `执行顺序 传递性`
3 `内存 ordering`: 用于 `原子操作`
内存 排序 => synchronizes-with 关系
`3种模型 6种 memory ordering`
memory_order_
seq_cst // 1 顺序一致: 默认 最严格
relaxed // 2
//3 acquire-release
consume
acquire // acquire 操作 : 原子 load
release // release 操作 : 原子 store
acq_rel // acquire / release / both : 原子 read-modify-write ( fetch_add() / exchange() )
怎么选?根据它们 对程序 behavior 的影响
[1] 顺序一致 seq_cst
1) `多线程` 像 单线程
`所有线程 看到的 操作执行 顺序相同`
2) 从`同步` 角度 看
同一 原子变量, `所有线程` 间 `全局同步`
3) 优劣
优: 最简单直观
劣: `所有线程间 全局同步 -> 耗时 / 耗资源`
[2] relaxed
线程内, 是 单线程 happens-before
`线程间, 线程1 看到 线程2 中的操作是 out of order/乱序的`
`未必是 线程2自己 看到的顺序(代码表现的顺序)`
|
| 对 relaxed ordering 在操作间 force 同步/顺序关系
|/
[3] acquire-release
通过 2 个线程间 `局部顺序` 强迫出 `全局顺序`
| |
| 同一 |
|/ |/
原子变量 `令 release 操作(线程) synchronizes-with acquire 操作(线程)`
| |
store load
线程间同步
用 `1个 原子变量` 上
`2个 线程` 间 `release / (循环) acquire 操作`
来 `force` 线程间 顺序关系
————————————————————————————————————————————————————————————————————————————————————
release 线程 acquire 线程
release 前 操作
|
| 1 线程内 (natural) 顺序关系 ( sequenced-before )
|
release - - - - - - - - - - - - -
| 2 强迫 线程间顺序关系 ( synchronizes-with 关系 )
|
acquire
|
| 3 线程内 (natural) 顺序关系 ( sequenced-before )
|
acquire 后操作
————————————————————————————————————————————————————————————————————————————————————
4 Fences 围栏/隔离措施
`全局操作, 不修改数据, 在代码中 放一条线, 使 特定操作不能越过
=> 引入 happens-before 和 synchronizes-with 关系`
`release / acquire fence` 等价于
store / load tagged with memory_order_`release/acquire`
5 用原子操作 强迫 非原子操作
1 原子操作 的 happens-before + fence => 非原子操作 的 happens-before
第9章 高级线程管理 — 线程池
#1 线程池
work 即 task: 是 func_wrap obj
|
| 如
|/
std::function
1 3 阶段
(1) 创建 线程池 object
启动 多线程 - - - - -> 线程函数统一入口 worker_thread()
push 多个 thread object 到 `线程 vector`
(2) push 想 并行 的 `work` 到 work_queue
|
|/
submit
(3) thread_pool 自动管理 线程 + work
`每个 thread 的 线程函数` 从 `work_queue` 中
循环 try_pop 出 work 去执行: work()
调 wrap 的 callable object 的 operator()
若 try_pop 失败, 则 yield 切出 线程
Note
`每个 work` 被 `worker threads` 中 `某线程 取出 并执行`
|
|/
未指定
#1.1 最简单情况: `线程池` 中 `线程数量 固定`
1 `std::function` 可 store / copy / invoke
`有 copy 语义` 的 `callable object`
2 保证 线程一定会被 等待/join()
RAII 类
class join_threads
{
private:
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_):
threads(threads_) {}
~join_threads()
{
for(unsigned long i=0; i<threads.size(); ++i)
{
if( threads[i].joinable() )
threads[i].join();
}
}
};
3 有 work 要做时,
调 submit( Func ) 去 push work 到 pending work_queue
4 适用场景
(1) work 间 完全独立
(2) 等待线程完成 而不是 work 完成
(3) 无 阻塞
class thread_pool
{
private:
// (1) 标志 thread_pool 是否已销毁
std::atomic_bool done;
// (2) work_queue
thread_safe_queue< std::function< void() > > work_queue;
// (3) worker_threads: 用 vector
std::vector<std::thread> threads;
// (4) 等待 worker_threads 中 所有 thread 结束 => RAII 类
join_threads joiner;
void
worker_thread()
{
while( !done ) // done==false / thread_pool 销毁前, n 个 线程函数 worker_thread() 均一直循环 ( try_pop work 失败, 立即 切出线程 )
{
std::function<void()> work; // std::function object
// `每个 thread` 从 work_queue 中 `循环 try_pop 出 work 去执行: 若 try_pop 失败, yield 切出 线程`
if( work_queue.try_pop(work) )
{
work(); // invoke std::function 内 store 的 target ( callable object ) 的 operator()
}
else
{
std::this_thread::yield();
}
}
}
public:
template<typename F>
void
submit(F f)
{
work_queue.push( std::function<void()>(f) );
}
thread_pool():
done(false), joiner(threads)
{
unsigned const thread_count =
std::thread::hardware_concurrency();
try
{
for(unsigned i=0; i < thread_count; ++i)
{
threads.push_back(
std::thread( &thread_pool::worker_thread, this) );
}
}
catch(...)
{
// 保证 线程启动失败时,已启动的线程 被停掉 和 清理
done = true; // 只需把 线程统一入口函数中 循环停掉
throw;
}
}
// 2. dtor
~thread_pool()
{
done = true;
}
};
##1.2 等待 work 而不是 线程
|
|/
callable object
———————————————————————————————————————————————————————————
work wrap 进 Work
|
|/
[1] 只有 copy 语义 std::function 等待 线程
|
| 换为
|/
[2] 只含 move 语义
自定义: 提供 move 语义 的 ctor 等待 work
———————————————————————————————————————————————————————————
3 个 可 move class
[1] callable object
|
| 左值引用 para: Work 的 ctor / move ctor / move assignment
|/
[2] Work
unique_ptr
成员类型
1] Ctor: callable object 左值引用 para
template<typename F>
Work(F&& f):
sp_impl( new impl_type<F>(std::move(f) ) ) {}
2] 函数调用运算符 调 call()
void operator()() { up_impl->call(); }
3] call() 调 callable object 的 函数调用运算符
[3] pask_task
1] std::result_of<CallableType()>::type
取出 callable object
函数调用运算符 的 return type
|
| 作 future
|/
模板参数类型 T
2] callable object 的 ownership
依次 转移给 std::packaged_task // std::packaged_task< return_type() > pack_tsk( std::move(f) );
转移给 work_queue 中 ( 队尾 ) Work //work_queue.push( std::move(pack_tsk) );
template< class R, class ...Args >
class packaged_task< R(Args...) >; // R: return type
// ctor
template <class F>
explicit packaged_task( F&& f );
template< class S, class... Args >
class result_of< S (Args...) >;
Note
S: callable type, not return type
std::result_of<S(char, int&)>::type 是 S 的 operator() 的 return type
struct S {
double operator()(char, int&);
};
int main()
{
std::result_of<S(char, int&)>::type f = 3.14; // f has type double
}
// List9.2 线程池 with waitable works
class Work
{
private:
struct impl_base
{
virtual void call() = 0;
virtual ~impl_base() {}
};
template<typename F>
struct impl_type: impl_base
{
F f;
impl_type(F&& f_)
: f( std::move(f_) ) {}
void call() { f(); }
};
// (2) struct 用 unique_ptr 管理
std::unique_ptr<impl_base> up_impl;
public:
// (1) default ctor: empty
Work() = default;
// (2) move 语义: ctor / move ctor / move assignment
template<typename F>
Work(F&& f):
up_impl( new impl_type<F>(std::move(f) ) ) {}
Work(Work&& rhs):
up_impl( std::move(rhs.up_impl) ) {}
Work&
operator=(Work&& rhs)
{
up_impl=std::move(rhs.up_impl);
return *this;
}
// (3) 函数调用运算符
void operator()() { up_impl->call(); }
// copy 语义 delete
Work(const Work&)=delete;
Work(Work&) = delete;
Work& operator=(const Work&)=delete;
};
class thread_pool
{
thread_safe_queue< Work > work_queue;
void worker_thread()
{
while(!done)
{
Work work;
if( work_queue.try_pop(work) )
{
work();
}
else
{
std::this_thread::yield();
}
}
}
public:
template<typename CallableType>
std::future<typename std::result_of<CallableType()>::type>
submit(CallableType f)
{
typedef typename std::result_of<CallableType()>::type
return_type;
std::packaged_task< return_type() > pack_tsk( std::move(f) );
std::future<return_type> fut( pack_tsk.get_future() );
work_queue.push( std::move(pack_tsk) );
return fut;
}
// rest as before
};