Use an EventBus in a Pipeline
This example uses DataPipeline's EventBus class in a pipeline to process data in a loosely coupled manner. The EventBus class is a delivery service for applications wishing to publish and subscribe to events. More detailed information about the EventBus can be found in the user guide.
EventBus
EventBus is an asynchronous, in-memory event delivery service. When used with EventBusReader and EventBusWriter, the bus allows pipelines to be loosely coupled and used in one-to-many (publish-subscribe) scenarios. The bus handles event registration, queuing, and delivery, and also takes care of failures, monitoring, and graceful shutdown.
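The one-to-many idea can be illustrated with a toy bus built only from JDK classes. This is a minimal sketch and not DataPipeline's EventBus. The ToyBus and ToyBusDemo names are hypothetical, and the sketch leaves out the asynchronous delivery, failure handling, monitoring, and shutdown that the real bus provides. Each subscriber gets its own queue. Publishing copies an event into every queue registered for that topic, and that copying is what makes delivery one-to-many.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Toy illustration of publish-subscribe fan-out.
// ToyBus is hypothetical; it is NOT DataPipeline's EventBus.
class ToyBus {
    // topic -> one queue per subscriber registered for that topic
    private final Map<Object, List<BlockingQueue<String>>> subscribers = new ConcurrentHashMap<>();

    // Register a subscriber; each one receives its own in-memory queue.
    BlockingQueue<String> subscribe(Object topic) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(queue);
        return queue;
    }

    // Publishing copies the event into every subscriber queue for the topic.
    void publish(Object topic, String event) {
        for (BlockingQueue<String> queue : subscribers.getOrDefault(topic, List.of())) {
            queue.add(event);
        }
    }
}

class ToyBusDemo {
    public static void main(String[] args) {
        ToyBus bus = new ToyBus();
        BlockingQueue<String> console = bus.subscribe("purchases");
        BlockingQueue<String> file = bus.subscribe("purchases");
        bus.publish("purchases", "Action Figure");
        bus.publish("purchases", "Teddy Bear");
        // Both subscribers hold both events.
        System.out.println(console + " " + file);
    }
}
```

This mirrors the example below at a much smaller scale. Two readers subscribe to the same topic, so each one sees every published record.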
Input CSV File
year,month,product_name,unit_price,qty,total
2014,4,Action Figure,5.99,5,29.95
2015,6,Teddy Bear,9.99,5,49.95
2014,5,Teddy Bear,9.99,2,19.98
...
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.eventbus;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.eventbus.EventBus;
import com.northconcepts.datapipeline.eventbus.EventBusReader;
import com.northconcepts.datapipeline.eventbus.EventBusWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.xml.SimpleXmlWriter;

public class UseAnEventBusInAPipeline {

    public static void main(String[] args) throws Throwable {
        DataReader reader;
        DataWriter writer;

        final String PURCHASES_TOPIC = "purchases";

        EventBus bus = new EventBus().setName("App1 Bus");

        // read bus, write purchases to console
        reader = new EventBusReader(bus, PURCHASES_TOPIC);
        writer = new StreamWriter(System.out);
        Job.runAsync(reader, writer);

        // read bus, write purchases to file
        reader = new EventBusReader(bus, PURCHASES_TOPIC);
        writer = new SimpleXmlWriter(new File("example/data/output/purchases.xml"))
                .setPretty(true);
        Job.runAsync(reader, writer);

        // write purchases to bus
        reader = new CSVReader(new File("example/data/input/purchases.csv"))
                .setFieldNamesInFirstRow(true);
        writer = new EventBusWriter(bus, PURCHASES_TOPIC);
        Job.run(reader, writer);

        // stop the bus gracefully once the CSV job has finished
        bus.shutdown();
    }
}
Code Walkthrough
- An EventBus is created with the name "App1 Bus".
- An EventBusReader is created to pull records from the bus into a pipeline. It is created with the topic name "purchases", and the same topic name is used later when writing records to the bus.
- Data is transferred from the EventBusReader to a StreamWriter via the Job.runAsync() method. Job.runAsync() is used instead of Job.run() because the job must execute in a new thread, so it does not block the current thread from running other jobs. Read more about running asynchronous Jobs.
- Next, a second EventBusReader on the same topic transfers data to the output XML file purchases.xml via SimpleXmlWriter and the Job.runAsync() method. Because both readers subscribe to "purchases", each one receives every record.
- A CSVReader is created for the input file purchases.csv.
- An EventBusWriter is created to send records from any pipeline to an event bus. In this example, records from the input CSV file are written to the event bus. Notice that it uses the same topic name as the EventBusReader.
- Data is transferred from the CSVReader to the EventBusWriter via the Job.run() method, which runs on the current thread. See how to compile and run data pipeline jobs.
- bus.shutdown() terminates the bus gracefully and allows queued events up to 30 seconds to be delivered.
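The threading pattern in the walkthrough can be modeled with plain java.util.concurrent classes. The pattern has three parts: two background consumers started with Job.runAsync(), one foreground producer run with Job.run(), and a graceful shutdown at the end. This is a hypothetical sketch and not how DataPipeline implements Job or EventBus. AsyncPipelineModel, the END marker, and the fixed thread pool are all assumptions made for illustration. The 30-second wait only mirrors the timeout described for bus.shutdown().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the example's threading; NOT DataPipeline's Job/EventBus.
class AsyncPipelineModel {
    // Assumed end-of-stream marker that tells a consumer to stop.
    private static final String END = "<end>";

    static List<List<String>> run(List<String> rows) {
        BlockingQueue<String> toConsole = new LinkedBlockingQueue<>();
        BlockingQueue<String> toFile = new LinkedBlockingQueue<>();
        List<String> consoleOut = Collections.synchronizedList(new ArrayList<>());
        List<String> fileOut = Collections.synchronizedList(new ArrayList<>());

        ExecutorService background = Executors.newFixedThreadPool(2);
        try {
            // Like Job.runAsync(): consumers run on pool threads, so these calls return at once.
            background.submit(() -> { drain(toConsole, consoleOut); return null; });
            background.submit(() -> { drain(toFile, fileOut); return null; });

            // Like Job.run(): the producer runs on the calling thread until every row is sent.
            for (String row : rows) {
                toConsole.put(row);
                toFile.put(row);
            }
            toConsole.put(END);
            toFile.put(END);

            // Graceful shutdown: accept no new work, wait up to 30 s for queued rows.
            background.shutdown();
            if (!background.awaitTermination(30, TimeUnit.SECONDS)) {
                background.shutdownNow();
            }
        } catch (InterruptedException e) {
            background.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the model", e);
        }
        return List.of(consoleOut, fileOut);
    }

    // Consumer loop: take rows until the end marker arrives.
    private static void drain(BlockingQueue<String> queue, List<String> out) throws InterruptedException {
        for (String row = queue.take(); !row.equals(END); row = queue.take()) {
            out.add(row);
        }
    }
}
```

Each consumer owns its own queue, so both result lists contain every row. This matches how the console and XML outputs in this example hold the same records.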
EventBusReader
Pulls records from an event bus into your pipeline. It extends the DataReader class, and its constructor takes the source event bus plus zero or more topics to read from. If the topic is null or missing, the reader reads all records regardless of their topic.
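The topic rule described above can be expressed as a small filter. Under that rule a reader takes zero or more topics, and a null or missing topic means it receives all records. This is a hypothetical plain-Java sketch and not EventBusReader's implementation. The TopicFilter and Event types are assumptions used only to show the selection logic.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the documented topic rule; NOT DataPipeline code.
class TopicFilter {
    // Minimal event: a topic plus a payload (assumed shape for illustration).
    static final class Event {
        final Object topic;
        final String payload;

        Event(Object topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    private final Set<Object> topics = new HashSet<>();

    // Zero topics, a null array, or only null topics all mean "every topic".
    TopicFilter(Object... topics) {
        if (topics != null) {
            for (Object topic : topics) {
                if (topic != null) {
                    this.topics.add(topic);
                }
            }
        }
    }

    // An event passes if no topics were given, or if its topic is one of them.
    boolean accepts(Event event) {
        return topics.isEmpty() || topics.contains(event.topic);
    }

    // Keep matching events in their original order and return their payloads.
    List<String> select(List<Event> events) {
        return events.stream()
                .filter(this::accepts)
                .map(e -> e.payload)
                .collect(Collectors.toList());
    }
}
```

For example, a filter built with "purchases" keeps only purchase events. A filter built with no arguments, or with a single null, keeps every event.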
EventBusWriter
Sends records from any pipeline to an event bus. When constructing an EventBusWriter, supply the event bus you are targeting plus a topic. The topic can be a string, an enum, or another object, and the bus uses it to distinguish one stream of records from another.
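A topic can be a string, an enum, or another object, which suggests that topics are told apart by object equality. That is an assumption, and the sketch below illustrates it with a HashMap keyed by Object. TopicKeys and its Topic enum are hypothetical, and DataPipeline's own topic matching may differ. In this model the String "purchases" and the enum constant PURCHASES are different keys, so they name two separate streams.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: any object with sound equals/hashCode can act as a
// topic key. NOT DataPipeline code; real topic matching may differ.
class TopicKeys {
    enum Topic { PURCHASES, REFUNDS }

    // topic -> records written under that topic, in write order
    private final Map<Object, List<String>> streams = new HashMap<>();

    void write(Object topic, String record) {
        streams.computeIfAbsent(topic, t -> new ArrayList<>()).add(record);
    }

    // Returns an empty list for a topic that has never been written to.
    List<String> stream(Object topic) {
        return streams.getOrDefault(topic, List.of());
    }
}
```

An enum topic gives compile-time checking of topic names. This can help when several pipelines must agree on the same topic.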
CSVReader
Obtains records from a Comma Separated Value (CSV) or delimited stream. It extends the TextReader class and can be created using a File or a Reader object. Passing true to the setFieldNamesInFirstRow() method makes the CSVReader use the names in the first row of the input data as field names.
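To make setFieldNamesInFirstRow(true) concrete, the sketch below turns the header row of the input CSV into field names and each later line into a name-to-value map. It is a simplified, hypothetical stand-in for CSVReader. HeaderRowParser is not a DataPipeline class, and it splits on commas without handling quotes or escapes. Values are kept as strings, consistent with the STRING fields in the console output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "field names in first row"; NOT DataPipeline's CSVReader.
// Simplification: splits on commas only, with no support for quoted fields.
class HeaderRowParser {
    static List<Map<String, String>> parse(List<String> lines) {
        List<Map<String, String>> records = new ArrayList<>();
        if (lines.isEmpty()) {
            return records;
        }
        // The first line supplies the field names.
        String[] names = lines.get(0).split(",", -1);
        for (String line : lines.subList(1, lines.size())) {
            String[] values = line.split(",", -1);
            // LinkedHashMap keeps fields in header order.
            Map<String, String> record = new LinkedHashMap<>();
            for (int i = 0; i < names.length; i++) {
                // Missing trailing values become null in this sketch.
                record.put(names[i], i < values.length ? values[i] : null);
            }
            records.add(record);
        }
        return records;
    }
}
```

Without the header step, the same lines would have to be addressed by column position rather than by names such as product_name.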
Output XML File
<?xml version='1.0' encoding='UTF-8'?>
<records>
  <record>
    <field name="year">2014</field>
    <field name="month">4</field>
    <field name="product_name">Action Figure</field>
    <field name="unit_price">5.99</field>
    <field name="qty">5</field>
    <field name="total">29.95</field>
  </record>
  <record>
    <field name="year">2015</field>
    <field name="month">6</field>
    <field name="product_name">Teddy Bear</field>
    <field name="unit_price">9.99</field>
    <field name="qty">5</field>
    <field name="total">49.95</field>
  </record>
  <record>
    <field name="year">2014</field>
    <field name="month">5</field>
    <field name="product_name">Teddy Bear</field>
    <field name="unit_price">9.99</field>
    <field name="qty">2</field>
    <field name="total">19.98</field>
  </record>
  ...
</records>
Console Output
-----------------------------------------------
0 - Record {
    0:[year]:STRING=[2014]:String
    1:[month]:STRING=[4]:String
    2:[product_name]:STRING=[Action Figure]:String
    3:[unit_price]:STRING=[5.99]:String
    4:[qty]:STRING=[5]:String
    5:[total]:STRING=[29.95]:String
}

-----------------------------------------------
1 - Record {
    0:[year]:STRING=[2015]:String
    1:[month]:STRING=[6]:String
    2:[product_name]:STRING=[Teddy Bear]:String
    3:[unit_price]:STRING=[9.99]:String
    4:[qty]:STRING=[5]:String
    5:[total]:STRING=[49.95]:String
}

-----------------------------------------------
2 - Record {
    0:[year]:STRING=[2014]:String
    1:[month]:STRING=[5]:String
    2:[product_name]:STRING=[Teddy Bear]:String
    3:[unit_price]:STRING=[9.99]:String
    4:[qty]:STRING=[2]:String
    5:[total]:STRING=[19.98]:String
}
...