Create Custom Pipeline Output

This example builds a data pipeline that uses a custom output class. By subclassing PipelineOutput, you control both how the pipeline's output writes data and how the output itself is converted to a Record or XML and restored from those formats. This is useful when you need precise control over how objects are serialized and deserialized in different formats.
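
A simplified model of this round trip may help. The sketch below is JDK-only and hypothetical: `ClassNameRoundTrip`, `ModelOutput`, and `restore` are illustrative names, and a `Map` stands in for a DataPipeline `Record`. It assumes, based on the `__class__` entries that appear in the console output, that a serialized output stores its class name and is later re-created reflectively through a no-argument constructor. DataPipeline's actual mechanism may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ClassNameRoundTrip {

    // Hypothetical stand-in for a custom output. It is a static nested class with an
    // implicit public no-arg constructor, so it can be instantiated reflectively.
    public static class ModelOutput {

        // Serialize: store the concrete class name (binary name, with '$' for nesting)
        public Map<String, Object> toMap() {
            Map<String, Object> map = new LinkedHashMap<>();
            map.put("__class__", getClass().getName());
            return map;
        }

        // Deserialize: this stateless model has nothing else to read back
        public ModelOutput fromMap(Map<String, Object> map) {
            return this;
        }
    }

    // Re-create the object named in "__class__", then let it restore its own state
    public static ModelOutput restore(Map<String, Object> map) throws ReflectiveOperationException {
        String className = (String) map.get("__class__");
        Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
        return ((ModelOutput) instance).fromMap(map);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Map<String, Object> saved = new ModelOutput().toMap();
        System.out.println(saved.get("__class__")); // e.g. ClassNameRoundTrip$ModelOutput
        ModelOutput restored = restore(saved);
        System.out.println(restored.getClass() == ModelOutput.class); // prints true
    }
}
```

The nested class name contains `$`, the same form as `CreateCustomPipelineOutput$CustomPipelineOutput` in the example's output. A non-static inner class would fail in `restore`, because its constructor takes the enclosing instance as a hidden parameter, so `getDeclaredConstructor()` with no arguments finds nothing.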

Input CSV

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C

Java Code

package com.northconcepts.datapipeline.foundations.examples.pipeline;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.PipelineOutput;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;
import com.northconcepts.datapipeline.internal.lang.Util;

public class CreateCustomPipelineOutput {

    public static void main(String[] args) {
        // Read the CSV file, treating its first row as field names
        CsvPipelineInput pipelineInput = new CsvPipelineInput()
                .setFileSource(new LocalFileSource().setPath("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);

        // Custom output whose DataWriter prints each record to the console
        CustomPipelineOutput pipelineOutput = new CustomPipelineOutput();

        Pipeline pipeline = new Pipeline()
                .setInput(pipelineInput)
                .setOutput(pipelineOutput);

        // First run: ConsoleWriter prints the records
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Capture the whole pipeline configuration as a single Record
        Record record = pipeline.toRecord();
        System.out.println(record);

        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Rebuild a pipeline from that Record and run it again
        pipeline = new Pipeline().fromRecord(record);
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        System.out.println("Pipeline as JSON:");
        System.out.println(Util.formatJson(pipeline.toJson()));

        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Round-trip the pipeline through XML and run it once more
        pipeline = new Pipeline().fromXml(pipeline.toXml());
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        System.out.println("Pipeline as XML:");
        System.out.println(pipeline.toXml());
    }

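    // Declared static so it can be instantiated without an enclosing CreateCustomPipelineOutput instance
    public static class CustomPipelineOutput extends PipelineOutput {

        // Supplies the DataWriter that receives the pipeline's records
        @Override
        public DataWriter createDataWriter() {
            return new ConsoleWriter();
        }

        // Name reported for this output
        @Override
        public String getName() {
            return "CustomPipelineOutput";
        }

        // Serialization hooks: this class has no fields of its own, so it defers to the parent.
        // An output with its own properties would write them here and read them back
        // in fromRecord / fromXmlElement.
        @Override
        public Record toRecord() {
            return super.toRecord();
        }

        @Override
        public PipelineOutput fromRecord(Record source) {
            super.fromRecord(source);
            return this;
        }

        @Override
        public Element toXmlElement(Document document) {
            return super.toXmlElement(document);
        }

        // Nothing to restore from XML, so the element is not read
        @Override
        public CustomPipelineOutput fromXmlElement(Element element) {
            return this;
        }

    }

    // Minimal DataWriter that prints each record it receives to standard output
    public static class ConsoleWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);
        }

    }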
}
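
CustomPipelineOutput has no fields, so its fromXmlElement() can return the current instance without reading anything. An output with its own settings would need to write them in toXmlElement() and read them back in fromXmlElement(). The following JDK-only sketch illustrates that idea with a hypothetical `PrefixedOutput` class and a hypothetical `prefix` attribute. It uses only the standard `org.w3c.dom` API and does not extend DataPipeline's `PipelineOutput`.

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

public class StatefulOutputXml {

    // Hypothetical output with one setting of its own (not a DataPipeline class)
    public static class PrefixedOutput {
        private String prefix = "";

        public PrefixedOutput setPrefix(String prefix) {
            this.prefix = prefix;
            return this;
        }

        public String getPrefix() {
            return prefix;
        }

        // Write the class name plus this object's own state as attributes
        public Element toXmlElement(Document document) {
            Element element = document.createElement("pipeline-output");
            element.setAttribute("__class__", getClass().getName());
            element.setAttribute("prefix", prefix);
            return element;
        }

        // Read the state back; simply returning this would silently drop the prefix
        public PrefixedOutput fromXmlElement(Element element) {
            if (element.hasAttribute("prefix")) {
                prefix = element.getAttribute("prefix");
            }
            return this;
        }
    }

    // Serialize to a DOM element and restore into a fresh instance
    public static PrefixedOutput roundTrip(PrefixedOutput original) throws ParserConfigurationException {
        Document document = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
        Element element = original.toXmlElement(document);
        return new PrefixedOutput().fromXmlElement(element);
    }

    public static void main(String[] args) throws ParserConfigurationException {
        PrefixedOutput restored = roundTrip(new PrefixedOutput().setPrefix(">> "));
        System.out.println("[" + restored.getPrefix() + "]"); // prints [>> ]
    }
}
```

The same reasoning would apply to toRecord() and fromRecord(): whatever one method writes, the other has to read.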

Code Walkthrough

To illustrate the concept, the example introduces a CustomPipelineOutput class that extends PipelineOutput.

    1. The class overrides the methods that convert the output to and from a Record and XML, so that serialization and deserialization can be customized for each format.
    2. createDataWriter() returns a ConsoleWriter as the DataWriter for this PipelineOutput. ConsoleWriter simply prints each record it receives to the console.
    3. getName() returns the text "CustomPipelineOutput" as the name of this object.
    4. The overridden toRecord(), fromRecord() and toXmlElement() methods call the corresponding parent methods. fromXmlElement() returns the current instance without reading the element, since the class has no state of its own to restore.
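
ConsoleWriter implements only writeImpl(), which suggests a template-method design in which the base class presumably handles the shared work. The following is a hypothetical, JDK-only model of that pattern: `SimpleDataWriter` and `ConsoleLikeWriter` are illustrative names, not DataPipeline classes, and the real DataWriter has responsibilities this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class WriterTemplateSketch {

    // Hypothetical base class (not DataPipeline's DataWriter): a final public entry point
    // performs shared checks and counting, then delegates to the subclass hook.
    public static abstract class SimpleDataWriter {
        private long recordCount;

        public final void write(Map<String, String> record) {
            if (record == null) {
                throw new IllegalArgumentException("record must not be null");
            }
            recordCount++;
            writeImpl(record);
        }

        public long getRecordCount() {
            return recordCount;
        }

        // The only method a concrete writer must supply
        protected abstract void writeImpl(Map<String, String> record);
    }

    // Console-style writer: prints each record and keeps the printed lines for inspection
    public static class ConsoleLikeWriter extends SimpleDataWriter {
        private final List<String> lines = new ArrayList<>();

        @Override
        protected void writeImpl(Map<String, String> record) {
            String line = record.toString();
            lines.add(line);
            System.out.println(line);
        }

        public List<String> getLines() {
            return lines;
        }
    }

    public static void main(String[] args) {
        ConsoleLikeWriter writer = new ConsoleLikeWriter();
        writer.write(Collections.singletonMap("Account", "101")); // prints {Account=101}
        writer.write(Collections.singletonMap("Account", "312")); // prints {Account=312}
        System.out.println(writer.getRecordCount()); // prints 2
    }
}
```

Keeping write() final means every subclass gets the same validation and counting, while each one only decides where the record goes.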

The main() method then performs the following steps:
  1. A CsvPipelineInput object is created to read the input file credit-balance-01.csv, taking field names from the first row.
  2. An instance of the CustomPipelineOutput class is created.
  3. The two objects from the previous steps are assigned as the input and output of a Pipeline, respectively.
  4. The pipeline is executed with the run() method. Because ConsoleWriter is the DataWriter, each record is printed to the console.
  5. Next, the pipeline's toRecord() method is called. It returns the entire pipeline configuration as a single Record. As the console output shows, this Record contains the input (the CsvPipelineInput class, including its file source) and the output (the CreateCustomPipelineOutput$CustomPipelineOutput class).
  6. A new Pipeline instance is created from the Record obtained in the previous step. Because all of the pipeline's properties are stored in that Record, running the new pipeline prints the same records as in Step 4.
  7. The pipeline is converted to JSON with toJson(), formatted with Util.formatJson(), and printed to the console.
  8. The pipeline is then serialized to XML with toXml() and restored with fromXml(). Running the restored pipeline prints the records to the console again.
  9. Finally, the pipeline's XML representation is printed.
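
Steps 8 and 9 rely on the pipeline configuration surviving a trip through an XML string. The JDK-only sketch below shows the same idea with the standard DOM and `javax.xml.transform` APIs: it builds a document shaped like the `<pipeline>` XML in the console output, serializes it to a string, parses it back, and reads the `__class__` attribute. The class name `com.example.MyOutput$Nested` and the helper methods are hypothetical; this is not how `Pipeline.toXml()` or `fromXml()` are implemented.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

public class XmlRoundTripSketch {

    // Build a DOM tree shaped like the <pipeline> XML printed by the example
    public static Document buildConfig(String outputClass) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
        Element pipeline = doc.createElement("pipeline");
        pipeline.setAttribute("multithreaded", "true");
        Element output = doc.createElement("pipeline-output");
        output.setAttribute("__class__", outputClass);
        pipeline.appendChild(output);
        doc.appendChild(pipeline);
        return doc;
    }

    // Serialize the DOM tree to a string, without an XML declaration
    public static String toXml(Document doc) throws Exception {
        Transformer transformer = TransformerFactory.newInstance().newTransformer();
        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
        StringWriter writer = new StringWriter();
        transformer.transform(new DOMSource(doc), new StreamResult(writer));
        return writer.toString();
    }

    // Parse the string again and read back the output's class name
    public static String readOutputClass(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
        Element output = (Element) doc.getElementsByTagName("pipeline-output").item(0);
        return output.getAttribute("__class__");
    }

    public static void main(String[] args) throws Exception {
        String className = "com.example.MyOutput$Nested"; // hypothetical class name
        String xml = toXml(buildConfig(className));
        System.out.println(xml);
        System.out.println(className.equals(readOutputClass(xml))); // prints true
    }
}
```

The `$` in a nested class name needs no escaping in an XML attribute, so the name read back matches the one written.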

Console Output

16:17:27,127 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
16:17:27,235 DEBUG [main] datapipeline:615 - Job[1,job-1,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}

Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:STRING=[90.00]:String
    4:[CreditLimit]:STRING=[1000.00]:String
    5:[AccountCreated]:STRING=[8/6/2003]:String
    6:[Rating]:STRING=[B]:String
}

Record {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:STRING=[0]:String
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[AccountCreated]:STRING=[5/25/1985]:String
    6:[Rating]:STRING=[B]:String
}

Record {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}

Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[789.65]:String
    4:[CreditLimit]:STRING=[5000.00]:String
    5:[AccountCreated]:STRING=[2/5/2007]:String
    6:[Rating]:STRING=[C]:String
}

16:17:27,438 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Record (MODIFIED) (has child records) {
    0:[name]:UNDEFINED=[null]
    1:[description]:UNDEFINED=[null]
    2:[input]:RECORD=[
        Record (MODIFIED) (is child record) (has child records) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput]:String
            1:[saveLineage]:BOOLEAN=[false]:Boolean
            2:[fieldSeparator]:STRING=[,]:String
            3:[startingQuote]:STRING=["]:String
            4:[endingQuote]:STRING=["]:String
            5:[lineSeparators]:STRING=[\n,\r\n,\r]:String
            6:[allowMultiLineText]:BOOLEAN=[false]:Boolean
            7:[allowQuoteInField]:BOOLEAN=[false]:Boolean
            8:[trimFields]:BOOLEAN=[true]:Boolean
            9:[skipEmptyRows]:BOOLEAN=[false]:Boolean
            10:[charset]:STRING=[UTF-8]:String
            11:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean
            12:[fileSource]:RECORD=[
                Record (MODIFIED) (is child record) {
                    0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSource]:String
                    1:[name]:UNDEFINED=[null]
                    2:[path]:STRING=[example/data/input/credit-balance-01.csv]:String
                }]:Record
        }]:Record
    3:[sourceEntity]:UNDEFINED=[null]
    4:[targetEntity]:UNDEFINED=[null]
    5:[output]:RECORD=[
        Record (MODIFIED) (is child record) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput]:String
        }]:Record
    6:[multithreaded]:BOOLEAN=[true]:Boolean
    7:[actions]:ARRAY of UNDEFINED=[[]]:ArrayValue
}

---------------------------------------------------------------------------------------------------------
16:17:27,471 DEBUG [main] datapipeline:615 - Job[2,job-2,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}

Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:STRING=[90.00]:String
    4:[CreditLimit]:STRING=[1000.00]:String
    5:[AccountCreated]:STRING=[8/6/2003]:String
    6:[Rating]:STRING=[B]:String
}

Record {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:STRING=[0]:String
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[AccountCreated]:STRING=[5/25/1985]:String
    6:[Rating]:STRING=[B]:String
}

Record {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}

Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[789.65]:String
    4:[CreditLimit]:STRING=[5000.00]:String
    5:[AccountCreated]:STRING=[2/5/2007]:String
    6:[Rating]:STRING=[C]:String
}

16:17:27,474 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as JSON:
{
  "name" : null,
  "description" : null,
  "input" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput",
    "saveLineage" : false,
    "fieldSeparator" : ",",
    "startingQuote" : "\"",
    "endingQuote" : "\"",
    "lineSeparators" : "\\n,\\r\\n,\\r",
    "allowMultiLineText" : false,
    "allowQuoteInField" : false,
    "trimFields" : true,
    "skipEmptyRows" : false,
    "charset" : "UTF-8",
    "fieldNamesInFirstRow" : true,
    "fileSource" : {
      "__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSource",
      "name" : null,
      "path" : "example/data/input/credit-balance-01.csv"
    }
  },
  "sourceEntity" : null,
  "targetEntity" : null,
  "output" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput"
  },
  "multithreaded" : true,
  "actions" : [ ]
}
---------------------------------------------------------------------------------------------------------
16:17:27,915 DEBUG [main] datapipeline:615 - Job[3,job-3,Mon Jun 19 16:17:27 EEST 2023]::Start
Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}

Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:STRING=[90.00]:String
    4:[CreditLimit]:STRING=[1000.00]:String
    5:[AccountCreated]:STRING=[8/6/2003]:String
    6:[Rating]:STRING=[B]:String
}

Record {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:STRING=[0]:String
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[AccountCreated]:STRING=[5/25/1985]:String
    6:[Rating]:STRING=[B]:String
}

Record {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}

Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[789.65]:String
    4:[CreditLimit]:STRING=[5000.00]:String
    5:[AccountCreated]:STRING=[2/5/2007]:String
    6:[Rating]:STRING=[C]:String
}

16:17:27,917 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as XML:
<pipeline multithreaded="true">
  <pipeline-input __class__="com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput" allowMultiLineText="false" allowQuoteInField="false" charset="UTF-8" endingQuote="&quot;" fieldNamesInFirstRow="true" fieldSeparator="," lineSeparators="\n,\r\n,\r" saveLineage="false" skipEmptyRows="false" startingQuote="&quot;" trimFields="true">
    <file-source __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSource" path="example/data/input/credit-balance-01.csv"/>
  </pipeline-input>
  <pipeline-output __class__="com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineOutput$CustomPipelineOutput"/>
</pipeline>