前言
上一篇文章我们介绍了异步函数中任务的包装和调用流程,这篇文章主要介绍异步函数中线程是如何开辟的,同步函数和栅栏函数的底层源码分析。
目录
一、异步函数中线程的开辟
1、异步函数底层分析:dispatch_async
#ifdef __BLOCKS__
// Public entry point: submits a block for asynchronous execution on the
// given queue. Wraps the block into a dispatch_continuation_t and hands
// it to the async submission path.
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME;
dispatch_qos_t qos;
// Task wrapper - packages and saves the block into the continuation
qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
#endif
dispatch_async
方法中主要调用两个方法:
_dispatch_continuation_init
和_dispatch_continuation_async
_dispatch_continuation_init
方法已经在上一篇文章中分析了,这篇文章从_dispatch_continuation_async
方法开始分析
// Enqueues an initialized continuation onto the queue by dispatching
// through the queue's vtable (dx_push -> dq_push).
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
// dc_flags is only used by the introspection build
(void)dc_flags;
#endif
// dispatch via the queue's vtable; which dq_push runs depends on the
// concrete queue type (serial, concurrent, global root, ...)
return dx_push(dqu._dq, dc, qos);
}
dx_push
是通过宏定义的
// dx_push resolves through the object's vtable to the concrete queue
// type's dq_push implementation
#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)
根据dx_vtable(x)
的不同dq_push
调用的方法也不同,其中并发队列
对应的方法定义如下:
// Vtable for custom concurrent queues: maps the generic queue operations
// to the concurrent-lane implementations.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,// dq_push implementation used by concurrent queues
);
// dq_push for custom concurrent queues. Fast path: if the queue is empty
// and the item is neither a waiter nor a barrier, redirect the item
// straight to the target root queue; otherwise push onto this lane.
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
// fast path: bypass this lane and push to the root queue
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
// slow path: enqueue on this lane to preserve ordering
_dispatch_lane_push(dq, dou, qos);
}
现在需要查看当前方法执行的是return _dispatch_continuation_redirect_push(dq, dou, qos);
还是_dispatch_lane_push(dq, dou, qos);
添加三个符号断点并关闭断点:
运行代码并打开新添加的三个符号断点:
点击继续运行进入dispatch_async
符号断点:
点击继续运行可以发现进入了_dispatch_continuation_redirect_push
符号断点:
因此_dispatch_lane_concurrent_push
方法中首先执行的是return _dispatch_continuation_redirect_push(dq, dou, qos);
通过相同的方法可以验证后面执行的方法为_dispatch_root_queue_push
2、异步函数底层线程的开辟
// Pushes a work item onto a root (global) queue. Body elided in this
// excerpt; it forwards to the inline push below (head == tail == dou,
// count == 1 for a single item).
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
dispatch_qos_t qos)
{
...
_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
// Links the item list onto the root queue's MPSC item list. If the queue
// transitioned from empty (push returned true), poke the queue so worker
// threads get created/woken to drain it.
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
dispatch_object_t _head, dispatch_object_t _tail, int n)
{
struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
return _dispatch_root_queue_poke(dq, n, 0);
}
}
// Wakes up the root queue so its items get drained. Body elided in this
// excerpt; it falls through to the slow path which creates/requests
// worker threads.
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
...
return _dispatch_root_queue_poke_slow(dq, n, floor);
}
// Slow path of the root-queue poke: initializes the root queues once,
// then (in this excerpted branch) spawns worker threads with
// pthread_create until the requested count `n` is satisfied.
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
int r = ENOSYS;
// one-time setup of the root queues / workqueue callbacks (see below)
_dispatch_root_queues_init();
...
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
// threads are ultimately created via pthread_create; retry on EAGAIN
// (temporary resource shortage), assert on any other error
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
...
}
从源码中我们可以看到,底层是通过pthread_create
开辟的线程
在前一篇文章中我们打印出了异步函数任务Block
调用的堆栈信息
3、跨库调用Block
那么底层是在哪里从libsystem_pthread.dylib
库调用到libdispatch.dylib
库的_dispatch_worker_thread2
方法呢?
上面的_dispatch_root_queue_poke_slow
方法中有这样一句代码:
_dispatch_root_queues_init();
// One-time initialization of the root queues, funneled through
// dispatch_once_f so _dispatch_root_queues_init_once runs exactly once
// process-wide.
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
这里引出了单例的底层实现:
// Function-pointer variant underlying dispatch_once: executes `func`
// exactly once for the lifetime of `val`. Fast path is a single atomic
// load; first caller takes the gate and runs the callout, concurrent
// callers wait until it finishes.
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {// already executed once: return immediately
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) {// try to take the gate lock; only one thread succeeds
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l);// threads that lost the race wait until the callout completes
}
这里的dispatch_once_f
方法也是dispatch_once
底层的实现
-
_dispatch_once_gate_tryenter(l)
保证只有一条线程能获取到这把锁,从而保证了多线程下的线程安全
// Attempts to take the once-gate by atomically swapping UNLOCKED for the
// current thread's lock value. Exactly one thread's cmpxchg succeeds,
// which is what guarantees the block runs only once under contention.
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
-
_dispatch_once_gate_tryenter(l)
获取的锁,会在Block执行完毕后由_dispatch_once_gate_broadcast
方法释放,两者操作的是同一把锁。
// typical dispatch_once usage: the static token guarantees uniqueness
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
});
单例执行一次的说明:
dispatch_once
有两个参数:onceToken
和Block
我们主要关注有两点:第一点为什么单例只会执行一次,第二点Block为什么会被调用。因为onceToken
会在底层封装为dispatch_once_gate_t l
变量l
会用来获取底层原子性的关联,并且onceToken
是静态变量保证了唯一性。通过os_atomic_load
方法读取onceToken的状态,如果已被标记为DLOCK_ONCE_DONE(即Block已经执行过)就直接返回。如果是第一次执行会获取一把锁,保证当前只有一条线程能调用Block,当Block执行完毕后将状态标记为已完成并解除当前的锁。如果执行Block时还未解锁且有其他线程进入dispatch_once_f
方法,那么就会进入等待状态。
Block调用:
// Runs the user block exactly once, then broadcasts completion so
// waiting threads are released and future callers take the fast path.
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_function_t func)
{
_dispatch_client_callout(ctxt, func);// invoke the block
_dispatch_once_gate_broadcast(l);// publish "done" and wake any waiters
}
// Marks the once-gate as done and, if any other thread parked on the
// gate while the block ran, wakes them via the slow broadcast path.
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
v = _dispatch_once_mark_quiescing(l);
#else
v = _dispatch_once_mark_done(l);// atomically set the state to DLOCK_ONCE_DONE, returning the previous value
#endif
// if the previous value is still our own lock value, nobody waited
if (likely((dispatch_lock)v == value_self)) return;
// otherwise wake the threads blocked in _dispatch_once_wait
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
跨库调用的方法绑定还是在_dispatch_root_queues_init();
方法的_dispatch_root_queues_init_once
参数中
// One-time root-queue setup (body partially elided). This is where the
// libdispatch worker entry point _dispatch_worker_thread2 is registered
// with the pthread workqueue layer (libsystem_pthread.dylib), which is
// how worker threads later call back into libdispatch.
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
...
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init(_dispatch_worker_thread2,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
}
...
}
二、同步函数、栅栏函数底层分析
1、异步栅栏函数和同步栅栏函数的区别
dispatch_queue_t concurrentQueue = dispatch_queue_create("differ", DISPATCH_QUEUE_CONCURRENT);
/* 1. async task */
dispatch_async(concurrentQueue, ^{
sleep(2);
NANSLog(@"111");
});
/* 2. async barrier: does not block the caller, but runs only after task 1 finishes */
dispatch_barrier_async(concurrentQueue, ^{
NANSLog(@"%@",[NSThread currentThread]);
});
/* 3. async task: runs only after the barrier block completes */
dispatch_async(concurrentQueue, ^{
sleep(2);
NANSLog(@"222");
});
// printed immediately — the async barrier does not block the current thread
NANSLog(@"333");
333
111
---<NSThread: 0x600001883540>{number = 3, name = (null)}---
222
dispatch_queue_t concurrentQueue = dispatch_queue_create("differ", DISPATCH_QUEUE_CONCURRENT);
/* 1. async task */
dispatch_async(concurrentQueue, ^{
sleep(2);
NANSLog(@"111");
});
/* 2. sync barrier: blocks the current thread until task 1 and this block finish */
dispatch_barrier_sync(concurrentQueue, ^{
NANSLog(@"%@",[NSThread currentThread]);
});
/* 3. async task: submitted only after the sync barrier returns */
dispatch_async(concurrentQueue, ^{
sleep(2);
NANSLog(@"222");
});
// printed only after the sync barrier has returned
NANSLog(@"333");
111
<NSThread: 0x6000026981c0>{number = 1, name = main}
333
222
区别:
- 异步栅栏函数只会阻塞自定义的队列,不会阻塞外面的队列
- 同步栅栏函数不仅对自定义队列起栅栏作用,还会阻塞当前线程,直到栅栏任务执行完毕才返回
- 栅栏函数只对
自定义的并发队列有效,对全局并发队列使用时栅栏作用会失效(相当于普通的 dispatch_async / dispatch_sync)
2、栅栏函数使用案例
dispatch_queue_t concurrentQueue = dispatch_queue_create("differ", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i<1000; i++) {
dispatch_async(concurrentQueue, ^{
NSString *imageName = [NSString stringWithFormat:@"%d.jpg", (i % 10)];
NSURL *url = [[NSBundle mainBundle] URLForResource:imageName withExtension:nil];
NSData *data = [NSData dataWithContentsOfURL:url];
UIImage *image = [UIImage imageWithData:data];
// unsynchronized mutation of a shared NSMutableArray from many
// threads at once — this is the data race that crashes below
[self.mArray addObject:image];
});
}
运行程序会Crash,并报如下错误:
error for object 0x7f8844860000: pointer being freed was not allocated
也就是多条线程同时对同一个可变数组进行写操作时,同一对象的retain
和release
操作并未成对出现(可能被过度release),从而导致数据读写的安全问题。
添加栅栏函数后就能解决问题
// serialize the array mutation with a barrier so only one block
// touches mArray at a time
dispatch_barrier_async(concurrentQueue , ^{
[self.mArray addObject:image];
});
通过加锁也能解决问题
// alternative: guard the mutation with a mutual-exclusion lock
@synchronized (self) {
[self.mArray addObject:image];
}
3、同步函数、栅栏函数底层源码分析
同步函数:
// Public entry point: submits a block for synchronous execution on the
// given queue (the caller blocks until the block has run).
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
uintptr_t dc_flags = DC_FLAG_BLOCK;
// blocks created with dispatch_block_create() carry private data and
// take a dedicated path
if (unlikely(_dispatch_block_has_private_data(work))) {
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
// Thin wrapper that forwards to the inline implementation.
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
// Core of dispatch_sync: serial queues go through the barrier path
// (on a width-1 queue every item is effectively a barrier); concurrent
// lanes try to reserve non-barrier width for the fast path.
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
if (likely(dq->dq_width == 1)) {// serial queue (width == 1)
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);// sync on a serial queue behaves like a barrier
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
// queues that target another (non-root) queue must recurse through it
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
_dispatch_barrier_sync_f
方法调用_dispatch_barrier_sync_f_inline
方法
// Core of dispatch_barrier_sync (and of dispatch_sync on serial queues):
// tries to acquire the barrier lock for the calling thread; on failure
// falls into the slow path, which is also where the classic sync-on-
// current-queue deadlock is detected.
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fast path, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);// slow path; deadlock detection happens here
}// a barrier sync can deadlock too — the slow path checks whether the current queue is blocked
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);// pre-invoke bookkeeping
// invoke the function, then release the barrier state on completion
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
三、调度组的使用
1、信号量控制最大并发数
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
// initial value 2 => at most 2 tasks run concurrently
dispatch_semaphore_t sem = dispatch_semaphore_create(2);
// task 1
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"执行任务1");
sleep(1);
NSLog(@"任务1完成");
dispatch_semaphore_signal(sem);
});
// task 2
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"执行任务2");
sleep(1);
NSLog(@"任务2完成");
dispatch_semaphore_signal(sem);
});
// task 3 — must wait until task 1 or task 2 signals
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"执行任务3");
sleep(1);
NSLog(@"任务3完成");
dispatch_semaphore_signal(sem);
});
这里设置的最大并发数 为2
// Decrements the semaphore. If the result is still >= 0 a slot was
// available and the caller proceeds immediately; otherwise it enters
// the slow path and blocks until signaled (or the timeout expires).
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
long value = os_atomic_dec2o(dsema, dsema_value, acquire);// atomically decrement the value
if (likely(value >= 0)) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);// value < 0: block until a signal arrives
}
// Increments the semaphore. If the result is > 0 no thread was waiting;
// otherwise the slow path wakes one of the threads blocked in
// dispatch_semaphore_wait.
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
long value = os_atomic_inc2o(dsema, dsema_value, release);// atomically increment the value
if (likely(value > 0)) {
return 0;
}
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
return _dispatch_semaphore_signal_slow(dsema);// value <= 0: a thread is waiting — wake it
}
2、调度组
进组 出组搭配使用,不一起使用会Crash
dispatch_group_enter(group);  // enter the group — must be balanced by a matching leave
dispatch_group_leave(group);  // leave the group — an unbalanced leave crashes
dispatch_group_async
可以代替进组 出组(dispatch_group_notify 则用于在组内所有任务完成后收到回调)