Streaming Computation Overview and Spark Streaming Tips

Types of compute engines

  1. Batch processing
    • High throughput, high latency
    • Operates on static (bounded) datasets
    • Minute- or even hour-level latency
    • Examples: MapReduce, Spark
  2. Stream processing
    • Operates on data at the record (row) level
    • Millisecond-level latency
    • Example: Storm

Types of stream processing

  1. Record-oriented
    Apache Flink — processes each record individually as it arrives
    Storm
  2. Micro-batch-oriented
    Spark Streaming — collects a small batch of data, then processes it all at once
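
The two models above can be contrasted with a toy sketch in plain Scala (no Spark or Flink involved; `StreamingModes`, `perRecord`, and `microBatch` are illustrative names, not any real API):

```scala
// Toy illustration of the two processing models over the same event stream.
object StreamingModes {
  val events = List("a", "b", "c", "d", "e")

  // Record-oriented (Storm/Flink style): each event is handled as it arrives.
  def perRecord(handle: String => Unit): Unit =
    events.foreach(handle)

  // Micro-batch (Spark Streaming style): buffer events for an interval,
  // then hand the whole batch to the engine at once.
  def microBatch(batchSize: Int, handle: List[String] => Unit): Unit =
    events.grouped(batchSize).foreach(handle)
}
```

With 5 events, `perRecord` invokes its handler 5 times, while `microBatch(2, …)` invokes it 3 times (batches of 2, 2, and 1), which is why micro-batching trades latency for throughput.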

Common stages of a streaming pipeline

Data source —> data buffer —> streaming engine —> result storage
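
The four stages can be sketched end-to-end with in-memory stand-ins (a queue for the buffer layer, a map for the result store); `PipelineSketch` and its members are illustrative names only, not any real framework API:

```scala
import scala.collection.mutable

// Minimal sketch of the pipeline: source -> buffer -> engine -> storage.
object PipelineSketch {
  val buffer = mutable.Queue[Int]()            // data buffer (a message queue in practice)
  val resultStore = mutable.Map[String, Int]() // result storage

  // Data source -> buffer: new records land in the buffer first.
  def ingest(records: Seq[Int]): Unit = buffer ++= records

  // Streaming engine: drain the buffered batch and update the stored result.
  def processBatch(): Unit = {
    val batch = buffer.toList
    buffer.clear()
    resultStore("sum") = resultStore.getOrElse("sum", 0) + batch.sum
  }
}
```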

Windowing modes in stream processing

  1. Fixed window
    The mode Spark Streaming supports out of the box (each batch interval is one fixed window)
  2. Sliding window ( window )

  3. Session-based computation ( mapWithState )
    Keeps per-key state inside Spark Streaming (similar to a session) and supports expiring stale state
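
The per-key, expiring-state logic that `mapWithState` applies can be sketched as a pure update function; `Session`, `SessionState`, `updateSession`, and the 30-minute timeout are illustrative assumptions, not Spark API:

```scala
// Sketch of session-style state with expiry, mirroring what a
// mapWithState update function would do for one key.
case class Session(count: Int, lastSeenMs: Long)

object SessionState {
  val timeoutMs = 30 * 60 * 1000L // expire sessions idle longer than 30 min (assumption)

  // Given the previous state (if any) and a new event's timestamp,
  // return the updated state. An expired session is replaced by a fresh one.
  def updateSession(prev: Option[Session], eventTimeMs: Long): Option[Session] =
    prev match {
      case Some(s) if eventTimeMs - s.lastSeenMs > timeoutMs =>
        Some(Session(1, eventTimeMs)) // old session expired; start a new one
      case Some(s) =>
        Some(Session(s.count + 1, eventTimeMs))
      case None =>
        Some(Session(1, eventTimeMs))
    }
}
```

In real Spark Streaming code, expiry would instead be configured via the state spec's timeout, and the engine drops keys whose state has timed out.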

Spark Streaming programming tips

Spark Streaming aims at exactly-once delivery.
Watch out for the special cases: recomputation after failures, speculative execution, etc.

  1. Monitoring and managing jobs
  • Where to run the driver?
    YARN cluster mode. The driver keeps running even when the client machine goes down.
  • How to restart the driver?
    Set up automatic restart.
    In the Spark configuration (e.g. spark-defaults.conf):
```
spark.yarn.maxAppAttempts=2                           # max application attempts
spark.yarn.am.attemptFailuresValidityInterval=1h      # interval after which the attempt counter resets
spark.yarn.max.executor.failures={8 * num_executors}  # max executor failures before the app fails
spark.yarn.executor.failuresValidityInterval=1h       # interval after which the failure counter resets
spark.task.maxFailures=8                              # task retry count (default is 4)
spark.speculation=true                                # speculative execution; requires idempotent tasks
```
  • Summary
    The settings above add retries at every level: application attempts, executor failures, and individual task failures.
  2. Graceful shutdown of your streaming app
    Approach: thread hooks – check for an external flag every N seconds

``` scala
/**
 * Stop the execution of the streams, with option of ensuring all received data
 * has been processed.
 *
 * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
 *                         will be stopped regardless of whether this StreamingContext has been started.
 * @param stopGracefully if true, stops gracefully by waiting for the processing of all
 *                       received data to be completed
 */
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
  // ...
  receiverTracker.stop(processAllReceivedData) // default is to wait 10 seconds; graceful stop waits until done
  jobGenerator.stop(processAllReceivedData)    // will use spark.streaming.gracefulStopTimeout
  jobExecutor.shutdown()
  val terminated = if (processAllReceivedData) {
    jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time
  } else {
    jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
  }
  if (!terminated) {
    jobExecutor.shutdownNow()
  }
  // ...
}
```

How to be graceful?
• Command line
– $SPARK_HOME_DIR/bin/spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID
– Set spark.streaming.stopGracefullyOnShutdown=true

``` scala
private def stopOnShutdown(): Unit = {
  val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
  // Do not stop SparkContext, let its own shutdown hook stop it
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
```

• By marker file
– Touch a file on HDFS when starting the app
– Remove the file when you want to stop
– A separate thread in the Spark app watches for the file and calls:

``` scala
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
```
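
A minimal sketch of such a watcher thread, using a local filesystem path instead of HDFS so it stays self-contained; `MarkerFileWatcher` and `stopStreaming` are illustrative names (in a real app `stopStreaming` would wrap the `streamingContext.stop(...)` call above, and the existence check would go through the HDFS `FileSystem` API):

```scala
import java.nio.file.{Files, Paths}

// Polls for a marker file every `pollMs` milliseconds on a daemon thread;
// when the file disappears, triggers the supplied graceful-stop callback once.
class MarkerFileWatcher(markerPath: String, pollMs: Long, stopStreaming: () => Unit) {
  @volatile private var running = true

  private val thread = new Thread(() => {
    while (running) {
      if (!Files.exists(Paths.get(markerPath))) {
        stopStreaming() // marker removed: shut down gracefully
        running = false
      } else {
        Thread.sleep(pollMs)
      }
    }
  })
  thread.setDaemon(true) // do not keep the JVM alive on its own

  def start(): Unit = thread.start()
  def shutdown(): Unit = { running = false; thread.join() }
}
```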