Classification of conventional compute engines
- Batch processing
• High throughput, high latency
• Processes static (bounded) data sets
• Latency at the minute or even hour level
• e.g. MapReduce, Spark
- Stream processing
• Row-level (per-record) data processing
• Millisecond-level latency
• e.g. Storm
Classification of stream processing engines
- Row-oriented
Apache Flink, Storm: process incoming data one row at a time
- Micro-batch-oriented
Spark Streaming: collects a small batch of data, then processes it all at once
Common stages of a streaming pipeline
Data source -> Data buffer -> Streaming engine -> Result store
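A minimal Spark Streaming sketch of these four stages; the socket source stands in for the buffer layer (Kafka is the typical choice in practice), and print() stands in for a real result store. Host, port, and app name are assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    // Streaming engine: micro-batches every 10 seconds.
    val ssc = new StreamingContext(new SparkConf().setAppName("pipeline-sketch"), Seconds(10))

    // Data source + buffer: a socket stream stands in for a real buffer such as Kafka.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Computation: word count per batch.
    val counts = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)

    // Result store: print() stands in for writing to a database or filesystem.
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```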
Computation patterns in stream processing
- Fixed window: the standard mode in Spark Streaming (each batch interval is one window)
- Sliding window ( window )
- Session computation ( mapWithState )
Stores per-key state in Spark Streaming (similar to a session) and supports expiring it, as in the sketch below.
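A sketch of the three patterns in Spark Streaming; the source, batch interval, window sizes, and the 30-minute session timeout are all assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}

object WindowModes {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("window-modes"), Seconds(10))
    ssc.checkpoint("hdfs:///tmp/window-modes-ckpt")  // required for stateful ops; path is an assumption

    val events = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))

    // Fixed window: each 10s batch is itself a fixed window.
    events.reduceByKey(_ + _).print()

    // Sliding window: 60s of data, recomputed every 20s.
    events.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20)).print()

    // Session computation: per-key running state that expires after 30 idle minutes.
    val spec = StateSpec.function((key: String, value: Option[Int], state: State[Int]) => {
      val count = state.getOption.getOrElse(0) + value.getOrElse(0)
      if (!state.isTimingOut()) state.update(count)  // update is illegal on a timing-out state
      (key, count)
    }).timeout(Minutes(30))
    events.mapWithState(spec).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```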
Spark Streaming programming essentials
1. Spark Streaming: exactly-once delivery
Corner cases: recomputation after failures, speculative execution, etc. can run the same batch more than once, so output operations must tolerate replays.
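One common defense is to make the output idempotent, so a replayed batch simply overwrites what the first run wrote. A minimal sketch, assuming a hypothetical keyed sink KvSink (not a real library):

```scala
import org.apache.spark.streaming.dstream.DStream

object IdempotentOutput {
  // Hypothetical sink: put() is an upsert by key, so a replayed batch
  // overwrites the earlier write instead of duplicating it.
  object KvSink extends Serializable {
    def put(key: String, value: Long): Unit = { /* upsert into an external store */ }
  }

  def writeIdempotently(counts: DStream[(String, Long)]): Unit = {
    counts.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // In a real sink, open one connection per partition here.
        records.foreach { case (key, value) => KvSink.put(key, value) }
      }
    }
  }
}
```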
- Monitoring and managing jobs
- Where to run the driver?
YARN cluster mode: the driver keeps running even if the client machine goes down.
- How to restart the driver?
Set up automatic restart in the Spark configuration (e.g. spark-defaults.conf):
```
spark.yarn.maxAppAttempts=2   # number of application attempts (i.e. retries)
```
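For the restarted attempt to resume rather than start from scratch, the usual companion to this setting is checkpoint-based recovery via StreamingContext.getOrCreate; the checkpoint path, app name, and socket source below are assumptions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RestartableApp {
  val checkpointDir = "hdfs:///tmp/app-checkpoint"  // assumption: any durable HDFS path

  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(new SparkConf().setAppName("restartable-app"), Seconds(10))
    ssc.checkpoint(checkpointDir)
    // Define the DStream graph here; a trivial example:
    ssc.socketTextStream("localhost", 9999).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // First attempt builds a fresh context; a restarted attempt rebuilds it,
    // including unfinished batches, from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```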
- Summary
2. Gracefully shutting down your streaming app
Approach: thread hooks; check an external flag every N seconds.
```scala
import java.util.concurrent.TimeUnit

/**
 * Stop the execution of the streams, with option of ensuring all received data
 * has been processed.
 *
 * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
 *                         will be stopped regardless of whether this StreamingContext has been
 *                         started.
 * @param stopGracefully if true, stops gracefully by waiting for the processing of all
 *                       received data to be completed
 */
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
  // Simplified: internally the JobScheduler receives this flag as processAllReceivedData.
  receiverTracker.stop(stopGracefully)  // default waits ~10 seconds; graceful waits until done
  jobGenerator.stop(stopGracefully)     // will use spark.streaming.gracefulStopTimeout
  jobExecutor.shutdown()
  val terminated = if (stopGracefully) {
    jobExecutor.awaitTermination(1, TimeUnit.HOURS)  // just a very large period of time
  } else {
    jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
  }
  if (!terminated) {
    jobExecutor.shutdownNow()
  }
}
```
How to trigger a graceful stop?
• Command line
– $SPARK_HOME_DIR/bin/spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID
– spark.streaming.stopGracefullyOnShutdown=true
```scala
private def stopOnShutdown(): Unit = {
  // reads spark.streaming.stopGracefullyOnShutdown and stops the context accordingly
}
```
• By marker file
– Touch a file on HDFS when starting the app
– Remove the file when you want to stop
– A separate thread in the Spark app checks for the file and calls:
```scala
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
```
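A minimal sketch of that watcher thread using Hadoop's FileSystem API; the marker path and poll interval are placeholders:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext

object ShutdownMarker {
  // Polls for the marker file; once an operator deletes it, stop gracefully.
  def watch(ssc: StreamingContext, markerPath: String, pollMs: Long = 10000L): Unit = {
    val watcher = new Thread(new Runnable {
      override def run(): Unit = {
        val fs = FileSystem.get(new URI(markerPath), new Configuration())
        val marker = new Path(markerPath)
        while (fs.exists(marker)) {   // file present: keep running
          Thread.sleep(pollMs)
        }
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    })
    watcher.setDaemon(true)
    watcher.start()
  }
}
```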