Distinct Count in Big Data

Distinct Count is a common operation in databases, for example, counting the number of students enrolled in each course:

```sql
select course, count(distinct sid)
from stu_table
group by course;
```

Hive

In big-data scenarios, an important report metric is UV (Unique Visitor), the number of distinct users within a time period. For example, to see the distribution of users across apps within a week, in Hive this can be written in HiveQL:

```sql
select app, count(distinct uid) as uv
from log_table
where week_cal = '2016-03-27'
group by app
order by uv desc
limit 20;
```

Pig

In most cases, though, Hive executes rather slowly, so I prefer Pig:

```pig
-- all users
define DISTINCT_COUNT(A, a) returns dist {
    B = foreach $A generate $a;
    unique_B = distinct B;
    C = group unique_B all;
    $dist = foreach C generate SIZE(unique_B);
}
A = load '/path/to/data' using PigStorage() as (app, uid);
B = DISTINCT_COUNT(A, uid);

-- <app, users>
A = load '/path/to/data' using PigStorage() as (app, uid);
B = distinct A;
C = group B by app;
D = foreach C generate group as app, COUNT($1) as uv;
-- or
D = foreach C generate group as app, SIZE($1) as uv;
```

DataFu provides Pig with UDFs for cardinality estimation. An approximate count with HyperLogLogPlusPlus:
```pig
define HyperLogLogPlusPlus datafu.pig.stats.HyperLogLogPlusPlus();
A = load '/path/to/data' using PigStorage() as (app, uid);
B = group A by app;
C = foreach B generate group as app, HyperLogLogPlusPlus($1) as uv;
```

Spark

In Spark, after loading the data, Distinct Count can be done with a series of RDD transformations: map, distinct, and reduceByKey:

```scala
rdd.map { row => (row.app, row.uid) }
  .distinct()
  .map { line => (line._1, 1) }
  .reduceByKey(_ + _)

// or
rdd.map { row => (row.app, row.uid) }
  .distinct()
  .mapValues { _ => 1 }
  .reduceByKey(_ + _)

// or
rdd.map { row => (row.app, row.uid) }
  .distinct()
  .map(_._1)
  .countByValue()
```

Spark also provides an API for approximate Distinct Count:

```scala
rdd.map { row => (row.app, row.uid) }
  .countApproxDistinctByKey(0.001)
```

Its implementation is based on the HyperLogLog algorithm; the Spark documentation notes:

> The algorithm used is based on streamlib’s implementation of “HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm”, available here.
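The same estimator can also be used outside Spark. A minimal sketch, assuming the stream-lib artifact (com.clearspring.analytics:stream) is on the classpath:

```scala
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

// Precision p = 14 gives a relative error of roughly 1.04 / sqrt(2^14), about 0.8%.
val hll = new HyperLogLogPlus(14)
Seq("u1", "u2", "u1", "u3").foreach(uid => hll.offer(uid))
println(hll.cardinality()) // approximately 3
```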

Alternatively, convert the schema-ful RDD into a DataFrame, call registerTempTable, and run the SQL directly:

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // needed for rdd.toDF()

val df = rdd.toDF()
df.registerTempTable("app_table")

val appUsers = sqlContext.sql("select app, count(distinct uid) as uv from app_table group by app")
```

Bitmap

Introduction to Bitmap

Programming Pearls introduces the bitmap as follows:

> A bitmap is a very useful data structure. A bitmap uses a single bit to mark the value associated with an element, with the element itself serving as the key. Because data is stored at the granularity of bits, memory usage can be reduced dramatically.

In short, a single bit (0 or 1) indicates whether an element has appeared, and the element's position in the bitmap is its index. Programming Pearls gives an example of sorting with a bitmap:

```c
/* Copyright (C) 1999 Lucent Technologies */
/* From 'Programming Pearls' by Jon Bentley */
/* bitsort.c -- bitmap sort from Column 1
 * Sort distinct integers in the range [0..N-1] */
#include <stdio.h>

#define BITSPERWORD 32
#define SHIFT 5
#define MASK 0x1F
#define N 10000000
int a[1 + N / BITSPERWORD];

void set(int i) { a[i >> SHIFT] |= (1 << (i & MASK)); }

void clr(int i) { a[i >> SHIFT] &= ~(1 << (i & MASK)); }

int test(int i) { return a[i >> SHIFT] & (1 << (i & MASK)); }

int main() {
    int i;
    for (i = 0; i < N; i++)
        clr(i);
    /* Replace above 2 lines with below 3 for word-parallel init
     *   int top = 1 + N/BITSPERWORD;
     *   for (i = 0; i < top; i++) a[i] = 0;
     */
    while (scanf("%d", &i) != EOF)
        set(i);
    for (i = 0; i < N; i++)
        if (test(i))
            printf("%d\n", i);
    return 0;
}
```

In the code above, an array of ints stores the bitmap; each integer to be sorted is marked at the bit whose index equals its value.
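For readers following the Spark sections, the same set/clear/test operations translate directly to Scala. A minimal sketch, mirroring the 32-bit-word layout of the C version:

```scala
val Shift = 5   // log2(32): selects the word
val Mask = 0x1F // lower 5 bits select the bit within the word
val N = 10000000
val words = new Array[Int](1 + (N >> Shift))

def set(i: Int): Unit = words(i >> Shift) |= (1 << (i & Mask))
def clr(i: Int): Unit = words(i >> Shift) &= ~(1 << (i & Mask))
def test(i: Int): Boolean = (words(i >> Shift) & (1 << (i & Mask))) != 0
```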

Distinct Count Optimization

  1. Index generation

To use a bitmap for Distinct Count, we first need each user's (uid's) index in the bitmap. There are two ways to obtain an index table numbered from 1 (in one-to-one correspondence with uid):

  • hashing, although finding a collision-free hash function whose values are distributed uniformly over [1, +∞) is very hard;
  • maintaining a mapping table from uid to index and updating it incrementally (see the sketch after this list).

Comparing the two, the second is simpler and more practical.
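A sketch of the incremental update, where extendIndex, the RDD layout, and the String uid type are illustrative assumptions:

```scala
import org.apache.spark.rdd.RDD

// existing: the current uid -> index table (indexes 1..max already assigned)
// incoming: uids observed in the new batch; only unseen uids get fresh indexes
def extendIndex(existing: RDD[(String, Int)], incoming: RDD[String]): RDD[(String, Int)] = {
  val maxIndex = if (existing.isEmpty()) 0 else existing.values.max()
  val fresh = incoming.distinct().subtract(existing.keys)
  val assigned = fresh.zipWithIndex().mapValues(i => maxIndex + i.toInt + 1)
  existing.union(assigned)
}
```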
  2. UV computation

Once the indexes are generated, joining RDD[(uid, V)] with RDD[(uid, index)] yields the index-encoded RDD. EWAH is an open-source bitmap implementation that uses RLE (Run Length Encoding) compression, which nicely avoids wasted storage; the Distinct Count computation then reduces to counting the 1s in a bitmap.
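The index-substitution join might look like this (a sketch; the names events and uidIndex and the (uid, app) record layout are illustrative):

```scala
// events:   RDD[(uid, app)]   -- one record per (user, app) pair
// uidIndex: RDD[(uid, index)] -- the table built in step 1
val indexed: RDD[(String, Int)] =
  events.join(uidIndex).map { case (_, (app, index)) => (app, index) }
```

With the index-encoded RDD in hand, the bitmap-based counting looks like this: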

```scala
import com.googlecode.javaewah.EWAHCompressedBitmap
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Distinct count for a (non-pair) RDD; the RDD must be sorted within each partition.
def distinctCount(rdd: RDD[Int]): Int = {
  val bitmap = rdd.aggregate[EWAHCompressedBitmap](new EWAHCompressedBitmap())(
    (u: EWAHCompressedBitmap, v: Int) => {
      u.set(v)
      u
    },
    (u1: EWAHCompressedBitmap, u2: EWAHCompressedBitmap) => u1.or(u2)
  )
  bitmap.cardinality()
}

// The second tuple element is the index; computes the distinct count per key.
def groupCount[K: ClassTag](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
  val grouped: RDD[(K, EWAHCompressedBitmap)] = rdd.combineByKey[EWAHCompressedBitmap](
    (v: Int) => EWAHCompressedBitmap.bitmapOf(v),
    (c: EWAHCompressedBitmap, v: Int) => {
      c.set(v)
      c
    },
    (c1: EWAHCompressedBitmap, c2: EWAHCompressedBitmap) => c1.or(c2))
  grouped.map(t => (t._1, t._2.cardinality()))
}
```

However, in the computation above, EWAHCompressedBitmap's set method requires the int values to be set in ascending order; in other words, the indexes within each RDD partition must be sorted ascending:

```scala
// Sort a pair RDD by value within each partition.
def sortPairRDD[K](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
  rdd.mapPartitions(iter => {
    iter.toArray.sortWith((x, y) => x._2.compare(y._2) < 0).iterator
  })
}
```

To avoid the sort, we can instead build a bitmap for each uid and then OR the bitmaps together during the Distinct Count:

```scala
rdd.reduceByKey(_ or _)
  .mapValues(_.cardinality())
```
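Putting it together, the full sort-free pipeline might look like this, assuming indexed is the (app, index) RDD from the join sketch above:

```scala
// Build a single-bit bitmap per record, OR them per app, then count the 1s.
val uv: RDD[(String, Int)] = indexed
  .mapValues(index => EWAHCompressedBitmap.bitmapOf(index))
  .reduceByKey(_ or _)
  .mapValues(_.cardinality())
```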