RAC底层原理分析上

RAC核心类
  • RACScheduler:信号调度器,是一个线性执行队列,rac中的信号可以在RACScheduler上执行任务、发送结果,底层用GCD封装的
  • RACObserve:是一个宏定义,是使用了原生的 KVO响应式编程 相结合的产物;不是所有的property都可以被RACObserve,该property必须支持KVO,比如NSURLCachecurrentDiskUsage就不能被RACObserve
  • RACSubscriber:是一个协议类,订阅者,发送信号
  • RACDisposable:它可以帮助我们取消订阅,在信号发送完毕,失败都可以,就类似通知结束后销毁通知一样效果
  • RACSubject:信号提供者,可以自己充当信号,自己可以发送信号,拥有RACSignal和RACSubscriber两者功能。

RACPassthroughSubscriber核心订阅者

继续上节课我们使用信号的案例

- (void)racBase{
    RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber>  _Nonnull subscriber) {
        // block调用时刻:每当有订阅者订阅信号,就会调用block。
        [subscriber sendNext:@"Hello World"];
        
        RACDisposable *disposable = [RACDisposable disposableWithBlock:^{
            // 销毁信号
            NSLog(@"开始销毁");
        }];
        return disposable;
    }];
    
    // 订阅信号,才会激活信号.
    [signal subscribeNext:^(id  _Nullable x) {
        NSLog(@"x == %@",x);
    }];
}

// 打印日志
2022-12-07 22:58:46.445311+0800 001---RAC底层分析[58542:4644847] x == Hello World
2022-12-07 22:58:46.445532+0800 001---RAC底层分析[58542:4644847] 开始销毁
  • 查看订阅信号subscribeNext源码
<!-- RAC源码 -->
// 查看订阅信号subscribeNext源码
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    NSCParameterAssert(nextBlock != NULL);
    RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    return [self subscribe:o];
}

#pragma mark Managing Subscribers
// 查看subscribe源码
// 重点: RAC 精髓
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    
    // 多态 --- 厉害之处:RAC流程的三个主要元素这里全部具备,并且都进行了保存,就可以随时随地复用
    /** RACPassthroughSubscriber
     * subscriber 订阅者  --- RACSubscriber传进来
     * signal 信号
     * disposable 销毁者
     */

    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    if (self.didSubscribe != NULL) {
        
        // 多线程--cpu--自己跑了
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            
            [disposable addDisposable:innerDisposable];
        }];

        [disposable addDisposable:schedulingDisposable];
    }
    
    return disposable;
}

通过- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber源码,我们发现RAC的精髓是保存了RAC流程中的三个核心元素,于是就可以随时随地复用。

案例一:使用subscriber发送错误信号

<!-- 生命全局调度者 -->
@interface ViewController ()
@property (nonatomic, strong) id<RACSubscriber> kcSubscriber;
@end

<!-- 全局调度者接收 -->
- (void)viewDidLoad {
    [super viewDidLoad];
    
    RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber>  _Nonnull subscriber) {
        self.kcSubscriber = subscriber;
        // block调用时刻:每当有订阅者订阅信号,就会调用block。
        RACDisposable *disposable = [RACDisposable disposableWithBlock:^{
            // 销毁信号
            NSLog(@"开始销毁");
        }];
        return disposable;
    }];
    
    // 订阅错误信号
    [signal subscribeError:^(NSError * _Nullable error) {
        NSLog(@"%@", error);
    }];
}

<!-- 页面创建的按钮点击事件 -->
- (IBAction)didClickBtnClickTwo:(id)sender {
    // 发送错误信号
    NSError *error = [NSError errorWithDomain:NSURLErrorDomain code:10088 userInfo:@{@"LGError":@"hhaha "}];
    [self.kcSubscriber sendError:error];
}

// 点击按钮发送信号,查看打印日志
2022-12-07 23:08:08.072170+0800 001---RAC底层分析[58665:4653906] 开始销毁
2022-12-07 23:08:08.072417+0800 001---RAC底层分析[58665:4653906] Error Domain=NSURLErrorDomain Code=10088 "(null)" UserInfo={LGError=hhaha }

// 也可以使用kcSubscriber发送完成信号 以及 普通信号
- (IBAction)didClickBtnClickOne:(id)sender {
    [self.kcSubscriber sendNext:@"RAC"];
}

- (IBAction)didClickBtnClickThree:(id)sender {
    // 发送完成信号
    [self.kcSubscriber sendCompleted];
}

通过该案例我们能够得出- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber源码对subscriber的封装,以便二次利用,不用重复代码。

  • 继续查看源码[[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
    NSCParameterAssert(subscriber != nil);

    self = [super init];
    // 保存了RAC核心三要素
    _innerSubscriber = subscriber;
    _signal = signal;
    _disposable = disposable;

    [self.innerSubscriber didSubscribeWithDisposable:self.disposable];
    return self;
}

疑问?当订阅者发送了错误信号或者完成信号,再次点击按钮就不会发送了?
原因是当订阅者发送了错误信号或者完成信号,会被自动销毁。

  • 查看sendError:sendCompleted源码
- (void)sendError:(NSError *)error {
    // 如果被销毁,直接return;下面就不会再发送错误信号
    if (self.disposable.disposed) return;

    if (RACSIGNAL_ERROR_ENABLED()) {
        RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
    }

    [self.innerSubscriber sendError:error];
}

- (void)sendCompleted {
    if (self.disposable.disposed) return;

    if (RACSIGNAL_COMPLETED_ENABLED()) {
        RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
    }

    [self.innerSubscriber sendCompleted];
}
  • 查看sendNext源码,我们发现也会判断是否销毁
- (void)sendNext:(id)value {
    if (self.disposable.disposed) return;

    if (RACSIGNAL_NEXT_ENABLED()) {
        RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
    }
    // 真正的订阅  RACSubscriber
    [self.innerSubscriber sendNext:value];
}
  • 查看sendNextsendErrorsendCompleted源码
#pragma mark RACSubscriber

- (void)sendNext:(id)value {
    // 同步锁
    @synchronized (self) {
        void (^nextBlock)(id) = [self.next copy];
        if (nextBlock == nil) return;
        nextBlock(value);
    }
}

- (void)sendError:(NSError *)e {
    @synchronized (self) {
        
        // 深浅拷贝
        // SDK -- 不能侵入外部参数值,所以这里进行深copy
        void (^errorBlock)(NSError *) = [self.error copy];
        // 上一行虽然拿到了错误block,但是这里会直接销毁
        [self.disposable dispose];

        if (errorBlock == nil) return;
        errorBlock(e);
    }
}

- (void)sendCompleted {
    @synchronized (self) {
        void (^completedBlock)(void) = [self.completed copy];
        [self.disposable dispose];
        if (completedBlock == nil) return;
        completedBlock();
    }
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)otherDisposable {
    if (otherDisposable.disposed) return;

    RACCompoundDisposable *selfDisposable = self.disposable;
    [selfDisposable addDisposable:otherDisposable]; // 添加对象

    @unsafeify(otherDisposable);

    // If this subscription terminates, purge its disposable to avoid unbounded
    // memory growth.
    [otherDisposable addDisposable:[RACDisposable disposableWithBlock:^{
        @strongify(otherDisposable);
        [selfDisposable removeDisposable:otherDisposable];
    }]];
}

通过上面sendErrorsendCompleted源码,我们发现调用了发送错误或者发送完成信号就会销毁,那么sendNext什么时候销毁呢?

  • RAC中也有一个dealloc析构函数,当ViewController销毁的时候,就会触发RAC销毁
// 添加@weakify(self),防止RAC循环引用,导致ViewController不能销毁
@weakify(self)
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber>  _Nonnull subscriber) {
    @strongify(self)

    self.kcSubscriber = subscriber;
        // block调用时刻:每当有订阅者订阅信号,就会调用block。
        RACDisposable *disposable = [RACDisposable disposableWithBlock:^{
            // 销毁信号
            NSLog(@"开始销毁");
        }];
        return disposable;
}];

// RAC销毁
- (void)dealloc {
    [self.disposable dispose];
}

小结: 订阅者保存了所有RAC的元素

核心订阅者
  • RAC signal subscribe dispose,可以随时订阅信号以及发送信号
  • 发送错误信号完成信号会自动销毁,sendNext会随着页面生命周期调用dealloc销毁。

RACScheduler调度者

继续查看RAC中的源码精髓- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber方法,来了解RACScheduler调度者的底层原理

#pragma mark Managing Subscribers
// 重点: RAC 精髓
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    if (self.didSubscribe != NULL) {
        
        // 调度者的核心就是多线程
        // 多线程--依赖于cpu--内部自己运行跑了,完全不需要外部介入
        // 这里不需要完全依靠cpu,要依赖于自己,更加自由
        // schedule 回调就是对当前线程判断,进行调度,看调度是在主线程还是在子线程
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            // block回调回来执行到这里
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            
            [disposable addDisposable:innerDisposable];
        }];

        [disposable addDisposable:schedulingDisposable];
    }
    
    return disposable;
}

// RACScheduler.subscriptionScheduler 生成一个订阅调度者
+ (RACScheduler *)subscriptionScheduler {
    static dispatch_once_t onceToken;
    static RACScheduler *subscriptionScheduler;
    dispatch_once(&onceToken, ^{
        // 生成一个单例对象,RACSubscriptionScheduler 订阅调度者
        subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
    });

    return subscriptionScheduler;
}

// schedule回调
- (RACDisposable *)schedule:(void (^)(void))block {
    // 断言
    NSCParameterAssert(block != NULL);
    // 当前Scheduler不为空,直接回调block;因为当前一直是在主线程,从没有切换到子线程操作过
    if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
    block();
    return nil;
}

+ (RACScheduler *)currentScheduler {
    // RACSchedulerCurrentSchedulerKey 自定义的键值对
    // 当前线程字典
    RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
    if (scheduler != nil) return scheduler;
    // 自定义键值为空,设置成默认主线程
    // 如果用户没有操作过线程,RAC就会认为当前就是主线程
    if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

    return nil;
}
调度者原理

上面我们通过使用RAC学习了调度者在主线程的源码流程,下面来学习调度者在子线程的源码流程

案例二:通过RACScheduler调度者在子线程的使用,我们来分析在子线程的流程

- (void)viewDidLoad {
    [super viewDidLoad];
    // 创建全局并发队列
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
       
        @weakify(self)
        RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber>  _Nonnull subscriber) {
            @strongify(self)
            NSLog(@"来了,请求网络");
            // 3:发送信号
            [subscriber sendNext:@"Hello World"];
            // 4:销毁
            // return self.dis; 销毁回调
            return [RACDisposable disposableWithBlock:^{
                NSLog(@"我们销毁了");
            }];
        }];
        // 2: 订阅信号
        [signal subscribeNext:^(id  _Nullable x) {
            NSLog(@"0:订阅到了:%@",x);
        }];
    });
}
schedule回调

通过断点调试,我们发现RACScheduler.currentScheduler即没有自定义调度者,也不是主线程调度者,返回为nil;于是拿起后台备用调度者来使用。

@interface RACSubscriptionScheduler ()

// A private background scheduler on which to subscribe if the +currentScheduler
// is unknown.
@property (nonatomic, strong, readonly) RACScheduler *backgroundScheduler;

@end

@implementation RACSubscriptionScheduler

#pragma mark Lifecycle

- (instancetype)init {
    self = [super initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.subscriptionScheduler"];
    // RACSubscriptionScheduler 订阅调度者在初始化的时候就会创建后台调度者
    _backgroundScheduler = [RACScheduler scheduler];

    return self;
}

<!-- 查看[RACScheduler scheduler];类方法 -->
+ (RACScheduler *)scheduler {
    // 优先级越高,被GCD调度的机会就越大,越优先执行
    return [self schedulerWithPriority:RACSchedulerPriorityDefault];
}

+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority {
    return [self schedulerWithPriority:priority name:@"org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler"];
}

+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
    // 全局并发队列
    // 如果在子线程操作,并且没有指定子线程,就会开启一条全局子线程,其实就是对GCD的底层封装
    return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority, 0)];
}

- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
    NSCParameterAssert(targetQueue != NULL);

    if (name == nil) {
        name = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
    }
    // 指定队列去执行
    dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
    if (queue == NULL) return nil;

    dispatch_set_target_queue(queue, targetQueue);

    return [super initWithName:name queue:queue];
}

疑问?initWithName方法中指定队列,为什么是DISPATCH_QUEUE_SERIAL?
因为当前信号是一个个来的,要保证调度是异步的同时,还要保证调度是顺序执行的,使用DISPATCH_QUEUE_SERIAL是为了保证顺序执行。即牺牲了一定的并发性,来保证执行顺序

查看[self.backgroundScheduler schedule:block];方法源码

- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);
    RACDisposable *disposable = [[RACDisposable alloc] init];
    dispatch_async(self.queue, ^{
        if (disposable.disposed) return;
        [self performAsCurrentScheduler:block];
    });

    return disposable;
}

- (void)performAsCurrentScheduler:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    // If we're using a concurrent queue, we could end up in here concurrently,
    // in which case we *don't* want to clear the current scheduler immediately
    // after our block is done executing, but only *after* all our concurrent
    // invocations are done.
    
    RACScheduler *previousScheduler = RACScheduler.currentScheduler;
    // KVC --  backGround
    // RACSchedulerCurrentSchedulerKey对应的值就是 创建的后台调度者
    NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
    // autoreleasepool的作用:使用的地方 1. 大量临时变量 2. 自定义线程 3. 执行非UI操作
    // 当前是调度一些线程中的东西,并且也不是在进行UI操作,所以这里使用autoreleasepool
    // 使用的第二个作用,延迟其生命周期,防止析构释放
    @autoreleasepool {
        block();
    }

    if (previousScheduler != nil) {
        NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
    } else {
        // 移除防止KVC找不到
        [NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
    }
}

在子线程调度一次之后,下次就会直接从NSThread.currentThread.threadDictionary中取出previousScheduler,会优化掉一些性能问题;如果是在主线程的话直接拿出RACScheduler.mainThreadScheduler

RACDisposable销毁

RACDisposable并不是真正的销毁,只是类似于信号响应的销毁,下面分析原因:

  • 前面我们学习了sendNext会随着页面生命周期销毁,我们接着这一块继续看源码
- (instancetype)init {
    self = [super init];

    @unsafeify(self);

    RACDisposable *selfDisposable = [RACDisposable disposableWithBlock:^{
        @strongify(self);

        @synchronized (self) {
            self.next = nil;
            self.error = nil;
            self.completed = nil;
        }
    }];
    // RACCompoundDisposable 复合销毁者,里面有很多个 单个销毁者
    // 原因是 随着这一次的销毁,里面需要销毁的东西有很多,就构成了 复合销毁者
    _disposable = [RACCompoundDisposable compoundDisposable];
    [_disposable addDisposable:selfDisposable];

    return self;
}

- (void)dealloc {
    // 单个销毁者
    [self.disposable dispose];
}
  • 继续跟进源码,分析addDisposable方法
// 所有销毁者 +
// 该方法主要作用:就是往数组中添加单个销毁者
- (void)addDisposable:(RACDisposable *)disposable {
    NSCParameterAssert(disposable != self);
    if (disposable == nil || disposable.disposed) return;

    BOOL shouldDispose = NO;
    // 加锁的原因:防止线程不安全 。读写操作必须添加一个锁保证线程安全
    pthread_mutex_lock(&_mutex);
    {
        // 往数组添加值
        if (_disposed) {
            shouldDispose = YES;
        } else {
            // 性能调试 得出  RACCompoundDisposableInlineCount = 2
            // RACCompoundDisposableInlineCount 这个值是2
            #if RACCompoundDisposableInlineCount
            for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
                if (_inlineDisposables[i] == nil) {
                    // 数组中添加单个销毁者
                    _inlineDisposables[i] = disposable;
                    goto foundSlot;
                }
            }
            #endif
            // 如果 RACCompoundDisposableInlineCount 不是等于2 就会执行下面的代码
            if (_disposables == NULL) _disposables = RACCreateDisposablesArray();
            CFArrayAppendValue(_disposables, (__bridge void *)disposable);

            if (RACCOMPOUNDDISPOSABLE_ADDED_ENABLED()) {
                RACCOMPOUNDDISPOSABLE_ADDED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
            }

        #if RACCompoundDisposableInlineCount
        foundSlot:;
        #endif
        }
    }
    pthread_mutex_unlock(&_mutex);

    // Performed outside of the lock in case the compound disposable is used
    // recursively.
    // 数组数量超过2就销毁,腾出空间让数组添加
    if (shouldDispose) [disposable dispose];
}
  • addDisposable方法的核心就是[disposable dispose];,下面继续跟进分析RACCompoundDisposabledispose
- (void)dispose {
    #if RACCompoundDisposableInlineCount
    RACDisposable *inlineCopy[RACCompoundDisposableInlineCount];
    #endif

    CFArrayRef remainingDisposables = NULL;

    // 锁
    pthread_mutex_lock(&_mutex);
    {
        _disposed = YES;

        #if RACCompoundDisposableInlineCount
        // 把所有的容器置空
        for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
            // 完全拷贝/部分拷贝 的区别,自己下去了解学习
            inlineCopy[i] = _inlineDisposables[i];
            _inlineDisposables[i] = nil;
        }
        #endif

        remainingDisposables = _disposables;
        // 这时整个数组为NULL
        _disposables = NULL;
    }
    pthread_mutex_unlock(&_mutex);

    #if RACCompoundDisposableInlineCount
    // Dispose outside of the lock in case the compound disposable is used
    // recursively.
    for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
        
        // dispose 瞬间销毁 ---> while 耗时
        [inlineCopy[i] dispose];
    }
    #endif

    if (remainingDisposables == NULL) return;

    CFIndex count = CFArrayGetCount(remainingDisposables);
    // 所有符合式的容器 去调用 disposeEach 这个函数
    // disposeEach函数就是 遍历去销毁
    CFArrayApplyFunction(remainingDisposables, CFRangeMake(0, count), &disposeEach, NULL);
    CFRelease(remainingDisposables);
}
  • 上面我们了解了复合式销毁,下面查看disposeEach函数,了解单个销毁
// 外面 --> 用compund - 有值 数组
// 数组 --> dispose --- 销毁对象
static void disposeEach(const void *value, void *context) {
    RACDisposable *disposable = (__bridge id)value;
    [disposable dispose];
}

- (void)dispose {
    void (^disposeBlock)(void) = NULL;
    while (YES) {
        // 临时变量
        // _disposeBlock 就是 我们使用信号时的销毁回调
        /* [RACDisposable disposableWithBlock:^{
            // 销毁信号
            NSLog(@"开始销毁");
        }]; */
        void *blockPtr = _disposeBlock;
        // 防止内存偏移 因为多线程操作 很可以出现线程把性能的内存地址比较差 销毁 进行偏移
        
        // A B C 表示OSAtomicCompareAndSwapPtrBarrier函数的三个参数
        // A VS C 参数是否相等?(比较的是内存地址)
        // 如果相等的话 ---> 就把NULL给到_disposeBlock地址空间进行销毁  --> 整个函数会返回YES
        // 没有匹配上,整个函数返回NO,找不到就死循环
        // OSAtomicCompareAndSwapPtrBarrier函数的作用就是 必须找到
        if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL, &_disposeBlock)) {
            // 一直找 ---> disposable --
            // 包括自身也进行销毁
            if (blockPtr != (__bridge void *)self) {
                disposeBlock = CFBridgingRelease(blockPtr);
            }
            break;
        }
    }

    if (disposeBlock != nil) disposeBlock();
}

// 析构的时候会再次把_disposeBlock地址空间置为NULL,以防止之前没有销毁掉
- (void)dealloc {
    if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;
    CFRelease(_disposeBlock);
    _disposeBlock = NULL;
}

RAC的回调核心里面都有一个disposable销毁,subscribeNext信号订阅方法也会返回一个disposable,可以随时随地进行销毁。

案例三:subscribeNext方法获取到disposable进行销毁,不执行订阅回调

- (void)viewDidLoad {
    [super viewDidLoad];
    // 1:创建信号
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
       
        @weakify(self)
        RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber>  _Nonnull subscriber) {
            @strongify(self)
            NSLog(@"来了,请求网络");
            // 3:发送信号
            [subscriber sendNext:@"Hello World"];
            // 4:销毁
            // return self.dis; 销毁回调
            return [RACDisposable disposableWithBlock:^{
                NSLog(@"我们销毁了");
            }];
        }];
        // 2: 订阅信号
        RACDisposable *disp = [signal subscribeNext:^(id  _Nullable x) {
            NSLog(@"0:订阅到了:%@",x);
        }];
        
        [disp dispose];
    });
}

// 打印日志
2022-12-09 22:36:19.796853+0800 001---RAC底层分析[13481:553240] 来了,请求网络
2022-12-09 22:36:19.797013+0800 001---RAC底层分析[13481:553240] 我们销毁了
RACDisposable销毁流程

RACObserve

RAC中使用KVO的方式如下,我们跟进一下源码看看其原理是什么?

self.name = @"hello world";
[RACObserve(self, name) subscribeNext:^(id  _Nullable x) {
    NSLog(@"%@",x);
}];
  • 查看RACObserve方法,发现是一个宏定义,_RACObserve是一个内部的宏定义
#define RACObserve(TARGET, KEYPATH) _RACObserve(TARGET, KEYPATH)

//  \代表换行符号
#define _RACObserve(TARGET, KEYPATH) \
({ \
    __weak id target_ = (TARGET); \
    [target_ rac_valuesForKeyPath:@keypath(TARGET, KEYPATH) observer:self]; \
})

// 我们发现 rac_valuesForKeyPath 返回的是RACSignal,有了signal就可以进行订阅信号 [signal subscribeNext:]
#if OS_OBJECT_HAVE_OBJC_SUPPORT
- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(__weak NSObject *)observer;
#else
// Swift builds with OS_OBJECT_HAVE_OBJC_SUPPORT=0 for Playgrounds and LLDB :(
- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(NSObject *)observer;
#endif
  • 查看rac_valuesForKeyPath方法的实现,其中核心方法是rac_valuesAndChangesForKeyPath
- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(__weak NSObject *)observer {
    // rac_valuesAndChangesForKeyPath 值发生改变的观察
    return [[[self
        rac_valuesAndChangesForKeyPath:keyPath options:NSKeyValueObservingOptionInitial observer:observer]
        map:^(RACTuple *value) {
            // -map: because it doesn't require the block trampoline that -reduceEach: uses
            return value[0];
        }]
        setNameWithFormat:@"RACObserve(%@, %@)", RACDescription(self), keyPath];
}
  • 查看rac_valuesAndChangesForKeyPath源码实现,其中的核心是看他怎么进行block回传的
- (RACSignal *)rac_valuesAndChangesForKeyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options observer:(__weak NSObject *)weakObserver {
    NSObject *strongObserver = weakObserver;
    keyPath = [keyPath copy];

    NSRecursiveLock *objectLock = [[NSRecursiveLock alloc] init];
    objectLock.name = @"org.reactivecocoa.ReactiveObjC.NSObjectRACPropertySubscribing";

    __weak NSObject *weakSelf = self;

    RACSignal *deallocSignal = [[RACSignal
        zip:@[
            // 发生销毁的时候就会有 销毁信号rac_willDeallocSignal
            self.rac_willDeallocSignal,
            strongObserver.rac_willDeallocSignal ?: [RACSignal never]
        ]]
        doCompleted:^{
            // Forces deallocation to wait if the object variables are currently
            // being read on another thread.
            [objectLock lock];
            @onExit {
                [objectLock unlock];
            };
        }];

    return [[[RACSignal
        createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
            // Hold onto the lock the whole time we're setting up the KVO
            // observation, because any resurrection that might be caused by our
            // retaining below must be balanced out by the time -dealloc returns
            // (if another thread is waiting on the lock above).
            [objectLock lock];
            @onExit {
                [objectLock unlock];
            };
            __strong NSObject *observer __attribute__((objc_precise_lifetime)) = weakObserver;
            __strong NSObject *self __attribute__((objc_precise_lifetime)) = weakSelf;

            if (self == nil) {
                [subscriber sendCompleted];
                return nil;
            }
            // KVO --- value -- block()
            // KVO --- 发现有值发生改变 - 调用block
            // 值发生改变,就会触发RAC中 KVO的回调 subscribeNext:^{}
            return [self rac_observeKeyPath:keyPath options:options observer:observer block:^(id value, NSDictionary *change, BOOL causedByDealloc, BOOL affectedOnlyLastComponent) {
                [subscriber sendNext:RACTuplePack(value, change)];
            }];
        }]
        takeUntil:deallocSignal]
        setNameWithFormat:@"%@ -rac_valueAndChangesForKeyPath: %@ options: %lu observer: %@", RACDescription(self), keyPath, (unsigned long)options, RACDescription(strongObserver)];
}
  • 继续跟进源码中的rac_observeKeyPath:方法,其核心是RACKVOTrampoline信息反弹对象
- (RACDisposable *)rac_observeKeyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options observer:(__weak NSObject *)weakObserver block:(void (^)(id, NSDictionary *, BOOL, BOOL))block {
    NSCParameterAssert(block != nil);
    NSCParameterAssert(keyPath.rac_keyPathComponents.count > 0);

    keyPath = [keyPath copy];

    NSObject *strongObserver = weakObserver;
    // KVO -- key -- person.dog.name
    // KVO 能观察键值 还能观察路由 比如可以观察 Dog.name
    NSArray *keyPathComponents = keyPath.rac_keyPathComponents;
    BOOL keyPathHasOneComponent = (keyPathComponents.count == 1);
    NSString *keyPathHead = keyPathComponents[0];
    NSString *keyPathTail = keyPath.rac_keyPathByDeletingFirstKeyPathComponent;
    // KVO中创建了很多的变量等信息,这些东西都需要销毁,于是创建了复合式销毁对象
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

    // The disposable that groups all disposal necessary to clean up the callbacks
    // added to the value of the first key path component.
    RACSerialDisposable *firstComponentSerialDisposable = [RACSerialDisposable serialDisposableWithDisposable:[RACCompoundDisposable compoundDisposable]];
    RACCompoundDisposable * (^firstComponentDisposable)(void) = ^{
        return (RACCompoundDisposable *)firstComponentSerialDisposable.disposable;
    };

    // RACCompoundDisposable dispoable
    [disposable addDisposable:firstComponentSerialDisposable];

    BOOL shouldAddDeallocObserver = NO;
    // 这里self 就是外部调用者的对象 self,这里是拿到self的属性列表
    objc_property_t property = class_getProperty(object_getClass(self), keyPathHead.UTF8String);
    if (property != NULL) {
        rac_propertyAttributes *attributes = rac_copyPropertyAttributes(property);
        if (attributes != NULL) {
            @onExit {
                free(attributes);
            };

            BOOL isObject = attributes->objectClass != nil || strstr(attributes->type, @encode(id)) == attributes->type;
            BOOL isProtocol = attributes->objectClass == NSClassFromString(@"Protocol");
            BOOL isBlock = strcmp(attributes->type, @encode(void(^)())) == 0;
            BOOL isWeak = attributes->weak;

            // If this property isn't actually an object (or is a Class object),
            // no point in observing the deallocation of the wrapper returned by
            // KVC.
            //
            // If this property is an object, but not declared `weak`, we
            // don't need to watch for it spontaneously being set to nil.
            //
            // Attempting to observe non-weak properties will result in
            // broken behavior for dynamic getters, so don't even try.
            shouldAddDeallocObserver = isObject && isWeak && !isBlock && !isProtocol;
        }
    }

    // Adds the callback block to the value's deallocation. Also adds the logic to
    // clean up the callback to the firstComponentDisposable.
    void (^addDeallocObserverToPropertyValue)(NSObject *) = ^(NSObject *value) {
        if (!shouldAddDeallocObserver) return;

        // If a key path value is the observer, commonly when a key path begins
        // with "self", we prevent deallocation triggered callbacks for any such key
        // path components. Thus, the observer's deallocation is not considered a
        // change to the key path.
        if (value == weakObserver) return;

        NSDictionary *change = @{
            NSKeyValueChangeKindKey: @(NSKeyValueChangeSetting),
            NSKeyValueChangeNewKey: NSNull.null,
        };

        RACCompoundDisposable *valueDisposable = value.rac_deallocDisposable;
        RACDisposable *deallocDisposable = [RACDisposable disposableWithBlock:^{
            block(nil, change, YES, keyPathHasOneComponent);
        }];

        [valueDisposable addDisposable:deallocDisposable];
        [firstComponentDisposable() addDisposable:[RACDisposable disposableWithBlock:^{
            [valueDisposable removeDisposable:deallocDisposable];
        }]];
    };

    // Adds the callback block to the remaining path components on the value. Also
    // adds the logic to clean up the callbacks to the firstComponentDisposable.
    void (^addObserverToValue)(NSObject *) = ^(NSObject *value) {
        RACDisposable *observerDisposable = [value rac_observeKeyPath:keyPathTail options:(options & ~NSKeyValueObservingOptionInitial) observer:weakObserver block:block];
        [firstComponentDisposable() addDisposable:observerDisposable];
    };

    // Observe only the first key path component, when the value changes clean up
    // the callbacks on the old value, add callbacks to the new value and call the
    // callback block as needed.
    //
    // Note this does not use NSKeyValueObservingOptionInitial so this only
    // handles changes to the value, callbacks to the initial value must be added
    // separately.
    
    NSKeyValueObservingOptions trampolineOptions = (options | NSKeyValueObservingOptionPrior) & ~NSKeyValueObservingOptionInitial;
    // RACKVOTrampoline 表示所有信息的反弹
    // KVO: info 面向对象
    RACKVOTrampoline *trampoline = [[RACKVOTrampoline alloc] initWithTarget:self observer:strongObserver keyPath:keyPathHead options:trampolineOptions block:^(id trampolineTarget, id trampolineObserver, NSDictionary *change) {
        // If this is a prior notification, clean up all the callbacks added to the
        // previous value and call the callback block. Everything else is deferred
        // until after we get the notification after the change.
        if ([change[NSKeyValueChangeNotificationIsPriorKey] boolValue]) {
            [firstComponentDisposable() dispose];

            if ((options & NSKeyValueObservingOptionPrior) != 0) {
                block([trampolineTarget valueForKeyPath:keyPath], change, NO, keyPathHasOneComponent);
            }

            return;
        }

        // From here the notification is not prior.
        NSObject *value = [trampolineTarget valueForKey:keyPathHead];

        // If the value has changed but is nil, there is no need to add callbacks to
        // it, just call the callback block.
        if (value == nil) {
            block(nil, change, NO, keyPathHasOneComponent);
            return;
        }

        // From here the notification is not prior and the value is not nil.

        // Create a new firstComponentDisposable while getting rid of the old one at
        // the same time, in case this is being called concurrently.
        RACDisposable *oldFirstComponentDisposable = [firstComponentSerialDisposable swapInDisposable:[RACCompoundDisposable compoundDisposable]];
        [oldFirstComponentDisposable dispose];

        addDeallocObserverToPropertyValue(value);

        // If there are no further key path components, there is no need to add the
        // other callbacks, just call the callback block with the value itself.
        if (keyPathHasOneComponent) {
            block(value, change, NO, keyPathHasOneComponent);
            return;
        }

        // The value has changed, is not nil, and there are more key path components
        // to consider. Add the callbacks to the value for the remaining key path
        // components and call the callback block with the current value of the full
        // key path.
        addObserverToValue(value);
        block([value valueForKeyPath:keyPathTail], change, NO, keyPathHasOneComponent);
    }];

    // Stop the KVO observation when this one is disposed of.
    [disposable addDisposable:trampoline];

    // Add the callbacks to the initial value if needed.
    NSObject *value = [self valueForKey:keyPathHead];
    if (value != nil) {
        addDeallocObserverToPropertyValue(value);

        if (!keyPathHasOneComponent) {
            addObserverToValue(value);
        }
    }

    // Call the block with the initial value if needed.
    if ((options & NSKeyValueObservingOptionInitial) != 0) {
        id initialValue = [self valueForKeyPath:keyPath];
        NSDictionary *initialChange = @{
            NSKeyValueChangeKindKey: @(NSKeyValueChangeSetting),
            NSKeyValueChangeNewKey: initialValue ?: NSNull.null,
        };
        block(initialValue, initialChange, NO, keyPathHasOneComponent);
    }


    RACCompoundDisposable *observerDisposable = strongObserver.rac_deallocDisposable;
    RACCompoundDisposable *selfDisposable = self.rac_deallocDisposable;
    // Dispose of this observation if the receiver or the observer deallocate.
    [observerDisposable addDisposable:disposable];
    [selfDisposable addDisposable:disposable];

    return [RACDisposable disposableWithBlock:^{
        [disposable dispose];
        [observerDisposable removeDisposable:disposable];
        [selfDisposable removeDisposable:disposable];
    }];
}
  • 跟进[[RACKVOTrampoline alloc] initWithTarget...方法
- (instancetype)initWithTarget:(__weak NSObject *)target observer:(__weak NSObject *)observer keyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options block:(RACKVOBlock)block {
    NSCParameterAssert(keyPath != nil);
    NSCParameterAssert(block != nil);

    NSObject *strongTarget = target;
    if (strongTarget == nil) return nil;

    self = [super init];
    // 属性保存法
    _keyPath = [keyPath copy];

    _block = [block copy];
    _weakTarget = target;
    _unsafeTarget = strongTarget;
    _observer = observer;

    // 移交代理 --- 观察对象
    // VC ---> person dog (一个VC里面有很多个对象)
    [RACKVOProxy.sharedProxy addObserver:self forContext:(__bridge void *)self];
    [strongTarget addObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath options:options context:(__bridge void *)self];

    [strongTarget.rac_deallocDisposable addDisposable:self];
    [self.observer.rac_deallocDisposable addDisposable:self];

    return self;
}
  • 继续跟进[RACKVOProxy.sharedProxy addObserver...方法
    核心是RACKVOProxy移交代理 、trampolines漫游表
// RACKVOProxy 最主要是创建trampolines漫游表 并且把监听到的值 扔给漫游表
// trampolines漫游表 将 监听的值 返回回去
// RACKVOProxy添加监听观察者
- (void)addObserver:(__weak NSObject *)observer forContext:(void *)context {
    NSValue *valueContext = [NSValue valueWithPointer:context];
    dispatch_sync(self.queue, ^{
        [self.trampolines setObject:observer forKey:valueContext];
    });
}

// RACKVOProxy的初始化 使用了DISPATCH_QUEUE_SERIAL串行队列 确保顺序不会错乱
// _trampolines保证了漫游表是一一对应的
- (instancetype)init {
    self = [super init];
    // name dog
    // name person
    _queue = dispatch_queue_create("org.reactivecocoa.ReactiveObjC.RACKVOProxy", DISPATCH_QUEUE_SERIAL);
    _trampolines = [NSMapTable strongToWeakObjectsMapTable];

    return self;
}

// RACKVOProxy发现值发生改变
// name age ---> vlaue --> 观察对象 dog person
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
    NSValue *valueContext = [NSValue valueWithPointer:context];
    __block NSObject *trueObserver;

    dispatch_sync(self.queue, ^{
        trueObserver = [self.trampolines objectForKey:valueContext];
    });

    if (trueObserver != nil) {
       // 将所有的值 都会返回到 RACKVOTrampoline 漫游表里面去
        [trueObserver observeValueForKeyPath:keyPath ofObject:object change:change context:context];
    }
}
  • RACKVOTrampoline漫游表
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
    if (context != (__bridge void *)self) {
        [super observeValueForKeyPath:keyPath ofObject:object change:change context:context];
        return;
    }

    RACKVOBlock block;
    id observer;
    id target;

    // 面向对象 的化整为零
    @synchronized (self) {
        block = self.block;
        observer = self.observer;
        target = self.weakTarget;
    }

    if (block == nil || target == nil) return;

    block(target, observer, change);
}
  • RACKVOProxy移除观察者,其实就是在RACKVOTrampoline的析构deallocdispose里面进行移除了
- (void)dealloc {
    [self dispose];
}

#pragma mark Observation

- (void)dispose {
    NSObject *target;
    NSObject *observer;

    @synchronized (self) {
        _block = nil;

        // The target should still exist at this point, because we still need to
        // tear down its KVO observation. Therefore, we can use the unsafe
        // reference (and need to, because the weak one will have been zeroed by
        // now).
        target = self.unsafeTarget;
        observer = self.observer;

        _unsafeTarget = nil;
        _observer = nil;
    }
    // coupsdis
    [target.rac_deallocDisposable removeDisposable:self];
    [observer.rac_deallocDisposable removeDisposable:self];

    [target removeObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath context:(__bridge void *)self];
    [RACKVOProxy.sharedProxy removeObserver:self forContext:(__bridge void *)self];
}
RACObserve观察流程
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容