Scrapy 源碼閱讀(一):Twisted 基礎和 Scrapy 數據流
本文記錄下自己看 Scrapy 源碼的一點經驗,沒有涉及框架的方方面面,更多的是關注數據的流轉以及代碼的組織。如果你想深入框架的某個細節,那麼這篇文字可以給你一個切入點。
閱讀源碼常規的步驟
- 準備好必須的基礎知識
- 熟悉框架有哪些功能
- 看源碼,瞭解代碼是怎麼組織的,最後按需關注特定部分的實現細節
我們一步一步來。
目錄
1 Twisted
Scrapy 基於 Twisted,所以除了要有一定的 Python 基礎外,還要對 Twisted 有一些瞭解。
你可以將 Twisted 和 Asyncio 類比,它們都是爲了支持協程而誕生的,只是前者比後者出現的更早。這 2 個技術實現的核心都是事件循環,當程序執行到某個耗時的 IO 操作時,程序的執行權限會被退回給事件循環,事件循環會檢測其它準備就緒的協程,然後將執行權限交給它,當之前的協程 IO 操作完畢後,事件循環會將執行權限轉給它,繼續後面的操作。這樣,就能在單線程內實現併發,作用和多線程類似,只是比多線程更輕量。事件循環在 Asyncio 中被叫做 event_loop,在 Twisted 中叫做 reactor。我們看一些簡單的例子
1.1 Schedule
- 多少秒後執行某個任務
from twisted.internet import reactor def f(s): print('this will run 3.5 seconds after it was scheduled: %s' % s) reactor.callLater(3.5, f, 'hello, world') # f() will only be called if the event loop is started. reactor.run()
reactor.callLater 會將某個任務加入到事件循環,並設置好多少秒後開始執行,當然要將事件循環啓動後纔會有作用。
- 每隔多少秒執行某個任務
from twisted.internet import reactor, task def f(s): print(s) loop = task.LoopingCall(f, 'hello, world') # Start looping every 1 second. loop.start(1) reactor.run()
每隔 1s 執行一次 f 方法。
1.2 Deferred
Deferred 表示某個任務未來會產生結果,當任務執行完畢後,會執行註冊在 Deferred 的回調函數,並將結果傳遞給它。
1.2.1 成功回調
from twisted.internet import reactor, defer def get_dummpy_data(input_data): print('get_dummpy_data called') deferred = defer.Deferred() reactor.callLater(2, deferred.callback, input_data * 3) return deferred def cb_print_data(result): print('Result received: {}'.format(result)) deferred = get_dummpy_data(3) deferred.addCallback(cb_print_data) # manually set up the end of the process by asking the reactor to # stop itself in 4 seconds time reactor.callLater(4, reactor.stop) reactor.run()
get_dummpy_data
,返回了 1 個 defer. Deferred 對象,表示該函數是 1 個異步任務,會在未來某個時間點產生結果,這裏使用
reactor.callLater(2, deferred.callback, input_data * 3)
模仿異步結果,2s 後執行 deferred 的回調函數,並將計算後的結果傳遞給它。
cb_print_data
,簡單的打印結果。
看下執行流程,首先調用 get_dummpy_data
得到 deferred,然後註冊回調函數 cb_print_data,最後運行事件循環。爲了讓程序正常關閉,還設置了在 4s 後自動關閉事件循環。執行結果
get_dummpy_data called Result received: 9
1.2.2 異常回調
上面我們使用 addCallback 註冊成功回調,除開這個,我們還能註冊異常回調,實現異常處理。
from twisted.internet import reactor, defer def get_dummpy_data(input_data): print('get_dummpy_data called') deferred = defer.Deferred() if input_data % 2 == 0: reactor.callLater(2, deferred.callback, input_data * 3) else: reactor.callLater(2, deferred.errback, ValueError('You used an odd number!')) return deferred def cb_print_data(result): print('Result received: {}'.format(result)) def eb_print_error(failure): print(failure) deferred = get_dummpy_data(3) deferred.addCallback(cb_print_data) deferred.addErrback(eb_print_error) reactor.callLater(4, reactor.stop) reactor.run()
get_dummpy_data,當 input_data 爲奇數時,2s 後會觸發 deferred 的異常回調。
執行時,使用 addErrback 註冊異常回調。結果
get_dummpy_data called [Failure instance: Traceback (failure with no frames): <class 'ValueError'>: You used an odd number! ]
1.2.3 回調鏈
看到這裏,有的讀者可能會想如果我註冊多個成功回調以及多個異常回調,成功或者異常時,具體的執行流程又是怎麼樣的呢?這就要引入回調鏈的概念了。看下圖
有 2 種類型的主鏈:成功以及異常回調鏈
- 對於前者來說,每個回調的結果都是下個回調的輸入,比如 cb1 的結果會是 cb2 的輸入
- 對於後者來說,某個回調返回異常或者拋出異常,就會將異常作爲下一個回調的輸入,比如 eb1 如果返回或者拋出異常,那麼 eb2 會被調用,並將 eb1 返回或者拋出的異常作爲參數
當然,還有其它情況
- 如果 cb1 異常,那麼 eb2 會被調用,並將 cb1 中的異常作爲參數
- 如果 eb1 處理了異常並返回值,那麼 cb2 會被調用,並將 eb1 的返回值作爲參數
也就是說, 當前級別的回調只根據上一級別的回調的執行情況進行觸發。
添加回調大概有下面幾種方式
- addCallback、addErrback
- addCallbacks
- addBoth
這些方式有些細節上的不同,舉例說明下
第 1 個例子
d = getDeferredFromSomewhere() d.addCallback(cb1) # A d.addErrback(eb1) # B d.addCallback(cb2) d.addErrback(eb2)
對應圖
也即,對於每 1 次 addCallback(addErrback),都會在同一級上綁定 1 個成功回調(異常回調)和 1 個什麼都不做的異常回調(成功回調)。
第 2 個例子
d = getDeferredFromSomewhere() d.addCallbacks(cb1, eb1) # C d.addCallbacks(cb2, eb2)
對應圖
也即,addCallbacks 會在同一級別上綁定 1 個成功回調和 1 個異常回調。
爲了加強我們的理解,可以想下這個問題,如果 cb1 出現異常,對於這 2 個例子的執行情況有什麼差別?結合圖其實很好理解,對於第 1 個例子,eb1 會被調用;對於第 2 個例子,eb2 會被調用。
至於 addBoth(cb),可以理解爲
d = getDeferredFromSomewhere() d.addCallback(cb) d.addErrback(cb)
1.2.4 DeferredList
當碰到需要等待多個 deferred 執行完畢的時候,我們可以使用 DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3])
看個例子
from twisted.internet import defer def print_result(result): for (success, value) in result: if success: print('Success:', value) else: print('Failure:', value.getErrorMessage()) deferred1 = defer.Deferred() deferred2 = defer.Deferred() deferred3 = defer.Deferred() dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True) dl.addCallback(print_result) deferred1.callback('one') deferred2.errback(Exception('bang!')) deferred3.callback('three')
print_result print_result
看下執行結果
Success: one Failure: bang! Success: three
1.2.5 gatherResults
gatherResults 和 DeferredList 類似,也是等待多個 deferred,不同的是
- 只要有 1 個 deferred 異常,就會觸發整體的異常
- 返回的結果列表中的元素不是一個元祖,而是對應 deferred 的真正的結果。因爲只要有 1 個 deferred 發生異常就會觸發整體的異常回調,所以成功回調錶示所有的 deferred 都是執行成功了的,也就沒有必要使用布爾值表示是否成功執行了。
from twisted.internet import defer d1 = defer.Deferred() d2 = defer.Deferred() d = defer.gatherResults([d1, d2]) def print_result(result): print(result) d.addBoth(print_result) d1.callback('one') d2.errback(Exception('bang!'))
print_result
看下執行結果
[Failure instance: Traceback (failure with no frames): <class 'twisted.internet.defer.FirstError'>: FirstError[#1, [Failure instance: Traceback (failure with no frames): <class 'Exception'>: bang! ]] ] Unhandled error in Deferred: Traceback (most recent call last): Failure: builtins.Exception: bang!
如果 gatherResults 時,設置了 consumeErrors 爲 True,不會有 Unhandled error in Deferred:
及其之後的輸出。
如果將
d2.errback(Exception('bang!'))
改爲
d2.callback('two')
結果爲
['one', 'two']
1.3 defer.inlineCallbacks
在 Scrapy 源碼中,我們經常會看到類似這樣的語法
@defer.inlineCallbacks def f(): ...
這可以理解爲一個語法糖,表示在被這個裝飾器裝飾過的方法裏面,可以使用 yield 等待 deferred 的結果,我們可以改寫 1.2.1 中的例子爲
from twisted.internet import reactor, defer def get_dummpy_data(input_data): print('get_dummpy_data called') deferred = defer.Deferred() reactor.callLater(2, deferred.callback, input_data * 3) return deferred @defer.inlineCallbacks def main(): result = yield get_dummpy_data(3) print(result) main() reactor.callLater(4, reactor.stop) reactor.run()
1.4 更多
如果只是想簡單的看懂 Scrapy 源碼的話,關於 Twisted,理解上面的一些概念就差不多了。雖說源碼裏面還是一些上面沒提及的方法,比如 maybeDeferred 等,不過有上面的一些基礎後,你通過查看官方文檔或者跳轉方法定義處讀讀,也能較快的理解其用處了。
如果想比較全面的掌握 Twisted,可以先看一本書《Twisted. Network. Programming》,這本書可以幫你更爲全面的理解 Twisted 的一些基本概念。讀完此書後,再根據需要去讀官方文檔的特定部分,就要輕鬆一些了。
2 框架有哪些功能
要知道一個框架有些什麼功能,可以看它的官方文檔。剛接觸 Scrapy 的同學一看文檔,可能會覺得功能有點多,有點雜。其實,我們可以看下它的數據流圖,就可以將其主要的功能給串起來了。
從圖上可以直觀的看出,Scrapy 的主要部件有 Engine、Spider、Scheduler、Downloader、Item Pipelines 以及中間件。
更詳細的功能,我們就可以通過主要部件去聯想。比如:
- Spider 涉及 Request、Response、Selector、Extractor
- Scheduler 涉及去重的 DupeFilter
- Downloader 涉及併發量、下載延時等設置
- Item Pipelines 涉及 Items、Item Loaders、Feed Exports
- 至於中間件,在 Spider 和 Engine 中間的是 Spider Middleware,在 Engine 和 Downloader 中間的是 Downloader Middleware
除此之外,文檔上還列了一些內置服務以及解決特定問題的示例,比如:Stats Collection 可以用於統計;Telnet Console 可以查看運行時爬蟲的狀態、內存使用情況等;對於廣度優先的爬蟲怎麼設置;等等。
上面的聯想只是作爲一個參考,不同的人有不同的聯想方式。
這樣一來,我們就瞭解了框架的大部分功能了。 不過,我還是推薦在使用 Scrapy 一段時間後,碰到特定的問題後,再去看相應的源碼。一方面,那個時候,你會對框架的很多概念有更深的理解,看源碼的時候更容易理解作者的用心;另一方面,如果只是範範的看源碼,很難抓重點,很可能你看了一段時間後,感覺好像沒有從中學到什麼。
現在我們還是參考上面的圖,來仔細看下框架中的數據流
- Spider 就是我們常寫的那個 spider 文件,Engine 從
start_urls
或者start_requests
中獲取初始請求 - Engine 將請求加入 Scheduler 並從中獲取下一個請求
- Scheduler 返回給 Engine 下一個請求
- Engine 將請求發送給 Downloader,這會調用 Downloader Middleware 中的
process_request
- 當 Downloader 下載完畢後會生成一個響應發送給 Engine,這會調用 Downloader Middleware 的
process_response
- Engine 收到 Downloader 生成的響應後會將其發送給 Spider,這會調用 Spider Middleware 的
process_spider_input
- 當 Spider 處理完響應後,會將生成的 Items 或者新的請求發送給 Engine,這會調用 Spider Middleware 的
process_spider_output
- 如果 Spider 發送給 Engine 的有 Items,Engine 會將 Item 發送給 Item Pipelines;如果有新的請求,會將其加入 Scheduler;如果 Scheduler 還有未處理的請求的話,Engine 會向其獲取下一個請求
- 重複步驟 1 直到 Scheduler 中的請求被處理完
到這裏,看源碼前的準備工作就差不多了。下一篇開始跟源碼,感興趣的夥伴可以關注下後面的文章。