In this article we explore two GCD functions, dispatch_once and dispatch_sync, and focus on two questions:
- 1. How does dispatch_once guarantee that its block executes only once?
- 2. Why does dispatch_sync deadlock when the current serial queue is also the target serial queue?
dispatch_once
We usually use dispatch_once to create a singleton. A typical call looks like this:
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
NSLog(@"dddd");
});
Let's look at its implementation. The source is pasted directly below:
void
_dispatch_once_f(dispatch_once_t *predicate, void *_Nullable context,
dispatch_function_t function)
{
if (DISPATCH_EXPECT(*predicate, ~0l) != ~0l) {
// not done yet: take the slow path implemented in libdispatch
dispatch_once_f(predicate, context, function);
} else {
// already done: only a compiler barrier is needed
dispatch_compiler_barrier();
}
DISPATCH_COMPILER_CAN_ASSUME(*predicate == ~0l);
}
dispatch_once_f lives in the libdispatch library; its implementation looks like this:
DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) { // fast path: the block has already run
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) {
// the first caller wins the gate and runs the block
return _dispatch_once_callout(l, ctxt, func);
}
// every other concurrent caller blocks here until the block has run
return _dispatch_once_wait(l);
}
static inline void
_dispatch_once_mark_done_if_quiesced(dispatch_once_gate_t dgo, uintptr_t gen)
{
if (_dispatch_once_generation() - gen >= DISPATCH_ONCE_GEN_SAFE_DELTA) {
/*
* See explanation above, when the quiescing counter approach is taken
* then this store needs only to be relaxed as it is used as a witness
* that the required barriers have happened.
*/
os_atomic_store(&dgo->dgo_once, DLOCK_ONCE_DONE, relaxed);
}
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
_dispatch_once_gate_tryenter: compares onceToken with DLOCK_ONCE_UNLOCKED ((uintptr_t)0). If onceToken == DLOCK_ONCE_UNLOCKED, the compare-and-swap succeeds: the current thread's lock value is atomically stored into onceToken (taking the gate) and the function returns YES; otherwise it returns NO.
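To make the gating behavior concrete, here is a minimal sketch (not libdispatch code) of how a compare-and-swap lets exactly one thread through; the names once_gate and try_enter_gate are made up for illustration:
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

// Hypothetical gate: 0 = unlocked, non-zero = some thread has already entered.
static _Atomic(uintptr_t) once_gate = 0;

static bool try_enter_gate(uintptr_t my_lock_value) {
    uintptr_t expected = 0; // analogous to DLOCK_ONCE_UNLOCKED
    // Succeeds (and stores my_lock_value) only if the gate is still 0,
    // i.e. only for the very first caller.
    return atomic_compare_exchange_strong(&once_gate, &expected, my_lock_value);
}
Only the first thread to reach the gate sees 0 and wins; every later thread fails the exchange and, in libdispatch, falls into _dispatch_once_wait. The winner then proceeds to _dispatch_once_callout: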
DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_function_t func)
{
_dispatch_client_callout(ctxt, func); // 1
_dispatch_once_gate_broadcast(l); // 2
}
- 1. _dispatch_client_callout: invokes the block (or function) that was passed in; the public function-pointer flavor of the API is sketched right after this list.
- 2. _dispatch_once_gate_broadcast: broadcasts that the block has already run, waking any waiting threads.
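As an aside, the function-pointer entry point dispatch_once_f is public API, which makes it easy to see what _dispatch_client_callout ends up invoking. A small usage sketch; the names init_shared, setupOnce and initContext are made up:
#import <Foundation/Foundation.h>

static dispatch_once_t onceToken;

// Runs exactly once; 'context' is the pointer passed to dispatch_once_f.
static void init_shared(void *context) {
    NSLog(@"initializing with context %p", context);
}

void setupOnce(void) {
    static int initContext = 42;
    dispatch_once_f(&onceToken, &initContext, init_shared);
}
Back to the internals, _dispatch_once_gate_broadcast looks like this: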
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
v = _dispatch_once_mark_quiescing(l);
#else
v = _dispatch_once_mark_done(l);
#endif
if (likely((dispatch_lock)v == value_self)) return;
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
DISPATCH_ALWAYS_INLINE
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
_dispatch_once_mark_done: atomically sets onceToken to DLOCK_ONCE_DONE (~(uintptr_t)0).
On the next call, v == DLOCK_ONCE_DONE, so dispatch_once_f returns immediately and the block is never executed again.
If several threads call dispatch_once at the same time, the first thread to arrive takes the gate with its own lock value (via _dispatch_lock_value_for_self), while the remaining threads block in _dispatch_once_wait until the gate is broadcast as done.
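A quick experiment (a sketch, not part of the source walkthrough) shows the once guarantee under concurrency: many iterations race into dispatch_once, yet the counter ends at 1.
#import <Foundation/Foundation.h>

int main(void) {
    static dispatch_once_t onceToken;
    __block int executions = 0;

    // 100 concurrent iterations all hit the same once token.
    dispatch_apply(100, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), ^(size_t i) {
        dispatch_once(&onceToken, ^{
            executions++; // only the winning thread ever runs this
        });
    });

    NSLog(@"block executed %d time(s)", executions); // prints 1
    return 0;
}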
dispatch_sync
When we call dispatch_sync, it funnels into _dispatch_sync_f_inline:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
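The first check, dq_width == 1, is what singles out serial queues. As a rough illustration (a sketch; dq_width itself is private, so we only show which user-facing queues take which branch, and the queue labels are arbitrary):
#import <Foundation/Foundation.h>

void which_path(void) {
    // Serial queue (and the main queue): width 1, so dispatch_sync
    // takes the _dispatch_barrier_sync_f branch.
    dispatch_queue_t serial = dispatch_queue_create("com.example.serial",
                                                    DISPATCH_QUEUE_SERIAL);

    // Concurrent queue: width > 1, so dispatch_sync tries to reserve sync
    // width and, when that succeeds, invokes the block without a barrier.
    dispatch_queue_t concurrent = dispatch_queue_create("com.example.concurrent",
                                                        DISPATCH_QUEUE_CONCURRENT);

    dispatch_sync(serial, ^{ NSLog(@"serial (barrier) path"); });
    dispatch_sync(concurrent, ^{ NSLog(@"concurrent path"); });
}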
For a serial queue, dq_width == 1, so _dispatch_barrier_sync_f is called; its inlined body is:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self(); // 1
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fast path, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
- 1. First obtain the current thread's tid (its thread id, not a pid) via _dispatch_tid_self(); a sketch of reading a thread id from user code follows.
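For reference, this is roughly what a "tid" is from user space. A sketch using public pthread/Mach APIs, which is not how libdispatch obtains it internally (it goes through its own thread-port shims):
#import <Foundation/Foundation.h>
#import <pthread.h>
#import <mach/mach.h>

void show_thread_ids(void) {
    // Globally unique 64-bit thread id of the current thread.
    uint64_t tid = 0;
    pthread_threadid_np(NULL, &tid);

    // Mach port name of the current thread, which is essentially what
    // dispatch_tid holds inside libdispatch.
    mach_port_t self_port = pthread_mach_thread_np(pthread_self());

    NSLog(@"threadid_np: %llu, mach thread port: %u", tid, self_port);
}
Back in the fast path, _dispatch_introspection_sync_begin is also called: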
static inline void
_dispatch_introspection_sync_begin(dispatch_queue_class_t dq)
{
if (!_dispatch_introspection.debug_queue_inversions) return;
_dispatch_introspection_order_record(dq._dq);
}
_dispatch_introspection_order_record records ordering information for the queue; as the early return shows, it only does real work when queue-inversion debugging is enabled.
The block itself is eventually invoked by _dispatch_sync_function_invoke_inline:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
Because dispatch_sync does not spawn a new thread, the block runs on the calling thread: a thread frame is pushed, the block is invoked through _dispatch_client_callout, and only after it returns does the caller continue; until then the calling thread is blocked. The short sketch below demonstrates this.
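A tiny demonstration of that point (a sketch; any idle serial queue will do, and the label is arbitrary): the pthread seen inside the synchronously dispatched block is the caller's own.
#import <Foundation/Foundation.h>
#import <pthread.h>

void sync_runs_on_caller_thread(void) {
    dispatch_queue_t q = dispatch_queue_create("com.example.sync-demo",
                                               DISPATCH_QUEUE_SERIAL);
    pthread_t caller = pthread_self();

    dispatch_sync(q, ^{
        // As a documented optimization, dispatch_sync reuses the calling
        // thread to run the block whenever possible, so this prints a
        // non-zero value (same thread).
        NSLog(@"same thread: %d", pthread_equal(caller, pthread_self()));
    });
}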
How does libdispatch record which thread currently owns (is draining) the queue? Let's look at _dispatch_queue_try_acquire_barrier_sync_and_suspend:
DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
uint32_t tid, uint64_t suspend_count)
{
uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dispatch_lock_value_from_tid(tid) |
(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
uint64_t old_state, new_state;
return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
if (old_state != (init | role)) {
os_atomic_rmw_loop_give_up(break);
}
new_state = value | role;
});
}
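Before walking the slow path, here is the scenario we are analyzing, reduced to a minimal sketch (the queue label is arbitrary): a block already running on a serial queue synchronously dispatches back to that same queue.
#import <Foundation/Foundation.h>

void deadlock_demo(void) {
    dispatch_queue_t serial = dispatch_queue_create("com.example.deadlock",
                                                    DISPATCH_QUEUE_SERIAL);
    dispatch_async(serial, ^{
        // This thread is now draining 'serial'.
        dispatch_sync(serial, ^{
            // Never reached: this block can only run after the outer block
            // finishes, but the outer block is blocked waiting for it.
            // libdispatch detects the ownership cycle and crashes.
            NSLog(@"unreachable");
        });
    });
}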
When this kind of synchronous deadlock occurs, the call stack shows that _dispatch_sync_f_slow is being called:
DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trace_item_push(top_dq, &dsc);
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
At this point the calling thread is about to block. The sync path packages the waiting task into a dispatch_sync_context_s and pushes it towards the target queue; _dispatch_trace_item_push records that push. Immediately afterwards __DISPATCH_WAIT_FOR_QUEUE__ is called:
DISPATCH_NOINLINE
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
uint64_t dq_state = _dispatch_wait_prepare(dq); // read the state of the queue we are about to wait on
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
// ... (the remaining wait/park logic is elided here)
}
DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// XOR the waiter's tid with the owner tid encoded in the lock value; identical tids XOR to 0
// equivalent to _dispatch_lock_owner(lock_value) == tid
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
_dispatch_lock_is_locked_by XORs the tid encoded in the lock value (the thread currently draining the queue) with the tid of the waiting thread, masks off the low status bits with DLOCK_OWNER_MASK, and checks whether the result is 0. If the two tids are the same, the result is 0 and the function returns YES: the queue is already owned by the very thread that is now waiting on it. That block can never run, because the only thread that could run it is blocked waiting for it, so libdispatch raises DISPATCH_CLIENT_CRASH. That crash is the deadlock. A small numeric illustration follows.
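To make the check concrete, here is a toy restatement with made-up values; the mask below is a stand-in for DLOCK_OWNER_MASK, which clears the low status bits of the lock word:
#include <stdbool.h>
#include <stdint.h>

// Toy version of the ownership test, not libdispatch itself.
static bool toy_is_locked_by(uint32_t lock_value, uint32_t tid) {
    return ((lock_value ^ tid) & 0xfffffffcu) == 0; // are the owner bits equal?
}

// The drain lock encodes tid 0x2503 and the waiter has the same tid:
//   toy_is_locked_by(0x2503, 0x2503) -> true  (same thread -> crash path)
// A waiter on a different thread, e.g. tid 0x2607:
//   toy_is_locked_by(0x2503, 0x2607) -> false (no deadlock)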
Summary:
- 1. dispatch_once: the call mutates onceToken. Once the block has executed, onceToken becomes DLOCK_ONCE_DONE (~(uintptr_t)0), so any later call from any thread sees that value and returns immediately; threads that arrive while the block is still running wait in _dispatch_once_wait.
- 2. dispatch_sync: when dq_width == 1 the queue is serial. On the slow path, _dispatch_sync_f_slow reaches __DISPATCH_WAIT_FOR_QUEUE__, which uses _dispatch_lock_is_locked_by to compare the tid of the thread that already owns the queue with the tid of the waiting thread. If the two tids are equal, the newly enqueued block can only run after the current block finishes, yet the current block is blocked waiting for it; libdispatch detects this and crashes. That is the deadlock.