Recently I have been looking into how to use Spark's API to retrieve information such as memory usage, execution time, and the Spark SQL statements being run. Consulting the documentation at
https://spark.apache.org/docs/latest/monitoring.html#rest-api
I found the information I was after.
REST API
In addition to viewing the metrics in the UI, they are also available as JSON. This gives developers an easy way to create new visualization and monitoring tools for Spark. The JSON is available both for running applications and through the history server. The endpoints are mounted at /api/v1. For example, for the history server they are accessible at
http://<server-url>:18080/api/v1
and for a running application at
http://localhost:4040/api/v1
In the API, an application is referenced by its application ID, [app-id]. When running on YARN, each application may have multiple attempts, but attempt IDs exist only for applications in cluster mode, not for applications in client mode. Applications in YARN cluster mode can be identified by their [attempt-id]. In the APIs listed below, when running in YARN cluster mode, [app-id] will actually be [base-app-id]/[attempt-id], where [base-app-id] is the YARN application ID.
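To make this convention concrete, here is a minimal Python sketch; the helper name and its defaults are illustrative assumptions, not part of Spark itself:

def app_base_url(server, app_id, attempt_id=None, port=18080):
    # In YARN cluster mode, [app-id] is really [base-app-id]/[attempt-id];
    # in client mode there is no attempt ID, so attempt_id stays None.
    base = f"http://{server}:{port}/api/v1/applications/{app_id}"
    return base if attempt_id is None else f"{base}/{attempt_id}"

# e.g. app_base_url("history-server", "application_1572579346761_563234", 1)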
Endpoint | Meaning |
---|---|
/applications | A list of all applications. ?status=[completed/running] lists only applications in the chosen state. ?minDate=[date] earliest start date/time to list. ?maxDate=[date] latest start date/time to list. ?minEndDate=[date] earliest end date/time to list. ?maxEndDate=[date] latest end date/time to list. ?limit=[limit] limits the number of applications listed. Examples: ?minDate=2015-02-10 ?minDate=2015-02-03T16:42:40.000GMT ?maxDate=2015-02-11T20:41:30.000GMT ?minEndDate=2015-02-12 ?minEndDate=2015-02-12T09:15:10.000GMT ?maxEndDate=2015-02-14T16:30:45.000GMT ?limit=10 (see the query-parameter sketch after this table) |
/applications/[app-id]/jobs | A list of all jobs. ?status=[running/succeeded/failed/unknown] lists only jobs in the given state. |
/applications/[app-id]/jobs/[job-id] | Details for the given job. |
/applications/[app-id]/stages | A list of all stages. ?status=[active/complete/pending/failed] lists only stages in the given state. |
/applications/[app-id]/stages/[stage-id] | Details for the given stage. |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] | Details for the given stage attempt. |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary | Summary metrics of all tasks in the given stage attempt. ?quantiles summarizes the metrics with the given quantiles. Example: ?quantiles=0.01,0.5,0.99 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList | A list of all tasks for the given stage attempt. ?offset=[offset]&length=[len] lists tasks in the given range. ?sortBy=[runtime/-runtime] sorts the tasks. Example: ?offset=10&length=50&sortBy=runtime |
/applications/[app-id]/executors | A list of all active executors for the given application. |
/applications/[app-id]/executors/[executor-id]/threads | Stack traces of all threads running within the given active executor. Not available via the history server. |
/applications/[app-id]/allexecutors | A list of all (active and dead) executors for the given application. |
/applications/[app-id]/storage/rdd | A list of stored RDDs for the given application. |
/applications/[app-id]/storage/rdd/[rdd-id] | Details for the storage status of the given RDD. |
/applications/[base-app-id]/logs | Download the event logs for all attempts of the given application as files within a zip file. |
/applications/[base-app-id]/[attempt-id]/logs | Download the event logs for the given application attempt as a zip file. |
/applications/[app-id]/streaming/statistics | Statistics for the streaming context. |
/applications/[app-id]/streaming/receivers | A list of all streaming receivers. |
/applications/[app-id]/streaming/receivers/[stream-id] | Details of the given receiver. |
/applications/[app-id]/streaming/batches | A list of all retained batches. |
/applications/[app-id]/streaming/batches/[batch-id] | Details of the given batch. |
/applications/[app-id]/streaming/batches/[batch-id]/operations | A list of all output operations of the given batch. |
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] | Details of the given operation and given batch. |
/applications/[app-id]/environment | Environment details of the given application. |
/version | Get the current Spark version. |
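To illustrate the query parameters above, here is a minimal Python sketch; the host and port are placeholders for your history server, and unlike the curl examples below it does not handle Kerberos/SPNEGO authentication:

import json
import urllib.parse
import urllib.request

BASE = "http://history-server:18080/api/v1"  # placeholder host/port

# List at most 10 completed applications that started on or after 2015-02-10.
params = urllib.parse.urlencode({"status": "completed", "minDate": "2015-02-10", "limit": 10})
with urllib.request.urlopen(f"{BASE}/applications?{params}") as resp:
    for app in json.load(resp):
        print(app["id"], app["name"])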
REST API examples
/api/v1/applications/[app-id]
curl -s --negotiate -u : -H "Accept: application/json" -X GET "http://****:18081/api/v1/applications/application_1572579346761_563234"
The result is as follows:
{
"id" : "application_1572579346761_563234",
"name" : "SparkSQL::****ip地址",
"attempts" : [ {
"startTime" : "2019-11-05T04:30:49.152GMT",
"endTime" : "2019-11-05T04:33:06.537GMT",
"lastUpdated" : "2019-11-05T04:33:06.584GMT",
"duration" : 137385,
"sparkUser" : "portal",
"completed" : true,
"appSparkVersion" : "2.3.2",
"lastUpdatedEpoch" : 1572928386584,
"startTimeEpoch" : 1572928249152,
"endTimeEpoch" : 1572928386537
} ]
}
From this we can obtain useful information such as the execution duration, start and end times, and the submitting user.
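As a minimal sketch of consuming this endpoint programmatically (host and port are placeholders; the Kerberos handling that --negotiate provides in the curl command is omitted):

import json
import urllib.request

BASE = "http://history-server:18081/api/v1"  # placeholder host/port
APP_ID = "application_1572579346761_563234"

with urllib.request.urlopen(f"{BASE}/applications/{APP_ID}") as resp:
    app = json.load(resp)

for attempt in app["attempts"]:
    # "duration" (137385 above) is endTimeEpoch - startTimeEpoch, in milliseconds.
    millis = attempt["endTimeEpoch"] - attempt["startTimeEpoch"]
    print(app["id"], attempt["sparkUser"], millis, "ms")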
/api/v1/applications/application_1572579346761_563234/jobs
curl -s --negotiate -u : -H "Accept: application/json" -X GET "http://*****:18081/api/v1/applications/application_1572579346761_563234/jobs"
The result is as follows:
[ {
"jobId" : 1, //当前Job的id,int型
"name" : "processLine at CliDriver.java:311",
"submissionTime" : "2019-11-05T04:32:17.289GMT", //提交时间
"completionTime" : "2019-11-05T04:33:05.133GMT", //完成时间
"stageIds" : [ 1, 2 ],
"status" : "SUCCEEDED", //当前Job状态
"numTasks" : 28, //总任务
"numActiveTasks" : 0,
"numCompletedTasks" : 5, //当前已完成任务
"numSkippedTasks" : 23,
"numFailedTasks" : 0, //失败的任务
"numKilledTasks" : 0,
"numCompletedIndices" : 5,
"numActiveStages" : -1,
"numCompletedStages" : 1,
"numSkippedStages" : 1,
"numFailedStages" : 0,
"killedTasksSummary" : { }
}, {
"jobId" : 0,
"name" : "processLine at CliDriver.java:311",
"submissionTime" : "2019-11-05T04:31:27.853GMT",
"completionTime" : "2019-11-05T04:32:17.229GMT",
"stageIds" : [ 0 ],
"status" : "SUCCEEDED",
"numTasks" : 23,
"numActiveTasks" : 0,
"numCompletedTasks" : 23,
"numSkippedTasks" : 0,
"numFailedTasks" : 0,
"numKilledTasks" : 0,
"numCompletedIndices" : 23,
"numActiveStages" : 0,
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 0,
"killedTasksSummary" : { }
} ]
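With the same assumed base URL as before, a short sketch that scans this response for jobs worth a closer look; the field names are exactly those in the output above:

import json
import urllib.request

BASE = "http://history-server:18081/api/v1"  # placeholder host/port
APP_ID = "application_1572579346761_563234"

with urllib.request.urlopen(f"{BASE}/applications/{APP_ID}/jobs") as resp:
    jobs = json.load(resp)

for job in jobs:
    # Flag anything that did not finish cleanly.
    if job["status"] != "SUCCEEDED" or job["numFailedTasks"] > 0:
        print(f"job {job['jobId']} ({job['name']}): {job['status']}")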
/api/v1/applications/application_1572579346761_563234/stages
curl -s --negotiate -u : -H "Accept: application/json" -X GET "http://******:18081/api/v1/applications/application_1572579346761_563234/stages"
The result is as follows:
[ {
"status" : "COMPLETE", //状态
"stageId" : 2,
"attemptId" : 0,
"numTasks" : 5,
"numActiveTasks" : 0,
"numCompleteTasks" : 5,
"numFailedTasks" : 0,
"numKilledTasks" : 0,
"numCompletedIndices" : 5,
"executorRunTime" : 221157,
"executorCpuTime" : 202281091903,
"submissionTime" : "2019-11-05T04:32:17.294GMT",
"firstTaskLaunchedTime" : "2019-11-05T04:32:17.470GMT",
"completionTime" : "2019-11-05T04:33:05.132GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 1543321703,
"outputRecords" : 4806030,
"shuffleReadBytes" : 2403684362,
"shuffleReadRecords" : 4806030,
"shuffleWriteBytes" : 0,
"shuffleWriteRecords" : 0,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "processLine at CliDriver.java:311",
"description" : "insert into table portal.ddm_bjrec_exp_program_etl_hourly partition(dt, dt_hour) select devId as deviceid, curtime, code, datas, lower(devOS) as platform, team, program, type, version as app_version, devtype as device_type, passport, subtype, requestId as request_id, `from` as refer_from, net, operation, `timestamp` as client_time, fn, offset, size, canal, latitude, longitude, location_code, ip, ua, ts, app, devIdOrg as deviceid_org, passportOrg as passport_org, cost, recalls, skipIds as skip_ids, date_format(from_unixtime(`timestamp` / 1000), 'yyyy-MM-dd') as dt, concat(date_format(from_unixtime(`timestamp` / 1000), 'yyyy-MM-dd'), '-', date_format(from_unixtime(`timestamp` / 1000), 'HH')) as dt_hour from portal.odm_datacenter_exp_program_split_hourly where dt_hour = date_format('2019-11-05 09:30:00', 'yyyy-MM-dd-HH') distribute by cast(rand() * 5 as bigint) ",
"details" : "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:515)\norg.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311)\norg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:195)\norg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)\norg.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)\norg.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)\nazkaban.jobtype.HadoopSecureSparkSQLWrapper.main(HadoopSecureSparkSQLWrapper.java:149)",
"schedulingPool" : "default",
"rddIds" : [ 9, 8, 7 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
}, {
"status" : "SKIPPED",
"stageId" : 1,
"attemptId" : 0,
"numTasks" : 23,
"numActiveTasks" : 0,
"numCompleteTasks" : 0,
"numFailedTasks" : 0,
"numKilledTasks" : 0,
"numCompletedIndices" : 0,
"executorRunTime" : 0,
"executorCpuTime" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
"outputRecords" : 0,
"shuffleReadBytes" : 0,
"shuffleReadRecords" : 0,
"shuffleWriteBytes" : 0,
"shuffleWriteRecords" : 0,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "processLine at CliDriver.java:311",
"details" : "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:515)\norg.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311)\norg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:195)\norg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)\norg.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)\norg.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)\nazkaban.jobtype.HadoopSecureSparkSQLWrapper.main(HadoopSecureSparkSQLWrapper.java:149)",
"schedulingPool" : "default",
"rddIds" : [ 6, 4, 1, 0, 3, 2, 5 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
}, {
"status" : "COMPLETE",
"stageId" : 0,
"attemptId" : 0,
"numTasks" : 23,
"numActiveTasks" : 0,
"numCompleteTasks" : 23,
"numFailedTasks" : 0,
"numKilledTasks" : 0,
"numCompletedIndices" : 23,
"executorRunTime" : 531926,
"executorCpuTime" : 412178121673,
"submissionTime" : "2019-11-05T04:31:27.914GMT",
"firstTaskLaunchedTime" : "2019-11-05T04:31:30.048GMT",
"completionTime" : "2019-11-05T04:32:17.218GMT",
"inputBytes" : 5865825094,
"inputRecords" : 4806030,
"outputBytes" : 0,
"outputRecords" : 0,
"shuffleReadBytes" : 0,
"shuffleReadRecords" : 0,
"shuffleWriteBytes" : 2403684362,
"shuffleWriteRecords" : 4806030,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "processLine at CliDriver.java:311",
"description" : "insert into table portal.ddm_bjrec_exp_program_etl_hourly partition(dt, dt_hour) select devId as deviceid, curtime, code, datas, lower(devOS) as platform, team, program, type, version as app_version, devtype as device_type, passport, subtype, requestId as request_id, `from` as refer_from, net, operation, `timestamp` as client_time, fn, offset, size, canal, latitude, longitude, location_code, ip, ua, ts, app, devIdOrg as deviceid_org, passportOrg as passport_org, cost, recalls, skipIds as skip_ids, date_format(from_unixtime(`timestamp` / 1000), 'yyyy-MM-dd') as dt, concat(date_format(from_unixtime(`timestamp` / 1000), 'yyyy-MM-dd'), '-', date_format(from_unixtime(`timestamp` / 1000), 'HH')) as dt_hour from portal.odm_datacenter_exp_program_split_hourly where dt_hour = date_format('2019-11-05 09:30:00', 'yyyy-MM-dd-HH') distribute by cast(rand() * 5 as bigint) ",
"details" : "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:515)\norg.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311)\norg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:195)\norg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)\norg.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)\norg.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)\nazkaban.jobtype.HadoopSecureSparkSQLWrapper.main(HadoopSecureSparkSQLWrapper.java:149)",
"schedulingPool" : "default",
"rddIds" : [ 6, 4, 1, 0, 3, 2, 5 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
} ]
From here we can extract the SQL statement from the description field.
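A sketch of that extraction (same assumed base URL; note that in the sample output above the SKIPPED stage has no description, so the lookup is guarded):

import json
import urllib.request

BASE = "http://history-server:18081/api/v1"  # placeholder host/port
APP_ID = "application_1572579346761_563234"

with urllib.request.urlopen(f"{BASE}/applications/{APP_ID}/stages") as resp:
    stages = json.load(resp)

for stage in stages:
    sql = stage.get("description")  # the submitted SQL statement, when present
    if sql:
        print(f"stage {stage['stageId']}: {sql[:80]}...")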
/api/v1/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]
curl -s --negotiate -u : -H "Accept: application/json" -X GET "http://*****:18081/api/v1/applications/application_1572579346761_563234/stages/2/0"
{
"status" : "COMPLETE",
"stageId" : 2,
"attemptId" : 0,
"numTasks" : 5,
"numActiveTasks" : 0,
"numCompleteTasks" : 5,
"numFailedTasks" : 0,
"numKilledTasks" : 0,
"numCompletedIndices" : 5,
"executorRunTime" : 221157,
"executorCpuTime" : 202281091903,
"submissionTime" : "2019-11-05T04:32:17.294GMT",
"firstTaskLaunchedTime" : "2019-11-05T04:32:17.470GMT",
"completionTime" : "2019-11-05T04:33:05.132GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 1543321703,
"outputRecords" : 4806030,
"shuffleReadBytes" : 2403684362,
"shuffleReadRecords" : 4806030,
"shuffleWriteBytes" : 0,
"shuffleWriteRecords" : 0,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"name" : "processLine at CliDriver.java:311",
"description" : "insert into table portal.ddm_bjrec_exp_program_etl_hourly partition(dt, dt_hour) select devId as deviceid, curtime, code, datas, lower(devOS) as platform, team, program, type, version as app_version, devtype as device_type, passport, subtype, requestId as request_id, `from` as refer_from, net, operation, `timestamp` as client_time, fn, offset, size, canal, latitude, longitude, location_code, ip, ua, ts, app, devIdOrg as deviceid_org, passportOrg as passport_org, cost, recalls, skipIds as skip_ids, date_format(from_unixtime(`timestamp` / 1000), 'yyyy-MM-dd') as dt, concat(date_format(from_unixtime(`timestamp` / 1000), 'yyyy-MM-dd'), '-', date_format(from_unixtime(`timestamp` / 1000), 'HH')) as dt_hour from portal.odm_datacenter_exp_program_split_hourly where dt_hour = date_format('2019-11-05 09:30:00', 'yyyy-MM-dd-HH') distribute by cast(rand() * 5 as bigint) ",
"details" : "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:515)\norg.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311)\norg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:195)\norg.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)\norg.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)\norg.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)\nazkaban.jobtype.HadoopSecureSparkSQLWrapper.main(HadoopSecureSparkSQLWrapper.java:149)",
"schedulingPool" : "default",
"rddIds" : [ 9, 8, 7 ],
"accumulatorUpdates" : [ ],
"tasks" : {
"24" : {
"taskId" : 24,
"index" : 1,
"attempt" : 0,
"launchTime" : "2019-11-05T04:32:17.472GMT",
"duration" : 45349,
"executorId" : "1",
"host" : "hadoop2676.jd.163.org",
"status" : "SUCCESS",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 204,
"executorDeserializeCpuTime" : 157691982,
"executorRunTime" : 45118,
"executorCpuTime" : 39326605831,
"resultSize" : 2419,
"jvmGcTime" : 3692,
"resultSerializationTime" : 3,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputMetrics" : {
"bytesRead" : 0,
"recordsRead" : 0
},
"outputMetrics" : {
"bytesWritten" : 307781731,
"recordsWritten" : 961182
},
"shuffleReadMetrics" : {
"remoteBlocksFetched" : 19,
"localBlocksFetched" : 4,
"fetchWaitTime" : 0,
"remoteBytesRead" : 392828014,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 88014344,
"recordsRead" : 961182
},
"shuffleWriteMetrics" : {
"bytesWritten" : 0,
"writeTime" : 0,
"recordsWritten" : 0
}
}
},
"25" : {
"taskId" : 25,
"index" : 2,
"attempt" : 0,
"launchTime" : "2019-11-05T04:32:17.472GMT",
"duration" : 47660,
"executorId" : "2",
"host" : "hadoop4370.jd.163.org",
"status" : "SUCCESS",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 304,
"executorDeserializeCpuTime" : 247841638,
"executorRunTime" : 47338,
"executorCpuTime" : 43484148688,
"resultSize" : 2419,
"jvmGcTime" : 2591,
"resultSerializationTime" : 2,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputMetrics" : {
"bytesRead" : 0,
"recordsRead" : 0
},
"outputMetrics" : {
"bytesWritten" : 308763731,
"recordsWritten" : 962051
},
"shuffleReadMetrics" : {
"remoteBlocksFetched" : 20,
"localBlocksFetched" : 3,
"fetchWaitTime" : 1,
"remoteBytesRead" : 412983064,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 68200792,
"recordsRead" : 962051
},
"shuffleWriteMetrics" : {
"bytesWritten" : 0,
"writeTime" : 0,
"recordsWritten" : 0
}
}
},
"27" : {
"taskId" : 27,
"index" : 4,
"attempt" : 0,
"launchTime" : "2019-11-05T04:32:17.472GMT",
"duration" : 44489,
"executorId" : "4",
"host" : "hadoop4654.jd.163.org",
"status" : "SUCCESS",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 231,
"executorDeserializeCpuTime" : 212311324,
"executorRunTime" : 44239,
"executorCpuTime" : 41012850494,
"resultSize" : 2419,
"jvmGcTime" : 2575,
"resultSerializationTime" : 2,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputMetrics" : {
"bytesRead" : 0,
"recordsRead" : 0
},
"outputMetrics" : {
"bytesWritten" : 308673330,
"recordsWritten" : 959526
},
"shuffleReadMetrics" : {
"remoteBlocksFetched" : 19,
"localBlocksFetched" : 4,
"fetchWaitTime" : 1,
"remoteBytesRead" : 401217137,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 78862126,
"recordsRead" : 959526
},
"shuffleWriteMetrics" : {
"bytesWritten" : 0,
"writeTime" : 0,
"recordsWritten" : 0
}
}
},
"26" : {
"taskId" : 26,
"index" : 3,
"attempt" : 0,
"launchTime" : "2019-11-05T04:32:17.472GMT",
"duration" : 39396,
"executorId" : "6",
"host" : "hadoop4689.jd.163.org",
"status" : "SUCCESS",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 215,
"executorDeserializeCpuTime" : 190986186,
"executorRunTime" : 39153,
"executorCpuTime" : 36152483165,
"resultSize" : 2419,
"jvmGcTime" : 2252,
"resultSerializationTime" : 3,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputMetrics" : {
"bytesRead" : 0,
"recordsRead" : 0
},
"outputMetrics" : {
"bytesWritten" : 310398163,
"recordsWritten" : 963486
},
"shuffleReadMetrics" : {
"remoteBlocksFetched" : 19,
"localBlocksFetched" : 4,
"fetchWaitTime" : 0,
"remoteBytesRead" : 399683333,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 82031243,
"recordsRead" : 963486
},
"shuffleWriteMetrics" : {
"bytesWritten" : 0,
"writeTime" : 0,
"recordsWritten" : 0
}
}
},
"23" : {
"taskId" : 23,
"index" : 0,
"attempt" : 0,
"launchTime" : "2019-11-05T04:32:17.470GMT",
"duration" : 45620,
"executorId" : "5",
"host" : "hadoop4689.jd.163.org",
"status" : "SUCCESS",
"taskLocality" : "PROCESS_LOCAL",
"speculative" : false,
"accumulatorUpdates" : [ ],
"taskMetrics" : {
"executorDeserializeTime" : 286,
"executorDeserializeCpuTime" : 235080867,
"executorRunTime" : 45309,
"executorCpuTime" : 42305003725,
"resultSize" : 2419,
"jvmGcTime" : 2375,
"resultSerializationTime" : 2,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0,
"peakExecutionMemory" : 0,
"inputMetrics" : {
"bytesRead" : 0,
"recordsRead" : 0
},
"outputMetrics" : {
"bytesWritten" : 307704748,
"recordsWritten" : 959785
},
"shuffleReadMetrics" : {
"remoteBlocksFetched" : 19,
"localBlocksFetched" : 4,
"fetchWaitTime" : 0,
"remoteBytesRead" : 391632051,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 88232258,
"recordsRead" : 959785
},
"shuffleWriteMetrics" : {
"bytesWritten" : 0,
"writeTime" : 0,
"recordsWritten" : 0
}
}
}
},
"executorSummary" : {
"4" : {
"taskTime" : 44489,
"failedTasks" : 0,
"succeededTasks" : 1,
"killedTasks" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 308673330,
"outputRecords" : 959526,
"shuffleRead" : 480079263,
"shuffleReadRecords" : 959526,
"shuffleWrite" : 0,
"shuffleWriteRecords" : 0,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0
},
"5" : {
"taskTime" : 45620,
"failedTasks" : 0,
"succeededTasks" : 1,
"killedTasks" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 307704748,
"outputRecords" : 959785,
"shuffleRead" : 479864309,
"shuffleReadRecords" : 959785,
"shuffleWrite" : 0,
"shuffleWriteRecords" : 0,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0
},
"6" : {
"taskTime" : 39396,
"failedTasks" : 0,
"succeededTasks" : 1,
"killedTasks" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 310398163,
"outputRecords" : 963486,
"shuffleRead" : 481714576,
"shuffleReadRecords" : 963486,
"shuffleWrite" : 0,
"shuffleWriteRecords" : 0,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0
},
"1" : {
"taskTime" : 45349,
"failedTasks" : 0,
"succeededTasks" : 1,
"killedTasks" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 307781731,
"outputRecords" : 961182,
"shuffleRead" : 480842358,
"shuffleReadRecords" : 961182,
"shuffleWrite" : 0,
"shuffleWriteRecords" : 0,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0
},
"2" : {
"taskTime" : 47660,
"failedTasks" : 0,
"succeededTasks" : 1,
"killedTasks" : 0,
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 308763731,
"outputRecords" : 962051,
"shuffleRead" : 481183856,
"shuffleReadRecords" : 962051,
"shuffleWrite" : 0,
"shuffleWriteRecords" : 0,
"memoryBytesSpilled" : 0,
"diskBytesSpilled" : 0
}
},
"killedTasksSummary" : { }
}
#### executorSummary
When a task finishes, Spark looks up the executorSummary of the stage that task belongs to. The executorSummary records one ExecutorSummary per executor, including the task time, the number of failed tasks, the number of succeeded tasks, input/output byte counts, shuffle read/write byte counts, and so on. Using the task's executorId, the running statistics (execSummary) for the current task are located. If the task succeeded, the succeeded-task count is incremented by one; otherwise the failed-task count is. Based on the task's final state, the failed or succeeded task count of the corresponding stage is then updated, and in turn the failed or succeeded task count of the corresponding job.
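To make that bookkeeping concrete, here is a rough client-side sketch (assumed base URL as before) that recomputes a few executorSummary-style counters from the taskList endpoint; it mirrors the aggregation described above rather than reproducing Spark's internal code:

import json
import urllib.request
from collections import defaultdict

BASE = "http://history-server:18081/api/v1"  # placeholder host/port
APP_ID = "application_1572579346761_563234"

url = f"{BASE}/applications/{APP_ID}/stages/2/0/taskList"
with urllib.request.urlopen(url) as resp:
    tasks = json.load(resp)

# One counter bucket per executorId, as in the executorSummary block above.
summary = defaultdict(lambda: {"succeededTasks": 0, "failedTasks": 0, "taskTime": 0})
for task in tasks:
    bucket = summary[task["executorId"]]
    if task["status"] == "SUCCESS":
        bucket["succeededTasks"] += 1
    else:
        bucket["failedTasks"] += 1
    bucket["taskTime"] += task["duration"]  # milliseconds

for executor_id, bucket in sorted(summary.items()):
    print(executor_id, bucket)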