Use an EventBus in a Pipeline

Updated: Jul 13, 2023

This example uses DataPipeline's EventBus class in a pipeline to process data in a loosely coupled manner. The EventBus class is a delivery service for applications wishing to publish and subscribe to events. More detailed information about the EventBus can be found in the user guide.

EventBus

EventBus is an asynchronous, in-memory, event delivery service. When used with EventBusReader and EventBusWriter, the bus allows pipelines to be loosely coupled and used in one-to-many (publish-subscribe) scenarios. The bus handles event registration, queuing, and delivery and also takes care of failures, monitoring, and graceful shutdown.

 

Input CSV File

year,month,product_name,unit_price,qty,total
2014,4,Action Figure,5.99,5,29.95
2015,6,Teddy Bear,9.99,5,49.95
2014,5,Teddy Bear,9.99,2,19.98
...

 

Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.eventbus;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.eventbus.EventBus;
import com.northconcepts.datapipeline.eventbus.EventBusReader;
import com.northconcepts.datapipeline.eventbus.EventBusWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.xml.SimpleXmlWriter;

public class UseAnEventBusInAPipeline {

    public static void main(String[] args) throws Throwable {
        DataReader reader ;
        DataWriter writer;

        final String PURCHASES_TOPIC = "purchases";

        EventBus bus = new EventBus().setName("App1 Bus");

        // read bus, write purchases to console
        reader = new EventBusReader(bus, PURCHASES_TOPIC);
        writer = new StreamWriter(System.out);
        Job.runAsync(reader, writer);

        // read bus, write purchases to file
        reader = new EventBusReader(bus, PURCHASES_TOPIC);
        writer = new SimpleXmlWriter(new File("example/data/output/purchases.xml"))
                .setPretty(true);
        Job.runAsync(reader, writer);

        // write purchases to bus
        reader = new CSVReader(new File("example/data/input/purchases.csv"))
                .setFieldNamesInFirstRow(true);
        writer = new EventBusWriter(bus, PURCHASES_TOPIC);
        Job.run(reader, writer);

        bus.shutdown();
    }
}

 

Code Walkthrough

  1. EventBus is created with a name App1 Bus.
  2. EventBusReader is created to pull records from the bus to a pipeline. It is created with a topic name purchases and this topic name is used later when writing records to the bus.
  3. Data is transferred from EventBusReader to StreamWriter via Job.runAsync() method. The reason Job.runAsync() is used instead of Job.run() is the need for executing the job in a new thread so that it doesn't block the current thread from running other jobs. Read more about running asynchronous Jobs.
  4. Next, data is again transferred from EventBusReader to the output XML file purchases.xml via SimpleXmlWriter and Job.runAsync() method.
  5. CSVReader is created corresponding to an input file purchases.csv.
  6. EventBusWriter is created to send records from any pipeline to an event bus. In this example, records from the input CSV file are going to be written to the event bus. Notice how it uses the same topic name as EventBusReader.
  7. Data is transferred from CSVReader to EventBusWriter via Job.run() method. See how to compile and run data pipeline jobs.
  8. bus.shutdown() terminates the bus gracefully, allowing queued events up to 30 seconds to be delivered.

 

EventBusReader

Pulls records from an event bus into your pipeline. It extends DataReader class and its constructor takes the source event bus, plus zero or more topics to read from. If the topic is null or missing, the reader will read all records regardless of their topic.

 

EventBusWriter

Send records from any pipeline to an event bus. When constructing an EventBusWriter, you need to supply it with the event bus you are targeting, plus a topic. The topic can be a string, enum, or other object and is used by the bus to distinguish one stream of records from another.

 

CSVReader

Obtains records from a Comma Separated Value (CSV) or delimited stream. It extends TextReader class and can be created using or Reader object. Passing true to method setFieldNamesInFirstRow() in this class enables the CSVReader to use the names specified in the first row of the input data as field names.

 

Output XML File

<?xml version='1.0' encoding='UTF-8'?>
<records>
  <record>
    <field name="year">2014</field>
    <field name="month">4</field>
    <field name="product_name">Action Figure</field>
    <field name="unit_price">5.99</field>
    <field name="qty">5</field>
    <field name="total">29.95</field>
  </record>
  <record>
    <field name="year">2015</field>
    <field name="month">6</field>
    <field name="product_name">Teddy Bear</field>
    <field name="unit_price">9.99</field>
    <field name="qty">5</field>
    <field name="total">49.95</field>
  </record>
  <record>
    <field name="year">2014</field>
    <field name="month">5</field>
    <field name="product_name">Teddy Bear</field>
    <field name="unit_price">9.99</field>
    <field name="qty">2</field>
    <field name="total">19.98</field>
  </record>
...
</records>

 

Console Output

-----------------------------------------------
0 - Record {
    0:[year]:STRING=[2014]:String
    1:[month]:STRING=[4]:String
    2:[product_name]:STRING=[Action Figure]:String
    3:[unit_price]:STRING=[5.99]:String
    4:[qty]:STRING=[5]:String
    5:[total]:STRING=[29.95]:String
}

-----------------------------------------------
1 - Record {
    0:[year]:STRING=[2015]:String
    1:[month]:STRING=[6]:String
    2:[product_name]:STRING=[Teddy Bear]:String
    3:[unit_price]:STRING=[9.99]:String
    4:[qty]:STRING=[5]:String
    5:[total]:STRING=[49.95]:String
}

-----------------------------------------------
2 - Record {
    0:[year]:STRING=[2014]:String
    1:[month]:STRING=[5]:String
    2:[product_name]:STRING=[Teddy Bear]:String
    3:[unit_price]:STRING=[9.99]:String
    4:[qty]:STRING=[2]:String
    5:[total]:STRING=[19.98]:String
}
...
Mobile Analytics