目录
1.RACScheduler是如何封装GCD的
2.RACSequence的一些子类
3.RACScheduler是如何“取消”并发任务的
4.RACScheduler是如何和RAC其他组件进行完美整合的
RACSequence的一些子类
RACSequence总共有以下5个子类。
1. RACTestScheduler
这个类主要是一个测试类,主要用在单元测试中,它是用来验证异步调用没有花费大量的时间等待。RACTestScheduler也可以用在多线程当中,一次只能在排队的方法队列中选择一个方法执行。
@interface RACTestSchedulerAction : NSObject
@property (nonatomic, copy, readonly) NSDate *date;
@property (nonatomic, copy, readonly) void (^block)(void);
@property (nonatomic, strong, readonly) RACDisposable *disposable;
- (id)initWithDate:(NSDate *)date block:(void (^)(void))block;
@end
在单元测试中,ReactiveCocoa为了方便比较每个方法的调用,新建了一个RACTestSchedulerAction对象,用来更加方便的比较和描述测试的全过程。RACTestSchedulerAction的定义如上。现在再来解释一下参数。
date是一个时间,时间主要是用来比较和决定下一次该轮到哪个闭包要开始执行了。void (^block)(void)闭包是RACScheduler中的一个任务。disposable是控制一个action是否可以执行的。一旦disposed了,那么这个action就不会被执行。initWithDate: block: 方法是初始化一个新的action。在单元测试过程中,需要调用step方法来进行查看每次调用闭包的情况。
(void)step {
[self step:1];
}
- (void)stepAll {
[self step:NSUIntegerMax];
}
step和stepAll方法都是调用step:方法。step只是执行一次RACScheduler中的任务,stepAll是执行所有的RACScheduler中的任务。既然都是调用step:,那接下来分析一下step:的具体实现。
- (void)step:(NSUInteger)ticks {
@synchronized (self) {
for (NSUInteger i = 0; i < ticks; i++) {
const void *actionPtr = NULL;
if (!CFBinaryHeapGetMinimumIfPresent(self.scheduledActions, &actionPtr)) break;
RACTestSchedulerAction *action = (__bridge id)actionPtr;
CFBinaryHeapRemoveMinimumValue(self.scheduledActions);
if (action.disposable.disposed) continue;
RACScheduler *previousScheduler = RACScheduler.currentScheduler;
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
action.block();
if (previousScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
} else {
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
}
}
step:的实现主要就是一个for循环。循环的次数就是入参ticks决定的。首先const void *actionPtr是一个指向函数的指针。在上述实现中有一个很重要的函数——CFBinaryHeapGetMinimumIfPresent。该函数的原型如下:
Boolean CFBinaryHeapGetMinimumIfPresent(CFBinaryHeapRef heap, const void **value)
这个函数的主要作用的是在二分堆heap中查找一个最小值。
static CFComparisonResult RACCompareScheduledActions(const void *ptr1, const void *ptr2, void *info) {
RACTestSchedulerAction *action1 = (__bridge id)ptr1;
RACTestSchedulerAction *action2 = (__bridge id)ptr2;
return CFDateCompare((__bridge CFDateRef)action1.date, (__bridge CFDateRef)action2.date, NULL);
}
比较规则如上,就是比较两者的date的值。从二分堆中找出这样一个最小值,对应的就是scheduler中的任务。如果最小值有几个相等最小值,就随机返回一个最小值。返回的函数放在actionPtr中。整个函数的返回值是一个BOOL值,如果二分堆不为空,能找到最小值就返回YES,如果二分堆为空,就找不到最小值了,就返回NO。
stepAll方法里面传入了NSUIntegerMax,这个for循环也不会死循环,因为到堆中所有的任务都执行完成之后,CFBinaryHeapGetMinimumIfPresent返回NO,就会执行break,跳出循环。
这里会把currentScheduler保存到线程字典里面。接着会执行action.block,执行任务。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != nil);
@synchronized (self) {
NSDate *uniqueDate = [NSDate dateWithTimeIntervalSinceReferenceDate:self.numberOfDirectlyScheduledBlocks];
self.numberOfDirectlyScheduledBlocks++;
RACTestSchedulerAction *action = [[RACTestSchedulerAction alloc] initWithDate:uniqueDate block:block];
CFBinaryHeapAddValue(self.scheduledActions, (__bridge void *)action);
return action.disposable;
}
}
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != nil);
@synchronized (self) {
RACTestSchedulerAction *action = [[RACTestSchedulerAction alloc] initWithDate:date block:block];
CFBinaryHeapAddValue(self.scheduledActions, (__bridge void *)action);
return action.disposable;
}
}
schedule:方法里面会累加numberOfDirectlyScheduledBlocks
值,这个值也会初始化成时间,以便比较各个方法该调度的时间。numberOfDirectlyScheduledBlocks
最终会代表总共有多少个block任务产生了。然后用CFBinaryHeapAddValue加入到堆中。after:schedule:
就是直接新建RACTestSchedulerAction对象,然后再用CFBinaryHeapAddValue把block闭包加入到堆中。
after: repeatingEvery: withLeeway: schedule:
同样也是新建RACTestSchedulerAction对象,然后再用CFBinaryHeapAddValue把block闭包加入到堆中。
2. RACSubscriptionScheduler
RACSubscriptionScheduler是RACScheduler最后一个单例。RACScheduler中唯一的三个单例现在就齐全了:RACImmediateScheduler,RACTargetQueueScheduler ,RACSubscriptionScheduler。
+ (instancetype)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});
return subscriptionScheduler;
}
RACSubscriptionScheduler 的名字是@”com.ReactiveCocoa.RACScheduler.subscriptionScheduler”
- (id)init {
self = [super initWithName:@"com.ReactiveCocoa.RACScheduler.subscriptionScheduler"];
if (self == nil) return nil;
_backgroundScheduler = [RACScheduler scheduler];
return self;
}
RACSubscriptionScheduler初始化的时候会新建一个Global Dispatch Queue。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
如果RACScheduler.currentScheduler为nil就用backgroundScheduler去调用block闭包,否则就执行block闭包。
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date schedule:block];
}
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
RACScheduler *scheduler = RACScheduler.currentScheduler ?: self.backgroundScheduler;
return [scheduler after:date repeatingEvery:interval withLeeway:leeway schedule:block];
}
两个after方法都有取出RACScheduler.currentScheduler,如果为空就用self.backgroundScheduler去调用各自的after的方法。
RACSubscriptionScheduler中的backgroundScheduler的意义就在此,当RACScheduler.currentScheduler不存在的时候就会替换成self.backgroundScheduler。
- RACImmediateScheduler
这个子类在分析immediateScheduler方法的时候,详细分析过了,这里不再赘述。
4. RACQueueScheduler
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
schedule:会调用performAsCurrentScheduler:方法。
- (void)performAsCurrentScheduler:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACScheduler *previousScheduler = RACScheduler.currentScheduler;
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
@autoreleasepool {
block();
}
if (previousScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
} else {
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
performAsCurrentScheduler:方法会先在调用block( )之前,把当前的scheduler存入线程字典中。
试想,如果现在在一个Concurrent Dispatch Queue中,在执行block( )之前需要先切换线程,切换到当前scheduler中。当执行完block闭包之后,previousScheduler如果不为nil,那么就还原现场,线程字典里面再存回原来的scheduler,反之previousScheduler为nil,那么就移除掉线程字典里面的key。
这里需要值得注意的是:
scheduler本质其实是一个quene,并不是一个线程。它只能保证里面的线程都是串行执行的,但是它不能保证每个任务都在同一个线程里面执行。
如上面这段performAsCurrentScheduler:的实现所表现的那样。所以
在scheduler使用Core Data很容易崩溃,很可能跑到子线程上面去了。一旦写数据的时候到了子线程上,很容易就Crash了。一定要记得回到main queue上。
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
在after中调用dispatch_after方法,经过date时间之后再调用performAsCurrentScheduler:。
wallTimeWithDate:
的实现如下:
+ (dispatch_time_t)wallTimeWithDate:(NSDate *)date {
NSCParameterAssert(date != nil);
double seconds = 0;
double frac = modf(date.timeIntervalSince1970, &seconds);
struct timespec walltime = {
.tv_sec = (time_t)fmin(fmax(seconds, LONG_MIN), LONG_MAX),
.tv_nsec = (long)fmin(fmax(frac * NSEC_PER_SEC, LONG_MIN), LONG_MAX)
};
return dispatch_walltime(&walltime, 0);
}
dispatch_walltime
函数是由POSIX中使用的struct timespec类型的时间得到dispatch_time_t类型的值。dispatch_time函数通常用于计算相对时间,而dispatch_walltime函数用于计算绝对时间。
这段代码其实很简单,就是把date的时间转换成一个dispatch_time_t类型的。由NSDate类对象获取能传递给dispatch_after函数的dispatch_time_t类型的值。
- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(interval > 0.0 && interval = 0.0 && leeway
after: repeatingEvery: withLeeway: schedule:方法里面的实现就是用GCD在self.queue上创建了一个Timer,时间间隔是interval,修正时间是leeway。
leeway这个参数是为dispatch source指定一个期望的定时器事件精度,让系统能够灵活地管理并唤醒内核。例如系统可以使用leeway值来提前或延迟触发定时器,使其更好地与其它系统事件结合。创建自己的定时器时,应该尽量指定一个leeway值。不过就算指定leeway值为0,也不能完完全全期望定时器能够按照精确的纳秒来触发事件。
这个定时器在interval执行入参闭包。在取消任务的时候调用dispatch_source_cancel取消定时器timer。
- RACTargetQueueScheduler
这个子类在分析mainThreadScheduler方法的时候,详细分析过了,这里不再赘述。
RACScheduler是如何“取消”并发任务的
既然RACScheduler是对GCD的封装,那么在GCD的上层可以实现一些GCD所无法完成的“特性”。这里的“特性”是打引号的,因为底层是GCD,上层的特性只能通过一些特殊手段来实现看似是新的特性。在这一点上,RACScheduler就实现了GCD没有的特性——“取消”任务。
既然GCD不方便取消一个任务,那么RACScheduler是怎么做到的呢?
这就体现在RACQueueScheduler上。回头看看RACQueueScheduler的schedule:实现 和 after: schedule:实现。
最核心的代码:
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
在调用performAsCurrentScheduler:之前,加了一个判断,判断当前是否取消了任务,如果取消了任务,就return,不会调用block闭包。这样就实现了取消任务的“假象”。