这篇文章来自 @我就叫Sunny怎么了 推荐的Mike Ash的博客,主要是讲如何自己动手实现dispatch queue的基本功能。翻译过程中个别地方稍作调整。初次翻译,欢迎纠正!
原文地址:[https://www.mikeash.com/pyblog/friday-qa-2015-09-04-lets-build-dispatch_queue.html]
代码地址:[https://github.com/mikeash/MADispatchQueue]
GCD是Apple近年来开发的很棒的API之一,在"Lets Bulid"系列最新的一期中,我将对dispatch_queue的最基础的特性做重新实现,这个主题由Rob Rix推荐。
概览
一个dispatch queue是一个由全局线程池支持的工作队列。典型的,工作提交到一个队列在后台线程异步执行。所有线程共享一个单一的后台线程池,这能使系统更高效。
这是我要重现的这个API的本质。我为了简单会忽略GCD提供的高级的特性。比如,完成在全局池中增加以及减少线程数量的大量工作,以及系统对CPU的利用。如果你有一堆任务占用了CPU并且你提交了其他任务,GCD会避免为他创建其他的工作线程,因为此时CPU使用率已经达到100%,其他的线程工作会变得低效。我将跳过这点,对线程的数量使用硬编码。我也会略过其他的特性,像定位队列和封闭并发队列。
我们的目标是关注dispatch queues的本质:串行和并发,他们能同步或者异步的派发任务,并且由一个共享的全局线程池支持。
接口:
GCD是一个C语言的API。虽然GCD对象已经在最近的OS版本中转换为OC的对象,但是API维持纯粹的C(附加苹果block的扩展)。这是一个很棒的低层API,GCD提供了非常清晰的接口,但是为了完成我的目标,我宁愿用OC来重现。
OC类名为MADispatchQueue
他只有4个调用方法:
一个获取共享全局队列的方法。GCD有多个不同优先级的全局队列,但是我们为了简单只有一个。
一个初始化方法,为了能创建并发或者串行的队列。
一个异步的dispatch调用
一个同步的dispatch调用
方法的声明:
@interface MADispatchQueue : NSObject
+ (MADispatchQueue *)globalQueue;
- (id)initSerial: (BOOL)serial;
- (void)dispatchAsync: (dispatch_block_t)block;
- (void)dispatchSync: (dispatch_block_t)block;
@end
之后去完成他们所描述的要做的事情。
线程池接口
线程池有一个简单的接口支持队列。他将会做一些实际运行中的,已提交的任务的繁重工作。队列能够可靠的在一个正确的时机提交他们队列中的任务。线程池有一个单一的任务:提交一些工作来运行。因此接口只有一个方法:
@interface MAThreadPool : NSObject
- (void)addBlock: (dispatch_block_t)block;
@end
由于这是核心,所以让我们先实现他。
线程池的实现
先来看实例变量。线程池是能够被多线程访问的,包括内部和外部,并且需要线程安全。GCD脱离他自己的方法尽可能的使用快速原子化的操作,在我的重建中我坚持使用老式的锁。我需要这个锁能够等待以及发送信号,不只是实施互斥,所以我使用了NSCondition而不是一个普通的NSLock。如果你对他不熟悉,可以理解为:NSCondition基本上就是一个锁和一个单一条件变量的封装。
NSCondition *_lock;
为了知道何时自旋向上的新增工作线程,我需要知道线程池里有多少线程,有多少实际在工作的,以及线程的最大数量:
NSUInteger _threadCount;
NSUInteger _activeThreadCount;
NSUInteger _threadCountLimit;
最终,有一串block去执行。使用NSMutableArray,通过在末尾添加新的block以及从开头移除来模拟一个队列。
NSMutableArray *_blocks;
初始化工作很简单。初始化锁,初始化block数组,使用一个任意数量来设置线程的数量限制,这里用128:
- (id)init {
if((self = [super init])) {
_lock = [[NSCondition alloc] init];
_blocks = [[NSMutableArray alloc] init];
_threadCountLimit = 128;
}
return self;
}
工作的线程在一个简单的无限循环中运行,直到blocks数组为空,状态置为等待。一旦有可获取的block,这个block会从数组中出列并执行。当我们这样做的时候,将增加活动的线程数量,那么在结束时需要减少数量。
- (void)workerThreadLoop: (id)ignore {
第一件事是获取锁,记住这必须在循环开始之前。至于理由,在循环的结束时候你将会明白这点。
[_lock lock];
无限循环:
while(1){
如果队列为空,那么锁是等待状态:
while([_blocks count] == 0) {
[_lock wait];
}
记住这个需要通过循环完成,而不是一个if语句。理由可参考:[https://en.wikipedia.org/wiki/Spurious_wakeup]
简单来说,wait这个状态即便没有signaled也有可能return,所以为了修正这种行为,当wait return时,需要重新检验条件。
一旦block可以获得,让丫的出列:
dispatch_block_t block = [_blocks firstObject];
[_blocks removeObjectAtIndex: 0];
通过增加活动线程数量来表明该线程正在活动:
_activeThreadCount++;
现在是时候执行block了,但是我们必须先释放锁,否则我们做不到并发,同时我们将会有各种各样好玩的死锁:
[_lock unlock];
锁安全的放手后,执行block:
block();
block结束后,是时候减少活动线程的数量了。这必须结合锁来完成,以避免资源竞争,循环的最后:
[_lock lock];
_activeThreadCount--;
}
}
现在你能看到为什么在循环的最上层要获取锁。循环中做的最后一件事是减少活动线程的数量,这需要保持锁的状态。在循环的顶层第一件事是检查block的队列。通过在循环外执行第一个锁,后续重复的事情的所有操作能够使用一个单一的锁来操作,二不是锁,解锁,再锁…
addBlock方法:
- (void)addBlock: (dispatch_block_t)block {
这里的每件事情都需要结合锁的获取来完成
[_lock lock];
第一个任务是添加一个新的block到block队列:
[_blocks addObject: block];
如果有一个闲置的线程准备取走这个block,那接下来没什么可做的了。如果没有足够的闲置线程来执行未完成的block,而且工作线程的数量还没有到上限,那么是时候创建一个新的线程了:
NSUInteger idleThreads = _threadCount - _activeThreadCount;
if([_blocks count] > idleThreads && _threadCount < _threadCountLimit) {
[NSThread detachNewThreadSelector: @selector(workerThreadLoop:)
toTarget: self
withObject: nil];
_threadCount++;
}
现在一个工作线程启动的所有准备工作已经完成。 假设他们都是沉睡状态,唤醒一个
[_lock signal];
然后释放锁就完成了
[_lock unlock];
}
这为我们提供了一个线程池,来产出预先设定数量的工作线程,用于为进入的block服务。现在为这个队列做基础的实现。
队列实现
像线程池一样,队列将使用锁来保护他的内容。和线程池不一样的地方是,他不需要做任何等待或者发信号的动作,只是基本的互斥,所以我们使用普通的NSLock:
NSLock *_lock;
像线程池一样,他维护一个挂起的block的队列,使用NSMutableArray:
NSMutableArray *_pendingBlocks;
队列需要知道这是串行的还是并发的:
BOOL _serial;
当这个值为真,它还需要跟踪是否有一个block在线程池中运行:
BOOL _serialRunning;
并发队列无论是否有任务在运行都表现的一样,所以不跟踪这些。
全局队列作为一个全局变量来存储,底层共享的线程池也是。他们都在+initialize方法中创建:
static MADispatchQueue *gGlobalQueue;
static MAThreadPool *gThreadPool;
+ (void)initialize {
if(self == [MADispatchQueue class]) {
gGlobalQueue = [[MADispatchQueue alloc] initSerial: NO];
gThreadPool = [[MAThreadPool alloc] init];
}
}
获取全局队列方法只是返回这个变量,因为initialize方法中确保已经创建了他:
+ (MADispatchQueue *)globalQueue {
return gGlobalQueue;
}
初始化队列由分配锁,挂起block队列以及设置_serial变量这些工作组成:
- (id)initSerial: (BOOL)serial {
if ((self = [super init])) {
_lock = [[NSLock alloc] init];
_pendingBlocks = [[NSMutableArray alloc] init];
_serial = serial;
}
return self;
}
在我们接触剩余的公开API之前,有一个底层的方法需要创建,这个方法将在线程池派发一个单一的block,然后调用他自己来运行另一个block:
- (void)dispatchOneBlock {
这个方法的目的是在线程池运行东西,所以他在这里派发:
[gThreadPool addBlock: ^{
然后他抓住了队列里的第一个block。自然的,这必须结合锁来完成,以避免灾难事故:
[_lock lock];
dispatch_block_t block = [_pendingBlocks firstObject];
[_pendingBlocks removeObjectAtIndex: 0];
[_lock unlock];
随着获得block以及释放锁,block能够安全得在后台线程执行
block();
如果队列是并发的,那么这就是所有要做的。如果这是串行的,还需要:
if(_serial) {
在一个串行队列,将建立额外的block,但是不能在block完成之前唤起。当一个block完成, dispatchOneBlock会查看队列中是否有其他挂起的block,如果有,他会调用自己去派发下一个block,如果没有,他会将队列的运行状态设回NO:
[_lock lock];
if([_pendingBlocks count] > 0) {
[self dispatchOneBlock];
} else {
_serialRunning = NO;
}
[_lock unlock];
}
}];
}
用这个方法来实现dispatchAsync是相当简单的。添加block到挂起的block的队列,设置状态并且视情况唤起dispatchOneBlock:
- (void)dispatchAsync: (dispatch_block_t)block {
[_lock lock];
[_pendingBlocks addObject: block];
如果一个串行队列是闲置状态,那么设置为运行状态并调用dispatchOneBlock来执行要做的事:
if(_serial && !_serialRunning) {
_serialRunning = YES;
[self dispatchOneBlock];
如果队列是并发的,那么无条件的调用dispatchOneBlock。这能确保新的block能够尽可能快的执行,尽管另一个block已经在运行中,因为在并发的情况下允许多个blocks执行:
} else if (!_serial) {
[self dispatchOneBlock];
}
如果一个串行已经运行,那没什么更多要做的了。dispatchOneBlock会执行完所有添加到队列的block。现在释放锁:
[_lock unlock];
}
在dispatchSync方面,GCD当停止队列里其他block时,在调用的线程上直接运行block(如果这是串行)。我们不想尝试做到这么智能。取而代之的,我们只是包装一下dispatchAsync:,使他能够等待完成执行。
他使用一个局部NSCondition变量,附加一个done的BOOL变量来表明什么时候block已经完成:
- (void)dispatchSync: (dispatch_block_t)block {
NSCondition *condition = [[NSCondition alloc] init];
__block BOOL done = NO;
然后他异步的派发block。这里调用的是传入的block,然后设置状态为完成并且让条件锁发送信号:
[self dispatchAsync: ^{
block();
[condition lock];
done = YES;
[condition signal];
[condition unlock];
}];
回到原来正在调用的线程,我们要做的是等待状态被设为done,然后返回。
[condition lock];
while (!done) {
[condition wait];
}
[condition unlock];
}
到此,block的执行已经完成了,这也是MADispatchQueue的API最后一点要做的了。
结论
一个全局的线程池能够通过一组工作的block和一些比较智能的线程来实现。使用一个共享的全局线程池,能够创建一个提供串行/并发和同步/异步派发的基础派发队列的API。本次重建缺少了许多GCD很棒的特性,并且非常低效。不管怎样这让我们很好的了解了内部工作原理。