iOS 多线程:GCD源码分析<一>

GCD

前言

在日常开发中,GCD是我们常用的多线程开发工具,它隔离了我们与线程间的直接交互,所以在使用过程中我们很少关注到GCD的底层原理。此次带着几个问题来对GCD源码进行分析。源码版本339.92.1

  • dispatch_async 如何实现的,分发到主队与全局队列有什么区别,一定会新建线程执行任务么?
  • dispatch_sync 如何实现的,为什么说 GCD 死锁是队列导致的而不是线程,死锁不是操作系统的概念么?
  • 信号量是如何实现的,有哪些使用场景?
  • dispatch_group 的等待与通知。
  • dispatch_once 如何实现?
  • dispatch_source 用来做定时器如何实现,有什么优点和用途?
  • dispatch_suspenddispatch_resume 如何实现,队列的的暂停和计时器的暂停有区别么?

Dispatch 源码分析

Dispatch 中常用的宏定义及基础知识

DISPATCH_DECL

#define DISPATCH_DECL(name) typedef struct name##_s *name##_t
GCD中的变量大多使用了这个宏,比如DISPATCH_DECL(dispatch_queue)展开后是

typedef struct dispatch_queue_s *dispatch_queue_t;
它的意思是定义一个dispatch_queue_t类型的指针,指向了一个dispatch_queue_s类型的结构体。

fastpath vs slowpath

1
2
#define fastpath(x) ((typeof(x))__builtin_expect((long)(x), ~0l))
#define slowpath(x) ((typeof(x))__builtin_expect((long)(x), 0l))

__builtin_expect 是编译器优化汇编代码的,fastpath(x) 依然返回 x,只是告诉编译器 x 的值一般不为 0,从而编译器可以进行优化。同理,slowpath(x) 表示 x 的值很可能为 0,希望编译器进行优化。

TSD

Thread Specific Data(TSD)是指线程私有数据。在多线程中,会用全局变量来实现多个函数间的数据共享,局部变量来实现内部的单独访问。TSD则是能够在同一个线程的不同函数中被访问,在不同线程时,相同的键值获取的数据随线程不同而不同。可以通过pthread的相关api来实现TSD:

1
2
3
4
5
6
//创建key
int pthread_key_create(pthread_key_t *, void (* _Nullable)(void *));
//get方法
void* _Nullable pthread_getspecific(pthread_key_t);
//set方法
int pthread_setspecific(pthread_key_t , const void * _Nullable);

原子操作
c++原子操作库

1
2
3
4
5
6
7
8
#define dispatch_atomic_xchg(p, n)  __sync_lock_test_and_set((p), (n))
#define dispatch_atomic_cmpxchg(p, o, n) __sync_bool_compare_and_swap((p), (o), (n))
#define dispatch_atomic_inc(p) __sync_add_and_fetch((p), 1)
#define dispatch_atomic_dec(p) __sync_sub_and_fetch((p), 1)
#define dispatch_atomic_add(p, v) __sync_add_and_fetch((p), (v))
#define dispatch_atomic_sub(p, v) __sync_sub_and_fetch((p), (v))
#define dispatch_atomic_or(p, v) __sync_fetch_and_or((p), (v))
#define dispatch_atomic_and(p, v) __sync_fetch_and_and((p), (v))

  • __sync_lock_test_and_set((p), (n))p设为value并返回p操作之前的值。
  • __sync_bool_compare_and_swap((p), (o), (n)) 这两个函数提供原子的比较和交换:如果p == o,就将n写入p(p代表地址,o代表oldValue,n代表newValue)
  • __sync_add_and_fetch((p), 1) 先自加1,再返回
  • __sync_sub_and_fetch((p), 1) 先自减1,再返回
  • __sync_add_and_fetch((p), (v)) 先自加v,再返回
  • __sync_sub_and_fetch((p), (v)) 先自减v,再返回
  • __sync_fetch_and_or((p), (v)) 先返回,再进行或运算
  • __sync_fetch_and_and((p), (v)) 先返回,再进行与运算

    libdispatch 关键数据结构

源码中数据结构的命名一般是以_s_t结尾,其中_t_s的指针类型,_s是结构体。比如dispatch_queue_tdispatch_queue_s

dispatch_object_s

dispatch_object_s 是GCD最基础的结构体,相当于基类,类似OC中的id类型。定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct dispatch_object_s {
DISPATCH_STRUCT_HEADER(object);
};
//os object头部宏定义
#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
isa; /* must be pointer-sized */ \ //isa
int volatile ref_cnt; \ //引用计数
int volatile xref_cnt //外部引用计数,两者都为0释放
//dispatch 结构体头部
#define DISPATCH_STRUCT_HEADER(x) \
_OS_OBJECT_HEADER( \
const struct dispatch_##x##_vtable_s *do_vtable, \ //vtable 结构体
do_ref_cnt, \
do_xref_cnt); \
struct dispatch_##x##_s *volatile do_next; \ //下一个do 链表next
struct dispatch_queue_s *do_targetq; \ //目标队列
void *do_ctxt; \ //上下文
void *do_finalizer; \ //销毁时调用函数
unsigned int do_suspend_cnt; //suspend计数, 用作暂停标志

dispatch_object_t

dispatch_object_t是个union的联合体,可以用dispatch_object_t代表这个联合体里的所有数据结构。

union与结构体有本质的不同,结构体中各成员有各自的内存空间,联合体中的各成员共享一段内存空间,一个联合变量的长度等于各成员最长的长度。用于节省内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typedef union {
struct _os_object_s *_os_obj;
struct dispatch_object_s *_do;
struct dispatch_continuation_s *_dc;
struct dispatch_queue_s *_dq;
struct dispatch_queue_attr_s *_dqa;
struct dispatch_group_s *_dg;
struct dispatch_source_s *_ds;
struct dispatch_mach_s *_dm;
struct dispatch_mach_msg_s *_dmsg;
struct dispatch_timer_aggregate_s *_dta;
struct dispatch_source_attr_s *_dsa;
struct dispatch_semaphore_s *_dsema;
struct dispatch_data_s *_ddata;
struct dispatch_io_s *_dchannel;
struct dispatch_operation_s *_doperation;
struct dispatch_disk_s *_ddisk;
} dispatch_object_t __attribute__((__transparent_union__));

dispatch_xxx_vtable

DISPATCH_VTABLE_HEADER(x) vtable结构体定义,包含了这个dispatch
_object_s 的操作函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define DISPATCH_VTABLE_HEADER(x) \
unsigned long do_type; \ //dispatch_object_s类型
const char *do_kind; \ //do 的说明
size_t (*do_debug)(struct dispatch_##x##_s *, char *, size_t); \ //debug方法
void (*do_invoke)(struct dispatch_##x##_s *); \ //任务出队时会触发invoke函数
unsigned long (*do_probe)(struct dispatch_##x##_s *); \ //用户队列创建的这个方法是空的,但rootqueue内的这个有一个 `_dispatch_queue_wakeup_global`函数。一般是唤醒队列的方法
void (*do_dispose)(struct dispatch_##x##_s *); //销毁队列的方法,内部通常调用do_finalizer 函数。

//dx_xxx 开头的宏定义,本质是调用vtable中定义的函数
#define dx_type(x) (x)->do_vtable->do_type
#define dx_metatype(x) ((x)->do_vtable->do_type & _DISPATCH_META_TYPE_MASK)
#define dx_kind(x) (x)->do_vtable->do_kind
#define dx_debug(x, y, z) (x)->do_vtable->do_debug((x), (y), (z))
#define dx_dispose(x) (x)->do_vtable->do_dispose(x)
#define dx_invoke(x) (x)->do_vtable->do_invoke(x)
#define dx_probe(x) (x)->do_vtable->do_probe(x)

dispatch_continuation_s

dispatch_continuation_s 结构体主要封装block和function,被传入的block会变成这个结构体对象传入队列。定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define DISPATCH_CONTINUATION_HEADER(x) \
_OS_OBJECT_HEADER( \
const void *do_vtable, \
do_ref_cnt, \
do_xref_cnt); \ //os_object_header
struct dispatch_##x##_s *volatile do_next; \ //下一个任务 链表next
dispatch_function_t dc_func; \ // 如 _dispatch_call_block_and_release 方法,结构体实际执行的方法
void *dc_ctxt; \ // 调用dispatch_async 传入的block 即待执行的内容,会作为参数传入dc_func
void *dc_data; \ //相关数据
void *dc_other; //其他

struct dispatch_continuation_s {
DISPATCH_CONTINUATION_HEADER(continuation);
};

dispatch_queue_s

dispatch_queue_s是队列的结构体,是我们接触最多的结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct dispatch_queue_s {
DISPATCH_STRUCT_HEADER(queue);
DISPATCH_QUEUE_HEADER;
DISPATCH_QUEUE_CACHELINE_PADDING; // for static queues only
};

#define DISPATCH_QUEUE_HEADER \
uint32_t volatile dq_running; \ //队列运行的任务数量
struct dispatch_object_s *volatile dq_items_head; \ //头节点
/* LP64 global queue cacheline boundary */ \
struct dispatch_object_s *volatile dq_items_tail; \ //尾节点
dispatch_queue_t dq_specific_q; \ //specifix队列
uint32_t dq_width; \ //队列并发数
unsigned int dq_is_thread_bound:1; \ //是否线程绑定
unsigned long dq_serialnum; \ //队列序号
const char *dq_label; \ //队列名
DISPATCH_INTROSPECTION_QUEUE_LIST;

dispatch_queue_create 实现

dispatch_queue_create 用来创建自定义队列,流程图和源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - mgr_root_q
// 4,5,6,7,8,9,10,11 - global queues 全局队列序列号
// we use 'xadd' on Intel, so the initial value == next assigned

unsigned long volatile _dispatch_queue_serial_numbers = 12;

dispatch_queue_t
dispatch_queue_create_with_target(const char *label,
dispatch_queue_attr_t attr, dispatch_queue_t tq)
{
dispatch_queue_t dq;
//申请内存
dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);
//初始化设置基本属性
_dispatch_queue_init(dq);
if (label) {
dq->dq_label = strdup(label);
}
//并发队列,设置并发数为UINT32_MAX
if (attr == DISPATCH_QUEUE_CONCURRENT) {
dq->dq_width = UINT32_MAX;
if (!tq) {
//默认target queue 优先级DISPATCH_QUEUE_PRIORITY_DEFAULT
tq = _dispatch_get_root_queue(0, false);
}
} else {
if (!tq) {
// Default target queue is overcommit! //默认的target queue是overcommit 优先级是 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY
tq = _dispatch_get_root_queue(0, true);
}
if (slowpath(attr)) {
dispatch_debug_assert(!attr, "Invalid attribute");
}
}
//设置自定义的目标队列,dq队列的任务会放到目标队列执行。
dq->do_targetq = tq;
return _dispatch_introspection_queue_create(dq);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
return dispatch_queue_create_with_target(label, attr,
DISPATCH_TARGET_QUEUE_DEFAULT);
}

//队列初始化方法
static inline void _dispatch_queue_init(dispatch_queue_t dq)
{
dq->do_next = (struct dispatch_queue_s *)DISPATCH_OBJECT_LISTLESS;
dq->dq_running = 0; //队列当前运行数初始为0
dq->dq_width = 1; //队列并发数默认为1,串行队列
dq->dq_serialnum = dispatch_atomic_inc_orig(&_dispatch_queue_serial_numbers,
relaxed); //序列号,在_dispatch_queue_serial_numbers基础上原子性加1
}

自定义队列创建流程:

  1. 申请内存空间,设置基本属性,默认并发数do_width=1
  2. 根据 attr 的属性值(nilDISPATCH_QUEUE_SERIAL(实际上就是 nil) 或 DISPATCH_QUEUE_CONCURRENT)设置目标队列,如果为Concurrent 设置并发数UINT32_MAX
  3. _dispatch_get_root_queue会获取一个全局队列,它有两个参数,分别表示优先级和是否支持 overcommit。一共有四个优先级,LOW、DEFAULT、HIGH 和 BACKGROUND,因此共有 8 个全局队列。带有 overcommit 的队列表示每当有任务提交时,系统都会新开一个线程处理,这样就不会造成某个线程过载(overcommit)。
  4. 设置 dq->do_targetq = tq; ,向队列提交的任务,会被放到它的目标队列来执行。普通串行队列的目标队列就是一个支持overcommit的全局队列,全局队列的底层是一个线程池。

dispatch_async 实现

直接调用dispatch_async_f方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
dispatch_async_f(dq, _dispatch_Block_copy(work),
_dispatch_call_block_and_release);
}

void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
dispatch_continuation_t dc;
//如果并发数==1 说明是串行队列 走dispatch_barrier_async_f方法
if (dq->dq_width == 1) {
return dispatch_barrier_async_f(dq, ctxt, func);
}
dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
return _dispatch_async_f_slow(dq, ctxt, func);
}
//设置标志位
dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
dc->dc_func = func;
dc->dc_ctxt = ctxt;

if (dq->do_targetq) {
return _dispatch_async_f2(dq, dc);
}

_dispatch_queue_push(dq, dc);
}
  • 如果是串行队列(dq_width = 1)调用dispatch_barrier_async_f函数处理
  • 如果有do_targetq 目标队列,则进行转发
  • 否则调用_dispatch_queue_push 将封装好的dc放入队列中。

_dispatch_queue_push 宏展开,调用栈如下:

1
2
3
4
5
6
7
_dispatch_queue_push(dq,dc)
└──_dispatch_trace_queue_push(dq, _tail)
└──_dispatch_queue_push(dq,_tail) {struct dispatch_object_s *tail = _tail._do;}
//判断链表中已经存在节点,将tail(即dc)放在链表尾部
└──if(!dispatch_queue_push_lists2(dq, tail, tail))
//否则将任务放在链表头部
└── _dispatch_queue_push_slow(dq, tail);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static inline void
_dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t _tail)
{
struct dispatch_object_s *tail = _tail._do;
if (!fastpath(_dispatch_queue_push_list2(dq, tail, tail))) {
_dispatch_queue_push_slow(dq, tail);
}
}
//判断链表中是否已经存在节点
static inline bool _dispatch_queue_push_list2(dispatch_queue_t dq, struct dispatch_object_s *head,
struct dispatch_object_s *tail) {
struct dispatch_object_s *prev;
tail->do_next = NULL;
//将tail原子性赋值给dq->dq_items_tail,同时返回之前的值并赋给prev
prev = dispatch_atomic_xchg2o(dq, dq_items_tail, tail, release);
if (fastpath(prev)) {
//如果prev不等于NULL,直接在链表尾部添加节点
prev->do_next = head;
}
//链表中之前有元素返回YES,否则返回NO
return (prev != NULL);
}
//将节点放到链表开头
void _dispatch_queue_push_slow(dispatch_queue_t dq,
struct dispatch_object_s *obj)
{
if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
//原子性的将head存储到链表头部
dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
//唤醒global queue队列
return _dispatch_queue_wakeup_global(dq);
}
//将obj放到链表头部并执行_dispatch_wakeup函数里的dx_probe()函数
_dispatch_queue_push_list_slow2(dq, obj);
}

由上面的代码可以看出_dispatch_queue_push分为两种情况:
1、如果队列的链表不为空,将节点添加到链表尾部,即dq->dq_item_tail=dc。然后队列会按先进先出(FIFO)来处理任务。
2、如果队列此时为空,进入到_dispatch_queue_push_slow函数。如果队列是全局队列会进入if分支,原子性的将节点添加到队列开头,并执行_dispatch_queue_wakeup_global唤醒全局队列;如果队列是主队列或自定义串行队列if分支判断不成立,执行_dispatch_queue_push_list_slow2函数,它会将节点添加到队列开头并执行_dispatch_wakeup函数唤醒队列。

dispatch_async第一阶段的工作主要是封装外部任务并添加到队列的链表中,可以用下图来表示:

接着来看队列唤醒的逻辑,主要分成主队列和全局队列的唤醒和任务执行逻辑:

  1. 如果是主队列,会先调用_dispatch_wakeup唤醒队列,然后执行_dispatch_main_queue_wakeup函数来唤醒主线程的Runloop,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
dispatch_queue_t _dispatch_wakeup(dispatch_object_t dou) {
if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
return NULL;
}
//_dispatch_queue_probe判断dq_items_tail是否为空,if分支不成立
if (!dx_probe(dou._do)) {
return NULL;
}
//如果dou._do->do_suspend_cnt==0,返回YES,否则返回NO;
//同时将DISPATCH_OBJECT_SUSPEND_LOCK赋值给dou._do->do_suspend_cnt
if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
DISPATCH_OBJECT_SUSPEND_LOCK, release)) {
//因为主线程do_suspend_cnt非0,所以主线程if分支判断成功
#if DISPATCH_COCOA_COMPAT
if (dou._dq == &_dispatch_main_q) {
//主队列的任务执行和Runloop关联,唤醒主队列
return _dispatch_main_queue_wakeup();
}
#endif
return NULL;
}
//放到目标队列中,重新走_dispatch_queue_push方法
_dispatch_retain(dou._do);
dispatch_queue_t tq = dou._do->do_targetq;
_dispatch_queue_push(tq, dou._do);
return tq;
}

//唤醒主线程Runloop
static dispatch_queue_t _dispatch_main_queue_wakeup(void) {
dispatch_queue_t dq = &_dispatch_main_q;
if (!dq->dq_is_thread_bound) {
return NULL;
}
//只初始化一次mach_port_t
dispatch_once_f(&_dispatch_main_q_port_pred, dq,
_dispatch_runloop_queue_port_init);
_dispatch_runloop_queue_wakeup_thread(dq);
return NULL;
}
//唤醒runloop
static inline void _dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq) {
// main_queue 的do_ctxt 是一个mach_port
mach_port_t mp = (mach_port_t)dq->do_ctxt;
if (!mp) {
return;
}
//唤醒主线程的runloop
kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
switch (kr) {
case MACH_SEND_TIMEOUT:
case MACH_SEND_TIMED_OUT:
case MACH_SEND_INVALID_DEST:
break;
default:
(void)dispatch_assume_zero(kr);
break;
}
}

当我们调用 dispatch_async(dispatch_get_main_queue(), block) 时,libDispatch 向主线程的 RunLoop 发送消息,RunLoop会被唤醒,并从消息中取得这个 block,并在回调 __CFRUNLOOP_IS_SERVICING_THE_MAIN_DISPATCH_QUEUE__()里执行这个 block。

  1. 如果是全局队列,调用_dispatch_queue_wakeup_global函数,它封装调用了核心函数_dispatch_queue_wakeup_global_slow,调用栈和核心代码如下:
1
2
3
_dispatch_queue_wakeup_global_slow(dq)
└──_dispatch_queue_wakeup_global2(dq, 1)
└──_dispatch_queue_wakeup_global_slow(dq, 1)
1
2
3
4
5
6
7
8
9
10
11
12
static void _dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n) {  
// 如果线程池已满,则直接调用 _dispatch_worker_thread
// 否则创建线程池
pthread_t pthr;
while ((r = pthread_create(&pthr, NULL, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
sleep(1);
}
r = pthread_detach(pthr);
(void)dispatch_assume_zero(r);

详细代码说明在这里,检测当前线程池是否可用(已满),未满创建新的线程。创建新的线程后执行_dispatch_worker_thread函数。

1
2
3
_dispatch_worker_thread  
└──_dispatch_worker_thread4
└──_dispatch_continuation_pop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void * _dispatch_worker_thread(void *context) {
const int64_t timeout = (pqc ? 5ull : 65ull) * NSEC_PER_SEC;
//为了防止有些timer每隔一分钟调用,线程执行任务后会有65s的超时用来等待signal唤醒
//降低线程频繁创建销毁的性能消耗
do {
//取出一个任务并执行
_dispatch_root_queue_drain(dq);
} while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
dispatch_time(0, timeout)) == 0);
//将线程池加一
(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, relaxed);
_dispatch_queue_wakeup_global(dq);
_dispatch_release(dq);

return NULL;
}
1
2
3
4
5
6
7
8
9
10
11
static inline void _dispatch_continuation_pop(dispatch_object_t dou) {  
// ...
//dispatch_continuation_t结构体,执行dc->dc_func(dc->ctxt)
//本质是调用Block_layout结构体的invoke执行block的实现代码
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
if (dg) {
//如果是群组执行dispatch_group_leave
dispatch_group_leave(dg);
_dispatch_release(dg);
}
}

总结:dispatch_async 的流程是用链表保存所有提交的block,然后在底层线程池中,依次取出block并执行;而向主队列提交block则会向主线程的Runloop发送消息并唤醒Runloop,接着会在回调函数中取出block并执行。

dispatch_sync

dispatch_sync主要封装了 dispatch_sync_f函数,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
if (fastpath(dq->dq_width == 1)) {
//串行队列执行同步方法
return dispatch_barrier_sync_f(dq, ctxt, func);
}
if (slowpath(!dq->do_targetq)) {
//global 并发队列不要求执行顺序,直接执行具体的block
// the global concurrent queues do not need strict ordering
(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
return _dispatch_sync_f_invoke(dq, ctxt, func);
}
//并行队列执行同步方法
_dispatch_sync_f2(dq, ctxt, func);
}

从上面代码可以看出,后续主要分为两种情况:

  1. 向串行队列提交同步任务,执行dispatch_barrier_sync_f函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
// 1) ensure that this thread hasn't enqueued anything ahead of this call
// 2) the queue is not suspended
if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
}
if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
// global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case
return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
}
if (slowpath(dq->do_targetq->do_targetq)) {
return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
}
_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}

如果队列无任务执行,调用_dispatch_barrier_sync_f_invoke执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void _dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func) {
//任务执行核心逻辑,将当前线程的dispatch_queue_key设置为dq,然后执行block,
//执行完之后再恢复到之前的old_dq
dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
_dispatch_thread_setspecific(dispatch_queue_key, dq);
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

//如果队列中存在其他任务,用信号量的方法唤醒,然后继续执行下一个任务
if (slowpath(dq->dq_items_tail)) {
return _dispatch_barrier_sync_f2(dq);
}
if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
_dispatch_wakeup(dq);
}
}

如果队列存在其他任务或者被挂起,调用_dispatch_barrier_sync_f_slow函数,等待该队列的任务执行完成后用信号量通知队列继续执行任务。向当前串行队列提交任务就会走到如下分支,导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static void _dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
struct dispatch_continuation_s dc = {
.dc_data = dq,
.dc_func = func,
.dc_ctxt = ctxt,
.dc_other = (void*)sema,
};
struct dispatch_continuation_s dbss = {
.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
DISPATCH_OBJ_SYNC_SLOW_BIT),
.dc_func = _dispatch_barrier_sync_f_slow_invoke, //这个方法内部会在执行完block后 调用_dispatch_thread_semaphore_signal(sema);还原信号量
.dc_ctxt = &dc,
.dc_data = (void*)_dispatch_thread_self(),
};
//使用信号量等待其他任务执行完成
_dispatch_queue_push(dq, &dbss);
_dispatch_thread_semaphore_wait(sema); // acquire
_dispatch_put_thread_semaphore(sema);
//收到signal信号,继续执行当前任务
if (slowpath(dq->do_targetq->do_targetq)) {
_dispatch_function_recurse(dq, ctxt, func);
} else {
_dispatch_function_invoke(dq, ctxt, func);
}
}
  1. 向并发队列提交同步任务,执行_dispatch_sync_f2函数。如果thread存在其他任务,或者队列被挂起,或者有正在执行的任务,则调用_dispatch_sync_f_slow函数,使用信号量等待,否则直接调用_dispatch_sync_f_invoke执行任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static inline void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
// 1) ensure that this thread hasn't enqueued anything ahead of this call
// 2) the queue is not suspended
if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
return _dispatch_sync_f_slow(dq, ctxt, func, false);
}
uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
// re-check suspension after barrier check <rdar://problem/15242126>
if (slowpath(running & 1) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
return _dispatch_sync_f_slow(dq, ctxt, func, running == 0);
}
if (slowpath(dq->do_targetq->do_targetq)) {
return _dispatch_sync_f_recurse(dq, ctxt, func);
}
_dispatch_sync_f_invoke(dq, ctxt, func);
}

static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
bool wakeup)
{
_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
struct dispatch_continuation_s dss = {
.do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
#if DISPATCH_INTROSPECTION
.dc_func = func,
.dc_ctxt = ctxt,
.dc_data = (void*)_dispatch_thread_self(),
#endif
.dc_other = (void*)sema,
};
_dispatch_queue_push_wakeup(dq, &dss, wakeup);

_dispatch_thread_semaphore_wait(sema);
_dispatch_put_thread_semaphore(sema);

if (slowpath(dq->do_targetq->do_targetq)) {
_dispatch_function_recurse(dq, ctxt, func);
} else {
_dispatch_function_invoke(dq, ctxt, func);
}
if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
_dispatch_wakeup(dq);
}
}

dispatch_barrier_async

dispatch_barrier_async是OC中解决线程同步的一种方法
它调用了dispatch_barrier_async_f函数,与dispatch_async类似但是do_vtable多了一个标志位DISPATCH_OBJ_BARRIER_BIT

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_continuation_t dc;

dc = fastpath(_dispatch_continuation_alloc_cacheonly());
if (!dc) {
return _dispatch_barrier_async_f_slow(dq, ctxt, func);
}
// 区别于dispatch_async 多了个标志位 DISPATCH_OBJ_BARRIER_BIT,从队列中取任务时会用到
dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
dc->dc_func = func;
dc->dc_ctxt = ctxt;

_dispatch_queue_push(dq, dc);
}

  • dispatch_barrier_async如果传入的是global queue,在唤醒队列时会执行_dispatch_queue_wakeup_global函数,故执行效果同dispatch_async一致,栅栏方法会无效;
  • 如果传入的是自定义队列,_dispatch_continuation_pop 参数是自定义的queue,在取出任务时会用到DISPATCH_OBJ_BARRIER_BIT标记,调用栈如下:
1
2
3
4
_dispatch_queue_invoke
└──_dispatch_queue_class_invoke
└──dispatch_queue_invoke2
└──_dispatch_queue_drain
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
_dispatch_thread_semaphore_t _dispatch_queue_drain(dispatch_object_t dou) {
dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
struct dispatch_object_s *dc, *next_dc;
_dispatch_thread_semaphore_t sema = 0;
while (dq->dq_items_tail) {
dc = _dispatch_queue_head(dq);
do {
bool redirect = false;
if (!fastpath(dq->dq_width == 1)) {
if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
(long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
if (dq->dq_running > 1) {
goto out;
}
} else {
redirect = true;
}
}
next_dc = _dispatch_queue_next(dq, dc);
if (redirect) {
_dispatch_continuation_redirect(dq, dc); //依次取出任务并调用这个方法,使得block并发执行
continue;
}
//barrier block之前的block已经执行完,开始执行barrier block
if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
goto out;
}
_dispatch_continuation_pop(dc);
_dispatch_perfmon_workitem_inc();
} while ((dc = next_dc));
}
out:
_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
return sema; // 返回空的信号量
}

在while循环中依次取出任务并调用_dispatch_continuation_redirect函数,使block并发执行。当遇到DISPATCH_OBJ_BARRIER_BIT标记时,直接goto out,返回一个空的信号量,随后方法调用者会将这个任务单独放入队列,然后修改do_suspend_cnt标志保证后续while循环直接goto out,barrier block的任务执行完之后_dispatch_queue_class_invoke会将do_suspend_cnt重置回去,所以barrier block之后的任务会继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static inline void
_dispatch_queue_
class_invoke(dispatch_object_t dou,
dispatch_queue_t (*invoke)(dispatch_object_t,
_dispatch_thread_semaphore_t*))
{
dispatch_queue_t dq = dou._dq;
//...
_dispatch_thread_semaphore_t sema = 0;
tq = invoke(dq, &sema); //调用dispatch_queue_invoke2() 发现标志位返回空sema
(void)dispatch_atomic_dec2o(dq, dq_running, release);
if (sema) {
_dispatch_thread_semaphore_signal(sema);
} else if (tq) {
_dispatch_introspection_queue_item_complete(dq);
return _dispatch_queue_push(tq, dq); //将任务单独放入队列
}
}
dq->do_next = DISPATCH_OBJECT_LISTLESS;
if (!dispatch_atomic_sub2o(dq, do_suspend_cnt,
DISPATCH_OBJECT_SUSPEND_LOCK, release)) { //修改do_suspend_cnt
dispatch_atomic_barrier(seq_cst); // <rdar://problem/11915417>
if (dispatch_atomic_load2o(dq, dq_running, seq_cst) == 0) {
_dispatch_wakeup(dq); // verify that the queue is idle
}
}
_dispatch_introspection_queue_item_complete(dq);
_dispatch_release(dq); // added when the queue is put on the list
}

流程图:

总结

  1. dispatch_async如何实现?分发到主队列与全局队列有什么区别,一定会新建线程执行任务么?

    dispatch_async 会把任务添加到队列链表中,添加完成后唤醒队列,全局队列唤醒时会从线程池中取出可用线程,如果没有会新建线程,然后在线程中执行队列取出的任务;主队列会唤醒主线程的Runloop,然后在Runloop中通知GCD执行主队列提交的任务。

  2. dispatch_sync 如何实现?

    dispatch_sync 一般在当前线程执行,如果是主队列的任务还是会切换到主线程执行。它使用与线程绑定的信号量来实现串行执行的功能。
    向串行队列提交同步任务

    • 如果队列无任务调用_dispatch_barrier_sync_f_invoke执行任务。
    • 如果队列存在其他任务或被挂起,则调用_dispatch_barrier_sync_f_slow,并且调用线程对应的信号量进行wait操作,等待该队列的任务执行完之后用信号量通知队列继续执行任务。向当前串行队列同步提交block时会进入这个方法,导致死锁
向并发队列提交同步任务
*  如果队列无任务调用`_dispatch_sync_f_invoke`执行任务。
*  如果队列存在其他任务,或者队列被挂起,或者有正在执行的任务,则调用`_dispatch_sync_f_slow` 函数,使用信号量等待。
  1. dispatch_barrier_async 如何实现?

    改变了block vtable的标记位,当执行到该block时,会修改队列的suspend_count,待之前的任务都执行完毕,才会执行barrier block,待barrier_block执行完恢复suspend_count,并执行后面的任务。
    如果把Barrier block提交到global queue,执行效果与dispatch_async 一致,只有将Barrier blocks 提交到DISPATCH_QUEUE_CONCURRENT属性创建的自定义队列时它才有效。

参考文档

深入浅出 GCD 之 dispatch_queue
深入理解GCD
我所理解的 iOS 并发编程
扒了扒libdispatch源码
从NSTimer的失效性谈起(二):关于GCD Timer和libdispatch
GCD源码分析1 —— 开篇
细说 GCD(Grand Central Dispatch)如何用