逐层算法
在介绍快速Cube算法之前,我们先简单回顾一下现有的算法,也称之为“逐层算法”(By Layer Cubing)。
我们知道,一个N维的完全Cube,是由:1个N维子立方体(Cuboid), N个(N-1)维Cuboid, N*(N-1)/2个(N-2)维Cuboid …, N个1维Cuboid, 1个0维Cuboid,总共2^N个子立方体组成的;在“逐层算法”中,按维度数逐渐减少来计算,每个层级的计算(除了第一层,它是从原始数据聚合而来),是基于它上一层级的结果来计算的。
举例子来说,[Group by A, B]的结果,可以基于[Group by A, B, C]的结果,通过去掉C后聚合得来的;这样可以减少重复计算;当 0维度Cuboid计算出来的时候,整个Cube的计算也就完成了。
图1展示了用该算法计算一个四维Cube的流程。
图1 逐层算法
此算法的Mapper和Reducer都比较简单。Mapper以上一层Cuboid的结果(Key-Value对)作为输入。由于Key是由各维度值拼接在一起,从其中找出要聚合的维度,去掉它的值成新的Key,然后把新Key和Value输出,进而Hadoop MapReduce对所有新Key进行排序、洗牌(shuffle)、再送到Reducer处;Reducer的输入会是一组有相同Key的Value集合,对这些Value做聚合计算,再结合Key输出就完成了一轮计算。
每一轮的计算都是一个MapReduce任务,且串行执行; 一个N维的Cube,至少需要N次MapReduce Job。
算法优点
算法缺点
当Cube有比较多维度的时候,所需要的MapReduce任务也相应增加;由于Hadoop的任务调度需要耗费额外资源,特别是集群较庞大的时候,反复递交任务造成的额外开销会相当可观;
由于Mapper不做预聚合,此算法会对Hadoop MapReduce输出较多数据; 虽然已经使用了Combiner来减少从Mapper端到Reducer端的数据传输,所有数据依然需要通过Hadoop MapReduce来排序和组合才能被聚合,无形之中增加了集群的压力;
对HDFS的读写操作较多:由于每一层计算的输出会用做下一层计算的输入,这些Key-Value需要写到HDFS上;当所有计算都完成后,Kylin还需要额外的一轮任务将这些文件转成HBase的HFile格式,以导入到HBase中去;
总体而言,该算法的效率较低,尤其是当Cube维度数较大的时候;时常有用户问,是否能改进Cube算法,缩短时间。
快速Cube算法
快速Cube算法(Fast Cubing)是麒麟团队对新算法的一个统称,它还被称作“逐段”(By Segment) 或“逐块”(By Split) 算法。
该算法的主要思想是,对Mapper所分配的数据块,将它计算成一个完整的小Cube 段(包含所有Cuboid);每个Mapper将计算完的Cube段输出给Reducer做合并,生成大Cube,也就是最终结果;图2解释了此流程。
图2逐块Cube算法
Mapper的预聚合
与旧算法相比,快速算法主要有两点不同:
我们看一个例子:某个Cube有四个维度:A、B、C、D;每个Mapper分配到的数据块有约一百万条记录;在这一百万条记录中,每个维度的基数(Cardinality)分别是Card(A), Card(B), Card(C), Card(D)。
当从原始数据计算四维Cuboid(ID: 1111)的时候:旧算法的Mapper会简单地对每条记录去除不相关的维度,然后输出到Hadoop,所以输出量依然是一百万条;新算法的Mapper,由于做了聚合,它只输出[count distinct A, B, C, D]条记录到Hadoop,此数目肯定小于原始条数;在很多时候下,它会是原来的1/10甚至1/1000。
当从四维Cuboid 1111计算三维Cuboid如0111的时候,维度A会被聚合掉;假定A维度的值均匀分布,那么聚合后的记录数会是四维Cuboid记录数的1/ Card(A),;而旧算法的Mapper输出数跟四维Cuboid记录数相同。
可以看到,在Cuboid的推算过程中的每一步,新算法都会比旧算法产生更少数据;总的加起来,新算法中的Mapper对Hadoop的输出,会比老算法少一个或几个数量级,具体数字取决于用户数据的特性;越少的数据,意味着越少的I/O和CPU,从而使得性能得以提升。
子立方体生成树的遍历
值得一提的还有一个改动,就是子立方体生成树(Cuboid Spanning Tree)的遍历次序;在旧算法中,Kylin按照层级,也就是广度优先遍历(Broad First Search)的次序计算出各个Cuboid;在快速Cube算法中,Mapper会按深度优先遍历(Depth First Search)来计算各个Cuboid。深度优先遍历是一个递归方法,将父Cuboid压栈以计算子Cuboid,直到没有子Cuboid需要计算时才出栈并输出给Hadoop;最多需要暂存N个Cuboid,N是Cube维度数。
采用DFS,是为了兼顾CPU和内存:
图3子立方体生成树的遍历
图3是一个四维Cube的完整生成树;按照DFS的次序,在0维Cuboid 输出前的计算次序是 ABCD -> BCD -> CD -> D -> *, ABCD, BCD, CD和D需要被暂存;在*被输出后,D可被输出,内存得到释放;在C被计算并输出后,CD就可以被输出; ABCD最后被输出。
采用DFS,Mapper的输出会是排序的(某些特殊情况除外):Cube行键(row key)是由[Cuboid ID + 维度值]组成;DFS访问的结果,恰好是按照Cuboid ID从小到大输出;而在同一个Cuboid内,维度值也是升序排序;所以总的输出是排序的,请看如下示例。
0000
0001[D0]
0001[D1]
....
0010[C0]
0010[C1]
....
0011[C0][D0]
0011[C0][D1]
....
....
1111[A0][B0][C0][D0]
....
注: 这里[D0]代表D维度的最小值,[D1]代表次小值,以此类推。
由于每个Mapper的输出都是排序的,Hadoop对这些输出进行归并排序的效率也会更高。
OutOfMemory error
在新算法的开发和测试初期,我们发现Mapper常常会遇到OutOfMemory而异常终止;总结下来,以下情况往往会导致该异常:
a) Hadoop Mapper所分配的堆内存较小;
b) Cube中使用了"Distinct count" (HyperLogLog会占用较大内存);
c) Cube的维度较多,导致生成树较深;
d) 分配到Mapper的数据块过大;
简单的增大Mapper的JVM heap size可以暂时解决该问题;但是不是每个用户的Hadoop机器都有大内存;算法需要足够的健壮性和适应性,否则用户会很头疼;我们花了不少努力来优化该算法,例如主动探测OOM的发生,将堆栈中的Cuboid缓存到本地磁盘等;这一系列优化在eBay内部测试的结果非常好,OOM的发生率大大降低,而性能没有明显的下降。
下面我们对快速Cube算法做一个总结。
算法优点
算法缺点
CIO之家 www.ciozj.com 公众号:imciow