简介
本文是本系列的最后一篇,讨论两个主题:关于实现基于互斥锁的并发链表的设计方法和设计不使用互斥锁的并发数据结构。对于后一个主题,我选择实现一个并发堆栈并解释设计这种数据结构涉及的一些问题。用 C++
设计独立于平台的不使用互斥锁的数据结构目前还不可行,所以我选用 GCC version 4.3.4 作为编译器并在代码中使用 GCC 特有的 _sync*
函数。如果您是 WIndows® C++
开发人员,可以考虑使用 Interlocked* 系列函数实现相似的功能。
并发单向链表的设计方法
清单 1 给出最基本的并发单向链表接口。是不是缺少什么东西?
清单 1. 并发单向链表接口
template <typename T>
class SList {
private:
typedef struct Node {
T data;
Node *next;
Node(T& data) : value(data), next(NULL) { }
} Node;
pthread_mutex_t _lock;
Node *head, *tail;
public:
void push_back(T& value);
void insert_after(T& previous, T& value); // insert data after previous
void remove(const T& value);
bool find(const T& value); // return true on success
SList( );
~SList( );
};
清单 2 给出 push_back
方法定义。
清单 2. 把数据添加到并发链表中
void SList<T>::push_back(T& data)
{
pthread_mutex_lock(&_lock);
if (head == NULL) {
head = new Node(data);
tail = head;
} else {
tail->next = new Node(data);
tail = tail->next;
}
pthread_mutex_unlock(&_lock);
}
现在,假设一个线程试图通过调用 push_back
把 n 个整数连续地添加到这个链表中。这个接口本身要求获取并释放互斥锁 n 次,即使在第一次获取锁之前已经知道要插入的所有数据。更好的做法是定义另一个方法,它接收一系列整数,只获取并释放互斥锁一次。清单 3 给出方法定义。
清单 3. 在链表中插入数据的更好方法
void SList<T>::push_back(T* data, int count) // or use C++ iterators
{
Node *begin = new Node(data[0]);
Node *temp = begin;
for (int i=1; i<count; ++i) {
temp->next = new Node(data[i]);
temp = temp->next;
}
pthread_mutex_lock(&_lock);
if (head == NULL) {
head = begin;
tail = head;
} else {
tail->next = begin;
tail = temp;
}
pthread_mutex_unlock(&_lock);
}
优化搜索元素
现在,我们来优化链表中的搜索元素 — 即 find
方法。下面是几种可能出现的情况:
- 当一些线程正在迭代链表时,出现插入或删除请求。
- 当一些线程正在迭代链表时,出现迭代请求。
- 当一些线程正在插入或删除数据时,出现迭代请求。
显然,应该能够同时处理多个迭代请求。如果系统中的插入/删除操作很少,主要活动是搜索,那么基于单一锁的方法性能会很差。在这种情况下,应该考虑使用读写锁,即 pthread_rwlock_t
。在本文的示例中,将在 SList
中使用 pthread_rwlock_t
而不是 pthread_mutex_t
。这么做就允许多个线程同时搜索链表。插入和删除操作仍然会锁住整个链表,这是合适的。清单 4 给出使用 pthread_rwlock_t
的链表实现。
清单 4. 使用读写锁的并发单向链表
template <typename T>
class SList {
private:
typedef struct Node {
// … same as before
} Node;
pthread_rwlock_t _rwlock; // Not pthread_mutex_t any more!
Node *head, *tail;
public:
// … other API remain as-is
SList( ) : head(NULL), tail(NULL) {
pthread_rwlock_init(&_rwlock, NULL);
}
~SList( ) {
pthread_rwlock_destroy(&_rwlock);
// … now cleanup nodes
}
};
清单 5 给出链表搜索代码。
清单 5. 使用读写锁搜索链表
bool SList<T>::find(const T& value)
{
pthread_rwlock_rdlock (&_rwlock);
Node* temp = head;
while (temp) {
if (temp->value == data) {
status = true;
break;
}
temp = temp->next;
}
pthread_rwlock_unlock(&_rwlock);
return status;
}
清单 6 给出使用读写锁的 push_back
方法。
清单 6. 使用读写锁在并发单向链表中添加数据
void SList<T>::push_back(T& data)
{
pthread_setschedprio(pthread_self( ), SCHED_FIFO);
pthread_rwlock_wrlock(&_rwlock);
// … All the code here is same as Listing 2
pthread_rwlock_unlock(&_rwlock);
}
我们来分析一下。这里使用两个锁定函数调用(pthread_rwlock_rdlock 和 pthread_rwlock_wrlock)管理同步,使用 pthread_setschedprio 调用设置写线程的优先级。如果没有写线程在这个锁上阻塞(换句话说,没有插入/删除请求),那么多个请求链表搜索的读线程可以同时操作,因为在这种情况下一个读线程不会阻塞另一个读线程。如果有写线程等待这个锁,当然不允许新的读线程获得锁,写线程等待,到现有的读线程完成操作时写线程开始操作。如果不按这种方式使用 pthread_setschedprio 设置写线程的优先级,根据读写锁的性质,很容易看出写线程可能会饿死。
下面是对于这种方式要记住的几点:
- 如果超过了最大读锁数量(由实现定义),pthread_rwlock_rdlock 可能会失败。
- 如果有 n 个并发的读锁,一定要调用 pthread_rwlock_unlock n 次。
允许并发插入
应该了解的最后一个方法是 insert_after。同样,预期的使用模式要求调整数据结构的设计。如果一个应用程序使用前面提供的链表,它执行的插入和搜索操作数量差不多相同,但是删除操作很少,那么在插入期间锁住整个链表是不合适的。在这种情况下,最好允许在链表中的分离点(disjoint point)上执行并发插入,同样使用基于读写锁的方式。下面是构造链表的方法:
在两个级别上执行锁定(见 清单 7):链表有一个读写锁,* 各个节点包含一个互斥锁。如果想节省空间,可以考虑共享互斥锁 — 可以维护节点与互斥锁的映射。
- 在插入期间,写线程在链表上建立读锁,然后继续处理。在插入数据之前,锁住要在其后添加新数据的节点,插入之后释放此节点,然后释放读写锁。
- 删除操作在链表上建立写锁。不需要获得与节点相关的锁。
- 与前面一样,可以并发地执行搜索。
清单 7. 使用两级锁定的并发单向链表
template <typename T>
class SList {
private:
typedef struct Node {
pthread_mutex_lock lock;
T data;
Node *next;
Node(T& data) : value(data), next(NULL) {
pthread_mutex_init(&lock, NULL);
}
~Node( ) {
pthread_mutex_destroy(&lock);
}
} Node;
pthread_rwlock_t _rwlock; // 2 level locking
Node *head, *tail;
public:
// … all external API remain as-is
}
};
清单 8 给出在链表中插入数据的代码。
清单 8. 使用双重锁定在链表中插入数据
void SList<T>:: insert_after(T& previous, T& value)
{
pthread_rwlock_rdlock (&_rwlock);
Node* temp = head;
while (temp) {
if (temp->value == previous) {
break;
}
temp = temp->next;
}
Node* newNode = new Node(value);
pthread_mutex_lock(&temp->lock);
newNode->next = temp->next;
temp->next = newNode;
pthread_mutex_unlock(&temp->lock);
pthread_rwlock_unlock(&_rwlock);
return status;
}
基于互斥锁的方法的问题
到目前为止,都是在数据结构中使用一个或多个互斥锁管理同步。但是,这种方法并非没有问题。请考虑以下情况:
- 等待互斥锁会消耗宝贵的时间 — 有时候是很多时间。这种延迟会损害系统的可伸缩性。
- 低优先级的线程可以获得互斥锁,因此阻碍需要同一互斥锁的高优先级线程。这个问题称为优先级倒置(priority inversion ) (更多信息见 参考资料 中的链接)。
- 可能因为分配的时间片结束,持有互斥锁的线程被取消调度。这对于等待同一互斥锁的其他线程有不利影响,因为等待时间现在会更长。这个问题称为锁护送(lock convoying)(更多信息见 参考资料中的链接)。
互斥锁的问题还不只这些。最近,出现了一些不使用互斥锁的解决方案。话虽如此,尽管使用互斥锁需要谨慎,但是如果希望提高性能,肯定应该研究互斥锁。
比较并交换指令
在研究不使用互斥锁的解决方案之前,先讨论一下从 80486 开始在所有 Intel® 处理器上都支持的 CMPXCHG 汇编指令。清单 9 从概念角度说明这个指令的作用。
清单 9. 比较并交换指令的行为
int compare_and_swap ( int *memory_location, int expected_value, int new_value)
{
int old_value = *memory_location;
if (old_value == expected_value)
*memory_location = new_value;
return old_value;
}
这里发生的操作是:指令检查一个内存位置是否包含预期的值;如果是这样,就把新的值复制到这个位置。清单 10 提供汇编语言伪代码。
清单 10. 比较并交换指令的汇编语言伪代码
CMPXCHG OP1, OP2
if ({AL or AX or EAX} = OP1)
zero = 1 ;Set the zero flag in the flag register
OP1 = OP2
else
zero := 0 ;Clear the zero flag in the flag register
{AL or AX or EAX}= OP1
CPU 根据操作数的宽度(8、16 或 32)选择 AL、AX 或 EAX 寄存器。如果 AL/AX/EAX 寄存器的内容与操作数 1 的内容匹配,就把操作数 2 的内容复制到操作数 1;否则,用操作数 2 的值更新 AL/AX/EAX 寄存器。Intel Pentium® 64 位处理器有相似的指令 CMPXCHG8B,它支持 64 位的比较并交换。注意,CMPXCHG 指令是原子性的,这意味着在这个指令结束之前没有可见的中间状态。它要么完整地执行,要么根本没有开始。在其他平台上有等效的指令,例如 Motorola MC68030 处理器的 compare and swap (CAS) 指令有相似的语义。
我们为什么对 CMPXCHG 感兴趣?这意味着要使用汇编语言编写代码吗?
需要了解 CMPXCHG 和 CMPXCHG8B 等相关指令是因为它们构成了无锁解决方案的核心。但是,不必使用汇编语言编写代码。GCC (GNU Compiler Collection,4.1 和更高版本)提供几个原子性的内置函数(见 参考资料),可以使用它们为 x86 和 x86-64 平台实现 CAS 操作。实现这一支持不需要包含头文件。在本文中,我们要在无锁数据结构的实现中使用 GCC 内置函数。看一下这些内置函数:
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)
__sync_bool_compare_and_swap
内置函数比较 oldval
和 ptr
。如果它们匹配,就把 newval
复制到ptr
。如果 oldval
和 *ptr
匹配,返回值是 True,否则是 False。__sync_val_compare_and_swap
内置函数的行为是相似的,只是它总是返回旧值。清单 11 提供一个使用示例。
清单 11. GCC CAS 内置函数的使用示例
#include <iostream>
using namespace std;
int main()
{
bool lock(false);
bool old_value = __sync_val_compare_and_swap( &lock, false, true);
cout >> lock >> endl; // prints 0x1
cout >> old_value >> endl; // prints 0x0
}
设计无锁并发堆栈
既然了解了 CAS,现在就来设计一个并发堆栈。这个堆栈没有锁;这种无锁的并发数据结构也称为非阻塞数据结构。清单 12 给出代码接口。
清单 12. 基于链表的非阻塞堆栈实现
template <typename T>
class Stack {
typedef struct Node {
T data;
Node* next;
Node(const T& d) : data(d), next(0) { }
} Node;
Node *top;
public:
Stack( ) : top(0) { }
void push(const T& data);
T pop( ) throw (…);
};
清单 13 给出压入操作的定义。
清单 13. 在非阻塞堆栈中压入数据
void Stack<T>::push(const T& data)
{
Node *n = new Node(data);
while (1) {
n->next = top;
if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS
break;
}
}
}
压入(Push)操作做了什么?从单一线程的角度来看,创建了一个新节点,它的 next 指针指向堆栈的顶部。接下来,调用 CAS 内置函数,把新的节点复制到 top 位置。
从多个线程的角度来看,完全可能有两个或更多线程同时试图把数据压入堆栈。假设线程 A 试图把 20 压入堆栈,线程 B 试图压入 30,而线程 A 先获得了时间片。但是,在 n->next = top
指令结束之后,调度程序暂停了线程 A。现在,线程 B 获得了时间片(它很幸运),它能够完成 CAS,把 30 压入堆栈后结束。接下来,线程 A 恢复执行,显然对于这个线程 *top
和 n->next
不匹配,因为线程 B 修改了 top 位置的内容。因此,代码回到循环的开头,指向正确的 top 指针(线程 B 修改后的),调用 CAS,把 20 压入堆栈后结束。整个过程没有使用任何锁。
弹出操作
清单 14 给出从堆栈弹出数据的代码。
清单 14. 从非阻塞堆栈弹出数据
T Stack<T>::pop( )
{
if (top == NULL)
throw std::string(“Cannot pop from empty stack”);
while (1) {
Node* next = top->next;
if (__sync_bool_compare_and_swap(&top, top, next)) { // CAS
return top->data;
}
}
}
用与 push
相似的代码定义弹出操作语义。堆栈的顶存储在 result
中,使用 CAS 把 top 位置更新为top->next
并返回适当的数据。如果恰在执行 CAS 之前线程失去执行权,那么在线程恢复执行之后,CAS 会失败,继续循环,直到有有效的数据可用为止。
结果好就一切都好
不幸的是,这种堆栈弹出实现有问题 — 包括明显的问题和不太明显的问题。明显的问题是 NULL 检查必须放在 while
循环中。如果线程 P 和线程 Q 都试图从只剩一个元素的堆栈弹出数据,而线程 P 恰在执行 CAS 之前失去执行权,那么当它重新获得执行权时,堆栈中已经没有可弹出的数据了。因为 top
是 NULL,访问 &top
肯定会导致崩溃 — 这显然是可以避免的 bug。这个问题也突显了并发数据结构的基本设计原则之一:决不要假设任何代码会连续执行。
清单 15 给出解决了此问题的代码。
清单 15. 从非阻塞堆栈弹出数据
T Stack<T>::pop( )
{
while (1) {
if (top == NULL)
throw std::string(“Cannot pop from empty stack”);
Node* next = top->next;
if (top && __sync_bool_compare_and_swap(&top, top, next)) { // CAS
return top->data;
}
}
}
下一个问题比较复杂,但是如果您了解内存管理程序的工作方式(更多信息见 参考资料 中的链接),应该不太难理解。清单 16 展示这个问题。
清单 16. 内存的回收利用会导致 CAS 出现严重的问题
T* ptr1 = new T(8, 18);
T* old = ptr1;
// .. do stuff with ptr1
delete ptr1;
T* ptr2 = new T(0, 1);
// We can't guarantee that the operating system will not recycle memory
// Custom memory managers recycle memory often
if (old1 == ptr2) {
…
}
在此代码中,无法保证 old
和 ptr2
有不同的值。根据操作系统和定制的应用程序内存管理系统的具体情况,完全可能回收利用已删除的内存 — 也就是说,删除的内存放在应用程序专用的池中,可在需要时重用,而不返回给系统。这显然会改进性能,因为不需要通过系统调用请求更多内存。尽管在一般情况下这是有利的,但是对于非阻塞堆栈不好。现在我们来看看这是为什么。
假设有两个线程 — A 和 B。A 调用 pop
并恰在执行 CAS 之前失去执行权。然后 B 调用 pop
,然后压入数据,数据的一部分来自从前面的弹出操作回收的内存。清单 17 给出伪代码。
清单 17. 序列图
Thread A tries to pop
Stack Contents: 5 10 14 9 100 2
result = pointer to node containing 5
Thread A now de-scheduled
Thread B gains control
Stack Contents: 5 10 14 9 100 2
Thread B pops 5
Thread B pushes 8 16 24 of which 8 was from the same memory that earlier stored 5
Stack Contents: 8 16 24 10 14 9 100 2
Thread A gains control
At this time, result is still a valid pointer and *result = 8
But next points to 10, skipping 16 and 24!!!
纠正方法相当简单:不存储下一个节点。清单 18 给出代码。
清单 18. 从非阻塞堆栈弹出数据
T Stack<T>::pop( )
{
while (1) {
Node* result = top;
if (result == NULL)
throw std::string(“Cannot pop from empty stack”);
if (top && __sync_bool_compare_and_swap(&top, result, result->next)) { // CAS
return top->data;
}
}
}
这样,即使线程 B 在线程 A 试图弹出数据的同时修改了堆栈顶,也可以确保不会跳过堆栈中的元素。
结束语
本系列讨论了如何设计支持并发访问的数据结构。您已经看到,设计可以基于互斥锁,也可以是无锁的。无论采用哪种方式,要考虑的问题不仅仅是这些数据结构的基本功能 — 具体来说,必须一直记住线程会争夺执行权,要考虑线程重新执行时如何恢复操作。目前,解决方案(尤其是无锁解决方案)与平台/编译器紧密相关。请研究用于实现线程和锁的 Boost 库,阅读 John Valois 关于无锁链表的文章(见参考资料 中的链接)。C++0x
标准提供了 std::thread
类,但是目前大多数编译器对它的支持很有限,甚至不支持它。
转自(https://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html)