Create Custom Pipeline Output
Updated: Jun 23, 2023
This example shows how to build a flexible, customizable data pipeline by defining a custom output class. A custom output class lets you control where the pipeline's records go and how the output itself is serialized to a Record or XML and restored from those formats. This is useful when you need precise control over how pipeline objects are saved and reloaded in different formats.
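As a rough, library-independent illustration of this idea, the sketch below uses only the Java standard library. A plain Map stands in for a DataPipeline Record, and every name in it (CustomOutputSketch, Output, PrefixedConsoleOutput, the prefix setting) is hypothetical. The base class stores a __class__ entry, similar to the one visible in the console output, so the concrete class can be recreated by reflection. The subclass overrides the save and restore hooks to carry its own setting through the round trip.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, stdlib-only sketch of the custom-output pattern.
// A Map stands in for a Record; none of these names are DataPipeline APIs.
public class CustomOutputSketch {

    // Base type: records its concrete class name so it can be recreated later.
    public abstract static class Output {
        public Map<String, Object> toMap() {
            Map<String, Object> map = new LinkedHashMap<>();
            map.put("__class__", getClass().getName());
            return map;
        }

        public Output fromMap(Map<String, Object> source) {
            return this; // the base class has no state to restore
        }

        public abstract String getName();
    }

    // Custom output with one setting that must survive the round trip.
    public static class PrefixedConsoleOutput extends Output {
        private String prefix = "";

        public PrefixedConsoleOutput setPrefix(String prefix) {
            this.prefix = prefix;
            return this;
        }

        public String getPrefix() {
            return prefix;
        }

        @Override
        public String getName() {
            return "PrefixedConsoleOutput";
        }

        @Override
        public Map<String, Object> toMap() {
            Map<String, Object> map = super.toMap(); // keeps the __class__ entry
            map.put("prefix", prefix);
            return map;
        }

        @Override
        public PrefixedConsoleOutput fromMap(Map<String, Object> source) {
            super.fromMap(source);
            Object value = source.get("prefix");
            prefix = (value == null) ? "" : value.toString();
            return this;
        }
    }

    // Recreates an Output from its Map form using the stored class name.
    public static Output restore(Map<String, Object> map) {
        try {
            String className = (String) map.get("__class__");
            Output output = (Output) Class.forName(className).getDeclaredConstructor().newInstance();
            return output.fromMap(map);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot recreate output from " + map, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> saved = new PrefixedConsoleOutput().setPrefix(">> ").toMap();
        System.out.println(saved);

        PrefixedConsoleOutput copy = (PrefixedConsoleOutput) restore(saved);
        System.out.println(copy.getName() + " prefix='" + copy.getPrefix() + "'");
    }
}
```

Storing the class name next to the settings is what allows a generic loader to rebuild the right subclass without knowing about it in advance.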
Input CSV
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C
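The Java code reads this file with setFieldNamesInFirstRow(true), so the first row supplies the field names and each later row becomes one record. A minimal stdlib-only sketch of that mapping follows. CsvHeaderSketch is a hypothetical name, and the sketch assumes simple data like this sample (no quoted fields, no embedded commas), so it is not a substitute for CsvPipelineInput.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only sketch: CSV text with a header row becomes a list of
// ordered field-name -> value maps. Assumes no quoted fields or embedded commas.
public class CsvHeaderSketch {

    public static List<Map<String, String>> parse(String csv) {
        List<Map<String, String>> rows = new ArrayList<>();
        String[] lines = csv.split("\\r?\\n");
        String[] header = lines[0].split(",", -1);

        for (int i = 1; i < lines.length; i++) {
            if (lines[i].trim().isEmpty()) {
                continue; // ignore blank lines
            }
            String[] values = lines[i].split(",", -1);
            Map<String, String> row = new LinkedHashMap<>();
            for (int c = 0; c < header.length; c++) {
                // Trim each value; a missing trailing value becomes ""
                row.put(header[c].trim(), c < values.length ? values[c].trim() : "");
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        String csv = String.join("\n",
                "Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating",
                "101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A",
                "312,Butler,Gerard,90.00,1000.00,8/6/2003,B");
        for (Map<String, String> row : parse(csv)) {
            System.out.println(row);
        }
    }
}
```

Note that every value stays a String, which matches the STRING fields shown in the console output.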
Java Code
package com.northconcepts.datapipeline.foundations.examples.pipeline;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.PipelineOutput;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;
import com.northconcepts.datapipeline.internal.lang.Util;

public class CreateCustomPipelineOutput {

    public static void main(String[] args) {
        // Read the CSV file, using the first row as field names
        CsvPipelineInput pipelineInput = new CsvPipelineInput()
                .setFileSource(new LocalFileSource().setPath("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);

        // Send the records to the custom output defined below
        CustomPipelineOutput pipelineOutput = new CustomPipelineOutput();

        Pipeline pipeline = new Pipeline()
                .setInput(pipelineInput)
                .setOutput(pipelineOutput);

        // First run: ConsoleWriter prints each record
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");
        // Serialize the pipeline definition (input and output) into a single Record
        Record record = pipeline.toRecord();
        System.out.println(record);
        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Rebuild the pipeline from the Record and run it again
        pipeline = new Pipeline().fromRecord(record);
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");
        System.out.println("Pipeline as JSON:");
        System.out.println(Util.formatJson(pipeline.toJson()));
        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Rebuild the pipeline from its XML form and run it a third time
        pipeline = new Pipeline().fromXml(pipeline.toXml());
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");
        System.out.println("Pipeline as XML:");
        System.out.println(pipeline.toXml());
    }

    public static class CustomPipelineOutput extends PipelineOutput {

        // Supplies the DataWriter that receives the pipeline's records
        @Override
        public DataWriter createDataWriter() {
            return new ConsoleWriter();
        }

        @Override
        public String getName() {
            return "CustomPipelineOutput";
        }

        // Delegates to the parent implementation
        @Override
        public Record toRecord() {
            return super.toRecord();
        }

        // Delegates to the parent implementation and returns this output
        @Override
        public PipelineOutput fromRecord(Record source) {
            super.fromRecord(source);
            return this;
        }

        // Delegates to the parent implementation
        @Override
        public Element toXmlElement(Document document) {
            return super.toXmlElement(document);
        }

        // This output has no properties, so nothing is read from the element
        @Override
        public CustomPipelineOutput fromXmlElement(Element element) {
            return this;
        }
    }

    // Minimal DataWriter that prints every record it receives
    public static class ConsoleWriter extends DataWriter {
        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);
        }
    }
}
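In the code above, fromXmlElement() returns this without reading the element, which works because CustomPipelineOutput has no properties of its own. If a custom output did carry settings, it would need to write them in toXmlElement() and read them back in fromXmlElement(). The following stdlib-only sketch shows such a round trip with the JDK's org.w3c.dom API. PrefixedOutput and its prefix attribute are hypothetical, and the element name pipeline-output is borrowed from the XML console output; the sketch does not call DataPipeline's PipelineOutput, whose own handling of the element may differ.

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

// Hypothetical, stdlib-only sketch of an XML round trip for an output with state.
// Only the org.w3c.dom and javax.xml.parsers calls are real JDK APIs.
public class XmlRoundTripSketch {

    public static class PrefixedOutput {
        private String prefix = "";

        public PrefixedOutput setPrefix(String prefix) {
            this.prefix = prefix;
            return this;
        }

        public String getPrefix() {
            return prefix;
        }

        // Writes the concrete class name plus this output's own setting
        public Element toXmlElement(Document document) {
            Element element = document.createElement("pipeline-output");
            element.setAttribute("__class__", getClass().getName());
            element.setAttribute("prefix", prefix);
            return element;
        }

        // Reads the setting back; getAttribute returns "" if it is absent
        public PrefixedOutput fromXmlElement(Element element) {
            prefix = element.getAttribute("prefix");
            return this;
        }
    }

    // Creates an empty DOM document to build elements in
    public static Document newDocument() {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
        } catch (ParserConfigurationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Element element = new PrefixedOutput().setPrefix(">> ").toXmlElement(newDocument());
        PrefixedOutput copy = new PrefixedOutput().fromXmlElement(element);
        System.out.println(element.getAttribute("__class__") + " prefix='" + copy.getPrefix() + "'");
    }
}
```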
Code Walkthrough
To illustrate the concept, the example introduces a CustomPipelineOutput class that extends PipelineOutput. Within this class, several methods are overridden to customize the output's behavior and how it is serialized and deserialized:
- createDataWriter() returns a ConsoleWriter instance as the DataWriter for this PipelineOutput. As its implementation shows, ConsoleWriter prints each record to the console.
- getName() returns the text "CustomPipelineOutput" as the name of this object.
- The overridden toRecord(), fromRecord() and toXmlElement() methods call the corresponding parent methods. fromXmlElement() simply returns the current instance, since the class has no properties to restore.
The main() method then performs the following steps:
- A CsvPipelineInput object is instantiated to read the input file credit-balance-01.csv, with the first row used as field names.
- An instance of the CustomPipelineOutput class is created.
- These two objects are assigned as the input and output of a Pipeline, respectively.
- The pipeline is executed with the run() method. Because ConsoleWriter is the DataWriter, the records are printed to the console.
- Next, the pipeline's toRecord() method is called. It returns the whole pipeline as a single Record. As the console output shows, this Record contains the input (CsvPipelineInput class) and the output (CreateCustomPipelineOutput$CustomPipelineOutput class).
- A new Pipeline instance is created from that Record with fromRecord(). Since all of the pipeline's properties are saved in the Record, running it prints the same records as the first run.
- The pipeline is converted to JSON and displayed in the console.
- The pipeline is then converted to XML with toXml() and recreated from that XML with fromXml(). Running the recreated pipeline prints the same records again.
- Finally, the pipeline is printed in XML format.
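ConsoleWriter only overrides writeImpl() and leaves everything else to DataWriter. The stdlib-only sketch below shows the template-method shape this suggests: a base writer owns the public write() call and delegates each record to a protected writeImpl() hook. RecordWriter, ConsoleRecordWriter and CollectingWriter are hypothetical stand-ins, and the Map-based records, counter and error wrapping are assumptions made for illustration; this is not DataPipeline's actual DataWriter implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only sketch of a template-method writer.
// These classes are stand-ins, not DataPipeline's DataWriter hierarchy.
public class WriterSketch {

    public abstract static class RecordWriter {
        private long count;

        // Fixed entry point: delegates to the hook, then counts the record
        public final void write(Map<String, String> record) {
            try {
                writeImpl(record);
            } catch (RuntimeException | Error e) {
                throw e;
            } catch (Throwable t) {
                throw new IllegalStateException("Failed to write record " + record, t);
            }
            count++;
        }

        public long getCount() {
            return count;
        }

        // Subclasses decide what "writing" means
        protected abstract void writeImpl(Map<String, String> record) throws Throwable;
    }

    // Prints each record, like the example's ConsoleWriter
    public static class ConsoleRecordWriter extends RecordWriter {
        @Override
        protected void writeImpl(Map<String, String> record) {
            System.out.println(record);
        }
    }

    // Keeps copies of the records in memory so runs can be compared
    public static class CollectingWriter extends RecordWriter {
        private final List<Map<String, String>> records = new ArrayList<>();

        @Override
        protected void writeImpl(Map<String, String> record) {
            records.add(new LinkedHashMap<>(record));
        }

        public List<Map<String, String>> getRecords() {
            return records;
        }
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("Account", "101");
        record.put("LastName", "Reeves");

        RecordWriter writer = new ConsoleRecordWriter();
        writer.write(record);
        System.out.println("records written: " + writer.getCount());
    }
}
```

A collecting writer like this one is a convenient way to check that a pipeline rebuilt from its Record or XML form produces the same records as the first run, by comparing lists instead of reading console output.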
Console Output
16:17:27,127 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
16:17:27,235 DEBUG [main] datapipeline:615 - Job[1,job-1,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
0:[Account]:STRING=[101]:String
1:[LastName]:STRING=[Reeves]:String
2:[FirstName]:STRING=[Keanu]:String
3:[Balance]:STRING=[9315.45]:String
4:[CreditLimit]:STRING=[10000.00]:String
5:[AccountCreated]:STRING=[1/17/1998]:String
6:[Rating]:STRING=[A]:String
}
Record {
0:[Account]:STRING=[312]:String
1:[LastName]:STRING=[Butler]:String
2:[FirstName]:STRING=[Gerard]:String
3:[Balance]:STRING=[90.00]:String
4:[CreditLimit]:STRING=[1000.00]:String
5:[AccountCreated]:STRING=[8/6/2003]:String
6:[Rating]:STRING=[B]:String
}
Record {
0:[Account]:STRING=[868]:String
1:[LastName]:STRING=[Hewitt]:String
2:[FirstName]:STRING=[Jennifer Love]:String
3:[Balance]:STRING=[0]:String
4:[CreditLimit]:STRING=[17000.00]:String
5:[AccountCreated]:STRING=[5/25/1985]:String
6:[Rating]:STRING=[B]:String
}
Record {
0:[Account]:STRING=[761]:String
1:[LastName]:STRING=[Pinkett-Smith]:String
2:[FirstName]:STRING=[Jada]:String
3:[Balance]:STRING=[49654.87]:String
4:[CreditLimit]:STRING=[100000.00]:String
5:[AccountCreated]:STRING=[12/5/2006]:String
6:[Rating]:STRING=[A]:String
}
Record {
0:[Account]:STRING=[317]:String
1:[LastName]:STRING=[Murray]:String
2:[FirstName]:STRING=[Bill]:String
3:[Balance]:STRING=[789.65]:String
4:[CreditLimit]:STRING=[5000.00]:String
5:[AccountCreated]:STRING=[2/5/2007]:String
6:[Rating]:STRING=[C]:String
}
16:17:27,438 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Record (MODIFIED) (has child records) {
0:[name]:UNDEFINED=[null]
1:[description]:UNDEFINED=[null]
2:[input]:RECORD=[
Record (MODIFIED) (is child record) (has child records) {
0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput]:String
1:[saveLineage]:BOOLEAN=[false]:Boolean
2:[fieldSeparator]:STRING=[,]:String
3:[startingQuote]:STRING=["]:String
4:[endingQuote]:STRING=["]:String
5:[lineSeparators]:STRING=[\n,\r\n,\r]:String
6:[allowMultiLineText]:BOOLEAN=[false]:Boolean
7:[allowQuoteInField]:BOOLEAN=[false]:Boolean
8:[trimFields]:BOOLEAN=[true]:Boolean
9:[skipEmptyRows]:BOOLEAN=[false]:Boolean
10:[charset]:STRING=[UTF-8]:String
11:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean
12:[fileSource]:RECORD=[
Record (MODIFIED) (is child record) {
0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSource]:String
1:[name]:UNDEFINED=[null]
2:[path]:STRING=[example/data/input/credit-balance-01.csv]:String
}]:Record
}]:Record
3:[sourceEntity]:UNDEFINED=[null]
4:[targetEntity]:UNDEFINED=[null]
5:[output]:RECORD=[
Record (MODIFIED) (is child record) {
0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput]:String
}]:Record
6:[multithreaded]:BOOLEAN=[true]:Boolean
7:[actions]:ARRAY of UNDEFINED=[[]]:ArrayValue
}
---------------------------------------------------------------------------------------------------------
16:17:27,471 DEBUG [main] datapipeline:615 - Job[2,job-2,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
0:[Account]:STRING=[101]:String
1:[LastName]:STRING=[Reeves]:String
2:[FirstName]:STRING=[Keanu]:String
3:[Balance]:STRING=[9315.45]:String
4:[CreditLimit]:STRING=[10000.00]:String
5:[AccountCreated]:STRING=[1/17/1998]:String
6:[Rating]:STRING=[A]:String
}
Record {
0:[Account]:STRING=[312]:String
1:[LastName]:STRING=[Butler]:String
2:[FirstName]:STRING=[Gerard]:String
3:[Balance]:STRING=[90.00]:String
4:[CreditLimit]:STRING=[1000.00]:String
5:[AccountCreated]:STRING=[8/6/2003]:String
6:[Rating]:STRING=[B]:String
}
Record {
0:[Account]:STRING=[868]:String
1:[LastName]:STRING=[Hewitt]:String
2:[FirstName]:STRING=[Jennifer Love]:String
3:[Balance]:STRING=[0]:String
4:[CreditLimit]:STRING=[17000.00]:String
5:[AccountCreated]:STRING=[5/25/1985]:String
6:[Rating]:STRING=[B]:String
}
Record {
0:[Account]:STRING=[761]:String
1:[LastName]:STRING=[Pinkett-Smith]:String
2:[FirstName]:STRING=[Jada]:String
3:[Balance]:STRING=[49654.87]:String
4:[CreditLimit]:STRING=[100000.00]:String
5:[AccountCreated]:STRING=[12/5/2006]:String
6:[Rating]:STRING=[A]:String
}
Record {
0:[Account]:STRING=[317]:String
1:[LastName]:STRING=[Murray]:String
2:[FirstName]:STRING=[Bill]:String
3:[Balance]:STRING=[789.65]:String
4:[CreditLimit]:STRING=[5000.00]:String
5:[AccountCreated]:STRING=[2/5/2007]:String
6:[Rating]:STRING=[C]:String
}
16:17:27,474 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as JSON:
{
"name" : null,
"description" : null,
"input" : {
"__class__" : "com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput",
"saveLineage" : false,
"fieldSeparator" : ",",
"startingQuote" : "\"",
"endingQuote" : "\"",
"lineSeparators" : "\\n,\\r\\n,\\r",
"allowMultiLineText" : false,
"allowQuoteInField" : false,
"trimFields" : true,
"skipEmptyRows" : false,
"charset" : "UTF-8",
"fieldNamesInFirstRow" : true,
"fileSource" : {
"__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSource",
"name" : null,
"path" : "example/data/input/credit-balance-01.csv"
}
},
"sourceEntity" : null,
"targetEntity" : null,
"output" : {
"__class__" : "com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput"
},
"multithreaded" : true,
"actions" : [ ]
}
---------------------------------------------------------------------------------------------------------
16:17:27,915 DEBUG [main] datapipeline:615 - Job[3,job-3,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
0:[Account]:STRING=[101]:String
1:[LastName]:STRING=[Reeves]:String
2:[FirstName]:STRING=[Keanu]:String
3:[Balance]:STRING=[9315.45]:String
4:[CreditLimit]:STRING=[10000.00]:String
5:[AccountCreated]:STRING=[1/17/1998]:String
6:[Rating]:STRING=[A]:String
}
Record {
0:[Account]:STRING=[312]:String
1:[LastName]:STRING=[Butler]:String
2:[FirstName]:STRING=[Gerard]:String
3:[Balance]:STRING=[90.00]:String
4:[CreditLimit]:STRING=[1000.00]:String
5:[AccountCreated]:STRING=[8/6/2003]:String
6:[Rating]:STRING=[B]:String
}
Record {
0:[Account]:STRING=[868]:String
1:[LastName]:STRING=[Hewitt]:String
2:[FirstName]:STRING=[Jennifer Love]:String
3:[Balance]:STRING=[0]:String
4:[CreditLimit]:STRING=[17000.00]:String
5:[AccountCreated]:STRING=[5/25/1985]:String
6:[Rating]:STRING=[B]:String
}
Record {
0:[Account]:STRING=[761]:String
1:[LastName]:STRING=[Pinkett-Smith]:String
2:[FirstName]:STRING=[Jada]:String
3:[Balance]:STRING=[49654.87]:String
4:[CreditLimit]:STRING=[100000.00]:String
5:[AccountCreated]:STRING=[12/5/2006]:String
6:[Rating]:STRING=[A]:String
}
Record {
0:[Account]:STRING=[317]:String
1:[LastName]:STRING=[Murray]:String
2:[FirstName]:STRING=[Bill]:String
3:[Balance]:STRING=[789.65]:String
4:[CreditLimit]:STRING=[5000.00]:String
5:[AccountCreated]:STRING=[2/5/2007]:String
6:[Rating]:STRING=[C]:String
}
16:17:27,917 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as XML:
<pipeline multithreaded="true">
<pipeline-input __class__="com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput" allowMultiLineText="false" allowQuoteInField="false" charset="UTF-8" endingQuote="&quot;" fieldNamesInFirstRow="true" fieldSeparator="," lineSeparators="\n,\r\n,\r" saveLineage="false" skipEmptyRows="false" startingQuote="&quot;" trimFields="true">
<file-source __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSource" path="example/data/input/credit-balance-01.csv"/>
</pipeline-input>
<pipeline-output __class__="com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput"/>
</pipeline>
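The startingQuote and endingQuote attributes in the XML above hold a double-quote character, which a serializer has to escape inside a double-quoted attribute value for the XML to stay well-formed. The following stdlib-only sketch (XmlQuoteSketch is a hypothetical name) writes such an attribute with the JDK's javax.xml.transform API and parses the text back to confirm that the value survives the round trip.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Hypothetical, stdlib-only sketch: serialize an attribute whose value is a
// double quote, then parse the XML text back and read the attribute again.
public class XmlQuoteSketch {

    // Builds <pipeline-input endingQuote="..."/> and returns it as XML text
    public static String serialize(String quote) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
            Element input = doc.createElement("pipeline-input");
            input.setAttribute("endingQuote", quote);
            doc.appendChild(input);

            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            // The serializer escapes the quote so the attribute stays well-formed
            transformer.transform(new DOMSource(doc), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Parses XML text and returns the root element's endingQuote attribute
    public static String readBack(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            return doc.getDocumentElement().getAttribute("endingQuote");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String xml = serialize("\"");
        System.out.println(xml);
        System.out.println("read back: " + readBack(xml));
    }
}
```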
