Spark源码注释中有下面一句话:
Asynchronously passes SparkListenerEvents to registered SparkListeners
即所有spark消息SparkListenerEvents 被异步的发送给已经注册过的SparkListeners.
在SparkContext中, 首先会创建LiveListenerBus实例,这个类主要功能如下:
- 保存所有消息队列,负责消息的缓存
- 保存所有注册过的listener,负责消息的分发
listener链表保存在ListenerBus类中,为了保证并发访问的安全性,此处采用Java的CopyOnWriteArrayList类来存储listener. 当需要对listener链表进行更改时,CopyOnWriteArrayList的特性使得会先复制整个链表,然后在复制的链表上面进行修改.当一旦获得链表的迭代器,在迭代器的生命周期中,可以保证数据的一致性.
消息队列实际上是保存在类AsynchronousListenerBus中的:
1 | private val EVENT_QUEUE_CAPACITY = 10000 |
事件队列的长度为10000,当缓存事件数量达到上限后,新来的事件会被丢弃,
在SparkContext中,会
- 创建LiveListenerBus类类型的成员变量listenerBus
- 创建各种listener,并加入到listenerBus中
- post一些事件到listenerBus中
- 调用listenerBus.start() 来启动事件处理程序
这里有一点需要注意的是, 在listenerBus.start() 调用之前, 可以向其中post消息, 这些消息会被缓存起来,等start函数调用之后, 消费者线程会分发这些缓存的消息.