Convert a Single Source DataReader into Many
Updated: Mar 16, 2023
In this example, we use DataPipeline to read data from a single source and write it to multiple destinations. We read data from an .avro file and write it to both a .json and an .xml file simultaneously.
The same concept applies in other scenarios -- for example, reading a Word file and writing to both a PDF and an XML file.
Java code listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.avro.AvroReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.json.SimpleJsonWriter;
import com.northconcepts.datapipeline.multiplex.SplitWriter;
import com.northconcepts.datapipeline.xml.SimpleXmlWriter;

public class ConvertASingleSourceDataReaderIntoMany {

    public static void main(String[] args) {
        SplitWriter splitWriter = new SplitWriter();
        DataReader reader1 = splitWriter.createReader();
        DataReader reader2 = splitWriter.createReader();

        DataWriter target1 = new SimpleXmlWriter(new File("example/data/output/simple-xml-splitwriter.xml"))
                .setPretty(true);
        DataWriter target2 = new SimpleJsonWriter(new File("example/data/output/simple-json-splitwriter.json"))
                .setPretty(true);

        Job.runAsync(reader1, target1);
        Job.runAsync(reader2, target2);

        DataReader source = new AvroReader(new File("example/data/input/twitter.avro"));
        Job.run(source, splitWriter);
    }
}
Code walkthrough
- The SplitWriter class is used to convert a single source DataReader into many downstream readers.
- Calling splitWriter.createReader() twice creates two DataReaders (reader1 and reader2), each of which receives every record written to the splitter.
- A SimpleXmlWriter and a SimpleJsonWriter are then created, corresponding to the output files simple-xml-splitwriter.xml and simple-json-splitwriter.json respectively.
- Job.runAsync(reader1, target1) and Job.runAsync(reader2, target2) start two jobs on separate threads; both wait for records to arrive from the input source.
- An AvroReader is then created for the input file twitter.avro.
- Lastly, data is transferred from the AvroReader to both output files via the Job.run() method.
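Conceptually, the splitter works like a fan-out queue: each reader created from it is backed by its own buffer, every incoming record is copied onto all buffers, and each downstream consumer drains its buffer on its own thread. The sketch below illustrates that pattern in plain Java; it is not DataPipeline's actual implementation, and the class and method names (FanOutDemo, split, drainAsync) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the fan-out pattern behind a split writer.
public class FanOutDemo {

    // Sentinel marking the end of the record stream.
    static final String EOF = "\u0000EOF";

    // One bounded queue per downstream reader.
    static List<BlockingQueue<String>> createQueues(int readers) {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        for (int i = 0; i < readers; i++) {
            queues.add(new ArrayBlockingQueue<>(64));
        }
        return queues;
    }

    // Copy every record onto every queue, then signal end-of-stream.
    static void split(List<String> records, List<BlockingQueue<String>> queues)
            throws InterruptedException {
        for (String record : records) {
            for (BlockingQueue<String> q : queues) {
                q.put(record);
            }
        }
        for (BlockingQueue<String> q : queues) {
            q.put(EOF);
        }
    }

    // Drain one queue on its own thread, analogous to Job.runAsync(reader, target).
    static Thread drainAsync(BlockingQueue<String> queue, List<String> target) {
        Thread t = new Thread(() -> {
            try {
                for (String record = queue.take(); !record.equals(EOF); record = queue.take()) {
                    target.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        List<BlockingQueue<String>> queues = createQueues(2);
        List<String> target1 = new ArrayList<>();
        List<String> target2 = new ArrayList<>();

        // Start the consumers first, like the two async jobs in the listing.
        Thread t1 = drainAsync(queues.get(0), target1);
        Thread t2 = drainAsync(queues.get(1), target2);

        // Then push the source records through the splitter.
        split(List.of("record-1", "record-2", "record-3"), queues);
        t1.join();
        t2.join();

        System.out.println(target1); // both targets receive every record
        System.out.println(target2);
    }
}
```

Starting the consumers before feeding the splitter matters: with bounded buffers, a producer writing to readers nobody is draining would eventually block, which is why the listing calls Job.runAsync() before Job.run().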
Running the code will create two files with the following outputs.
XML Output
<?xml version="1.0" encoding="ISO-8859-1"?>
<records>
  <record>
    <field name="username">miguno</field>
    <field name="tweet">Rock: Nerf paper, scissors is fine.</field>
    <field name="timestamp">1366150681</field>
  </record>
  <record>
    <field name="username">BlizzardCS</field>
    <field name="tweet">Works as intended. Terran is IMBA.</field>
    <field name="timestamp">1366154481</field>
  </record>
</records>
JSON Output
[
{
"username":"miguno"
,"tweet":"Rock: Nerf paper, scissors is fine."
,"timestamp":1366150681
}
,{
"username":"BlizzardCS"
,"tweet":"Works as intended. Terran is IMBA."
,"timestamp":1366154481
}
]
