ReactiveCocoa学习笔记<一> RACSignal 工作原理

ReactiveCocoa 中最核心的概念之一就是信号RACStream。RACStream中有两个子类——RACSignal 和 RACSequence。本文先来分析RACSignal。

学习之前先来一段使用RACSignal的代码

RACSignal *signal = [RACSignal createSignal:
                     ^RACDisposable *(id<RACSubscriber> subscriber)
{
    [subscriber sendNext:@1];
    [subscriber sendNext:@2];
    [subscriber sendNext:@3];
    [subscriber sendCompleted];
    return [RACDisposable disposableWithBlock:^{
        NSLog(@"signal dispose");
    }];
}];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
    NSLog(@"subscribe value = %@", x);
} error:^(NSError *error) {
    NSLog(@"error: %@", error);
} completed:^{
    NSLog(@"completed");
}];

[disposable dispose];

这是RACSignal被订阅的整个过程,我们通过源码来看看整个过程中发生了什么.

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    return [RACDynamicSignal createSignal:didSubscribe];
}

RACSignal创建的方法内部是交由RACDynamicSignal来实现的, 同时接收了一个didSubscribeblock

RACDisposable * (^)(id<RACSubscriber> subscriber)   

这个block的参数是一个遵循<RACSubscriber>协议的对象,同时返回一个RACDisposable对象.

@interface RACDynamicSignal ()

// The block to invoke for each subscriber.
@property (nonatomic, copy, readonly) RACDisposable * (^didSubscribe)(id<RACSubscriber> subscriber);

@end


+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    RACDynamicSignal *signal = [[self alloc] init];
    signal->_didSubscribe = [didSubscribe copy];
    return [signal setNameWithFormat:@"+createSignal:"];
}

RACDynamicSignal这个类非常简单,只是保存了一个didSubscribeblock, 调用createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe方法 只是将didSubscribe保存, 并给RACSignal设置name

setNameWithFormat:是RACStream中实现的方法, RACDynamicSignal是RACStream的子类.

至此,一个基本的RACSignal就被创建完成. 那么didSubscribe这个block是在什么被调用的呢?

我们来接来看subscribeNext:(void (^)(id x))nextBlock这个方法

- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    NSCParameterAssert(nextBlock != NULL);
    
    RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    return [self subscribe:o];
}

这个方法也非常简单,在内部创建了一个RACSubscriber对象, 还记得在创建RACSignal中 block的参数id<RACSubscriber> subscriber吗? 不过RACSubscriber *o这个对象并不是subscriber,但是也有一些关系,至于是什么关系继续往下看就知道.

@interface RACSubscriber ()

// These callbacks should only be accessed while synchronized on self.
@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);

@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

这个对象内部只保存了4个属性分别是 3个block next error completed 和一个 RACCompoundDisposable *disposable(RACCompoundDisposable这个东西等下再说), 同时RACSubscriber遵循<RACSubscriber>协议

@protocol RACSubscriber <NSObject>
@required
- (void)sendNext:(nullable id)value;
- (void)sendError:(nullable NSError *)error;
- (void)sendCompleted;
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
@end

那么这个对象是怎么被传递给block并被调用的呢,我们接着往下看 [self subscribe:o]方法, 这个方法实际上调用的是-[RACDynamicSignal subscribe:]方法.

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    if (self.didSubscribe != NULL) {
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            [disposable addDisposable:innerDisposable];
        }];

        [disposable addDisposable:schedulingDisposable];
    }
    
    return disposable;
}

在这个方法中创建一个RACCompoundDisposable *disposable对象,并return disposable.
同时创建了RACPassthroughSubscriber:同样遵循<RACSubscriber>协议, 作用是转发所有事件给另一个subscriber.

- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
    NSCParameterAssert(subscriber != nil);

    self = [super init];

    _innerSubscriber = subscriber;
    _signal = signal;
    _disposable = disposable;

    [self.innerSubscriber didSubscribeWithDisposable:self.disposable];
    return self;
}

初始化RACPassthroughSubscriber对象过程中,保存了-[RACDynamicSignal subscribe:]传来的参数subscriber 作为 _innerSubscriber

接着往下看 RACScheduler.subscriptionScheduler

+ (RACScheduler *)subscriptionScheduler {
    static dispatch_once_t onceToken;
    static RACScheduler *subscriptionScheduler;
    dispatch_once(&onceToken, ^{
        subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
    });

    return subscriptionScheduler;
}

RACScheduler:使用这个类用来在指定的时间和线程(队列)中执行工作.
RACScheduler.subscriptionScheduler返回的是一个单例.在满足下面两个条件时才使用这个方法:

  • subscription发生时,存在有效的+currentScheduler
  • 回调block需要立刻被执行

接着RACScheduler会调用schedule:方法

//RACSubscriptionScheduler
 - (instancetype)init {
        self = [super initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.subscriptionScheduler"];
    
        _backgroundScheduler = [RACScheduler scheduler];
    
        return self;
    }   
 - (RACDisposable *)schedule:(void (^)(void))block {
        NSCParameterAssert(block != NULL);
    
        if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
    
        block();
        return nil;
    }  

//RACScheduler
+ (RACScheduler *)currentScheduler {
    RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
    if (scheduler != nil) return scheduler;
    if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

    return nil;
}
+ (RACScheduler *)mainThreadScheduler {
    static dispatch_once_t onceToken;
    static RACScheduler *mainThreadScheduler;
    dispatch_once(&onceToken, ^{
        mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
    });
    
    return mainThreadScheduler;
}

如果RACScheduler.currentScheduler != nil 直接调用block回调, 否则调用[self.backgroundScheduler schedule:block]方法.

所以schedule:(void (^)(void))block的入参block会被立刻执行.

RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];

这是在block中执行的操作,在这里第一个方法
createSignal: ^RACDisposable *(id<RACSubscriber> subscriber
传入的block被调用同时将RACPassthroughSubscriber *subscriber对象作为block的参数传递了出去.

[subscriber sendNext:@1];

这是在block中执行的操作, 实际上调用的

- (void)sendNext:(id)value {
    if (self.disposable.disposed) return;

    if (RACSIGNAL_NEXT_ENABLED()) {
        RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
    }

    [self.innerSubscriber sendNext:value];
}   

-[RACPassthroughSubscriber sendNext:],这个方法中有调用了[self.innerSubscriber sendNext:value]

- (void)sendNext:(id)value {
    @synchronized (self) {
        void (^nextBlock)(id) = [self.next copy];
        if (nextBlock == nil) return;

        nextBlock(value);
    }
}     

这是-[RACSubscriber sendNext:]方法的实现,在这里调用了
RACDisposable *disposable = [signal subscribeNext:^(id x){};
传入的block.同时这个block的调用时线程安全的

到这里一个完整的signal响应事件, 从订阅到响应算是完整了.

回过头来重新整理一下整个RACSignal的整个流程

  1. 创建RACSignal对象,在RACDynamicSignal中保存didSubscribe
  2. 订阅RACSignal,创建RACSubscriber 保存sendNext, sendError, sendComplete block
  3. 调用-[RACDynamicSignal subscribe:] 以第二步创建的RACSubscriber对象作为_innerSubscriber创建RACPassthroughSubscriber, 创建RACScheduler.subscriptionScheduler 在schedule:回调block中 执行在第一步中保存的didSubscribe block
  4. didSubscribe block的参数 subscriber 调用sendNext:等方法 执行第二步中保存的相应的block.

RACDisposable

在整个过程有很多RACDisposable存在,接下来我们来讨论一下RACDisposable这个类的作用.

RACDisposable有三个子类:

RACDisposable:

主要是用来阻断信号的作用.
主要方法有两个分别是:

- (BOOL)isDisposed {
    
    return _disposeBlock == NULL;
}

_disposeBlock是一个指针指向这个对象本身或者指向block, 最终这个指针指向哪个有初始化的方法决定

- (instancetype)init {
    self = [super init];

    _disposeBlock = (__bridge void *)self;
    OSMemoryBarrier();

    return self;
}

- (instancetype)initWithBlock:(void (^)(void))block {
    NSCParameterAssert(block != nil);

    self = [super init];

    _disposeBlock = (void *)CFBridgingRetain([block copy]); 
    OSMemoryBarrier();

    return self;
}

如果使用-init方法初始化指向对象本身, 如果使用-initWithBlock:方法初始化则指向block

第二个方法是

- (void)dispose {
    void (^disposeBlock)(void) = NULL;

    while (YES) {
        void *blockPtr = _disposeBlock;
        if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL, &_disposeBlock)) {
            if (blockPtr != (__bridge void *)self) {
                disposeBlock = CFBridgingRelease(blockPtr);
            }

            break;
        }
    }

    if (disposeBlock != nil) disposeBlock();
}

调用-dispose如果指针指向block则调用block并且是指针指向NULL,否则直接指向NULL.

工作流程:当调用 - dispose方法后_disposeBlock == NULL 在-isDisposed 返回结果为YES说明信号被阻断.

在上面[RACPassthroughSubscriber sendNext:]方法中首先会判断[self.disposealbe isDisposed]如果为YES则直接return,达到阻断信号的作用.

RACCompoundDisposable:

可以管理多个RACDisposable或他的子类,他内部持有一个数组_disposables存放了多个RACDisposable,调用-[RACCompoundDisposable dispose]是对_disposables中的每一个对象调用dispose方法.(dispose是RACDisposable的方法,所以他的子类都有这个方法,只是实现的方式不同)

在RACCompoundDisposable内部管理了两个数组

//一个简单的宏 
#define RACCompoundDisposableInlineCount 2 
//两个元素 的数组 优先使用
RACDisposable *_inlineDisposables[RACCompoundDisposableInlineCount]
//如果_disposables > _inlineDisposables 数量使用_disposables
CFMutableArrayRef _disposables;

- (void)dispose {
    #if RACCompoundDisposableInlineCount
    RACDisposable *inlineCopy[RACCompoundDisposableInlineCount];
    #endif

    CFArrayRef remainingDisposables = NULL;
    
    // 加锁
    pthread_mutex_lock(&_mutex);
    {   
        // 信号被阻断
        _disposed = YES;

        #if RACCompoundDisposableInlineCount
        for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
            inlineCopy[i] = _inlineDisposables[i];
            _inlineDisposables[i] = nil;
        }
        #endif

        remainingDisposables = _disposables;
        _disposables = NULL;
    }
    pthread_mutex_unlock(&_mutex);

    #if RACCompoundDisposableInlineCount
    // Dispose outside of the lock in case the compound disposable is used
    // recursively.
    for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
        [inlineCopy[i] dispose];
    }
    #endif

    if (remainingDisposables == NULL) return;

    CFIndex count = CFArrayGetCount(remainingDisposables);
    CFArrayApplyFunction(remainingDisposables, CFRangeMake(0, count), &disposeEach, NULL);
    CFRelease(remainingDisposables);
}

在这个方法里面依次对管理的数组中的disposeable对象执行-dispose方法, 同时修改_disposed = YES

- (BOOL)isDisposed {
    pthread_mutex_lock(&_mutex);
    BOOL disposed = _disposed;
    pthread_mutex_unlock(&_mutex);

    return disposed;
}

RACScopedDisposable

+ (instancetype)scopedDisposableWithDisposable:(RACDisposable *)disposable {
    return [self disposableWithBlock:^{
        [disposable dispose];
    }];
}

- (void)dealloc {
    [self dispose];
}

- (RACScopedDisposable *)asScopedDisposable {
    // totally already are
    return self;
}

在这个类中实现了这三个方法在 在-dealloc方法中调用他自己的-dispose方法,同时在初始化中执行block 执行初始化方法入参的[disposable dispose]方法

RACSerialDisposable

内部持有一个RACDisposable,同时可以和外部交换RACDisposable

- (RACDisposable *)swapInDisposable:(RACDisposable *)newDisposable {
    RACDisposable *existingDisposable;
    BOOL alreadyDisposed;

    pthread_mutex_lock(&_mutex);
    alreadyDisposed = _disposed;
    if (!alreadyDisposed) {
        existingDisposable = _disposable;
        _disposable = newDisposable;
    }
    pthread_mutex_unlock(&_mutex);

    if (alreadyDisposed) {
        [newDisposable dispose];
        return nil;
    }

    return existingDisposable;
}

上面是swap方法实现, 首先声明一个BOOL alreadyDisposed记录此对象是_disposed 交换 newDisposable 和 _disposable 如果alreadyDisposed == YES 执行[newDisposable dispose]; return nil; 否则 返回 原来的_disposable

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335

推荐阅读更多精彩内容