Convert a Single Source DataReader into Many
Updated: Mar 16, 2023
This example shows how to use DataPipeline to read data from a single source and write it to multiple destinations.
We are going to read data from an .avro file and write it to both a .json and .xml file simultaneously.
The same concept applies to other scenarios as well, such as reading a Word file and writing to both a PDF and an XML file.
Java code listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.avro.AvroReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.json.SimpleJsonWriter;
import com.northconcepts.datapipeline.multiplex.SplitWriter;
import com.northconcepts.datapipeline.xml.SimpleXmlWriter;

public class ConvertASingleSourceDataReaderIntoMany {

    public static void main(String[] args) {
        SplitWriter splitWriter = new SplitWriter();
        DataReader reader1 = splitWriter.createReader();
        DataReader reader2 = splitWriter.createReader();

        DataWriter target1 = new SimpleXmlWriter(new File("example/data/output/simple-xml-splitwriter.xml"))
                .setPretty(true);
        DataWriter target2 = new SimpleJsonWriter(new File("example/data/output/simple-json-splitwriter.json"))
                .setPretty(true);

        Job.runAsync(reader1, target1);
        Job.runAsync(reader2, target2);

        DataReader source = new AvroReader(new File("example/data/input/twitter.avro"));
        Job.run(source, splitWriter);
    }

}
Code walkthrough
- The SplitWriter class is used to convert a single source DataReader into many downstream readers.
- Calling splitWriter.createReader() twice gives us two DataReaders (i.e. reader1 and reader2) that we can write to together using a single job, as shown below.
- We then create a SimpleXmlWriter and a SimpleJsonWriter corresponding to the output files simple-xml-splitwriter.xml and simple-json-splitwriter.json, respectively.
- The Job.runAsync(reader1, target1) and Job.runAsync(reader2, target2) calls each start a separate thread that waits for data from the input source.
- An AvroReader is then created for the input file twitter.avro.
- Lastly, data is transferred from the AvroReader to both output files via the Job.run() method.
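To make the fan-out idea concrete, here is a minimal sketch of the same pattern in plain Java, without DataPipeline: a single producer pushes each record onto one queue per consumer, and each consumer drains its own queue on a separate thread (analogous to the two Job.runAsync calls above). The class name FanOutSketch, the record formats, and the EOF sentinel are all illustrative inventions, not part of the DataPipeline API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

public class FanOutSketch {

    static final String EOF = "__EOF__"; // sentinel marking end of input

    // Start a consumer thread that formats each record and collects the results.
    static Thread startConsumer(BlockingQueue<String> queue,
                                Function<String, String> format,
                                List<String> out) {
        Thread t = new Thread(() -> {
            try {
                for (String record = queue.take(); !record.equals(EOF); record = queue.take()) {
                    out.add(format.apply(record));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        return t;
    }

    // Fan a single stream of records out to two concurrent consumers;
    // returns [xml-style output, json-style output].
    static List<List<String>> fanOut(List<String> records) throws InterruptedException {
        BlockingQueue<String> xmlQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> jsonQueue = new LinkedBlockingQueue<>();
        List<String> xml = new ArrayList<>();
        List<String> json = new ArrayList<>();

        Thread t1 = startConsumer(xmlQueue, r -> "<record>" + r + "</record>", xml);
        Thread t2 = startConsumer(jsonQueue, r -> "{\"value\":\"" + r + "\"}", json);

        // The "split writer" step: every record from the single source
        // is copied to both queues.
        for (String record : records) {
            xmlQueue.put(record);
            jsonQueue.put(record);
        }
        xmlQueue.put(EOF);
        jsonQueue.put(EOF);

        t1.join();
        t2.join();
        return List.of(xml, json);
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> out = fanOut(List.of("a", "b"));
        System.out.println(out.get(0)); // prints [<record>a</record>, <record>b</record>]
        System.out.println(out.get(1)); // prints [{"value":"a"}, {"value":"b"}]
    }
}
```

SplitWriter hides this queue-and-thread plumbing behind the DataReader/DataWriter interfaces, which is why the example above only needs createReader() and Job.runAsync().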
Running the code will create two files with the following outputs.
XML Output
<?xml version="1.0" encoding="ISO-8859-1"?>
<records>
    <record>
        <field name="username">miguno</field>
        <field name="tweet">Rock: Nerf paper, scissors is fine.</field>
        <field name="timestamp">1366150681</field>
    </record>
    <record>
        <field name="username">BlizzardCS</field>
        <field name="tweet">Works as intended. Terran is IMBA.</field>
        <field name="timestamp">1366154481</field>
    </record>
</records>
JSON Output
[
    {
        "username": "miguno",
        "tweet": "Rock: Nerf paper, scissors is fine.",
        "timestamp": 1366150681
    },
    {
        "username": "BlizzardCS",
        "tweet": "Works as intended. Terran is IMBA.",
        "timestamp": 1366154481
    }
]