Horovod implementation analysis
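Horovod is a distributed machine-learning training framework that works with the mainstream computing frameworks. Its core algorithm is AllReduce, building on an implementation Baidu Research published in 2017. AllReduce originally comes from high-performance computing, where it is provided through MPI, a long-established parallel-computing interface. There is a well-written tutorial that introduces MPI.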
Before introducing Horovod, AllReduce needs some explanation. In MapReduce, "reduce" is rendered in Chinese as 規約 ("reduction"). The MPI tutorial mentioned above explains it like this:
Reduce is a classic concept from functional programming. Data reduction involves reducing a set of numbers into a smaller set of numbers via a function. For example, let’s say we have a list of numbers [1, 2, 3, 4, 5]. Reducing this list of numbers with the sum function would produce sum([1, 2, 3, 4, 5]) = 15. Similarly, the multiplication reduction would yield multiply([1, 2, 3, 4, 5]) = 120.
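In other words, a reduction shrinks a large collection into a smaller one. The operation must be associative (and, for MPI's built-in operations, also commutative). Subtraction and division therefore do not qualify, because in parallel computation it is hard to control the order in which partial results are combined. Concretely, MPI_Reduce reduces values held on different nodes onto a single node. Supported operations include sum, product, maximum and minimum.

The tutorial's illustration of Reduce uses summation.

AllReduce means that every node, not just one, receives the result of the Reduce.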
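The point about combination order can be checked in a few lines of plain Python. `functools.reduce` folds the list left to right. The hypothetical helper `tree_reduce` combines halves recursively, roughly the way a parallel reduction might group partial results. Sum and product give the same answer either way; subtraction does not.

```python
import operator
from functools import reduce


def tree_reduce(op, xs):
    """Hypothetical helper: reduce by splitting the list in half and combining
    the two partial results, as a parallel reduction might group them."""
    if len(xs) == 1:
        return xs[0]
    mid = len(xs) // 2
    return op(tree_reduce(op, xs[:mid]), tree_reduce(op, xs[mid:]))


values = [1, 2, 3, 4, 5]
for name, op in [("sum", operator.add), ("multiply", operator.mul), ("subtract", operator.sub)]:
    # Sequential left-to-right fold vs. tree-shaped grouping of the same values.
    print(name, reduce(op, values), tree_reduce(op, values))
# sum 15 15
# multiply 120 120
# subtract -13 -5
```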
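Many AllReduce implementations follow this standard. One is ring allreduce (Ring-Reduce), which has two phases: Scatter-Reduce and All-Gather. It was introduced to deep learning in a blog post, quoted below.

Its advantage is that it removes the parameter-server (PS) architecture's dependence on the Parameter Server's bandwidth, which otherwise becomes the bottleneck. With AllReduce every node occupies an equivalent position in the data transfer, and no single node has to carry all the traffic. The post explains the algorithm in detail:

- In scatter-reduce, each node ends up holding the reduction (here, the sum) of K/N of the data.
- In all-gather, each node passes its reduced K/N chunk on to the others.

Throughout both phases, each node only communicates with its neighbours in the ring.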
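The two phases can be illustrated with a single-process simulation in plain Python. This is a sketch under simplifying assumptions, not Horovod's or Baidu's code:

- The ranks are list indices, and "sending" means copying a slice.
- The K values are split into N contiguous chunks, whose sizes may differ by one when K is not divisible by N.
- Rank r always sends to rank (r + 1) mod N.

The function also counts how many values each rank sends, which can be compared with the transfer formula in the quoted post.

```python
def ring_allreduce(data):
    """Simulate ring all-reduce (sum) over N ranks.

    data[r] is the list of K numbers held by rank r. Returns (buffers, sent):
    buffers[r] is rank r's final list (the element-wise sum over all ranks),
    sent[r] is how many values rank r sent in total.
    """
    n = len(data)
    k = len(data[0])
    buf = [list(row) for row in data]            # each rank's working copy
    bounds = [i * k // n for i in range(n + 1)]  # chunk c covers bounds[c]:bounds[c + 1]
    sent = [0] * n

    def chunk(rank, c):
        return buf[rank][bounds[c]:bounds[c + 1]]  # slicing returns a copy

    # Phase 1: scatter-reduce. After n - 1 steps, rank r holds the full sum of chunk (r + 1) % n.
    for step in range(n - 1):
        # All ranks send simultaneously, so snapshot every outgoing chunk first.
        msgs = [((r + 1) % n, (r - step) % n, chunk(r, (r - step) % n)) for r in range(n)]
        for r, (dst, c, values) in enumerate(msgs):
            sent[r] += len(values)
            for i, v in zip(range(bounds[c], bounds[c + 1]), values):
                buf[dst][i] += v  # the neighbour adds the received partial sum

    # Phase 2: all-gather. Each rank forwards the completed chunk it obtained most recently.
    for step in range(n - 1):
        msgs = [((r + 1) % n, (r + 1 - step) % n, chunk(r, (r + 1 - step) % n)) for r in range(n)]
        for r, (dst, c, values) in enumerate(msgs):
            sent[r] += len(values)
            buf[dst][bounds[c]:bounds[c + 1]] = values  # overwrite with the finished chunk

    return buf, sent


data = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400], [1000, 2000, 3000, 4000]]
result, sent = ring_allreduce(data)
print(result[0])  # [1111, 2222, 3333, 4444]
print(sent)       # [6, 6, 6, 6], i.e. 2 * (N - 1) * K / N with N = 4, K = 4
```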
In the system we described, each of the N GPUs will send and receive values N-1 times for the scatter-reduce, and N-1 times for the allgather. Each time, the GPUs will send K / N values, where K is the total number of values in array being summed across the different GPUs. Therefore, the total amount of data transferred to and from every GPU is
$$\text{Data Transferred} = 2(N-1)\frac{K}{N}$$
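As N grows, the amount each GPU transfers approaches 2K, so it is essentially independent of the number of nodes. This removes the bottleneck that adding nodes creates at the Parameter Server, whose traffic grows with N.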
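Plugging numbers into the formula makes the scaling concrete. The parameter-server figure uses a deliberately simplified model, which is an assumption and not from the quoted post: a single server receives K gradient values from each of N workers and sends K parameters back to each, so its link carries 2NK values per step. The 25M-parameter size is a hypothetical, roughly ResNet-50-sized model.

```python
def ring_allreduce_per_gpu(n, k):
    """Values each GPU sends (and receives) in ring all-reduce: 2 * (N - 1) * K / N."""
    return 2 * (n - 1) * k / n


def single_parameter_server_link(n, k):
    """Simplified model: one server receives K values from each of N workers
    and sends K values back to each, so its link carries 2 * N * K values."""
    return 2 * n * k


K = 25_000_000  # hypothetical model size (number of parameters)
for n in (2, 4, 8, 16, 64):
    ring = ring_allreduce_per_gpu(n, K) / K
    ps = single_parameter_server_link(n, K) / K
    # Ring traffic per GPU stays below 2K; the single server's traffic grows linearly with N.
    print(f"N={n:3d}  ring per GPU = {ring:.3f} K   parameter server link = {ps:.0f} K")
```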
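A few more terms, assuming 4 GPU servers with 4 GPUs each:

- size is the total number of worker processes (one per GPU), here 16.
- rank is the global id of each worker process (0-15).
- local rank is the id of a process within its own server (0-3).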
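The relationship between these three numbers for the 4 × 4 example can be sketched as below. The contiguous layout (ranks 0-3 on the first server, 4-7 on the second, and so on) is an assumption for illustration. In a real job, `hvd.size()`, `hvd.rank()` and `hvd.local_rank()` report these values, and the actual placement depends on how the processes are launched.

```python
NUM_SERVERS = 4
GPUS_PER_SERVER = 4

size = NUM_SERVERS * GPUS_PER_SERVER  # one worker process per GPU: 16


def server_of(rank, gpus_per_server=GPUS_PER_SERVER):
    """Server index, assuming ranks are assigned server by server."""
    return rank // gpus_per_server


def local_rank_of(rank, gpus_per_server=GPUS_PER_SERVER):
    """Id of the process within its server, under the same assumption."""
    return rank % gpus_per_server


for rank in range(size):
    print(f"rank={rank:2d}  server={server_of(rank)}  local_rank={local_rank_of(rank)}")
```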
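Introduction to Horovod

Using Horovod is somewhat intrusive: the training code has to be modified before it can run distributed. The upside is that the porting cost is low. Horovod provides bindings for TensorFlow, Keras, MXNet and PyTorch, so it fits fairly naturally on top of each of them.

For communication it can use several backends, each of which implements collective communication for parallel computation:

- MPI implementations such as OpenMPI
- NVIDIA's NCCL
- Facebook's Gloo

Horovod's own implementation is also quite simple.

Usage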
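Take training ResNet-50 on ImageNet with Keras as an example. The code changes fall into a few parts. The first is `hvd.init()`, which initializes Horovod (and the underlying MPI) so that each parallel process learns its rank, local_rank and so on.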
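The second step pins each process to one GPU according to its local_rank (the n-th GPU on the node). It also enables `allow_growth`, so GPU memory is allocated on demand rather than all at once. This can fragment GPU memory, since it is no longer reserved in a single block up front. The resulting config is then handed to Keras as its session.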
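```python
import tensorflow as tf  # TensorFlow 1.x API (tf.ConfigProto / tf.Session)
import horovod.keras as hvd
from keras import backend as K

hvd.init()  # must run before hvd.local_rank() is available

# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))
```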
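Next, the script determines the epoch to resume from. Rank 0 is the one that holds the checkpoints, so its value is broadcast to the other nodes (broadcast is explained later).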
```python
import os

# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
    if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
        resume_from_epoch = try_epoch
        break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(resume_from_epoch, 0, name='resume_from_epoch')

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0
```
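Then the compression function for communication is set (compression is discussed later), and the model is either restored from a previous checkpoint or built for fresh training. The key wrapper is applied to `opt`: the local optimizer is wrapped in a `DistributedOptimizer`.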
```python
import keras
import horovod.keras as hvd

# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast both model and optimizer weights
# to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
    model = hvd.load_model(args.checkpoint_format.format(epoch=resume_from_epoch),
                           compression=compression)
else:
    # ResNet-50 model that is included with Keras is optimized for inference.
    # Add L2 weight decay & adjust BN settings.
    model = keras.applications.resnet50.ResNet50(weights=None)  # `model` must exist before get_config()
    model_config = model.get_config()
    for layer, layer_config in zip(model.layers, model_config['layers']):
        if hasattr(layer, 'kernel_regularizer'):
            regularizer = keras.regularizers.l2(args.wd)
            layer_config['config']['kernel_regularizer'] = \
                {'class_name': regularizer.__class__.__name__,
                 'config': regularizer.get_config()}
        if type(layer) == keras.layers.BatchNormalization:
            layer_config['config']['momentum'] = 0.9
            layer_config['config']['epsilon'] = 1e-5

    model = keras.models.Model.from_config(model_config)

    # Horovod: adjust learning rate based on number of GPUs.
    opt = keras.optimizers.SGD(lr=args.base_lr * hvd.size(),
                               momentum=args.momentum)

    # Horovod: add Horovod Distributed Optimizer.
    opt = hvd.DistributedOptimizer(opt, compression=compression)

    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=opt,
                  metrics=['accuracy', 'top_k_categorical_accuracy'])
```
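What the `DistributedOptimizer` wrapper adds can be sketched without any framework. This is a toy, hypothetical example in plain Python, using a scalar model y = w * x with squared-error loss. Each worker computes a gradient on its own batch, the gradients are averaged the way an averaging allreduce would, and every worker applies the same update.

Because all workers start from the same weights (which the rank-0 broadcast guarantees) and receive the same averaged gradient, they stay identical. With equal batch sizes, the averaged gradient also equals the gradient over the combined batch.

```python
def local_gradient(w, batch):
    """d/dw of mean 0.5 * (w * x - y) ** 2 over one worker's batch."""
    return sum((w * x - y) * x for x, y in batch) / len(batch)


def allreduce_mean(values):
    """Stand-in for an averaging allreduce: every worker gets the same mean."""
    return sum(values) / len(values)


def distributed_sgd_step(weights, batches, lr):
    """One synchronous step: local gradients -> averaged gradient -> identical updates."""
    grads = [local_gradient(w, batch) for w, batch in zip(weights, batches)]
    avg = allreduce_mean(grads)
    return [w - lr * avg for w in weights]


# Four workers, each holding two samples of y = 3x, all starting from w = 0.
batches = [[(1, 3), (2, 6)], [(3, 9), (4, 12)], [(5, 15), (6, 18)], [(7, 21), (8, 24)]]
weights = distributed_sgd_step([0.0] * 4, batches, lr=0.01)
print(weights)  # all four replicas hold the same value
```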
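Next come the callbacks. `hvd.callbacks.BroadcastGlobalVariablesCallback(0)` broadcasts the initial values of all variables from rank 0 to the other processes. Every worker therefore starts from the same state, whether the weights are random or restored from a checkpoint. The remaining callbacks average metrics across workers and configure learning-rate warmup and decay.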
```python
callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),

    # Horovod: average metrics among workers at the end of every epoch.
    #
    # Note: This callback must be in the list before the ReduceLROnPlateau,
    # TensorBoard, or other metrics-based callbacks.
    hvd.callbacks.MetricAverageCallback(),

    # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
    # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
    # the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
    hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=args.warmup_epochs, verbose=verbose),

    # Horovod: after the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
    hvd.callbacks.LearningRateScheduleCallback(start_epoch=args.warmup_epochs, end_epoch=30, multiplier=1.),
    hvd.callbacks.LearningRateScheduleCallback(start_epoch=30, end_epoch=60, multiplier=1e-1),
    hvd.callbacks.LearningRateScheduleCallback(start_epoch=60, end_epoch=80, multiplier=1e-2),
    hvd.callbacks.LearningRateScheduleCallback(start_epoch=80, multiplier=1e-3),
]
```
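The learning-rate plan these callbacks encode can be written as a plain function of the (possibly fractional) epoch. This sketch is based on the comments in the callback list:

- a linear ramp from `base_lr` to `base_lr * size` over the warmup epochs;
- then multipliers of 1, 1e-1, 1e-2 and 1e-3, starting at the end of warmup and at epochs 30, 60 and 80.

Horovod's callbacks adjust the rate within each epoch, and their exact interpolation may differ. `learning_rate` is a hypothetical name.

```python
def learning_rate(epoch, base_lr, size, warmup_epochs=5):
    """Sketch of the schedule described by the callback comments.

    epoch may be fractional, e.g. 2.5 means halfway through the third epoch.
    """
    peak = base_lr * size  # the "lr = 1.0 * hvd.size()" target from the comments
    if epoch < warmup_epochs:
        # Gradual warmup: linear ramp from base_lr to base_lr * size.
        return base_lr + (peak - base_lr) * epoch / warmup_epochs
    if epoch < 30:
        return peak
    if epoch < 60:
        return peak * 1e-1
    if epoch < 80:
        return peak * 1e-2
    return peak * 1e-3


for epoch in (0, 2.5, 5, 29, 30, 60, 85):
    print(epoch, learning_rate(epoch, base_lr=0.1, size=4))
```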
Finally, an evaluation score is computed directly with allreduce.
```python
# Evaluate the model on the full data set.
score = hvd.allreduce(model.evaluate_generator(
    input_fn(False, args.train_dir, args.val_batch_size),
    NUM_IMAGES['validation']))
```
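Since `hvd.allreduce` averages by default, the resulting `score` is the per-worker result averaged over all workers. A small plain-Python sketch shows when that equals the global metric. If every worker evaluates an equal-sized, disjoint shard, the plain mean is exact. If shard sizes differ, the results would need to be weighted by sample count. Both helper functions and the numbers are hypothetical.

```python
def allreduce_average(per_worker):
    """Element-wise mean across workers, like an averaging allreduce."""
    n = len(per_worker)
    return [sum(col) / n for col in zip(*per_worker)]


def weighted_average(per_worker, counts):
    """Element-wise mean weighted by how many samples each worker evaluated."""
    total = sum(counts)
    return [sum(v * c for v, c in zip(col, counts)) / total for col in zip(*per_worker)]


# Hypothetical [loss, accuracy] from four workers.
scores = [[0.9, 0.70], [1.1, 0.60], [1.0, 0.65], [1.0, 0.65]]
print(allreduce_average(scores))                       # equal shards: plain mean
print(weighted_average(scores, [500, 500, 500, 100]))  # uneven shards: weight by size
```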
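Implementation

Adapter layer and compression algorithms

Horovod's implementation has several parts. The first is an adapter layer that provides compatibility with each framework. For TensorFlow, for example, the adapter implements new Ops. TensorFlow's guide on adding a new op specifies the conventions for implementing custom TensorFlow operators, and it notes: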
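Note that the generated function will get a snake_case name (to comply with PEP8). Therefore, if your op is named ZeroOut in the C++ files, the Python function will be called zero_out.

C++ op names are CamelCase, while the generated Python functions are lowercase snake_case. The correspondence is as follows; the adapter op code lives under horovod/tensorflow.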
| C++ op name | Generated Python function |
|---|---|
| HorovodAllgather | horovod_allgather |
| HorovodAllreduce | horovod_allreduce |
| HorovodBroadcast | horovod_broadcast |
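The naming rule can be approximated with a one-line regular expression. This is a simplified sketch using a hypothetical helper, `op_name_to_python`. TensorFlow's real conversion may treat edge cases such as consecutive capitals differently.

```python
import re


def op_name_to_python(name):
    """Simplified CamelCase -> snake_case: insert '_' before every capital
    except the first character, then lowercase everything."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


for op in ("ZeroOut", "HorovodAllgather", "HorovodAllreduce", "HorovodBroadcast"):
    print(op, "->", op_name_to_python(op))
```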
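The adapter layer can also include compression algorithms (in `horovod/[framework]/compression.py`). In my view compression is framework-independent, so there may be another reason it lives in the adapter layer. One plausible reason is that compression and decompression have to operate on each framework's native tensor types.

Horovod ships a float16 compressor (`Compression.fp16`) alongside the no-op `Compression.none`. More elaborate schemes such as 3LC use lossy or lossless compression to make better use of network bandwidth.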
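The fp16 idea fits in a few lines. The sketch below is a hypothetical, pure-Python analogue that mirrors the shape of Horovod's compressor interface: `compress` returns the payload plus a context, and `decompress` takes both. Instead of casting framework tensors, it packs values as IEEE 754 half-precision floats with the standard `struct` module.

Compared with float32, this halves the bytes on the wire at the cost of precision. Values beyond the fp16 range (about ±65504) cannot be represented; `struct` raises `OverflowError` for them.

```python
import struct


class FP16Compressor:
    """Hypothetical pure-Python analogue of an fp16 gradient compressor."""

    @staticmethod
    def compress(values):
        # 'e' is IEEE 754 half precision: 2 bytes per value instead of 4 for float32.
        payload = struct.pack(f"<{len(values)}e", *values)
        ctx = len(values)  # context needed to decode the payload
        return payload, ctx

    @staticmethod
    def decompress(payload, ctx):
        return list(struct.unpack(f"<{ctx}e", payload))


grads = [0.1, -2.5, 3.14159, 0.001]
payload, ctx = FP16Compressor.compress(grads)
restored = FP16Compressor.decompress(payload, ctx)
print(len(payload), struct.calcsize(f"<{len(grads)}f"))  # 8 16
print(restored)  # close to grads, but some values come back rounded
```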
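Unified layer

This layer is shared by all frameworks. Every adapter ultimately pushes messages (an op plus a tensor) onto a queue, and at initialization a dedicated background thread is started to consume that queue. The thread runs a synchronization protocol so that an operation on a tensor starts only once that tensor is ready on all nodes. The main flow is: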
```cpp
// The coordinator currently follows a master-worker paradigm. Rank zero acts
// as the master (the "coordinator"), whereas all other ranks are simply
// workers. Each rank runs its own background thread which progresses in ticks.
// In each tick, the following actions happen:
//
// a) The workers send a Request to the coordinator, indicating what
// they would like to do (which tensor they would like to gather and
// reduce, as well as their shape and type). They repeat this for every
// tensor that they would like to operate on.
//
// b) The workers send an empty "DONE" message to the coordinator to
// indicate that there are no more tensors they wish to operate on.
//
// c) The coordinator receives the Requests from the workers, as well
// as from its own TensorFlow ops, and stores them in a [request table]. The
// coordinator continues to receive Request messages until it has
// received MPI_SIZE number of empty "DONE" messages.
//
// d) The coordinator finds all tensors that are ready to be reduced,
// gathered, or all operations that result in an error. For each of those,
// it sends a Response to all the workers. When no more Responses
// are available, it sends a "DONE" response to the workers. If the process
// is being shutdown, it instead sends a "SHUTDOWN" response.
//
// e) The workers listen for Response messages, processing each one by
// doing the required reduce or gather, until they receive a "DONE"
// response from the coordinator. At that point, the tick ends.
// If instead of "DONE" they receive "SHUTDOWN", they exit their background
// loop.
```
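In short:

1. The coordinator collects a DONE message from each of the `size` ranks.
2. It looks up the message_table to find which tensors are ready and builds a `ready_to_reduce` list.
3. It sends a Response for each ready tensor to all processes.
4. The workers, on receiving a Response, perform the actual computation, which is executed through op_manager.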
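The negotiation step can be sketched in plain Python. This is a heavily simplified, hypothetical model: there is no MPI, no shape or dtype checking, and no error or SHUTDOWN handling. Each tick's requests arrive as a dict from rank to tensor names. The sketch keeps the key idea: a tensor becomes ready only once all `size` ranks have requested it, and tensors requested by only some ranks wait in the table for a later tick.

```python
from collections import defaultdict


def coordinator_tick(message_table, requests_by_rank, size):
    """Record this tick's requests and return the tensors every rank has asked for.

    message_table maps tensor name -> set of ranks that requested it; it persists
    across ticks. requests_by_rank maps rank -> tensor names sent before its DONE.
    """
    for rank, names in requests_by_rank.items():
        for name in names:
            message_table[name].add(rank)
    ready_to_reduce = sorted(name for name, ranks in message_table.items() if len(ranks) == size)
    for name in ready_to_reduce:
        del message_table[name]  # a Response for this tensor would now go to every worker
    return ready_to_reduce


table = defaultdict(set)
# Tick 1: rank 1 has not produced grad/w2 yet, so only grad/w1 is ready.
print(coordinator_tick(table, {0: ["grad/w1", "grad/w2"], 1: ["grad/w1"], 2: ["grad/w1", "grad/w2"]}, size=3))  # ['grad/w1']
# Tick 2: rank 1 catches up and grad/w2 becomes ready.
print(coordinator_tick(table, {0: [], 1: ["grad/w2"], 2: []}, size=3))  # ['grad/w2']
```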
That is the overall synchronization process. Setting Horovod's log level to trace (`HOROVOD_LOG_LEVEL=trace`) shows it in the logs. Besides AllReduce, Horovod's main ops are allgather and broadcast.
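Operator implementation layer

The concrete ops live in `common/ops`, with implementations for NCCL, Gloo, MPI and others. They are managed by op_manager, which picks, in priority order, an op that can handle the computation.

- The MPI op calls `MPI_Allreduce`; OpenMPI already provides the scatter-reduce and all-gather steps.
- The NCCL op calls `ncclAllReduce` directly. Newer NCCL versions also support allreduce across nodes, so no extra layer has to be wrapped around it.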
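The priority-based selection can be sketched as an ordered list of candidates, where the first one that is enabled for the current tensor wins. The Python classes, the two backends listed, and their enabling conditions are hypothetical simplifications for illustration, not Horovod's C++ code.

```python
from typing import Callable, List


class AllreduceOp:
    """Hypothetical backend: a name plus a predicate saying whether it can run."""

    def __init__(self, name: str, enabled: Callable[[bool], bool]):
        self.name = name
        self.enabled = enabled  # takes "is the tensor on a GPU?"


class OpManager:
    def __init__(self, allreduce_ops: List[AllreduceOp]):
        self.allreduce_ops = allreduce_ops  # ordered from highest to lowest priority

    def select_allreduce(self, tensor_on_gpu: bool) -> str:
        for op in self.allreduce_ops:
            if op.enabled(tensor_on_gpu):
                return op.name
        raise RuntimeError("no enabled allreduce implementation")


manager = OpManager([
    AllreduceOp("NCCL", lambda on_gpu: on_gpu),  # only for GPU tensors
    AllreduceOp("MPI", lambda on_gpu: True),     # general fallback
])
print(manager.select_allreduce(True))   # NCCL
print(manager.select_allreduce(False))  # MPI
```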
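Besides allreduce, there are two other important operators.

allgather is essentially allreduce without the reduction: every process's data is sent to every process. In fact, the second phase of ring allreduce is an allgather, which sends each process's scatter-reduce result to all processes.

broadcast is a one-to-many operation. It is used mainly to synchronize initialized parameters from one process to all the others.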
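The difference between the three collectives is easiest to see side by side. The plain-Python functions below are hypothetical single-process stand-ins: each takes one list per rank and returns what every rank ends up holding. Horovod's allgather concatenates tensors along their first dimension, which the flat list concatenation here mimics.

```python
def allgather(per_rank):
    """Every rank receives the concatenation of all ranks' data (no reduction)."""
    gathered = [x for chunk in per_rank for x in chunk]
    return [list(gathered) for _ in per_rank]


def broadcast(per_rank, root):
    """Every rank receives a copy of the root rank's data."""
    return [list(per_rank[root]) for _ in per_rank]


def allreduce(per_rank):
    """Every rank receives the element-wise sum."""
    total = [sum(col) for col in zip(*per_rank)]
    return [list(total) for _ in per_rank]


data = [[1, 2], [3, 4], [5, 6]]
print(allgather(data)[0])     # [1, 2, 3, 4, 5, 6]
print(broadcast(data, 0)[2])  # [1, 2]
print(allreduce(data)[1])     # [9, 12]
```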