雁渡寒潭 风吹疏竹

教练,我想打篮球


  • Home

  • Tags

  • Categories

  • Archives

Hadoop的安全机制

Posted on 2017-03-28 | In Hadoop |

Hadoop Kerberos安全机制介绍

参考: http://dongxicheng.org/mapreduce/hadoop-kerberos-introduction/

在Hadoop1.0.0或者CDH3版本后,加入了Kerberos认证机制。使得集群中的节点就是它们所宣称的,是信赖的。Kerberos可以将认证的密钥在集群部署时事先放到可靠的节点上。集群运行时,集群内的节点使用密钥得到认证。只有被认证过节点才能正常使用。企图冒充的节点由于没有事先得到的密钥信息,无法与集群内部的节点通信。防止了恶意的使用或篡改Hadoop集群的问题,确保了Hadoop集群的可靠安全。

  • 解决服务器到服务器的认证
    由于kerberos对集群里的所有机器都分发了keytab,相互之间使用密钥进行通信,确保不会冒充服务器的情况。集群中的机器就是它们所宣称的,是可靠的。
    防止了用户伪装成Datanode,Tasktracker,去接受JobTracker,Namenode的任务指派。
  • 解决client到服务器的认证
    Kerberos对可信任的客户端提供认证,确保他们可以执行作业的相关操作。防止用户恶意冒充client提交作业的情况。
    用户无法伪装成其他用户入侵到一个HDFS 或者MapReduce集群上
    用户即使知道datanode的相关信息,也无法读取HDFS上的数据
    用户无法发送对于作业的操作到JobTracker上

SparkSQL有必要坐下来聊聊Join

Posted on 2017-03-28 | In Spark |

转自 http://hbasefly.com/2017/03/19/sparksql-basic-join/

HashJoin

Broadcast Hash Join

Shuffle Hash Join

Sort-Merge Join

Java的通用I/O API设计

Posted on 2017-03-19 | In Java |

博客

wiki说明

代码库

How to Design a Good API and Why it Matters(by Joshua Bloch)

作为Scala语法糖的设计模式

Posted on 2017-03-15 | In Scala |

转载: http://zhangyi.farbox.com/post/designthinking/design-patterns-with-scala-syntax-sugar

Scala算是一门博采众家之长的语言,兼具OO与FP的特性,若使用恰当,可以更好地将OO与FP的各自优势发挥到极致;然而问题也随之而来,倘若过分地夸大OO特性,Scala就变成了一门精简版的Java,写出的是没有Scala Style的拙劣代码;倘若过分追求FP的不变性等特性,因为Scala在类型系统以及Monad实现的繁琐性,又可能导致代码变得复杂,不易阅读,反而得不偿失。

看来,赋予程序员选择的自由,有时候未必是好事!

在OO世界里,设计模式曾经风靡全世界,你不懂设计模式,都不好意思说自己是程序员。现在呢?说你懂设计模式,倒显得你逼格低了,心里鄙视:“这年头谁还用设计模式,早过时了!”程序员心中的鄙视链开始加成,直接失血二十格。

其实什么事情都得辩证来看!设计模式对OO设计的推进作用不容忽视,更不容轻视。我只是反对那种为了“模式”而“模式”的僵化思想,如果没有明白设计模式的本质思想,了解根本的设计原理,设计模式无非就是花拳绣腿罢了。当然,在FP世界里,设计模式开始变味开始走形,但诸多模式的本质,例如封装、抽象,仍然贯穿其中,不过是表达形式迥然而已罢了。

在混合了OO与FP的Scala语言中,我们来观察设计模式的实现,会非常有趣。Pavel Fatin有篇博客Design Patterns in Scala将Java设计模式与Scala进行了对比,值得一读。我这里想借用他的案例,然后从另一个角度来俯瞰设计模式。

在Pavel Fatin比较的设计模式中,部分模式在Scala中不过是一种语法糖(Syntax Sugar),包括:

  • Factory Method
  • Lazy Initialization
  • Singleton
  • Adapter
  • Value Object
  • Factory Method

文中给出的Factory Method模式,准确地说其实是静态工厂模式,它并不在GOF 23种模式之列,但作为对复杂创建逻辑的一种封装,常常被开发人员使用。站在OCP(开放封闭原则)的角度讲,该模式对扩展不是开放的,但对于修改而言,却是封闭的。如果创建逻辑发生了变化,可以保证仅修改该静态工厂方法一处。同时,该模式还可以极大地简化对象创建的API。

在Scala中,通过引入伴生对象(Companion Object)来简化静态工厂方法,语法更加干净,体现了Scala精简的设计哲学。即使不是要使用静态工厂,我们也常常建议为Scala类定义伴生对象,尤其是在DSL上下文中,更是如此,因为这样可以减少new关键字对代码的干扰。

Lazy Initialization

lazy修饰符在Scala中有更深远的涵义,例如牵涉到所谓严格(Strictness)函数与非严格(Non-strictness)函数。在Scala中,若未明确声明,所有函数都是严格求值的,即函数会立即对它的参数进行求值。而如果对val变量添加lazy修饰符,则Scala会延迟对该变量求值,直到它第一次被引用时。如果要定义非严格函数,可以将函数设置为by name参数。

scala的lazy修饰符常常被用作定义一些消耗资源的变量。这些资源在初始化时并不需要,只有在调用某些方法时,才需要准备好这些资源。例如在Spark SQL的QeuryExecution类中,包括optimizedPlan、sparkPlan、executedPlan以及toRdd等,都被定义为lazy val:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
lazy val analyzed: LogicalPlan = {
SparkSession.setActiveSession(sparkSession)
sparkSession.sessionState.analyzer.execute(logical)
}
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
assertSupported()
sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}
lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
lazy val sparkPlan: SparkPlan = {
SparkSession.setActiveSession(sparkSession)
planner.plan(ReturnAnswer(optimizedPlan)).next()
}
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
}

这样设计有一个好处是,当程序在执行到这些步骤时,并不会被马上执行,从而使得初始化QueryExecution变得更快。只有在需要时,这些变量对应的代码才会执行。这也是延迟加载的涵义。

Singleton Pattern

C#提供了静态类的概念,但Java没有,而Scala则通过引入Object弥补了Java的这一缺失,而且从语义上讲,似乎比静态类(Static Class)更容易让人理解。

Object可以派生自多个trait。例如派生自App trait,就可直接享有main函数的福利。

1
2
3
4
5
6
7
8
9
10
11
12
trait App extends DelayedInit {
def main(args: Array[String]) = {
this._args = args
for (proc <- initCode) proc()
if (util.Properties.propIsSet("scala.time")) {
val total = currentTime - executionStart
Console.println("[total " + total + "ms]")
}
}
}

object Main extends App

继承多个trait的好处是代码复用。我们可以将许多小粒度方法的实现定义在多个trait中。这些方法如果被类继承,则成为实例方法,如果被Object继承,则变成了线程安全的静态方法(因为继承trait的实现就是一个mixin)。多么奇妙!所以很多时候,我们会尽量保证Obejct的短小精悍,然后将许多逻辑放到trait中。当你看到如下代码时,其实不必惊讶:

1
2
3
4
5
object Main extends App 
with InitHook
with ShutdownHook
with ActorSystemProvider
with ScheduledTaskSupport

这种小粒度的trait既可以保证代码的复用,也有助于职责分离,还有利于测试。真是再好不过了!

Adapter Pattern

隐式转换当然可以用作Adapter。在Scala中,之所以可以更好地调用Java库,隐式转换功不可没。从语法上看,隐式转换比C#提供的扩展方法更强大,适用范围更广。

Pavel Fatin给出了日志转换的Adapter案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
trait Log {
def warning(message: String)
def error(message: String)
}

final class Logger {
def log(level: Level, message: String) { /* ... */ }
}

implicit class LoggerToLogAdapter(logger: Logger) extends Log {
def warning(message: String) { logger.log(WARNING, message) }
def error(message: String) { logger.log(ERROR, message) }
}

val log: Log = new Logger()

这里的隐式类LoggerToLogAdapter可以将Logger适配为Log。与Java实现Adapter模式不同的是,我们不需要去创建LoggerToLogAdapter的实例。如上代码中,创建的是Logger实例。Logger自身与Log无关,但在创建该对象的上下文中,由于我们定义了隐式类,当Scala编译器遇到该隐式类时,就会为Logger添加通过隐式类定义的代码,包括隐式类中定义的对Log的继承,以及额外增加的warning与error方法。

在大多数场景,Adapter关注的是接口之间的适配。但是,当要适配的接口只有一个函数时,在支持高阶函数(甚至只要支持Lambda)的语言中,此时的Adapter模式就味如鸡肋了。假设Log与Logger接口只有一个log函数(不管它的函数名是什么),接收的参数为(Level, String),那么从抽象的角度来看,它们其实属于相同的一个抽象:

1
f: (Level, String) => Unit

任何一个符合该定义的函数,都是完全适配的,没有类型与函数名的约束。

如果再加上泛型,抽象会更加彻底。例如典型的Load Pattern实现:

1
2
3
4
5
6
def using[A](r : Resource)(f : Resource => A) : A =
try {
f(r)
} finally {
r.dispose()
}

泛型A可以是任何类型,包括Unit类型。这里的f扩大了抽象范围,只要满足从Resource转换到A的语义,都可以传递给using函数。更而甚者可以完全抛开对Resource类型的依赖,只需要定义了close()方法,都可以作为参数传入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

def using[A <: def close():Unit, B][resource: A](f: A => B): B =
try {
f(resource)
} finally {
resource.close()
}

using(io.Source.fromFile("example.txt")) { source => {
for (line <- source.getLines) {
println(line)
}
}
}

因为FileResource定义了close()函数,所以可以作为参数传给using()函数。

Value Object

Value Object来自DDD中的概念,通常指的是没有唯一标识的不变对象。Java没有Value Object的语法,然而因其在多数业务领域中被频繁使用,Scala为其提供了快捷语法Case Class。在几乎所有的Scala项目中,都可以看到Case Class的身影。除了在业务中表现Value Object之外,还可以用于消息传递(例如AKKA在Actor之间传递的消息)、序列化等场景。此外,Case Class又可以很好地支持模式匹配,或者作为典型的代数数据类型(ADT)。例如Scala中的List,可以被定义为:

1
2
3
4

sealed trait List[+T]
case object Nil extends List[Nothing]
case class Cons[+T](h: T, t: List[T]) extends List[T]

这里,case object是一个单例的值对象。而Nil与Cons又都同时继承自一个sealed trait。在消息定义时,我们常常采用这样的ADT定义。例如List定义中,Nil与Cons就是List ADT的sum或者union,而Cons构造器则被称之为是参数h(代表List的head)与t(代表List的tail)的product。这也是ADT(algebraic data type)之所以得名。注意它与OO中的ADT(抽象数据类型)是风马牛不相及的两个概念。

Spark的UDF

Posted on 2017-03-15 | In Spark |

参考:
http://zhangyi.farbox.com/post/framework/udf-and-udaf-in-spark
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

UDF的引入极大地丰富了Spark SQL的表现力。一方面,它让我们享受了利用Scala(当然,也包括Java或Python)更为自然地编写代码实现函数的福利,另一方面,又能精简SQL(或者DataFrame的API),更加写意自如地完成复杂的数据分析。尤其采用SQL语句去执行数据分析时,UDF帮助我们在SQL函数与Scala函数之间左右逢源,还可以在一定程度上化解不同数据源具有歧异函数的尴尬。想想不同关系数据库处理日期或时间的函数名称吧!

UDF

注册制

用Scala编写的UDF与普通的Scala函数没有任何区别,唯一需要多执行的一个步骤是要让SQLContext注册它。例如:

1
2
3
4
5
6

def len(bookTitle: String):Int = bookTitle.length

sqlContext.udf.register("len", len _)

val booksWithLongTitle = sqlContext.sql("select title, author from books where len(title) > 10")

编写的UDF可以放到SQL语句的fields部分,也可以作为where、groupBy或者having子句的一部分。

若使用DataFrame的API,则可以以字符串的形式将UDF传入:

1
val booksWithLongTitle = dataFrame.filter("longLength(title, 10)")

非注册制

DataFrame的API也可以接收Column对象,可以用$符号来包裹一个字符串表示一个Column。$是定义在SQLContext对象implicits中的一个隐式转换。此时,UDF的定义也不相同,不能直接定义Scala函数,而是要用定义在org.apache.spark.sql.functions中的udf方法来接收一个函数。这种方式无需register:

1
2
3
4
5
6
import org.apache.spark.sql.functions._

val longLength = udf((bookTitle: String, length: Int) => bookTitle.length > length)

import sqlContext.implicits._
val booksWithLongTitle = dataFrame.filter(longLength($"title", $"10"))

不幸,运行这段代码会抛出异常:

1
cannot resolve '10' given input columns id, title, author, price, publishedDate;

因为采用$来包裹一个常量,会让Spark错以为这是一个Column。这时,需要定义在org.apache.spark.sql.functions中的lit函数来帮助:

1
val booksWithLongTitle = dataFrame.filter(longLength($"title", lit(10)))

UDAF

参考 https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html
普通的UDF却也存在一个缺陷,就是无法在函数内部支持对表数据的聚合运算。例如,当我要对销量执行年度同比计算,就需要对当年和上一年的销量分别求和,然后再利用同比公式进行计算。此时,UDF就无能为力了。该UDAF(User Defined Aggregate Function)粉墨登场的时候了。

Spark为所有的UDAF定义了一个父类UserDefinedAggregateFunction。要继承这个类,需要实现父类的几个抽象方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def inputSchema: StructType  输入参数类型,映射为每一个Field

def bufferSchema: StructType 中间结果类型

def dataType: DataType 返回结果

def deterministic: Boolean 对于一组输入是否输出相同的结果

def initialize(buffer: MutableAggregationBuffer): Unit 初始化buffer

def update(buffer: MutableAggregationBuffer, input: Row): Unit 更新row和buffer

def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit merge两个buffer

def evaluate(buffer: Row): Any 计算最终结果

可以将inputSchema理解为UDAF与DataFrame列有关的输入样式。例如年同比函数需要对某个可以运算的指标与时间维度进行处理,就需要在inputSchema中定义它们。

1
2
3
def inputSchema: StructType = {
StructType(StructField("metric", DoubleType) :: StructField("timeCategory", DateType) :: Nil)
}

代码创建了拥有两个StructField的StructType。StructField的名字并没有特别要求,完全可以认为是两个内部结构的列名占位符。至于UDAF具体要操作DataFrame的哪个列,取决于调用者,但前提是数据类型必须符合事先的设置,如这里的DoubleType与DateType类型。这两个类型被定义在org.apache.spark.sql.types中。

bufferSchema用于定义存储聚合运算时产生的中间数据结果的Schema,例如我们需要存储当年与上一年的销量总和,就需要定义两个StructField:

1
2
3
def bufferSchema: StructType = {
StructType(StructField("sumOfCurrent", DoubleType) :: StructField("sumOfPrevious", DoubleType) :: Nil)
}

dataType标明了UDAF函数的返回值类型,deterministic是一个布尔值,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。

顾名思义,initialize就是对聚合运算中间结果的初始化,在我们这个例子中,两个求和的中间值都被初始化为0d:

1
2
3
4
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, 0.0)
buffer.update(1, 0.0)
}

update函数的第一个参数为bufferSchema中两个Field的索引,默认以0开始,所以第一行就是针对“sumOfCurrent”的求和值进行初始化。

UDAF的核心计算都发生在update函数中。在我们这个例子中,需要用户设置计算同比的时间周期。这个时间周期值属于外部输入,但却并非inputSchema的一部分,所以应该从UDAF对应类的构造函数中传入。我为时间周期定义了一个样例类,且对于同比函数,我们只要求输入当年的时间周期,上一年的时间周期可以通过对年份减1来完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
case class DateRange(startDate: Timestamp, endDate: Timestamp) {
def in(targetDate: Date): Boolean = {
targetDate.before(endDate) && targetDate.after(startDate)
}
}

class YearOnYearBasis(current: DateRange) extends UserDefinedAggregateFunction {
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (current.in(input.getAs[Date](1))) {
buffer(0) = buffer.getAs[Double](0) + input.getAs[Double](0)
}
val previous = DateRange(subtractOneYear(current.startDate), subtractOneYear(current.endDate))
if (previous.in(input.getAs[Date](1))) {
buffer(1) = buffer.getAs[Double](0) + input.getAs[Double](0)
}
}
}

update函数的第二个参数input: Row对应的并非DataFrame的行,而是被inputSchema投影了的行。以本例而言,每一个input就应该只有两个Field的值。倘若我们在调用这个UDAF函数时,分别传入了销量和销售日期两个列的话,则input(0)代表的就是销量,input(1)代表的就是销售日期。

merge函数负责合并两个聚合运算的buffer,再将其存储到MutableAggregationBuffer中:

1
2
3
4
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0)
buffer1(1) = buffer1.getAs[Double](1) + buffer2.getAs[Double](1)
}

最后,由evaluate函数完成对聚合Buffer值的运算,得到最后的结果:

1
2
3
4
5
6
def evaluate(buffer: Row): Any = {
if (buffer.getDouble(1) == 0.0)
0.0
else
(buffer.getDouble(0) - buffer.getDouble(1)) / buffer.getDouble(1) * 100
}

假设我们创建了这样一个简单的DataFrame:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val conf = new SparkConf().setAppName("TestUDF").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val sales = Seq(
(1, "Widget Co", 1000.00, 0.00, "AZ", "2014-01-01"),
(2, "Acme Widgets", 2000.00, 500.00, "CA", "2014-02-01"),
(3, "Widgetry", 1000.00, 200.00, "CA", "2015-01-11"),
(4, "Widgets R Us", 2000.00, 0.0, "CA", "2015-02-19"),
(5, "Ye Olde Widgete", 3000.00, 0.0, "MA", "2015-02-28")
)

val salesRows = sc.parallelize(sales, 4)
val salesDF = salesRows.toDF("id", "name", "sales", "discount", "state", "saleDate")
salesDF.registerTempTable("sales")

那么,要使用之前定义的UDAF,则需要实例化该UDAF类,然后再通过udf进行注册:

1
2
3
4
5
6
val current = DateRange(Timestamp.valueOf("2015-01-01 00:00:00"), Timestamp.valueOf("2015-12-31 00:00:00"))
val yearOnYear = new YearOnYearBasis(current)

sqlContext.udf.register("yearOnYear", yearOnYear)
val dataFrame = sqlContext.sql("select yearOnYear(sales, saleDate) as yearOnYear from sales")
dataFrame.show()

在使用上,除了需要对UDAF进行实例化之外,与普通的UDF使用没有任何区别。但显然,UDAF更加地强大和灵活。如果Spark自身没有提供符合你需求的函数,且需要进行较为复杂的聚合运算,UDAF是一个不错的选择。

通过Spark提供的UDF与UDAF,你可以慢慢实现属于自己行业的函数库,让Spark SQL变得越来越强大,对于使用者而言,却能变得越来越简单。

如何优雅地终止正在运行的Spark Streaming程序

Posted on 2017-03-02 | In Spark |

转自: https://www.iteblog.com/archives/1890.html

一直运行的Spark Streaming程序如何关闭呢?是直接使用kill命令强制关闭吗?这种手段是可以达到关闭的目的,但是带来的后果就是可能会导致数据的丢失,因为这时候如果程序正在处理接收到的数据,但是由于接收到kill命令,那它只能停止整个程序,而那些正在处理或者还没有处理的数据可能就会被丢失。那我们咋办?这里有两种方法。

等作业运行完再关闭

我们都知道,Spark Streaming每隔batchDuration的时间会把源源不断的流数据分割成一批有限数据集,然后计算这些数据,我们可以从Spark提供的监控页面看到当前batch是否执行完成,当作业执行完,我们就可以手动执行kill命令来强制关闭这个Streaming作业。这种方式的缺点就是得盯着监控页面,然后决定关不关闭,很不灵活。

通过Spark内置机制关闭

其实Spark内置就为我们提供了一种优雅的方法来关闭长期运行的Streaming作业,我们来看看 StreamingContext类中定义的一个 stop 方法:

1
def stop(stopSparkContext: Boolean, stopGracefully: Boolean)

官方文档对其解释是:Stop the execution of the streams, with option of ensuring all received data has been processed. 控制所有接收的数据是否被处理的参数就是 stopGracefully,如果我们将它设置为true,Spark则会等待所有接收的数据被处理完成,然后再关闭计算引擎,这样就可以避免数据的丢失。现在的问题是我们在哪里调用这个stop方法?

Spark 1.4版本之前
在Spark 1.4版本之前,我们需要手动调用这个 stop 方法,一种比较合适的方式是通过 Runtime.getRuntime().addShutdownHook 来添加一个钩子,其会在JVM关闭的之前执行传递给他的函数,如下:

1
2
3
4
5
6
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() {
log("Gracefully stop Spark Streaming")
streamingContext.stop(true, true)
}
})

如果你使用的是Scala,我们还可以通过以下的方法实现类似的功能:

1
2
3
scala.sys.addShutdownHook({
streamingContext.stop(true,true)
)})

通过上面的办法,我们客户确保程序退出之前会执行上面的函数,从而保证Streaming程序关闭的时候不丢失数据。

Spark 1.4版本之后
上面方式可以达到我们的需求,但是在每个程序里面都添加这样的重复代码也未免太过麻烦了!值得高兴的是,从Apache Spark 1.4版本开始,Spark内置提供了spark.streaming.stopGracefullyOnShutdown参数来决定是否需要以Gracefully方式来关闭Streaming程序(详情请参见SPARK-7776)。Spark会在启动 StreamingContext 的时候注册这个钩子,如下:

1
2
3
4
5
6
7
8
9
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

private def stopOnShutdown(): Unit = {
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
}

从上面的代码可以看出,我们可以根据自己的需求来设置 spark.streaming.stopGracefullyOnShutdown 的值,而不需要在每个Streaming程序里面手动调用StreamingContext的stop方法,确实方便多了。不过虽然这个参数在Spark 1.4开始引入,但是却是在Spark 1.6才开始才有文档正式介绍(可以参见https://github.com/apache/spark/pull/8898和http://spark.apache.org/docs/1.6.0/configuration.html)

Spark有用的配置选项

Posted on 2017-02-23 |

任务提交

  • spark任务对应的Jar,配置该选项后,每次提交任务不用上传该assembly.jar,减少了任务启动时间
    1
    2
    ``` shell
    spark.yarn.jar hdfs://****:****/path/spark/spark-assembly-***.jar
  • yarn提交任务时的黑名单
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    ## Shuffle

    - ```spark.shuffle.file.buffer```

    Size of the in-memory buffer for each shuffle file output stream. These buffers reduce the number of disk seeks and system calls made in creating intermediate shuffle files.

    ## Memory

    - ```spark.yarn.executor.memoryOverhead```
    The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).


    ## Spark SQL

    - ``` spark.sql.shuffle.partitions

Spark SQL 2.0版本之前,reduce的个数是通过spark.default.parallelism和spark.sql.shuffle.partitions两个参数进行配置。如果配置过大,将会导致下游产生很多碎片化的Task,或者下游HDFS产生很多小文件。如果设置过小,将会导致单个ReduceTask计算负载过大。

  • ```
    1
    2
    Spark SQL 2.0+支持通过spark.sql.adaptive.enabled来设置reduce大小自适应
    - ```spark.sql.files.ignoreCorruptFiles

Spark2.0 DataSource API

Posted on 2017-01-12 | In Spark |

Spark 1 如何实现Spark External Datasource

Spark Data Source API: Extending Our Spark SQL Query Engine

Spark SQL之External DataSource外部数据源(一)示例

Spark SQL之External DataSource外部数据源(二)源码分析

参考实现:

  • spark-sequoiadb
  • spark-csv
  • spark-redis
  • Spark多数据源计算实践及其在GrowingIO的实践

no

Spark 2 如何实现Spark External Datasource

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-datasource-api.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-datasource.html

http://www.spark.tc/exploring-the-apache-spark-datasource-api/

新特性:

  • 子查询的自持
  • 更加丰富的读写api支持,包括
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    - RelationProvider

    https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-datasource-api.html

    https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-datasource.html

    http://www.spark.tc/exploring-the-apache-spark-datasource-api/

    [关键](http://wiki.baidu.com/pages/viewpage.action?pageId=213816907)

    - DataSourceRegister

    triat ```DataSourceRegister``` is an interface to register DataSources under their shortName aliases (to look them up later).
    ``` scala
    package org.apache.spark.sql.sources

    trait DataSourceRegister {
    def shortName(): String
    }

It allows users to use the data source alias as the format type over the fully qualified class name.

Hive MapReduce Spark分布式生成唯一数值型ID

Posted on 2017-01-12 | In Hive |

转载自: http://lxw1234.com/archives/2016/12/798.htm

在实际业务场景下,经常会遇到在Hive、MapReduce、Spark中需要生成唯一的数值型ID。

一般常用的做法有:

  1. MapReduce中使用1个Reduce来生成;
  2. Hive中使用row_number分析函数来生成,其实也是1个Reduce;
  3. 借助HBase或Redis或Zookeeper等其它框架的计数器来生成;

数据量不大的情况下,可以直接使用1和2方法来生成,但如果数据量巨大,1个Reduce处理起来就非常慢。

在数据量非常大的情况下,如果你仅仅需要唯一的数值型ID,注意:不是需要”连续的唯一的数值型ID”,那么可以考虑采用本文中介绍的方法,否则,请使用第3种方法来完成。

Spark中生成这样的非连续唯一数值型ID,非常简单,直接使用zipWithUniqueId()即可。

关于zipWithUniqueId,可参考:http://lxw1234.com/archives/2015/07/352.htm

参考zipWithUniqueId()的方法,在MapReduce和Hive中,实现如下:

在Spark中,zipWithUniqueId是通过使用分区Index作为每个分区ID的开始值,在每个分区内,ID增长的步长为该RDD的分区数,那么在MapReduce和Hive中,也可以照此思路实现,Spark中的分区数,即为MapReduce中的Map数,Spark分区的Index,即为Map Task的ID。Map数,可以通过JobConf的getNumMapTasks(),而Map Task ID,可以通过参数mapred.task.id获取,格式如:attempt_1478926768563_0537_m_000004_0,截取m_000004_0中的4,再加1,作为该Map Task的ID起始值。注意:这两个只均需要在Job运行时才能获取。另外,从图中也可以看出,每个分区/Map Task中的数据量不是绝对一致的,因此,生成的ID不是连续的。

下面的UDF可以在Hive中直接使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.lxw1234.hive.udf;

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.LongWritable;

@UDFType(deterministic = false, stateful = true)
public class RowSeq2 extends GenericUDF {

private static LongWritable result = new LongWritable();
private static final char SEPARATOR = '_';
private static final String ATTEMPT = "attempt";
private long initID = 0l;
private int increment = 0;


@Override
public void configure(MapredContext context) {
increment = context.getJobConf().getNumMapTasks();
if(increment == 0) {
throw new IllegalArgumentException("mapred.map.tasks is zero");
}

initID = getInitId(context.getJobConf().get("mapred.task.id"),increment);
if(initID == 0l) {
throw new IllegalArgumentException("mapred.task.id");
}

System.out.println("initID : " + initID + " increment : " + increment);
}

@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
result.set(getValue());
increment(increment);
return result;
}

@Override
public String getDisplayString(String[] children) {
return "RowSeq-func()";
}

private synchronized void increment(int incr) {
initID += incr;
}

private synchronized long getValue() {
return initID;
}

//attempt_1478926768563_0537_m_000004_0 // return 0+1
private long getInitId (String taskAttemptIDstr,int numTasks)
throws IllegalArgumentException {
try {
String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR));
if(parts.length == 6) {
if(parts[0].equals(ATTEMPT)) {
if(!parts[3].equals("m") && !parts[3].equals("r")) {
throw new Exception();
}
long result = Long.parseLong(parts[4]);
if(result >= numTasks) { //if taskid >= numtasks
throw new Exception("TaskAttemptId string : " + taskAttemptIDstr
+ " parse ID [" + result + "] >= numTasks[" + numTasks + "] ..");
}
return result + 1;
}
}
} catch (Exception e) {}
throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr
+ " is not properly formed");
}

}

Spark2.0学习笔记

Posted on 2017-01-12 | In Spark |

Catalog API

Spark中的DataSet和Dataframe API支持结构化分析。结构化分析的一个重要的方面是管
理元数据。这些元数据可能是一些临时元数据(比如临时表)、SQLContext上注册的UDF以及
持久化的元数据(比如Hivemeta store或者HCatalog)。
Spark的早期版本是没有标准的API来访问这些元数据的。用户通常使用查询语句(比
如 show tables )来查询这些元数据。这些查询通常需要操作原始的字符串,而且不同元数据
类型的操作也是不一样的。
这种情况在Spark 2.0中得到改变。Spark 2.0中添加了标准的API(称为catalog)来访问
Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。

1
2
3
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder.appName("spark session example").enableHiveSupport().getOrCreate()
val catalog = sparkSession.catalog
  • Querying the databases
    一旦创建好catalog对象之后,可以使用它来查询元数据中的数据库,catalog上的
    API返回的结果全部都是dataset。
    listDatabases 返回元数据中所有的数据库。默认情况下,元数据仅仅只有名为default的数
    据库。如果是Hive元数据,那么它会从Hive元数据中获取所有的数据库。 listDatabases 返
    回的类型是dataset,所以我们可以使用Dataset上的所有操作来查询元数据。

    1
    catalog.listDatabases().select("name").show(false)
  • 使用createTempView注册Dataframe
    在Spark的早期版本,我们使用 registerTempTable 来注册Dataframe。然而在Spark
    2.0中,这个API已经被遗弃了。 registerTempTable 名字很让人误解,因为用户会认为这个函
    数会将Dataframe持久化并且保证这个临时表,但是实际上并不是这样的,所以社区才有意将它
    替换成 createTempView 。 createTempView 的使用方法如下:

    1
    df.createTempView("iteblog")
  • 查询表
    正如我们可以展示出元数据中的所有数据库一样,我们也可以展示出元数据中某个数据库中
    的表。它会展示出Spark SQL中所有注册的临时表。同时可以展示出Hive中默认数据库(也就是
    default)中的表。如下:

    1
    catalog.listTables().select("name").show(false)
  • 判断某个表是否缓存

    1
    2
    catalog.isCached("iteblog")
    df.cache()
  • 删除view
    以使用catalog提供的API来删除view。如果是Spark SQL情况,那么它会删除事先注册好的view;如果是hive情况,那么它会从元数据中删除表

    1
    catalog.dropTempView("iteblog")
  • 查询已经注册的函数
    仅可以使用Catalog API操作表,还可以用它操作UDF。下面代码片段展示
    SparkSession上所有已经注册号的函数,当然也包括了Spark内置的函数。

    1
    catalog.listFunctions().select("name","className","isTemporary").show(100, false)

Catalyst优化器

Spark SQL使用Catalyst优化所有的查询,包括spark sql和dataframe dsl。这个优化器的使用使得查询比直接使用RDD要快很多。Spark在每个版本都会对Catalyst进行优化以便提高查询性能,而不需要用户修改他们的代码。
Catalyst是一个单独的模块类库,这个模块是基于规则的系统。这个框架中的每个规则都是针对某个特定的情况来优化的。比如: ConstantFolding 规则用于移除查询中的常量表达式。
在Spark的早期版本,如果需要添加自定义的优化规则,我们需要修改Spark的源码,这在很多情况下是不太可取的,比如我们仅仅需要优化特定的领域或者场景。所以开发社区想有一种可插拨的方式在Catalyst中添加优化规则。值得高兴的是,Spark 2.0提供了这种实验式的API,我们可以基于这些API添加自定义的优化规则。本文将介绍如何编写自定义的优化规则,并将这些规则添加到Catalyst中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 	// dataframe的优化计划(Optimized plan)
val df = spark.read.option("header", "true").csv("file:///user/iteblog/sales.csv")
val multipliedDF = df.selectExpr("amountPaid * 1")
println(multipliedDF.queryExecution.optimizedPlan.numberedTreeString)
// Spark自动对每一行的 amountPaid 乘上 1.0 。但是这不是最优计划!因为如果是乘以1,最终的结果是一样的。所有我们可以利用这个知识来编写自定义的优化规则,并将这个规则加入到Catalyst中

// 自定义优化计划
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
object MultiplyOptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case Multiply(left, right) if right.isInstanceOf[Literal] &&
right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>
println("optimization of one applied")
left
}
}
// 我们扩展了Rule,Rule是直接操作逻辑计划的。绝大多数的规则都是使用Scala中的模式匹
配。在上面的代码中,我们首先判断优化的操作数(operand)是否是文字(literal),然后判断其值
是否是1.0。为了简便起见,我们限定了1出现的位置,如果1出现在左边,这个优化规则将不起
作用。但是我们可以仿照上面的示例轻松地实现.通过上面的规则,如果右边的值是1,我们将直接返回左边的值。


// 将自定义的优化规则加入到Catalyst中
spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)

// 使用自定义优化规则
val newMultipliedDF = df.selectExpr("amountPaid * 1")
println(newMultipliedDF.queryExecution.optimizedPlan.numberedTreeString)

Spark SQL中Window API

Spark SQL中的window API是从1.4版本开始引入的,以便支持更智能的分组功能。这个功
能对于那些有SQL背景的人来说非常有用;但是在Spark 1.x中,window API一大缺点就是无法
使用时间来创建窗口。时间在诸如金融、电信等领域有着非常重要的角色,基于时间来理解数据
变得至关重要。
不过值得高兴的是,在Spark 2.0中,window API内置也支持time windows!Spark SQL
中的time windows和Spark Streaming中的time windows非常类似。

  • tumbling window
    所有的timewindow API需要一个类型为timestamp的列。

    by```语句中使用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    window方法的第一个参数指定了时间所在的列;第二个参数指定了窗口的持续时间
    (duration),它的单位可以是seconds、minutes、hours、days或者weeks。创建好窗口之
    后,我们可以计算平均值。
    ``` scala
    // 使用了内置的year函数来提取出日期中的年
    val stocks2016 = stocksDF.filter("year(Date)==2016")

    // 对每个星期创建一个窗口,这种类型的窗口通常被称为tumbling window
    val tumblingWindowDS = stocks2016.groupBy(window(stocks2016.col("Date"),"1 week")).agg(avg("Close").as("weekly_average"))

    def printWindow(windowDF:DataFrame, aggCol:String) ={
    windowDF.sort("window.start").
    select("window.start","window.end",s"$aggCol").
    show(truncate = false)
    }
    // 打印window的值
    printWindow(tumblingWindowDS,"weekly_average")

  • 用sliding window(滑动窗口)
    到目前为止,没有相关API来创建带有开始时间的tumbling
    window,但是我们可以通过将窗口时间(window duration)和滑动时间(slide duration)设置成
    一样来创建带有开始时间的tumbling window。

1
2
3
// 4 days 参数就是开始时间的偏移量;前两个参数分别代表窗口时间和滑动时间
val iteblogWindowWithStartTime = stocks2016.groupBy(window(stocks2016.col("Date"),"1
week","1 week", "4 days")).agg(avg("Close").as("weekly_average"))
1…456…11

Sun Ke

104 posts
21 categories
61 tags
© 2018 Sun Ke
Powered by Hexo
|
Theme — NexT.Pisces v5.1.4