Source: “知了小巷”.


ApplicationMaster<-->ResourceManager

Roles involved in a “generic” YARN application, and how they interact:

RM: ResourceManager

AM: ApplicationMaster

NM: NodeManager

Main communication protocols used in these interactions:

ApplicationClientProtocol

ApplicationMasterProtocol

ContainerManagementProtocol

Client<-->ResourceManager

The client program interacts with the RM through a YarnClient object.

ApplicationMaster<-->ResourceManager

The AM interacts with the RM through an AMRMClientAsync object. An AMRMClientAsync.CallbackHandler processes the resulting events asynchronously.

ApplicationMaster<-->NodeManager

The AM interacts with the NM through an NMClientAsync object, mainly to start containers. An NMClientAsync.CallbackHandler processes container events asynchronously.
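The asynchronous style can be shown with a toy model in plain Java. This is not Hadoop code. `ContainerCallback` and `ToyNmClientAsync` are hypothetical stand-ins for NMClientAsync.CallbackHandler and NMClientAsync. The handler is reduced to two of its callbacks, and container IDs are plain strings. The point is that the start request returns immediately, and the outcome arrives later on a worker thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical, reduced stand-in for NMClientAsync.CallbackHandler (two of its callbacks).
interface ContainerCallback {
    void onContainerStarted(String containerId);

    void onStartContainerError(String containerId, Throwable t);
}

// Toy async client (not Hadoop code): startContainerAsync returns immediately, the simulated
// NodeManager call runs on a worker thread, and the outcome arrives through the callback.
class ToyNmClientAsync implements AutoCloseable {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Predicate<String> nodeManager; // hypothetical: true if the NM accepts the container
    private final ContainerCallback callback;

    ToyNmClientAsync(Predicate<String> nodeManager, ContainerCallback callback) {
        this.nodeManager = nodeManager;
        this.callback = callback;
    }

    void startContainerAsync(String containerId) {
        pool.execute(() -> {
            try {
                if (nodeManager.test(containerId)) {
                    callback.onContainerStarted(containerId);
                } else {
                    callback.onStartContainerError(containerId,
                            new IllegalStateException("NodeManager rejected " + containerId));
                }
            } catch (RuntimeException e) {
                callback.onStartContainerError(containerId, e);
            }
        });
    }

    // Waits for in-flight start requests, then releases the worker threads.
    @Override
    public void close() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class NmClientDemo {
    public static void main(String[] args) {
        ContainerCallback printer = new ContainerCallback() {
            public void onContainerStarted(String id) { System.out.println("started " + id); }
            public void onStartContainerError(String id, Throwable t) { System.out.println("failed " + id); }
        };
        try (ToyNmClientAsync nm = new ToyNmClientAsync(id -> !id.equals("c2"), printer)) {
            nm.startContainerAsync("c1");
            nm.startContainerAsync("c2");
        }
    }
}
```

The two lines printed by the demo can appear in either order, because each start runs on its own worker thread.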

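The protobuf messages used by these interfaces are defined under hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/. The basic records, such as containers and resources, are in yarn_protos.proto. The request and response messages, such as AllocateRequestProto, are in yarn_service_protos.proto.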

Hadoop version: 3.2.1

Flink version: 1.10

1. Example: the JobManager in Flink's YARN per-job mode, i.e. the YarnJobClusterEntrypoint process

```java
// Entry point: YarnJobClusterEntrypoint#main
// Where it ends up: YarnResourceManager

/**
 * The yarn implementation of the resource manager. Used when the system is started
 * via the resource framework YARN.
 */
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
        implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
    // This class is the ApplicationMaster itself

    ...

    /** Client to communicate with the Resource Manager (YARN's master). */
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

    /** Client to communicate with the Node manager and launch TaskExecutor processes. */
    private NMClientAsync nodeManagerClient;

    ...
}
```

AMRMClientAsync

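AMRMClientAsync is an abstract class that handles communication and interaction with the ResourceManager. It provides asynchronous updates on events such as container allocations and container completions. It also contains a thread that periodically sends heartbeats to the ResourceManager.

Applications do not normally subclass it. Instead, they obtain the implementation (AMRMClientAsyncImpl) from the static factory AMRMClientAsync.createAMRMClientAsync(...). What the application implements itself is the callback handler.

AMRMClientAsync is used together with an implementation of the AMRMClientAsync.CallbackHandler callback interface. In Hadoop 3.x this interface is deprecated in favor of the abstract class AMRMClientAsync.AbstractCallbackHandler.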

2. A simple example: MyCallbackHandler
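Here is a minimal sketch of such a handler, written in plain Java so it runs without Hadoop on the classpath. `SimpleCallbackHandler` is a hypothetical stand-in that mirrors the six methods of AMRMClientAsync.CallbackHandler:

- `onContainersCompleted`
- `onContainersAllocated`
- `onShutdownRequest`
- `onNodesUpdated`
- `getProgress`
- `onError`

Container and node IDs are simplified to strings. The handler tracks running containers and derives `getProgress()` from the number of completed tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for AMRMClientAsync.CallbackHandler.
// Container/node IDs are plain Strings instead of YARN's Container/ContainerStatus/NodeReport.
interface SimpleCallbackHandler {
    void onContainersCompleted(List<String> containerIds);

    void onContainersAllocated(List<String> containerIds);

    void onShutdownRequest();

    void onNodesUpdated(List<String> nodeIds);

    float getProgress();

    void onError(Throwable e);
}

class MyCallbackHandler implements SimpleCallbackHandler {
    private final int totalTasks;
    private final List<String> running = new ArrayList<>();
    private int completed = 0;
    private volatile boolean shutdownRequested = false;

    MyCallbackHandler(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    @Override
    public synchronized void onContainersAllocated(List<String> containerIds) {
        // A real AM would launch work in these containers here (e.g. via NMClientAsync).
        running.addAll(containerIds);
    }

    @Override
    public synchronized void onContainersCompleted(List<String> containerIds) {
        for (String id : containerIds) {
            if (running.remove(id)) {
                completed++; // only count containers we know about
            }
        }
    }

    @Override
    public void onShutdownRequest() {
        shutdownRequested = true; // the RM asked this AM to shut down
    }

    @Override
    public void onNodesUpdated(List<String> nodeIds) {
        // Ignored in this sketch.
    }

    @Override
    public synchronized float getProgress() {
        // The async client reports this value to the RM with its heartbeats.
        return totalTasks == 0 ? 1.0f : (float) completed / totalTasks;
    }

    @Override
    public void onError(Throwable e) {
        shutdownRequested = true; // treat any client error as fatal in this sketch
    }

    synchronized int runningCount() {
        return running.size();
    }

    boolean isShutdownRequested() {
        return shutdownRequested;
    }
}

class CallbackHandlerDemo {
    public static void main(String[] args) {
        MyCallbackHandler h = new MyCallbackHandler(4);
        h.onContainersAllocated(List.of("c1", "c2", "c3", "c4"));
        h.onContainersCompleted(List.of("c1", "c2"));
        System.out.println(h.getProgress()); // prints 0.5
    }
}
```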

Lifecycle of an AMRMClientAsync client
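Hadoop's own javadoc example drives the client roughly in this order:

1. `AMRMClientAsync.createAMRMClientAsync(intervalMs, handler)`
2. `init(conf)`
3. `start()`
4. `registerApplicationMaster(host, port, trackingUrl)`
5. `addContainerRequest(request)` one or more times
6. Wait for the work to finish.
7. `unregisterApplicationMaster(status, message, trackingUrl)`
8. `stop()`

The sketch below models only that order. `LifecycleModel` is hypothetical and makes no YARN calls. It is also stricter than Hadoop: each step is accepted only in the state a well-behaved AM would be in.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the order in which an AM drives AMRMClientAsync.
// Each method only records the step and checks the current state; no YARN calls are made.
class LifecycleModel {
    enum State { CREATED, INITED, STARTED, REGISTERED, UNREGISTERED, STOPPED }

    private State state = State.CREATED;
    private final List<String> log = new ArrayList<>();

    private void require(State expected, String step) {
        if (state != expected) {
            throw new IllegalStateException(step + " called in state " + state);
        }
        log.add(step);
    }

    void init() { require(State.CREATED, "init"); state = State.INITED; }

    void start() { require(State.INITED, "start"); state = State.STARTED; }

    void registerApplicationMaster() {
        require(State.STARTED, "registerApplicationMaster");
        state = State.REGISTERED;
    }

    // May be called many times while registered; the state does not change.
    void addContainerRequest() { require(State.REGISTERED, "addContainerRequest"); }

    void unregisterApplicationMaster() {
        require(State.REGISTERED, "unregisterApplicationMaster");
        state = State.UNREGISTERED;
    }

    void stop() { require(State.UNREGISTERED, "stop"); state = State.STOPPED; }

    State state() { return state; }

    List<String> log() { return log; }
}

class LifecycleDemo {
    public static void main(String[] args) {
        LifecycleModel m = new LifecycleModel();
        m.init();
        m.start();
        m.registerApplicationMaster();
        m.addContainerRequest();
        m.unregisterApplicationMaster();
        m.stop();
        System.out.println(m.log());
    }
}
```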

3. Partial source of AMRMClientAsync

4. Partial source of AMRMClientAsyncImpl
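As far as I can tell, AMRMClientAsyncImpl splits the work between two threads:

- A heartbeat thread (its HeartbeatThread inner class) calls the blocking `allocate()` at a fixed interval and puts each response on a queue.
- A separate callback-handler thread (CallbackHandlerThread) takes responses off that queue and invokes the user's handler.

With this split, slow user code in a callback does not delay the next heartbeat. Below is a simplified model of that design, not Hadoop code. `AsyncHeartbeatModel`, the fake `allocate` function, and the single `onContainersAllocated` callback are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Simplified model (not Hadoop code): a heartbeat thread and a callback-handler thread
// connected by a blocking queue.
class AsyncHeartbeatModel {
    // Identity sentinel telling the handler thread that no more responses will arrive.
    private static final List<String> POISON = new ArrayList<>();

    private final IntFunction<List<String>> allocate;           // hypothetical blocking allocate(heartbeatNo)
    private final Consumer<List<String>> onContainersAllocated; // hypothetical user callback
    private final long intervalMs;
    private final BlockingQueue<List<String>> responseQueue = new LinkedBlockingQueue<>();
    private final Thread heartbeatThread;
    private final Thread handlerThread;
    private volatile boolean keepRunning = true;

    AsyncHeartbeatModel(IntFunction<List<String>> allocate,
                        Consumer<List<String>> onContainersAllocated, long intervalMs) {
        this.allocate = allocate;
        this.onContainersAllocated = onContainersAllocated;
        this.intervalMs = intervalMs;
        this.heartbeatThread = new Thread(this::heartbeatLoop, "heartbeat");
        this.handlerThread = new Thread(this::handlerLoop, "callback-handler");
    }

    private void heartbeatLoop() {
        int heartbeat = 0;
        while (keepRunning) {
            List<String> allocated = allocate.apply(heartbeat++); // blocking call to the "RM"
            if (!allocated.isEmpty()) {
                responseQueue.add(allocated); // hand off; user callbacks never run on this thread
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                return; // stop() interrupts the sleep
            }
        }
    }

    private void handlerLoop() {
        try {
            while (true) {
                List<String> batch = responseQueue.take();
                if (batch == POISON) {
                    return;
                }
                onContainersAllocated.accept(batch); // user code runs on this thread only
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() {
        heartbeatThread.start();
        handlerThread.start();
    }

    // Stops heartbeating first, then lets the handler drain everything already queued.
    void stop() {
        keepRunning = false;
        heartbeatThread.interrupt();
        try {
            heartbeatThread.join();
            responseQueue.add(POISON);
            handlerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Stopping in two phases (join the heartbeat thread, then enqueue the sentinel) guarantees that every response fetched before `stop()` is still delivered to the callback.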

5. Partial source of AMRMClient

```java
package org.apache.hadoop.yarn.client.api;

import ...

// The abstract class AMRMClient
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    AbstractService {
  ...
}
```
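AMRMClient, implemented by AMRMClientImpl, is the blocking client underneath AMRMClientAsync. One detail of how it batches requests is worth knowing. As far as I can tell from AMRMClientImpl, it keeps two things:

- a table of outstanding requests;
- a set of the entries that changed since the last `allocate()`.

Each `allocate()` call sends the absolute outstanding count for every changed entry (not a delta) and then clears the set. Below is a toy model with hypothetical names. A key such as `"prio0/1024MB"` stands in for a (priority, capability, location) combination.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of request batching in the blocking client: outstanding counts per key,
// plus the set of keys changed since the last allocate().
class AskTableModel {
    private final Map<String, Integer> outstanding = new HashMap<>();
    private final Set<String> changed = new LinkedHashSet<>();

    void addContainerRequest(String key) {
        outstanding.merge(key, 1, Integer::sum);
        changed.add(key);
    }

    void removeContainerRequest(String key) {
        Integer n = outstanding.get(key);
        if (n == null || n == 0) {
            return; // nothing outstanding for this key
        }
        outstanding.put(key, n - 1);
        changed.add(key);
    }

    // The "ask" one allocate() call would carry: absolute counts for changed keys only.
    Map<String, Integer> buildAsk() {
        Map<String, Integer> ask = new LinkedHashMap<>();
        for (String key : changed) {
            ask.put(key, outstanding.get(key));
        }
        changed.clear();
        return ask;
    }
}

class AskTableDemo {
    public static void main(String[] args) {
        AskTableModel t = new AskTableModel();
        t.addContainerRequest("prio0/1024MB");
        t.addContainerRequest("prio0/1024MB");
        System.out.println(t.buildAsk()); // prints {prio0/1024MB=2}
        System.out.println(t.buildAsk()); // prints {}
    }
}
```

This is also why, as far as I know, an AM is expected to call `removeContainerRequest` once a request has been satisfied. The client does not decrement the table by itself, so stale counts would keep asking for more containers.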

6. Partial source of AMRMClientImpl

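7. Partial source of ApplicationMasterProtocol

The ApplicationMasterProtocol interface is simple: it has only three methods.

```java
package org.apache.hadoop.yarn.api;

import ...

// The ApplicationMasterProtocol interface
public interface ApplicationMasterProtocol {

  // The AM registers itself with the RM
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) throws YarnException, IOException;

  // Tells the RM to unregister this AM. The AM may have finished successfully,
  // or the application may have failed.
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request) throws YarnException, IOException;

  // The main interface between the AM and the RM: handles an AllocateRequest and
  // returns an AllocateResponse. This is where containers are requested, in batches
  // (e.g. the Flink JobManager asking for 3 TaskManagers at once).
  // It also doubles as the AM's heartbeat to the RM.
  // Executed at most once: retries do not cause duplicate or excess allocation.
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException;
}
```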
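The "at most once" remark on `allocate` is about not handing out containers twice when a heartbeat is retried. As far as I can tell, ApplicationMasterService#allocate enforces this with a responseId handshake:

1. The AM echoes the responseId of the last response it received.
2. If the request is exactly one step behind, the RM resends its cached last response instead of allocating again.
3. Any other mismatch is rejected. In Hadoop this raises InvalidApplicationMasterRequestException.

Below is a toy model of that check. `ResponseIdGuardModel` is hypothetical, responses are plain strings, and the integer wrap-around the real code handles is ignored.

```java
// Hypothetical model of the RM-side responseId check for allocate().
class ResponseIdGuardModel {
    private int lastResponseId = 0;
    private String lastResponse = ""; // cached copy of the previous response
    private int containersGranted = 0;

    String allocate(int requestResponseId, int wanted) {
        if (requestResponseId + 1 == lastResponseId) {
            // A retry of the previous heartbeat: resend the cached response, grant nothing new.
            return lastResponse;
        }
        if (requestResponseId != lastResponseId) {
            throw new IllegalStateException("Invalid responseId " + requestResponseId
                    + ", expected " + lastResponseId);
        }
        containersGranted += wanted;
        lastResponseId++;
        lastResponse = "responseId=" + lastResponseId + " granted=" + wanted;
        return lastResponse;
    }

    int containersGranted() {
        return containersGranted;
    }
}

class ResponseIdDemo {
    public static void main(String[] args) {
        ResponseIdGuardModel rm = new ResponseIdGuardModel();
        System.out.println(rm.allocate(0, 3)); // prints responseId=1 granted=3
        System.out.println(rm.allocate(0, 3)); // retry: prints responseId=1 granted=3 again
        System.out.println(rm.containersGranted()); // prints 3
    }
}
```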

8. Partial source of ApplicationMasterProtocolPBClientImpl

```java
package org.apache.hadoop.yarn.api.impl.pb.client;

import ...

// Client-side implementation of the ApplicationMasterProtocol interface
public class ApplicationMasterProtocolPBClientImpl implements ApplicationMasterProtocol, Closeable {

  private ApplicationMasterProtocolPB proxy;

  public ApplicationMasterProtocolPBClientImpl(long clientVersion, InetSocketAddress addr,
      Configuration conf) throws IOException {
    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
    // Under the hood this ends up calling java.lang.reflect.Proxy#newProxyInstance
    proxy =
        (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class, clientVersion,
            addr, conf);
  }

  ...
}
```
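The constructor comment says RPC.getProxy ultimately goes through java.lang.reflect.Proxy#newProxyInstance. The core idea can be shown with the JDK alone: a dynamic proxy implements the protocol interface, and its InvocationHandler turns each method call into a message for the transport. In Hadoop that handler belongs to the RPC engine (ProtobufRpcEngine's invoker here) and sends a protobuf request over a socket. In this sketch, `MiniAmProtocol` and `ToyRpc` are hypothetical, and the transport is just an in-process function.

```java
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

// Hypothetical, much-reduced protocol interface, used only for this sketch.
interface MiniAmProtocol {
    String registerApplicationMaster(String host);

    String allocate(String request);
}

// Toy stand-in for what RPC.getProxy builds: a JDK dynamic proxy whose InvocationHandler
// turns every interface call into a "wire call" (method name + arguments). The wire here
// is an in-process function instead of a socket, and nothing is serialized to protobuf.
final class ToyRpc {
    private ToyRpc() {
    }

    static <T> T getProxy(Class<T> protocol, BiFunction<String, Object[], Object> transport) {
        Object proxy = Proxy.newProxyInstance(
                protocol.getClassLoader(),
                new Class<?>[] {protocol},
                // Object methods such as toString() are not special-cased in this sketch.
                (self, method, args) -> transport.apply(method.getName(), args));
        return protocol.cast(proxy);
    }
}

class ToyRpcDemo {
    public static void main(String[] args) {
        MiniAmProtocol rm = ToyRpc.getProxy(MiniAmProtocol.class,
                (method, callArgs) -> method + " handled for " + callArgs[0]);
        System.out.println(rm.allocate("3 containers")); // prints: allocate handled for 3 containers
    }
}
```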

9. ApplicationMasterProtocolPB

```java
package org.apache.hadoop.yarn.api;

import ...

@Private
@Unstable
@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB",
    protocolVersion = 1)
public interface ApplicationMasterProtocolPB extends ApplicationMasterProtocolService.BlockingInterface {

}
```

10. Definition of ApplicationMasterProtocolService

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationmaster_protocol.proto

```protobuf
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ApplicationMasterProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;

import "yarn_service_protos.proto";

service ApplicationMasterProtocolService {
  rpc registerApplicationMaster (RegisterApplicationMasterRequestProto) returns (RegisterApplicationMasterResponseProto);
  rpc finishApplicationMaster (FinishApplicationMasterRequestProto) returns (FinishApplicationMasterResponseProto);
  rpc allocate (AllocateRequestProto) returns (AllocateResponseProto);
}
```

11. Partial source of ApplicationMasterProtocolPBServiceImpl

This is the server-side (RM) implementation of the ApplicationMasterProtocolPB interface.

```java
package org.apache.hadoop.yarn.api.impl.pb.service;

import ...

@Private
public class ApplicationMasterProtocolPBServiceImpl implements ApplicationMasterProtocolPB {

  private ApplicationMasterProtocol real;

  // When the ResourceManager starts, it initializes `real` through this constructor
  public ApplicationMasterProtocolPBServiceImpl(ApplicationMasterProtocol impl) {
    this.real = impl;
  }

  ...
}
```
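On the server side, the PBServiceImpl is a thin adapter. As I read the Hadoop code, each method does three things:

1. Wraps the incoming proto in a `*PBImpl` record.
2. Calls `real`, which is the ApplicationMasterService.
3. Unwraps the record response back into a proto.

ApplicationMasterProtocolPBClientImpl does the mirror image on the AM side. Below is a toy model of that layering. All class names are hypothetical, and a `Map<String, Integer>` plays the part of the protobuf wire form.

```java
import java.util.HashMap;
import java.util.Map;

// Record-level request/response types (stand-ins for AllocateRequest / AllocateResponse).
final class AllocRequest {
    final int responseId;
    final int containers;

    AllocRequest(int responseId, int containers) {
        this.responseId = responseId;
        this.containers = containers;
    }
}

final class AllocResponse {
    final int responseId;
    final int granted;

    AllocResponse(int responseId, int granted) {
        this.responseId = responseId;
        this.granted = granted;
    }
}

// Stand-in for ApplicationMasterProtocol: records in, records out.
interface RecordProtocol {
    AllocResponse allocate(AllocRequest request);
}

// Stand-in for ApplicationMasterService, the "real" implementation inside the RM.
class ToyAmService implements RecordProtocol {
    @Override
    public AllocResponse allocate(AllocRequest request) {
        return new AllocResponse(request.responseId + 1, request.containers);
    }
}

// Stand-in for ApplicationMasterProtocolPBServiceImpl: wire form in, convert to a record,
// delegate to `real`, convert the record response back to the wire form.
class ToyPbServiceImpl {
    private final RecordProtocol real;

    ToyPbServiceImpl(RecordProtocol impl) {
        this.real = impl;
    }

    Map<String, Integer> allocate(Map<String, Integer> wireRequest) {
        AllocRequest request = new AllocRequest(wireRequest.get("responseId"), wireRequest.get("containers"));
        AllocResponse response = real.allocate(request);
        Map<String, Integer> wireResponse = new HashMap<>();
        wireResponse.put("responseId", response.responseId);
        wireResponse.put("granted", response.granted);
        return wireResponse;
    }
}

// Stand-in for ApplicationMasterProtocolPBClientImpl: the mirror image on the AM side.
// A real client would hold an RPC proxy here instead of a direct server reference.
class ToyPbClientImpl implements RecordProtocol {
    private final ToyPbServiceImpl server;

    ToyPbClientImpl(ToyPbServiceImpl server) {
        this.server = server;
    }

    @Override
    public AllocResponse allocate(AllocRequest request) {
        Map<String, Integer> wireRequest = new HashMap<>();
        wireRequest.put("responseId", request.responseId);
        wireRequest.put("containers", request.containers);
        Map<String, Integer> wireResponse = server.allocate(wireRequest);
        return new AllocResponse(wireResponse.get("responseId"), wireResponse.get("granted"));
    }
}

class PbAdapterDemo {
    public static void main(String[] args) {
        RecordProtocol am = new ToyPbClientImpl(new ToyPbServiceImpl(new ToyAmService()));
        AllocResponse r = am.allocate(new AllocRequest(0, 3));
        System.out.println(r.responseId + " " + r.granted); // prints 1 3
    }
}
```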

12. Partial source of ApplicationMasterService

```java
package org.apache.hadoop.yarn.server.resourcemanager;

import ...

@SuppressWarnings("unchecked")
@Private
public class ApplicationMasterService extends AbstractService implements
    ApplicationMasterProtocol {
  // Calls from the AM eventually land in the methods here, which produce the results
  ...
}
```

【END】

 
