LiveListenerBus
asynchronously passes listener events to registered Spark listeners.
LiveListenerBus
is a single-JVM SparkListenerBus that uses listenerThread to poll events. Emitters are supposed to use post
method to post SparkListenerEvent
events.
Note
|
The event queue is java.util.concurrent.LinkedBlockingQueue with capacity of 10000 SparkListenerEvent events.
|
Note
|
An instance of LiveListenerBus is created in SparkContext.
|
start(sc: SparkContext): Unit
start
starts processing events.
Internally, it saves the input SparkContext
for later use and starts listenerThread. It makes sure that it only happens when LiveListenerBus
has not been started before (i.e. started
is disabled).
If however LiveListenerBus
has already been started, a IllegalStateException
is thrown:
[name] already started!
post(event: SparkListenerEvent): Unit
post
puts the input event
onto the internal eventQueue
queue and releases the internal eventLock
semaphore. If the event placement was not successful (and it could happen since it is tapped at 10000 events) onDropEvent method is called.
The event publishing is only possible when stopped
flag has been enabled.
Caution
|
FIXME Who’s enabling the stopped flag and when/why?
|
If LiveListenerBus
has been stopped, the following ERROR appears in the logs:
ERROR [name] has already stopped! Dropping event [event]
onDropEvent(event: SparkListenerEvent): Unit
onDropEvent
is called when no further events can be added to the internal eventQueue
queue (while posting a SparkListenerEvent event).
It simply prints out the following ERROR message to the logs and ensures that it happens only once.
ERROR Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
Note
|
It uses the internal logDroppedEvent atomic variable to track the state.
|
stop(): Unit
stop
releases the internal eventLock
semaphore and waits until listenerThread dies. It can only happen after all events were posted (and polling eventQueue
gives nothing).
It checks that started
flag is enabled (i.e. true
) and throws a IllegalStateException
otherwise.
Attempted to stop [name] that has not yet started!
stopped
flag is enabled.
LiveListenerBus
uses SparkListenerBus
single daemon thread that ensures that the polling events from the event queue is only after the listener was started and only one event at a time.
Caution
|
FIXME There is some logic around no events in the queue. |
spark.extraListeners
(default: empty) is a comma-separated list of listener class names that should be registered with LiveListenerBus when SparkContext is initialized.
SparkListenerBus
is a ListenerBus that manages SparkListenerInterface listeners that process SparkListenerEvent events.
It comes with a custom doPostEvent
method.
doPostEvent(listener: SparkListenerInterface, event: SparkListenerEvent): Unit
doPostEvent
method simply relays SparkListenerEvent
events to appropriate SparkListenerInterface
methods as follows:
SparkListenerEvent | SparkListenerInterface’s Method |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
event ignored |
other event types |
|
Note
|
There are two custom SparkListenerBus listeners: LiveListenerBus and ReplayListenerBus.
|
ListenerBus[L <: AnyRef, E]
ListenerBus
is an event bus that post events (of type E
) to all registered listeners (of type L
).
It manages listeners
of type L
, i.e. it can add to and remove listeners from an internal listeners
collection.
addListener(listener: L): Unit
removeListener(listener: L): Unit
It can post events of type E
to all registered listeners (using postToAll
method). It simply iterates over the internal listeners
collection and executes the abstract doPostEvent
method.
doPostEvent(listener: L, event: E): Unit
Note
|
doPostEvent is provided by more specialized ListenerBus event buses.
|
In case of exception while posting an event to a listener you should see the following ERROR message in the logs and the exception.
ERROR Listener [listener] threw an exception
Note
|
There are three custom ListenerBus listeners: SparkListenerBus, StreamingQueryListenerBus, and StreamingListenerBus.
|
Tip
|
Enable Add the following line to
Refer to Logging. |