SparkListener机制详解

Spark源码注释中有下面一句话:

Asynchronously passes SparkListenerEvents to registered SparkListeners

即所有spark消息SparkListenerEvents 被异步的发送给已经注册过的SparkListeners.
在SparkContext中, 首先会创建LiveListenerBus实例,这个类主要功能如下:

  • 保存所有消息队列,负责消息的缓存
  • 保存所有注册过的listener,负责消息的分发

listener链表保存在ListenerBus类中,为了保证并发访问的安全性,此处采用Java的CopyOnWriteArrayList类来存储listener. 当需要对listener链表进行更改时,CopyOnWriteArrayList的特性使得会先复制整个链表,然后在复制的链表上面进行修改.当一旦获得链表的迭代器,在迭代器的生命周期中,可以保证数据的一致性.

消息队列实际上是保存在类AsynchronousListenerBus中的:

1
2
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)

事件队列的长度为10000,当缓存事件数量达到上限后,新来的事件会被丢弃,

在SparkContext中,会

  • 创建LiveListenerBus类类型的成员变量listenerBus
  • 创建各种listener,并加入到listenerBus中
  • post一些事件到listenerBus中
  • 调用listenerBus.start() 来启动事件处理程序

这里有一点需要注意的是, 在listenerBus.start() 调用之前, 可以向其中post消息, 这些消息会被缓存起来,等start函数调用之后, 消费者线程会分发这些缓存的消息.