iOS刨根問底-深入理解GCD
概述
做過iOS開發的同學相信對於GCD(Grand Central Dispatch)並不陌生,因爲在平時多線程開發過程中GCD應該是使用最多的技術甚至它要比它的上層封裝NSOperation還要常用,其中最主要的原因是簡單易用功能強大。本文將從GCD的原理和使用兩個層面分析GCD的內容,本文會結合源碼和實例分析使用GCD的注意事項,源碼解讀部分主要通過註釋源碼的方式方便進行源碼分析,具體到細節通過在源碼解釋說明。
開源的libdispatch
和前面一篇文章深入瞭解Runloop一樣 GCD的代碼 是開源的(也可以直接從蘋果官網 下載 ),這樣要弄清GCD的很多實現原理就有了可能,所以文中不涉及的很多細節大家可以通過源代碼進行了解。下面讓我們看一下關於常見的幾個類型的源碼:
隊列類型 dispatch_queue_t
dispatch_queue_t應該是平時接觸最多的一個GCD類型,比如說創建一個隊列,它返回的就是一個dispatch_queue_t類型:
dispatch_queue_t serialDispatch = dispatch_queue_create("com.cmjstudio.dispatch", nil);
通過查看源碼可以看到dispatch_queue_t的定義:
// 首先可以看到dispatch_queue_t本身只是dispatch_queue_s這個結構體指針 typedef struct dispatch_queue_s *dispatch_queue_t; // 繼續查看dispatch_queue_s定義,可以看到一個 DISPATCH_QUEUE_CLASS_HEADER的宏定義 struct dispatch_queue_s { DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1); /* 32bit hole on LP64 */ } DISPATCH_ATOMIC64_ALIGN; // 查看DISPATCH_QUEUE_CLASS_HEADER #define DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \ _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__); \ /* LP64 global queue cacheline boundary */ \ unsigned long dq_serialnum; \ const char *dq_label; \ DISPATCH_UNION_LE(uint32_t volatile dq_atomic_flags, \ const uint16_t dq_width, \ const uint16_t __dq_opaque2 \ ); \ dispatch_priority_t dq_priority; \ union { \ struct dispatch_queue_specific_head_s *dq_specific_head; \ struct dispatch_source_refs_s *ds_refs; \ struct dispatch_timer_source_refs_s *ds_timer_refs; \ struct dispatch_mach_recv_refs_s *dm_recv_refs; \ }; \ int volatile dq_sref_cnt // 展開_DISPATCH_QUEUE_CLASS_HEADER #define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \ DISPATCH_OBJECT_HEADER(x); \ DISPATCH_UNION_LE(uint64_t volatile dq_state, \ dispatch_lock dq_state_lock, \ uint32_t dq_state_bits \ ); \ __pointer_sized_field__ // 持續展開DISPATCH_OBJECT_HEADER #define DISPATCH_OBJECT_HEADER(x) \ struct dispatch_object_s _as_do[0]; \ _DISPATCH_OBJECT_HEADER(x) // 進一步查看 _DISPATCH_OBJECT_HEADER #define _DISPATCH_OBJECT_HEADER(x) \ struct _os_object_s _as_os_obj[0]; \ OS_OBJECT_STRUCT_HEADER(dispatch_##x); \ struct dispatch_##x##_s *volatile do_next; \ struct dispatch_queue_s *do_targetq; \ void *do_ctxt; \ void *do_finalizer // 再查看 OS_OBJECT_STRUCT_HEADER #define OS_OBJECT_STRUCT_HEADER(x) \ _OS_OBJECT_HEADER(\ const void *_objc_isa, \ do_ref_cnt, \ do_xref_cnt); \ const struct x##_vtable_s *do_vtable // 進一步查看 _OS_OBJECT_HEADER #define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \ isa; /* must be pointer-sized */ \ int volatile ref_cnt; \ int volatile xref_cnt
上面的源代碼拆分過程儘管繁瑣但是每一步都可以在源碼中順利的找到倒也不是太複雜。最終可以看到 dispatch_queue_t 本身存儲了我們平時常見的label、priority、specific等,本身就是isa指針和引用計數器等一些信息。
需要說明的是 dispatch 版本衆多,如果查看當前版本可以直接打印 DISPATCH_API_VERSION
即可。
創建隊列 dispatch_queue_create
dispatch_queue_create 用於創建一個隊列,返回類型是上面分析過的dispatch_queue_t ,那麼現在看一下如何創建一個隊列:
dispatch_queue_t dispatch_queue_create(const char *label, dispatch_queue_attr_t attr) { return _dispatch_lane_create_with_target(label, attr, DISPATCH_TARGET_QUEUE_DEFAULT, true); } // 然後進一步查看 _dispatch_lane_create_with_target 的代碼 static dispatch_queue_t _dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa, dispatch_queue_t tq, bool legacy) { dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa); // // Step 1: Normalize arguments (qos, overcommit, tq) // dispatch_qos_t qos = dqai.dqai_qos; #if !HAVE_PTHREAD_WORKQUEUE_QOS if (qos == DISPATCH_QOS_USER_INTERACTIVE) { dqai.dqai_qos = qos = DISPATCH_QOS_USER_INITIATED; } if (qos == DISPATCH_QOS_MAINTENANCE) { dqai.dqai_qos = qos = DISPATCH_QOS_BACKGROUND; } #endif // !HAVE_PTHREAD_WORKQUEUE_QOS _dispatch_queue_attr_overcommit_t overcommit = dqai.dqai_overcommit; if (overcommit != _dispatch_queue_attr_overcommit_unspecified && tq) { if (tq->do_targetq) { DISPATCH_CLIENT_CRASH(tq, "Cannot specify both overcommit and " "a non-global target queue"); } } if (tq && dx_type(tq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) { // Handle discrepancies between attr and target queue, attributes win if (overcommit == _dispatch_queue_attr_overcommit_unspecified) { if (tq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) { overcommit = _dispatch_queue_attr_overcommit_enabled; } else { overcommit = _dispatch_queue_attr_overcommit_disabled; } } if (qos == DISPATCH_QOS_UNSPECIFIED) { qos = _dispatch_priority_qos(tq->dq_priority); } tq = NULL; } else if (tq && !tq->do_targetq) { // target is a pthread or runloop root queue, setting QoS or overcommit // is disallowed if (overcommit != _dispatch_queue_attr_overcommit_unspecified) { DISPATCH_CLIENT_CRASH(tq, "Cannot specify an overcommit attribute " "and use this kind of target queue"); } } else { if (overcommit == _dispatch_queue_attr_overcommit_unspecified) { // Serial queues default to overcommit! overcommit = dqai.dqai_concurrent ? _dispatch_queue_attr_overcommit_disabled : _dispatch_queue_attr_overcommit_enabled; } } if (!tq) { tq = _dispatch_get_root_queue( qos == DISPATCH_QOS_UNSPECIFIED ? DISPATCH_QOS_DEFAULT : qos, overcommit == _dispatch_queue_attr_overcommit_enabled)->_as_dq; if (unlikely(!tq)) { DISPATCH_CLIENT_CRASH(qos, "Invalid queue attribute"); } } // // Step 2: Initialize the queue // if (legacy) { // if any of these attributes is specified, use non legacy classes if (dqai.dqai_inactive || dqai.dqai_autorelease_frequency) { legacy = false; } } const void *vtable; dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0; if (dqai.dqai_concurrent) { vtable = DISPATCH_VTABLE(queue_concurrent); } else { vtable = DISPATCH_VTABLE(queue_serial); } switch (dqai.dqai_autorelease_frequency) { case DISPATCH_AUTORELEASE_FREQUENCY_NEVER: dqf |= DQF_AUTORELEASE_NEVER; break; case DISPATCH_AUTORELEASE_FREQUENCY_WORK_ITEM: dqf |= DQF_AUTORELEASE_ALWAYS; break; } if (label) { const char *tmp = _dispatch_strdup_if_mutable(label); if (tmp != label) { dqf |= DQF_LABEL_NEEDS_FREE; label = tmp; } } dispatch_lane_t dq = _dispatch_object_alloc(vtable, sizeof(struct dispatch_lane_s)); _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ? DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER | (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0)); dq->dq_label = label; dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos, dqai.dqai_relpri); if (overcommit == _dispatch_queue_attr_overcommit_enabled) { dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT; } if (!dqai.dqai_inactive) { _dispatch_queue_priority_inherit_from_target(dq, tq); _dispatch_lane_inherit_wlh_from_target(dq, tq); } _dispatch_retain(tq); dq->do_targetq = tq; _dispatch_object_debug(dq, "%s", __func__); return _dispatch_trace_queue_create(dq)._dq; }
從源碼註釋也可以看出主要有兩步操作,第一步是 Normalize arguments,第二部纔是真正創建隊列,忽略一些參數規範化操作。首先 _dispatch_get_root_queue
用於獲取root隊列,它有兩個參數:一個是隊列優先級(有6個:userInteractive>default>unspecified>userInitiated>utility>background),另一個是支持不支持過載overcommit(支持overcommit的隊列在創建隊列時無論系統是否有足夠的資源都會重新開一個線程),所以總共就有12個root隊列。對應的源代碼如下(其實是從一個數組中獲取):
DISPATCH_ALWAYS_INLINE DISPATCH_CONST static inline dispatch_queue_global_t _dispatch_get_root_queue(dispatch_qos_t qos, bool overcommit) { if (unlikely(qos < DISPATCH_QOS_MIN || qos > DISPATCH_QOS_MAX)) { DISPATCH_CLIENT_CRASH(qos, "Corrupted priority"); } return &_dispatch_root_queues[2 * (qos - 1) + overcommit]; }
至於12個root隊列可以查看源代碼:
struct dispatch_queue_global_s _dispatch_root_queues[] = { #define _DISPATCH_ROOT_QUEUE_IDX(n, flags) \ ((flags & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) ? \ DISPATCH_ROOT_QUEUE_IDX_##n##_QOS_OVERCOMMIT : \ DISPATCH_ROOT_QUEUE_IDX_##n##_QOS) #define _DISPATCH_ROOT_QUEUE_ENTRY(n, flags, ...) \ [_DISPATCH_ROOT_QUEUE_IDX(n, flags)] = { \ DISPATCH_GLOBAL_OBJECT_HEADER(queue_global), \ .dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE, \ .do_ctxt = _dispatch_root_queue_ctxt(_DISPATCH_ROOT_QUEUE_IDX(n, flags)), \ .dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL), \ .dq_priority = flags | ((flags & DISPATCH_PRIORITY_FLAG_FALLBACK) ? \ _dispatch_priority_make_fallback(DISPATCH_QOS_##n) : \ _dispatch_priority_make(DISPATCH_QOS_##n, 0)), \ __VA_ARGS__ \ } _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, 0, .dq_label = "com.apple.root.maintenance-qos", .dq_serialnum = 4, ), _DISPATCH_ROOT_QUEUE_ENTRY(MAINTENANCE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT, .dq_label = "com.apple.root.maintenance-qos.overcommit", .dq_serialnum = 5, ), _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, 0, .dq_label = "com.apple.root.background-qos", .dq_serialnum = 6, ), _DISPATCH_ROOT_QUEUE_ENTRY(BACKGROUND, DISPATCH_PRIORITY_FLAG_OVERCOMMIT, .dq_label = "com.apple.root.background-qos.overcommit", .dq_serialnum = 7, ), _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, 0, .dq_label = "com.apple.root.utility-qos", .dq_serialnum = 8, ), _DISPATCH_ROOT_QUEUE_ENTRY(UTILITY, DISPATCH_PRIORITY_FLAG_OVERCOMMIT, .dq_label = "com.apple.root.utility-qos.overcommit", .dq_serialnum = 9, ), _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK, .dq_label = "com.apple.root.default-qos", .dq_serialnum = 10, ), _DISPATCH_ROOT_QUEUE_ENTRY(DEFAULT, DISPATCH_PRIORITY_FLAG_FALLBACK | DISPATCH_PRIORITY_FLAG_OVERCOMMIT, .dq_label = "com.apple.root.default-qos.overcommit", .dq_serialnum = 11, ), _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, 0, .dq_label = "com.apple.root.user-initiated-qos", .dq_serialnum = 12, ), _DISPATCH_ROOT_QUEUE_ENTRY(USER_INITIATED, DISPATCH_PRIORITY_FLAG_OVERCOMMIT, .dq_label = "com.apple.root.user-initiated-qos.overcommit", .dq_serialnum = 13, ), _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, 0, .dq_label = "com.apple.root.user-interactive-qos", .dq_serialnum = 14, ), _DISPATCH_ROOT_QUEUE_ENTRY(USER_INTERACTIVE, DISPATCH_PRIORITY_FLAG_OVERCOMMIT, .dq_label = "com.apple.root.user-interactive-qos.overcommit", .dq_serialnum = 15, ), };
其實我們平時用到的全局隊列也是其中一個root隊列,這個只要查看 dispatch_get_global_queue
代碼就可以了:
dispatch_queue_global_t dispatch_get_global_queue(intptr_t priority, uintptr_t flags) { dispatch_assert(countof(_dispatch_root_queues) == DISPATCH_ROOT_QUEUE_COUNT); if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) { return DISPATCH_BAD_INPUT; } dispatch_qos_t qos = _dispatch_qos_from_queue_priority(priority); #if !HAVE_PTHREAD_WORKQUEUE_QOS if (qos == QOS_CLASS_MAINTENANCE) { qos = DISPATCH_QOS_BACKGROUND; } else if (qos == QOS_CLASS_USER_INTERACTIVE) { qos = DISPATCH_QOS_USER_INITIATED; } #endif if (qos == DISPATCH_QOS_UNSPECIFIED) { return DISPATCH_BAD_INPUT; } return _dispatch_get_root_queue(qos, flags & DISPATCH_QUEUE_OVERCOMMIT); }
可以很清楚的看到, dispatch_get_global_queue
的本質就是調用 _dispatch_get_root_queue
,其中的flag只是一個蘋果予保留字段,通常我們傳0(你可以試試傳1應該隊列創建失敗),而代入上面的數組當使用 dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)
。如果打印這個返回結果可以看到:
<OS_dispatch_queue_global: com.apple.root.default-qos[0x1063cbf00] = { xref = -2147483648, ref = -2147483648, sref = 1, target = [0x0], width = 0xfff, state = 0x0060000000000000, in-barrier}>
首先通過上面數組進行索引 2 * (qos - 1) + overcommit
= 2*(4-1)+0 = 6 ,可以索引得到 dq_serialnum=10的隊列,剛好label=com.apple.root.default-qos。至於qos參數爲什麼是4呢?
DISPATCH_ALWAYS_INLINE static inline dispatch_qos_t _dispatch_qos_from_queue_priority(intptr_t priority) { switch (priority) { case DISPATCH_QUEUE_PRIORITY_BACKGROUND: return DISPATCH_QOS_BACKGROUND; case DISPATCH_QUEUE_PRIORITY_NON_INTERACTIVE: return DISPATCH_QOS_UTILITY; case DISPATCH_QUEUE_PRIORITY_LOW: return DISPATCH_QOS_UTILITY; case DISPATCH_QUEUE_PRIORITY_DEFAULT: return DISPATCH_QOS_DEFAULT; case DISPATCH_QUEUE_PRIORITY_HIGH: return DISPATCH_QOS_USER_INITIATED; default: return _dispatch_qos_from_qos_class((qos_class_t)priority); } #define DISPATCH_QOS_DEFAULT ((dispatch_qos_t)4) }
然後我們分析一下 dispatch_queue_create
中的 DISPATCH_VTABLE
這個宏:
#define DISPATCH_VTABLE(name) DISPATCH_OBJC_CLASS(name) // 查看DISPATCH_OBJC_CLASS #define DISPATCH_OBJC_CLASS(name) (&DISPATCH_CLASS_SYMBOL(name)) // 進一步查看 DISPATCH_CLASS_SYMBOL #define DISPATCH_CLASS_SYMBOL(name) OS_dispatch_##name##_class
解析之後就是按隊列類型分別獲取不同隊列類型的類: OS_dispatch_queue_concurrent_class 和 OS_dispatch_queue_serial_class ,對比我們平時打印一個隊列的信息(如下),可以看到 OS_dispatch_queue_serial 或者 OS_dispatch_queue_concurrent_class :
<OS_dispatch_queue_serial: com.cmjstudio.dispatch[0x6000026a5a00] = { xref = 1, ref = 1, sref = 1, target = com.apple.root.default-qos.overcommit[0x108a4bf80], width = 0x1, state = 0x001ffe2000000000, in-flight = 0}>
接着看 _dispatch_object_alloc
和 _dispatch_queue_init
,分別用於申請對應類型的內存和初始化。首先看前者的實現:
// 注意對於iOS並不滿足 OS_OBJECT_HAVE_OBJC1 void * _dispatch_object_alloc(const void *vtable, size_t size) { #if OS_OBJECT_HAVE_OBJC1 const struct dispatch_object_vtable_s *_vtable = vtable; dispatch_object_t dou; dou._os_obj = _os_object_alloc_realized(_vtable->_os_obj_objc_isa, size); dou._do->do_vtable = vtable; return dou._do; #else return _os_object_alloc_realized(vtable, size); #endif } // 接着看 _os_object_alloc_realized _os_object_t _os_object_alloc_realized(const void *cls, size_t size) { dispatch_assert(size >= sizeof(struct _os_object_s)); return _os_objc_alloc(cls, size); } // 再看一下 _os_objc_alloc static inline id _os_objc_alloc(Class cls, size_t size) { id obj; size -= sizeof(((struct _os_object_s *)NULL)->os_obj_isa); while (unlikely(!(obj = class_createInstance(cls, size)))) { _dispatch_temporary_resource_shortage(); } return obj; } DISPATCH_NOINLINE void _dispatch_temporary_resource_shortage(void) { sleep(1); __asm__ __volatile__(""); // prevent tailcall }
然後看一下內存分配之後的初始化 _dispatch_queue_init
源碼,也只是簡單的進行了初始化工作,不過值得一提的是 dqai.dqai_concurrent ? DISPATCH_QUEUE_WIDTH_MAX : 1
這個參數, DISPATCH_QUEUE_WIDTH_MAX
其實看一下源碼就知道是0x1000ull-2就是0xffe,而如果是串行隊列就是1,這也是爲什麼可以在上面打印中看到 width = 0x1
的原因,width本身就是併發數的個數,對於串行隊列是1而對於併發隊列是不限制的(回過頭去看全局隊列width爲什麼是0xfff呢,因爲它的width是#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1)
)=0x1000ull-1:
static inline dispatch_queue_class_t _dispatch_queue_init(dispatch_queue_class_t dqu, dispatch_queue_flags_t dqf, uint16_t width, uint64_t initial_state_bits) { uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width); dispatch_queue_t dq = dqu._dq; dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK | DISPATCH_QUEUE_INACTIVE)) == 0); if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) { dq_state |= DISPATCH_QUEUE_INACTIVE + DISPATCH_QUEUE_NEEDS_ACTIVATION; dq->do_ref_cnt += 2; // rdar://8181908 see _dispatch_lane_resume if (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE) { dq->do_ref_cnt++; // released when DSF_DELETED is set } } dq_state |= (initial_state_bits & DISPATCH_QUEUE_ROLE_MASK); dq->do_next = DISPATCH_OBJECT_LISTLESS; dqf |= DQF_WIDTH(width); os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed); dq->dq_state = dq_state; dq->dq_serialnum = os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed); return dqu; }
接着看 dispatch_queue_create
的 dq->do_targetq = tq;
這句話是什麼意思呢?這個其實是當使用 dispatch_queue_create
創建的自定義隊列(事實上包括主隊列和管理隊列,也就是非全局隊列[可以看一下上面的源代碼全局隊列並沒有設置do_targetq,但是事實上它本身就是root隊列]),都需要壓入到全局隊列(這裏指的是root隊列)進行處理,這個目標隊列的目的就是允許我們將一個隊列放在另一個隊列裏執行任務。看一下上面創建自定義隊列的源碼不難發現,如果是自定義一個串行隊列其實最終就是一個root隊列。
爲了驗證上面關於主隊列也是root隊列的說法不放看一下主隊列的源碼:
DISPATCH_INLINE DISPATCH_ALWAYS_INLINE DISPATCH_CONST DISPATCH_NOTHROW dispatch_queue_main_t dispatch_get_main_queue(void) { return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q); } // 進一步查看 DISPATCH_GLOBAL_OBJECT #define DISPATCH_GLOBAL_OBJECT(type, object) ((OS_OBJECT_BRIDGE type)&(object)) // 先看一下類型 dispatch_queue_main_t #if defined(__DISPATCH_BUILDING_DISPATCH__) && !defined(__OBJC__) typedef struct dispatch_queue_static_s *dispatch_queue_main_t; #else DISPATCH_DECL_SUBCLASS(dispatch_queue_main, dispatch_queue_serial); #endif // 然後查看 _dispatch_main_q,可以看到真正的類型如下 struct dispatch_queue_static_s _dispatch_main_q = { DISPATCH_GLOBAL_OBJECT_HEADER(queue_main), #if !DISPATCH_USE_RESOLVERS .do_targetq = _dispatch_get_default_queue(true), #endif .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) | DISPATCH_QUEUE_ROLE_BASE_ANON, .dq_label = "com.apple.main-thread", .dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1), .dq_serialnum = 1, }; // 查看 _dispatch_get_default_queue源碼 #define _dispatch_get_default_queue(overcommit) \ _dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS + \ !!(overcommit)]._as_dq // 查看 DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS enum { DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS = 0, DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT, DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS, DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT, DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS, DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT, DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS, DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT, DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS, DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT, DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS, DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT, _DISPATCH_ROOT_QUEUE_IDX_COUNT, };
可以看到主隊列do_targetq也是一個root隊列(通過獲取_dispatch_root_queues),DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS =6 所以 _dispatch_root_queues[6+1]
就是 com.apple.root.default-qos.overcommit
,不妨打印一些主隊列(如下),可以看到target正是 com.apple.root.default-qos.overcommit
,而且width=1,其次由於 dispatch_queue_main_t
是對dispatch_queue_serial的重寫所以也是一個串行隊列:
<OS_dispatch_queue_main: com.apple.main-thread[0x1092dfb00] = { xref = -2147483648, ref = -2147483648, sref = 1, target = com.apple.root.default-qos.overcommit[0x1092dff80], width = 0x1, state = 0x001ffe9000000300, dirty, in-flight = 0, thread = 0x303 }>
到了這裏關於隊列的創建我們已經基本介紹完了,
可以看到不管是自定義隊列、全局隊列還是主隊列最終都直接或者間接的依賴12個root隊列來執行任務調度(儘管如此主隊列有自己的label,如果按照label計算總共16個,除了上面的12個,就是 com.apple.main-thread
還有兩個內部管理隊列 com.apple.libdispatch-manager
和 com.apple.root.libdispatch-manager
以及runloop的運行隊列)
。下面看一下幾個常用的隊列任務的執行方法的源碼,對於任務的執行GCD其實主要用兩個方法 dispatch_sync
和 dispatch_async
。
隊列和線程之間的關係
上面提到一個重要概念是overcommit,overcommit的隊列在隊列創建時會新建一個線程,非overcommit隊列創建隊列則未必創建線程。另外width=1意味着是串行隊列,只有一個線程可用,width=0xffe則意味着並行隊列,線程則是從線程池獲取,可用線程數是64個。
可以看到全局隊列是非overcommit的(flat保留字只能傳0,如果默認優先級則是com.apple.root.default-qos,但是width=0xffe是並行隊列);主隊列是overcommit的com.apple.root.default-qos.overcommit,不過它是串行隊列,width=1,並且運行的這個線程只能是主線程;自定義串行隊列是overcommit的,默認優先級則是 com.apple.root.default-qos.overcommit,並行隊列則是非overcommit的。
這裏看一下爲什麼上面說並行隊列最大線程數是64個,不妨結合幾個例子來查看:
/** 串行隊列只有一個線程,線程num > 2 **/ - (void)test1 { dispatch_queue_t serialQueue = dispatch_queue_create("com.cmjstudio.dispatch", DISPATCH_QUEUE_SERIAL); for (int i=0; i<1000; ++i) { dispatch_async(serialQueue, ^{ NSLog(@"%@,%i",[NSThread currentThread],i); // only one thread(number = 3~66) }); } } - (void)test2 { dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_USER_INITIATED, -1); dispatch_queue_t serialQueue = dispatch_queue_create("com.cmjstudio.dispatch", attr); for (int i=0; i<1000; ++i) { dispatch_async(serialQueue, ^{ NSLog(@"%@,%i",[NSThread currentThread],i); // only one thread }); } } /** 不管優先級多高並行隊列有最多有64個線程,線程num在3~66,在一次輪詢中遇到高優先級的會先執行 **/ - (void)test3 { dispatch_queue_t concurrentQueue = dispatch_queue_create("com.cmjstudio.dispatch", DISPATCH_QUEUE_CONCURRENT); for (int i=0; i<1000; ++i) { dispatch_async(concurrentQueue, ^{ NSLog(@"%@,%i",[NSThread currentThread],i); // 64 thread (num = 3~66) }); } } // 全局隊列是並行隊列(下面的demo會先輸出global然後是所有custom再是剩餘的global,整體遵循高優先級先執行規則,個別低優先級先輸出的原因是發送global期間還沒有輪訓到高優先級任務,一旦遇到就會先執行高優先級任務) - (void)test4 { dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0); for (int i=0; i<100; ++i) { dispatch_async(globalQueue, ^{ NSLog(@"global:%@,%i",[NSThread currentThread],i); // 64 thread (num = 3~66) }); } dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_USER_INITIATED, -1); dispatch_queue_t concurrentQueue = dispatch_queue_create("com.cmjstudio.dispatch", attr); for (int i=0; i<100; ++i) { dispatch_async(concurrentQueue, ^{ NSLog(@"custom:%@,%i",[NSThread currentThread],i); // 64 thread (num = 3~66) }); } } /** 串行隊列和並行隊列都存在線程數多了1個,number最大到了67,不過串行隊列的任務不一定在67這個線程中而是會複用前面的任意一個線程。說明串行隊列加入時一定會創建一個線程 **/ - (void)test5 { dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_USER_INITIATED, -1); dispatch_queue_t concurrentQueue = dispatch_queue_create("com.cmjstudio.dispatch", attr); for (int i=0; i<100; ++i) { dispatch_async(concurrentQueue, ^{ NSLog(@"concurrent:%@,%i",[NSThread currentThread],i); }); } dispatch_queue_t serialQueue = dispatch_queue_create("com.cmjstudio.dispatch", DISPATCH_QUEUE_SERIAL); for (int i=0; i<100; ++i) { dispatch_async(serialQueue, ^{ NSLog(@"serial:%@,%i",[NSThread currentThread],i); }); } } /** 當一個串行隊列依附於一個並行隊列時(非overcommit,如果是overcommit隊列則會新建一個線程),線程最多恢復到了64個,並不會再新建一個線程了 **/ - (void)test6 { dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_USER_INITIATED, -1); dispatch_queue_t concurrentQueue = dispatch_queue_create("com.cmjstudio.dispatch", attr); for (int i=0; i<100; ++i) { dispatch_async(concurrentQueue, ^{ NSLog(@"%@,%i",[NSThread currentThread],i); }); } dispatch_queue_t serialQueue = dispatch_queue_create("com.cmjstudio.dispatch", DISPATCH_QUEUE_SERIAL); dispatch_set_target_queue(serialQueue, concurrentQueue); for (int i=0; i<100; ++i) { dispatch_async(serialQueue, ^{ NSLog(@"%@,%i",[NSThread currentThread],i); }); } }
可以看到對於 dispatch_asyn 的調用(同步操作線程都在主線程不再贅述)串行隊列是overcommit的,創建隊列會創建1個新的線程,並行隊列是非overcommit的,不一定會新建線程,會從線程池中的64個線程中獲取並使用。另外上面的dispatch_set_target_queue 操作和前面源碼中的do_targetq是作用一樣的。
這樣以來反而串行隊列是開發中應該注意的,因爲一旦新建一個串行隊列就會新建一個線程,避免在類似循環操作中新建串行隊列,這個上限是多少是任意多嗎?其實也不是最多新增512個(不算主線程,number從4開始到515)但是這明顯已經是災難性的了。另外對於多個同一優先級的自定義串行隊列(比如:com.apple.root.default-qos.overcommit)對於 dispatch_asyn 調用又怎麼保證調用順序呢?儘管是overcommit可以創建多個線程,畢竟都在一個root隊列中執行,優先級又是相同的。
先看一段代碼:
-(void)test10{ dispatch_queue_t serialQueue1 = dispatch_queue_create("com.cmjstudio.dispatch1", DISPATCH_QUEUE_SERIAL); dispatch_queue_t serialQueue2 = dispatch_queue_create("com.cmjstudio.dispatch2", DISPATCH_QUEUE_SERIAL); dispatch_queue_t serialQueue3 = dispatch_queue_create("com.cmjstudio.dispatch3", DISPATCH_QUEUE_SERIAL); dispatch_async(serialQueue1, ^{ NSLog(@"serialQueue1 async invoke:%@", [NSThread currentThread]); }); dispatch_async(serialQueue2, ^{ NSLog(@"serialQueue2 async invoke:%@", [NSThread currentThread]); }); dispatch_async(serialQueue3, ^{ NSLog(@"serialQueue3 async invoke:%@", [NSThread currentThread]); }); }
三次執行順序依次如下:
2020-07-07 19:26:57.951602+0800 GCDBasic[68448:3758078] serialQueue2 async invoke:<NSThread: 0x600000e06500>{number = 4, name = (null)} 2020-07-07 19:26:57.951633+0800 GCDBasic[68448:3758079] serialQueue1 async invoke:<NSThread: 0x600000e37f00>{number = 6, name = (null)} 2020-07-07 19:26:57.951651+0800 GCDBasic[68448:3758076] serialQueue3 async invoke:<NSThread: 0x600000e3cc80>{number = 7, name = (null)} 2020-07-07 19:27:08.292555+0800 GCDBasic[68448:3758077] serialQueue1 async invoke:<NSThread: 0x600000e06480>{number = 3, name = (null)} 2020-07-07 19:27:08.292651+0800 GCDBasic[68448:3758271] serialQueue3 async invoke:<NSThread: 0x600000e37e80>{number = 8, name = (null)} 2020-07-07 19:27:08.292659+0800 GCDBasic[68448:3758273] serialQueue2 async invoke:<NSThread: 0x600000e30340>{number = 9, name = (null)} 2020-07-07 19:27:12.261150+0800 GCDBasic[68448:3758077] serialQueue1 async invoke:<NSThread: 0x600000e06480>{number = 3, name = (null)} 2020-07-07 19:27:12.261157+0800 GCDBasic[68448:3758273] serialQueue2 async invoke:<NSThread: 0x600000e30340>{number = 9, name = (null)} 2020-07-07 19:27:12.261162+0800 GCDBasic[68448:3758271] serialQueue3 async invoke:<NSThread: 0x600000e37e80>{number = 8, name = (null)}
確實單次執行都創建了新的線程(和前面說的 overcommit 是相符的),但是執行任務的順序可以說是隨機的,這個和線程調度有關,那麼如果有比較重的任務會不會造成影響呢?這個答案是如果都分別創建了隊列(overcommit)一般不會有影響,除非創建超過了512個,因爲儘管是同一個root隊列但是會創建不同的線程,此時當前root隊列僅僅控制任務FIFO,但是並不是只有第一個任務執行完第二個任務才能開始,也就是說FIFO控制的是開始的節奏,但是任務在不同的thread執行不會阻塞。當然一個串行隊列中的多個異步task是相互有執行順序的,比如下面的代碼task2一定會被task1阻塞,但是都不會阻塞task3:
-(void)test11{ dispatch_queue_t serialQueue1 = dispatch_queue_create("com.cmjstudio.dispatch1", DISPATCH_QUEUE_SERIAL); dispatch_async(serialQueue1, ^{ NSLog(@"task1 in"); [NSThread sleepForTimeInterval:5]; NSLog(@"serialQueue1-task1 async invoke:%@", [NSThread currentThread]); }); dispatch_async(serialQueue1, ^{ NSLog(@"task2 in"); [NSThread sleepForTimeInterval:5]; NSLog(@"serialQueue1-task2 async invoke:%@", [NSThread currentThread]); }); dispatch_queue_t serialQueue2 = dispatch_queue_create("com.cmjstudio.dispatch2", DISPATCH_QUEUE_SERIAL); dispatch_async(serialQueue2, ^{ NSLog(@"task3 in"); NSLog(@"serialQueue2-task3 async invoke:%@", [NSThread currentThread]); }); }
同步執行 dispatch_sync
DISPATCH_NOINLINE void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work) { uintptr_t dc_flags = DC_FLAG_BLOCK; if (unlikely(_dispatch_block_has_private_data(work))) { return _dispatch_sync_block_with_privdata(dq, work, dc_flags); } _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags); } // 進一步查看 _dispatch_sync_f DISPATCH_NOINLINE static void _dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { _dispatch_sync_f_inline(dq, ctxt, func, dc_flags); } // 查看 _dispatch_sync_f_inline DISPATCH_ALWAYS_INLINE static inline void _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { if (likely(dq->dq_width == 1)) { return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags); } if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) { DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync"); } dispatch_lane_t dl = upcast(dq)._dl; // Global concurrent queues and queues bound to non-dispatch threads // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) { return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags); } if (unlikely(dq->do_targetq->do_targetq)) { return _dispatch_sync_recurse(dl, ctxt, func, dc_flags); } _dispatch_introspection_sync_begin(dl); _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG( _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags))); }
可以看到首先通過width判定是串行隊列還是併發隊列,如果是併發隊列則調用 _dispatch_sync_invoke_and_complete
,串行隊列則調用 _dispatch_barrier_sync_f
。先展開看一下串行隊列的同步執行源代碼:
DISPATCH_NOINLINE static void _dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags); } // 看一下 _dispatch_barrier_sync_f_inline DISPATCH_ALWAYS_INLINE static inline void _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags) { dispatch_tid tid = _dispatch_tid_self(); if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) { DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync"); } dispatch_lane_t dl = upcast(dq)._dl; // The more correct thing to do would be to merge the qos of the thread // that just acquired the barrier lock into the queue state. // // However this is too expensive for the fast path, so skip doing it. // The chosen tradeoff is that if an enqueue on a lower priority thread // contends with this fast path, this thread may receive a useless override. // // Global concurrent queues and queues bound to non-dispatch threads // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) { return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl, DC_FLAG_BARRIER | dc_flags); } if (unlikely(dl->do_targetq->do_targetq)) { return _dispatch_sync_recurse(dl, ctxt, func, DC_FLAG_BARRIER | dc_flags); } _dispatch_introspection_sync_begin(dl); _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop( dq, ctxt, func, dc_flags | DC_FLAG_BARRIER))); }
首先獲取線程id,然後處理死鎖的情況,因此這裏先看一下死鎖的情況:
DISPATCH_NOINLINE static void _dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt, dispatch_function_t func, uintptr_t top_dc_flags, dispatch_queue_class_t dqu, uintptr_t dc_flags) { dispatch_queue_t top_dq = top_dqu._dq; dispatch_queue_t dq = dqu._dq; if (unlikely(!dq->do_targetq)) { return _dispatch_sync_function_invoke(dq, ctxt, func); } pthread_priority_t pp = _dispatch_get_priority(); struct dispatch_sync_context_s dsc = { .dc_flags = DC_FLAG_SYNC_WAITER | dc_flags, .dc_func = _dispatch_async_and_wait_invoke, .dc_ctxt = &dsc, .dc_other = top_dq, .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG, .dc_voucher = _voucher_get(), .dsc_func = func, .dsc_ctxt = ctxt, .dsc_waiter = _dispatch_tid_self(), }; _dispatch_trace_item_push(top_dq, &dsc); __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq); if (dsc.dsc_func == NULL) { dispatch_queue_t stop_dq = dsc.dc_other; return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags); } _dispatch_introspection_sync_begin(top_dq); _dispatch_trace_item_pop(top_dq, &dsc); _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags DISPATCH_TRACE_ARG(&dsc)); } // 看一下 __DISPATCH_WAIT_FOR_QUEUE__ DISPATCH_NOINLINE static void __DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq) { uint64_t dq_state = _dispatch_wait_prepare(dq); if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) { DISPATCH_CLIENT_CRASH((uintptr_t)dq_state, "dispatch_sync called on queue " "already owned by current thread"); } // Blocks submitted to the main thread MUST run on the main thread, and // dispatch_async_and_wait also executes on the remote context rather than // the current thread. // // For both these cases we need to save the frame linkage for the sake of // _dispatch_async_and_wait_invoke _dispatch_thread_frame_save_state(&dsc->dsc_dtf); if (_dq_state_is_suspended(dq_state) || _dq_state_is_base_anon(dq_state)) { dsc->dc_data = DISPATCH_WLH_ANON; } else if (_dq_state_is_base_wlh(dq_state)) { dsc->dc_data = (dispatch_wlh_t)dq; } else { _dispatch_wait_compute_wlh(upcast(dq)._dl, dsc); } if (dsc->dc_data == DISPATCH_WLH_ANON) { dsc->dsc_override_qos_floor = dsc->dsc_override_qos = (uint8_t)_dispatch_get_basepri_override_qos_floor(); _dispatch_thread_event_init(&dsc->dsc_event); } dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority)); _dispatch_trace_runtime_event(sync_wait, dq, 0); if (dsc->dc_data == DISPATCH_WLH_ANON) { _dispatch_thread_event_wait(&dsc->dsc_event); // acquire } else { _dispatch_event_loop_wait_for_ownership(dsc); } if (dsc->dc_data == DISPATCH_WLH_ANON) { _dispatch_thread_event_destroy(&dsc->dsc_event); // If _dispatch_sync_waiter_wake() gave this thread an override, // ensure that the root queue sees it. if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) { _dispatch_set_basepri_override_qos(dsc->dsc_override_qos); } } } // 展開 _dq_state_drain_locked_by DISPATCH_ALWAYS_INLINE static inline bool _dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid) { return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid); } // 然後看一下 _dispatch_lock_is_locked_by DISPATCH_ALWAYS_INLINE static inline bool _dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid) { // equivalent to _dispatch_lock_owner(lock_value) == tid return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0; }
隊列push以後就是用 _dispatch_lock_is_locked_by
判斷將要調度的和當前等待的隊列是不是同一個,如果相同則返回YES,產生死鎖 DISPATCH_CLIENT_CRASH
;如果沒有產生死鎖,則執行 _dispatch_trace_item_pop()出隊列執行。如何執行調度呢,需要看一下 _dispatch_sync_invoke_and_complete_recurse
?
DISPATCH_NOINLINE static void _dispatch_sync_invoke_and_complete_recurse(dispatch_queue_class_t dq, void *ctxt, dispatch_function_t func, uintptr_t dc_flags DISPATCH_TRACE_ARG(void *dc)) { _dispatch_sync_function_invoke_inline(dq, ctxt, func); _dispatch_trace_item_complete(dc); _dispatch_sync_complete_recurse(dq._dq, NULL, dc_flags); } // 看一下 _dispatch_sync_function_invoke_inline DISPATCH_ALWAYS_INLINE static inline void _dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt, dispatch_function_t func) { dispatch_thread_frame_s dtf; _dispatch_thread_frame_push(&dtf, dq); _dispatch_client_callout(ctxt, func); _dispatch_perfmon_workitem_inc(); _dispatch_thread_frame_pop(&dtf); } // 看一下 _dispatch_client_callout void _dispatch_client_callout(void *ctxt, dispatch_function_t f) { @try { return f(ctxt); } @catch (...) { objc_terminate(); } }
可以比較清楚的看到最終執行f函數,這個就是外界傳過來的回調block。
異步調用 dispatch_async
void dispatch_async(dispatch_queue_t dq, dispatch_block_t work) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME; dispatch_qos_t qos; qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags); _dispatch_continuation_async(dq, dc, qos, dc->dc_flags); } // 查看 _dispatch_continuation_init 代碼,主要進行block初始化 DISPATCH_ALWAYS_INLINE static inline dispatch_qos_t _dispatch_continuation_init(dispatch_continuation_t dc, dispatch_queue_class_t dqu, dispatch_block_t work, dispatch_block_flags_t flags, uintptr_t dc_flags) { void *ctxt = _dispatch_Block_copy(work); dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED; if (unlikely(_dispatch_block_has_private_data(work))) { dc->dc_flags = dc_flags; dc->dc_ctxt = ctxt; // will initialize all fields but requires dc_flags & dc_ctxt to be set return _dispatch_continuation_init_slow(dc, dqu, flags); } dispatch_function_t func = _dispatch_Block_invoke(work); if (dc_flags & DC_FLAG_CONSUME) { func = _dispatch_call_block_and_release; } return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags); } // 另外查看 _dispatch_continuation_async DISPATCH_ALWAYS_INLINE static inline void _dispatch_continuation_async(dispatch_queue_class_t dqu, dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags) { #if DISPATCH_INTROSPECTION if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) { _dispatch_trace_item_push(dqu, dc); } #else (void)dc_flags; #endif return dx_push(dqu._dq, dc, qos); } // 進一步查看 dx_push #define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z) // 本質是調用dx_vtable的dq_push(其實就是調用對象的do_push),進一步查看 dq_push,我們假設是global_queue進行異步調用可以看到: DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane, .do_type = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE, .do_dispose = _dispatch_object_no_dispose, .do_debug = _dispatch_queue_debug, .do_invoke = _dispatch_object_no_invoke, .dq_activate = _dispatch_queue_no_activate, .dq_wakeup = _dispatch_root_queue_wakeup, .dq_push = _dispatch_root_queue_push, );
可以看到 dx_push
已經到了 _dispatch_root_queue_push
,這是可以接着查看 _dispatch_root_queue_push
:
DISPATCH_NOINLINE void _dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou, dispatch_qos_t qos) { #if DISPATCH_USE_KEVENT_WORKQUEUE dispatch_deferred_items_t ddi = _dispatch_deferred_items_get(); if (unlikely(ddi && ddi->ddi_can_stash)) { dispatch_object_t old_dou = ddi->ddi_stashed_dou; dispatch_priority_t rq_overcommit; rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT; if (likely(!old_dou._do || rq_overcommit)) { dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq; dispatch_qos_t old_qos = ddi->ddi_stashed_qos; ddi->ddi_stashed_rq = rq; ddi->ddi_stashed_dou = dou; ddi->ddi_stashed_qos = qos; _dispatch_debug("deferring item %p, rq %p, qos %d", dou._do, rq, qos); if (rq_overcommit) { ddi->ddi_can_stash = false; } if (likely(!old_dou._do)) { return; } // push the previously stashed item qos = old_qos; rq = old_rq; dou = old_dou; } } #endif #if HAVE_PTHREAD_WORKQUEUE_QOS if (_dispatch_root_queue_push_needs_override(rq, qos)) { return _dispatch_root_queue_push_override(rq, dou, qos); } #else (void)qos; #endif _dispatch_root_queue_push_inline(rq, dou, dou, 1); } // 多數情況下符合HAVE_PTHREAD_WORKQUEUE_QOS,會執行_dispatch_root_queue_push_override(對比的是qos與root隊列的qos是否一致,基本上都不一致的。) DISPATCH_NOINLINE static void _dispatch_root_queue_push_override(dispatch_queue_global_t orig_rq, dispatch_object_t dou, dispatch_qos_t qos) { bool overcommit = orig_rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT; dispatch_queue_global_t rq = _dispatch_get_root_queue(qos, overcommit); dispatch_continuation_t dc = dou._dc; if (_dispatch_object_is_redirection(dc)) { // no double-wrap is needed, _dispatch_async_redirect_invoke will do // the right thing dc->dc_func = (void *)orig_rq; } else { dc = _dispatch_continuation_alloc(); dc->do_vtable = DC_VTABLE(OVERRIDE_OWNING); dc->dc_ctxt = dc; dc->dc_other = orig_rq; dc->dc_data = dou._do; dc->dc_priority = DISPATCH_NO_PRIORITY; dc->dc_voucher = DISPATCH_NO_VOUCHER; } _dispatch_root_queue_push_inline(rq, dc, dc, 1); } // 上面_dispatch_object_is_redirection函數其實就是return _dispatch_object_has_type(dou,DISPATCH_CONTINUATION_TYPE(ASYNC_REDIRECT));所以自定義隊列會走這個if語句,如果是dispatch_get_global_queue不會走if語句。展開 _dispatch_root_queue_push_inline。注意_dispatch_root_queue_push_inline中的if把任務裝進隊列,大多數不走進if語句。但是第一個任務進來之前還是滿足這個條件式的,會進入這個條件語句去激活隊列來執行裏面的任務,後面再加入的任務因爲隊列被激活了,所以也就不太需要再進入這個隊列了,所以相對來說激活隊列只要一次 DISPATCH_ALWAYS_INLINE static inline void _dispatch_root_queue_push_inline(dispatch_queue_global_t dq, dispatch_object_t _head, dispatch_object_t _tail, int n) { struct dispatch_object_s *hd = _head._do, *tl = _tail._do; if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) { return _dispatch_root_queue_poke(dq, n, 0); } } // 我們可以看到,我們裝入到自定義的任務都被扔到其掛靠的root隊列裏去了,所以我們我們自己創建的隊列只是一個代理人身份,繼續查看 _dispatch_root_queue_poke 源碼 DISPATCH_NOINLINE void _dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor) { if (!_dispatch_queue_class_probe(dq)) { return; } #if !DISPATCH_USE_INTERNAL_WORKQUEUE #if DISPATCH_USE_PTHREAD_POOL if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)) #endif { if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) { _dispatch_root_queue_debug("worker thread request still pending " "for global queue: %p", dq); return; } } #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE return _dispatch_root_queue_poke_slow(dq, n, floor); } // 繼續查看 _dispatch_root_queue_poke_slow DISPATCH_NOINLINE static void _dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor) { int remaining = n; #if !defined(_WIN32) int r = ENOSYS; #endif _dispatch_root_queues_init(); _dispatch_debug_root_queue(dq, __func__); _dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n); #if !DISPATCH_USE_INTERNAL_WORKQUEUE #if DISPATCH_USE_PTHREAD_ROOT_QUEUES if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE) #endif { _dispatch_root_queue_debug("requesting new worker thread for global " "queue: %p", dq); r = _pthread_workqueue_addthreads(remaining, _dispatch_priority_to_pp_prefer_fallback(dq->dq_priority)); (void)dispatch_assume_zero(r); return; } #endif // !DISPATCH_USE_INTERNAL_WORKQUEUE #if DISPATCH_USE_PTHREAD_POOL dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt; if (likely(pqc->dpq_thread_mediator.do_vtable)) { while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) { _dispatch_root_queue_debug("signaled sleeping worker for " "global queue: %p", dq); if (!--remaining) { return; } } } bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT; if (overcommit) { os_atomic_add2o(dq, dgq_pending, remaining, relaxed); } else { if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) { _dispatch_root_queue_debug("worker thread request still pending for " "global queue: %p", dq); return; } } int can_request, t_count; // seq_cst with atomic store to tail <rdar://problem/16932833> t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered); do { can_request = t_count < floor ? 0 : t_count - floor; if (remaining > can_request) { _dispatch_root_queue_debug("pthread pool reducing request from %d to %d", remaining, can_request); os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed); remaining = can_request; } if (remaining == 0) { _dispatch_root_queue_debug("pthread pool is full for root queue: " "%p", dq); return; } } while (!os_atomic_cmpxchgvw2o(dq, dgq_thread_pool_size, t_count, t_count - remaining, &t_count, acquire)); #if !defined(_WIN32) pthread_attr_t *attr = &pqc->dpq_thread_attr; pthread_t tid, *pthr = &tid; #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES if (unlikely(dq == &_dispatch_mgr_root_queue)) { pthr = _dispatch_mgr_root_queue_init(); } #endif do { _dispatch_retain(dq); // released in _dispatch_worker_thread while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) { if (r != EAGAIN) { (void)dispatch_assume_zero(r); } _dispatch_temporary_resource_shortage(); } } while (--remaining); #else // defined(_WIN32) #if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES if (unlikely(dq == &_dispatch_mgr_root_queue)) { _dispatch_mgr_root_queue_init(); } #endif do { _dispatch_retain(dq); // released in _dispatch_worker_thread uintptr_t hThread = 0; while (!(hThread = _beginthreadex(NULL, /* stack_size */ 0, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) { if (errno != EAGAIN) { (void)dispatch_assume(hThread); } _dispatch_temporary_resource_shortage(); } #if DISPATCH_USE_PTHREAD_ROOT_QUEUES if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) { (void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE); } #endif CloseHandle((HANDLE)hThread); } while (--remaining); #endif // defined(_WIN32) #else (void)floor; #endif // DISPATCH_USE_PTHREAD_POOL }
到了這裏可以清楚的看到對於全局隊列使用 _pthread_workqueue_addthreads
開闢線程,對於其他隊列使用 pthread_create
開闢新的線程。那麼任務執行的代碼爲什麼沒看到?其實_dispatch_root_queues_init中會首先執行第一個任務:
DISPATCH_ALWAYS_INLINE static inline void _dispatch_root_queues_init(void) { dispatch_once_f(&_dispatch_root_queues_pred, NULL, _dispatch_root_queues_init_once); } // 看一下dispatch_once_f就不展開了,可以看一下下面dispatch_once的分析,這裏看一下 _dispatch_root_queues_init_once static void _dispatch_root_queues_init_once(void *context DISPATCH_UNUSED) { _dispatch_fork_becomes_unsafe(); #if DISPATCH_USE_INTERNAL_WORKQUEUE size_t i; for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) { _dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0, _dispatch_root_queues[i].dq_priority); } #else int wq_supported = _pthread_workqueue_supported(); int r = ENOTSUP; if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) { DISPATCH_INTERNAL_CRASH(wq_supported, "QoS Maintenance support required"); } if (unlikely(!_dispatch_kevent_workqueue_enabled)) { r = _pthread_workqueue_init(_dispatch_worker_thread2, offsetof(struct dispatch_queue_s, dq_serialnum), 0); #if DISPATCH_USE_KEVENT_WORKLOOP } else if (wq_supported & WORKQ_FEATURE_WORKLOOP) { r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2, (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread, (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread, offsetof(struct dispatch_queue_s, dq_serialnum), 0); #endif // DISPATCH_USE_KEVENT_WORKLOOP #if DISPATCH_USE_KEVENT_WORKQUEUE } else if (wq_supported & WORKQ_FEATURE_KEVENT) { r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2, (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread, offsetof(struct dispatch_queue_s, dq_serialnum), 0); #endif } else { DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support"); } if (r != 0) { DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported, "Root queue initialization failed"); } #endif // DISPATCH_USE_INTERNAL_WORKQUEUE } // 繼續查看 DISPATCH_NOINLINE static void _dispatch_workloop_worker_thread(uint64_t *workloop_id, dispatch_kevent_t *events, int *nevents) { if (!workloop_id || !dispatch_assume(*workloop_id != 0)) { return _dispatch_kevent_worker_thread(events, nevents); } if (!events || !nevents) { // events for worker thread request have already been delivered earlier return; } if (!dispatch_assume(*nevents && *events)) return; dispatch_wlh_t wlh = (dispatch_wlh_t)*workloop_id; _dispatch_adopt_wlh(wlh); _dispatch_wlh_worker_thread(wlh, *events, nevents); _dispatch_preserve_wlh_storage_reference(wlh); } // 查看 _dispatch_worker_thread2 static void _dispatch_worker_thread2(pthread_priority_t pp) { bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG; dispatch_queue_global_t dq; pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK; _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp); dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit); _dispatch_introspection_thread_add(); _dispatch_trace_runtime_event(worker_unpark, dq, 0); int pending = os_atomic_dec2o(dq, dgq_pending, relaxed); dispatch_assert(pending >= 0); _dispatch_root_queue_drain(dq, dq->dq_priority, DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN); _dispatch_voucher_debug("root queue clear", NULL); _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK); _dispatch_trace_runtime_event(worker_park, NULL, 0); } // 查看 _dispatch_root_queue_drain DISPATCH_NOT_TAIL_CALLED // prevent tailcall (for Instrument DTrace probe) static void _dispatch_root_queue_drain(dispatch_queue_global_t dq, dispatch_priority_t pri, dispatch_invoke_flags_t flags) { #if DISPATCH_DEBUG dispatch_queue_t cq; if (unlikely(cq = _dispatch_queue_get_current())) { DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling"); } #endif _dispatch_queue_set_current(dq); _dispatch_init_basepri(pri); _dispatch_adopt_wlh_anon(); struct dispatch_object_s *item; bool reset = false; dispatch_invoke_context_s dic = { }; #if DISPATCH_COCOA_COMPAT _dispatch_last_resort_autorelease_pool_push(&dic); #endif // DISPATCH_COCOA_COMPAT _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri); _dispatch_perfmon_start(); while (likely(item = _dispatch_root_queue_drain_one(dq))) { if (reset) _dispatch_wqthread_override_reset(); _dispatch_continuation_pop_inline(item, &dic, flags, dq); reset = _dispatch_reset_basepri_override(); if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) { break; } } // overcommit or not. worker thread if (pri & DISPATCH_PRIORITY_FLAG_OVERCOMMIT) { _dispatch_perfmon_end(perfmon_thread_worker_oc); } else { _dispatch_perfmon_end(perfmon_thread_worker_non_oc); } #if DISPATCH_COCOA_COMPAT _dispatch_last_resort_autorelease_pool_pop(&dic); #endif // DISPATCH_COCOA_COMPAT _dispatch_reset_wlh(); _dispatch_clear_basepri(); _dispatch_queue_set_current(NULL); } // 查看 _dispatch_continuation_pop_inline 這個是出隊列操作,這裏注意一下首先看了有沒有vtable(_dispatch_object_has_vtable),這裏解釋了爲什麼dispatch_barrier_async儘管主要流程和dispatch_async一模一樣但是無法應用到全局隊列的原因,因爲全局隊列沒有v_table結構會直接像dispatch_async一樣執行 DISPATCH_ALWAYS_INLINE_NDEBUG static inline void _dispatch_continuation_pop_inline(dispatch_object_t dou, dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu) { dispatch_pthread_root_queue_observer_hooks_t observer_hooks = _dispatch_get_pthread_root_queue_observer_hooks(); if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq); flags &= _DISPATCH_INVOKE_PROPAGATE_MASK; if (_dispatch_object_has_vtable(dou)) { dx_invoke(dou._dq, dic, flags); } else { _dispatch_continuation_invoke_inline(dou, flags, dqu); } if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq); } // 查看 _dispatch_continuation_invoke_inline ,這裏`_dispatch_client_callout`就是真正的執行block操作 ,當然還有一種情況這裏還不會走就是_dispatch_continuation_with_group_invoke,這個後面的dispatch_group會用到 DISPATCH_ALWAYS_INLINE static inline void _dispatch_continuation_invoke_inline(dispatch_object_t dou, dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu) { dispatch_continuation_t dc = dou._dc, dc1; dispatch_invoke_with_autoreleasepool(flags, { uintptr_t dc_flags = dc->dc_flags; // Add the item back to the cache before calling the function. This // allows the 'hot' continuation to be used for a quick callback. // // The ccache version is per-thread. // Therefore, the object has not been reused yet. // This generates better assembly. _dispatch_continuation_voucher_adopt(dc, dc_flags); if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) { _dispatch_trace_item_pop(dqu, dou); } if (dc_flags & DC_FLAG_CONSUME) { dc1 = _dispatch_continuation_free_cacheonly(dc); } else { dc1 = NULL; } if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) { _dispatch_continuation_with_group_invoke(dc); } else { _dispatch_client_callout(dc->dc_ctxt, dc->dc_func); _dispatch_trace_item_complete(dc); } if (unlikely(dc1)) { _dispatch_continuation_free_to_cache_limit(dc1); } }); _dispatch_perfmon_workitem_inc(); }
另外對於 _dispatch_continuation_init
的代碼中的並沒有對其進行展開,其實_dispatch_continuation_init中的 func
就是 _dispatch_call_block_and_release
(源碼如下),它在 dx_push
調用時包裝進了 qos
。
void _dispatch_call_block_and_release(void *block) { void (^b)(void) = block; b(); Block_release(b); }
dispatch_async代碼實現看起來比較複雜,因爲其中的數據結構較多,分支流程控制比較複雜。不過思路其實很簡單,用鏈表保存所有提交的 block(先進先出,,在隊列本身維護了一個鏈表新加入block放到鏈表尾部),然後在底層線程池中,依次取出 block 並執行。
類似的可以看到 dispatch_barrier_async
源碼和dispatch_async幾乎一致,僅僅多了一個標記位 DC_FLAG_BARRIER
,這個標記位用於在取出任務時進行判斷,正常的異步調用會依次取出,而如果遇到了 DC_FLAG_BARRIER
則會返回,所以可以等待所有任務執行結束執行dx_push(不過提醒一下dispatch_barrier_async必須在自定義隊列纔有用,原因是global隊列沒有v_table結構,同時不要試圖在主隊列調用,否則會crash):
void dispatch_barrier_async(dispatch_queue_t dq, dispatch_block_t work) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_BARRIER; dispatch_qos_t qos; qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags); _dispatch_continuation_async(dq, dc, qos, dc_flags); }
單次執行dispatch_once
下面的代碼在objc開發中應該很常見,這種方式可以保證 instance 只會創建一次:
+ (instancetype)sharedInstance { static MyClass *instance; static dispatch_once_t onceToken; dispatch_once(&onceToken, ^{ instance = [[MyClass alloc] init]; }); return instance; }
不放分析一下dispatch_once的源碼:
void dispatch_once(dispatch_once_t *val, dispatch_block_t block) { dispatch_once_f(val, block, _dispatch_Block_invoke(block)); } // 展開 dispatch_once_f DISPATCH_NOINLINE void dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func) { dispatch_once_gate_t l = (dispatch_once_gate_t)val; #if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER uintptr_t v = os_atomic_load(&l->dgo_once, acquire); if (likely(v == DLOCK_ONCE_DONE)) { return; } #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER if (likely(DISPATCH_ONCE_IS_GEN(v))) { return _dispatch_once_mark_done_if_quiesced(l, v); } #endif #endif if (_dispatch_once_gate_tryenter(l)) { return _dispatch_once_callout(l, ctxt, func); } return _dispatch_once_wait(l); } // 如果 os_atomic_load爲 DLOCK_ONCE_DONE 則直接返回,否則進入_dispatch_once_gate_tryenter,在這裏首先判斷對象是否存儲過,如果存儲過則則標記爲unlock DISPATCH_ALWAYS_INLINE static inline bool _dispatch_once_gate_tryenter(dispatch_once_gate_t l) { return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED, (uintptr_t)_dispatch_lock_value_for_self(), relaxed); } // 如果沒有存儲過則執行 _dispatch_once_callout,主要是執行block DISPATCH_NOINLINE static void _dispatch_once_callout(dispatch_once_gate_t l, void *ctxt, dispatch_function_t func) { _dispatch_client_callout(ctxt, func); _dispatch_once_gate_broadcast(l); } // 執行過block則調用 _dispatch_once_gate_broadcast DISPATCH_ALWAYS_INLINE static inline void _dispatch_once_gate_broadcast(dispatch_once_gate_t l) { dispatch_lock value_self = _dispatch_lock_value_for_self(); uintptr_t v; #if DISPATCH_ONCE_USE_QUIESCENT_COUNTER v = _dispatch_once_mark_quiescing(l); #else v = _dispatch_once_mark_done(l); #endif if (likely((dispatch_lock)v == value_self)) return; _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v); } // 在 _dispatch_once_gate_broadcast 中由於執行完畢,使用_dispatch_once_mark_don畢標記爲done DISPATCH_ALWAYS_INLINE static inline uintptr_t _dispatch_once_mark_done(dispatch_once_gate_t dgo) { return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release); }
swift中實現dispatch_once
說到這裏,從swift3.0以後已經沒辦法使用dispach_once了,其實原因很簡單因爲在swift1.x的 static var/let
屬性就已經是 dispatch_once
在後臺執行的了,所以對於單例的創建沒有必要顯示調用了。但是有時候其他情況我們還是需要使用單次執行怎麼辦呢?代替方法:使用全局變量(例如創建一個對象實例或者初始化成一個立即執行的閉包:let g = {}();_ = g;),當然習慣於dispatch_once的朋友有時候並不適應這種方法,這裏給出一個比較簡單的方案:
public extension DispatchQueue { private static var _onceTracker = [String]() public class func once(file: String = #file, function: String = #function, line: Int = #line, block:(Void)->Void) { let token = file + ":" + function + ":" + String(line) once(token: token, block: block) } /** Executes a block of code, associated with a unique token, only once. The code is thread safe and will only execute the code once even in the presence of multithreaded calls. - parameter token: A unique reverse DNS style name such as com.vectorform.<name> or a GUID - parameter block: Block to execute once */ public class func once(token: String, block:(Void)->Void) { objc_sync_enter(self) defer { objc_sync_exit(self) } if _onceTracker.contains(token) { return } _onceTracker.append(token) block() } }
延遲執行 dispatch_after
dispatch_after也是一個常用的延遲執行的方法,比如常見的使用方法是:
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1.0 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{ NSLog(@"..."); });
在查看 dispatch_after
源碼之前先看一下另一個內容事件源 dispatch_source_t
,其實 dispatch_source_t
是一個很少讓開發者和GCD聯想到一起的一個類型,它本身也有對應的創建方法 dispatch_source_create
(事實上它的使用甚至可以追蹤到Runloop)。多數開發者認識 dispatch_source_t
都是通過定時器,很多文章會教你如何創建一個比較準確的定時器,比如下面的代碼:
dispatch_source_t timerSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)); dispatch_source_set_timer(timerSource, dispatch_time(DISPATCH_TIME_NOW, 0), 3*NSEC_PER_SEC, 0); dispatch_source_set_event_handler(timerSource, ^{ NSLog(@"dispatch_source_t..."); }); dispatch_resume(timerSource); self->source = timerSource;
如果你知道上面一個定時器如何執行的那麼下面看一下dispatch_after應該就比較容易明白了:
void dispatch_after(dispatch_time_t when, dispatch_queue_t queue, dispatch_block_t work) { _dispatch_after(when, queue, NULL, work, true); } // 查看 _dispatch_after DISPATCH_ALWAYS_INLINE static inline void _dispatch_after(dispatch_time_t when, dispatch_queue_t dq, void *ctxt, void *handler, bool block) { dispatch_timer_source_refs_t dt; dispatch_source_t ds; uint64_t leeway, delta; if (when == DISPATCH_TIME_FOREVER) { #if DISPATCH_DEBUG DISPATCH_CLIENT_CRASH(0, "dispatch_after called with 'when' == infinity"); #endif return; } delta = _dispatch_timeout(when); if (delta == 0) { if (block) { return dispatch_async(dq, handler); } return dispatch_async_f(dq, ctxt, handler); } leeway = delta / 10; // <rdar://problem/13447496> if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC; if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC; // this function can and should be optimized to not use a dispatch source ds = dispatch_source_create(&_dispatch_source_type_after, 0, 0, dq); dt = ds->ds_timer_refs; dispatch_continuation_t dc = _dispatch_continuation_alloc(); if (block) { _dispatch_continuation_init(dc, dq, handler, 0, 0); } else { _dispatch_continuation_init_f(dc, dq, ctxt, handler, 0, 0); } // reference `ds` so that it doesn't show up as a leak dc->dc_data = ds; _dispatch_trace_item_push(dq, dc); os_atomic_store2o(dt, ds_handler[DS_EVENT_HANDLER], dc, relaxed); dispatch_clock_t clock; uint64_t target; _dispatch_time_to_clock_and_value(when, &clock, &target); if (clock != DISPATCH_CLOCK_WALL) { leeway = _dispatch_time_nano2mach(leeway); } dt->du_timer_flags |= _dispatch_timer_flags_from_clock(clock); dt->dt_timer.target = target; dt->dt_timer.interval = UINT64_MAX; dt->dt_timer.deadline = target + leeway; dispatch_activate(ds); }
代碼並不是太複雜,無時間差則直接調用 dispatch_async
,否則先創建一個 dispatch_source_t
,不同的是這裏的類型並不是 DISPATCH_SOURCE_TYPE_TIMER
而是 _dispatch_source_type_after
,查看源碼不難發現它只是dispatch_source_type_s類型的一個常量和 _dispatch_source_type_timer
並沒有明顯區別:
const dispatch_source_type_s _dispatch_source_type_after = { .dst_kind = "timer (after)", .dst_filter = DISPATCH_EVFILT_TIMER_WITH_CLOCK, .dst_flags = EV_DISPATCH, .dst_mask = 0, .dst_timer_flags = DISPATCH_TIMER_AFTER, .dst_action = DISPATCH_UNOTE_ACTION_SOURCE_TIMER, .dst_size = sizeof(struct dispatch_timer_source_refs_s), .dst_create = _dispatch_source_timer_create, .dst_merge_evt = _dispatch_source_merge_evt, };
而和dispatch_activate()其實和dispatch_resume() 是一樣的開啓定時器。那麼爲什麼看不到 dispatch_source_set_event_handler
來給timer設置handler呢?不放看一下 dispatch_source_set_event_handler
的源代碼:
void dispatch_source_set_event_handler(dispatch_source_t ds, dispatch_block_t handler) { _dispatch_source_set_handler(ds, handler, DS_EVENT_HANDLER, true); } // 查看 _dispatch_source_set_handler DISPATCH_NOINLINE static void _dispatch_source_set_handler(dispatch_source_t ds, void *func, uintptr_t kind, bool is_block) { dispatch_continuation_t dc; dc = _dispatch_source_handler_alloc(ds, func, kind, is_block); if (_dispatch_lane_try_inactive_suspend(ds)) { _dispatch_source_handler_replace(ds, kind, dc); return _dispatch_lane_resume(ds, false); } dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds); if (unlikely(dqf & DSF_STRICT)) { DISPATCH_CLIENT_CRASH(kind, "Cannot change a handler of this source " "after it has been activated"); } // Ignore handlers mutations past cancelation, it's harmless if ((dqf & DSF_CANCELED) == 0) { _dispatch_ktrace1(DISPATCH_PERF_post_activate_mutation, ds); if (kind == DS_REGISTN_HANDLER) { _dispatch_bug_deprecated("Setting registration handler after " "the source has been activated"); } else if (func == NULL) { _dispatch_bug_deprecated("Clearing handler after " "the source has been activated"); } } dc->dc_data = (void *)kind; _dispatch_barrier_trysync_or_async_f(ds, dc, _dispatch_source_set_handler_slow, 0); }
可以看到最終還是封裝成一個 dispatch_continuation_t
進行同步或者異步調用,而上面 _dispatch_after
直接構建了 dispatch_continuation_t
進行執行。
取消延遲執行的任務
使用 dispatch_after
還有一個問題就是取消問題,當然通常遇到了這種問題大部分答案就是使用下面的方式:
[self performSelector:@selector(myDelayedMethod) withObject: self afterDelay: desiredDelay]; [NSObject cancelPreviousPerformRequestsWithTarget: self selector:@selector(myDelayedMethod) object: self];
不過如果你使用的是iOS 8及其以上的版本,那麼其實是可以取消的(如下),當然如果你還在支持iOS 8以下的版本不妨試試這個自定義的 dispatch_cancelable_block_t 類:
dispatch_block_t block = dispatch_block_create(DISPATCH_BLOCK_INHERIT_QOS_CLASS, ^{ NSLog(@"dispatch_after..."); }); dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 3*NSEC_PER_SEC), dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), block); // 取消 dispatch_block_cancel(block);
如果你用的是swift那麼恭喜你,很簡單:
let dispatchItem = DispatchWorkItem { handler() } DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + Double(Int64(3 * Double(NSEC_PER_SEC))) / Double(NSEC_PER_SEC), execute: dispatchItem) // 取消 dispatchItem.cancel()
dispatch_semaphore
信號量是線程同步操作中很常用的一個操作,常用的幾個類型:
dispatch_semaphore_t:信號量類型
dispatch_semaphore_create:創建一個信號量
dispatch_semaphore_wait:發送一個等待信號,信號量-1,當信號量爲0阻塞線程,大於0則開始執行後面的邏輯(也就是說執行dispatch_semaphore_wait前如果信號量<=0則阻塞,否則正常執行後面的邏輯)
dispatch_semaphore_signal:發送喚醒信號,信號量會+1
比如我們有個操作foo()在異步線程已經開始執行,同時可能用戶會手動再次觸發動作bar(),但是bar依賴foo完成則可以使用信號量:
- (void)foo { dispatch_semaphore_t semaphore = dispatch_semaphore_create(0); dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ // 這裏執行其他任務。。。 // TODO: // 執行完發送信號 dispatch_semaphore_signal(semaphore); }); self->semaphore = semaphore; } - (void)bar { // 等待上面的操作完成,如果60s還沒有完成則超時繼續執行下面的邏輯 dispatch_semaphore_wait(self.semaphore, dispatch_time(DISPATCH_TIME_NOW, 60*NSEC_PER_SEC)); // 這裏執行其他任務。。。但是依賴上面的操作完成 // TODO: }
那麼信號量是如何實現的呢,不妨看一下它的源碼:
// 首先看一下dispatch_semaphore_t,沒錯和上面一樣本質就是 dispatch_semaphore_s,dsema_value代表當前信號量,dsema_orig表示初始信號量 DISPATCH_CLASS_DECL(semaphore, OBJECT); struct dispatch_semaphore_s { DISPATCH_OBJECT_HEADER(semaphore); intptr_t volatile dsema_value; intptr_t dsema_orig; _dispatch_sema4_t dsema_sema; }; // 查看 dispatch_semaphore_create 源碼,其實並不複雜創建分配DISPATCH_VTABLE結構的空間,設置初始信號量,但是可以清楚的看到同樣指定了目標隊列,這是一個優先級爲`DISPATCH_QUEUE_PRIORITY_DEFAULT`的非過載隊列 dispatch_semaphore_t dispatch_semaphore_create(intptr_t value) { dispatch_semaphore_t dsema; // If the internal value is negative, then the absolute of the value is // equal to the number of waiting threads. Therefore it is bogus to // initialize the semaphore with a negative value. if (value < 0) { return DISPATCH_BAD_INPUT; } dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore), sizeof(struct dispatch_semaphore_s)); dsema->do_next = DISPATCH_OBJECT_LISTLESS; dsema->do_targetq = _dispatch_get_default_queue(false); dsema->dsema_value = value; _dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); dsema->dsema_orig = value; return dsema; } // 下面看一下 dispatch_semaphore_wait,首先`os_atomic_dec2o`信號量減一,當然遞減之後信號量大於等於0它其實什麼也不做繼續執行就好了,但是如果不滿足執行_dispatch_semaphore_wait_slow 等待信號量喚醒或者timeout超時 dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long value = os_atomic_dec2o(dsema, dsema_value, acquire); if (likely(value >= 0)) { return 0; } return _dispatch_semaphore_wait_slow(dsema, timeout); } // 看一下 _dispatch_semaphore_wait_slow 源碼,這裏首先對於兩種極端情況:如果是DISPATCH_TIME_NOW則執行信號量+1並返回超時信號,DISPATCH_TIME_FOREVER則一直等待,默認則調用 `_dispatch_sema4_timedwait` DISPATCH_NOINLINE static intptr_t _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long orig; _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); switch (timeout) { default: if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) { break; } // Try to undo what the fast path did to dsema->dsema_value DISPATCH_FALLTHROUGH; case DISPATCH_TIME_NOW: orig = dsema->dsema_value; while (orig < 0) { if (os_atomic_cmpxchgvw2o(dsema, dsema_value, orig, orig + 1, &orig, relaxed)) { return _DSEMA4_TIMEOUT(); } } // Another thread called semaphore_signal(). Drain the wakeup. DISPATCH_FALLTHROUGH; case DISPATCH_TIME_FOREVER: _dispatch_sema4_wait(&dsema->dsema_sema); break; } return 0; } // 查看 _dispatch_sema4_timedwait 調用mach的內核函數semaphore_timedwait等待收到信號直至超時 bool _dispatch_sema4_timedwait(_dispatch_sema4_t *sema, dispatch_time_t timeout) { mach_timespec_t _timeout; kern_return_t kr; do { uint64_t nsec = _dispatch_timeout(timeout); _timeout.tv_sec = (__typeof__(_timeout.tv_sec))(nsec / NSEC_PER_SEC); _timeout.tv_nsec = (__typeof__(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); kr = semaphore_timedwait(*sema, _timeout); } while (unlikely(kr == KERN_ABORTED)); if (kr == KERN_OPERATION_TIMED_OUT) { return true; } DISPATCH_SEMAPHORE_VERIFY_KR(kr); return false; } // 最後看一下 dispatch_semaphore_signal,首先信號量+1,如果信號量大於0就什麼也不做(通常到了這裏dispatch_semaphore_wait還沒調用),否則執行 _dispatch_semaphore_signal_slow intptr_t dispatch_semaphore_signal(dispatch_semaphore_t dsema) { long value = os_atomic_inc2o(dsema, dsema_value, release); if (likely(value > 0)) { return 0; } if (unlikely(value == LONG_MIN)) { DISPATCH_CLIENT_CRASH(value, "Unbalanced call to dispatch_semaphore_signal()"); } return _dispatch_semaphore_signal_slow(dsema); } // 查看 _dispatch_semaphore_signal_slow ,調用內核`semaphore_signal`喚醒線程,如apple api描述“如果喚醒線程則返回非0,否則返回0” DISPATCH_NOINLINE intptr_t _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema) { _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); _dispatch_sema4_signal(&dsema->dsema_sema, 1); return 1; } // 查看 _dispatch_sema4_signal 源碼 void _dispatch_sema4_signal(_dispatch_sema4_t *sema, long count) { do { kern_return_t kr = semaphore_signal(*sema); DISPATCH_SEMAPHORE_VERIFY_KR(kr); } while (--count); }
信號量是一個比較重要的內容,合理使用可以讓你的程序更加的優雅,比如說一個常見的情況:大家知道 PHImageManager.requestImage
是一個釋放消耗內存的方法,有時我們需要批量獲取到圖片執行一些操作的話可能就沒辦法直接for循環,不然內存會很快爆掉,因爲每個requestImage操作都需要佔用大量內存,即使外部嵌套autoreleasepool也不一定可以及時釋放(想想for執行的速度,釋放肯定來不及),那麼requestImage又是一個異步操作,如此只能讓一個操作執行完再執行另一個循環操作才能解決。也就是說這個問題就變成 for循環內部的異步操作串行執行的問題
。要解決這個問題有幾種思路:1.使用requestImage的同步請求照片 2.使用遞歸操作一個操作執行完再執行另外一個操作移除for操作 3.使用信號量解決。當然第一個方法並非普適,有些異步操作並不能輕易改成同步操作,第二個方法相對普適,但是遞歸調用本身因爲要改變原來的代碼結構看起來不是那麼優雅,自然當前討論的信號量是更好的方式。我們假設requestImage是一個bar(callback:((_ image)->
Void))操作,整個請求是一個foo(callback:((_ images)->Void))那麼它的實現方式如下:
- (void)foo:(CallbackWithImages)callback { dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); dispatch_semaphore_t semaphore = dispatch_semaphore_create(0); dispatch_async(globalQueue, ^{ NSMutableArray *array = [[NSMutableArray alloc] init]; for (int i=0; i<100; ++i) { [self bar:^(UIImage *image){ [array addObject:image]; dispatch_semaphore_signal(semaphore); }]; dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); } dispatch_async(dispatch_get_main_queue(), ^{ callback([array copy]); }); }); } - (void)bar:(CallbackWithImage)callback { dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); dispatch_async(globalQueue, ^{ callback([UIImage new]); }); }
信號量常見crash
可以看到信號量在做線程同步時簡單易用,不過有時候不經意間容易出錯,比如下面的代碼會出現 EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0)
錯誤,原因是之前的信號量還在使用:
dispatch_semaphore_t semaphore = dispatch_semaphore_create(1); dispatch_semaphore_wait(semaphore, dispatch_time(DISPATCH_TIME_NOW, 1000*NSEC_PER_SEC)); // semaphore = dispatch_semaphore_create(0);
爲什麼會這樣呢?原因和上面 dispatch_semaphore_create
中的 DISPATCH_VTABLE(semaphore)
有關係,這個宏我們上面分析過,最終展開就是 OS_dispatch_semaphore_class
實例的引用,那麼它的實例是什麼呢?它當然是通過 _dispatch_object_alloc
創建的,沿着查找 _dispatch_object_alloc
的源碼可以找到下面的代碼:
static inline id _os_objc_alloc(Class cls, size_t size) { id obj; size -= sizeof(((struct _os_object_s *)NULL)->os_obj_isa); while (unlikely(!(obj = class_createInstance(cls, size)))) { _dispatch_temporary_resource_shortage(); } return obj; }
不難看出就是依靠 class_createInstance
創建一個 OS_dispatch_semaphore_class
實例,這個代碼在libdispatch是找不到的,它在runtime源碼中。不過在這裏可以找到它的實例的定義(其實類似的通過vtable結構創建的實例都包含在libdispatch的init.c中):
DISPATCH_VTABLE_INSTANCE(semaphore, .do_type = DISPATCH_SEMAPHORE_TYPE, .do_dispose = _dispatch_semaphore_dispose, .do_debug = _dispatch_semaphore_debug, .do_invoke = _dispatch_object_no_invoke, );
不難看出這個對象是包含一個dispose方法的,就是 _dispatch_semaphore_dispose
,我們可以看到它的源碼,其實這裏對我們排查問題最重要的就是if條件語句,信號量的當前值小於初始化,會發生閃退,因爲信號量已經被釋放了,如果此時沒有crash其實就會意味着一直有線程在信號量等待:
void _dispatch_semaphore_dispose(dispatch_object_t dou, DISPATCH_UNUSED bool *allow_free) { dispatch_semaphore_t dsema = dou._dsema; if (dsema->dsema_value < dsema->dsema_orig) { DISPATCH_CLIENT_CRASH(dsema->dsema_orig - dsema->dsema_value, "Semaphore object deallocated while in use"); } _dispatch_sema4_dispose(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO); }
dispatch_group
dispatch_group
常常用來同步多個任務(注意和 dispatch_barrier_sync
不同的是它可以是多個隊列的同步),所以其實上面先分析 dispatch_semaphore
也是這個原因,它本身是依靠信號量來完成的同步管理。典型的用法如下:
dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); dispatch_group_t group = dispatch_group_create(); dispatch_group_async(group, globalQueue, ^{ sleep(10); NSLog(@"任務1完成"); }); dispatch_group_async(group, globalQueue, ^{ NSLog(@"任務2完成"); }); dispatch_group_notify(group, globalQueue, ^{ NSLog(@"兩個任務全部完成"); }); dispatch_async(globalQueue, ^{ // 等待5s超時後繼續執行,此時dispatch_group中的任務未必全部完成,注意:dispatch_group_wait是同步操作必須放到異步隊列否則阻塞當前線程 dispatch_group_wait(group, dispatch_time(DISPATCH_TIME_NOW, 5*NSEC_PER_SEC)); NSLog(@"等待到了上限,開始執行。。。"); });
下面看一下 dispatch_group
相關的源碼:
// 和其他對象一樣,dispatch_group_t的本質就是 dispatch_group_s指針,這裏重點關注一下dg_state和dg_bits是一個計數器 struct dispatch_group_s { DISPATCH_OBJECT_HEADER(group); DISPATCH_UNION_LE(uint64_t volatile dg_state, uint32_t dg_bits, uint32_t dg_gen ) DISPATCH_ATOMIC64_ALIGN; struct dispatch_continuation_s *volatile dg_notify_head; struct dispatch_continuation_s *volatile dg_notify_tail; }; // 查看 dispatch_group_create dispatch_group_t dispatch_group_create(void) { return _dispatch_group_create_with_count(0); } // 展開 _dispatch_group_create_with_count,其實就是一個dispatch_group_s對象,指定了do_targetq是默認隊列並且不支持過載 DISPATCH_ALWAYS_INLINE static inline dispatch_group_t _dispatch_group_create_with_count(uint32_t n) { dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group), sizeof(struct dispatch_group_s)); dg->do_next = DISPATCH_OBJECT_LISTLESS; dg->do_targetq = _dispatch_get_default_queue(false); if (n) { os_atomic_store2o(dg, dg_bits, (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed); os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411> } return dg; } // 首先看一下 dispatch_group_enter,它的核心就是os_atomic_sub_orig2o對dg_bits進行-1操作 void dispatch_group_enter(dispatch_group_t dg) { // The value is decremented on a 32bits wide atomic so that the carry // for the 0 -> -1 transition is not propagated to the upper 32bits. uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits, DISPATCH_GROUP_VALUE_INTERVAL, acquire); uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK; if (unlikely(old_value == 0)) { _dispatch_retain(dg); // <rdar://problem/22318411> } if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) { DISPATCH_CLIENT_CRASH(old_bits, "Too many nested calls to dispatch_group_enter()"); } } // 然後看一下 dispatch_group_leave,核心就是os_atomic_add_orig2o執行dg_state+1操作,如果+1之後還等於0那麼說明之前沒有調用`dispatch_group_enter`,就裏會crash,當然這裏核心在 `_dispatch_group_wake` void dispatch_group_leave(dispatch_group_t dg) { // The value is incremented on a 64bits wide atomic so that the carry for // the -1 -> 0 transition increments the generation atomically. uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state, DISPATCH_GROUP_VALUE_INTERVAL, release); uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK); if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) { old_state += DISPATCH_GROUP_VALUE_INTERVAL; do { new_state = old_state; if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) { new_state &= ~DISPATCH_GROUP_HAS_WAITERS; new_state &= ~DISPATCH_GROUP_HAS_NOTIFS; } else { // If the group was entered again since the atomic_add above, // we can't clear the waiters bit anymore as we don't know for // which generation the waiters are for new_state &= ~DISPATCH_GROUP_HAS_NOTIFS; } if (old_state == new_state) break; } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state, old_state, new_state, &old_state, relaxed))); return _dispatch_group_wake(dg, old_state, true); } if (unlikely(old_value == 0)) { DISPATCH_CLIENT_CRASH((uintptr_t)old_value, "Unbalanced call to dispatch_group_leave()"); } } // 查看 _dispatch_group_wake源碼,到了這裏通常就是出了調度組,如果有notify等待則執行notify遍歷並且在對應隊列中執行,如果有wait任務則喚醒其執行任務(注意這裏比較牛叉的`_dispatch_wake_by_address`可以根據地址進行函數調用,本身是調用的WakeByAddressAll這個系統調用) DISPATCH_NOINLINE static void _dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release) { uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411> if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) { dispatch_continuation_t dc, next_dc, tail; // Snapshot before anything is notified/woken <rdar://problem/8554546> dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail); do { dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data; next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next); _dispatch_continuation_async(dsn_queue, dc, _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags); _dispatch_release(dsn_queue); } while ((dc = next_dc)); refs++; } if (dg_state & DISPATCH_GROUP_HAS_WAITERS) { _dispatch_wake_by_address(&dg->dg_gen); } if (refs) _dispatch_release_n(dg, refs); } // 查看 dispatch_group_async源碼,dispatch_continuation_t僅僅是封裝任務,核心是_dispatch_continuation_group_async void dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) { dispatch_continuation_t dc = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC; dispatch_qos_t qos; qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags); _dispatch_continuation_group_async(dg, dq, dc, qos); } // 展開 _dispatch_continuation_group_async,這裏重點記住以下 dispatch_group_enter()方法,至於_dispatch_continuation_async前面已經介紹過 DISPATCH_ALWAYS_INLINE static inline void _dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq, dispatch_continuation_t dc, dispatch_qos_t qos) { dispatch_group_enter(dg); dc->dc_data = dg; _dispatch_continuation_async(dq, dc, qos, dc->dc_flags); } // 繼續查看 dispatch_group_notify,注意這裏將`dispatch_block_t`存儲到了共享數據 dispatch_continuation_t中 void dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, dispatch_block_t db) { dispatch_continuation_t dsn = _dispatch_continuation_alloc(); _dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME); _dispatch_group_notify(dg, dq, dsn); } // 展開 _dispatch_group_notify,其實最主要的方法就是`_dispatch_group_wake` DISPATCH_ALWAYS_INLINE static inline void _dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq, dispatch_continuation_t dsn) { uint64_t old_state, new_state; dispatch_continuation_t prev; dsn->dc_data = dq; _dispatch_retain(dq); prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next); if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg); os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next); if (os_mpsc_push_was_empty(prev)) { os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, { new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS; if ((uint32_t)old_state == 0) { os_atomic_rmw_loop_give_up({ return _dispatch_group_wake(dg, new_state, false); }); } }); } }
簡單的說就是 dispatch_group_async
和 dispatch_group_notify
本身就是和 dispatch_group_enter
、 dispatch_group_leave
沒有本質區別,後者相對更加靈活。當然這裏還有一個重要的操作就是 dispatch_group_wait
,還沒有看:
// os_atomic_rmw_loop2o不斷遍歷,如果(old_state & DISPATCH_GROUP_VALUE_MASK) == 0表示執行完,直接返回0,如果當前如果超時立即返回,其他情況調用_dispatch_group_wait_slow intptr_t dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout) { uint64_t old_state, new_state; os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, { if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) { os_atomic_rmw_loop_give_up_with_fence(acquire, return 0); } if (unlikely(timeout == 0)) { os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT()); } new_state = old_state | DISPATCH_GROUP_HAS_WAITERS; if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) { os_atomic_rmw_loop_give_up(break); } }); return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout); } // 查看 _dispatch_group_wait_slow,最終調用 _dispatch_wait_on_address 直至 __ulock_wait DISPATCH_NOINLINE static intptr_t _dispatch_group_wait_slow(dispatch_group_t dg, uint32_t gen, dispatch_time_t timeout) { for (;;) { int rc = _dispatch_wait_on_address(&dg->dg_gen, gen, timeout, 0); if (likely(gen != os_atomic_load2o(dg, dg_gen, acquire))) { return 0; } if (rc == ETIMEDOUT) { return _DSEMA4_TIMEOUT(); } } }
多個異步操作同步
上面第一個 dispatch_group
例子介紹的情況很簡單,任務本身都是同步的,只是將一個同步任務放到了 dispatch_group_async
中,現實中這個操作可能是一個網絡請求,你現在想讓10個請求都完成後再執行某個操作怎麼辦(網絡請求假設方法是request(url:String,complete:Callback))?你現在不可能在網絡請求方法內部做出修改了,怎麼保證操作同步呢?
之前看到過這種操作:
- (void)foo { dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); dispatch_group_t group = dispatch_group_create(); dispatch_semaphore_t semaphore = dispatch_semaphore_create(0); for (int i=0; i<5; ++i) { dispatch_group_async(group, globalQueue, ^{ [self bar:^{ printf("task\n"); dispatch_semaphore_signal(semaphore); }]; }); } dispatch_group_notify(group, globalQueue, ^{ for (int i=0; i<5; ++i) { dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); } NSLog(@"complete"); }); } - (void)bar:(Callback)complete { dispatch_queue_t queue = dispatch_queue_create(DISPATCH_QUEUE_PRIORITY_DEFAULT, nil); dispatch_async(queue, ^{ sleep(arc4random_uniform(5)); complete(); }); }
其實這種方法基本沒有用 dispatch_group
,直接用信號量就可以解決,有了上面的分析使用 dispatch_enter
和 dispatch_leave
就可以了。
- (void)foo { dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); dispatch_group_t group = dispatch_group_create(); for (int i=0; i<5; ++i) { dispatch_group_enter(group); [self bar:^{ printf("task\n"); dispatch_group_leave(group); }]; } dispatch_group_notify(group, globalQueue, ^{ NSLog(@"complete"); }); }
dispatch_apply
dispatch_apply設計的主要目的是提高並行能力(注意不是併發,等同於Swift中的DispatchQueue.concurrentPerform),所以一般我們用來並行執行多個結構類似的任務,比如:
void dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t)) { dispatch_apply_f(iterations, dq, work, (dispatch_apply_function_t)_dispatch_Block_invoke(work)); } // 查看 dispatch_apply_f,對於width=1或者單核心數cpu其實這個是一個同步調用,核心方法就是_dispatch_apply_f DISPATCH_NOINLINE void dispatch_apply_f(size_t iterations, dispatch_queue_t _dq, void *ctxt, void (*func)(void *, size_t)) { if (unlikely(iterations == 0)) { return; } dispatch_thread_context_t dtctxt = _dispatch_thread_context_find(_dispatch_apply_key); size_t nested = dtctxt ? dtctxt->dtc_apply_nesting : 0; dispatch_queue_t old_dq = _dispatch_queue_get_current(); dispatch_queue_t dq; if (likely(_dq == DISPATCH_APPLY_AUTO)) { dq = _dispatch_apply_root_queue(old_dq)->_as_dq; } else { dq = _dq; // silence clang Nullability complaints } dispatch_qos_t qos = _dispatch_priority_qos(dq->dq_priority) ?: _dispatch_priority_fallback_qos(dq->dq_priority); if (unlikely(dq->do_targetq)) { // if the queue passed-in is not a root queue, use the current QoS // since the caller participates in the work anyway qos = _dispatch_qos_from_pp(_dispatch_get_priority()); } int32_t thr_cnt = (int32_t)_dispatch_qos_max_parallelism(qos, DISPATCH_MAX_PARALLELISM_ACTIVE); if (likely(!nested)) { nested = iterations; } else { thr_cnt = nested < (size_t)thr_cnt ? thr_cnt / (int32_t)nested : 1; nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX ? nested * iterations : DISPATCH_APPLY_MAX; } if (iterations < (size_t)thr_cnt) { thr_cnt = (int32_t)iterations; } struct dispatch_continuation_s dc = { .dc_func = (void*)func, .dc_ctxt = ctxt, .dc_data = dq, }; dispatch_apply_t da = (__typeof__(da))_dispatch_continuation_alloc(); da->da_index = 0; da->da_todo = iterations; da->da_iterations = iterations; da->da_nested = nested; da->da_thr_cnt = thr_cnt; #if DISPATCH_INTROSPECTION da->da_dc = _dispatch_continuation_alloc(); *da->da_dc = dc; da->da_dc->dc_flags = DC_FLAG_ALLOCATED; #else da->da_dc = &dc; #endif da->da_flags = 0; if (unlikely(dq->dq_width == 1 || thr_cnt <= 1)) { return dispatch_sync_f(dq, da, _dispatch_apply_serial); } if (unlikely(dq->do_targetq)) { if (unlikely(dq == old_dq)) { return dispatch_sync_f(dq, da, _dispatch_apply_serial); } else { return dispatch_sync_f(dq, da, _dispatch_apply_redirect); } } dispatch_thread_frame_s dtf; _dispatch_thread_frame_push(&dtf, dq); _dispatch_apply_f(upcast(dq)._dgq, da, _dispatch_apply_invoke); _dispatch_thread_frame_pop(&dtf); } // 查看 _dispatch_apply_f DISPATCH_ALWAYS_INLINE static inline void _dispatch_apply_f(dispatch_queue_global_t dq, dispatch_apply_t da, dispatch_function_t func) { int32_t i = 0; dispatch_continuation_t head = NULL, tail = NULL; pthread_priority_t pp = _dispatch_get_priority(); // The current thread does not need a continuation int32_t continuation_cnt = da->da_thr_cnt - 1; dispatch_assert(continuation_cnt); for (i = 0; i < continuation_cnt; i++) { dispatch_continuation_t next = _dispatch_continuation_alloc(); uintptr_t dc_flags = DC_FLAG_CONSUME; _dispatch_continuation_init_f(next, dq, da, func, DISPATCH_BLOCK_HAS_PRIORITY, dc_flags); next->dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG; next->do_next = head; head = next; if (!tail) { tail = next; } } _dispatch_thread_event_init(&da->da_event); // FIXME: dq may not be the right queue for the priority of `head` _dispatch_trace_item_push_list(dq, head, tail); _dispatch_root_queue_push_inline(dq, head, tail, continuation_cnt); // Call the first element directly _dispatch_apply_invoke_and_wait(da); } // 展開 _dispatch_apply_invoke_and_wait DISPATCH_NOINLINE static void _dispatch_apply_invoke_and_wait(void *ctxt) { _dispatch_apply_invoke2(ctxt, DISPATCH_APPLY_INVOKE_WAIT); _dispatch_perfmon_workitem_inc(); } // 查看 _dispatch_apply_invoke2,主要是循環使用_dispatch_perfmon_workitem_inc調用任務,同時在最後一個任務調用完恢復線程 _dispatch_thread_event_signal DISPATCH_ALWAYS_INLINE static inline void _dispatch_apply_invoke2(dispatch_apply_t da, long invoke_flags) { size_t const iter = da->da_iterations; size_t idx, done = 0; idx = os_atomic_inc_orig2o(da, da_index, acquire); if (unlikely(idx >= iter)) goto out; // da_dc is only safe to access once the 'index lock' has been acquired dispatch_apply_function_t const func = (void *)da->da_dc->dc_func; void *const da_ctxt = da->da_dc->dc_ctxt; _dispatch_perfmon_workitem_dec(); // this unit executes many items // Handle nested dispatch_apply rdar://problem/9294578 dispatch_thread_context_s apply_ctxt = { .dtc_key = _dispatch_apply_key, .dtc_apply_nesting = da->da_nested, }; _dispatch_thread_context_push(&apply_ctxt); dispatch_thread_frame_s dtf; dispatch_priority_t old_dbp = 0; if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) { dispatch_queue_t dq = da->da_dc->dc_data; _dispatch_thread_frame_push(&dtf, dq); old_dbp = _dispatch_set_basepri(dq->dq_priority); } dispatch_invoke_flags_t flags = da->da_flags; // Striding is the responsibility of the caller. do { dispatch_invoke_with_autoreleasepool(flags, { _dispatch_client_callout2(da_ctxt, idx, func); _dispatch_perfmon_workitem_inc(); done++; idx = os_atomic_inc_orig2o(da, da_index, relaxed); }); } while (likely(idx < iter)); if (invoke_flags & DISPATCH_APPLY_INVOKE_REDIRECT) { _dispatch_reset_basepri(old_dbp); _dispatch_thread_frame_pop(&dtf); } _dispatch_thread_context_pop(&apply_ctxt); // The thread that finished the last workitem wakes up the possibly waiting // thread that called dispatch_apply. They could be one and the same. if (!os_atomic_sub2o(da, da_todo, done, release)) { _dispatch_thread_event_signal(&da->da_event); } out: if (invoke_flags & DISPATCH_APPLY_INVOKE_WAIT) { _dispatch_thread_event_wait(&da->da_event); _dispatch_thread_event_destroy(&da->da_event); } if (os_atomic_dec2o(da, da_thr_cnt, release) == 0) { #if DISPATCH_INTROSPECTION _dispatch_continuation_free(da->da_dc); #endif _dispatch_continuation_free((dispatch_continuation_t)da); } }
線程和鎖
在GCD中其實總共有兩個線程池進行線程管理,一個是主線程池,另一個是除了主線程池之外的線程池。主線程池由序列爲1的主隊列管理,使用objc.io上的一幅圖表示如下:
大家都知道使用dispatch_sync很有可能會發生死鎖那麼這是爲什麼呢?
不妨回顧一下dispatch_sync的過程:
dispatch_sync _dispatch_sync_f:區分併發還是串行隊列,如果是串行隊列 _dispatch_barrier_sync_f:是不是同一個隊列,如果是 _dispatch_sync_f_slow
重點在 _dq_state_drain_locked_by(dq_state, dsc->dsc_waiter)
這個條件,成立則會發生死鎖,那麼它成立的條件就是 ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0
首先lock_value和tid進行異或操作,相同爲0不同爲1,然後和DLOCK_OWNER_MASK(0xfffffffc)進行按位與操作,一個爲0則是0,所以若干lock_value和tid相同則會發生死鎖。
補充
關於上面源碼中的幾個宏定義
__builtin_expect
__builtin_expect是一個針對編譯器優化的內置函數,讓編譯更加優化。比如說我們會寫這種代碼:
if a { print(a) } else { print(b) }
如果我們更加傾向於使用a那麼可將其設爲默認值,極特殊情況下才會使用b條件。CPU讀取指定是多條一起加載的,可能先加載進來的是a,那麼如果遇到執行b的情況則再加載b,那麼對於條件a的情況就造成了性能浪費。long __builtin_expect (long EXP, long C) 第一個參數是要預測變量,第二個參數是預測值,這樣__builtin_expect(a,false)說明多數情況a應該是false,極少數情況可能是true,這樣不至於造成性能浪費。其實對於編譯器在彙編時會優化成 if !a
的形式:
if !a { print(b) } else { print(a) }
likely和unlikely
#define likely(x) __builtin_expect(!!(x), 1) #define unlikely(x) __builtin_expect(!!(x), 0)
看了likely和unlikely可以瞭解,likely表示更大可能成立,unlikely表示更大可能不成立。likely就是 if(likely(x == 0)) 就是if (x==0)。
os_atomic_cmpxchg
第二個參數與第一個參數值比較,如果相等,第三個參數的值替換第一個參數的值。如果不相等,把第一個參數的值賦值到第二個參數上。
#define os_atomic_cmpxchg(p, e, v, m) \ ({ _os_atomic_basetypeof(p) _r = (e); \ atomic_compare_exchange_strong_explicit(_os_atomic_c11_atomic(p), \ &_r, v, memory_order_##m, memory_order_relaxed); })
os_atomic_store2o
將第二個參數保存到第一個參數中
#define os_atomic_store2o(p, f, v, m) \ os_atomic_store(&(p)->f, (v), m) #define os_atomic_store(p, v, m) \ atomic_store_explicit(_os_atomic_c11_atomic(p), v, memory_order_##m)
os_atomic_inc_orig
第一個參數賦值爲1
#define os_atomic_inc_orig(p, m) \ os_atomic_add_orig((p), 1, m) #define os_atomic_add_orig(p, v, m) \ _os_atomic_c11_op_orig((p), (v), m, add, +) #define _os_atomic_c11_op_orig(p, v, m, o, op) \ atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), v, \ memory_order_##m)
os_atomic_inc_orig2o
第二個參數加1並返回
#define os_atomic_inc_orig2o(p, f, m) \ os_atomic_add_orig2o(p, f, 1, m) #define os_atomic_add_orig2o(p, f, v, m) \ os_atomic_add_orig(&(p)->f, (v), m)
os_atomic_dec2o
第二個參數-1並返回
#define os_atomic_dec2o(p, f, m) \ os_atomic_sub2o(p, f, 1, m) #define os_atomic_sub2o(p, f, v, m) \ os_atomic_sub(&(p)->f, (v), m)
串行、併發、並行
dispatch_barrier_async
dispatch_apply()