雁渡寒潭 风吹疏竹

教练,我想打篮球


  • Home

  • Tags

  • Categories

  • Archives

Spark on yarn

Posted on 2016-05-23 | In Spark |

模式:

  • yarn-cluster:
    Spark的driver运行YARN集群启动的一个application master进程中,client在初始化application后可以消失。
    Spark on YARN集群模式作业运行全过程分析
  • yarn-client:
    Spark的driver运行在client进程中,而application master只用来向YARN申请资源。
    Spark on YARN客户端模式作业运行全过程分析

Deployment Mode Summary

Mode YARN Client Mode YARN Cluster Mode
Driver runs in Client ApplicationMaster
Requests resources ApplicationMaster ApplicationMaster
Starts executor processes YARN NodeManager YARN NodeManager
Persistent services YARN ResourceManager and NodeManagers YARN ResourceManager and NodeManagers
Supports Spark Shell Yes No

参考:
Spark:Yarn-cluster和Yarn-client区别与联系
Running Spark Applications on YARN

启动App

在yarn-cluster模式中启动一个application:

1
2
3
4
5
6
7
8
9
10
11
12
13
./bin/spark-submit --class path.to.your.Class --master yarn-cluster [options] <app jar> [app options]

例如:

SPARK_JAR=hdfs://hansight/libs/spark-assembly-1.0.2-hadoop2.4.0.2.1.4.0-632.jar \
./bin/spark-submit --class org.apache.spark.examples.SparkPI \
--master yarn-cluster \
--num-executors 3 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
lib/spark-examples*.jar \
10

yarn的资源调度

YARN Capacity Scheduler 简介
YARN Independent RM 指标: Weight, Virtual Cores, Min and Max Memory, Max Running Apps, and Scheduling Policy

搭建环境

Spark On YARN 集群安装部署

Spark和MySQL交互

Posted on 2016-05-23 | In Spark |

Spark读取MySQL

Spark读取数据库(Mysql)的四种方式讲解
Spark与Mysql(JdbcRDD)整合开发
改写Spark JdbcRDD,支持自己定义分区查询条件
spark jdbc(mysql) 操作并发度优化

一、不指定查询条件

  这个方式链接MySql的函数原型是:

1
def jdbc(url: String, table: String, properties: Properties): DataFrame

我们只需要提供Driver的url,需要查询的表名,以及连接表相关属性properties。下面是具体例子:

1
2
3
4
5
val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", prop )
println(df.count())
println(df.rdd.partitions.size)

我们运行上面的程序,可以看到df.rdd.partitions.size输出结果是1,这个结果的含义是iteblog表的所有数据都是由RDD的一个分区处理的,所以说,如果你这个表很大,很可能会出现OOM

1
2
WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14, spark047219):
java.lang.OutOfMemoryError: GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)

这种方式在数据量大的时候不建议使用。

二、指定数据库字段的范围

  这种方式就是通过指定数据库中某个字段的范围,但是遗憾的是,这个字段必须是数字,来看看这个函数的函数原型:

1
2
3
4
5
6
7
8
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame

  前两个字段的含义和方法一类似。columnName就是需要分区的字段,这个字段在数据库中的类型必须是数字;lowerBound就是分区的下界;upperBound就是分区的上界;numPartitions是分区的个数。同样,我们也来看看如何使用:

1
2
3
4
5
6
7
val lowerBound = 1
val upperBound = 100000
val numPartitions = 5
val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", "id", lowerBound, upperBound, numPartitions, prop)

这个方法可以将iteblog表的数据分布到RDD的几个分区中,分区的数量由numPartitions参数决定,在理想情况下,每个分区处理相同数量的数据,我们在使用的时候不建议将这个值设置的比较大,因为这可能导致数据库挂掉!但是根据前面介绍,这个函数的缺点就是只能使用整形数据字段作为分区关键字。

  这个函数在极端情况下,也就是设置将numPartitions设置为1,其含义和第一种方式一致。

三、根据任意字段进行分区

  基于前面两种方法的限制,Spark还提供了根据任意字段进行分区的方法,函数原型如下:

1
2
3
4
5
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame

这个函数相比第一种方式多了predicates参数,我们可以通过这个参数设置分区的依据,来看看例子:

1
2
3
4
5
6
val predicates = Array[String]("reportDate <= '2014-12-31'", 
"reportDate > '2014-12-31' and reportDate <= '2015-12-31'")
val url = "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog"

val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog", predicates, prop)

最后rdd的分区数量就等于predicates.length。

四、通过load获取

Spark还提供通过load的方式来读取数据。

1
2
3
sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://www.iteblog.com:3306/iteblog?user=iteblog&password=iteblog",
"dbtable" -> "iteblog")).load()

options函数支持url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions选项,细心的同学肯定发现这个和方法二的参数一致。是的,其内部实现原理部分和方法二大体一致。同时load方法还支持json、orc等数据源的读取。

Spark 写入MySQL

Spark将计算结果写入到Mysql中
Spark RDD写入RMDB(Mysql)方法二

Spark 资料整理

Posted on 2016-05-23 | In Spark |

一 快速入门

Apache Spark快速入门:基本概念和例子(1)
Apache Spark快速入门:基本概念和例子(2)
Spark Streaming整体执行流程http://flykobe.com/index.php/2016/03/22/spark-streaming-execution/

二 应用调试

2.1 Spark应用程序运行的日志存在哪里

Spark应用程序运行的日志存在哪里
Spark日志确切的存放路径和部署模式相关:

  1. 如果是Spark Standalone模式,我们可以直接在Master UI界面查看应用程序的日志,在默认情况下这些日志是存储在worker节点的work目录下,这个目录可以通过SPARK_WORKER_DIR参数进行配置。
  2. 如果是Mesos模式,我们同样可以通过Mesos的Master UI界面上看到相关应用程序的日志,这些日志是存储在Mesos slave的work目录下。
  3. 如果是YARN模式,最简单地收集日志的方式是使用YARN的日志收集工具(
    logs -applicationId``` ),这个工具可以收集你应用程序相关的运行日志,但是这个工具是有限制的:**应用程序必须运行完**,因为YARN必须首先聚合这些日志;而且你必须开启日志聚合功能(yarn.log-aggregation-enable,在默认情况下,这个参数是false)。
    1
    2
    3
    4
    ``` powershell
    ./bin/yarn logs -applicationId application_1452073024926_19404
    or
    Tracking UI --> ApplicationMaster --> Executors菜单 --> Logs

2.2 通过可视化途径理解你的Spark应用程序

通过可视化途径理解你的Spark应用程序

1
2
Tracking UI --> ApplicationMaster -->Jobs菜单/Stages菜单
分析shuffle的时间

2.3 Spark作业代码(源码)IDE远程调试

Spark作业代码(源码)IDE远程调试

2.4 不要直接继承scala.App来提交代码

如果你继承了App trait,那么里面的变量被当作了单例类的field了;而如果是main方法,则当作是局部变量了。而且trait App是继承了DelayedInit,所以里面的变量只有用到了main方法的时候才会被初始化。
Spark程序编写:继承App的问题

三 REST API

Spark 1.4中REST API介绍
目前这个API支持正在运行的应用程序,也支持历史服务器。在请求URL都有/api/v1。比如,对于历史服务器来说,我们可以通过http://:18080/api/v1来获取一些信息;对于正在运行的Spark应用程序,我们可以通过http://www.iteblog.com:4040/api/v1来获取一些信息。

四 文件操作

4.1 Spark多文件输出(MultipleOutputFormat)

Spark多文件输出(MultipleOutputFormat)

4.2 Json

1
2
3
4
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sqlContext.jsonFile("[the path to file people]")
people.registerTempTable("people")
people.printSchema()
1
2
3
CREATE TEMPORARY TABLE people
USING org.apache.spark.sql.json
OPTIONS (path '[the path to the JSON dataset]')

Spark SQL中对Json支持的详细介绍

五 shuffle

Spark shuffle:hash和sort性能对比
随着mapper数量或者Reduce数量的增加,基于hash的shuffle实现的表现比基于sort的shuffle实现的表现越来越糟糕。基于这个事实,在Spark 1.2版本,默认的shuffle将选用基于sort的。在MLlib下,基于sort的Shuffle并不一定比基于hash的Shuffle表现好,所以程序选择哪个Shuffle实现是需要考虑到具体的场景,如果内置的Shuffle实现不能满足自己的需求,我们完全可以自己实现一个Shuffle。用户自定义的Shuffle必须继承ShuffleManager类,重写里面的一些方法。

六 partition

Spark分区器HashPartitioner和RangePartitioner代码详解
Spark自定义分区(Partitioner)
Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:

1
2
3
4
5
6
7
8
9
10
package org.apache.spark

/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}

  • def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
  • def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;
  • equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。

七 序列化

Spark Task序列化代码分析
在Spark中自定义Kryo序列化输入输出API
require-kryo-serialization-in-spark-scala

解决序列化问题:

  1. 使用lazy引用(Lazy Reference)来实现
    lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
  2. 把对需要序列化对象的管理放在操作DStream的Output操作范围之内,因为我们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理

八 RDD

Spark RDD API扩展开发(1): 自定义函数
Spark RDD API扩展开发(2):自定义RDD

九 kafka

Spark Streaming和Kafka整合开发指南(一)
Spark Streaming和Kafka整合开发指南(二)
Spark+Kafka的Direct方式将偏移量发送到Zookeeper实现
Spark Streaming和Kafka整合是如何保证数据零丢失
Kafka+Spark Streaming+Redis实时系统实践

Spark函数讲解

Posted on 2016-05-18 | In Spark |

cogroup

将多个RDD中同一个Key对应的Value组合到一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], 
other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner) :
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int) :
RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)], other3: RDD[(K, W3)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)],
numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) :
RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

cogroup函数原型一共有九个(真多)!最多可以组合四个RDD。

1
2
3
4
5
6
7
8
9
10
val data1 = sc.parallelize(List((1, "www"), (2, "bbs")))
val data2 = sc.parallelize(List((1, "iteblog"), (2, "iteblog"), (3, "very"), (3, "good")))
val data3 = sc.parallelize(List((1, "com"), (2, "com"), (3, "good")))
val result = data1.cogroup(data2, data3)
result.collect.foreach(println)

------------------------------------
(2,(CompactBuffer(bbs),CompactBuffer(iteblog),CompactBuffer(com)))
(1,(CompactBuffer(www),CompactBuffer(iteblog),CompactBuffer(com)))
(3,(CompactBuffer(),CompactBuffer(very, good),CompactBuffer(good)))

combineByKey

使用用户设置好的聚合函数对每个Key中的Value进行组合(combine)。可以将输入类型为RDD[(K, V)]转成成RDD[(K, C)]。

函数原型

1
2
3
4
5
6
7
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, 
mergeCombiners: (C, C) => C) : RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine:
Boolean = true, serializer: Serializer = null): RDD[(K, C)]

  第一个和第二个函数都是基于第三个函数实现的,使用的是HashPartitioner,Serializer为null。而第三个函数我们可以指定分区,如果需要使用Serializer的话也可以指定。combineByKey函数比较重要,我们熟悉地诸如aggregateByKey、foldByKey、reduceByKey等函数都是基于该函数实现的。默认情况会在Map端进行组合操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),(2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
val result = data.combineByKey(List(_), (x: List [String], y: String) => y :: x, (x: List[String],y:List[String]) => x ::: y)
result.collect.foreach(println)

(2,List(com, iteblog, bbs))
(1,List(com, iteblog, www))
(3,List(good))

val data2 = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
val result2 = data2.combineByKey(x => x,(x: Int, y:Int) => x + y, (x:Int, y: Int) => x + y)
result2.collect.foreach(println)

(iteblog,4)
(bbs,1)

coalesce

对RDD中的分区重新进行合并。

函数原型

1
2
def coalesce(numPartitions: Int, shuffle: Boolean = false)
    (implicit ord: Ordering[T] = null): RDD[T]

返回一个新的RDD,且该RDD的分区个数等于numPartitions个数。如果shuffle设置为true,则会进行shuffle。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var data = sc.parallelize(List(1,2,3,4))
val result = data.coalesce(2, false)
println(result.partitions.length)
println(result.toDebugString)
val result1 = data.coalesce(2, true)
println(result1.toDebugString)

2
(2) CoalescedRDD[1] at coalesce at CoalesceSuite.scala:16 []
| ParallelCollectionRDD[0] at parallelize at CoalesceSuite.scala:15 []

(2) MapPartitionsRDD[5] at coalesce at CoalesceSuite.scala:19 []
| CoalescedRDD[4] at coalesce at CoalesceSuite.scala:19 []
| ShuffledRDD[3] at coalesce at CoalesceSuite.scala:19 []
+-(2) MapPartitionsRDD[2] at coalesce at CoalesceSuite.scala:19 []
| ParallelCollectionRDD[0] at parallelize at CoalesceSuite.scala:15 []

从上面可以看出shuffle为false的时候并不进行shuffle操作;而为true的时候会进行shuffle操作。RDD.partitions.length可以获取相关RDD的分区数。

checkpoint

为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

1
def checkpoint()

scala>val data = sc.parallelize(1 to 100000 , 15)
scala> sc.setCheckpointDir(“/app/ecom/cm/ods/tmp/sunke/spark/checkpoint”)
scala> data.checkpoint
scala> data.count
res3: Long = 100000

结果:

[work@yq01-cm-m32-201502nova228.yq01.baidu.com ~]$ hadoop fs -ls /app/ecom/cm/ods/tmp/sunke/spark/checkpoint
16/05/18 11:27:20 INFO common.UpdateService: ZkstatusUpdater to yq01-wutai-hdfs.dmop.baidu.com:54310 started
Found 1 items
drwxr-xr-x 3 ods ods 0 2016-05-18 11:26 /app/ecom/cm/ods/tmp/sunke/spark/checkpoint/590a5e9b-26de-416f-aecb-22ea5e29d893
执行完count之后,会在checkpoint目录下产生出多个(数量和你分区个数有关)二进制的文件。

cartesian

从名字就可以看出这是笛卡儿的意思,就是对给的两个RDD进行笛卡儿计算。官方文档说明:Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in this and b is in other.

1
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

该函数返回的是Pair类型的RDD,计算结果是当前RDD和other RDD中每个元素进行笛卡儿计算的结果。最后返回的是CartesianRDD。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val a = sc.parallelize(List(1,2,3))
val b = sc.parallelize(List(4,5,6))
val result = a.cartesian(b)
result.collect.foreach(println)

(1,4)
(1,5)
(1,6)
(2,4)
(3,4)
(2,5)
(2,6)
(3,5)
(3,6)

笛卡儿计算是很恐怖的,它会迅速消耗大量的内存,所以在使用这个函数的时候请小心。

cache

使用MEMORY_ONLY储存级别对RDD进行缓存,其内部实现是调用persist()函数的。官方文档定义:Persist this RDD with the default storage level (MEMORY_ONLY).

1
def cache() : this.type

scala> var data = sc.parallelize(List(1,2,3,4))
data: org.apache.spark.rdd.RDD[Int] =
  ParallelCollectionRDD[44] at parallelize at :12
scala> data.getStorageLevel
res65: org.apache.spark.storage.StorageLevel =
  StorageLevel(false, false, false, false, 1)
scala> data.cache
res66: org.apache.spark.rdd.RDD[Int] =
  ParallelCollectionRDD[44] at parallelize at :12
scala> data.getStorageLevel
res67: org.apache.spark.storage.StorageLevel =
  StorageLevel(false, true, false, true, 1)
我们先是定义了一个RDD,然后通过getStorageLevel函数得到该RDD的默认存储级别,这里是NONE。然后我们调用cache函数,将RDD的存储级别改成了MEMORY_ONLY(看StorageLevel的第二个参数)。关于StorageLevel的其他的几种存储级别介绍请参照StorageLevel类进行了解。

aggregate

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
  aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

1
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def seqOP(a: Int, b: Int): Int = {
println("seqOp: " + a + "\t" + b)
a + b
}

def combOp(a: Int, b: Int): Int = {
println("combOp: " + a + "\t" + b)
a + b
}

val z1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val result1 = z1.aggregate(3)(seqOP, combOp)
println("result1: "+result1)

计算过程为:
partition内部聚合:
3(初始值)+1+2+3=9
3(初始值)+4+5+6=18
combine:
3(初始值)+9+18

result1: 30

1、reduce函数和combine函数必须满足交换律(commutative)和结合律(associative)
2、从aggregate 函数的定义可知,combine函数的输出类型必须和输入的类型一致

aggregateByKey

该函数和aggregate类似,但操作的RDD是Pair类型的。
Aggregate the values of each key, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U’s, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。

1
2
3
4
5
6
7
8
9
10
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
第一个aggregateByKey函数我们可以自定义Partitioner。除了这个参数之外,其函数声明和aggregate很类似;其他的aggregateByKey函数实现最终都是调用这个。

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
第二个aggregateByKey函数可以设置分区的个数(numPartitions),最终用的是HashPartitioner。
def aggregateByKey[U: ClassTag](zeroValue: U)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
最后一个aggregateByKey实现先会判断当前RDD是否定义了分区函数,如果定义了则用当前RDD的分区;如果当前RDD并未定义分区 ,则使用HashPartitioner。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var data = sc.parallelize(List((1,3),(1,2),(1, 4),(2,3)))

def seq(a:Int, b:Int) : Int ={
println("seq: " + a + "\t " + b)
a + b
}

def comb(a:Int, b:Int) : Int ={
println("comb: " + a + "\t " + b)
a + b
}

val result = data.aggregateByKey(1)(seq, comb)
result.collect.foreach(println)
计算过程:
1(初始值)+3+2 1(初始值)+4
1(初始值)+3
(2,4)
(1,11)

aggregateByKey和aggregate结果有点不一样。如果用aggregate函数对含有3、2、4三个元素的RDD进行计算,初始值为1的时候,计算的结果应该是10,而这里是9,这是因为aggregate函数中的初始值需要和reduce函数以及combine函数结合计算,而aggregateByKey中的初始值只需要和reduce函数计算,不需要和combine函数结合计算,所以导致结果有点不一样。

sortBy和sortByKey

在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外。在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。

sortBy

sortBy函数是在org.apache.spark.rdd.RDD类中实现的,它的实现如下:
该函数最多可以传三个参数:
  第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
  第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
  第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。

1
2
3
4
5
6
7
8
9
10
11
/**
* Return this RDD sorted by the given key function.
*/
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.size)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values

从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的元素,也就变成了Key-Value类型的RDD了,它的实现如下:

1
2
3
4
5
6
/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: T => K): RDD[(K, T)] = {
map(x => (f(x), x))
}

Demo:

1
2
3
4
val data = List(3,1,90,3,5,12)
val rdd = sc.parallelize(data)
val result = rdd.sortBy(x => x, false)
result.collect.foreach(println)

sortByKey函数

sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下:

1
2
3
4
5
6
7
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
: RDD[(K, V)] =
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。

1
2
3
4
5
6
7
8
9
10
11
val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"), 2)
val b = sc.parallelize(List(3, 1, 9, 12, 4))
val c = b.zip(a)
c.sortByKey().collect

// 修改默认的排序规则
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) =
a.toString.compare(b.toString)
}
c.sortByKey().collect

collectAsMap

功能和collect函数类似。该函数用于Pair RDD,最终返回Map类型的结果。官方文档说明:
Return the key-value pairs in this RDD to the master as a Map.
Warning: this doesn’t return a multimap (so if you have multiple values to the same key, only one value per key is preserved in the map returned

1
def collectAsMap(): Map[K, V]

1
2
3
val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data.collectAsMap.foreach(print)
(2,com)(1,com)(3,good)

从结果我们可以看出,如果RDD中同一个Key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value。

常见Web容器的ClassLoader机制

Posted on 2016-05-05 | In Web |

流web容器(jetty,tomcat,jboss)的classloader机制对比和相关问题分析

SpringTest整合JUnit4使用总结

Posted on 2016-05-04 | In Spring |

一 加入依赖包

使用Spring的测试框架需要加入以下依赖包:

  • JUnit 4 (官方下载:http://www.junit.org/)
  • Spring Test (Spring框架中的test包)
  • Spring 相关其他依赖包(不再赘述了,就是context等包)

    二 创建测试源目录和包

    在此,推荐创建一个和src平级的源文件目录,因为src内的类都是为日后产品准备的,而此处的类仅仅用于测试。而包的名称可以和src中的目录同名,这样由于在test源目录中,所以不会有冲突,而且名称又一模一样,更方便检索。

    三 创建测试类

    创建一个测试用的类,推荐名称为 “被测试类名称 + Test”。
    测试类应该继承与
    ```AbstractTransactionalJUnit4SpringContextTests```
    1
    2
    3
    4
    5
    6
    7
    对于 AbstractJUnit4springcontextTests 和 AbstractTransactionalJUnit4SpringContextTests 类的选择:
    如果再你的测试类中,需要用到事务管理(比如要在测试结果出来之后回滚测试内容),就可以使用AbstractTransactionalJUnit4SpringTests类。事务管理的使用方法和正常使用Spring事务管理是一样的。再此需要注意的是,如果想要使用声明式事务管理,即使用AbstractTransactionalJUnitSpringContextTests类,请在applicationContext.xml文件中加入transactionManager bean:
    ``` xml
    <bean id="transactionManager"
    class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
    </bean>

如果没有添加上述bean,将会抛出NoSuchBeanDefinitionException,指明 No bean named ‘transactionManager’ is definded.

四 配置测试类

添加如下内容在class前,用于配置applicationContext.xml文件的位置。

1
2
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")

五 创建测试方法

创建测试用方法,推荐名称为 “被测方法名称+ Test”。
测试方法上方加入 @Test

六 通过JUnit 4 执行

右键方法名,选择则“Run As”→“JUnit Test”即可

附录1:整体测试类文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/* @(#) UserDaoTest.java
*
*/
package com.phj.dao;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.phj.entity.User;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class UserDaoTest extends AbstractJUnit4SpringContextTests {

@Resource
private UserDaoInterface userDao;

@Test
public void saveTest() {

User user1 = new User();
user1.setUsername("tom");
user1.setPassword("123456");
user1.setNickName("tom");
user1.setEmail("tom@gmail.com");

User user2 = new User();
user2.setUsername("admin");
user2.setPassword("123456");
user2.setNickName("admin");
user2.setEmail("admin@admin.com");

User user3 = new User();
user3.setUsername("feihong");
user3.setPassword("123456");
user3.setNickName("phj");
user3.setEmail("test@gmail.com");

userDao.save(user1);
userDao.save(user2);
userDao.save(user3);
}
}

Hive的Transform和UDF

Posted on 2016-05-03 | In Hive |

UDTF

  • Hive中UDTF编写和使用

UDAF

  • Hive udaf开发入门和运行过程详解
  • Hive通用型自定义聚合函数(UDAF)

Hive中的TRANSFORM:使用脚本完成Map/Reduce

转自: http://www.coder4.com/archives/4052

首先来看一下数据:

1
2
3
4
5
hive> select * from test;
OK
1 3
2 2
3 1

假设,我们要输出每一列的md5值。在目前的hive中是没有这个udf的。

我们看一下Python的代码:

1
2
3
4
5
6
7
8
9
10
11
12
#!/home/tops/bin/python

import sys
import hashlib

for line in sys.stdin:
line = line.strip()
arr = line.split()
md5_arr = []
for a in arr:
md5_arr.append(hashlib.md5(a).hexdigest())
print "\t".join(md5_arr)

在Hive中,使用脚本,首先要将他们加入:

1
add file /xxxx/test.py

然后,在调用时,使用TRANSFORM语法。

1
2
3
4
5
6
SELECT 
TRANSFORM (col1, col2)
USING './test.py'
AS (new1, new2)
FORM
test;

这里,我们使用了AS,指定输出的若干个列,分别对应到哪个列名。如果省略这句,则Hive会将第1个tab前的结果作为key,后面其余作为value。

这里有一个小坑:有时候,我们结合INSERT OVERWRITE使用上述TRANSFORM,而目标表,其分割副可能不是\t。但是请牢记:TRANSFORM的分割符号,传入、传出脚本的,永远是\t。不要考虑外面其他的分割符号!

最后,解释一下MAP、REDUCE。

在有的Hive语句中,大家可能会看到SELECT MAP (…) USING ‘xx.py’这样的语法。

然而,在Hive中,MAP、REDUCE只不过是TRANSFORM的别名,Hive不保证一定会在map/reduce中调用脚本。看看官方文档是怎么说的:

1
Formally, MAP ... and REDUCE ... are syntactic transformations of SELECT TRANSFORM ( ... ). In other words, they serve as comments or notes to the reader of the query. BEWARE: Use of these keywords may be dangerous as (e.g.) typing "REDUCE" does not force a reduce phase to occur and typing "MAP" does not force a new map phase!

所以、混用map reduce语法关键字,甚至会引起混淆,所以建议大家还是都用TRANSFORM吧。

友情提示:如果脚本不是Python,而是awk、sed等系统内置命令,可以直接使用,而不用add file。

如果表中有MAP,ARRAY等复杂类型,怎么用TRANSFORM生成?

例如:

1
2
3
4
5
CREATE TABLE features
(
id BIGINT,
norm_features MAP<STRING, FLOAT>
);

答案是,要在脚本的输出中,对特殊字段按照HDFS文件中的格式输出即可。

例如,以上面的表结构为例,每行输出应为:

1
1^Ifeature1^C1.0^Bfeature2^C2.0

其中^I是tab键,这是TRANSFORM要求的分割符号。^B和^C是Hive存储时MAP类型的KV分割符。

另外,在Hive的TRANSFORM语句的时候,要注意AS中加上类型声明:

1
2
3
SELECT TRANSFORM(stuff)
USING 'script'
AS (thing1 INT, thing2 MAP<STRING, FLOAT>)

Hive中的TRANSFORM:自定义Mapper和Reducer完成Map/Reduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Mapper.
*/
public interface Mapper {
/**
* Maps a single row into an intermediate rows.
*
* @param record
* input record
* @param output
* collect mapped rows.
* @throws Exception
* on error
*/
void map(String[] record, Output output) throws Exception;
}

可以将一列拆分为多列

使用样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ExecuteMap {

private static final String FULL_PATH_CLASS = "com.***.dpop.ods.mr.impl.";

private static final Map<String, Mapper> mappers = new HashMap<String, Mapper>();

public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new Exception("Process class must be given");
}

new GenericMR().map(System.in, System.out,
getMapper(args[0], Arrays.copyOfRange(args, 1, args.length)));
}

private static Mapper getMapper(String parserClass, String[] args)
throws ClassNotFoundException {
if (mappers.containsKey(parserClass)) {
return mappers.get(parserClass);
}

Class[] classes = new Class[args.length];
for (int i = 0; i < classes.length; ++i) {
classes[i] = String.class;
}
try {
Mapper mapper = (Mapper) Class.forName(FULL_PATH_CLASS + parserClass).getConstructor(classes).newInstance(args);
mappers.put(parserClass, mapper);
return mapper;
} catch (ClassNotFoundException e) {
throw new ClassNotFoundException("Unknown MapperClass:" + parserClass, e);
} catch (Exception e) {
throw new ClassNotFoundException("Error Constructing processor", e);
}

}
}
1
2
3
4
MR_USING=" USING 'java -Xmx512m -Xms512m -cp ods-mr-1.0.jar:hive-contrib-2.3.33.jar com.***.dpop.ods.mr.api.ExecuteMap "

COMMAND="FROM dw_rtb.event_fact_adx_auction "
COMMAND="${COMMAND} INSERT overwrite TABLE dw_rtb.event_fact_mid_adx_auction_ad PARTITION(yymmdd=${CURRENT_DATE}) SELECT transform(search_id, print_time, pthread_id, ad_s) ${MR_USING} EventFactMidAdxAuctionAdMapper' as search_id, print_time, pthread_id, ad_s, ssp_id WHERE $INSERT_PARTITION and original = 'exinternal' "

Hive Python Streaming的原理及写法

http://www.tuicool.com/articles/vmumUjA

REST API 设计最佳实践

Posted on 2016-05-02 | In REST |

RESTful API 设计最佳实践

Spark SQL汇总

Posted on 2016-05-02 | In Spark |

使用Spark SQL读取Hive上的数据

Spark 注册UDF-Scala实现

Spark SQL使用Hive的UDF

spark-sql thriftserver 方式使用hive udf函数

Spark SQL使用Hive的UDF并且注册临时表

Java基础:Object类中的方法解析

Posted on 2016-04-24 | In Java |

概述

Object类是类层次结构的根,Java中所有的类从根本上都继承自这个类。Object类是Java中唯一没有父类的类。其他所有的类,包括标准容器类,比如数组,都继承了Object类中的方法。

Object类位于java.lang包中,java.lang包包含着Java最基础和核心的类,在编译时会自动导入。Object类没有定义属性,一共有13个方法,具体的类定义结构如下图:

Object类中的方法

Java中的每个类都具有定义在Object类中的这些方法。

构造方法

Object()```
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
大部分情况下,Java中通过形如 new A(args..)形式创建一个属于该类型的对象。其中A即是类名,A(args..)即此类定义中相对应的构造函数。通过此种形式创建的对象都是通过类中的构造函数完成。为体现此特性,Java中规定:在类定义过程中,对于未定义构造函数的类,默认会有一个无参数的构造函数,作为所有类的基类,Object类自然要反映出此特性,在源码中,未给出Object类构造函数定义,但实际上,此构造函数是存在的。

当然,并不是所有的类都是通过此种方式去构建,也自然的,并不是所有的类构造函数都是public。

## private static native void registerNatives();

registerNatives函数前面有native关键字修饰,Java中,用native关键字修饰的函数表明该方法的实现并不是在Java中去完成,而是由C/C++去完成,并被编译成了.dll,由Java去调用。方法的具体实现体在dll文件中,对于不同平台,其具体实现应该有所不同。用native修饰,即表示操作系统,需要提供此方法,Java本身需要使用。具体到registerNatives()方法本身,其主要作用是将C/C++中的方法映射到Java中的native方法,实现方法命名的解耦。

## protected Object clone()

Creates and returns a copy of this object.

Object类中的说明是:
``` java
  protected Object clone() throws CloneNotSupportedException

这个方法比较特殊:
首先,使用这个方法的类必须实现java.lang.Cloneable接口,否则会抛出CloneNotSupportedException异常。
Cloneable接口中不包含任何方法,所以实现它时只要在类声明中加上implements语句即可。

第二个比较特殊的地方在于这个方法是protected修饰的,覆写clone()方法的时候需要写成public,才能让类外部的代码调用。

public final native Class<?> getClass();

getClass()也是一个native方法,返回的是此Object对象的类对象/运行时类对象Class<?>。效果与Object.class相同。

首先解释下”类对象”的概念:在Java中,类是是对具有一组相同特征或行为的实例的抽象并进行描述,对象则是此类所描述的特征或行为的具体实例。作为概念层次的类,其本身也具有某些共同的特性,如都具有类名称、由类加载器去加载,都具有包,具有父类,属性和方法等。于是,Java中有专门定义了一个类,Class,去描述其他类所具有的这些特性,因此,从此角度去看,类本身也都是属于Class类的对象。为与经常意义上的对象相区分,在此称之为”类对象”。

boolean equals(Object obj)

Indicates whether some other object is “equal to” this one.
“==”运算符判断两个引用是否指向同一个对象。==表示的是变量值完成相同(对于基础类型,地址中存储的是值,引用类型则存储指向实际对象的地址);
equals表示的是对象的内容完全相同,此处的内容多指对象的特征/属性。

对于Object类的equals()方法来说,它判断调用equals()方法的引用于传进来的引用是否一致,即这两个引用是否指向的是同一个对象。

Object类中的equals()方法如下:

1
2
3
public boolean equals(Object obj) {
return (this == obj);
}

即Object类中的equals()方法等价于==。

只有当继承Object的类覆写(override)了equals()方法之后,继承类实现了用equals()方法比较两个对象是否相等,才可以说equals()方法与==的不同。

equals()方法需要具有如下特点:

  • 自反性(reflexive):任何非空引用x,x.equals(x)返回为true。
  • 对称性(symmetric):任何非空引用x和y,x.equals(y)返回true当且仅当y.equals(x)返回true。
  • 传递性(transitive):任何非空引用x和y,如果x.equals(y)返回true,并且y.equals(z)返回true,那么x.equals(z)返回true。
  • 一致性(consistent):两个非空引用x和y,x.equals(y)的多次调用应该保持一致的结果,(前提条件是在多次比较之间没有修改x和y用于比较的相关信息)。
  • 约定:对于任何非空引用x,x.equals(null)应该返回为false。
  • 并且覆写equals()方法时,应该同时覆写hashCode()方法,反之亦然。

int hashCode()

Returns a hash code value for the object.

当你覆写(override)了equals()方法之后,必须也覆写hashCode()方法,反之亦然。

这个方法返回一个整型值(hash code value),如果两个对象被equals()方法判断为相等,那么它们就应该拥有同样的hash code。

Object类的hashCode()方法为不同的对象返回不同的值,Object类的hashCode值表示的是对象的地址。

hashCode的一般性契约(需要满足的条件)如下:

1.在Java应用的一次执行过程中,如果对象用于equals比较的信息没有被修改,那么同一个对象多次调用hashCode()方法应该返回同一个整型值。应用的多次执行中,这个值不需要保持一致,即每次执行都是保持着各自不同的值。
2.如果equals()判断两个对象相等,那么它们的hashCode()方法应该返回同样的值。
3.并没有强制要求如果equals()判断两个对象不相等,那么它们的hashCode()方法就应该返回不同的值。即,两个对象用equals()方法比较返回false,它们的hashCode可以相同也可以不同。但是,应该意识到,为两个不相等的对象产生两个不同的hashCode可以改善哈希表的性能。

WHY?
其实,这主要体现在hashCode()方法的作用上,其主要用于增强哈希表的性能。

以集合类中,以Set为例,当新加一个对象时,需要判断现有集合中是否已经存在与此对象相等的对象,如果没有hashCode()方法,需要将Set进行一次遍历,并逐一用equals()方法判断两个对象是否相等,此种算法时间复杂度为o(n)。通过借助于hasCode方法,先计算出即将新加入对象的哈希码,然后根据哈希算法计算出此对象的位置,直接判断此位置上是否已有对象即可。(注:Set的底层用的是Map的原理实现)

String toString()

Returns a string representation of the object.
当打印引用,如调用System.out.println()时,会自动调用对象的toString()方法,打印出引用所指的对象的toString()方法的返回值,因为每个类都直接或间接地继承自Object,因此每个类都有toString()方法。

Object类中的toString()方法定义如下:

1
2
3
public String toString() {
return getClass().getName() + "@" + Integer.toHexString(hashCode());
}

wait(…) / notify() / notifyAll()

protected void finalize()

finalize方法主要与Java垃圾回收机制有关。

1…789…11

Sun Ke

104 posts
21 categories
61 tags
© 2018 Sun Ke
Powered by Hexo
|
Theme — NexT.Pisces v5.1.4