Spark Job 的提交、取消与进度查看

使用同一个 SparkContext 时，可以通过 Job Group 对不同的 Job 分组提交和取消：

// Submit: tag the jobs started by this thread with jobGroup.
private SparkContext sc;
private SQLContext sqlc;
// Third argument is interruptOnCancel = true: cancelling the group interrupts the executor
// threads running this group's jobs.
sc.setJobGroup(jobGroup, description, true);
try {
  // NOTE(review): if sql() is lazy for this statement, the Spark job only runs when an action is
  // invoked on its result; run that action here, before clearJobGroup(), so it is tagged.
  sqlc.sql(st);
} finally {
  // Clear in finally so a failing query does not leave later jobs from this thread in the group.
  sc.clearJobGroup();
}

// Cancel: cancel all active jobs in the group (typically called from a different thread).
sc.cancelJobGroup(jobGroup);

获取某个 Job Group 下任务的执行进度：

// Body of a progress-reporting method; its signature is not part of this snippet.
// Computes an integer percentage, completedTasks * 100 / totalTasks, over all active jobs whose
// "spark.jobGroup.id" property equals getJobGroup(context); returns 0 when no tasks are found.
String jobGroup = getJobGroup(context);
SQLContext sqlc = getSparkInterpreter().getSQLContext();
SparkContext sc = sqlc.sparkContext();
int completedTasks = 0;
int totalTasks = 0;
// NOTE(review): this listener is freshly constructed and nothing in this snippet registers it
// with sc (e.g. via addSparkListener), so it has presumably observed no stage events and would
// report 0 completed tasks. It likely should be the listener already attached to this
// SparkContext -- confirm against the surrounding code.
JobProgressListener sparkListener = new JobProgressListener(context.getConf());


DAGScheduler scheduler = sc.dagScheduler();
HashSet<ActiveJob> jobs = scheduler.activeJobs();
Iterator<ActiveJob> it = jobs.iterator();
while (it.hasNext()) {
ActiveJob job = it.next();
// Jobs without the property yield g == null and are skipped by equals() below.
// Presumably this is the property set by sc.setJobGroup -- confirm.
String g = (String) job.properties().get("spark.jobGroup.id");
if (jobGroup.equals(g)) {
int[] progressInfo = null;

// progressInfo = {total tasks, completed tasks} for the job's final stage and its ancestors.
progressInfo = getProgressFromStage_1_1x(sparkListener, job.finalStage());

totalTasks += progressInfo[0];
completedTasks += progressInfo[1];
}
}

// Avoid dividing by zero when no matching job has tasks yet.
if (totalTasks == 0) {
return 0;
}
return completedTasks * 100 / totalTasks;
}

/**
 * Sums total and completed task counts for {@code stage} and all of its ancestor stages.
 *
 * <p>Each distinct stage id is counted once, even when the same parent stage is reachable from
 * several children in the DAG. Completed-task counts are read reflectively from the listener;
 * if that fails, the error is logged and the affected stage contributes 0 completed tasks while
 * its total task count is still included.
 *
 * @param sparkListener listener whose {@code stageIdToData} map supplies completed-task counts
 * @param stage the stage to start from (the caller passes a job's final stage)
 * @return {@code {totalTasks, completedTasks}} summed over {@code stage} and its ancestors
 */
private int[] getProgressFromStage_1_1x(JobProgressListener sparkListener, Stage stage) {
  return sumStageProgress_1_1x(sparkListener, stage, new java.util.HashSet<Integer>());
}

// Depth-first walk over stage and its parents; visitedStageIds prevents double counting of
// stages shared by several children. Returns {totalTasks, completedTasks}.
private int[] sumStageProgress_1_1x(
    JobProgressListener sparkListener, Stage stage, Set<Integer> visitedStageIds) {
  if (!visitedStageIds.add(stage.id())) {
    // Already counted via another path through the DAG.
    return new int[] {0, 0};
  }
  int numTasks = stage.numTasks();
  int completedTasks = countCompletedTasks_1_1x(sparkListener, stage.id());

  // Check for null before converting: a wrapper around null would not itself be null.
  if (stage.parents() != null) {
    List<Stage> parents = JavaConverters.seqAsJavaListConverter(stage.parents()).asJava();
    for (Stage parent : parents) {
      int[] p = sumStageProgress_1_1x(sparkListener, parent, visitedStageIds);
      numTasks += p[0];
      completedTasks += p[1];
    }
  }
  return new int[] {numTasks, completedTasks};
}

// Sums UIData$StageUIData.numCompleteTasks over every stageIdToData entry whose key's first
// component equals stageId. On any reflection/lookup failure, logs and returns the count so far.
@SuppressWarnings("unchecked")
private int countCompletedTasks_1_1x(JobProgressListener sparkListener, int stageId) {
  int completedTasks = 0;
  try {
    Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData");
    HashMap<Tuple2<Object, Object>, Object> stageIdData =
        (HashMap<Tuple2<Object, Object>, Object>) stageIdToData.invoke(sparkListener);
    // forName is static; call it on Class rather than through an instance.
    Class<?> stageUIDataClass = Class.forName("org.apache.spark.ui.jobs.UIData$StageUIData");
    Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks");

    Set<Tuple2<Object, Object>> keys =
        JavaConverters.asJavaSetConverter(stageIdData.keySet()).asJava();
    for (Tuple2<Object, Object> k : keys) {
      if (stageId == (int) k._1()) {
        Object uiData = stageIdData.get(k).get();
        completedTasks += (int) numCompletedTasks.invoke(uiData);
      }
    }
  } catch (Exception e) {
    // Best effort: progress reporting must not fail the caller.
    logger.error("Error on getting progress information", e);
  }
  return completedTasks;
}