In part 1 of the event bus series we discussed implementing a simple and powerful event bus using just three classes. If you haven’t read it yet, I strongly recommend you read it first.
Use dynamic proxies to create a simple, powerful event bus (Part 1)
In this blog we’ll build on part 1 by adding several important features to the event bus to make it production ready. Since these features are fairly well contained, you can start with the section that most interests you, download the code, or view the project on GitHub.
- Event Filtering
- Event Topics
- Untyped Listeners
- Exception Handling
- Garbage Collection
- Thread-local Events
Event Filtering
Event filters help reduce noise by allowing listeners to automatically ignore events they would normally receive. This is accomplished by including a new EventFilter
object alongside every listener added to the bus.
1 2 3 4 5 6 7 |
public class EventBus { ... public <T extends EventListener> void addListener(Class<T> eventListenerClass, EventFilter filter, T listener) { ... } } |
The EventFilter
class has a single abstract allow(event, listener)
method that returns true if the event should be delivered to the listener.
Several filters are already are part of the bus, including EventSourceFilter
. This filter only allow events that were produced by one of a set of sources.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public class EventSourceFilter implements EventFilter { private final Object[] source; public EventSourceFilter(Object ... source) { this.source = source; } @Override public boolean allow(Event<?> event, Object listener) { if (source == null || source.length == 0) { return true; } for (Object s : source) { if (event.getEventSource() == s) { return true; } } return false; } } |
In addition to the new EventFilter interface and the addListener update, a few other changes are needed to handle filters:
- A new
EventListenerStub
class to hold a listener and filter together. - The
eventListeners
field on EventBus is converted from aSet<? extends EventListener>
to aList<EventListenerStub<? extends EventListener>>
. We swapSet
forList
because we can no longer depend on the listener’s object identity to prevent duplicate listeners in the set (now that it’s wrapped in anEventListenerStub
). - The bus’
deliverEvent()
method callsEventFilter.allow()
to check if the event should be delivered.
Here’s a quick example of EventSourceFilter
in action.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
public class EventSourceFilterExample { public static void main(String[] args) throws Throwable { EventSourceFilterExample eventSource1 = new EventSourceFilterExample(); EventSourceFilterExample eventSource2 = new EventSourceFilterExample(); Location store = new Location(); EventBus eventBus = new EventBus(); // listen for events from eventSource1 eventBus.addListener(WalkListener.class, new EventSourceFilter(eventSource1), new WalkListener() { public void walkTo(Location location) { System.out.println("walking from eventSource1..."); } }); // listen for events from eventSource1 or eventSource2 eventBus.addListener(WalkListener.class, new EventSourceFilter(eventSource1, eventSource2), new WalkListener() { public void walkTo(Location location) { System.out.println("walking from eventSource1 or eventSource2..."); } }); // eventSource2 is set as the source for this publisher WalkListener publisher = eventBus.getPublisher(eventSource2, WalkListener.class); publisher.walkTo(store); publisher.walkTo(store); publisher.walkTo(store); Thread.sleep(2000L); eventBus.shutdown(); System.out.println("The End."); } } |
Event Topics
It’s not always ideal for us to filter by the event’s source. Sometimes it’s more convenient to filter by a value known to both publishers and listeners. That’s where topics come in. Topic values can bet passed as an optional parameter while obtaining a publisher from the bus. Every method call made on that publisher will include the topic value. Listeners can watch for specific topics by including a TopicFilter
when registering with the bus. Topics can be any java.lang.Object
or subclass we like. I recommend enums because they are IDE and refactoring friendly, but a string or any object will do.
Publishers
A few small changes on the publisher side will handle the registration and propagation of topics for us. First, both the Event
and EventSourceInvocationHandler
classes will track topics. EventSourceInvocationHandler
will have it set during construction and pass it on to every event it creates.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public class EventSourceInvocationHandler<T extends EventListener> implements InvocationHandler { private final EventBus eventBus; private final Object eventSource; private final Class<T> eventListenerClass; private final Object topic; public EventSourceInvocationHandler(EventBus eventBus, Object eventSource, Class<T> eventListenerClass, Object topic) { ... } @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { eventBus.publishEvent(new Event<T>(eventSource, proxy, eventListenerClass, topic, method, arguments)); return null; } } |
Second, the bus’ getPublisher()
method will be overloaded to take in a topic value and set it on the EventSourceInvocationHandler
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class EventBus { ... public T getPublisher(Object eventSource, Class eventListenerClass) { return getPublisher(eventSource, eventListenerClass, null); } public T getPublisher(Object eventSource, Class eventListenerClass, Object topic) { EventSourceInvocationHandler handler = new EventSourceInvocationHandler(this, eventSource, eventListenerClass, topic); return (T) Proxy.newProxyInstance( eventListenerClass.getClassLoader(), new Class[]{eventListenerClass}, handler); } } |
Listeners
The TopicFilter
class allows listeners to receive events matching one or more topic values. The implementation uses exact matching, but if that’s too simplistic for your case, it should be easy enough to implement your own hierarchical or pattern matching filter.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public class TopicFilter implements EventFilter { private final Object[] topic; public TopicFilter(Object... topic) { this.topic = topic; } @Override public boolean allow(Event<?> event, Object listener) { if (topic == null || topic.length == 0) { return true; } final Object t = event.getTopic(); for (Object t2 : topic) { if (t == t2 || (t != null && t.equals(t2))) { return true; } } return false; } } |
Here’s an example that uses enum topics to publish and filter events.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
public class TopicFilterExample { public static enum Topic { STROLL, RUSH } public static void main(String[] args) throws Throwable { TopicFilterExample eventSource = new TopicFilterExample(); Location store = new Location(); EventBus eventBus = new EventBus(); // listen for events under the STROLL topic eventBus.addListener(WalkListener.class, new TopicFilter(Topic.STROLL), new WalkListener() { public void walkTo(Location location) { System.out.println("walking slowly...out for a stroll"); } }); // listen for events under the RUSH topic eventBus.addListener(WalkListener.class, new TopicFilter(Topic.RUSH), new WalkListener() { public void walkTo(Location location) { System.out.println("walking quickly...in a rush"); } }); // RUSH is set as the topic for this publisher WalkListener publisher = eventBus. getPublisher(eventSource, WalkListener.class, Topic.RUSH); publisher.walkTo(store); publisher.walkTo(store); publisher.walkTo(store); Thread.sleep(2000L); eventBus.shutdown(); System.out.println("The End."); } } |
Untyped Listeners
Some use-cases require that we listen to all events passing through the bus regardless of their type. Audit, monitoring, or even debug requirements might call for such a feature. In these special cases we can register an UntypedEventListener
with the bus. These listeners are notified in the form of the Event
object after all typed listeners have received their notifications.
EventListenerStub Refactoring
Now that we’re handling both typed and untyped listeners, EventListenerStub
will undergo refactoring to reduce code duplication. First, EventListenerStub
is divided into three classes. The original EventListenerStub
and two inner subclasses: Typed
and Untyped
. The new subclasses are responsible for providing the delivery logic for their kind of listeners while allowing EventListenerStub
to handle filtering and other common behaviour.
Just as before, the Typed
subclass invokes the original called method on each listener.
1 2 3 4 5 6 7 |
class Typed<S extends EventListener> extends EventListenerStub<S> { ... protected void deliverEventImpl(EventBus eventBus, Event<?> event) throws Throwable { S listener = getListener(); event.getMethod().invoke(listener, event.getArguments()); } } |
While the Untyped
subclass calls the onEvent method on untyped listeners.
1 2 3 4 5 6 7 |
class Untyped extends EventListenerStub<UntypedEventListener> { ... protected void deliverEventImpl(EventBus eventBus, Event<?> event) throws Throwable { UntypedEventListener listener = getListener(); listener.onEvent(event); } } |
EventBus Refactoring
The event bus gets a few upgrades.
- The old
eventListeners
field is renamed totypedListeners
to compliment the newuntypedListeners
field. - Both versions of addListener (typed and untyped) take care to add the correct subclass of EventListenerStub to their lists.
- The bus’
deliverEvent
method now delegates to thedeliverEvent
method on each stub to ensure the correct delivery behaviour.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
public class EventBus { private final Map<Class<? extends EventListener>, TypedListenerList<?>> typedListeners = new HashMap<Class<? extends EventListener>, TypedListenerList<?>>(); private final List<EventListenerStub<UntypedEventListener>> untypedListeners = new CopyOnWriteArrayList<EventListenerStub<UntypedEventListener>>(); public <T extends EventListener> void addListener(Class<T> eventListenerClass, EventFilter filter, T listener) { TypedListenerList<T> list; ... list.getList().add(new EventListenerStub.Typed<T>(filter, listener)); } public void addListener(EventFilter filter, UntypedEventListener listener) { untypedListeners.add(new EventListenerStub.Untyped(filter, listener)); } protected <T extends EventListener> void deliverEvent(Event<T> event) throws Throwable { // Deliver to typed listeners TypedListenerList<T> list = getTypedListenerList(event.getEventListenerClass()); if (list != null) { for (EventListenerStub<?> stub : list.getList()) { stub.deliverEvent(this, event); } } // Deliver to untyped listeners for (EventListenerStub<UntypedEventListener> stub : untypedListeners) { stub.deliverEvent(this, event); } } } |
Here’s an example of an untyped listener watching all events on the bus.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public class UntypedEventListenerExample { public static void main(String[] args) throws Throwable { UntypedEventListenerExample eventSource = new UntypedEventListenerExample(); Location store = new Location(); EventBus eventBus = new EventBus(); eventBus.addListener(null, new UntypedEventListener() { public void onEvent(Event<?> event) { System.out.println("event: " + event.getEventListenerClass().getSimpleName() + "." + event.getMethod().getName() ); } }); WalkListener publisher = eventBus.getPublisher(eventSource, WalkListener.class); publisher.walkTo(store); publisher.walkTo(store); publisher.walkTo(store); Thread.sleep(2000L); eventBus.shutdown(); System.out.println("The End."); } } |
Exception Handling
So far we haven’t said anything about exceptions. If you’ve looked at the source code from part 1, you’ll see that exceptions were caught, wrapped in RuntimeExceptions, and then rethrown. This saved us from the extra coding to declare checked exceptions, but did nothing to actually handle them. What we need now is an exception handling strategy that:
- Handles all exceptions regardless of which thread throws them
- Handles exceptions for both typed and untyped listeners
- Doesn’t require knowledge of the bus’ internal workings
- Is decoupled from listeners
- Is simple to use
Exception Events
We can accomplish all of these requirements by simply leveraging the bus itself. By letting the event bus publish exceptions like any other event, we gain all of its advantages (decoupling, threading, filters, etc.) for the price of a small change.
The new ExceptionListener
interface is used for both publishing and observing exceptions. The bus holds a publisher reference which it exposes through its handleException
method.
Since the bus delegates the actual event delivery to EventListenerStub
, that class is also responsible for reporting exceptions to the bus by calling handleException
. The handleException
method takes care not to introduce infinite loops by not publishing exceptions for events where it was the publisher.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public class EventBus { ... private final ExceptionListener exceptionPublisher; public EventBus() { exceptionPublisher = getPublisher(this, ExceptionListener.class); } public void addExceptionListener(EventFilter filter, ExceptionListener listener) { addListener(ExceptionListener.class, filter, listener); } protected void handleException(Event<?> event, Object listener, Throwable e) { if (e instanceof InvocationTargetException) { e = e.getCause(); } if (event.getEventPublisher() != exceptionPublisher) { exceptionPublisher.onException(event, e, listener); } } ... } |
This example prints each exception’s message along with the event class’ name and method.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public class ExceptionListenerExample { public static void main(String[] args) throws Throwable { ExceptionListenerExample eventSource = new ExceptionListenerExample(); Location store = new Location(); EventBus eventBus = new EventBus(); eventBus.addListener(WalkListener.class, null, new WalkListener() { public void walkTo(Location location) { throw new RuntimeException("always fails"); } }); eventBus.addExceptionListener(null, new ExceptionListener() { @Override public void onException(Event<?> event, Throwable exception, Object targetListener) { System.out.println("exception in " + event.getEventListenerClass().getSimpleName() + "." + event.getMethod().getName() + ": " + exception.getMessage()); } }); WalkListener publisher = eventBus.getPublisher(eventSource, WalkListener.class); publisher.walkTo(store); publisher.walkTo(store); publisher.walkTo(store); Thread.sleep(2000L); eventBus.shutdown(); System.out.println("The End."); } } |
Garbage Collection
Memory leaks are a real concern when using the observer pattern. Observers that are never unregistered may never get garbage collected and your application’s memory could continue to grow until it eventually fails with an OutOfMemoryError
. Memory leaks are arguably even more of a concern when working with event buses because they are responsible for holding on to all observers for all subjects. If a bus lives for the entire run of your app while holding on to every single all observer, we could get that OutOfMemoryError
even sooner.
Soft References
The java.lang.ref
package is Java’s answer to allowing objects to be garbage collected, even while there are still live references to them. The key is to wrap those objects with one of the classes in this package. In the event bus, we now wrap each listener in a SoftReference
. This allows listeners to be automatically garbage collected when memory gets low, but now introduces a level of indirection to our code. We chose to use SoftReference
over a WeakReference
because they are garbage collected less aggressively by the JVM. Since most listeners tend to be anonymous inner classes, only referenced by the bus, I’d hate to see them disappear while the system is still flush with memory. (For more on Java references, read Ethan Nicholas’ classic blog Understanding Weak References. )
EventListenerStub
The code that manages listeners through soft references can e found in EventListenerStub
and its two subclasses — Typed
and Untyped
. EventListenerStub
wraps the listener and ensures the getter method is used for all access. The Typed
and Untyped
subclasses now perform null checks (in case listeners were garbage collected) before actually delivering events.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
abstract class EventListenerStub<T> { ... private final Reference<T> listener; public EventListenerStub(EventFilter filter, T listener, ReferenceQueue<? super T> referenceQueue) { ... this.listener = new SoftReference<T>(listener, referenceQueue); } protected T getListener() { return listener.get(); } ... //==================================================================== // Typed Implementation //==================================================================== final static class Typed<S extends EventListener> extends EventListenerStub<S> { ... protected void deliverEventImpl(EventBus eventBus, Event<?> event) throws Throwable { S listener = getListener(); if (listener != null) { // in case it was garbage collected event.getMethod().invoke(listener, event.getArguments()); } } } //==================================================================== // Untyped Implementation //==================================================================== final static class Untyped extends EventListenerStub<UntypedEventListener> { ... protected void deliverEventImpl(EventBus eventBus, Event<?> event) throws Throwable { UntypedEventListener listener = getListener(); if (listener != null) { // in case it was garbage collected listener.onEvent(event); } } } } |
Reference Clean-up
You might have noticed the ReferenceQueue
parameter in EventListenerStub
‘s constructor. Reference queues allow the garbage collector to inform us when the objects in the soft references are actually released. The exact details differ for each kind of reference, but generally the garbage collector will put released references into the queue for us. We simply have to poll the queue periodically to be notified. Using reference queues allow us to be very efficient at detecting garbage collection, it saves us from having to test every listener reference to do a proper clean-up.
The event bus now holds two reference queues for garbage. One queue for typed listeners and the other for untyped. The queues are passed into the EventListenerStub
subclasses along with each listener during registration. Finally, each event delivery now finishes with a call to the bus’ cleanupGarbage()
method which polls the queues and removes anything found there. Although cleanupGarbage()
returns very quickly when queues are empty, you could probably come up with an even better approach than checking every time.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
public class EventBus { private final ReferenceQueue<? super EventListener> garbageTypedListeners = new ReferenceQueue<EventListener>(); private final ReferenceQueue<UntypedEventListener> garbageUntypedListeners = new ReferenceQueue<UntypedEventListener>(); ... public <T extends EventListener> void addListener(Class<T> eventListenerClass, EventFilter filter, T listener) { ... list.getList().add(new EventListenerStub.Typed<T>(filter, listener, garbageTypedListeners)); } protected void removeTypedListenerReference(Reference<?> reference) { ... } public void addListener(EventFilter filter, UntypedEventListener listener) { untypedListeners.add(new EventListenerStub.Untyped(filter, listener, garbageUntypedListeners)); } protected void removeUntypedListenerReference(Reference<?> reference) { ... } protected <T extends EventListener> void deliverEvent(Event<T> event) throws Throwable { ... cleanupGarbage(); } protected void cleanupGarbage() { Reference<?> reference; while ((reference = garbageTypedListeners.poll()) != null) { removeTypedListenerReference(reference); } while ((reference = garbageUntypedListeners.poll()) != null) { removeUntypedListenerReference(reference); } } ... } |
Here’s an example that shows soft references surviving a forced garbage collection (System.gc()
), but not a true memory exhaustion (allocating 100 GB or RAM). This example was run with the following configuration:
- Sun/Oracle’s JDK 1.6.0.10 running in Eclipse 3.5
- Windows XP Professional, Service Pack 3
- 3 GB or RAM
- Swap file disabled
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
public class GarbageCollectionExample { public static void main(String[] args) throws Throwable { GarbageCollectionExample eventSource = new GarbageCollectionExample(); Location store = new Location(); EventBus eventBus = new EventBus(); eventBus.addListener(WalkListener.class, null, new WalkListener() { public void walkTo(Location location) { System.out.println("walking..."); } }); WalkListener publisher = eventBus.getPublisher(eventSource, WalkListener.class); publisher.walkTo(store); // Force GC. Should remove weak references. // Soft references should not be affected System.gc(); publisher.walkTo(store); // Allocate memory until out-of-memory // garbage collector remove soft references List<byte[]> bytes = new ArrayList<byte[]>(10000); try { for (int i = 0; i < 10000; i++) { bytes.add(new byte[10 * 1024 * 1024]); // allocate 10MB blocks } } catch (Throwable e) { e.printStackTrace(); } publisher.walkTo(store); Thread.sleep(2000L); eventBus.shutdown(); System.out.println("The End."); } } |
Thread-local Events
In the Untyped Listeners section we saw how to implement a bus wide listener. This gave us access to the raw Event
object containing things like the event source, listener interface, and invoked method. Unfortunately, this forced us into choice. Either we listen for typed events and used whatever parameters the listener interface defines or go untyped and have the full Event
object. Going untyped means extra filtering coding to select the exact event we’re interested in. Fortunately, we don’t have to make that choice. We can continue to work with typed listeners and access the full event using a thread-local when needed.
Thread-locals
Thread-locals allow us attach values to the current thread then access those values from any method running on that thread. This saves us having to modify methods to explicitly pass values around. For the event bus, this means linking the Event
object to the delivery thread which we can then access anywhere, including inside a typed listener.
The event bus contains all of the changes:
- A new thread-local field for the event (
currentEvent
) - A getter method for use anywhere (as long as it’s in the delivery thread)
- Setting and removing the current event during delivery
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
public class EventBus { private final ThreadLocal<Event<?>> currentEvent = new ThreadLocal<Event<?>>(); ... public Event<?> getCurrentEvent() { return currentEvent.get(); } protected <T extends EventListener> void deliverEvent(Event<T> event) throws Throwable { try { currentEvent.set(event); // Deliver to typed listeners TypedListenerList<T> list = getTypedListenerList(event.getEventListenerClass()); if (list != null) { for (EventListenerStub<?> stub : list.getList()) { stub.deliverEvent(this, event); } } // Deliver to untyped listeners for (EventListenerStub<UntypedEventListener> stub : untypedListeners) { stub.deliverEvent(this, event); } cleanupGarbage(); } finally { currentEvent.remove(); } } } |
This example uses the thread-local’s event to display the name of the invoked method.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public class ThreadLocalExample { public static void main(String[] args) throws Throwable { ThreadLocalExample eventSource = new ThreadLocalExample(); Location store = new Location(); final EventBus eventBus = new EventBus(); eventBus.addListener(WalkListener.class, null, new WalkListener() { public void walkTo(Location location) { System.out.println("called " + eventBus.getCurrentEvent().getMethod().getName()); } }); WalkListener publisher = eventBus.getPublisher(eventSource, WalkListener.class); publisher.walkTo(store); Thread.sleep(2000L); eventBus.shutdown(); System.out.println("The End."); } } |
Next Time
That’s a wrap. We’ve covered a lot of ground in this blog. Hopefully, I left you with a few ideas you can use in your own event bus or applications. Do you have a question or suggestion to improve some of the ideas we’ve discussed? Leave a comment, I’d like to hear about it.
In part 3 of the Event Bus Series, we’ll discuss implementing the event bus using GWT (Google Web Toolkit) Generators. Stay tuned.
Download
The event bus download contains the entire source code (including Eclipse project). Alternatively, you can view the project on GitHub. The source code is licensed under the terms of the Apache License, Version 2.0.
If you enjoyed this, I’ve got an exception tracking tool for Java coming out. Sign-up at StackHunter.com to be notified when it does.
You should consider putting the code up on github so others can browse/contribute to it (I’d be willing to help you with this if you’d like). Also since this is generally useful library code it’d also be useful if you were to publish it to a centralized maven repository so that others can use it more easily.
Brandon – I really like your ideas to host the event bus as an open source project and distribute it using Maven.
GitHub and Git are new to me, but I’ve heard good things about both so I’m looking forward to learning them. I’m going to start on this today, but it may take me a couple days set things up.
Of course your help would be greatly appreciated and I’ll email you to discuss.
Thanks for your suggestions and offer.
Dele
Sounds great, email away. I played a little bit with maven earlier today, seemed really easy to get your code building in it.
I agree with Brandon. This code is very nice and useful. People will benefit much more from it if you publish it to a centralized maven repo.
Thanks for your suggestion Otavio, I’ll post back here when the repo is ready.
Otavio – we’ll post to a central repo once unit tests are added. In the mean time the code’s available on GitHub (thanks to Brandon) :
https://github.com/NorthConcepts/event-bus
We’ve also created a development repo at https://104.131.41.115/maven3. You can pull it by adding the following to your pom.xml:
<dependencies>
<dependency>
<groupId>com.northconcepts.eventbus</groupId>
<artifactId>NorthConcepts-EventBus</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>northconcepts</id>
<url>https://104.131.41.115/maven3</url>
</repository>
</repositories>
Cheers,
Dele
Nice work! This looks like pretty neat.
open source jdon framework is a Event framework based on Disruptor, it use annotation for event bus. http://www.jdon.org
Pingback: How to build a simple GWT event bus using Generators | North Concepts