第一部分 RACSubject
之前有提到过RACSignal是不具备发送信号的能力的,但是RACSubject这个类就可以做到订阅/发送为一体。
先看段代码:
//1创建信号,
RACSubject * subject = [RACSubject subject];
//2订阅信号
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"%@",x);
}];
//3发送数据
[subject sendNext:@"发送数据"];
1.创建信号
RACSubject继承自RACSignal,进入源码内部看看
//RACSubject.m
+ (instancetype)subject {
return [[self alloc] init];
}
- (instancetype)init {
self = [super init];
if (self == nil) return nil;
_disposable = [RACCompoundDisposable compoundDisposable];
_subscribers = [[NSMutableArray alloc] initWithCapacity:1];
return self;
}
RACSubject有2个属性。一个RACCompoundDisposable类型,一个可变数组。
//
// This should only be used while synchronized on `self`.
@property (nonatomic, strong, readonly) NSMutableArray *subscribers;
// Contains all of the receiver's subscriptions to other signals.
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
从上面的注释可以看出一个是用来取消信号的,一个保存订阅者得。
2.订阅信号
//RACSignal.m
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
//进入subscriberWithNext本质就是创建一个RACSubscriber对象。RACSubscriber.m
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy]; //sendNext:的时候会用到。
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
//进入subscribe。RACSubject.m
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber]; //把创建的RACSubscriber对象保存到subscribers数组中
}
[disposable addDisposable:[RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}]];
return disposable;
}
通过上面的代码,可知订阅(subscribeNext)的本质就是,用一个block,生成一个RACSubscriber对象,对象有属性指向这个block,并把RACSubscriber对象放到signal中(RACSubscriber中有个可变数组),等有信号的时候通过这个subscriber调用block即可。也就是subscribeNext:后面的block执行。一句话,订阅就是把后面的block放到信号里,有信号的时候就执行。
3.发送数据
RACSubject.m
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value]; //调用RACSubscriber存在next属性中的block。
}];
}
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
遍历_ subscribers数组,发送数据。最后调用RACSubscriber的sendNext:方法,其只是调用了存于next属性中的block。
小结
1、创建的subject的是内部会创建一个数组_subscribers用来保存所有的订阅者
2、订阅信息的时候会创建订阅者,并且保存到数组中
3、遍历subject中_subscribers中的订阅者,依次发送信息
对比RACSignal,他们之间不同的地方是:他可以被订阅多次,并且只能是先订阅后发布。
//----多次订阅----//
//创建信号,
RACSubject * subject = [RACSubject subject];
//发送数据
[subject sendNext:@"想发送数据看看能不能收到?"];
//订阅信号
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"1-%@",x);
}];
//再订阅一次
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"2-%@",x);
}];
//发送数据
[subject sendNext:@"发送数据,这个数据能收到!"];
打印的数据如下
1-发送数据,这个数据能收到!
2-发送数据,这个数据能收到!
这就验证了,需要先订阅,然后发送的数据才能收到,并且可以多次订阅。
如果我想先发送再订阅,并且也要能收到怎么处理呢?这就需要RACReplaySubject啦!
第二部分 RACReplaySubject
RACReplaySubject,他继承RACSubject,他的目的就是为例解决上面必须先订阅后发送的问题。
RACReplaySubject * replaySubject = [[RACReplaySubject alloc] init];
[replaySubject sendNext: @"试试这个能不能收到?"];
[replaySubject subscribeNext:^(id _Nullable x) {
NSLog(@"%@",x);
}];
先发送信号,然后再订阅,依然可以打印出结果。
看看源码这个是怎么实现的。
1. 创建信号
//生成RACReplaySubject对象。
- (instancetype)init {
return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
2.发送信号
- (void)sendNext:(id)value {
@synchronized (self) {
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}
可以看到代码中在发送之前做了一件事情,把要发送的数据保存到数组中,然后调用父类的发送方法,发送完了看发送成功了没,成功了就删除数据,避免一个数据多次发送。
3.订阅信号
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
小结
1、创建的时候会在父类的基础之上多做一步,创建一个数组用来保存发送的数据
2、发送数据,但是此时发送会失败啊,为什么?因为没有人订阅啊,我发给谁啊。
3、订阅信号,先遍历一次保存数据的数组,如果有就执行2 。