Introduction to Structured Streaming

Getting-started programming guide (Chinese)

Official documentation

Real-time Streaming ETL with Structured Streaming in Apache Spark 2.1

Background

The Structured Streaming API was first released in community version 2.0 as a test interface, mainly to expose the initial design ideas and basic APIs; it was not ready for production use. In 2.1, Structured Streaming shipped as a headline feature, with support for the Kafka source, event_time-based windows, and watermarks. Although still in the Alpha stage, judging by the completeness of the implementation and the feedback so far, it already meets the basic functional requirements.

Design Philosophy and the Problems It Solves

Since the DStream API and the Spark Streaming module were released in Spark 0.7, Spark has offered stream processing and has seen a range of applications in industry, but several problems remained. The most frequently criticized ones are:

  • No support for event_time; slicing records into DStreams by absolute arrival time is a poor fit for many scenarios
  • No support for streaming window operations
  • No support for watermarks, so there is no tolerance for out-of-order data

Spark 2.1 not only addresses the problems above, but also merges the stream processing model of Spark Streaming with the SQL module and the DataFrame/DataSet APIs that were the focus of development in the 1.x line, producing the Structured Streaming engine. The design idea is to abstract the continuously arriving upstream data as an unbounded table, and to treat stream processing as processing different parts of that table (complete/append/update mode):

At the code level, Structured Streaming lives in the sql module and integrates with the existing SQL datasource API, logical plan, and physical plan. The upstream and downstream of a stream are abstracted as Source and Sink, and the input and output streams DataStreamReader and DataStreamWriter are built on top of DataFrame. DataStreamReader hooks into the datasource API to support reading from multiple kinds of upstream systems, while DataStreamWriter follows lazy evaluation to implement the various trigger operations and the different output modes.

Demo for Structured Streaming

A complete Structured Streaming example, explained step by step:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

// As in other Spark jobs, build the SparkSession, the successor of the SparkContext in older versions
val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._

// Create a DataFrame backed by a streaming source
val lines = spark.readStream
  .format("socket") // basic concept: Source
  .option("host", host)
  .option("port", port)
  .load()

// WordCount logic, used exactly like the DataFrame/Dataset API
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Call writeStream on the Dataset to create the output stream, setting the sink type and output mode
val query = wordCounts.writeStream
  .outputMode("complete") // basic concept: Output Mode
  .format("console")      // basic concept: Sink
  .start()

query.awaitTermination()

Basic Concepts

Source

The input sources of Structured Streaming; each source has its own datasource implementation. Three sources are currently supported (a sketch of reading from them follows this list):

  • File Source: the input is files in an HDFS directory; the natively supported file formats are the same as for Dataset (json, csv, text, parquet). Files are continuously moved into the watched directory to act as a continuous data source
  • Kafka Source: uses Kafka as the Structured Streaming source; the currently supported version is 0.10.0
  • Socket Source: uses socket input as the streaming source; intended only for debugging and demos
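
A minimal sketch of creating streaming DataFrames from the File source and the Kafka source; the schema, directory path, broker address, and topic name below are illustrative placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("SourceSketch").getOrCreate()

// File source: streaming reads require an explicit schema
val csvSchema = new StructType().add("name", StringType).add("age", IntegerType)
val fileStream = spark.readStream
  .format("csv")                        // also json / text / parquet
  .schema(csvSchema)
  .load("/path/to/input/dir")           // hypothetical directory that files are moved into

// Kafka source: requires the spark-sql-kafka-0-10 package on the classpath
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092") // placeholder broker list
  .option("subscribe", "topic1")                   // placeholder topic
  .load()                               // yields key, value, topic, partition, offset, timestamp columns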

Output Mode

The output mode is always defined with respect to the result table produced by each trigger (a sketch of selecting the mode follows this list):

  • Complete Mode : all data from the beginning up to now is treated as one large table, and the result of the query over the whole table is written out in full
  • Append Mode : only the rows that have been finalized (will no longer change) since the last trigger are appended to the sink
  • Update Mode : only the rows that have changed since the last trigger are written to the sink
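
Assuming the same wordCounts query as in the demo above, the mode is chosen via outputMode on the write stream; the only difference between the Complete and Update runs compared below is this setting:

// Same word count query as above; only the output mode differs between the two runs
val query = wordCounts.writeStream
  .outputMode("complete")   // "update" for the second run
  .format("console")
  .start()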

A quick comparison of Complete mode and Update mode using the bundled StructuredNetworkWordCountWindowed example:
Data input:

[liyuanjian@MacBook~Pro ~]$ nc -l 9999
apache spark
apache hadoop
baidu inf spark

Complete mode output:

+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+

+------+-----+
| value|count|
+------+-----+
|apache| 2|
|hadoop| 1|
| spark| 1|
+------+-----+

+------+-----+
| value|count|
+------+-----+
| baidu| 1|
|apache| 2|
|hadoop| 1|
| spark| 2|
| inf| 1|
+------+-----+

Update mode output:

+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+

+------+-----+
| value|count|
+------+-----+
|apache| 2|
|hadoop| 1|
+------+-----+

+-----+-----+
|value|count|
+-----+-----+
|baidu| 1|
| inf| 1|
|spark| 2|
+-----+-----+

Sink

As the downstream abstraction of Structured Streaming, a sink describes how the final processed DataFrame is written out. The following sinks are currently supported:

  1. File sink : writes the data to an HDFS directory; partitioned table writes are supported
  2. Console sink : prints the data to stdout via dataframe.show(); for debugging only, and the demos above all use the console sink
  3. Memory sink : stores all output in the driver's memory as a table; also only suitable for debugging
  4. Foreach sink : used together with a user-implemented ForeachWriter. For the DataFrame produced by each trigger, the foreach sink invokes the ForeachWriter callbacks partition by partition, so a ForeachWriter can be implemented to write to any downstream storage or processing system. The interface is shown below, followed by a sketch of an implementation:
    abstract class ForeachWriter[T] extends Serializable {

      // TODO: Move this to org.apache.spark.sql.util or consolidate this with batch API.

      /**
       * Called when starting to process one partition of new data in the executor. The `version` is
       * for data deduplication when there are failures. When recovering from a failure, some data may
       * be generated multiple times but they will always have the same version.
       *
       * If this method finds using the `partitionId` and `version` that this partition has already been
       * processed, it can return `false` to skip the further data processing. However, `close` still
       * will be called for cleaning up resources.
       *
       * @param partitionId the partition id.
       * @param version a unique id for data deduplication.
       * @return `true` if the corresponding partition and version id should be processed. `false`
       *         indicates the partition should be skipped.
       */
      def open(partitionId: Long, version: Long): Boolean

      /**
       * Called to process the data in the executor side. This method will be called only when `open`
       * returns `true`.
       */
      def process(value: T): Unit

      /**
       * Called when stopping to process one partition of new data in the executor side. This is
       * guaranteed to be called either `open` returns `true` or `false`. However,
       * `close` won't be called in the following cases:
       *  - JVM crashes without throwing a `Throwable`
       *  - `open` throws a `Throwable`.
       *
       * @param errorOrNull the error thrown during processing data or null if there was no error.
       */
      def close(errorOrNull: Throwable): Unit
    }
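
    A minimal sketch of a ForeachWriter implementation that simply prints every record of the demo's words Dataset on the executor side; a real implementation would open a connection to the target system in open and release it in close (the class name and logic here are illustrative only):

    import org.apache.spark.sql.ForeachWriter

    // Hypothetical writer: prints each record; replace the bodies with real connection handling
    class PrintlnForeachWriter extends ForeachWriter[String] {
      override def open(partitionId: Long, version: Long): Boolean = {
        // e.g. open a connection for this partition; return false to skip an already-processed batch
        true
      }
      override def process(value: String): Unit = {
        println(s"partition record: $value")
      }
      override def close(errorOrNull: Throwable): Unit = {
        // release resources; errorOrNull is non-null if processing failed
      }
    }

    // Usage: attach the writer to the streaming Dataset[String] from the demo
    val query = words.writeStream
      .foreach(new PrintlnForeachWriter)
      .outputMode("append")
      .start()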

Window

As in other streaming systems, statistics and aggregation over sliding time windows are an indispensable requirement. Version 2.1 implements event-time based window definition; the interface is as follows:

/**
 * Bucketize rows into one or more time windows given a timestamp specifying column. Window
 * starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
 * [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
 * the order of months are not supported. The windows start beginning at 1970-01-01 00:00:00 UTC.
 * The following example takes the average stock price for a one minute window every 10 seconds:
 *
 * {{{
 *   val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
 *   df.groupBy(window($"time", "1 minute", "10 seconds"), $"stockId")
 *     .agg(mean("price"))
 * }}}
 *
 * The windows will look like:
 *
 * {{{
 *   09:00:00-09:01:00
 *   09:00:10-09:01:10
 *   09:00:20-09:01:20 ...
 * }}}
 *
 * For a streaming query, you may use the function `current_timestamp` to generate windows on
 * processing time.
 *
 * @param timeColumn The column or the expression to use as the timestamp for windowing by time.
 *                   The time column must be of TimestampType.
 * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
 *                       `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for
 *                       valid duration identifiers. Note that the duration is a fixed length of
 *                       time, and does not vary over time according to a calendar. For example,
 *                       `1 day` always means 86,400,000 milliseconds, not a calendar day.
 * @param slideDuration A string specifying the sliding interval of the window, e.g. `1 minute`.
 *                      A new window will be generated every `slideDuration`. Must be less than
 *                      or equal to the `windowDuration`. Check
 *                      [[org.apache.spark.unsafe.types.CalendarInterval]] for valid duration
 *                      identifiers. This duration is likewise absolute, and does not vary
 *                      according to a calendar.
 *
 * @group datetime_funcs
 * @since 2.0.0
 */
@Experimental
@InterfaceStability.Evolving
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
  window(timeColumn, windowDuration, slideDuration, "0 second")
}

window is usually used together with the groupBy operator. By setting the DataFrame's timeColumn, windowDuration (window size), and slideDuration (slide step), you define the behavior of the window. For example, windowDuration = 10min and slideDuration = 5min means that, driven by event-time, a computation is triggered every 5 minutes over the 10 minutes of data in the current window.
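
A minimal sketch of a windowed word count built on the socket demo above (with the demo's spark.implicits._ import in scope); it assumes the source also carries an event-time column, here attached via the socket source's includeTimestamp option as in the bundled StructuredNetworkWordCountWindowed example:

import java.sql.Timestamp
import org.apache.spark.sql.functions.window

// Streaming DataFrame with columns (value: String, timestamp: Timestamp)
val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", true)
  .load()

// Split each line into words, keeping the timestamp attached to every word
val words = lines.as[(String, Timestamp)]
  .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
  .toDF("word", "timestamp")

// windowDuration = 10 minutes, slideDuration = 5 minutes: count words per sliding window
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()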

Watermark

Following on from the window discussion above: for any window/event-time based aggregation, we have to decide how to handle late, out-of-order event-time data. The watermark gives the user an interface to declare "how far behind the current processing progress data may fall before it is discarded and no longer processed". See the detailed design doc: https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit#heading=h.yx3tjr1mrnl2
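
A minimal sketch of attaching a watermark to the windowed count sketch above; the 10 minutes lateness threshold is an arbitrary illustrative value:

// Declare that event times in "timestamp" may arrive at most 10 minutes late;
// state for windows older than (max event time seen - 10 minutes) can then be dropped
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
  .count()

val query = windowedCounts.writeStream
  .outputMode("update")    // append mode also becomes possible once a watermark is set
  .format("console")
  .start()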

  • How the watermark is computed
  1. At each trigger, first find the maximum (most recent) event-time in the trigger's data
  2. After the trigger finishes, new watermark = MAX(event-time before trigger, max event-time in trigger [step 1]) - threshold
  • Limitations of the watermark
    Only one watermark operation is allowed in the logical plan, and it can only be applied on a single-child chain reaching the sink; simply put, it cannot handle complex cases such as multi-stream joins or unions where each stream sets its own watermark.