本文記錄下自己看 Scrapy 源碼的一點經驗,沒有涉及框架的方方面面,更多的是關注數據的流轉以及代碼的組織。如果你想深入框架的某個細節,那麼這篇文字可以給你一個切入點。

閱讀源碼常規的步驟

  1. 準備好必須的基礎知識
  2. 熟悉框架有哪些功能
  3. 看源碼,瞭解代碼是怎麼組織的,最後按需關注特定部分的實現細節

我們一步一步來。

目錄

1 Twisted

Scrapy 基於 Twisted,所以除了要有一定的 Python 基礎外,還要對 Twisted 有一些瞭解。

你可以將 Twisted 和 Asyncio 類比,它們都是爲了支持協程而誕生的,只是前者比後者出現的更早。這 2 個技術實現的核心都是事件循環,當程序執行到某個耗時的 IO 操作時,程序的執行權限會被退回給事件循環,事件循環會檢測其它準備就緒的協程,然後將執行權限交給它,當之前的協程 IO 操作完畢後,事件循環會將執行權限轉給它,繼續後面的操作。這樣,就能在單線程內實現併發,作用和多線程類似,只是比多線程更輕量。事件循環在 Asyncio 中被叫做 event_loop,在 Twisted 中叫做 reactor。我們看一些簡單的例子

1.1 Schedule

  • 多少秒後執行某個任務
from twisted.internet import reactor

def f(s):
    print('this will run 3.5 seconds after it was scheduled: %s' % s)

reactor.callLater(3.5, f, 'hello, world')

# f() will only be called if the event loop is started.
reactor.run()

reactor.callLater 會將某個任務加入到事件循環,並設置好多少秒後開始執行,當然要將事件循環啓動後纔會有作用。

  • 每隔多少秒執行某個任務
from twisted.internet import reactor, task

def f(s):
    print(s)

loop = task.LoopingCall(f, 'hello, world')
# Start looping every 1 second.
loop.start(1)

reactor.run()

每隔 1s 執行一次 f 方法。

1.2 Deferred

Deferred 表示某個任務未來會產生結果,當任務執行完畢後,會執行註冊在 Deferred 的回調函數,並將結果傳遞給它。

1.2.1 成功回調

from twisted.internet import reactor, defer

def get_dummpy_data(input_data):
    print('get_dummpy_data called')
    deferred = defer.Deferred()
    reactor.callLater(2, deferred.callback, input_data * 3)
    return deferred

def cb_print_data(result):
    print('Result received: {}'.format(result))

deferred = get_dummpy_data(3)
deferred.addCallback(cb_print_data)

# manually set up the end of the process by asking the reactor to
# stop itself in 4 seconds time
reactor.callLater(4, reactor.stop)

reactor.run()

get_dummpy_data ,返回了 1 個 defer. Deferred 對象,表示該函數是 1 個異步任務,會在未來某個時間點產生結果,這裏使用

reactor.callLater(2, deferred.callback, input_data * 3)

模仿異步結果,2s 後執行 deferred 的回調函數,並將計算後的結果傳遞給它。

cb_print_data ,簡單的打印結果。

看下執行流程,首先調用 get_dummpy_data 得到 deferred,然後註冊回調函數 cb_print_data,最後運行事件循環。爲了讓程序正常關閉,還設置了在 4s 後自動關閉事件循環。執行結果

get_dummpy_data called
Result received: 9

1.2.2 異常回調

上面我們使用 addCallback 註冊成功回調,除開這個,我們還能註冊異常回調,實現異常處理。

from twisted.internet import reactor, defer

def get_dummpy_data(input_data):
    print('get_dummpy_data called')
    deferred = defer.Deferred()
    if input_data % 2 == 0:
        reactor.callLater(2, deferred.callback, input_data * 3)
    else:
        reactor.callLater(2, deferred.errback, ValueError('You used an odd number!'))
    return deferred

def cb_print_data(result):
    print('Result received: {}'.format(result))

def eb_print_error(failure):
    print(failure)

deferred = get_dummpy_data(3)
deferred.addCallback(cb_print_data)
deferred.addErrback(eb_print_error)

reactor.callLater(4, reactor.stop)
reactor.run()

get_dummpy_data,當 input_data 爲奇數時,2s 後會觸發 deferred 的異常回調。

執行時,使用 addErrback 註冊異常回調。結果

get_dummpy_data called
[Failure instance: Traceback (failure with no frames): <class 'ValueError'>: You used an odd number!
]

1.2.3 回調鏈

看到這裏,有的讀者可能會想如果我註冊多個成功回調以及多個異常回調,成功或者異常時,具體的執行流程又是怎麼樣的呢?這就要引入回調鏈的概念了。看下圖

有 2 種類型的主鏈:成功以及異常回調鏈

  • 對於前者來說,每個回調的結果都是下個回調的輸入,比如 cb1 的結果會是 cb2 的輸入
  • 對於後者來說,某個回調返回異常或者拋出異常,就會將異常作爲下一個回調的輸入,比如 eb1 如果返回或者拋出異常,那麼 eb2 會被調用,並將 eb1 返回或者拋出的異常作爲參數

當然,還有其它情況

  • 如果 cb1 異常,那麼 eb2 會被調用,並將 cb1 中的異常作爲參數
  • 如果 eb1 處理了異常並返回值,那麼 cb2 會被調用,並將 eb1 的返回值作爲參數

也就是說, 當前級別的回調只根據上一級別的回調的執行情況進行觸發。

添加回調大概有下面幾種方式

  • addCallback、addErrback
  • addCallbacks
  • addBoth

這些方式有些細節上的不同,舉例說明下

第 1 個例子

d = getDeferredFromSomewhere()
d.addCallback(cb1)       # A
d.addErrback(eb1)         # B
d.addCallback(cb2)
d.addErrback(eb2)

對應圖

也即,對於每 1 次 addCallback(addErrback),都會在同一級上綁定 1 個成功回調(異常回調)和 1 個什麼都不做的異常回調(成功回調)。

第 2 個例子

d = getDeferredFromSomewhere()
d.addCallbacks(cb1, eb1)  # C
d.addCallbacks(cb2, eb2)

對應圖

也即,addCallbacks 會在同一級別上綁定 1 個成功回調和 1 個異常回調。

爲了加強我們的理解,可以想下這個問題,如果 cb1 出現異常,對於這 2 個例子的執行情況有什麼差別?結合圖其實很好理解,對於第 1 個例子,eb1 會被調用;對於第 2 個例子,eb2 會被調用。

至於 addBoth(cb),可以理解爲

d = getDeferredFromSomewhere()
d.addCallback(cb)
d.addErrback(cb)

1.2.4 DeferredList

當碰到需要等待多個 deferred 執行完畢的時候,我們可以使用 DeferredList

dl = defer.DeferredList([deferred1, deferred2, deferred3])

看個例子

from twisted.internet import defer

def print_result(result):
    for (success, value) in result:
        if success:
            print('Success:', value)
        else:
            print('Failure:', value.getErrorMessage())

deferred1 = defer.Deferred()
deferred2 = defer.Deferred()
deferred3 = defer.Deferred()

dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)

dl.addCallback(print_result)

deferred1.callback('one')
deferred2.errback(Exception('bang!'))
deferred3.callback('three')
print_result
print_result

看下執行結果

Success: one
Failure: bang!
Success: three

1.2.5 gatherResults

gatherResults 和 DeferredList 類似,也是等待多個 deferred,不同的是

  • 只要有 1 個 deferred 異常,就會觸發整體的異常
  • 返回的結果列表中的元素不是一個元祖,而是對應 deferred 的真正的結果。因爲只要有 1 個 deferred 發生異常就會觸發整體的異常回調,所以成功回調錶示所有的 deferred 都是執行成功了的,也就沒有必要使用布爾值表示是否成功執行了。
from twisted.internet import defer

d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.gatherResults([d1, d2])

def print_result(result):
    print(result)

d.addBoth(print_result)

d1.callback('one')
d2.errback(Exception('bang!'))
print_result

看下執行結果

[Failure instance: Traceback (failure with no frames): <class 'twisted.internet.defer.FirstError'>: FirstError[#1, [Failure instance: Traceback (failure with no frames): <class 'Exception'>: bang!
]]
]
Unhandled error in Deferred:

Traceback (most recent call last):
Failure: builtins.Exception: bang!

如果 gatherResults 時,設置了 consumeErrors 爲 True,不會有 Unhandled error in Deferred: 及其之後的輸出。

如果將

d2.errback(Exception('bang!'))

改爲

d2.callback('two')

結果爲

['one', 'two']

1.3 defer.inlineCallbacks

在 Scrapy 源碼中,我們經常會看到類似這樣的語法

@defer.inlineCallbacks
def f():
    ...

這可以理解爲一個語法糖,表示在被這個裝飾器裝飾過的方法裏面,可以使用 yield 等待 deferred 的結果,我們可以改寫 1.2.1 中的例子爲

from twisted.internet import reactor, defer

def get_dummpy_data(input_data):
    print('get_dummpy_data called')
    deferred = defer.Deferred()
    reactor.callLater(2, deferred.callback, input_data * 3)
    return deferred

@defer.inlineCallbacks
def main():
    result = yield get_dummpy_data(3)
    print(result)

main()

reactor.callLater(4, reactor.stop)
reactor.run()

1.4 更多

如果只是想簡單的看懂 Scrapy 源碼的話,關於 Twisted,理解上面的一些概念就差不多了。雖說源碼裏面還是一些上面沒提及的方法,比如 maybeDeferred 等,不過有上面的一些基礎後,你通過查看官方文檔或者跳轉方法定義處讀讀,也能較快的理解其用處了。

如果想比較全面的掌握 Twisted,可以先看一本書《Twisted. Network. Programming》,這本書可以幫你更爲全面的理解 Twisted 的一些基本概念。讀完此書後,再根據需要去讀官方文檔的特定部分,就要輕鬆一些了。

2 框架有哪些功能

要知道一個框架有些什麼功能,可以看它的官方文檔。剛接觸 Scrapy 的同學一看文檔,可能會覺得功能有點多,有點雜。其實,我們可以看下它的數據流圖,就可以將其主要的功能給串起來了。

從圖上可以直觀的看出,Scrapy 的主要部件有 Engine、Spider、Scheduler、Downloader、Item Pipelines 以及中間件。

更詳細的功能,我們就可以通過主要部件去聯想。比如:

  • Spider 涉及 Request、Response、Selector、Extractor
  • Scheduler 涉及去重的 DupeFilter
  • Downloader 涉及併發量、下載延時等設置
  • Item Pipelines 涉及 Items、Item Loaders、Feed Exports
  • 至於中間件,在 Spider 和 Engine 中間的是 Spider Middleware,在 Engine 和 Downloader 中間的是 Downloader Middleware

除此之外,文檔上還列了一些內置服務以及解決特定問題的示例,比如:Stats Collection 可以用於統計;Telnet Console 可以查看運行時爬蟲的狀態、內存使用情況等;對於廣度優先的爬蟲怎麼設置;等等。

上面的聯想只是作爲一個參考,不同的人有不同的聯想方式。

這樣一來,我們就瞭解了框架的大部分功能了。 不過,我還是推薦在使用 Scrapy 一段時間後,碰到特定的問題後,再去看相應的源碼。一方面,那個時候,你會對框架的很多概念有更深的理解,看源碼的時候更容易理解作者的用心;另一方面,如果只是範範的看源碼,很難抓重點,很可能你看了一段時間後,感覺好像沒有從中學到什麼。

現在我們還是參考上面的圖,來仔細看下框架中的數據流

  1. Spider 就是我們常寫的那個 spider 文件,Engine 從 start_urls 或者 start_requests 中獲取初始請求
  2. Engine 將請求加入 Scheduler 並從中獲取下一個請求
  3. Scheduler 返回給 Engine 下一個請求
  4. Engine 將請求發送給 Downloader,這會調用 Downloader Middleware 中的 process_request
  5. 當 Downloader 下載完畢後會生成一個響應發送給 Engine,這會調用 Downloader Middleware 的 process_response
  6. Engine 收到 Downloader 生成的響應後會將其發送給 Spider,這會調用 Spider Middleware 的 process_spider_input
  7. 當 Spider 處理完響應後,會將生成的 Items 或者新的請求發送給 Engine,這會調用 Spider Middleware 的 process_spider_output
  8. 如果 Spider 發送給 Engine 的有 Items,Engine 會將 Item 發送給 Item Pipelines;如果有新的請求,會將其加入 Scheduler;如果 Scheduler 還有未處理的請求的話,Engine 會向其獲取下一個請求
  9. 重複步驟 1 直到 Scheduler 中的請求被處理完

到這裏,看源碼前的準備工作就差不多了。下一篇開始跟源碼,感興趣的夥伴可以關注下後面的文章。

相關文章