随着计算机硬件的发展,多线程编程在项目开发中越发重要。Java提供了诸如ConcurrentHashMap、CopyOnWriteArrayList等并发容器,而C++的STL容器都不支持并发。在实际开发中,如果在多线程代码里由调用方对STL容器进行加锁操作,一是会引起散弹效应(多线程使用容器的各处都散落着锁的处理),二是锁的粒度难以控制。将多线程的支持封装在容器内部,自然可以让代码更加清晰。
STL容器
C++提供的STL容器大大方便了C++从业者,在一定程度上提升了开发者的效率,其中STL迭代器的设计将容器与算法分离,备受推崇。但是如果在多线程开发中吹毛求疵地去看待STL迭代器,就会发现它提供了对STL容器内数据直接操作的能力,使得容器的封闭性被破坏。当然,STL容器本身诞生的时间较为久远,当时C++还不支持lambda及函数式编程;站在当下,STL设计者或许会给出更为优雅的设计方式。由于STL容器的迭代器暴露了容器数据,使得多线程安全变得很困难,所以本文的并发容器采用函数式编程的方式来避免此问题。
C++并发容器
C++并发map
C++的STL的map容器有两种,一种是基于红黑树的std::map,一种是C++11引入的基于hash表的std::unordered_map。因为std::map的插入、删除操作可能引起整棵红黑树的调整,所以如果支持多线程就需要对整棵树进行锁定,会造成效率的问题。而unordered_map由于使用拉链法进行存储,不需要对整体进行锁定。本文参考Java提供的ConcurrentHashMap,给出C++版的ConcurrentHashMap,但该容器不提供扩容操作。拉链法的hash表为数组+链表方式存储,本文为了简化代码,使用数组+map的方式存储。每个Bucket的锁是分离的,这样就简单地实现了多线程的细粒度锁控制。代码如下:
/**
 * Hash map with a fixed number of buckets, each guarded by its own mutex.
 *
 * A key is routed to the bucket `hash(key) % bucket count`; every bucket keeps
 * its entries in a std::map and serialises access with a private std::mutex,
 * so operations on keys that land in different buckets do not contend.
 * The bucket count is fixed at construction (there is no rehash/grow).
 *
 * Elements are never exposed through iterators; callers pass a callable to
 * for_one()/for_each(), which is invoked while the bucket lock is held.
 * Because std::mutex is not recursive, such a callable must not call back
 * into the same bucket of this map.
 */
template<typename K, typename V, typename Hash = std::hash<K>>
class ConcurrentHashMap : NonCopyable
{
public:
    /**
     * @param bucketNumber number of independently locked buckets. A value of
     *        0 is clamped to 1 so that hashcode() never takes a modulo by zero.
     * @param hash         functor used to map a key to a bucket.
     */
    ConcurrentHashMap(unsigned bucketNumber = kDefaultBucketNum, const Hash &hash = Hash())
        : table_(bucketNumber == 0 ? 1u : bucketNumber),
          hash_(hash)
    {
    }

    /**
     * Invokes p(value) on the value stored under key, if there is one.
     * The value is passed as a const reference.
     * @return true when the key was found and p was called, false otherwise.
     */
    template<typename Predicate>
    bool for_one(const K &key, Predicate &p)
    {
        return table_[hashcode(key)].for_one(key, p);
    }

    /**
     * Invokes p(entry) for every stored std::pair<const K, V>.
     * Buckets are locked and visited one after another, so the traversal is
     * not a snapshot of the whole map. p itself (not a copy) is called, so
     * state accumulated inside a functor is visible to the caller afterwards.
     */
    template<typename Predicate>
    void for_each(Predicate &p)
    {
        for (auto &bucket : table_)
        {
            bucket.for_each(p);
        }
    }

    /** Adds key -> value; an entry that already exists under key is kept. */
    void insert(const K &key, V &&value)
    {
        table_[hashcode(key)].insert(key, std::move(value));
    }

    /** Stores key -> value, replacing any entry already stored under key. */
    void put(const K &key, V &&value)
    {
        table_[hashcode(key)].put(key, std::move(value));
    }

    /** Removes the entry stored under key, if any. */
    void erase(const K &key)
    {
        table_[hashcode(key)].erase(key);
    }

    /**
     * @return the sum of all bucket sizes. Each bucket is locked only while
     *         its own size is read, so concurrent writers can make the total
     *         stale by the time it is returned.
     */
    std::size_t size() const
    {
        std::size_t total = 0;
        for (const auto &bucket : table_)
        {
            total += bucket.size();
        }
        return total;
    }

    /** @return 1 when an entry is stored under key, 0 otherwise. */
    std::size_t count(const K &key) const
    {
        return table_[hashcode(key)].count(key);
    }

private:
    static const unsigned kDefaultBucketNum = 31; //Prime Number is better

    /** One slot of the table: a std::map protected by its own mutex. */
    class Bucket
    {
    public:
        void insert(const K &key, V &&value)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            // emplace leaves an existing entry untouched.
            item_.emplace(key, std::move(value));
        }

        void put(const K &key, V &&value)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            // Erase first so that the emplace below always stores the new value.
            item_.erase(key);
            item_.emplace(key, std::move(value));
        }

        void erase(const K &key)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            item_.erase(key);
        }

        template<typename Predicate>
        bool for_one(const K &key, Predicate &p)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            const ConstIterator it = item_.find(key);
            if (it == item_.end())
            {
                return false;
            }
            p(it->second);
            return true;
        }

        template<typename Predicate>
        void for_each(Predicate &p)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            // Call p directly: std::for_each would take the functor by value
            // and the caller would never see the state it accumulates.
            for (auto &entry : item_)
            {
                p(entry);
            }
        }

        std::size_t size() const
        {
            std::lock_guard<std::mutex> lock(mutex_);
            return item_.size();
        }

        std::size_t count(const K &key) const
        {
            std::lock_guard<std::mutex> lock(mutex_);
            return item_.count(key);
        }

    private:
        using Item = std::map<K, V>;
        using ConstIterator = typename Item::const_iterator;

        Item item_;
        mutable std::mutex mutex_; // mutable: locked by the const readers too
    };

    /** Index of the bucket responsible for key; const so count() can use it. */
    std::size_t hashcode(const K &key) const
    {
        return hash_(key) % table_.size();
    }

    std::vector<Bucket> table_;
    Hash hash_;
};
C++并发list
C++的STL的list容器有两种,一种是双向链表std::list,一种是C++11引入的单向链表std::forward_list。如果在多线程中对链表进行并发读写,当然可以简单粗暴地使用一把锁锁住整个链表,但这同样存在性能低下的问题。因此考虑更细粒度的、针对单个节点的锁控制。但是如果使用双向链表,由于节点可能从两个方向被访问,锁的控制会非常复杂,因此使用单向链表来包装并发的list。以删除一个节点为例,只要获取到该节点前一个节点的锁,其他线程就无法对该节点进行操作,这样就保证了多线程的安全性。
/**
 * Singly linked list in which every node (including the sentinel head_)
 * carries its own mutex.
 *
 * Traversals walk the chain by locking the successor before releasing the
 * lock on the node they came from, so a node can only be reached or unlinked
 * by a thread that holds the lock of its predecessor.
 * Callables passed to for_each/for_one/remove_if run while the visited
 * node's mutex is held.
 */
template<typename T>
class ConcurrentList : NonCopyable
{
public:
    ConcurrentList() {}

    /** Unlinks the nodes one at a time by removing everything. */
    ~ConcurrentList()
    {
        remove_if([](T&) { return true; });
    }

    /** Moves value into a new node placed directly behind the sentinel. */
    void push_front(T&& value)
    {
        // Allocate before taking the lock to keep the critical section short.
        std::unique_ptr<Node> fresh(new Node(std::move(value)));
        std::lock_guard<std::mutex> head_guard(head_.mutex_);
        fresh->next_ = std::move(head_.next_);
        head_.next_ = std::move(fresh);
    }

    /** Calls p(value) on every element, front to back. */
    template<typename Predicate>
    void for_each(Predicate p)
    {
        Node* prev = &head_;
        std::unique_lock<std::mutex> prev_guard(prev->mutex_);
        for (Node* node = prev->next_.get(); node != nullptr; node = prev->next_.get())
        {
            // Take the successor's lock first, then let go of the predecessor.
            std::unique_lock<std::mutex> node_guard(node->mutex_);
            prev_guard.unlock();
            p(node->value_);
            prev = node;
            prev_guard = std::move(node_guard);
        }
    }

    /**
     * Calls c(value) on the first element for which p(value) is true.
     * @return true when such an element was found, false otherwise.
     */
    template<typename Predicate, typename Consumer>
    bool for_one(Predicate p, Consumer c)
    {
        Node* prev = &head_;
        std::unique_lock<std::mutex> prev_guard(prev->mutex_);
        for (Node* node = prev->next_.get(); node != nullptr; node = prev->next_.get())
        {
            std::unique_lock<std::mutex> node_guard(node->mutex_);
            prev_guard.unlock();
            if (p(node->value_))
            {
                c(node->value_);
                return true;
            }
            prev = node;
            prev_guard = std::move(node_guard);
        }
        return false;
    }

    /** Unlinks and destroys every element for which p(value) is true. */
    template<typename Predicate>
    void remove_if(Predicate p)
    {
        Node* prev = &head_;
        std::unique_lock<std::mutex> prev_guard(head_.mutex_);
        for (Node* node = prev->next_.get(); node != nullptr; node = prev->next_.get())
        {
            std::unique_lock<std::mutex> node_guard(node->mutex_);
            if (p(node->value_))
            {
                // Keep the predecessor locked and splice the node out; the
                // node's mutex is released before `doomed` destroys the node.
                std::unique_ptr<Node> doomed = std::move(prev->next_);
                prev->next_ = std::move(node->next_);
                node_guard.unlock();
            }
            else
            {
                // Nothing to remove here: advance hand over hand.
                prev_guard.unlock();
                prev = node;
                prev_guard = std::move(node_guard);
            }
        }
    }

    /** List cell: its own mutex, the payload and the owning link to the next cell. */
    struct Node
    {
        Node() : next_() {}
        Node(T&& value) : value_(std::move(value)) {}

        std::mutex mutex_;
        T value_;
        std::unique_ptr<Node> next_;
    };

    Node head_; // sentinel; its value_ is never handed to a callable
};
C++并发Queue
C++的STL中与队列相关的容器有两种,一种是双端队列std::deque,一种是单端队列std::queue(默认基于std::deque实现的容器适配器)。多线程的队列就是典型的生产者消费者模型。
/**
 * Unbounded FIFO queue for producer/consumer hand-off between threads.
 *
 * put() appends an element and wakes one waiting consumer; take() blocks
 * until an element is available and returns the oldest one. All access to
 * the underlying container is guarded by a single mutex.
 */
template<typename T>
class BlockingQueue
{
public:
    using MutexLockGuard = std::lock_guard<std::mutex>;

    BlockingQueue()
        : _mutex(),
          _notEmpty(),
          _queue()
    {
    }

    BlockingQueue(const BlockingQueue &) = delete;
    BlockingQueue &operator=(const BlockingQueue &) = delete;

    /** Appends a copy of x and notifies one waiting consumer. */
    void put(const T &x)
    {
        {
            MutexLockGuard lock(_mutex);
            _queue.push_back(x);
        }
        // Notify after the lock is released so the woken thread can take it at once.
        _notEmpty.notify_one();
    }

    /** Appends x by moving it in and notifies one waiting consumer. */
    void put(T &&x)
    {
        {
            MutexLockGuard lock(_mutex);
            _queue.push_back(std::move(x));
        }
        _notEmpty.notify_one();
    }

    /**
     * Blocks until the queue is non-empty, then removes and returns the
     * oldest element.
     */
    T take()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // The predicate form re-checks emptiness after every wake-up.
        _notEmpty.wait(lock, [this]
                       { return !this->_queue.empty(); });
        assert(!_queue.empty());
        T front(std::move(_queue.front()));
        _queue.pop_front();
        return front;
    }

    /** @return the number of queued elements at the moment of the call. */
    size_t size() const
    {
        MutexLockGuard lock(_mutex);
        return _queue.size();
    }

private:
    mutable std::mutex _mutex; // mutable: size() const has to lock it
    std::condition_variable _notEmpty;
    // std::deque, not std::queue: the adapter offers push/pop only, while this
    // class uses push_back/pop_front.
    std::deque<T> _queue;
};
C++并发容器使用-函数式编程
并发容器提供的接口大都是函数式接口,最后给出函数式编程的三种形式。函数式编程将过程性的操作提取为有意义的类或lambda表达式,语义性更强。
lambda表达式
// insert() must keep the first value stored for a key, even when many
// threads race to insert other values under the same keys.
TEST(MapTest, test_insert)
{
    ConcurrentHashMap<int, int> map;

    // Seed keys 0..999 so that each key maps to itself.
    for (int key = 0; key < 1000; ++key)
    {
        int value = key;
        map.insert(key, std::move(value));
    }

    // Each worker attempts to insert key + 100 under every key.
    auto worker = [&map]
    {
        for (int key = 0; key < 1000; ++key)
        {
            map.insert(key, key + 100);
        }
    };

    std::vector<std::thread> workers;
    for (int n = 0; n < 100; ++n)
    {
        workers.emplace_back(worker);
    }
    for (std::thread &running : workers)
    {
        running.join();
    }

    EXPECT_EQ(map.size(), 1000u);

    // Every key must still hold the seeded value.
    for (int key = 0; key < 1000; ++key)
    {
        int seen = 10000;
        auto remember = [&seen](const int &stored)
        {
            seen = stored;
        };
        map.for_one(key, remember);
        EXPECT_EQ(seen, key);
    }
}
仿函数
struct PlusTen
{
void operator()(std::pair<A* const, std::unique_ptr<A>>& p) const
{
p.second->a += 10;
}
};
// Functor for ConcurrentHashMap::for_each: counts, in `num`, the visited
// entries whose owned object has `a` below 10.
struct NumberLessThanTen
{
    NumberLessThanTen(int num) : num(num) {}

    void operator()(std::pair<A* const, std::unique_ptr<A>>& p) const
    {
        if (p.second->a < 10)
        {
            num++;
        }
    }

    mutable int num; // mutable so the const call operator can count
};
// Drives for_each with functor objects: after PlusTen has visited every
// entry, no value created from 0..9 is below 10 any more.
TEST(MapTest, test_for_each)
{
    ConcurrentHashMap<A*, std::unique_ptr<A>> map;

    // Key each object by its own address; the map takes ownership.
    for (int value = 0; value < 10; ++value)
    {
        std::unique_ptr<A> owner(new A(value));
        A* const key = owner.get();
        map.put(key, std::move(owner));
    }

    PlusTen plus;
    map.for_each(plus);

    NumberLessThanTen less(0);
    map.for_each(less);
    EXPECT_EQ(less.num, 0);
}
std::bind
// Rule object whose member function is adapted with std::bind into a
// for_each callback.
struct OfNumber
{
    // Increments `number` when the visited entry's object has an even `a`.
    void isEven(int &number, std::pair<A* const, std::unique_ptr<A>>& p) const
    {
        if (p.second->a % 2 == 0)
        {
            number++;
        }
    }
};
// Drives for_each with a std::bind expression: of the values 0..9 exactly
// five are even.
TEST(MapTest, test_for_OfNumber)
{
    ConcurrentHashMap<A*, std::unique_ptr<A>> map;

    // Key each object by its own address; the map takes ownership.
    for (int value = 0; value < 10; ++value)
    {
        std::unique_ptr<A> owner(new A(value));
        A* const key = owner.get();
        map.put(key, std::move(owner));
    }

    OfNumber rule;
    int number = 0;
    // std::ref keeps the bound call writing to the local counter, not a copy.
    auto countEven = std::bind(&OfNumber::isEven,
                               std::ref(rule),
                               std::ref(number),
                               std::placeholders::_1);
    map.for_each(countEven);
    EXPECT_EQ(number, 5);
}
WalkeR-ZG