前言
WebRTC中的线程管理是通过ThreadManager对象来实施的,该类起着牧羊者的作用,rtc::Thread类对象就是羊群。其通过什么样的技术来实现对rtc::Thread管理的?在不同的系统平台下如何实现?下文将进行阐述。
该类的声明和定义与Thread类一样,位于rtc_base目录下的thread.h与thread.cc文件中。先看其类的声明:
class ThreadManager {
public:
static const int kForever = -1;
static ThreadManager* Instance();
Thread* CurrentThread();
void SetCurrentThread(Thread* thread);
Thread* WrapCurrentThread();
void UnwrapCurrentThread();
bool IsMainThread();
private:
ThreadManager();
~ThreadManager();
#if defined(WEBRTC_POSIX)
pthread_key_t key_;
#endif
#if defined(WEBRTC_WIN)
const DWORD key_;
#endif
const PlatformThreadRef main_thread_ref_;
RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
};
ThreadManager的构造
ThreadManager实现为单例模式,通过静态方法Instance()来获取唯一的实例。其构造与 析构函数均声明为private。
先看静态方法Instance的实现:
ThreadManager* ThreadManager::Instance() {
static ThreadManager* const thread_manager = new ThreadManager();
return thread_manager;
}
该方法很简单,但是注意这个方法不是线程安全的,那么在WebRTC的多线程环境下是如何保证ThreadManager对象被安全的构造?WebRTC通过一定机制确保了Instance()方法第一次的调用肯定是在单线程的环境下,也即在主线程中被调用,因此是线程安全的。如何实现这点?
- WebRTC中启动新线程的标准方法是通过创建Thread对象,然后调用Thread.Start()方法来启用新的线程,而该方法的内部会直接调用一次Insance(),如下截图:
-
WebRTC启动新线程的非标准方法,即用户继承了Thread对象,并且不能通过Thread.Start()方法来启用新线程。此时,WebRTC中是如何保证这点的?如下截图,Thread的WrapCurrent()方法的说明以及其实现说明了此种情况:
继承Thread的类,如果不能通过Thread.Start()来启动线程时,应该在构造中调用WrapCurrent()方法,该方法如下图所示,首先就会调用ThreadManager::Instance()来获取ThreadManager的单例对象。
至此,WebRTC通过上述的方式确保了ThreadManager对象被安全的构造。
private构造函数的实现:
#if defined(WEBRTC_POSIX)
ThreadManager::ThreadManager() : main_thread_ref_(CurrentThreadRef()) {
#if defined(WEBRTC_MAC)
InitCocoaMultiThreading();
#endif
pthread_key_create(&key_, nullptr);
}
#endif
#if defined(WEBRTC_WIN)
ThreadManager::ThreadManager()
: key_(TlsAlloc()), main_thread_ref_(CurrentThreadRef()) {}
#endif
我们可以看到在Windows和类Unix系统中实现进行了区分,WEBRTC_POSIX宏表征该系统是类Unix系统,而WEBRTC_WIN宏表征是Windows系统。虽然实现稍微有些许不同,在MAC下还需要调用InitCocoaMultiThreading()方法来初始化多线程库。但是两个构造函数均初始化了成员key_与main_thread_ref_(我们可以看到WebRTC中的私有成员均以下划线结尾)。其中key是线程管理的关键。
key_的初始化:在Windows平台上,key_被声明为DWORD类型,赋值为TlsAlloc()函数的返回值,TlsAlloc()函数是Windows的系统API,Tls表示的是线程局部存储Thread Local Storage的缩写,其为每个可能的线程分配了一个线程局部变量的槽位,该槽位用来存储WebRTC的Thread线程对象指针。如果不了解相关概念,可以看微软的官方文档,或者TLS--线程局部存储这篇博客来了解。在类Unix系统上,key_被声明pthread_key_t类型,使用方法pthread_key_create(&key_, nullptr);赋值。实质是类Unix系统上的线程局部存储实现,隶属于线程库pthread,因此方法与变量均以pthread开头。总之,在ThreadManager的构造之初,WebRTC就为各个线程所对应的Thread对象制造了一个线程局部变量的槽位,成为多线程管理的关键。
main_thread_ref_的初始化:该成员为PlatformThreadRef类型的对象,赋值为CurrentThreadRef()方法的返回值,如下源码所示:在Windows系统下,取值为WinAPI GetCurrentThreadId()返回的当前线程描述符,DWORD类型;在FUCHSIA系统下(该系统是Google新开发的操作系统,像Android还是基于Linux内核属于类Unix范畴,遵循POSIX规范,但FUCHSIA是基于新内核zircon开发的),返回zx_thread_self(),zx_handle_t类型;在类Unix系统下,通过pthread库的pthread_self()返回,pthread_t类型。总之,如前文所述,这部分代码肯定是在主线程中所运行,因此,main_thread_ref_存储了主线程TID在不同平台下的不同表示。
PlatformThreadRef CurrentThreadRef() {
#if defined(WEBRTC_WIN)
return GetCurrentThreadId();
#elif defined(WEBRTC_FUCHSIA)
return zx_thread_self();
#elif defined(WEBRTC_POSIX)
return pthread_self();
#endif
}
private析构函数的实现:
ThreadManager::~ThreadManager() {
// By above RTC_DEFINE_STATIC_LOCAL.
RTC_NOTREACHED() << "ThreadManager should never be destructed.";
}
根据日志,我们看到ThreadManager单例对象的析构函数是永不会被调用的,直到整个进程结束自动去释放该对象所占用的空间。否则,会触发断言,在标准错误输出上述错误日志后,调用系统的abort()函数。后续会对RTC_NOTREACHED宏进行展开描述,看看其究竟是如何处理的。
获取,设置当前线程关联的Thread对象
#if defined(WEBRTC_WIN)
Thread* ThreadManager::CurrentThread() {
return static_cast<Thread*>(TlsGetValue(key_));
}
void ThreadManager::SetCurrentThread(Thread* thread) {
RTC_DCHECK(!CurrentThread() || !thread);
TlsSetValue(key_, thread);
}
#endif
#if defined(WEBRTC_POSIX)
Thread* ThreadManager::CurrentThread() {
return static_cast<Thread*>(pthread_getspecific(key_));
}
void ThreadManager::SetCurrentThread(Thread* thread) {
#if RTC_DLOG_IS_ON
if (CurrentThread() && thread) {
RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?";
}
#endif // RTC_DLOG_IS_ON
pthread_setspecific(key_, thread);
}
#endif
如前文所述,不论是何种平台,在ThreadManager的构造之初就为Thread指针分配了线程局部存储的槽位key_,通过不同平台的get,set方法就可以将当前线程所关联的Thread对象指针从该槽位取出或设置进去。但是,有这么几个点需要注意:
- Thread是用户层线程的表征,可以通过其来访问,操作该线程在内核中的数据结构。但用户层和内核层的线程表征,二者并非是共存关系。以主线程来说,进程一运行起来其线程内核结构就存在,但是用户层主线程的表征Thread对象是不存在的,因此,在程序入口main()函数开头调用ThreadManager::CurrentThread()方法,得到的必然是空指针。如果想要将主线程纳入管理,必然要先创建一个Thread对象,然后调用ThreadManager::SetCurrentThread(Thread* thread)设置到当前线程的线程局部存储的槽位中。正如example目录下的peerconnection_client示例工程那样做的,其中Win32Thread就是Thread类的子类。
- 对于非主线程,如何纳入管理?由前文所说,主线程外,WebRTC的其他线程以Thread.Start()来启动,新的线程中会执行Thread.PreRun()方法。该方法中就调用了ThreadManager::SetCurrentThread(Thread* thread)方法将新的线程纳入ThreadManager的管理,在线程结束后,调用ThreadManager::SetCurrentThread(nullptr)解除管理。
包装当前线程为Thread对象,当前线程去包装
Thread* ThreadManager::WrapCurrentThread() {
Thread* result = CurrentThread();
if (nullptr == result) {
result = new Thread(SocketServer::CreateDefault());
result->WrapCurrentWithThreadManager(this, true);
}
return result;
}
如果已经有Thread对象与当前线程关联,那么直接返回该对象。否则构造一个新的Thread对象,并通过该对象的WrapCurrentWithThreadManager()方法将新建的Thread对象纳入ThreadManager的管理之中:
bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
bool need_synchronize_access) {
RTC_DCHECK(!IsRunning());
#if defined(WEBRTC_WIN)
if (need_synchronize_access) {
// We explicitly ask for no rights other than synchronization.
// This gives us the best chance of succeeding.
thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
if (!thread_) {
RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
return false;
}
thread_id_ = GetCurrentThreadId();
}
#elif defined(WEBRTC_POSIX)
thread_ = pthread_self();
#endif
owned_ = false;
thread_manager->SetCurrentThread(this);
return true;
}
在Windows系统与类Unix系统下的差别一点在于Thread.thread_的赋值方式。Windows系统上,使用OpenThread()来打开当前已存在的线程,获取其句柄,此处注明只获取该线程的同步操作权限,也即在该线程进行Wait等操作,这样能提高该方法的成功率;而类Unix系统上,使用pthread库的pthread_self()方法来获取当前线程pthread_t对象。另外将Thread.owned_标志位置位false,表示该线程对象是通过wrap而来,而非调用Thread.Start的标准方式而来。最后使用 ThreadManager. SetCurrentThread方法将新创建的Thread对象纳入管理。
void ThreadManager::UnwrapCurrentThread() {
Thread* t = CurrentThread();
if (t && !(t->IsOwned())) {
t->UnwrapCurrent();
delete t;
}
}
对于线程的unwrap操作,会根据该线程是不是wrap而来,即owned_是否为false,来决定是否进行正真的unwrap操作。如果是的话,就调用Thread.UnwrapCurrent方法进行实际操作,并最后删除Thread对象。
void Thread::UnwrapCurrent() {
// Clears the platform-specific thread-specific storage.
ThreadManager::Instance()->SetCurrentThread(nullptr);
#if defined(WEBRTC_WIN)
if (thread_ != nullptr) {
if (!CloseHandle(thread_)) {
RTC_LOG_GLE(LS_ERROR)
<< "When unwrapping thread, failed to close handle.";
}
thread_ = nullptr;
thread_id_ = 0;
}
#elif defined(WEBRTC_POSIX)
thread_ = 0;
#endif
}
unwrap操作首先需要将当前线程对象所占的槽位置空,即调用ThreadManager::Instance()->SetCurrentThread(nullptr); 来完成。其次是销毁线程的句柄,在Windows下需要先调用CloseHandle(thread_)销毁句柄,然后句柄置空,类Unix系统下直接将成员thread_置空即可。
判断当前线程是否为主线程
bool ThreadManager::IsMainThread() {
return IsThreadRefEqual(CurrentThreadRef(), main_thread_ref_);
}
bool IsThreadRefEqual(const PlatformThreadRef& a, const PlatformThreadRef& b) {
#if defined(WEBRTC_WIN) || defined(WEBRTC_FUCHSIA)
return a == b;
#elif defined(WEBRTC_POSIX)
return pthread_equal(a, b);
#endif
}
没有太多要说明的,注意类Unix系统下使用pthread库中的pthread_equal()方法进行判断。
总结
- WebRTC中ThreadManager类扮演者牧羊者的角色,通过线程局部存储(TLS,Thread Local Storage)的技术提供了对Thread管理,而这种管理其实就是为每个线程包装一个与其相关联的rtc::Thread类对象,并将该对象的地址存储在线程本身的局部存储的某个槽中。
- 不同平台的TLS实现API是不一样的,在Windows上通过Windows API TlsAlloc()来分配槽位(即key值),对应于类Unix系统使用pthread库中的pthread_key_create()来分配;为获取或者设置槽位中的值(即Thread对象地址),在Windows上使用TlsGetValue()和TlsSetValue(),对应于类Unix系统使用pthread_getspecific()与pthread_setspecific()
- ThreadManager类对象是个单例,但是一个非线程安全的实现,如何保证ThreadManager对象的在WebRTC的多线程环境下安全的初始化?
以上就是本文所阐述的基本内容。