Use an EventBus
Updated: Jul 13, 2023
This example shows how to use Data Pipeline's EventBus class, a delivery service for applications that publish and subscribe to events. More detailed information can be found in the User Guide.
EventBus
The EventBus is an asynchronous, in-memory, event delivery service. When used with EventBusReader and EventBusWriter, the bus allows pipelines to be loosely coupled and used in one-to-many (publish-subscribe) scenarios. It handles event registration, queuing, and delivery and also takes care of failures, monitoring, and graceful shutdown.
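To make the terms above concrete, here is a minimal sketch of asynchronous, in-memory delivery built only on the JDK. It is not Data Pipeline's EventBus: the class name MiniBus and its methods are hypothetical, and a single-threaded executor stands in for whatever queuing the real bus uses. It only illustrates registration, queuing, delivery, failure isolation, and a timed shutdown.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration only; not Data Pipeline's EventBus.
class MiniBus<E> {
    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();
    // One worker thread backed by an unbounded in-memory queue.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Registration: remember who wants to hear about events.
    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    // Queuing: publish() returns immediately; delivery runs on the worker thread.
    void publish(E event) {
        worker.execute(() -> {
            for (Consumer<E> listener : listeners) {
                try {
                    listener.accept(event);
                } catch (RuntimeException e) {
                    // Failure isolation: one failing listener does not block the rest.
                    System.err.println("listener failed: " + e);
                }
            }
        });
    }

    // Graceful shutdown: stop accepting events, then wait for queued ones to drain.
    boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        MiniBus<String> bus = new MiniBus<>();
        bus.addListener(e -> System.out.println("1: " + e));
        bus.addListener(e -> System.out.println("2: " + e));
        bus.publish("bob@example.com");
        bus.publish("carol@abc.org");
        bus.shutdown(30, TimeUnit.SECONDS);
    }
}
```

Because a single worker thread drains the queue, events in this sketch are delivered in publication order, and each event reaches the listeners in registration order.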
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.eventbus;
import java.util.Arrays;
import com.northconcepts.datapipeline.eventbus.Event;
import com.northconcepts.datapipeline.eventbus.EventBus;
import com.northconcepts.datapipeline.eventbus.UntypedEventListener;
import com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener;
import com.northconcepts.datapipeline.examples.userguide.eventbus.User;
public class UseAnEventBus {
    public static void main(String[] args) throws Throwable {
        EventBus bus = new EventBus();
        // register type-specific event listener 1
        bus.addListener(IUserListener.class, new IUserListener(){
            @Override
            public void userAdded(User user) {
                System.out.println("1: user added " + user);
            }
        });
        // register type-specific event listener 2
        bus.addListener(IUserListener.class, new IUserListener(){
            @Override
            public void userAdded(User user) {
                System.out.println("2: user added " + user);
            }
        });
        // register untyped event listener
        bus.addListener(new UntypedEventListener() {
            
            @Override
            public void onEvent(Event<?> event) {
                System.out.println("3: " + event + " -- " + Arrays.asList(event.getArguments()));
            }
        });
        // get event publisher
        IUserListener publisher = bus.getPublisher(bus, IUserListener.class);
        // publish
        publisher.userAdded(new User("bob@example.com"));
        publisher.userAdded(new User("carol@abc.org"));
        bus.shutdown();
    }
}
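The listing imports User and IUserListener from the user guide examples without showing them. The sketch below is a guess at their minimal shape, inferred only from how the listing calls them and from the "User [email=...]" text in the console output; the real classes may differ. Both types are package-private here so they fit in one file, whereas the listing needs them to be public, and the getEmail() accessor and the UserSketchDemo class are assumptions added for illustration.

```java
// Hypothetical stand-ins for the imported example types; the real ones may differ.
class UserSketchDemo {
    public static void main(String[] args) {
        // A single-method interface can also be implemented with a lambda.
        IUserListener listener = user -> System.out.println("user added " + user);
        listener.userAdded(new User("bob@example.com"));
        // prints: user added User [email=bob@example.com]
    }
}

// Listener interface: each method corresponds to one kind of event.
interface IUserListener {
    void userAdded(User user);
}

// Simple value object carried as the event argument.
class User {
    private final String email;

    User(String email) {
        this.email = email;
    }

    String getEmail() {
        return email;
    }

    @Override
    public String toString() {
        // Mirrors the format seen in the console output.
        return "User [email=" + email + "]";
    }
}
```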
Code Walkthrough
- An EventBus instance is created.
- Two subscribers are added to the bus using IUserListener. To do that, the addListener() method is invoked with an anonymous IUserListener instance that implements userAdded(User user).
- Next, another subscriber, an untyped event listener, is added to the bus. It is an UntypedEventListener whose onEvent() method is overridden.
- A publisher is then obtained from the bus with getPublisher(); calling its methods sends the corresponding notification to the subscribers.
- Two users are added and the corresponding listeners are notified. Because each listener writes to the console, the messages are printed there in the defined order.
- bus.shutdown() terminates the bus gracefully, allowing up to 30 seconds for queued events to be delivered.
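The publisher step in the walkthrough can seem surprising: the publisher implements the listener interface, yet one call on it reaches every subscriber. One common way to build such an object is a JDK dynamic proxy. The sketch below assumes that technique purely for illustration; it does not claim to show how Data Pipeline implements getPublisher(). The names FanOut and UserAddedListener are hypothetical, and delivery here is synchronous rather than queued.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of a typed publisher; not Data Pipeline's implementation.
class FanOut<T> {
    private final Class<T> type;
    private final List<T> listeners = new CopyOnWriteArrayList<>();

    FanOut(Class<T> type) {
        this.type = type;
    }

    void addListener(T listener) {
        listeners.add(listener);
    }

    // Returns an object that implements T and forwards each call to every listener.
    T getPublisher() {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString, hashCode, and equals are answered by this FanOut, not broadcast.
                return method.invoke(this, args);
            }
            for (T listener : listeners) {
                method.invoke(listener, args);
            }
            return null; // listener methods are assumed to return void
        };
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] { type }, handler);
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        FanOut<UserAddedListener> bus = new FanOut<>(UserAddedListener.class);
        bus.addListener(email -> System.out.println("1: user added " + email));
        bus.addListener(email -> System.out.println("2: user added " + email));
        UserAddedListener publisher = bus.getPublisher();
        publisher.userAdded("bob@example.com");
    }
}

// Hypothetical listener interface used only by this sketch.
interface UserAddedListener {
    void userAdded(String email);
}
```

The appeal of this design is that publishing code calls an ordinary, type-checked interface method and never needs to know how many subscribers exist.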
Console Output
1: user added User [email=bob@example.com]
2: user added User [email=bob@example.com]
3: com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener.userAdded -- [User [email=bob@example.com]]
1: user added User [email=carol@abc.org]
2: user added User [email=carol@abc.org]
3: com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener.userAdded -- [User [email=carol@abc.org]]
3: com.northconcepts.datapipeline.eventbus.ShutdownListener.onShuttingDown -- [com.northconcepts.datapipeline.eventbus.EventBus@484b54c5]

