Convert a Single Source DataReader into Many

In this example we are going to see how we can use DataPipeline to read data from a single source and write it to multiple destinations.

We are going to read data from an .avro file and write it to both a .json and .xml file simultaneously.

We can use the same concept with different scenarios -- for example, to read a Word file and write to a PDF and XML file.

Java code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.avro.AvroReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.json.SimpleJsonWriter;
import com.northconcepts.datapipeline.multiplex.SplitWriter;
import com.northconcepts.datapipeline.xml.SimpleXmlWriter;

public class ConvertASingleSourceDataReaderIntoMany {

    public static void main(String[] args) {
        SplitWriter splitWriter = new SplitWriter();
        
        DataReader reader1 = splitWriter.createReader();
        DataReader reader2 = splitWriter.createReader();
        
        DataWriter target1 = new SimpleXmlWriter(new File("example/data/output/simple-xml-splitwriter.xml"))
                .setPretty(true);
        
        DataWriter target2 = new SimpleJsonWriter(new File("example/data/output/simple-json-splitwriter.json"))
                .setPretty(true);
        
        Job.runAsync(reader1, target1);
        Job.runAsync(reader2, target2);
        
        DataReader source = new AvroReader(new File("example/data/input/twitter.avro"));
        Job.run(source, splitWriter);
    }
}

Code walkthrough

  1. The SplitWriter class is used to convert a single source DataReader into many downstream sources.
  2. Using the splitWriter enables us to create multiple DataReaders (i.e. reader1 and reader2) that we can bundle together and write to them once using a job as we are going to see below.
  3. We then create a SimpleXmlWriter and SimpleJsonWriter that corresponds to the output file simple-xml-splitwriter.xml and simple-json-splitwriter.json respectively.
  4. The Job.runAsync(reader1, target1) and Job.runAsync(reader2, target2) methods are then run in separate threads while waiting for the input source file.
  5. An AvroReader is then created corresponding to the input file twitter.avro.
  6. Lastly Data is transferred from the AvroReader to both output files via Job.run() method.

  7. Running the code will create two files with the following outputs.

    XML Output

    <?xml version="1.0" encoding="ISO-8859-1"?>
    <records>
     <record>
        <field name="username">miguno </field >
        <field name="tweet">Rock: Nerf paper, scissors is fine.</field >
        <field name="timestamp">1366150681</field >
      </record>
    
      <record>
        <field name="username">BlizzardCS</field >
        <field name="tweet">Works as intended.  Terran is IMBA.r</field >
        <field name="timestamp">1366154481</field >
      </record>
    </records>
    

    JSON Output

    [
      {
        "username":"miguno"
        ,"tweet":"Rock: Nerf paper, scissors is fine."
        ,"timestamp":1366150681
      }
      ,{
        "username":"BlizzardCS"
        ,"tweet":"Works as intended.  Terran is IMBA."
        ,"timestamp":1366154481
      }
    ]
    
Mobile Analytics