上篇文章介紹了編寫 Yarn Application 的整體框架流程,本篇文章將詳細介紹其中 Client 部分的編寫方式。 一、Yarn Client 編寫方法 本篇代碼已上傳 Github: Github - MyYarnClient 一)編寫流程 1、創建並啟動 Client YarnClie ...
上篇文章介紹了編寫 Yarn Application 的整體框架流程,本篇文章將詳細介紹其中 Client 部分的編寫方式。
一、Yarn Client 編寫方法
本篇代碼已上傳 Github:
Github - MyYarnClient
一)編寫流程
1、創建並啟動 Client
YarnClient 內容通過 ApplicationClientProtocol 與 ResourceManager 通信,向 RM 的ApplicationsManager
申請 Application。
跟蹤進去可以在 YarnClientImpl
找到 rpc:
this.rmClient = (ApplicationClientProtocol)ClientRMProxy.createRMProxy(this.getConfig(), ApplicationClientProtocol.class);
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
2、通過YarnClient 創建 Application
GetNewApplicationResponse
中除了包含 ApplicationId
,還包括集群最大/最小資源,給任務啟動設置的資源作參考。
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
3、關鍵:完善 ApplicationSubmissionContext
需要在 ApplicationSubmissionContext
中定義 RM 啟動 AM 時所需的全部信息,主要包括:
- app 信息:id,name
- 隊列、優先順序信息
- 提交用戶
- ContainerLaunchContext:定義 AM 啟動所需信息
- RECT
- Resources (binaries, jars, files etc.):其中包括 Application master jar
- Environment settings (CLASSPATH etc.)
- Command to be executed
- security Tokens
- RECT
// 3 完善 ApplicationSubmissionContext 所需內容
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId applicationId = appContext.getApplicationId();
// 3.1 設置application name
appContext.setApplicationName("my-test-app");
// 3.2 設置ContainerLaunchContext
// localResources, env, commands 等
// application master 的 jar 放到 localResources 中
// 這部分較長省略,請到代碼中查看
ContainerLaunchContext amContainerCtx = createAMContainerLaunchContext(
conf, app.getApplicationSubmissionContext().getApplicationId());
appContext.setAMContainerSpec(amContainerCtx);
// 3.3 設置優先順序
Priority pri = Priority.newInstance(0);
appContext.setPriority(pri);
// 3.4 設置隊列
appContext.setQueue("default");
// 3.5 設置 am 資源
int amMemory = 2048;
int amVCores = 2;
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
4、提交 Application
提交後,RM 接收到 Application,根據資源請求分配容器,最終將 AM 啟動在容器中。
這裡交給 YarnClientImpl
執行 rmClient.submitApplication(request)
,通過 RPC ApplicationClientProtocol
提交到 RM
ApplicationId appId = yarnClient.submitApplication(appContext);
5、獲取任務進度信息
ApplicationReport report = yarnClient.getApplicationReport(appId);
log.info("Got application report " +
", clientToAMToken=" + report.getClientToAMToken()
+ ", appDiagnostics=" + report.getDiagnostics()
+ ", appMasterHost=" + report.getHost()
+ ", appQueue=" + report.getQueue()
+ ", appMasterRpcPort=" + report.getRpcPort()
+ ", appStartTime=" + report.getStartTime()
+ ", yarnAppState=" + report.getYarnApplicationState().toString()
+ ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+ ", appTrackingUrl=" + report.getTrackingUrl()
+ ", appUser=" + report.getUser());
6、kill Application
當 Application 運行了過長的時間或者其他的原因,client 可以 kill application。
流程是:client 像 RM 發送 kill 信號,再傳遞給 AM
yarnClient.killApplication(appId);
二)涉及的通信協議
參考文章:
Hadoop: Writing YARN Applications - Writing a simple Client
《Hadoop 技術內幕 - 深入解析 Yarn 結構設計與實現原理》第四章