Use an EventBus
Updated: Jul 13, 2023
This example shows how to use Data Pipeline's EventBus class, a delivery service for applications that publish and subscribe to events. More detailed information can be found in the User Guide.
EventBus
The EventBus is an asynchronous, in-memory, event delivery service. When used with EventBusReader and EventBusWriter, the bus allows pipelines to be loosely coupled and used in one-to-many (publish-subscribe) scenarios. It handles event registration, queuing, and delivery and also takes care of failures, monitoring, and graceful shutdown.
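To make "asynchronous, in-memory delivery with graceful shutdown" concrete, here is a minimal sketch built only on the JDK's ExecutorService. It is not Data Pipeline's implementation: the class AsyncBusSketch, its methods, and the use of plain String events are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration only; not part of the Data Pipeline library.
class AsyncBusSketch {

    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    // A single worker thread delivers events in the order they were published.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Queue the event and return immediately; the worker delivers it later.
    void publish(String event) {
        worker.execute(() -> {
            for (Consumer<String> listener : listeners) {
                try {
                    listener.accept(event);
                } catch (RuntimeException e) {
                    // A failing listener must not stop delivery to the others.
                    System.err.println("listener failed: " + e);
                }
            }
        });
    }

    // Stop accepting new events, then wait for queued events to drain.
    // Returns true if everything was delivered before the timeout.
    boolean shutdown(long timeout, TimeUnit unit) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Publishes two events to two listeners and returns what was delivered.
    static List<String> demo() {
        List<String> delivered = new CopyOnWriteArrayList<>();
        AsyncBusSketch bus = new AsyncBusSketch();
        bus.addListener(e -> delivered.add("1: " + e));
        bus.addListener(e -> delivered.add("2: " + e));
        bus.publish("bob@example.com");
        bus.publish("carol@abc.org");
        bus.shutdown(30, TimeUnit.SECONDS);
        return delivered;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The single worker thread is a design choice of this sketch: it keeps deliveries ordered and lets shutdown() simply wait for the queue to empty. How the real EventBus schedules its deliveries is not covered here.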
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.eventbus;

import java.util.Arrays;

import com.northconcepts.datapipeline.eventbus.Event;
import com.northconcepts.datapipeline.eventbus.EventBus;
import com.northconcepts.datapipeline.eventbus.UntypedEventListener;
import com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener;
import com.northconcepts.datapipeline.examples.userguide.eventbus.User;

public class UseAnEventBus {

    public static void main(String[] args) throws Throwable {
        EventBus bus = new EventBus();

        // register type-specific event listener 1
        bus.addListener(IUserListener.class, new IUserListener() {
            @Override
            public void userAdded(User user) {
                System.out.println("1: user added " + user);
            }
        });

        // register type-specific event listener 2
        bus.addListener(IUserListener.class, new IUserListener() {
            @Override
            public void userAdded(User user) {
                System.out.println("2: user added " + user);
            }
        });

        // register untyped event listener
        bus.addListener(new UntypedEventListener() {
            @Override
            public void onEvent(Event<?> event) {
                System.out.println("3: " + event + " -- " + Arrays.asList(event.getArguments()));
            }
        });

        // get event publisher
        IUserListener publisher = bus.getPublisher(bus, IUserListener.class);

        // publish
        publisher.userAdded(new User("bob@example.com"));
        publisher.userAdded(new User("carol@abc.org"));

        bus.shutdown();
    }

}
Code Walkthrough
- An EventBus instance is created.
- Two subscribers are added to the bus using IUserListener. To do that, the addListener() method is invoked with an IUserListener instance that implements userAdded(User user).
- Next, another subscriber (an untyped event listener) is added to the bus. UntypedEventListener is used here and its onEvent() method is overridden.
- A publisher is then created so that notifications can be sent when needed.
- Two users are added and the corresponding listeners are notified. Because each listener writes to the console, the messages are printed in the order the listeners were registered.
- bus.shutdown() terminates the bus gracefully, allowing up to 30 seconds for queued events to be delivered.
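The walkthrough relies on one idea that is easy to miss: the publisher is itself an instance of the listener interface, and calling a method on it notifies every registered listener. One way such a publisher could be built is with a JDK dynamic proxy, sketched below. This is an assumption about the general technique, not a description of Data Pipeline's internals. ProxyPublisherSketch, UserListener, and addUntypedListener are hypothetical names, a String stands in for User, and delivery is synchronous for brevity, whereas the real bus is asynchronous.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical illustration only; these are not Data Pipeline classes.
class ProxyPublisherSketch {

    // Stand-in for the example's IUserListener, using a String instead of User.
    interface UserListener {
        void userAdded(String email);
    }

    private final List<UserListener> typed = new ArrayList<>();
    private final List<BiConsumer<String, Object[]>> untyped = new ArrayList<>();

    void addListener(UserListener listener) {
        typed.add(listener);
    }

    void addUntypedListener(BiConsumer<String, Object[]> listener) {
        untyped.add(listener);
    }

    // The publisher is a proxy of the listener interface: each call made on it
    // is replayed on every typed listener, then reported to untyped listeners.
    UserListener getPublisher() {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString(), hashCode() and equals() are not events.
                return method.invoke(this, args);
            }
            for (UserListener listener : typed) {
                method.invoke(listener, args);
            }
            String name = method.getDeclaringClass().getSimpleName() + "." + method.getName();
            for (BiConsumer<String, Object[]> listener : untyped) {
                listener.accept(name, args);
            }
            return null;
        };
        return (UserListener) Proxy.newProxyInstance(
                UserListener.class.getClassLoader(),
                new Class<?>[] { UserListener.class },
                handler);
    }

    // Registers two typed listeners and one untyped listener, publishes one event.
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        ProxyPublisherSketch bus = new ProxyPublisherSketch();
        bus.addListener(email -> out.add("1: user added " + email));
        bus.addListener(email -> out.add("2: user added " + email));
        bus.addUntypedListener((name, eventArgs) ->
                out.add("3: " + name + " -- " + Arrays.asList(eventArgs)));

        UserListener publisher = bus.getPublisher();
        publisher.userAdded("bob@example.com");
        return out;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Publishing through the interface keeps call sites type-safe: the compiler checks the event name and arguments, while untyped listeners still see every event as a method name plus an argument array.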
Console Output
1: user added User [email=bob@example.com]
2: user added User [email=bob@example.com]
3: com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener.userAdded -- [User [email=bob@example.com]]
1: user added User [email=carol@abc.org]
2: user added User [email=carol@abc.org]
3: com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener.userAdded -- [User [email=carol@abc.org]]
3: com.northconcepts.datapipeline.eventbus.ShutdownListener.onShuttingDown -- [com.northconcepts.datapipeline.eventbus.EventBus@484b54c5]