Create Custom Pipeline Output
Updated: Jun 23, 2023
This example shows how to build a data pipeline with a custom output class. A custom output class lets users control the behavior of the pipeline's output, including how it is serialized to a record or XML and how it is restored from those formats. This approach is useful when precise control is needed over the serialization and deserialization of objects in several formats.
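The round trip described above can be sketched without the library. In this minimal sketch a plain Map stands in for the library's Record, and PrefixedConsoleOutput with its prefix property is a hypothetical class invented for illustration. It is not part of DataPipeline.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a custom output; a Map plays the role of a Record.
class PrefixedConsoleOutput {

    private String prefix = "";

    PrefixedConsoleOutput setPrefix(String prefix) {
        this.prefix = prefix;
        return this;
    }

    String getPrefix() {
        return prefix;
    }

    // Serialize: write every property that must survive a round trip.
    Map<String, Object> toRecord() {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("__class__", getClass().getName());
        record.put("prefix", prefix);
        return record;
    }

    // Deserialize: read the same keys back and return this for chaining.
    PrefixedConsoleOutput fromRecord(Map<String, Object> source) {
        Object value = source.get("prefix");
        this.prefix = value == null ? "" : value.toString();
        return this;
    }

    public static void main(String[] args) {
        Map<String, Object> saved = new PrefixedConsoleOutput().setPrefix("row> ").toRecord();
        PrefixedConsoleOutput restored = new PrefixedConsoleOutput().fromRecord(saved);
        System.out.println(saved);
        System.out.println("[" + restored.getPrefix() + "]");
    }
}
```

The point of the sketch is symmetry: any property written during serialization has to be read back under the same key, otherwise the restored object silently falls back to its defaults.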
Input CSV
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C
Java Code
package com.northconcepts.datapipeline.foundations.examples.pipeline;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.PipelineOutput;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;
import com.northconcepts.datapipeline.internal.lang.Util;

public class CreateCustomPipelineOutput {

    public static void main(String[] args) {
        CsvPipelineInput pipelineInput = new CsvPipelineInput()
                .setFileSource(new LocalFileSource().setPath("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);

        CustomPipelineOutput pipelineOutput = new CustomPipelineOutput();

        Pipeline pipeline = new Pipeline()
                .setInput(pipelineInput)
                .setOutput(pipelineOutput);

        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        Record record = pipeline.toRecord();
        System.out.println(record);

        System.out.println("---------------------------------------------------------------------------------------------------------");

        pipeline = new Pipeline().fromRecord(record);
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        System.out.println("Pipeline as JSON:");
        System.out.println(Util.formatJson(pipeline.toJson()));

        System.out.println("---------------------------------------------------------------------------------------------------------");

        pipeline = new Pipeline().fromXml(pipeline.toXml());
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        System.out.println("Pipeline as XML:");
        System.out.println(pipeline.toXml());
    }

    public static class CustomPipelineOutput extends PipelineOutput {

        @Override
        public DataWriter createDataWriter() {
            return new ConsoleWriter();
        }

        @Override
        public String getName() {
            return "CustomPipelineOutput";
        }

        @Override
        public Record toRecord() {
            return super.toRecord();
        }

        @Override
        public PipelineOutput fromRecord(Record source) {
            super.fromRecord(source);
            return this;
        }

        @Override
        public Element toXmlElement(Document document) {
            return super.toXmlElement(document);
        }

        @Override
        public CustomPipelineOutput fromXmlElement(Element element) {
            return this;
        }
    }

    public static class ConsoleWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);
        }
    }
}
Code Walkthrough
- To illustrate the concept, a CustomPipelineOutput class is introduced, which extends the PipelineOutput class.
  - Within this class, the serialization and deserialization methods are overridden so that they can be customized for each format.
  - createDataWriter() returns a ConsoleWriter instance as the DataWriter for this PipelineOutput. ConsoleWriter prints each record to the console.
  - getName() returns the text "CustomPipelineOutput" as the name of this object.
  - The overridden toRecord, fromRecord and toXmlElement methods call the corresponding parent methods. fromXmlElement returns the current instance without reading the element.
- A CsvPipelineInput object is instantiated to read the input file credit-balance-01.csv.
- An instance of the CustomPipelineOutput class is created.
- These two objects are assigned as the input and output of a Pipeline.
- The pipeline is executed with the run() method. Because ConsoleWriter is used as the DataWriter, the records are printed to the console.
- Next, the pipeline's toRecord() method is called. It returns the pipeline as a single Record instance. As the console shows, the Record contains the input (CsvPipelineInput class) and the output (CreateCustomPipelineOutput$CustomPipelineOutput class).
- A new Pipeline instance is created from the Record of the previous step. Because all properties were saved in the Record, running it prints the same records as the first run.
- The pipeline instance is converted to JSON and printed to the console.
- The Pipeline instance is then serialized to XML and restored from that XML. The records produced when the restored pipeline runs are printed to the console.
- Finally, the pipeline is printed as XML.
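The serialized pipeline identifies its output only by a class name stored under __class__, and a nested class appears there with a $ between the outer and inner names. How a stored class name can be turned back into an object may be sketched with plain Java reflection. This is an illustration under assumptions, not the library's actual mechanism: the Map stands in for a Record, and ClassTagDemo, Output, ConsoleOutput and restore are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ClassTagDemo {

    // Hypothetical base type standing in for a pipeline output.
    abstract static class Output {
        abstract String getName();
    }

    // Nested class: its binary name is "ClassTagDemo$ConsoleOutput".
    static class ConsoleOutput extends Output {
        @Override
        String getName() {
            return "ConsoleOutput";
        }
    }

    // Re-create an instance from the class name stored under "__class__".
    // This only works when the class has an accessible no-argument constructor.
    static Output restore(Map<String, Object> record) {
        String className = (String) record.get("__class__");
        try {
            Class<?> type = Class.forName(className);
            return (Output) type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot restore " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("__class__", ConsoleOutput.class.getName());

        Output output = restore(record);
        System.out.println(record.get("__class__") + " -> " + output.getName());
    }
}
```

A design consequence worth noting: under this kind of scheme the custom class must be on the classpath of whatever process restores the pipeline, and it needs a constructor that takes no arguments.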
Console Output
16:17:27,127 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
16:17:27,235 DEBUG [main] datapipeline:615 - Job[1,job-1,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}
Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:STRING=[90.00]:String
    4:[CreditLimit]:STRING=[1000.00]:String
    5:[AccountCreated]:STRING=[8/6/2003]:String
    6:[Rating]:STRING=[B]:String
}
Record {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:STRING=[0]:String
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[AccountCreated]:STRING=[5/25/1985]:String
    6:[Rating]:STRING=[B]:String
}
Record {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}
Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[789.65]:String
    4:[CreditLimit]:STRING=[5000.00]:String
    5:[AccountCreated]:STRING=[2/5/2007]:String
    6:[Rating]:STRING=[C]:String
}
16:17:27,438 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Record (MODIFIED) (has child records) {
    0:[name]:UNDEFINED=[null]
    1:[description]:UNDEFINED=[null]
    2:[input]:RECORD=[
        Record (MODIFIED) (is child record) (has child records) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput]:String
            1:[saveLineage]:BOOLEAN=[false]:Boolean
            2:[fieldSeparator]:STRING=[,]:String
            3:[startingQuote]:STRING=["]:String
            4:[endingQuote]:STRING=["]:String
            5:[lineSeparators]:STRING=[\n,\r\n,\r]:String
            6:[allowMultiLineText]:BOOLEAN=[false]:Boolean
            7:[allowQuoteInField]:BOOLEAN=[false]:Boolean
            8:[trimFields]:BOOLEAN=[true]:Boolean
            9:[skipEmptyRows]:BOOLEAN=[false]:Boolean
            10:[charset]:STRING=[UTF-8]:String
            11:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean
            12:[fileSource]:RECORD=[
                Record (MODIFIED) (is child record) {
                    0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSource]:String
                    1:[name]:UNDEFINED=[null]
                    2:[path]:STRING=[example/data/input/credit-balance-01.csv]:String
                }]:Record
        }]:Record
    3:[sourceEntity]:UNDEFINED=[null]
    4:[targetEntity]:UNDEFINED=[null]
    5:[output]:RECORD=[
        Record (MODIFIED) (is child record) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput]:String
        }]:Record
    6:[multithreaded]:BOOLEAN=[true]:Boolean
    7:[actions]:ARRAY of UNDEFINED=[[]]:ArrayValue
}
---------------------------------------------------------------------------------------------------------
16:17:27,471 DEBUG [main] datapipeline:615 - Job[2,job-2,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}
Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:STRING=[90.00]:String
    4:[CreditLimit]:STRING=[1000.00]:String
    5:[AccountCreated]:STRING=[8/6/2003]:String
    6:[Rating]:STRING=[B]:String
}
Record {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:STRING=[0]:String
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[AccountCreated]:STRING=[5/25/1985]:String
    6:[Rating]:STRING=[B]:String
}
Record {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}
Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[789.65]:String
    4:[CreditLimit]:STRING=[5000.00]:String
    5:[AccountCreated]:STRING=[2/5/2007]:String
    6:[Rating]:STRING=[C]:String
}
16:17:27,474 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as JSON:
{
  "name" : null,
  "description" : null,
  "input" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput",
    "saveLineage" : false,
    "fieldSeparator" : ",",
    "startingQuote" : "\"",
    "endingQuote" : "\"",
    "lineSeparators" : "\\n,\\r\\n,\\r",
    "allowMultiLineText" : false,
    "allowQuoteInField" : false,
    "trimFields" : true,
    "skipEmptyRows" : false,
    "charset" : "UTF-8",
    "fieldNamesInFirstRow" : true,
    "fileSource" : {
      "__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSource",
      "name" : null,
      "path" : "example/data/input/credit-balance-01.csv"
    }
  },
  "sourceEntity" : null,
  "targetEntity" : null,
  "output" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput"
  },
  "multithreaded" : true,
  "actions" : [ ]
}
---------------------------------------------------------------------------------------------------------
16:17:27,915 DEBUG [main] datapipeline:615 - Job[3,job-3,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}
Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:STRING=[90.00]:String
    4:[CreditLimit]:STRING=[1000.00]:String
    5:[AccountCreated]:STRING=[8/6/2003]:String
    6:[Rating]:STRING=[B]:String
}
Record {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:STRING=[0]:String
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[AccountCreated]:STRING=[5/25/1985]:String
    6:[Rating]:STRING=[B]:String
}
Record {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}
Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[789.65]:String
    4:[CreditLimit]:STRING=[5000.00]:String
    5:[AccountCreated]:STRING=[2/5/2007]:String
    6:[Rating]:STRING=[C]:String
}
16:17:27,917 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as XML:
<pipeline multithreaded="true">
  <pipeline-input __class__="com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput" allowMultiLineText="false" allowQuoteInField="false" charset="UTF-8" endingQuote="&quot;" fieldNamesInFirstRow="true" fieldSeparator="," lineSeparators="\n,\r\n,\r" saveLineage="false" skipEmptyRows="false" startingQuote="&quot;" trimFields="true">
    <file-source __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSource" path="example/data/input/credit-balance-01.csv"/>
  </pipeline-input>
  <pipeline-output __class__="com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput"/>
</pipeline>
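In the XML form, the output element carries nothing but a __class__ attribute, which is enough for an output that has no state of its own. If a custom output did have a property, it would need to be written as an attribute and read back when the element is restored. The following is a minimal sketch of that idea using only the JDK's org.w3c.dom API. The prefix attribute, the class name example.CustomPipelineOutput and the helper methods are hypothetical and are not taken from the library.

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

class XmlClassTagDemo {

    // Create an empty DOM document to own the elements.
    static Document newDocument() {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
        } catch (ParserConfigurationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Serialize: store the class name plus one hypothetical property as attributes.
    static Element toXmlElement(Document document, String className, String prefix) {
        Element element = document.createElement("pipeline-output");
        element.setAttribute("__class__", className);
        element.setAttribute("prefix", prefix);
        return element;
    }

    // Deserialize: read the property back, falling back to an empty default.
    static String readPrefix(Element element) {
        return element.hasAttribute("prefix") ? element.getAttribute("prefix") : "";
    }

    public static void main(String[] args) {
        Document document = newDocument();
        Element element = toXmlElement(document, "example.CustomPipelineOutput", "row> ");

        System.out.println(element.getAttribute("__class__"));
        System.out.println("[" + readPrefix(element) + "]");
    }
}
```

Checking hasAttribute before reading keeps older XML, written before the property existed, loadable with a sensible default.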