
Horovod is a distributed machine-learning training framework compatible with the mainstream compute frameworks. It is mainly based on the AllReduce algorithm, of which baidu-research published an implementation in 2017. AllReduce itself comes from the high-performance computing world: it is built on MPI, a parallel-computing interface that has been around for a very long time. There is a well-written tutorial introducing MPI here.

Before introducing horovod, AllReduce deserves an explanation. The "reduce" here is the same reduce as in MapReduce; the MPI tutorial mentioned above explains it as:

Reduce is a classic concept from functional programming. Data reduction involves reducing a set of numbers into a smaller set of numbers via a function. For example, let's say we have a list of numbers [1, 2, 3, 4, 5]. Reducing this list of numbers with the sum function would produce sum([1, 2, 3, 4, 5]) = 15. Similarly, the multiplication reduction would yield multiply([1, 2, 3, 4, 5]) = 120.

In other words, a large collection is "reduced" into a smaller one. Note that the reduction operation must be commutative (and associative): subtraction or division will not do, because in parallel computing you cannot easily control the order in which the operations happen. That is what Reduce means; concretely, MPI_Reduce "reduces" numbers from different nodes onto a single node, and the supported operations include sum, product, max, and min.

The Reduce example given in the tutorial is a sum.

AllReduce simply means that every node obtains the result of the Reduce.
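
To see the difference concretely, here is a minimal mpi4py sketch (my addition, not from the tutorial): comm.reduce leaves the sum only on the root rank, while comm.allreduce leaves it on every rank. Run it with something like mpirun -np 4 python demo.py.

from mpi4py import MPI

comm = MPI.COMM_WORLD
value = comm.Get_rank() + 1    # each rank contributes its own number

total = comm.reduce(value, op=MPI.SUM, root=0)  # only rank 0 gets 1+2+...+N
print('reduce   ', comm.Get_rank(), total)      # None on the other ranks

total = comm.allreduce(value, op=MPI.SUM)       # every rank gets the sum
print('allreduce', comm.Get_rank(), total)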

On top of this standard there are many All-Reduce implementations, for example Ring-AllReduce, which works in two phases: a Scatter-Reduce followed by an All-Gather. It was first described in this post. The advantage of the algorithm is that it escapes the PS architecture's heavy dependence on the Parameter Server's bandwidth, which otherwise becomes the computation bottleneck: with AllReduce every node occupies an equal position in the bandwidth topology, and the number of transfers is reduced. See the post for the details of the algorithm. In scatter-reduce, each node ends up owning the reduce (i.e. the sum) of one K/N-sized chunk, then passes its reduced K/N chunk on to the others, with each node communicating only with its neighbours.

In the system we described, each of the N GPUs will send and receive values N-1 times for the scatter-reduce, and N-1 times for the allgather. Each time, the GPUs will send K / N values, where K is the total number of values in array being summed across the different GPUs. Therefore, the total amount of data transferred to and from every GPU is
Data Transferred = 2(N−1) · K/N

As N grows, the per-node data transferred approaches 2K and stops depending on N, which removes the bottleneck that many nodes impose on a Parameter Server.
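
To make the two phases concrete, here is a toy single-process simulation of ring allreduce (my own sketch, not code from the post or from horovod); it assumes every node splits its array into the same N chunks and that each step's sends happen in lockstep:

import numpy as np

def ring_allreduce(arrays):
    """Toy simulation of ring allreduce: arrays[i] is "node" i's data.
    Every node ends up with the elementwise sum while only ever moving
    K/N-sized chunks to its right-hand neighbour."""
    n = len(arrays)
    # Chunk j denotes the same slice of the array on every node.
    chunks = [np.array_split(a.astype(float), n) for a in arrays]

    # Phase 1, scatter-reduce: in step s, node i sends chunk (i - s) % n to
    # its neighbour, which adds it into its own copy of that chunk.
    for s in range(n - 1):
        for i in range(n):
            dst, c = (i + 1) % n, (i - s) % n
            chunks[dst][c] = chunks[dst][c] + chunks[i][c]
    # Node i now owns the fully reduced chunk (i + 1) % n.

    # Phase 2, all-gather: in step s, node i forwards its complete chunk
    # (i + 1 - s) % n to its neighbour, which overwrites its copy.
    for s in range(n - 1):
        for i in range(n):
            dst, c = (i + 1) % n, (i + 1 - s) % n
            chunks[dst][c] = chunks[i][c].copy()

    return [np.concatenate(ch) for ch in chunks]

# All three "nodes" end up with the same elementwise sum.
out = ring_allreduce([np.arange(8), np.arange(8) * 2, np.ones(8)])
print(out[0])  # [ 1.  4.  7. 10. 13. 16. 19. 22.]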

A few more terms. Suppose we have 4 GPU servers with 4 cards each: size is the number of worker processes (GPUs), i.e. 16; rank is a worker process's id among all of them (0-15); local rank is its id on the current server (0-3).
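
For illustration (using the horovod.keras module that the official Keras example imports), under the 4x4 setup above these would print:

import horovod.keras as hvd

hvd.init()               # initialize MPI / horovod
print(hvd.size())        # 16    - total number of worker processes
print(hvd.rank())        # 0..15 - this process's global id
print(hvd.local_rank())  # 0..3  - this process's id on its own server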

Introduction to Horovod

Using horovod is somewhat intrusive: the code needs some modification before it is ready for distributed training. The upside is that the cost of adapting is low, and the framework support horovod provides lets it sit comfortably on top of each of them: it supports tensorflow/keras/mxnet/pytorch. There are also many implementations of the underlying MPI-style collectives, such as OpenMPI, Nvidia's NCCL, and facebook's gloo, each of which implements the communication and computation patterns of parallel computing. On top of that, horovod's own implementation is quite simple.

Usage

Take the Keras example of training ImageNet with ResNet50. The intrusion touches a few parts. The first is hvd.init(): this is the MPI initialization, which lets the parallel processes learn their rank/local_rank and related information.

The second step pins the GPU according to local_rank (i.e. the n-th card on the node) and tells TensorFlow not to grab all GPU memory but to allocate it on demand (which may cause memory fragmentation, since the memory is not centrally managed), then hands the config to Keras when setting the session.

# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

Then a checkpoint is restored on rank 0 and broadcast to the other nodes; broadcast itself is covered later.

# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
    if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
        resume_from_epoch = try_epoch
        break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

Next, the compression function for transfers is chosen (the details of compression also come later), and the model is either restored from a previous checkpoint or built fresh. The key wrapper is on the optimizer: the local opt gets wrapped into a DistributedOptimizer.

# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast both model and optimizer weights
# to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
    model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch),
                           compression=compression)
else:
    # ResNet-50 model that is included with Keras is optimized for inference.
    # Add L2 weight decay & adjust BN settings.
    model_config = model.get_config()
    for layer, layer_config in zip(model.layers, model_config['layers']):
        if hasattr(layer, 'kernel_regularizer'):
            regularizer = keras.regularizers.l2(args.wd)
            layer_config['config']['kernel_regularizer'] = \
                {'class_name': regularizer.__class__.__name__,
                 'config': regularizer.get_config()}
        if type(layer) == keras.layers.BatchNormalization:
            layer_config['config']['momentum'] = 0.9
            layer_config['config']['epsilon'] = 1e-5

    model = keras.models.Model.from_config(model_config)

    # Horovod: adjust learning rate based on number of GPUs.
    opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(),
                               momentum=args.momentum)

    # Horovod: add Horovod Distributed Optimizer.
    opt = hvd.DistributedOptimizer(opt, compression=compression)

    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=opt,
                  metrics=['accuracy', 'top_k_categorical_accuracy'])

Then come the callbacks. hvd.callbacks.BroadcastGlobalVariablesCallback(0) guarantees that all parameters are initialized only on rank 0 and then broadcast to the other nodes; after that come the learning-rate decay settings and some callbacks that print statistics.

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),

    # Horovod: average metrics among workers at the end of every epoch.
    #
    # Note: This callback must be in the list before the ReduceLROnPlateau,
    # TensorBoard, or other metrics-based callbacks.
    hvd.callbacks.MetricAverageCallback(),

    # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
    # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
    # the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
    hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),

    # Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
    hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
    hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
    hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
    hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
]

Finally, an allreduce is used directly to compute the evaluation score, averaging it across workers.

# Evaluate the model on the full data set.
score = hvd.allreduce(model.evaluate_generator(
    input_fn(False, args.train_dir, args.val_batch_size),
    NUM_IMAGES['validation']))

Implementation

Adapter layer and compression algorithms

horovod's implementation splits into several parts. The first is an adapter layer for compatibility with the various frameworks; the tensorflow adapter, for example, implements a new Op. See add new op, which lays out how custom Tensorflow operators are implemented.

Note that the generated function will be given a snake_case name (to comply with PEP 8). Therefore, if your op is named ZeroOut in the C++ files, the Python function will be called zero_out.

The C++ definitions are CamelCase and the generated python functions are lowercase with underscores, so the correspondence ends up as below. The adapter Op code lives under the horovod/tensorflow directory.

C++                 Python
HorovodAllgather    horovod_allgather
HorovodAllreduce    horovod_allreduce
HorovodBroadcast    horovod_broadcast
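
As a sketch of the mechanism (the .so name/path here is hypothetical; horovod does the equivalent inside horovod/tensorflow): the C++ ops are compiled into a shared library, and loading it makes the C++ op HorovodAllreduce available as the snake_case python function horovod_allreduce.

import tensorflow as tf

# Illustrative only: load the compiled custom-op library and call the
# generated snake_case function.
MPI_LIB = tf.load_op_library('mpi_lib.so')  # hypothetical path

def allreduce(tensor):
    return MPI_LIB.horovod_allreduce(tensor)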

The adapter layer is also where compression algorithms plug in (in horovod/[framework]/compression.py). Compression algorithms seem framework-independent to me, so there may be other reasons they live under the adapter layer; for example, tensorflow ships a float16 compression by default. Other compression algorithms, such as 3LC, can improve bandwidth utilization through lossy or lossless compression.
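
The fp16 case is essentially a cast on the way out and a cast back on the way in; a minimal sketch of the idea (simplified from what compression.py does, details assumed):

import tensorflow as tf

class FP16Compressor:
    """Sketch of fp16 gradient compression: halve the bytes on the wire."""

    @staticmethod
    def compress(tensor):
        ctx = tensor.dtype                        # remember the original dtype
        if tensor.dtype.is_floating:
            tensor = tf.cast(tensor, tf.float16)
        return tensor, ctx

    @staticmethod
    def decompress(tensor, ctx):
        # Cast back to the original dtype after the allreduce.
        return tf.cast(tensor, ctx) if ctx.is_floating else tensor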

The common layer

This layer's implementation is shared by all frameworks: every adapter ultimately pushes Op+Tensor messages onto a queue, and a dedicated background thread, started during initialization, consumes that queue. There is a message-synchronization step, so a tensor can only be computed once it is ready on every node. The main flow is:

// The coordinator currently follows a master-worker paradigm. Rank zero acts
// as the master (the "coordinator"), whereas all other ranks are simply
// workers. Each rank runs its own background thread which progresses in ticks.
// In each tick, the following actions happen:
//
//      a) The workers send a Request to the coordinator, indicating what
//      they would like to do (which tensor they would like to gather and
//      reduce, as well as their shape and type). They repeat this for every
//      tensor that they would like to operate on.
//
//      b) The workers send an empty "DONE" message to the coordinator to
//      indicate that there are no more tensors they wish to operate on.
//
//      c) The coordinator receives the Requests from the workers, as well
//      as from its own TensorFlow ops, and stores them in a [request table]. The
//      coordinator continues to receive Request messages until it has
//      received MPI_SIZE number of empty "DONE" messages.
//
//      d) The coordinator finds all tensors that are ready to be reduced,
//      gathered, or all operations that result in an error. For each of those,
//      it sends a Response to all the workers. When no more Responses
//      are available, it sends a "DONE" response to the workers. If the process
//      is being shutdown, it instead sends a "SHUTDOWN" response.
//
//      e) The workers listen for Response messages, processing each one by
//      doing the required reduce or gather, until they receive a "DONE"
//      response from the coordinator. At that point, the tick ends.
//      If instead of "DONE" they receive "SHUTDOWN", they exit their background
//      loop.

Simply put, the coordinator collects size "DONE" requests, looks up which tensors are ready (in the message_table), builds a ready_to_reduce list, and then sends out responses telling the processes to start computing; upon receiving a response, each worker begins the actual computation (executed through op_manager).
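
A toy version of that bookkeeping (the names message_table and ready_to_reduce follow the prose; this is not horovod's actual code, and the real table persists across ticks):

from collections import defaultdict

def coordinator_tick(incoming, size):
    """incoming: (rank, tensor_name) requests, plus one 'DONE' per rank."""
    message_table = defaultdict(set)
    done = 0
    while done < size:
        msg = incoming.pop(0)
        if msg == 'DONE':
            done += 1
        else:
            rank, name = msg
            message_table[name].add(rank)
    # A tensor is ready once every rank has announced it.
    ready_to_reduce = [name for name, ranks in message_table.items()
                       if len(ranks) == size]
    return ready_to_reduce  # the coordinator then sends Responses for these

print(coordinator_tick(
    [(0, 'grad/a'), (1, 'grad/a'), (1, 'grad/b'), 'DONE', 'DONE'], size=2))
# -> ['grad/a']  ('grad/b' waits until rank 0 also requests it)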

That is the overall synchronization flow; if you turn on horovod's trace log (HOROVOD_LOG_LEVEL=trace) you can watch the synchronization happen. Besides AllReduce, horovod's main Ops also include allgather and broadcast.

Operator implementation layer

The concrete ops can be found under common/ops: there are NCCL/Gloo/MPI implementations and so on, all managed by op_manager, which picks the usable op with the highest priority for a given computation. The MPI op, for instance, uses MPI_Allreduce, and openMPI has ready-made implementations of scatter-reduce and all-gather. NCCL just calls ncclAllReduce, and newer versions of nccl also support cross-node allreduce, so there is no need to wrap an extra layer around it.
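
The priority mechanism amounts to "first usable op wins"; a minimal sketch (class and method names are illustrative, not horovod's exact API):

class GPUAllreduceStub:
    def enabled(self, entries):
        return False            # pretend no GPU/NCCL build is available

    def execute(self, entries):
        raise NotImplementedError

class CPUAllreduceStub:
    def enabled(self, entries):
        return True             # the MPI fallback is always available

    def execute(self, entries):
        return sum(entries)     # stand-in for MPI_Allreduce

class OpManager:
    def __init__(self, ops):
        self.ops = ops          # ordered by priority, e.g. [NCCL, MPI, Gloo]

    def execute(self, entries):
        for op in self.ops:
            if op.enabled(entries):
                return op.execute(entries)
        raise RuntimeError('no usable allreduce implementation')

print(OpManager([GPUAllreduceStub(), CPUAllreduceStub()]).execute([1, 2, 3]))  # 6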

Besides allreduce there are two other important operators.

allgather is essentially allreduce minus the reduce: every piece of data just has to reach every process. (The second phase of ring allreduce is exactly an allgather: each process's scatter-reduce result is sent to all processes.)
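
With mpi4py again (illustrative), the contrast with allreduce is visible: nothing is summed, every rank simply ends up holding every rank's chunk.

from mpi4py import MPI

comm = MPI.COMM_WORLD
chunk = [comm.Get_rank()] * 2      # each rank contributes its own chunk
gathered = comm.allgather(chunk)   # every rank gets [chunk0, chunk1, ...]
print(comm.Get_rank(), gathered)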

broadcast is a one-to-many broadcast, used mainly to synchronize the initialized parameters to the other processes.
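
As an mpi4py illustration of the same pattern used for the checkpoint above: rank 0's value replaces whatever the other ranks held.

from mpi4py import MPI

comm = MPI.COMM_WORLD
params = {'w': [0.1, 0.2]} if comm.Get_rank() == 0 else None
params = comm.bcast(params, root=0)  # every rank now holds rank 0's params
print(comm.Get_rank(), params)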
