Hive MapReduce Spark分布式生成唯一数值型ID

转载自: http://lxw1234.com/archives/2016/12/798.htm

在实际业务场景下,经常会遇到在Hive、MapReduce、Spark中需要生成唯一的数值型ID。

一般常用的做法有:

  1. MapReduce中使用1个Reduce来生成;
  2. Hive中使用row_number分析函数来生成,其实也是1个Reduce;
  3. 借助HBase或Redis或Zookeeper等其它框架的计数器来生成;

数据量不大的情况下,可以直接使用1和2方法来生成,但如果数据量巨大,1个Reduce处理起来就非常慢。

在数据量非常大的情况下,如果你仅仅需要唯一的数值型ID,注意:不是需要”连续的唯一的数值型ID”,那么可以考虑采用本文中介绍的方法,否则,请使用第3种方法来完成。

Spark中生成这样的非连续唯一数值型ID,非常简单,直接使用zipWithUniqueId()即可。

关于zipWithUniqueId,可参考:http://lxw1234.com/archives/2015/07/352.htm

参考zipWithUniqueId()的方法,在MapReduce和Hive中,实现如下:

在Spark中,zipWithUniqueId是通过使用分区Index作为每个分区ID的开始值,在每个分区内,ID增长的步长为该RDD的分区数,那么在MapReduce和Hive中,也可以照此思路实现,Spark中的分区数,即为MapReduce中的Map数,Spark分区的Index,即为Map Task的ID。Map数,可以通过JobConf的getNumMapTasks(),而Map Task ID,可以通过参数mapred.task.id获取,格式如:attempt_1478926768563_0537_m_000004_0,截取m_000004_0中的4,再加1,作为该Map Task的ID起始值。注意:这两个只均需要在Job运行时才能获取。另外,从图中也可以看出,每个分区/Map Task中的数据量不是绝对一致的,因此,生成的ID不是连续的。

下面的UDF可以在Hive中直接使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.lxw1234.hive.udf;

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.LongWritable;

@UDFType(deterministic = false, stateful = true)
public class RowSeq2 extends GenericUDF {

private static LongWritable result = new LongWritable();
private static final char SEPARATOR = '_';
private static final String ATTEMPT = "attempt";
private long initID = 0l;
private int increment = 0;


@Override
public void configure(MapredContext context) {
increment = context.getJobConf().getNumMapTasks();
if(increment == 0) {
throw new IllegalArgumentException("mapred.map.tasks is zero");
}

initID = getInitId(context.getJobConf().get("mapred.task.id"),increment);
if(initID == 0l) {
throw new IllegalArgumentException("mapred.task.id");
}

System.out.println("initID : " + initID + " increment : " + increment);
}

@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
result.set(getValue());
increment(increment);
return result;
}

@Override
public String getDisplayString(String[] children) {
return "RowSeq-func()";
}

private synchronized void increment(int incr) {
initID += incr;
}

private synchronized long getValue() {
return initID;
}

//attempt_1478926768563_0537_m_000004_0 // return 0+1
private long getInitId (String taskAttemptIDstr,int numTasks)
throws IllegalArgumentException {
try {
String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR));
if(parts.length == 6) {
if(parts[0].equals(ATTEMPT)) {
if(!parts[3].equals("m") && !parts[3].equals("r")) {
throw new Exception();
}
long result = Long.parseLong(parts[4]);
if(result >= numTasks) { //if taskid >= numtasks
throw new Exception("TaskAttemptId string : " + taskAttemptIDstr
+ " parse ID [" + result + "] >= numTasks[" + numTasks + "] ..");
}
return result + 1;
}
}
} catch (Exception e) {}
throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr
+ " is not properly formed");
}

}