前言
MessageQueue提供了两方面的功能,消息循环中的消息队列功能以及通过持有SocketServer对象带来的IO多路复用功能。在MessageQueue内部这两部分功能不是完全孤立的,而是相互配合在一起使用。尤其是在MessageQueue的核心方法Get()中体现得淋漓尽致。
MessageQueue的实现位于rtc_base/message_queue.h以及rtc_base/message_queue.cc中,其声明如下:
class MessageQueue {
public:
static const int kForever = -1;
MessageQueue(SocketServer* ss, bool init_queue);
MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue);
virtual ~MessageQueue();
SocketServer* socketserver();
virtual void Quit();
virtual bool IsQuitting();
virtual void Restart();
virtual bool IsProcessingMessagesForTesting();
virtual bool Get(Message* pmsg, int cmsWait = kForever, bool process_io = true);
virtual bool Peek(Message* pmsg, int cmsWait = 0);
virtual void Post(const Location& posted_from, MessageHandler* phandler,
uint32_t id = 0, MessageData* pdata = nullptr, bool time_sensitive = false);
virtual void PostDelayed(const Location& posted_from, int cmsDelay,
MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
virtual void PostAt(const Location& posted_from, int64_t tstamp,
MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
// TODO(honghaiz): Remove this when all the dependencies are removed.
virtual void PostAt(const Location& posted_from, uint32_t tstamp,
MessageHandler* phandler, uint32_t id = 0, MessageData* pdata = nullptr);
virtual void Clear(MessageHandler* phandler, uint32_t id = MQID_ANY,
MessageList* removed = nullptr);
virtual void Dispatch(Message* pmsg);
virtual void ReceiveSends();
// Amount of time until the next message can be retrieved
virtual int GetDelay();
bool empty() const { return size() == 0u; }
size_t size() const {
CritScope cs(&crit_); // msgq_.size() is not thread safe.
return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
}
// Internally posts a message which causes the doomed object to be deleted
template <class T>
void Dispose(T* doomed) {
if (doomed) {
Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
}
}
sigslot::signal0<> SignalQueueDestroyed;
protected:
class PriorityQueue : public std::priority_queue<DelayedMessage> {
public:
container_type& container() { return c; }
void reheap() { make_heap(c.begin(), c.end(), comp); }
};
void DoDelayPost(const Location& posted_from, int64_t cmsDelay, int64_t tstamp,
MessageHandler* phandler, uint32_t id, MessageData* pdata);
void DoInit();
void ClearInternal(MessageHandler* phandler, uint32_t id,
MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
void WakeUpSocketServer();
bool fPeekKeep_;
Message msgPeek_;
MessageList msgq_ RTC_GUARDED_BY(crit_);
PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_);
CriticalSection crit_;
bool fInitialized_;
bool fDestroyed_;
private:
volatile int stop_;
SocketServer* const ss_;
std::unique_ptr<SocketServer> own_ss_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue);
};
MQ的基本成员
根据功能划分,可以将MQ的基本成员分为三类
MQ状态指示:
- bool fInitialized_:指示MQ已经被初始化,即已经被添加到MQM的管理队列中;
- bool fDestroyed_:指示MQ已经被销毁,即已经被MQM移除,并且MQ将立马被析构;
- volatile int stop_:指示MQ是否已经Quit,即停止工作,不继续接受处理消息;
消息循环相关:MQ中有3个地方存储了消息:msgPeek_,msgq_ ,dmsgq_ 。
- MessageList msgq_ RTC_GUARDED_BY(crit_):即时消息列表,存储即时消息,先入先出;
- PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_):延迟消息列表,存储延迟消息,按触发时间排序;
- uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_):下一个延迟消息的序号,单调递增;
- Message msgPeek_:存储被Peek出来的一个即时消息;
- bool fPeekKeep_:指示是否存在一个被Peek出来的消息;
IO多路复用相关:
- SocketServer* const ss_:持有的SocketServer类,用以完成IO多路复用操作;
- std::unique_ptr<SocketServer> own_ss_: 与ss_一样,只是经过转移语句,使得该SocketServer对象只由该MQ持有。
MQ的构造及析构
构造 做了这么几件事:初始化所有的成员;断言ss不能传空指针,将MQ的指针传递给ss使得二者相互持有相互访问;将MQ加入到MQM的管理指针,fInitialized_标志置为true,指示该MQ已经初始化完成。
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
: fPeekKeep_(false), dmsgq_next_num_(0), fInitialized_(false),
fDestroyed_(false), stop_(0), ss_(ss) {
RTC_DCHECK(ss);
ss_->SetMessageQueue(this);
if (init_queue) {
DoInit();
}
}
MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
: MessageQueue(ss.get(), init_queue) {
own_ss_ = std::move(ss);
}
void MessageQueue::DoInit() {
if (fInitialized_) {
return;
}
fInitialized_ = true;
MessageQueueManager::Add(this);
}
析构 基本上是构造的逆操作:设置fDestroyed_为true,表示MQ被销毁;发送信号SignalQueueDestroyed()告知关注了MQ的对象不要再访问该MQ了;从MQM中移除自己;清理MQ中的所有消息;从ss中移除MQ的指针。
MessageQueue::~MessageQueue() {
DoDestroy();
}
void MessageQueue::DoDestroy() {
if (fDestroyed_) {
return;
}
fDestroyed_ = true;
// The signal is done from here to ensure
// that it always gets called when the queue
// is going away.
SignalQueueDestroyed();
MessageQueueManager::Remove(this);
ClearInternal(nullptr, MQID_ANY, nullptr);
if (ss_) {
ss_->SetMessageQueue(nullptr);
}
}
MQ的Size
由于MQ有3个地方存储了消息,一个是Peek消息msgPeek_,一个即时消息队列msgq_,一个是延迟消息队列dmsgq_。那么计算MQ中存储的消息个数时,这三个地方都得算上。
bool empty() const { return size() == 0u; }
size_t size() const {
CritScope cs(&crit_); // msgq_.size() is not thread safe.
return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u);
}
MQ的运行状态
MQ的运行状态由成员volatile int stop_来指示,相关的函数有以下几个,如源码所示。方法都非常简单,无非就是对stop_变量进行原子性的置1和置0,本身就是线程安全的,所有并没有上锁。另外需要知道的几点如下:
- 当MQ停止运行后,MQ将不再接受处理Send,Post消息;
- 当MQ停止运行时,已经成功投递的消息仍将会被处理;
- 为了确保上述这点,MessageHandler的销毁和MessageQueue的销毁是独立,分开的;
- 并非所有的MQ都会处理消息,比如SignalThread线程。在这种情况下,为了确定投递的消息是否会被处理,应该使用IsProcessingMessagesForTesting()探知下。
void MessageQueue::Quit() {
AtomicOps::ReleaseStore(&stop_, 1);
WakeUpSocketServer();
}
bool MessageQueue::IsQuitting() {
return AtomicOps::AcquireLoad(&stop_) != 0;
}
bool MessageQueue::IsProcessingMessagesForTesting() {
return !IsQuitting();
}
void MessageQueue::Restart() {
AtomicOps::ReleaseStore(&stop_, 0);
}
消息获取
MQ中消息获取相关的函数有两个,Peek()与Get(),其中Get()是核心内容,再看Get()方法之前,先看看Peek()方法干了啥~~
Peek(): 简而言之就是查看之前是否已经Peek过一个MSG到成员msgPeek_中,若已经Peek过一个则直接将该消息返回;若没有,则通过Get()方法从消息队列中取出一个消息,成功则将该消息交给msgPeek_成员,并将fPeekKeep_标志置为true。
bool MessageQueue::Peek(Message* pmsg, int cmsWait) {
// fPeekKeep_为真,表示已经Peek过一个MSG到msgPeek_
// 直接将该MSG返回
if (fPeekKeep_) {
*pmsg = msgPeek_;
return true;
}
// 若没有之前没有Peek过一个MSG
if (!Get(pmsg, cmsWait))
return false;
//将Get到的消息放在msgPeek_中保存,并设置标志位
msgPeek_ = *pmsg;
fPeekKeep_ = true;
return true;
}
Get():方法的声明如下所示,注释说明了Get()方法的内部算法的流程,Get()方法会阻塞的处理IO,直到有消息可以处理 或者 cmsWait时间已经过去 或者 Stop()方法被调用。
// Get() will process I/O until:
// 1) A message is available (returns true)
// 2) cmsWait seconds have elapsed (returns false)
// 3) Stop() is called (returns false)
virtual bool Get(Message* pmsg,
int cmsWait = kForever,
bool process_io = true);
源码如下:
bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) {
// 是否存在一个Peek过的消息没有被处理?
// 优先处理该消息 // 步骤1
if (fPeekKeep_) {
*pmsg = msgPeek_;
fPeekKeep_ = false;
return true;
}
int64_t cmsTotal = cmsWait;
int64_t cmsElapsed = 0;
int64_t msStart = TimeMillis();
int64_t msCurrent = msStart;
while (true) {
// 检查是否有send消息,若存在,先阻塞处理send消息 // 步骤2
ReceiveSends();
// 检查所有post消息(即时消息+延迟消息)
int64_t cmsDelayNext = kForever;
bool first_pass = true;
while (true) {
// 上锁进行消息队列的访问
{
CritScope cs(&crit_);
// 内部第一次循环,先检查延迟消息队列 // 步骤3
if (first_pass) {
first_pass = false;
// 将延迟消息队列dmsgq_中已经超过触发时间的消息全部取出放入到即时消息队列msgq_中
// 计算当前时间距离下一个最早将要到达触发时间的消息还有多长时间cmsDelayNext。
while (!dmsgq_.empty()) {
if (msCurrent < dmsgq_.top().msTrigger_) {
cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
break;
}
msgq_.push_back(dmsgq_.top().msg_);
dmsgq_.pop();
}
}
// 从即时消息队列msgq_队首取出第一个消息 // 步骤4
if (msgq_.empty()) {
break;
} else {
*pmsg = msgq_.front();
msgq_.pop_front();
}
} // crit_ is released here.
// 如果消息对时间敏感,那么如果超过了最大忍耐时间kMaxMsgLatency才被处理
// 则打印警告日志
if (pmsg->ts_sensitive) {
int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
if (delay > 0) {
RTC_LOG_F(LS_WARNING)
<< "id: " << pmsg->message_id
<< " delay: " << (delay + kMaxMsgLatency) << "ms";
}
}
// 如果取出是需要销毁的消息,则销毁该消息,继续取下一个消息。
if (MQID_DISPOSE == pmsg->message_id) {
RTC_DCHECK(nullptr == pmsg->phandler);
delete pmsg->pdata;
*pmsg = Message();
continue;
}
return true;
}
// 走到这,说明当前没有消息要处理,很可能是处于Quit状态了,先判断一下
if (IsQuitting())
break;
// 计算留给IO处理的时间 // 步骤5
int64_t cmsNext;
if (cmsWait == kForever) { //Get无限期,那么距离下个延迟消息的时间就作为本次IO处理时间
cmsNext = cmsDelayNext;
} else { // Get有超时时间,计算本次IO处理时间
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); // 总体来说还剩多少时间
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) // 总体剩余时间和下一个延迟消息触发时间谁先到达?取其短者
cmsNext = cmsDelayNext;
}
{
// 阻塞处理IO多路复用 // 步骤6
if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
return false;
}
// 计算是否所有时间都已耗尽,是否进入下一个大循环 // 步骤7
msCurrent = TimeMillis();
cmsElapsed = TimeDiff(msCurrent, msStart);
if (cmsWait != kForever) {
if (cmsElapsed >= cmsWait)
return false;
}
}
return false;
}
上述算法过程是整个消息循环的核心内容,如上注释,大概可以分为7个步骤:
- 检查Peek消息。先检查之前是否已经Peek过一个消息到msgPeek_还未被处理,若有,当前就处理该消息吧,函数返回~ 若无,继续处理其他消息。
- 通过执行 ReceiveSends() 方法来处理所有Send消息。在MQ中该方法为virtual方法,啥也不干,Thread类继承MQ后会实现该方法,在此方法中处理所有Send消息。因此,消息循环中其实优先,阻塞地先处理所有Send消息,实现跨线程的Send消息方法。
- 处理Post消息中的延迟消息。从延迟消息队列dmsgq_中取出所有已经到达触发时间点的延迟消息,并塞入即时消息队列msgq_的队尾。同时计算下一个延迟消息还过多久将被触发(如果延迟消息队列中还有未超时的消息),这个时间可能会被作为后续IO多路复用处理的超时时间。这点在redis,nginx上理念一致。
- 处理即时消息。取出即时消息队列msgq_的队首消息。若该消息是个要销毁的消息,那么销毁该消息,并取下一个即时消息;若取到一个非要销毁的即时消息,那么就先处理该即时消息吧,函数返回;若本步骤没有取到即时消息,表示当前没有消息要处理,那干点啥好呢~处理网络IO吧
-
计算留给网络IO的时间。消息处理才是迫切的,网络IO嘛,看我能给你分配多少时间吧~~ 分两种情形来对待:
1)若外部Get方法无限期,那么下一个延迟消息触发时间到来之前我都可以用来处理IO;若是延迟队列中没有延迟消息呢?也就是消息循环队列中没有任何要处理的消息了,那当然我就可以无限期地,阻塞地将时间都用来处理IO了,直到有消息进入消息队列,将消息循环从IO处理中唤醒为止,继续处理消息。
2)若外部Get方法是有超时时间的,那么我们有必要先计算下已经花费了多长时间,到此刻,我们总共最多还剩多长时间留给IO处理。将总剩余时间跟下一个延迟消息触发时间做个比较,哪个小取哪个作为IO处理的时间;若是延迟队列中没有延迟消息呢?那就将剩下的所有时间都交给IO处理咯,反正也没有消息要处理~ - IO多路复用处理。 阻塞地花费上述计算好的时间进行IO处理。过程中要是处理出错,则函数返回;若是处理时间耗完或者时间没有耗完,但是有新消息进入循环了使得阻塞式的IO处理被唤醒,那么进入下个步骤。
- 计算剩余时间。既然消息已经被处理完过一次,IO也处理完了,先计算下是不是所有时间都已经耗尽?耗尽时间了,我还没找到可用的即时消息,sorry~函数返回false;没有耗尽的话,那么我们计算下剩余的时间,并将剩余的时间把2-7过程再来一遍吧:处理Send消息,检查延迟消息,检查并返回即时消息,再次计算IO处理时间,IO处理,再次计算剩余时间。什么?为什么没有重复步骤1检查Peek消息?同一个线程中我既然在执行Get,怎么可能Peek嘛,怎么可能又蹦跶出一个Peek消息呢?
消息投递
MQ中消息投递相关的函数有这么几个:Post(),PostDelayed(),两个PostAt(),DoDelayPost()。其中Post()用于投递即时消息;PostDelayed(),两个PostAt()用于投递延迟消息,内部都是调用DoDelayPost()来实现。
Post(): 即时消息的投递,源码如下。主要就是将函数的入参封装成一个即时消息Message对象,然后放置到即时队列msgq_的队尾。需要注意的有四点:
- 如果消息循环已经处理停止状态,即stop_状态值不为0,那么消息循环拒绝消息入队,消息数据会被释放掉。此时,投递消息者是不知情的。
- 如果消息对时间敏感,即想知道该消息是否即时被处理了,最大延迟不超过kMaxMsgLatency 150ms;
- 为了线程安全,队列的入队操作是需要加锁的,CritScope cs(&crit_)对象的构造和析构确保了这点;
- 消息入队后,由于处理消息是首要任务,因此,需要调用WakeUpSocketServer()使得IO多路复用的处理赶紧返回,即调用ss_->WakeUp()方法实现。由于这块儿是IO多路复用实现内容,后续会专门写文章分析,此处只要知道该方法能够使得阻塞式的IO多路复用能结束阻塞,回到消息处理上来即可。
void MessageQueue::Post(const Location& posted_from, MessageHandler* phandler,
uint32_t id, MessageData* pdata, bool time_sensitive) {
if (IsQuitting()) {
delete pdata;
return;
}
{
CritScope cs(&crit_);
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
if (time_sensitive) {
msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
}
msgq_.push_back(msg);
}
WakeUpSocketServer();
}
void MessageQueue::WakeUpSocketServer() {
ss_->WakeUp();
}
PostDelayed(),PostAt(),DoDelayPost(): 延迟消息的投递,源码如下。PostDelayed(),PostAt()均是将各自的入参稍作转换后,再调用DoDelayPost()方法,将入参封装成延迟消息DelayedMesssage,然后加入到延迟消息队列dmsgq_中,并从IO多路复用的阻塞中唤醒来处理消息。与Post()方法中的做法并无二致。需要额外注意的地方有这么几点:
- 延迟消息的序号计算,成员dmsgq_next_num_是一个uint32_t类型的数据,也即处理4,294,967,296条消息后会溢出回归到0,此时,优先级队里中消息的排序可能会受到影响。但是考虑到一点,正如源码注释上解释的那样:优先级队里中的消息会优先按照触发时间排序,那么最多影响到的不过是那些触发时间相同的消息而已。即便是影响到了部分触发时间相同的消息,那也不过是很短的时间,并不会造成很大的影响。
- 一点疑惑:任然是延迟消息序号的问题,源码上使用了RTC_DCHECK_NE(0, dmsgq_next_num_),在debug模式下进行断言。当dmsgq_next_num_溢出回归为0时,必将触发断言,那么debug模式下,只让每个消息循环处理这么多条消息?
void MessageQueue::PostDelayed(const Location& posted_from, int cmsDelay,
MessageHandler* phandler, uint32_t id, MessageData* pdata) {
return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
pdata);
}
void MessageQueue::PostAt(const Location& posted_from, uint32_t tstamp,
MessageHandler* phandler, uint32_t id, MessageData* pdata) {
// This should work even if it is used (unexpectedly).
int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
}
void MessageQueue::PostAt(const Location& posted_from, int64_t tstamp,
MessageHandler* phandler, uint32_t id, MessageData* pdata) {
return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
pdata);
}
void MessageQueue::DoDelayPost(const Location& posted_from, int64_t cmsDelay, int64_t tstamp,
MessageHandler* phandler, uint32_t id, MessageData* pdata) {
if (IsQuitting()) {
delete pdata;
return;
}
{
CritScope cs(&crit_);
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
dmsgq_.push(dmsg);
// If this message queue processes 1 message every millisecond for 50 days,
// we will wrap this number. Even then, only messages with identical times
// will be misordered, and then only briefly. This is probably ok.
++dmsgq_next_num_;
RTC_DCHECK_NE(0, dmsgq_next_num_);
}
WakeUpSocketServer();
}
消息处理
Dispatch():方法内容一目了然,先打印一条trace日志;然后记录消息处理的开始时间start_time;调用消息的MessageHandler的OnMessage方法进行消息处理;记录消息处理的结束时间end_time;计算消息处理花费了多长时间diff,如果消息花费时间过程,超过kSlowDispatchLoggingThreshold(50ms),则打印一条警告日志,告知从哪儿构建的消息花费了多长时间才消费完。
void MessageQueue::Dispatch(Message* pmsg) {
TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
pmsg->posted_from.file_and_line(), "src_func",
pmsg->posted_from.function_name());
int64_t start_time = TimeMillis();
pmsg->phandler->OnMessage(pmsg);
int64_t end_time = TimeMillis();
int64_t diff = TimeDiff(end_time, start_time);
if (diff >= kSlowDispatchLoggingThreshold) {
RTC_LOG(LS_INFO) << "Message took " << diff
<< "ms to dispatch. Posted from: "
<< pmsg->posted_from.ToString();
}
}
消息清理
Clear(): 清理函数逻辑也相当简单,目标也相当明确,就是要讲满足条件的消息从MQ中删除。需要注意的点有以下几个:
- 线程安全,上锁处理;
- MQ中消息可能存在的位置有3个:Peek消息msgPeek_,即时消息队列msgq_,延迟消息队列dmsgq_;因此,需要从这3个地方去挨个查找能匹配的消息。
- 如果Clear()方法传入了一个MessageList* removed,匹配的消息都会进入该list;若是没有传入这样一个list,那么消息数据都将会立马销毁。
void MessageQueue::Clear(MessageHandler* phandler, uint32_t id, MessageList* removed) {
CritScope cs(&crit_);
ClearInternal(phandler, id, removed);
}
void MessageQueue::ClearInternal(MessageHandler* phandler, uint32_t id, MessageList* removed) {
// Remove messages with phandler
if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
if (removed) {
removed->push_back(msgPeek_);
} else {
delete msgPeek_.pdata;
}
fPeekKeep_ = false;
}
// Remove from ordered message queue
for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
if (it->Match(phandler, id)) {
if (removed) {
removed->push_back(*it);
} else {
delete it->pdata;
}
it = msgq_.erase(it);
} else {
++it;
}
}
// Remove from priority queue. Not directly iterable, so use this approach
PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
for (PriorityQueue::container_type::iterator it = new_end;
it != dmsgq_.container().end(); ++it) {
if (it->msg_.Match(phandler, id)) {
if (removed) {
removed->push_back(it->msg_);
} else {
delete it->msg_.pdata;
}
} else {
*new_end++ = *it;
}
}
dmsgq_.container().erase(new_end, dmsgq_.container().end());
dmsgq_.reheap();
}
销毁消息
Dispose(): 之前在WebRTC源码分析-线程基础之Message && MessageData && MessageHandler中对DisposeData消息数据专门进行过阐述。再配合之前Get()方法中对DisposeData消息数据的处理方式,我们很容易理解该函数的作用:如果想要销毁某个对象,而不方便立马销毁,那么就可以将调用消息循环的Dispose()方法让消息循环帮忙进行数据销毁。
// Internally posts a message which causes the doomed object to be deleted
template <class T>
void Dispose(T* doomed) {
if (doomed) {
Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed));
}
}