Read and Write to an EventBus

Updated: Feb 21, 2022
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.eventbus.EventBus;
import com.northconcepts.datapipeline.eventbus.EventBusReader;
import com.northconcepts.datapipeline.eventbus.EventBusWriter;
import com.northconcepts.datapipeline.job.Job;

/**
 * Example: publishes records from a CSV file onto an {@link EventBus} topic
 * while a second, asynchronously started job reads the same topic and writes
 * the records to the console.
 */
public class ReadWriteEventBus {

    /** Topic name shared by the bus writer (producer) and the bus reader (consumer). */
    private static final String PURCHASES_TOPIC = "purchases";

    /** Location of the input CSV file; its first row holds the field names. */
    private static final String PURCHASES_CSV = "example/data/input/purchases.csv";

    /**
     * Wires up the consumer and producer jobs and runs them.
     *
     * @param args command-line arguments (unused)
     * @throws Throwable if building or running either pipeline fails
     */
    public static void main(String[] args) throws Throwable {
        EventBus bus = new EventBus().setName("Purchases Bus");

        try {
            // Consumer: read purchases from the bus, write them to the console.
            // Started first and asynchronously so it is already attached to the
            // topic when the producer below begins publishing.
            DataReader busReader = new EventBusReader(bus, PURCHASES_TOPIC);
            DataWriter consoleWriter = new StreamWriter(System.out);
            Job.runAsync(busReader, consoleWriter);

            // Producer: read the CSV file, write purchases to the bus
            // (for the consumer above to pick up).
            DataReader csvReader = new CSVReader(new File(PURCHASES_CSV))
                    .setFieldNamesInFirstRow(true);
            DataWriter busWriter = new EventBusWriter(bus, PURCHASES_TOPIC);
            Job.run(csvReader, busWriter);
        } finally {
            // Always shut the bus down, even when a pipeline fails (e.g. the CSV
            // file is missing); otherwise the bus and the asynchronous consumer
            // job would be left running after main() exits with an exception.
            bus.shutdown();
        }
    }
}

Mobile Analytics