-
RACScheduler
:信号调度器,是一个线性执行队列,rac中的信号可以在RACScheduler上执行任务、发送结果,底层用GCD封装的 -
RACObserve
:是一个宏定义,是使用了原生的KVO
与响应式编程
相结合的产物;不是所有的property
都可以被RACObserve
,该property
必须支持KVO
,比如NSURLCache
的currentDiskUsage
就不能被RACObserve
-
RACSubscriber
:是一个协议类,订阅者,发送信号 -
RACDisposable
:它可以帮助我们取消订阅,在信号发送完毕,失败都可以,就类似通知结束后销毁通知一样效果 -
RACSubject
:信号提供者,可以自己充当信号,自己可以发送信号,拥有RACSignal和RACSubscriber两者功能。
RACPassthroughSubscriber核心订阅者
继续上节课我们使用信号的案例
- (void)racBase{
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
// block调用时刻:每当有订阅者订阅信号,就会调用block。
[subscriber sendNext:@"Hello World"];
RACDisposable *disposable = [RACDisposable disposableWithBlock:^{
// 销毁信号
NSLog(@"开始销毁");
}];
return disposable;
}];
// 订阅信号,才会激活信号.
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"x == %@",x);
}];
}
// 打印日志
2022-12-07 22:58:46.445311+0800 001---RAC底层分析[58542:4644847] x == Hello World
2022-12-07 22:58:46.445532+0800 001---RAC底层分析[58542:4644847] 开始销毁
- 查看订阅信号
subscribeNext
源码
<!-- RAC源码 -->
// 查看订阅信号subscribeNext源码
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
#pragma mark Managing Subscribers
// 查看subscribe源码
// 重点: RAC 精髓
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// 多态 --- 厉害之处:RAC流程的三个主要元素这里全部具备,并且都进行了保存,就可以随时随地复用
/** RACPassthroughSubscriber
* subscriber 订阅者 --- RACSubscriber传进来
* signal 信号
* disposable 销毁者
*/
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
// 多线程--cpu--自己跑了
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
通过- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
源码,我们发现RAC
的精髓是保存了RAC
流程中的三个核心元素
,于是就可以随时随地复用。
案例一:使用subscriber
发送错误信号
<!-- 生命全局调度者 -->
@interface ViewController ()
@property (nonatomic, strong) id<RACSubscriber> kcSubscriber;
@end
<!-- 全局调度者接收 -->
- (void)viewDidLoad {
[super viewDidLoad];
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
self.kcSubscriber = subscriber;
// block调用时刻:每当有订阅者订阅信号,就会调用block。
RACDisposable *disposable = [RACDisposable disposableWithBlock:^{
// 销毁信号
NSLog(@"开始销毁");
}];
return disposable;
}];
// 订阅错误信号
[signal subscribeError:^(NSError * _Nullable error) {
NSLog(@"%@", error);
}];
}
<!-- 页面创建的按钮点击事件 -->
- (IBAction)didClickBtnClickTwo:(id)sender {
// 发送错误信号
NSError *error = [NSError errorWithDomain:NSURLErrorDomain code:10088 userInfo:@{@"LGError":@"hhaha "}];
[self.kcSubscriber sendError:error];
}
// 点击按钮发送信号,查看打印日志
2022-12-07 23:08:08.072170+0800 001---RAC底层分析[58665:4653906] 开始销毁
2022-12-07 23:08:08.072417+0800 001---RAC底层分析[58665:4653906] Error Domain=NSURLErrorDomain Code=10088 "(null)" UserInfo={LGError=hhaha }
// 也可以使用kcSubscriber发送完成信号 以及 普通信号
- (IBAction)didClickBtnClickOne:(id)sender {
[self.kcSubscriber sendNext:@"RAC"];
}
- (IBAction)didClickBtnClickThree:(id)sender {
// 发送完成信号
[self.kcSubscriber sendCompleted];
}
通过该案例我们能够得出- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
源码对subscriber
的封装,以便二次利用,不用重复代码。
- 继续查看源码
[[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
NSCParameterAssert(subscriber != nil);
self = [super init];
// 保存了RAC核心三要素
_innerSubscriber = subscriber;
_signal = signal;
_disposable = disposable;
[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
return self;
}
疑问?当订阅者发送了错误信号
或者完成信号
,再次点击按钮就不会发送了?
原因是当订阅者发送了错误信号
或者完成信号
,会被自动销毁。
- 查看
sendError:
和sendCompleted
源码
- (void)sendError:(NSError *)error {
// 如果被销毁,直接return;下面就不会再发送错误信号
if (self.disposable.disposed) return;
if (RACSIGNAL_ERROR_ENABLED()) {
RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
}
[self.innerSubscriber sendError:error];
}
- (void)sendCompleted {
if (self.disposable.disposed) return;
if (RACSIGNAL_COMPLETED_ENABLED()) {
RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
}
[self.innerSubscriber sendCompleted];
}
- 查看
sendNext
源码,我们发现也会判断是否销毁
- (void)sendNext:(id)value {
if (self.disposable.disposed) return;
if (RACSIGNAL_NEXT_ENABLED()) {
RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
}
// 真正的订阅 RACSubscriber
[self.innerSubscriber sendNext:value];
}
- 查看
sendNext
、sendError
、sendCompleted
源码
#pragma mark RACSubscriber
- (void)sendNext:(id)value {
// 同步锁
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
// 深浅拷贝
// SDK -- 不能侵入外部参数值,所以这里进行深copy
void (^errorBlock)(NSError *) = [self.error copy];
// 上一行虽然拿到了错误block,但是这里会直接销毁
[self.disposable dispose];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];
if (completedBlock == nil) return;
completedBlock();
}
}
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)otherDisposable {
if (otherDisposable.disposed) return;
RACCompoundDisposable *selfDisposable = self.disposable;
[selfDisposable addDisposable:otherDisposable]; // 添加对象
@unsafeify(otherDisposable);
// If this subscription terminates, purge its disposable to avoid unbounded
// memory growth.
[otherDisposable addDisposable:[RACDisposable disposableWithBlock:^{
@strongify(otherDisposable);
[selfDisposable removeDisposable:otherDisposable];
}]];
}
通过上面sendError
、sendCompleted
源码,我们发现调用了发送错误或者发送完成信号就会销毁,那么sendNext
什么时候销毁呢?
- RAC中也有一个
dealloc
析构函数,当ViewController
销毁的时候,就会触发RAC
销毁
// 添加@weakify(self),防止RAC循环引用,导致ViewController不能销毁
@weakify(self)
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
@strongify(self)
self.kcSubscriber = subscriber;
// block调用时刻:每当有订阅者订阅信号,就会调用block。
RACDisposable *disposable = [RACDisposable disposableWithBlock:^{
// 销毁信号
NSLog(@"开始销毁");
}];
return disposable;
}];
// RAC销毁
- (void)dealloc {
[self.disposable dispose];
}
小结: 订阅者
保存了所有RAC的元素
- RAC signal subscribe dispose,可以随时订阅信号以及发送信号
- 发送
错误信号
、完成信号
会自动销毁,sendNext
会随着页面生命周期调用dealloc
销毁。
RACScheduler调度者
继续查看RAC
中的源码精髓- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
方法,来了解RACScheduler
调度者的底层原理
#pragma mark Managing Subscribers
// 重点: RAC 精髓
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
// 调度者的核心就是多线程
// 多线程--依赖于cpu--内部自己运行跑了,完全不需要外部介入
// 这里不需要完全依靠cpu,要依赖于自己,更加自由
// schedule 回调就是对当前线程判断,进行调度,看调度是在主线程还是在子线程
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
// block回调回来执行到这里
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
// RACScheduler.subscriptionScheduler 生成一个订阅调度者
+ (RACScheduler *)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
// 生成一个单例对象,RACSubscriptionScheduler 订阅调度者
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});
return subscriptionScheduler;
}
// schedule回调
- (RACDisposable *)schedule:(void (^)(void))block {
// 断言
NSCParameterAssert(block != NULL);
// 当前Scheduler不为空,直接回调block;因为当前一直是在主线程,从没有切换到子线程操作过
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
+ (RACScheduler *)currentScheduler {
// RACSchedulerCurrentSchedulerKey 自定义的键值对
// 当前线程字典
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
// 自定义键值为空,设置成默认主线程
// 如果用户没有操作过线程,RAC就会认为当前就是主线程
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}
上面我们通过使用RAC
学习了调度者在主线程的源码流程
,下面来学习调度者在子线程的源码流程
案例二:通过RACScheduler
调度者在子线程的使用,我们来分析在子线程的流程
- (void)viewDidLoad {
[super viewDidLoad];
// 创建全局并发队列
dispatch_async(dispatch_get_global_queue(0, 0), ^{
@weakify(self)
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
@strongify(self)
NSLog(@"来了,请求网络");
// 3:发送信号
[subscriber sendNext:@"Hello World"];
// 4:销毁
// return self.dis; 销毁回调
return [RACDisposable disposableWithBlock:^{
NSLog(@"我们销毁了");
}];
}];
// 2: 订阅信号
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"0:订阅到了:%@",x);
}];
});
}
通过断点调试,我们发现RACScheduler.currentScheduler
即没有自定义调度者,也不是主线程调度者,返回为nil
;于是拿起后台备用调度者
来使用。
@interface RACSubscriptionScheduler ()
// A private background scheduler on which to subscribe if the +currentScheduler
// is unknown.
@property (nonatomic, strong, readonly) RACScheduler *backgroundScheduler;
@end
@implementation RACSubscriptionScheduler
#pragma mark Lifecycle
- (instancetype)init {
self = [super initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.subscriptionScheduler"];
// RACSubscriptionScheduler 订阅调度者在初始化的时候就会创建后台调度者
_backgroundScheduler = [RACScheduler scheduler];
return self;
}
<!-- 查看[RACScheduler scheduler];类方法 -->
+ (RACScheduler *)scheduler {
// 优先级越高,被GCD调度的机会就越大,越优先执行
return [self schedulerWithPriority:RACSchedulerPriorityDefault];
}
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority {
return [self schedulerWithPriority:priority name:@"org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler"];
}
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
// 全局并发队列
// 如果在子线程操作,并且没有指定子线程,就会开启一条全局子线程,其实就是对GCD的底层封装
return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority, 0)];
}
- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
NSCParameterAssert(targetQueue != NULL);
if (name == nil) {
name = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
}
// 指定队列去执行
dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
if (queue == NULL) return nil;
dispatch_set_target_queue(queue, targetQueue);
return [super initWithName:name queue:queue];
}
疑问?initWithName
方法中指定队列,为什么是DISPATCH_QUEUE_SERIAL
?
因为当前信号是一个个来的,要保证调度是异步的同时,还要保证调度是顺序执行的,使用DISPATCH_QUEUE_SERIAL
是为了保证顺序执行。即牺牲了一定的并发性,来保证执行顺序
。
查看[self.backgroundScheduler schedule:block];
方法源码
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
- (void)performAsCurrentScheduler:(void (^)(void))block {
NSCParameterAssert(block != NULL);
// If we're using a concurrent queue, we could end up in here concurrently,
// in which case we *don't* want to clear the current scheduler immediately
// after our block is done executing, but only *after* all our concurrent
// invocations are done.
RACScheduler *previousScheduler = RACScheduler.currentScheduler;
// KVC -- backGround
// RACSchedulerCurrentSchedulerKey对应的值就是 创建的后台调度者
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
// autoreleasepool的作用:使用的地方 1. 大量临时变量 2. 自定义线程 3. 执行非UI操作
// 当前是调度一些线程中的东西,并且也不是在进行UI操作,所以这里使用autoreleasepool
// 使用的第二个作用,延迟其生命周期,防止析构释放
@autoreleasepool {
block();
}
if (previousScheduler != nil) {
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
} else {
// 移除防止KVC找不到
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
在子线程调度一次之后,下次就会直接从NSThread.currentThread.threadDictionary
中取出previousScheduler
,会优化掉一些性能问题;如果是在主线程的话直接拿出RACScheduler.mainThreadScheduler
。
RACDisposable销毁
RACDisposable
并不是真正的销毁,只是类似于信号响应的销毁,下面分析原因:
- 前面我们学习了
sendNext
会随着页面生命周期销毁,我们接着这一块继续看源码
- (instancetype)init {
self = [super init];
@unsafeify(self);
RACDisposable *selfDisposable = [RACDisposable disposableWithBlock:^{
@strongify(self);
@synchronized (self) {
self.next = nil;
self.error = nil;
self.completed = nil;
}
}];
// RACCompoundDisposable 复合销毁者,里面有很多个 单个销毁者
// 原因是 随着这一次的销毁,里面需要销毁的东西有很多,就构成了 复合销毁者
_disposable = [RACCompoundDisposable compoundDisposable];
[_disposable addDisposable:selfDisposable];
return self;
}
- (void)dealloc {
// 单个销毁者
[self.disposable dispose];
}
- 继续跟进源码,分析
addDisposable
方法
// 所有销毁者 +
// 该方法主要作用:就是往数组中添加单个销毁者
- (void)addDisposable:(RACDisposable *)disposable {
NSCParameterAssert(disposable != self);
if (disposable == nil || disposable.disposed) return;
BOOL shouldDispose = NO;
// 加锁的原因:防止线程不安全 。读写操作必须添加一个锁保证线程安全
pthread_mutex_lock(&_mutex);
{
// 往数组添加值
if (_disposed) {
shouldDispose = YES;
} else {
// 性能调试 得出 RACCompoundDisposableInlineCount = 2
// RACCompoundDisposableInlineCount 这个值是2
#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
if (_inlineDisposables[i] == nil) {
// 数组中添加单个销毁者
_inlineDisposables[i] = disposable;
goto foundSlot;
}
}
#endif
// 如果 RACCompoundDisposableInlineCount 不是等于2 就会执行下面的代码
if (_disposables == NULL) _disposables = RACCreateDisposablesArray();
CFArrayAppendValue(_disposables, (__bridge void *)disposable);
if (RACCOMPOUNDDISPOSABLE_ADDED_ENABLED()) {
RACCOMPOUNDDISPOSABLE_ADDED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
}
#if RACCompoundDisposableInlineCount
foundSlot:;
#endif
}
}
pthread_mutex_unlock(&_mutex);
// Performed outside of the lock in case the compound disposable is used
// recursively.
// 数组数量超过2就销毁,腾出空间让数组添加
if (shouldDispose) [disposable dispose];
}
-
addDisposable
方法的核心就是[disposable dispose];
,下面继续跟进分析RACCompoundDisposable
的dispose
- (void)dispose {
#if RACCompoundDisposableInlineCount
RACDisposable *inlineCopy[RACCompoundDisposableInlineCount];
#endif
CFArrayRef remainingDisposables = NULL;
// 锁
pthread_mutex_lock(&_mutex);
{
_disposed = YES;
#if RACCompoundDisposableInlineCount
// 把所有的容器置空
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
// 完全拷贝/部分拷贝 的区别,自己下去了解学习
inlineCopy[i] = _inlineDisposables[i];
_inlineDisposables[i] = nil;
}
#endif
remainingDisposables = _disposables;
// 这时整个数组为NULL
_disposables = NULL;
}
pthread_mutex_unlock(&_mutex);
#if RACCompoundDisposableInlineCount
// Dispose outside of the lock in case the compound disposable is used
// recursively.
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
// dispose 瞬间销毁 ---> while 耗时
[inlineCopy[i] dispose];
}
#endif
if (remainingDisposables == NULL) return;
CFIndex count = CFArrayGetCount(remainingDisposables);
// 所有符合式的容器 去调用 disposeEach 这个函数
// disposeEach函数就是 遍历去销毁
CFArrayApplyFunction(remainingDisposables, CFRangeMake(0, count), &disposeEach, NULL);
CFRelease(remainingDisposables);
}
- 上面我们了解了复合式销毁,下面查看
disposeEach
函数,了解单个销毁
// 外面 --> 用compund - 有值 数组
// 数组 --> dispose --- 销毁对象
static void disposeEach(const void *value, void *context) {
RACDisposable *disposable = (__bridge id)value;
[disposable dispose];
}
- (void)dispose {
void (^disposeBlock)(void) = NULL;
while (YES) {
// 临时变量
// _disposeBlock 就是 我们使用信号时的销毁回调
/* [RACDisposable disposableWithBlock:^{
// 销毁信号
NSLog(@"开始销毁");
}]; */
void *blockPtr = _disposeBlock;
// 防止内存偏移 因为多线程操作 很可以出现线程把性能的内存地址比较差 销毁 进行偏移
// A B C 表示OSAtomicCompareAndSwapPtrBarrier函数的三个参数
// A VS C 参数是否相等?(比较的是内存地址)
// 如果相等的话 ---> 就把NULL给到_disposeBlock地址空间进行销毁 --> 整个函数会返回YES
// 没有匹配上,整个函数返回NO,找不到就死循环
// OSAtomicCompareAndSwapPtrBarrier函数的作用就是 必须找到
if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL, &_disposeBlock)) {
// 一直找 ---> disposable --
// 包括自身也进行销毁
if (blockPtr != (__bridge void *)self) {
disposeBlock = CFBridgingRelease(blockPtr);
}
break;
}
}
if (disposeBlock != nil) disposeBlock();
}
// 析构的时候会再次把_disposeBlock地址空间置为NULL,以防止之前没有销毁掉
- (void)dealloc {
if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;
CFRelease(_disposeBlock);
_disposeBlock = NULL;
}
RAC的回调核心里面都有一个disposable
销毁,subscribeNext
信号订阅方法也会返回一个disposable
,可以随时随地进行销毁。
案例三:subscribeNext
方法获取到disposable
进行销毁,不执行订阅回调
- (void)viewDidLoad {
[super viewDidLoad];
// 1:创建信号
dispatch_async(dispatch_get_global_queue(0, 0), ^{
@weakify(self)
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
@strongify(self)
NSLog(@"来了,请求网络");
// 3:发送信号
[subscriber sendNext:@"Hello World"];
// 4:销毁
// return self.dis; 销毁回调
return [RACDisposable disposableWithBlock:^{
NSLog(@"我们销毁了");
}];
}];
// 2: 订阅信号
RACDisposable *disp = [signal subscribeNext:^(id _Nullable x) {
NSLog(@"0:订阅到了:%@",x);
}];
[disp dispose];
});
}
// 打印日志
2022-12-09 22:36:19.796853+0800 001---RAC底层分析[13481:553240] 来了,请求网络
2022-12-09 22:36:19.797013+0800 001---RAC底层分析[13481:553240] 我们销毁了
RACObserve
RAC
中使用KVO
的方式如下,我们跟进一下源码看看其原理是什么?
self.name = @"hello world";
[RACObserve(self, name) subscribeNext:^(id _Nullable x) {
NSLog(@"%@",x);
}];
- 查看
RACObserve
方法,发现是一个宏定义,_RACObserve
是一个内部的宏定义
#define RACObserve(TARGET, KEYPATH) _RACObserve(TARGET, KEYPATH)
// \代表换行符号
#define _RACObserve(TARGET, KEYPATH) \
({ \
__weak id target_ = (TARGET); \
[target_ rac_valuesForKeyPath:@keypath(TARGET, KEYPATH) observer:self]; \
})
// 我们发现 rac_valuesForKeyPath 返回的是RACSignal,有了signal就可以进行订阅信号 [signal subscribeNext:]
#if OS_OBJECT_HAVE_OBJC_SUPPORT
- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(__weak NSObject *)observer;
#else
// Swift builds with OS_OBJECT_HAVE_OBJC_SUPPORT=0 for Playgrounds and LLDB :(
- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(NSObject *)observer;
#endif
- 查看
rac_valuesForKeyPath
方法的实现,其中核心方法是rac_valuesAndChangesForKeyPath
- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(__weak NSObject *)observer {
// rac_valuesAndChangesForKeyPath 值发生改变的观察
return [[[self
rac_valuesAndChangesForKeyPath:keyPath options:NSKeyValueObservingOptionInitial observer:observer]
map:^(RACTuple *value) {
// -map: because it doesn't require the block trampoline that -reduceEach: uses
return value[0];
}]
setNameWithFormat:@"RACObserve(%@, %@)", RACDescription(self), keyPath];
}
- 查看
rac_valuesAndChangesForKeyPath
源码实现,其中的核心是看他怎么进行block回传的
- (RACSignal *)rac_valuesAndChangesForKeyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options observer:(__weak NSObject *)weakObserver {
NSObject *strongObserver = weakObserver;
keyPath = [keyPath copy];
NSRecursiveLock *objectLock = [[NSRecursiveLock alloc] init];
objectLock.name = @"org.reactivecocoa.ReactiveObjC.NSObjectRACPropertySubscribing";
__weak NSObject *weakSelf = self;
RACSignal *deallocSignal = [[RACSignal
zip:@[
// 发生销毁的时候就会有 销毁信号rac_willDeallocSignal
self.rac_willDeallocSignal,
strongObserver.rac_willDeallocSignal ?: [RACSignal never]
]]
doCompleted:^{
// Forces deallocation to wait if the object variables are currently
// being read on another thread.
[objectLock lock];
@onExit {
[objectLock unlock];
};
}];
return [[[RACSignal
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
// Hold onto the lock the whole time we're setting up the KVO
// observation, because any resurrection that might be caused by our
// retaining below must be balanced out by the time -dealloc returns
// (if another thread is waiting on the lock above).
[objectLock lock];
@onExit {
[objectLock unlock];
};
__strong NSObject *observer __attribute__((objc_precise_lifetime)) = weakObserver;
__strong NSObject *self __attribute__((objc_precise_lifetime)) = weakSelf;
if (self == nil) {
[subscriber sendCompleted];
return nil;
}
// KVO --- value -- block()
// KVO --- 发现有值发生改变 - 调用block
// 值发生改变,就会触发RAC中 KVO的回调 subscribeNext:^{}
return [self rac_observeKeyPath:keyPath options:options observer:observer block:^(id value, NSDictionary *change, BOOL causedByDealloc, BOOL affectedOnlyLastComponent) {
[subscriber sendNext:RACTuplePack(value, change)];
}];
}]
takeUntil:deallocSignal]
setNameWithFormat:@"%@ -rac_valueAndChangesForKeyPath: %@ options: %lu observer: %@", RACDescription(self), keyPath, (unsigned long)options, RACDescription(strongObserver)];
}
- 继续跟进源码中的
rac_observeKeyPath:
方法,其核心是RACKVOTrampoline
信息反弹对象
- (RACDisposable *)rac_observeKeyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options observer:(__weak NSObject *)weakObserver block:(void (^)(id, NSDictionary *, BOOL, BOOL))block {
NSCParameterAssert(block != nil);
NSCParameterAssert(keyPath.rac_keyPathComponents.count > 0);
keyPath = [keyPath copy];
NSObject *strongObserver = weakObserver;
// KVO -- key -- person.dog.name
// KVO 能观察键值 还能观察路由 比如可以观察 Dog.name
NSArray *keyPathComponents = keyPath.rac_keyPathComponents;
BOOL keyPathHasOneComponent = (keyPathComponents.count == 1);
NSString *keyPathHead = keyPathComponents[0];
NSString *keyPathTail = keyPath.rac_keyPathByDeletingFirstKeyPathComponent;
// KVO中创建了很多的变量等信息,这些东西都需要销毁,于是创建了复合式销毁对象
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// The disposable that groups all disposal necessary to clean up the callbacks
// added to the value of the first key path component.
RACSerialDisposable *firstComponentSerialDisposable = [RACSerialDisposable serialDisposableWithDisposable:[RACCompoundDisposable compoundDisposable]];
RACCompoundDisposable * (^firstComponentDisposable)(void) = ^{
return (RACCompoundDisposable *)firstComponentSerialDisposable.disposable;
};
// RACCompoundDisposable dispoable
[disposable addDisposable:firstComponentSerialDisposable];
BOOL shouldAddDeallocObserver = NO;
// 这里self 就是外部调用者的对象 self,这里是拿到self的属性列表
objc_property_t property = class_getProperty(object_getClass(self), keyPathHead.UTF8String);
if (property != NULL) {
rac_propertyAttributes *attributes = rac_copyPropertyAttributes(property);
if (attributes != NULL) {
@onExit {
free(attributes);
};
BOOL isObject = attributes->objectClass != nil || strstr(attributes->type, @encode(id)) == attributes->type;
BOOL isProtocol = attributes->objectClass == NSClassFromString(@"Protocol");
BOOL isBlock = strcmp(attributes->type, @encode(void(^)())) == 0;
BOOL isWeak = attributes->weak;
// If this property isn't actually an object (or is a Class object),
// no point in observing the deallocation of the wrapper returned by
// KVC.
//
// If this property is an object, but not declared `weak`, we
// don't need to watch for it spontaneously being set to nil.
//
// Attempting to observe non-weak properties will result in
// broken behavior for dynamic getters, so don't even try.
shouldAddDeallocObserver = isObject && isWeak && !isBlock && !isProtocol;
}
}
// Adds the callback block to the value's deallocation. Also adds the logic to
// clean up the callback to the firstComponentDisposable.
void (^addDeallocObserverToPropertyValue)(NSObject *) = ^(NSObject *value) {
if (!shouldAddDeallocObserver) return;
// If a key path value is the observer, commonly when a key path begins
// with "self", we prevent deallocation triggered callbacks for any such key
// path components. Thus, the observer's deallocation is not considered a
// change to the key path.
if (value == weakObserver) return;
NSDictionary *change = @{
NSKeyValueChangeKindKey: @(NSKeyValueChangeSetting),
NSKeyValueChangeNewKey: NSNull.null,
};
RACCompoundDisposable *valueDisposable = value.rac_deallocDisposable;
RACDisposable *deallocDisposable = [RACDisposable disposableWithBlock:^{
block(nil, change, YES, keyPathHasOneComponent);
}];
[valueDisposable addDisposable:deallocDisposable];
[firstComponentDisposable() addDisposable:[RACDisposable disposableWithBlock:^{
[valueDisposable removeDisposable:deallocDisposable];
}]];
};
// Adds the callback block to the remaining path components on the value. Also
// adds the logic to clean up the callbacks to the firstComponentDisposable.
void (^addObserverToValue)(NSObject *) = ^(NSObject *value) {
RACDisposable *observerDisposable = [value rac_observeKeyPath:keyPathTail options:(options & ~NSKeyValueObservingOptionInitial) observer:weakObserver block:block];
[firstComponentDisposable() addDisposable:observerDisposable];
};
// Observe only the first key path component, when the value changes clean up
// the callbacks on the old value, add callbacks to the new value and call the
// callback block as needed.
//
// Note this does not use NSKeyValueObservingOptionInitial so this only
// handles changes to the value, callbacks to the initial value must be added
// separately.
NSKeyValueObservingOptions trampolineOptions = (options | NSKeyValueObservingOptionPrior) & ~NSKeyValueObservingOptionInitial;
// RACKVOTrampoline 表示所有信息的反弹
// KVO: info 面向对象
RACKVOTrampoline *trampoline = [[RACKVOTrampoline alloc] initWithTarget:self observer:strongObserver keyPath:keyPathHead options:trampolineOptions block:^(id trampolineTarget, id trampolineObserver, NSDictionary *change) {
// If this is a prior notification, clean up all the callbacks added to the
// previous value and call the callback block. Everything else is deferred
// until after we get the notification after the change.
if ([change[NSKeyValueChangeNotificationIsPriorKey] boolValue]) {
[firstComponentDisposable() dispose];
if ((options & NSKeyValueObservingOptionPrior) != 0) {
block([trampolineTarget valueForKeyPath:keyPath], change, NO, keyPathHasOneComponent);
}
return;
}
// From here the notification is not prior.
NSObject *value = [trampolineTarget valueForKey:keyPathHead];
// If the value has changed but is nil, there is no need to add callbacks to
// it, just call the callback block.
if (value == nil) {
block(nil, change, NO, keyPathHasOneComponent);
return;
}
// From here the notification is not prior and the value is not nil.
// Create a new firstComponentDisposable while getting rid of the old one at
// the same time, in case this is being called concurrently.
RACDisposable *oldFirstComponentDisposable = [firstComponentSerialDisposable swapInDisposable:[RACCompoundDisposable compoundDisposable]];
[oldFirstComponentDisposable dispose];
addDeallocObserverToPropertyValue(value);
// If there are no further key path components, there is no need to add the
// other callbacks, just call the callback block with the value itself.
if (keyPathHasOneComponent) {
block(value, change, NO, keyPathHasOneComponent);
return;
}
// The value has changed, is not nil, and there are more key path components
// to consider. Add the callbacks to the value for the remaining key path
// components and call the callback block with the current value of the full
// key path.
addObserverToValue(value);
block([value valueForKeyPath:keyPathTail], change, NO, keyPathHasOneComponent);
}];
// Stop the KVO observation when this one is disposed of.
[disposable addDisposable:trampoline];
// Add the callbacks to the initial value if needed.
NSObject *value = [self valueForKey:keyPathHead];
if (value != nil) {
addDeallocObserverToPropertyValue(value);
if (!keyPathHasOneComponent) {
addObserverToValue(value);
}
}
// Call the block with the initial value if needed.
if ((options & NSKeyValueObservingOptionInitial) != 0) {
id initialValue = [self valueForKeyPath:keyPath];
NSDictionary *initialChange = @{
NSKeyValueChangeKindKey: @(NSKeyValueChangeSetting),
NSKeyValueChangeNewKey: initialValue ?: NSNull.null,
};
block(initialValue, initialChange, NO, keyPathHasOneComponent);
}
RACCompoundDisposable *observerDisposable = strongObserver.rac_deallocDisposable;
RACCompoundDisposable *selfDisposable = self.rac_deallocDisposable;
// Dispose of this observation if the receiver or the observer deallocate.
[observerDisposable addDisposable:disposable];
[selfDisposable addDisposable:disposable];
return [RACDisposable disposableWithBlock:^{
[disposable dispose];
[observerDisposable removeDisposable:disposable];
[selfDisposable removeDisposable:disposable];
}];
}
- 跟进
[[RACKVOTrampoline alloc] initWithTarget...
方法
- (instancetype)initWithTarget:(__weak NSObject *)target observer:(__weak NSObject *)observer keyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options block:(RACKVOBlock)block {
NSCParameterAssert(keyPath != nil);
NSCParameterAssert(block != nil);
NSObject *strongTarget = target;
if (strongTarget == nil) return nil;
self = [super init];
// 属性保存法
_keyPath = [keyPath copy];
_block = [block copy];
_weakTarget = target;
_unsafeTarget = strongTarget;
_observer = observer;
// 移交代理 --- 观察对象
// VC ---> person dog (一个VC里面有很多个对象)
[RACKVOProxy.sharedProxy addObserver:self forContext:(__bridge void *)self];
[strongTarget addObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath options:options context:(__bridge void *)self];
[strongTarget.rac_deallocDisposable addDisposable:self];
[self.observer.rac_deallocDisposable addDisposable:self];
return self;
}
- 继续跟进
[RACKVOProxy.sharedProxy addObserver...
方法
核心是RACKVOProxy
移交代理 、trampolines
漫游表
// RACKVOProxy 最主要是创建trampolines漫游表 并且把监听到的值 扔给漫游表
// trampolines漫游表 将 监听的值 返回回去
// RACKVOProxy添加监听观察者
- (void)addObserver:(__weak NSObject *)observer forContext:(void *)context {
NSValue *valueContext = [NSValue valueWithPointer:context];
dispatch_sync(self.queue, ^{
[self.trampolines setObject:observer forKey:valueContext];
});
}
// RACKVOProxy的初始化 使用了DISPATCH_QUEUE_SERIAL串行队列 确保顺序不会错乱
// _trampolines保证了漫游表是一一对应的
- (instancetype)init {
self = [super init];
// name dog
// name person
_queue = dispatch_queue_create("org.reactivecocoa.ReactiveObjC.RACKVOProxy", DISPATCH_QUEUE_SERIAL);
_trampolines = [NSMapTable strongToWeakObjectsMapTable];
return self;
}
// RACKVOProxy发现值发生改变
// name age ---> vlaue --> 观察对象 dog person
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
NSValue *valueContext = [NSValue valueWithPointer:context];
__block NSObject *trueObserver;
dispatch_sync(self.queue, ^{
trueObserver = [self.trampolines objectForKey:valueContext];
});
if (trueObserver != nil) {
// 将所有的值 都会返回到 RACKVOTrampoline 漫游表里面去
[trueObserver observeValueForKeyPath:keyPath ofObject:object change:change context:context];
}
}
-
RACKVOTrampoline
漫游表
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
if (context != (__bridge void *)self) {
[super observeValueForKeyPath:keyPath ofObject:object change:change context:context];
return;
}
RACKVOBlock block;
id observer;
id target;
// 面向对象 的化整为零
@synchronized (self) {
block = self.block;
observer = self.observer;
target = self.weakTarget;
}
if (block == nil || target == nil) return;
block(target, observer, change);
}
-
RACKVOProxy
移除观察者,其实就是在RACKVOTrampoline
的析构dealloc
的dispose
里面进行移除了
- (void)dealloc {
[self dispose];
}
#pragma mark Observation
- (void)dispose {
NSObject *target;
NSObject *observer;
@synchronized (self) {
_block = nil;
// The target should still exist at this point, because we still need to
// tear down its KVO observation. Therefore, we can use the unsafe
// reference (and need to, because the weak one will have been zeroed by
// now).
target = self.unsafeTarget;
observer = self.observer;
_unsafeTarget = nil;
_observer = nil;
}
// coupsdis
[target.rac_deallocDisposable removeDisposable:self];
[observer.rac_deallocDisposable removeDisposable:self];
[target removeObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath context:(__bridge void *)self];
[RACKVOProxy.sharedProxy removeObserver:self forContext:(__bridge void *)self];
}