Use an EventBus

This example shows how to use Data Pipeline's EventBus class, a delivery service for applications that publish and subscribe to events. More detailed information can be found in the User Guide.

 

EventBus

The EventBus is an asynchronous, in-memory event delivery service. When used with EventBusReader and EventBusWriter, the bus allows pipelines to be loosely coupled and used in one-to-many (publish-subscribe) scenarios. It handles event registration, queuing, and delivery, and also takes care of failure handling, monitoring, and graceful shutdown.
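To make the idea of asynchronous, queued delivery with a graceful shutdown more concrete, here is a minimal sketch in plain Java. It is not Data Pipeline's implementation: the class MiniAsyncBus and its methods are hypothetical names invented for illustration, and the sketch assumes a single worker thread is enough to queue events and deliver them in publish order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of an asynchronous in-memory bus; NOT the Data Pipeline EventBus.
final class MiniAsyncBus<E> {

    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();

    // A single worker thread acts as the queue and keeps delivery in publish order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    // Queues the event and returns immediately; delivery happens on the worker thread.
    void publish(E event) {
        worker.execute(() -> {
            for (Consumer<E> listener : listeners) {
                try {
                    listener.accept(event);
                } catch (RuntimeException e) {
                    // A failing listener is reported but does not stop delivery to the others.
                    System.err.println("listener failed: " + e);
                }
            }
        });
    }

    // Stops accepting new events, then waits for queued ones to be delivered.
    // Returns true if everything was delivered before the timeout elapsed.
    boolean shutdown(long timeout, TimeUnit unit) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        MiniAsyncBus<String> bus = new MiniAsyncBus<>();
        bus.addListener(e -> System.out.println("1: user added " + e));
        bus.addListener(e -> System.out.println("2: user added " + e));
        bus.publish("bob@example.com");
        bus.publish("carol@abc.org");
        bus.shutdown(30, TimeUnit.SECONDS);
    }
}
```

The design choice worth noting is that publish() never runs listener code on the caller's thread, which is what keeps publishers and subscribers loosely coupled in this sketch.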

 

Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.eventbus;

import java.util.Arrays;

import com.northconcepts.datapipeline.eventbus.Event;
import com.northconcepts.datapipeline.eventbus.EventBus;
import com.northconcepts.datapipeline.eventbus.UntypedEventListener;
import com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener;
import com.northconcepts.datapipeline.examples.userguide.eventbus.User;

public class UseAnEventBus {

    public static void main(String[] args) throws Throwable {
        EventBus bus = new EventBus();

        // register type-specific event listener 1
        bus.addListener(IUserListener.class, new IUserListener(){
            @Override
            public void userAdded(User user) {
                System.out.println("1: user added " + user);
            }
        });

        // register type-specific event listener 2
        bus.addListener(IUserListener.class, new IUserListener(){
            @Override
            public void userAdded(User user) {
                System.out.println("2: user added " + user);
            }
        });

        // register untyped event listener
        bus.addListener(new UntypedEventListener() {
            
            @Override
            public void onEvent(Event event) {
                System.out.println("3: " + event + " -- " + Arrays.asList(event.getArguments()));
            }
        });

        // get event publisher
        IUserListener publisher = bus.getPublisher(bus, IUserListener.class);

        // publish
        publisher.userAdded(new User("bob@example.com"));
        publisher.userAdded(new User("carol@abc.org"));

        bus.shutdown();

    }
}

 

Code Walkthrough

  1. An EventBus instance is created.
  2. Two typed subscribers are registered by calling addListener() with IUserListener.class and an anonymous IUserListener implementation that overrides userAdded(User user).
  3. A third subscriber, an untyped event listener, is added to the bus. It implements UntypedEventListener and overrides onEvent().
  4. A publisher is obtained from the bus with getPublisher(). Calling a method on the publisher sends the corresponding notification to the subscribers.
  5. Two users are added through the publisher and the corresponding listeners are notified. Because each listener writes to the console, the messages are printed in the order shown in the console output.
  6. bus.shutdown() terminates the bus gracefully, allowing up to 30 seconds for queued events to be delivered. The untyped listener also receives the resulting onShuttingDown event.
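The publisher in step 4 is an object that implements the listener interface, and calling it fans the call out to every subscriber. One way such a publisher could be built is with a java.lang.reflect.Proxy, as in the sketch below. This is only an illustration under stated assumptions: ProxyPublisherSketch, UserListener, and addUntypedListener are hypothetical names, delivery here is synchronous for simplicity, and nothing in it is taken from Data Pipeline's source.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of a proxy-based publisher; NOT the Data Pipeline EventBus.
final class ProxyPublisherSketch {

    // Stand-in for IUserListener, using a plain String instead of a User object.
    interface UserListener {
        void userAdded(String email);
    }

    private final List<Object> typedListeners = new ArrayList<>();
    private final List<BiConsumer<String, Object[]>> untypedListeners = new ArrayList<>();

    <T> void addListener(Class<T> type, T listener) {
        typedListeners.add(listener);
    }

    // Untyped listeners receive the event name and its arguments for every event.
    void addUntypedListener(BiConsumer<String, Object[]> listener) {
        untypedListeners.add(listener);
    }

    // Returns an object implementing the listener interface; each call on it
    // is forwarded to matching typed listeners, then to all untyped listeners.
    <T> T getPublisher(Class<T> type) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/equals/hashCode are not events; answer them locally.
                return method.invoke(this, args);
            }
            for (Object listener : typedListeners) {
                if (type.isInstance(listener)) {
                    method.invoke(listener, args);
                }
            }
            for (BiConsumer<String, Object[]> listener : untypedListeners) {
                listener.accept(type.getSimpleName() + "." + method.getName(), args);
            }
            return null; // listener methods in this sketch return void
        };
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    public static void main(String[] args) {
        ProxyPublisherSketch bus = new ProxyPublisherSketch();
        bus.addListener(UserListener.class, email -> System.out.println("1: user added " + email));
        bus.addListener(UserListener.class, email -> System.out.println("2: user added " + email));
        bus.addUntypedListener((event, eventArgs) ->
                System.out.println("3: " + event + " -- " + Arrays.asList(eventArgs)));

        UserListener publisher = bus.getPublisher(UserListener.class);
        publisher.userAdded("bob@example.com");
    }
}
```

In this sketch typed listeners are called in registration order and untyped listeners afterwards; that ordering is a choice made for the illustration, not a documented guarantee of the real bus.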

 

Console Output

1: user added User [email=bob@example.com]
2: user added User [email=bob@example.com]
3: com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener.userAdded -- [User [email=bob@example.com]]
1: user added User [email=carol@abc.org]
2: user added User [email=carol@abc.org]
3: com.northconcepts.datapipeline.examples.userguide.eventbus.IUserListener.userAdded -- [User [email=carol@abc.org]]
3: com.northconcepts.datapipeline.eventbus.ShutdownListener.onShuttingDown -- [com.northconcepts.datapipeline.eventbus.EventBus@484b54c5]

 
