Read and Write to an EventBus
Updated: Feb 21, 2022
package com.northconcepts.datapipeline.examples.cookbook;
import java.io.File;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.eventbus.EventBus;
import com.northconcepts.datapipeline.eventbus.EventBusReader;
import com.northconcepts.datapipeline.eventbus.EventBusWriter;
import com.northconcepts.datapipeline.job.Job;
public class ReadWriteEventBus {
public static void main(String[] args) throws Throwable {
DataReader reader;
DataWriter writer;
final String PURCHASES_TOPIC = "purchases";
EventBus bus = new EventBus().setName("Purchases Bus");
// read purchases from bus, write purchases to console
reader = new EventBusReader(bus, PURCHASES_TOPIC);
writer = new StreamWriter(System.out);
Job.runAsync(reader, writer);
// read CSV file, write purchases to bus (for above reader to consume)
reader = new CSVReader(new File("example/data/input/purchases.csv"))
.setFieldNamesInFirstRow(true);
writer = new EventBusWriter(bus, PURCHASES_TOPIC);
Job.run(reader, writer);
bus.shutdown();
}
}
