一、栅栏函数
CPU
的乱序执行能力让我们对多线程的安全保障的努力变得异常困难。因此要保证线程安全,阻止CPU
换序是必需的。遗憾的是,现在并不存在可移植的阻止换序的方法。通常情况下是调用CPU
提供的一条指令,这条指令常常被称为barrier
。一条barrier
指令会阻止CPU
将该指令之前的指令交换到barrier
之后,反之亦然。换句话说,barrier
指令的作用类似于一个拦水坝,阻止换序穿透这个大坝。
栅栏函数最直接的作用:控制任务执行顺序,导致同步效果。
有两个函数:
-
dispatch_barrier_async
:前面的任务执行完毕才会执行barrier
中的逻辑,以及barrier
后加入队列的任务。 -
dispatch_barrier_sync
:作用相同,但是会堵塞线程,影响后面的任务执行 。
⚠️:栅栏函数只能控制同一队列并发,相当于针对队列而言。
1.1 应用
1.1.1 dispatch_barrier_async 与 dispatch_barrier_sync 效果
有如下案例:
- (void)test {
dispatch_queue_t concurrentQueue = dispatch_queue_create("HotpotCat", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(concurrentQueue, ^{
sleep(3);
NSLog(@"1");
});
dispatch_async(concurrentQueue, ^{
NSLog(@"2");
});
dispatch_barrier_async(concurrentQueue, ^{
NSLog(@"3:%@",[NSThread currentThread]);
});
dispatch_async(concurrentQueue, ^{
NSLog(@"4");
});
NSLog(@"5");
}
分析:barrier
阻塞的是自己以及concurrentQueue
队列中在它后面加入的任务。由于这里使用的是异步函数所以任务1
,2
,5
顺序不定,3
在4
之前。
输出:
GCDDemo[49708:5622304] 5
GCDDemo[49708:5622437] 2
GCDDemo[49708:5622434] 1
GCDDemo[49708:5622434] 3:<NSThread: 0x600003439040>{number = 6, name = (null)}
GCDDemo[49708:5622434] 4
如果将dispatch_barrier_async
改为dispatch_barrier_sync
同步函数,则任务5
会被阻塞。1
、2
(顺序不定)在3
之前执行,4
和5
(顺序不定)在之后。
1.1.2 栅栏函数存在的问题
1.1.2.1 栅栏函数与全局队列
将concurrentQueue
改为全局队列:
dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
dispatch_async(concurrentQueue, ^{
NSLog(@"1");
});
dispatch_async(concurrentQueue, ^{
NSLog(@"2");
});
dispatch_barrier_async(concurrentQueue, ^{
NSLog(@"3:%@",[NSThread currentThread]);
});
dispatch_async(concurrentQueue, ^{
NSLog(@"4");
});
NSLog(@"5");
输出:
GCDDemo[49872:5632760] 5
GCDDemo[49872:5632979] 1
GCDDemo[49872:5633673] 2
GCDDemo[49872:5633675] 4
GCDDemo[49872:5633674] 3:<NSThread: 0x600001160240>{number = 10, name = (null)}
这个时候栅栏函数无论同步还是异步都无效了(有可能系统调度刚好符合预期)。
这也就意味着全局并发队列不允许使用栅栏函数,一定是自定义队列才能使用。
1.1.2.1 栅栏函数与不同队列
将任务2
和4
放入另外一个队列:
dispatch_queue_t concurrentQueue = dispatch_queue_create("Hotpot", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t concurrentQueue2 = dispatch_queue_create("Cat", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(concurrentQueue, ^{
sleep(3);
NSLog(@"1");
});
dispatch_async(concurrentQueue2, ^{
NSLog(@"2");
});
dispatch_barrier_async(concurrentQueue, ^{
NSLog(@"3:%@",[NSThread currentThread]);
});
dispatch_async(concurrentQueue2, ^{
NSLog(@"4");
});
NSLog(@"5");
输出:
GCDDemo[49981:5639766] 5
GCDDemo[49981:5640003] 2
GCDDemo[49981:5639998] 4
GCDDemo[49981:5639997] 1
GCDDemo[49981:5639998] 3:<NSThread: 0x600003761500>{number = 5, name = (null)}
这个时候concurrentQueue2
中的任务先执行了,它并不受栅栏函数的影响。那么说明 栅栏函数只对同一个队列中的任务起作用。
1.1.3 栅栏函数作为锁使用
有如下代码:
NSMutableArray *array = [NSMutableArray array];
dispatch_queue_t concurrentQueue = dispatch_queue_create("Hotpot", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i < 1000; i++) {
dispatch_async(concurrentQueue, ^{
[array addObject:@(i)];
});
}
- 多个线程同时操作
array
。 - 在
addObject
的时候有可能存在同一时间对同一块内存空间写入数据。
比如写第3
个数据的时候,当前数组中数据是(1、2)
这个时候有2
个线程同时写入数据就存在了(
1、2、3)和
(1、2、4)`这个时候数据就发生了混乱造成了错误。
在运行的时候由于线程不安全(可变数组线程不安全),发生了写入错误直接报错:
将数组添加元素的操作放入dispatch_barrier_async
中:
NSMutableArray *array = [NSMutableArray array];
dispatch_queue_t concurrentQueue = dispatch_queue_create("Hotpot", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i < 1000; i++) {
dispatch_async(concurrentQueue, ^{
dispatch_barrier_async(concurrentQueue , ^{
[array addObject:@(i)];
});
});
}
这样就没问题了,加入栅栏函数写入数据的时候相当于加了锁。
1.2 原理分析
根据1.1
中的案例有3
个问题:
- 1.为什么栅栏函数能起作用?
- 2.为什么全局队列无效?
- 3.为什么任务必须在同一队列才有效?
1.2.1 dispatch_barrier_sync
dispatch_barrier_sync
源码如下:
void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
直接调用_dispatch_barrier_sync_f
:
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
仍然是对_dispatch_barrier_sync_f_inline
的调用:
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
//死锁走这里的逻辑,同步栅栏函数也走这里
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
栅栏函数这个时候走的也是_dispatch_sync_f_slow
逻辑:
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
......
_dispatch_trace_item_push(top_dq, &dsc);
//死锁报错
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
断点调试走的是_dispatch_sync_complete_recurse
:
static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
uintptr_t dc_flags)
{
bool barrier = (dc_flags & DC_FLAG_BARRIER);
do {
if (dq == stop_dq) return;
if (barrier) {
//唤醒执行
//_dispatch_lane_wakeup
dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
} else {
//已经执行完成没有栅栏函数
_dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
}
dq = dq->do_targetq;
barrier = (dq->dq_width == 1);
} while (unlikely(dq->do_targetq));
}
- 这里进行了递归调用,循环条件是
dq->do_targetq
也就是 仅对当前队列有效。 - 唤醒执行栅栏前任务执行
_dispatch_lane_wakeup
逻辑。 - 当栅栏前的任务执行完毕走
_dispatch_lane_non_barrier_complete
逻辑。这也就是为什么栅栏起作用的原因。
dx_wakeup
在全局队列是_dispatch_root_queue_wakeup
,在自定义并行队列是_dispatch_lane_wakeup
。
1.2.1.1 _dispatch_lane_wakeup
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
//barrier完成了就走这里的逻辑,barrier之前的任务执行完毕。
return _dispatch_lane_barrier_complete(dqu, qos, flags);
}
if (_dispatch_queue_class_probe(dqu)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
//走这里
return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
- 在栅栏函数执行完毕后才走
_dispatch_lane_barrier_complete
与_dispatch_lane_non_barrier_complete
中的逻辑就汇合了。 - 没有执行完毕的时候执行
_dispatch_queue_wakeup
。
_dispatch_queue_wakeup
源码如下:
void
_dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
......
if (likely((old_state ^ new_state) & enqueue)) {
......
//_dispatch_queue_push_queue 断点断不住,走这里。
return _dispatch_queue_push_queue(tq, dq, new_state);
}
......
}
最终走的是_dispatch_queue_push_queue
逻辑:
static inline void
_dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_class_t dq,
uint64_t dq_state)
{
#if DISPATCH_USE_KEVENT_WORKLOOP
if (likely(_dq_state_is_base_wlh(dq_state))) {
_dispatch_trace_runtime_event(worker_request, dq._dq, 1);
return _dispatch_event_loop_poke((dispatch_wlh_t)dq._dq, dq_state,
DISPATCH_EVENT_LOOP_CONSUME_2);
}
#endif // DISPATCH_USE_KEVENT_WORKLOOP
_dispatch_trace_item_push(tq, dq);
//_dispatch_lane_concurrent_push
return dx_push(tq, dq, _dq_state_max_qos(dq_state));
}
内部是对_dispatch_lane_concurrent_push
的调用:
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
这里直接调用_dispatch_lane_push
:
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
......
if (flags) {
//栅栏函数走这里。
//#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
//dx_wakeup 对应 dq_wakeup 自定义全局队列对应 _dispatch_lane_wakeup,全局队列对应 _dispatch_root_queue_wakeup
return dx_wakeup(dq, qos, flags);
}
}
又调用回了_dispatch_lane_wakeup
,相当于一直扫描。
1.2.1.2 _dispatch_lane_non_barrier_complete
static void
_dispatch_lane_non_barrier_complete(dispatch_lane_t dq,
dispatch_wakeup_flags_t flags)
{
......
_dispatch_lane_non_barrier_complete_finish(dq, flags, old_state, new_state);
}
其中是对_dispatch_lane_non_barrier_complete_finish
的调用。
DISPATCH_ALWAYS_INLINE
static void
_dispatch_lane_non_barrier_complete_finish(dispatch_lane_t dq,
dispatch_wakeup_flags_t flags, uint64_t old_state, uint64_t new_state)
{
if (_dq_state_received_override(old_state)) {
// Ensure that the root queue sees that this thread was overridden.
_dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
}
if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
if (_dq_state_is_dirty(old_state)) {
//走_dispatch_lane_barrier_complete逻辑
return _dispatch_lane_barrier_complete(dq, 0, flags);
}
if ((old_state ^ new_state) & DISPATCH_QUEUE_ENQUEUED) {
if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
}
dispatch_assert(!_dq_state_is_base_wlh(new_state));
_dispatch_trace_item_push(dq->do_targetq, dq);
return dx_push(dq->do_targetq, dq, _dq_state_max_qos(new_state));
}
if (flags & DISPATCH_WAKEUP_CONSUME_2) {
_dispatch_release_2_tailcall(dq);
}
}
走的是_dispatch_lane_barrier_complete
逻辑:
DISPATCH_NOINLINE
static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
dispatch_lane_t dq = dqu._dl;
if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
//串行队列
if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
if (_dispatch_object_is_waiter(dc)) {
//栅栏中的任务逻辑
return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
}
} else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
return _dispatch_lane_drain_non_barriers(dq, dc, flags);
}
if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
//执行栅栏后续的代码
return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
-
_dispatch_lane_drain_barrier_waiter
执行栅栏函数中的任务。 -
_dispatch_lane_class_barrier_complete
执行栅栏函数后续的代码。
调用_dispatch_lane_drain_barrier_waiter
执行栅栏函数中的任务:
static void
_dispatch_lane_drain_barrier_waiter(dispatch_lane_t dq,
struct dispatch_object_s *dc, dispatch_wakeup_flags_t flags,
uint64_t enqueued_bits)
{
......
return _dispatch_barrier_waiter_redirect_or_wake(dq, dc, flags,
old_state, new_state);
}
直接调用_dispatch_barrier_waiter_redirect_or_wake
:
static void
_dispatch_barrier_waiter_redirect_or_wake(dispatch_queue_class_t dqu,
dispatch_object_t dc, dispatch_wakeup_flags_t flags,
uint64_t old_state, uint64_t new_state)
{
......
return _dispatch_waiter_wake(dsc, wlh, old_state, new_state);
}
调用_dispatch_waiter_wake
:
static void
_dispatch_waiter_wake(dispatch_sync_context_t dsc, dispatch_wlh_t wlh,
uint64_t old_state, uint64_t new_state)
{
dispatch_wlh_t waiter_wlh = dsc->dc_data;
if ((_dq_state_is_base_wlh(old_state) && !dsc->dsc_from_async) ||
_dq_state_is_base_wlh(new_state) ||
waiter_wlh != DISPATCH_WLH_ANON) {
_dispatch_event_loop_wake_owner(dsc, wlh, old_state, new_state);
}
if (unlikely(waiter_wlh == DISPATCH_WLH_ANON)) {
//走这里
_dispatch_waiter_wake_wlh_anon(dsc);
}
}
调用_dispatch_waiter_wake_wlh_anon
:
static void
_dispatch_waiter_wake_wlh_anon(dispatch_sync_context_t dsc)
{
if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
_dispatch_wqthread_override_start(dsc->dsc_waiter,
dsc->dsc_override_qos);
}
//执行
_dispatch_thread_event_signal(&dsc->dsc_event);
}
其中是对线程发送信号。
对于_dispatch_root_queue_wakeup
而言:
void
_dispatch_root_queue_wakeup(dispatch_queue_global_t dq,
DISPATCH_UNUSED dispatch_qos_t qos, dispatch_wakeup_flags_t flags)
{
if (!(flags & DISPATCH_WAKEUP_BLOCK_WAIT)) {
DISPATCH_INTERNAL_CRASH(dq->dq_priority,
"Don't try to wake up or override a root queue");
}
if (flags & DISPATCH_WAKEUP_CONSUME_2) {
return _dispatch_release_2_tailcall(dq);
}
}
内部没有对barrier
的处理,所以全局队列栅栏函数无效。
因为全局队列不仅有你的任务,还有其它系统的任务。如果加barrier
不仅影响你自己的任务还会影响系统的任务。对于全局队列而言栅栏函数就是个普通的异步函数。
整个流程如下:
1.2.2 dispatch_barrier_async
dispatch_barrier_async
源码如下:
void
dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc_flags);
}
调用的是_dispatch_continuation_async
:
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
调用了dx_push
,对应的自定义队列是_dispatch_lane_concurrent_push
。全局队列是_dispatch_root_queue_push
。
_dispatch_lane_concurrent_push
:
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
断点跟踪走的是_dispatch_lane_push
:
DISPATCH_NOINLINE
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
dispatch_wakeup_flags_t flags = 0;
struct dispatch_object_s *prev;
if (unlikely(_dispatch_object_is_waiter(dou))) {
return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
}
dispatch_assert(!_dispatch_object_is_global(dq));
qos = _dispatch_queue_push_qos(dq, qos);
prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
if (unlikely(os_mpsc_push_was_empty(prev))) {
_dispatch_retain_2_unsafe(dq);
flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
} else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
_dispatch_retain_2_unsafe(dq);
flags = DISPATCH_WAKEUP_CONSUME_2;
}
os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
if (flags) {
//栅栏函数走这里。
//#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
//dx_wakeup 对应 dq_wakeup 自定义全局队列对应 _dispatch_lane_wakeup,全局队列对应 _dispatch_root_queue_wakeup
return dx_wakeup(dq, qos, flags);
}
}
栅栏函数走_dispatch_lane_wakeup
逻辑:
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
return _dispatch_lane_barrier_complete(dqu, qos, flags);
}
if (_dispatch_queue_class_probe(dqu)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
//走这里
return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
继续断点走_dispatch_queue_wakeup
逻辑:
void
_dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
......
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
......
//loop _dispatch_lane_wakeup //_dq_state_merge_qos
return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos,
flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
}
if (target) {
......
#if HAVE_PTHREAD_WORKQUEUE_QOS
} else if (qos) {
......
if (likely((old_state ^ new_state) & enqueue)) {
...... //_dispatch_queue_push_queue断点断不住,断它内部断点
return _dispatch_queue_push_queue(tq, dq, new_state);
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
if (_dq_state_should_override(new_state)) {
return _dispatch_queue_wakeup_with_override(dq, new_state,
flags);
}
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
done:
if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
return _dispatch_release_2_tailcall(dq);
}
}
这里断点走了_dispatch_queue_push_queue
逻辑(_dispatch_queue_push_queue
本身断不住,断它内部断点):
static inline void
_dispatch_queue_push_queue(dispatch_queue_t tq, dispatch_queue_class_t dq,
uint64_t dq_state)
{
#if DISPATCH_USE_KEVENT_WORKLOOP
if (likely(_dq_state_is_base_wlh(dq_state))) {
_dispatch_trace_runtime_event(worker_request, dq._dq, 1);
return _dispatch_event_loop_poke((dispatch_wlh_t)dq._dq, dq_state,
DISPATCH_EVENT_LOOP_CONSUME_2);
}
#endif // DISPATCH_USE_KEVENT_WORKLOOP
_dispatch_trace_item_push(tq, dq);
//_dispatch_lane_concurrent_push
return dx_push(tq, dq, _dq_state_max_qos(dq_state));
}
内部走的是_dispatch_lane_concurrent_push
逻辑,这里又继续走了_dispatch_lane_push
的逻辑了,在这里就造成了循环等待。当队列中任务执行完毕后_dispatch_lane_wakeup
中就走_dispatch_lane_barrier_complete
逻辑了。
可以通过
barrier
前面的任务加延迟去验证。直接断点_dispatch_lane_barrier_complete
,当前面的任务执行完毕后就进入_dispatch_lane_barrier_complete
断点了。
_dispatch_lane_barrier_complete
源码如下:
static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
......
return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
走了_dispatch_lane_class_barrier_complete
逻辑:
static void
_dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
uint64_t owned)
{
......
again:
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
......
} else if (unlikely(_dq_state_is_dirty(old_state))) {
......
flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
//自定义并行队列 _dispatch_lane_wakeup
return dx_wakeup(dq, qos, flags);
});
} else {
new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
}
});
......
}
调用走的是_dispatch_lane_wakeup
:
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
//barrier完成了就走这里的逻辑,barrier之前的任务执行完毕。
return _dispatch_lane_barrier_complete(dqu, qos, flags);
}
if (_dispatch_queue_class_probe(dqu)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
//走这里
return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
这个时候就又走到了_dispatch_lane_barrier_complete
。
DISPATCH_WAKEUP_BARRIER_COMPLETE
状态是在_dispatch_lane_resume
中进行变更的:
_dispatch_root_queue_push
内部并没有对barrier
的处理,与全局队列逻辑一致。所以barrier
函数传递全局队列无效。
整个过程如下:
二、信号量(dispatch_semaphore_t
)
相关函数:
-
dispatch_semaphore_create
:创建信号量 -
dispatch_semaphore_wait
:信号量等待 -
dispatch_semaphore_signal
:信号量释放
信号量有两个效果:同步作为锁 与 控制GCD
最大并发数。
二元信号量是最简单的一种锁,只有两种状态:占用与非占用。适合只能被唯一一个线程独占访问资源。当二元信号量处于非占用状态时,第一个试图获取该二元信号量的线程会获得该锁,并将二元信号置为占用状态,此后其他的所有视图获取该二元信号量的线程将会等待,直到该锁被释放。
对于允许多个线程并发访问的资源,多元信号量简称信号量,它是一个很好的选择。一个初始值为 N
的信号量允许 N
个线程并发访问。线程访问资源的时候首先获取信号量,进行如下操作:
- 将信号量的值减
1
。 - 如果信号量的值小于
0
,则进入等待状态,否则继续执行。
访问完资源之后,线程释放信号量,进行如下操作:
- 将信号量的值
+1
。 - 如果信号量的值
< 1
,唤醒一个等待中的线程。
2.1 应用
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
dispatch_queue_t queue1 = dispatch_queue_create("HotpotCat", NULL);
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"1 start");
NSLog(@"1 end");
dispatch_semaphore_signal(sem);
});
dispatch_async(queue1, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"2 start");
NSLog(@"2 end");
dispatch_semaphore_signal(sem);
});
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"3 start");
NSLog(@"3 end");
dispatch_semaphore_signal(sem);
});
dispatch_async(queue1, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"4 start");
NSLog(@"4 end");
dispatch_semaphore_signal(sem);
});
对于上面的例子输出:
1 start
1 end
2 start
2 end
3 start
3 end
4 start
4 end
这个时候信号量初始化的是1
,全局队列与自定义串行队列中的任务按顺序依次执行。
当将信号量改为2
后输出:
1 start
2 start
2 end
1 end
3 start
4 start
3 end
4 end
这个时候1、2
先执行无序,3、4
后执行无序。这样就控制了GCD
任务的最大并发数。
修改代码如下:
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(0);
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"1 start");
NSLog(@"1 end");
});
dispatch_async(queue, ^{
sleep(2);
NSLog(@"2 start");
NSLog(@"2 end");
dispatch_semaphore_signal(sem);
});
信号量初始值修改为0
,在任务1
中wait
,在任务2
中signal
,这个时候输出如下:
2 start
2 end
1 start
1 end
任务2
比任务1
先执行了。由于信号量初始化为0
,wait
函数后面任务就执行不了一直等待;等到signal
执行后发送信号wait
就可以执行了。这样就达到了控制流程。任务2
中的信号控制了任务1
的执行。
2.2 源码分析
2.2.1 dispatch_semaphore_create
/*
* @param dsema
* The semaphore. The result of passing NULL in this parameter is undefined.
*/
dispatch_semaphore_t
dispatch_semaphore_create(intptr_t value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) { //>=0 才有用,否则直接返回
return DISPATCH_BAD_INPUT;// 0
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
- 当
value < 0
的时候无效,只有>= 0
才有效,才会执行后续流程。
2.2.2 dispatch_semaphore_wait
intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
//--
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) { //>=0 返回
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
-
--
后value
大于等于0
直接返回0
。执行dispatch_semaphore_wait
后续的代码。 - 否则执行
_dispatch_semaphore_wait_slow
(相当于do-while
循环)。
_dispatch_semaphore_wait_slow
当信号量为0
的时候调用wait
后(< 0
)就走_dispatch_semaphore_wait_slow
逻辑了:
DISPATCH_NOINLINE
static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
//超时直接break
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
//NOW的情况下进行超时处理
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
//FOREVER则进入wait逻辑。
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
- 当值为
timeout
的时候直接break
。 - 当值为
DISPATCH_TIME_NOW
的时候循环调用_DSEMA4_TIMEOUT()
。
#define _DSEMA4_TIMEOUT() KERN_OPERATION_TIMED_OUT
- 当值为
DISPATCH_TIME_FOREVER
的时候调用_dispatch_sema4_wait
。
_dispatch_sema4_wait
// void
// _dispatch_sema4_wait(_dispatch_sema4_t *sema)
// {
// int ret = 0;
// do {
// ret = sem_wait(sema);
// } while (ret == -1 && errno == EINTR);
// DISPATCH_SEMAPHORE_VERIFY_RET(ret);
// }
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
kern_return_t kr;
do {
kr = semaphore_wait(*sema);
} while (kr == KERN_ABORTED);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}
-
semaphore_wait
并没有搜到实现,这是pthread
内核封装的实现。 -
_dispatch_sema4_wait
本质上是一个do-while
循环,相当于在这里直接卡住执行不到后面的逻辑了。相当于:
dispatch_async(queue, ^{
// dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
do {
//循环
} while (signal <= 0);
NSLog(@"1 start");
NSLog(@"1 end");
});
结论:当value >= 0
的时候执行后续的代码,否则do-while
循环卡住后续逻辑。
2.2.3 dispatch_semaphore_signal
/*!
* @function dispatch_semaphore_signal
*
* @abstract
* Signal (increment) a semaphore.
*
* @discussion
* Increment the counting semaphore. If the previous value was less than zero,
* this function wakes a waiting thread before returning.
*
* @param dsema The counting semaphore.
* The result of passing NULL in this parameter is undefined.
*
* @result
* This function returns non-zero if a thread is woken. Otherwise, zero is
* returned.
*/
intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
//++操作
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
//++ 后还 < 0,则表示做wait操作(--)过多。报错。
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
//发送信号量逻辑,恢复wait等待的操作。
return _dispatch_semaphore_signal_slow(dsema);
}
-
os_atomic_inc2o
执行++
后值大于0
直接返回能够执行。 - 只有
<= 0
的时候才执行后续流程,调用_dispatch_semaphore_signal_slow
进行异常处理。 - 注释说明了当值
< 0
的时候在return
之前唤醒一个等待线程。
_dispatch_semaphore_signal_slow
intptr_t
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dsema->dsema_sema, 1);
return 1;
}
直接调用_dispatch_sema4_signal
。
_dispatch_sema4_signal
#define DISPATCH_SEMAPHORE_VERIFY_KR(x) do { \
DISPATCH_VERIFY_MIG(x); \
if (unlikely((x) == KERN_INVALID_NAME)) { \
DISPATCH_CLIENT_CRASH((x), \
"Use-after-free of dispatch_semaphore_t or dispatch_group_t"); \
} else if (unlikely(x)) { \
DISPATCH_INTERNAL_CRASH((x), "mach semaphore API failure"); \
} \
} while (0)
//经过调试走的是这个逻辑
void
_dispatch_sema4_signal(_dispatch_sema4_t *sema, long count)
{
do {
kern_return_t kr = semaphore_signal(*sema);//+1
DISPATCH_SEMAPHORE_VERIFY_KR(kr);// == -1 报错
} while (--count);//do-while(0) 只执行一次
}
相当于内部做了+1
操作。这也是当信号量初始值为0
的时候dispatch_semaphore_signal
执行完毕后dispatch_semaphore_wait
能够执行的原因。
小结:
-
dispatch_semaphore_wait
进行--
操作,减完是负值进入do-while
循环,阻塞后续流程。 -
dispatch_semaphore_signal
进行++
操作,加完值不大于0
进入后续报错流程。 -
semaphore_signal
与semaphore_wait
才是信号量能控制最大并发数的根本原因,否则dispatch_semaphore_signal
与dispatch_semaphore_signal
都是判断后直接返回,相当于什么都没做。
三、调度组
最直接的作用: 控制任务执行顺序。
相关API
:
-
dispatch_group_create
创建组 -
dispatch_group_async
进组任务 (与dispatch_group_enter
和dispatch_group_leave
搭配使用效果相同)-
dispatch_group_enter
进组 -
dispatch_group_leave
出组
-
-
dispatch_group_notify
进组任务执行完毕通知 -
dispatch_group_wait
进组任务执行等待时间
3.1 应用
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t queue1 = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
dispatch_group_async(group, queue, ^{
sleep(3);
NSLog(@"1");
});
dispatch_group_async(group, queue1, ^{
sleep(2);
NSLog(@"2");
});
dispatch_group_async(group, queue1, ^{
sleep(1);
NSLog(@"3");
});
dispatch_group_async(group, queue, ^{
NSLog(@"4");
});
dispatch_group_notify(group, dispatch_get_global_queue(0, 0), ^{
NSLog(@"5");
});
有如上案例,任务5
永远在任务1、2、3、4
之后执行。
当然也可以使用enter
与leave
配合dispatch_async
使用:
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t queue1 = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
//先 enter 再 leave
dispatch_group_enter(group);
dispatch_async(queue, ^{
sleep(3);
NSLog(@"1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue1, ^{
sleep(2);
NSLog(@"2");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue1, ^{
sleep(1);
NSLog(@"3");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"4");
dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_global_queue(0, 0), ^{
NSLog(@"5");
});
效果相同,需要注意的是dispatch_group_enter
要比dispatch_group_leave
先调用,并且必须成对出现,否则会崩溃。当然两种形式也可以混着用。
3.2 源码分析
根据上面的分析有3
个问题:
- 1.
dispatch_group_enter
为什么要比dispatch_group_leave
先调用,否则崩溃? - 2.能够实现同步的原理是什么?
- 3.
dispatch_group_async
为什么等价于dispatch_group_enter + dispatch_group_leave
?
之前的版本调度组是封装了信号量,目前新版本的是调度组自己写了一套逻辑。
3.2.1 dispatch_group_create
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
//creat & enter 写在一起的写法,信号量标记位1
dispatch_group_t
_dispatch_group_create_and_enter(void)
{
return _dispatch_group_create_with_count(1);
}
是对_dispatch_group_create_with_count
的调用:
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
调用_dispatch_object_alloc
创建group
,与信号量写法相似
3.2.2 dispatch_group_enter
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
//0-- -> -1,与信号量不同的是没有wait
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
-
0--
变为-1
,与信号量不同的是没有wait
操作。
3.2.3 dispatch_group_leave
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
//-1++ -> 0
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
//#define DISPATCH_GROUP_VALUE_MASK 0x00000000fffffffcULL
// old_state & DISPATCH_GROUP_VALUE_MASK 是一个很大的值
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
//-1 & DISPATCH_GROUP_VALUE_MASK == DISPATCH_GROUP_VALUE_1,old_value = -1
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {//old_value == -1
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
//调用 _dispatch_group_wake,唤醒 dispatch_group_notify
return _dispatch_group_wake(dg, old_state, true);
}
//old_value 为0的情况下直接报错,也就是先leave的情况下直接报错
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
-
-1++
变为0
,当old_value == -1
的时候调用_dispatch_group_wake
唤醒dispatch_group_notify
。 - 既然
old_value == -1
的时候才唤醒,那么多次enter
只有最后一次leave
的时候才能唤醒。 - 当
old_value == 0
的时候直接报错,这也就是为什么先调用leave
直接发生了crash
。
3.2.4 dispatch_group_notify
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dsn = _dispatch_continuation_alloc();
_dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
_dispatch_group_notify(dg, dq, dsn);
}
调用_dispatch_group_notify
:
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->dc_data = dq;
_dispatch_retain(dq);
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
if ((uint32_t)old_state == 0) {//循环判断 old_state == 0 的时候 wake
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false);
});
}
});
}
}
- 当
old_state == 0
的时候调用_dispatch_group_wake
,也就是调用block
的callout
。与leave
调用了同一个方法。
为什么两个地方都调用了?
因为在leave
的时候dispatch_group_notify
可能已经执行过了,任务已经保存在了group
中,leave
的时候本身尝试调用一次。
当然leave
中也可能是一个延时任务,当调用leave
的时候notify
可能还没有执行,就导致notify
任务还不存在。所以需要在notify
中也调用。
_dispatch_group_wake
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken <rdar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
//异步回调,执行block callout
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
if (refs) _dispatch_release_n(dg, refs);
}
- 调用
_dispatch_continuation_async
相当于调用的是dispatch_async
执行notify
的任务。 - 任务先保存在在
group
中,唤醒notify
的时候才将任务加入队列。
3.2.5 dispatch_group_async
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
//标记 DC_FLAG_GROUP_ASYNC
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
调用_dispatch_continuation_group_async
:
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
//调用enter
dispatch_group_enter(dg);
dc->dc_data = dg;
//dispatch_async
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
- 内部先调用
dispatch_group_enter
,在这里就等待wakeup
的调用了 - 再调用
_dispatch_continuation_async
,相当于dispatch_async
。
那么leave
在什么时候调用呢?
肯定要在callout
执行完毕后调用。_dispatch_continuation_async
的调用以全局队列为例调用_dispatch_root_queue_push
,最终会调用到_dispatch_continuation_invoke_inline
:
在这里就进行了逻辑区分,有
group
的情况下(dispatch_group_async
的时候dc_flags
进行了标记)调用的是_dispatch_continuation_with_group_invoke
:
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) {
//callout
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
//leave
dispatch_group_leave((dispatch_group_t)dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}
- 在
callout
后调用了dispatch_group_leave
。
dispatch_group_async 底层是对 dispatch_group_enter + dispatch_group_leave 的封装
-
在
dispatch_group_async
中先进行dispatch_group_enter
,然后执行dispatch_async
。 -
在回调中先
_dispatch_client_callout
然后dispatch_group_leave
。
四、Dispatch Source
在任一线程上调用它的的一个函数 dispatch_source_merge_data
后,会执行 dispatch source
事先定义好的句柄(可以把句柄简单理解为一个 block
) 这个过程叫 用户事件(Custom event
)。是 dispatch source
支持处理的一种事件。
句柄是一种指向指针的指针,它指向的就是一个类或者结构,它和系统有很密切的关系。比如:实例句柄(HINSTANCE
),位图句柄(HBITMAP
),设备表述句柄(HDC
),图标句柄(HICON
)等。这当中还有一个通用的句柄,就是HANDLE
。
Dispatch Source
有两点:
-
CPU
负荷非常小,尽量不占用资源 。 - 联结的优势。
-
dispatch source
不受runloop
的影响,底层封装的是pthread
。
相关API
:
-
dispatch_source_create
创建源 -
dispatch_source_set_event_handler
设置源事件回调 -
dispatch_source_merge_data
源事件设置数据 -
dispatch_source_get_data
获取源事件数据 -
dispatch_resume
继续 -
dispatch_suspend
挂起
4.1 应用
dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
uintptr_t mask,
dispatch_queue_t _Nullable queue);
-
type
:dispatch
源可处理的事件。比如:DISPATCH_SOURCE_TYPE_TIMER
、DISPATCH_SOURCE_TYPE_DATA_ADD
。-
DISPATCH_SOURCE_TYPE_DATA_ADD
: 将所有触发结果相加,最后统一执行响应。间隔的时间越长,则每次触发都会响应;如果间隔的时间很短,则会将触发后的结果相加后统一触发。也就是利用CPU
空闲时间进行回调。
-
-
handle
:可以理解为句柄、索引或id
,如果要监听进程,需要传入进程的ID
。 -
mask
:可以理解为描述,具体要监听什么。 -
queue
:处理handle
的队列。
有如下一个进度条的案例:
self.completed = 0;
self.queue = dispatch_queue_create("HotpotCat", NULL);
self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
//设置句柄
dispatch_source_set_event_handler(self.source, ^{
NSLog(@"%@",[NSThread currentThread]);
NSUInteger value = dispatch_source_get_data(self.source);
self.completed += value;
double progress = self.completed / 100.0;
NSLog(@"progress: %.2f",progress);
self.progressView.progress = progress;
});
self.isRunning = YES;
//创建后默认是挂起状态
dispatch_resume(self.source);
创建了一个ADD
类型的source
,在handle
获取进度增量并更新进度条。由于创建后source
处于挂起状态,需要先恢复。
可以在按钮的点击事件中进行任务的挂起和恢复:
if (self.isRunning) {
dispatch_suspend(self.source);
dispatch_suspend(self.queue);
NSLog(@"pause");
self.isRunning = NO;
[sender setTitle:@"pause" forState:UIControlStateNormal];
} else {
dispatch_resume(self.source);
dispatch_resume(self.queue);
NSLog(@"running");
self.isRunning = YES;
[sender setTitle:@"running" forState:UIControlStateNormal];
}
任务的执行是一个简单的循环:
for (NSInteger i = 0; i < 100; i++) {
dispatch_async(self.queue, ^{
NSLog(@"merge");
//加不加 sleep 影响 handler 的执行次数。
sleep(1);
dispatch_source_merge_data(self.source, 1);//+1
});
}
- 在循环中调用
dispatch_source_merge_data
触发回调。当queue
挂起后后续任务就不再执行了。 - 在不加
sleep
的情况下handler
的回调是小于100
次的,任务会被合并。
4.2 源码解析
4.2.1 dispatch_source_create
dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
uintptr_t mask, dispatch_queue_t dq)
{
dispatch_source_refs_t dr;
dispatch_source_t ds;
//add对应 _dispatch_source_data_create timer对应 _dispatch_source_timer_create
dr = dux_create(dst, handle, mask)._dr;
if (unlikely(!dr)) {
return DISPATCH_BAD_INPUT;
}
//创建队列
ds = _dispatch_queue_alloc(source,
dux_type(dr)->dst_strict ? DSF_STRICT : DQF_MUTABLE, 1,
DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER)._ds;
ds->dq_label = "source";
ds->ds_refs = dr;
dr->du_owner_wref = _dispatch_ptr2wref(ds);
//没有传队列,获取root_queues
if (unlikely(!dq)) {
dq = _dispatch_get_default_queue(true);
} else {
_dispatch_retain((dispatch_queue_t _Nonnull)dq);
}
//目标队列为传进来的dq
ds->do_targetq = dq;
//是timer 并且设置了interval则调用dispatch_source_set_timer进行设置
//也就是说type为timer的时候即使不设置timer也会默认设置。这里时间间隔设置为了handle
if (dr->du_is_timer && (dr->du_timer_flags & DISPATCH_TIMER_INTERVAL)) {
dispatch_source_set_timer(ds, DISPATCH_TIME_NOW, handle, UINT64_MAX);
}
_dispatch_object_debug(ds, "%s", __func__);
//返回自己创建的source,source本身也是队列。
return ds;
}
- 根据
type
创建对应的队列。add
对应_dispatch_source_data_create
,timer
对应_dispatch_source_timer_create
。 - 如果创建的时候没有传处理
handle
的队列,会默认获取root_queues
中的队列。 - 设置目标队列为传进来的队列。
- 如果
type
为DISPATCH_SOURCE_TYPE_INTERVAL
(应该是私有的)则主动调用一次dispatch_source_set_timer
。 - 返回自己创建的
source
,source
本身也是队列。
_dispatch_source_data_create
static dispatch_unote_t
_dispatch_source_data_create(dispatch_source_type_t dst, uintptr_t handle,
uintptr_t mask)
{
if (handle || mask) {
return DISPATCH_UNOTE_NULL;
}
// bypass _dispatch_unote_create() because this is always "direct"
// even when EV_UDATA_SPECIFIC is 0
dispatch_unote_class_t du = _dispatch_calloc(1u, dst->dst_size);
du->du_type = dst;
du->du_filter = dst->dst_filter;
du->du_is_direct = true;
return (dispatch_unote_t){ ._du = du };
}
直接调用_dispatch_calloc
创建返回。
_dispatch_source_timer_create
static dispatch_unote_t
_dispatch_source_timer_create(dispatch_source_type_t dst,
uintptr_t handle, uintptr_t mask)
{
dispatch_timer_source_refs_t dt;
......
//创建
dt = _dispatch_calloc(1u, dst->dst_size);
dt->du_type = dst;
dt->du_filter = dst->dst_filter;
dt->du_is_timer = true;
dt->du_timer_flags |= (uint8_t)(mask | dst->dst_timer_flags);
dt->du_ident = _dispatch_timer_unote_idx(dt);
dt->dt_timer.target = UINT64_MAX;
dt->dt_timer.deadline = UINT64_MAX;
dt->dt_timer.interval = UINT64_MAX;
dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
return (dispatch_unote_t){ ._dt = dt };
}
内部时间给的默认值是最大值。
4.2.2 dispatch_source_set_event_handler
void
dispatch_source_set_event_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_EVENT_HANDLER, true);
}
调用_dispatch_source_set_handler
传递的类型为DS_EVENT_HANDLER
。
DISPATCH_NOINLINE
static void
_dispatch_source_set_handler(dispatch_source_t ds, void *func,
uintptr_t kind, bool is_block)
{
dispatch_continuation_t dc;
//创建dc存储handler
dc = _dispatch_source_handler_alloc(ds, func, kind, is_block);
//挂起
if (_dispatch_lane_try_inactive_suspend(ds)) {
//替换
_dispatch_source_handler_replace(ds, kind, dc);
//恢复
return _dispatch_lane_resume(ds, DISPATCH_RESUME);
}
......
}
- 创建
_dispatch_source_handler_alloc
存储handler
,内部会进行标记非DS_EVENT_HANDLER
会标记为DC_FLAG_CONSUME
。 -
_dispatch_lane_try_inactive_suspend
挂起队列。 -
_dispatch_source_handler_replace
替换handler
。
static inline void
_dispatch_source_handler_replace(dispatch_source_t ds, uintptr_t kind,
dispatch_continuation_t dc)
{
//handler目标回调为空释放handler
if (!dc->dc_func) {
_dispatch_continuation_free(dc);
dc = NULL;
} else if (dc->dc_flags & DC_FLAG_FETCH_CONTEXT) {
dc->dc_ctxt = ds->do_ctxt;
}
//保存
dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
if (dc) _dispatch_source_handler_dispose(dc);
}
-
_dispatch_lane_resume
恢复队列,调用队列对应的awake
。
先调用_dispatch_lane_resume_activate
(这也就是set
后立马调用的原因):
static void
_dispatch_lane_resume_activate(dispatch_lane_t dq)
{
if (dx_vtable(dq)->dq_activate) {
dx_vtable(dq)->dq_activate(dq);
}
_dispatch_lane_resume(dq, DISPATCH_ACTIVATION_DONE);
}
再调用_dispatch_lane_resume
。
4.2.3 dispatch_source_merge_data
void
dispatch_source_merge_data(dispatch_source_t ds, uintptr_t val)
{
dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds);
dispatch_source_refs_t dr = ds->ds_refs;
if (unlikely(dqf & (DSF_CANCELED | DQF_RELEASED))) {
return;
}
//根据类型存值
switch (dr->du_filter) {
case DISPATCH_EVFILT_CUSTOM_ADD:
//有累加
os_atomic_add2o(dr, ds_pending_data, val, relaxed);
break;
case DISPATCH_EVFILT_CUSTOM_OR:
os_atomic_or2o(dr, ds_pending_data, val, relaxed);
break;
case DISPATCH_EVFILT_CUSTOM_REPLACE:
os_atomic_store2o(dr, ds_pending_data, val, relaxed);
break;
default:
DISPATCH_CLIENT_CRASH(dr->du_filter, "Invalid source type");
}
//唤醒执行回调
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
- 根据类型对值进行处理,处理完之后唤醒队列执行。
对于主线程会执行_dispatch_main_queue_wakeup
,其中会取到dispatch_queue
获取到dc
,最后进行handler
的调用。
4.2.4 dispatch_source_get_data
uintptr_t
dispatch_source_get_data(dispatch_source_t ds)
{
dispatch_source_refs_t dr = ds->ds_refs;
#if DISPATCH_USE_MEMORYSTATUS
if (dr->du_vmpressure_override) {
return NOTE_VM_PRESSURE;
}
#if TARGET_OS_SIMULATOR
if (dr->du_memorypressure_override) {
return NOTE_MEMORYSTATUS_PRESSURE_WARN;
}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
//获取数据
uint64_t value = os_atomic_load2o(dr, ds_data, relaxed);
return (unsigned long)(dr->du_has_extended_status ?
DISPATCH_SOURCE_GET_DATA(value) : value);
}
与merge_data
相反,一个存一个取。
4.2.5 dispatch_resume
void
dispatch_resume(dispatch_object_t dou)
{
DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
if (unlikely(_dispatch_object_is_global(dou) ||
_dispatch_object_is_root_or_base_queue(dou))) {
return;
}
if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
_dispatch_lane_resume(dou._dl, DISPATCH_RESUME);
}
}
经过调试走的是_dispatch_lane_resume
逻辑,与_dispatch_source_set_handler
中调用的一致。awake
队列。
4.2.6 dispatch_suspend
void
dispatch_suspend(dispatch_object_t dou)
{
DISPATCH_OBJECT_TFB(_dispatch_objc_suspend, dou);
if (unlikely(_dispatch_object_is_global(dou) ||
_dispatch_object_is_root_or_base_queue(dou))) {
return;
}
if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
return _dispatch_lane_suspend(dou._dl);
}
}
调用_dispatch_lane_suspend
挂起队列。
4.2.7 dispatch_source_cancel
dispatch_source_cancel(dispatch_source_t ds)
{
_dispatch_object_debug(ds, "%s", __func__);
_dispatch_retain_2(ds);
if (_dispatch_queue_atomic_flags_set_orig(ds, DSF_CANCELED) & DSF_CANCELED){
_dispatch_release_2_tailcall(ds);
} else {
//_dispatch_workloop_wakeup
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
}
}
- 调用
_dispatch_workloop_wakeup
:
-
cancel
内部会对状态进行判断,如果是挂起状态会报错。所以需要在运行状态下取消。 - 调用
_dispatch_release_2_tailcall
进行释放操作。
4.2.8 dispatch_source_set_timer
void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
uint64_t interval, uint64_t leeway)
{
dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
dispatch_timer_config_t dtc;
if (unlikely(!dt->du_is_timer)) {
DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
}
//根据type配置timer和interval
if (dt->du_timer_flags & DISPATCH_TIMER_INTERVAL) {
dtc = _dispatch_interval_config_create(start, interval, leeway, dt);
} else {
dtc = _dispatch_timer_config_create(start, interval, leeway, dt);
}
if (_dispatch_timer_flags_to_clock(dt->du_timer_flags) != dtc->dtc_clock &&
dt->du_filter == DISPATCH_EVFILT_TIMER_WITH_CLOCK) {
DISPATCH_CLIENT_CRASH(0, "Attempting to modify timer clock");
}
//跟踪配置
_dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
if (dtc) free(dtc);
//唤醒
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
4.2.9 dispatch_source_set_registration_handler
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_REGISTN_HANDLER, true);
}
也是直接调用的_dispatch_source_set_handler
,参数是DS_REGISTN_HANDLER
。
4.2.10 dispatch_source_set_cancel_handler
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_CANCEL_HANDLER, true);
}
- 直接调用的
_dispatch_source_set_handler
,参数是DS_CANCEL_HANDLER
。 - 会根据
DS_REGISTN_HANDLER、DS_CANCEL_HANDLER、DS_EVENT_HANDLER
进行handler
的获取和释放,因为这三者可能同时存在。
那么就有个问题设置timer
类型后我们没有主动调用dispatch_source_merge_data
,那么它是在什么时机调用的呢?在回调中bt
:
frame #2: 0x000000010b6a29c8 libdispatch.dylib`_dispatch_client_callout + 8
frame #3: 0x000000010b6a5316 libdispatch.dylib`_dispatch_continuation_pop + 557
frame #4: 0x000000010b6b8e8b libdispatch.dylib`_dispatch_source_invoke + 2205
frame #5: 0x000000010b6b4508 libdispatch.dylib`_dispatch_root_queue_drain + 351
frame #6: 0x000000010b6b4e6d libdispatch.dylib`_dispatch_worker_thread2 + 135
frame #7: 0x00007fff611639f7 libsystem_pthread.dylib`_pthread_wqthread + 220
frame #8: 0x00007fff61162b77 libsystem_pthread.dylib`start_wqthread + 15
搜索_dispatch_source_invoke
只找到了:
DISPATCH_VTABLE_INSTANCE(source,
.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
.do_dispose = _dispatch_source_dispose,
.do_debug = _dispatch_source_debug,
.do_invoke = _dispatch_source_invoke,
.dq_activate = _dispatch_source_activate,
.dq_wakeup = _dispatch_source_wakeup,
.dq_push = _dispatch_lane_push,
);
也就是调用的source
的do_invoke
,调用逻辑为_dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> dx_invoke
:
void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags)
{
_dispatch_queue_class_invoke(ds, dic, flags,
DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
#if DISPATCH_EVENT_BACKEND_KEVENT
if (flags & DISPATCH_INVOKE_WORKLOOP_DRAIN) {
dispatch_workloop_t dwl = (dispatch_workloop_t)_dispatch_get_wlh();
dispatch_timer_heap_t dth = dwl->dwl_timer_heap;
if (dth && dth[0].dth_dirty_bits) {
//调用
_dispatch_event_loop_drain_timers(dwl->dwl_timer_heap,
DISPATCH_TIMER_WLH_COUNT);
}
}
#endif // DISPATCH_EVENT_BACKEND_KEVENT
}
4.3 Dispatch Source 封装 Timer
目标是封装一个类似NSTimer
的工具。
void
dispatch_source_set_timer(dispatch_source_t source,
dispatch_time_t start,
uint64_t interval,
uint64_t leeway);
-
source
:事件源。 -
start
:控制计时器第一次触发的时刻。- 参数类型是
dispatch_time_t
(opaque
类型),不能直接操作它。需要dispatch_time
和dispatch_walltime
函数来创建。 - 常量
DISPATCH_TIME_NOW
和DISPATCH_TIME_FOREVER
很常用。 - 当使用
dispatch_time
或者DISPATCH_TIME_NOW
时,系统会使用默认时钟来进行计时。然而当系统休眠的时候,默认时钟是不走的,也就会导致计时器停止。使用dispatch_walltime
可以让计时器按照真实时间间隔进行计时。
- 参数类型是
-
interval
:回调间隔时间。 -
leeway
:计时器触发的精准程度,就算指定为0系统也无法保证完全精确的触发时间,只是会尽可能满足这个需求。
首先实现一个最简单的封装:
- (instancetype)initTimerWithTimeInterval:(NSTimeInterval)interval queue:(dispatch_queue_t)queue leeway:(NSTimeInterval)leeway repeats:(BOOL)repeats handler:(dispatch_block_t)handler {
if (self == [super init]) {
self.timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
dispatch_source_set_timer(self.timer, dispatch_walltime(NULL, 0), interval * NSEC_PER_SEC, leeway * NSEC_PER_SEC);
//解决与handler互相持有
__weak typeof(self) weakSelf = self;
//事件回调,这个函数在执行完之后 block 会立马执行一遍。后面隔一定时间间隔再执行一次。
dispatch_source_set_event_handler(self.timer, ^{
if (handler) {
handler();
}
if (!repeats) {
//repeats 为 NO 执行一次后取消
[weakSelf cancel];
}
});
}
return self;
}
这样就满足了最基本的要求,由于handler
的调用在设置和恢复后会立马调用,所以需要过滤需改handler
实现如下:
//忽略 handler 设置完马上回调
if (weakSelf.isAutoFirstCallback) {
@synchronized(weakSelf) {
weakSelf.isAutoFirstCallback = NO;
}
return;
}
//忽略挂起恢复后的立马回调
if (!weakSelf.resumeCallbackEnable && weakSelf.isResumeCallback) {
@synchronized(weakSelf) {
weakSelf.isResumeCallback = NO;
}
return;
}
if (handler) {
handler();
}
if (!repeats) {
//repeats 为 NO 执行一次后取消
[weakSelf cancel];
}
为了更灵活对注册以及取消source
逻辑也进行暴露:
dispatch_source_set_registration_handler(self.timer, ^{
if (weakSelf.startBlock) {
weakSelf.startBlock();
}
});
//取消回调
dispatch_source_set_cancel_handler(self.timer, ^{
if (weakSelf.cancelBlock) {
weakSelf.cancelBlock();
}
});
由于source
本身提供了挂起和恢复的功能,同样对其封装。并且需要进行释放操作,所以提供cancel
功能:
- (void)start {
//为了与isResumeCallback区分开
@synchronized(self) {
if (!self.isStarted && self.timerStatus == HPTimerSuspend) {
self.isStarted = YES;
self.timerStatus = HPTimerResume;
dispatch_resume(self.timer);
}
}
}
- (void)suspend {
//挂起,挂起的时候不能设置timer为nil
@synchronized(self) {
if (self.timerStatus == HPTimerResume) {
self.timerStatus = HPTimerSuspend;
dispatch_suspend(self.timer);
}
}
}
- (void)resume {
//恢复
@synchronized(self) {
if (self.timerStatus == HPTimerSuspend) {
self.isResumeCallback = YES;
self.timerStatus = HPTimerResume;
dispatch_resume(self.timer);
}
}
}
- (void)cancel {
//取消
@synchronized(self) {
if (self.timerStatus != HPTimerCanceled) {
//先恢复再取消
if (self.timerStatus == HPTimerSuspend) {
[self resume];
}
self.timerStatus = HPTimerCanceled;
dispatch_source_cancel(self.timer);
_timer = nil;
}
}
}
- (void)dealloc {
[self cancel];
}
-
dealloc
中主动进行cancel
调用方可以不必在自己的dealloc
中调用。
这样再暴露一些简单接口就可以直接调用了(调用方需要持有timer
):
self.timer = [HPTimer scheduledTimerWithTimeInterval:3 handler:^{
NSLog(@"timer 回调");
}];
完整代码详见 HPTimer
具体的更多介绍和用法可以参考:
iOS多线程——Dispatch Source
timer 注意事项
五、延迟函数(dispatch_after)
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
dispatch_block_t work)
{
_dispatch_after(when, queue, NULL, work, true);
}
直接调用_dispatch_after
:
static inline void
_dispatch_after(dispatch_time_t when, dispatch_queue_t dq,
void *ctxt, void *handler, bool block)
{
dispatch_timer_source_refs_t dt;
dispatch_source_t ds;
uint64_t leeway, delta;
//FOREVER 直接返回什么也不做
if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity");
#endif
return;
}
delta = _dispatch_timeout(when);
if (delta == 0) {
if (block) {
//时间为0直接执行handler
return dispatch_async(dq, handler);
}
return dispatch_async_f(dq, ctxt, handler);
}
//精度 = 间隔 / 10
leeway = delta / 10; // <rdar://problem/13447496>
//<1 毫秒 的时候设置最小值为1毫秒
if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
//大于60s的时候设置为60s,也就是 1ms <= leeway <= 1min
if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;
// this function can and should be optimized to not use a dispatch source
//创建 type 为 after 的 source
ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, dq);
dt = ds->ds_timer_refs;
dispatch_continuation_t dc = _dispatch_continuation_alloc();
if (block) {
//包装handler
_dispatch_continuation_init(dc, dq, handler, 0, 0);
} else {
_dispatch_continuation_init_f(dc, dq, ctxt, handler, 0, 0);
}
// reference `ds` so that it doesn't show up as a leak
dc->dc_data = ds;
_dispatch_trace_item_push(dq, dc);
//存储handler
os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed);
dispatch_clock_t clock;
uint64_t target;
_dispatch_time_to_clock_and_value(when, false, &clock, &target);
if (clock != DISPATCH_CLOCK_WALL) {
leeway = _dispatch_time_nano2mach(leeway);
}
dt->du_timer_flags |= _dispatch_timer_flags_from_clock(clock);
dt->dt_timer.target = target;
dt->dt_timer.interval = UINT64_MAX;
dt->dt_timer.deadline = target + leeway;
dispatch_activate(ds);
}
- 延时时间设置为
DISPATCH_TIME_FOREVER
直接返回什么也不做。 - 延时时间为
0
直接调用dispatch_async
执行handler
。 - 精度:
1ms <= leeway <= 1min
要在这个范围,否则会修正。 - 创建
_dispatch_source_type_after
类型的source
。 - 包装存储
handler
。 - 调用
_dispatch_time_to_clock_and_value
进行target
设置。
本质上 dispatch_after 也是对 source的封装。
#define NSEC_PER_SEC 1000000000ull 1秒 = 10亿纳秒 #define NSEC_PER_MSEC 1000000ull 1毫秒 = 100万纳秒 #define USEC_PER_SEC 1000000ull 1秒 = 100万微秒 #define NSEC_PER_USEC 1000ull 1微秒 = 1000 纳秒
1s = 1000ms = 100万us = 10亿ns
1ms = 1000us
1us = 1000ns