Create Custom Pipeline Input

This example provides a flexible and customizable data pipeline with the ability to define a custom input class. By creating a custom input class, users can tailor the behavior of the pipeline's input, including which records it produces, how its configuration is serialized to a Record or XML, and how it is restored from either format.

Real-life use cases for this library can include data ingestion, data integration, and data processing scenarios. Users can define their own logic and rules in the custom input class to handle specific data formats, perform data validation, or apply data transformations before feeding the data into the pipeline. This flexibility allows users to seamlessly integrate diverse data sources and adapt the input processing to their specific needs, enabling efficient and reliable data processing workflows.

 

Java Code

package com.northconcepts.datapipeline.foundations.examples.pipeline;

import static com.northconcepts.datapipeline.core.XmlSerializable.getAttribute;
import static com.northconcepts.datapipeline.core.XmlSerializable.setAttribute;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.foundations.file.LocalFileSink;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.PipelineInput;
import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput;
import com.northconcepts.datapipeline.internal.lang.Util;

public class CreateCustomPipelineInput {

    /**
     * Runs a pipeline fed by {@link CustomPipelineInput} and writing to an Excel file, then
     * rebuilds it from its Record form and from its XML form, re-running each copy and printing
     * the serialized representations in between.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String separator = "---------------------------------------------------------------------------------------------------------";

        CustomPipelineInput packageInput = new CustomPipelineInput();

        ExcelPipelineOutput excelOutput = new ExcelPipelineOutput()
                .setFileSink(new LocalFileSink().setPath("example/data/output/custom_package_pipeline_input.xlsx"))
                .setFieldNamesInFirstRow(true);

        Pipeline original = new Pipeline()
                .setInput(packageInput)
                .setOutput(excelOutput);
        original.run();
        System.out.println(separator);

        // Serialize the configured pipeline (input + output settings) into a single Record.
        Record snapshot = original.toRecord();
        System.out.println(snapshot);
        System.out.println(separator);

        // Rebuild from the Record and run again; the custom input restores its own fields.
        Pipeline reloadedFromRecord = new Pipeline().fromRecord(snapshot);
        reloadedFromRecord.run();
        System.out.println(separator);

        System.out.println("Pipeline as JSON:");
        System.out.println(Util.formatJson(reloadedFromRecord.toJson()));
        System.out.println(separator);

        // Round-trip through XML and run once more.
        Pipeline reloadedFromXml = new Pipeline().fromXml(reloadedFromRecord.toXml());
        reloadedFromXml.run();
        System.out.println(separator);

        System.out.println("Pipeline as XML:");
        System.out.println(reloadedFromXml.toXml());
    }

    /**
     * A {@link PipelineInput} whose records are produced by a {@link FakePackageReader}.
     * <p>
     * Its three settings (truck count, package count and per-record delay in milliseconds) are
     * added to the parent's serialized form by {@link #toRecord()} and
     * {@link #toXmlElement(Document)}, and read back by {@link #fromRecord(Record)} and
     * {@link #fromXmlElement(Element)}.
     */
    public static class CustomPipelineInput extends PipelineInput {

        private static final int DEFAULT_MAX_TRUCKS = 10;
        private static final int DEFAULT_MAX_PACKAGES = 20;
        private static final int DEFAULT_RECORD_DELAY_MS = 250;

        private int maxTrucks = DEFAULT_MAX_TRUCKS;
        private int maxPackages = DEFAULT_MAX_PACKAGES;
        private int recordDelayMS = DEFAULT_RECORD_DELAY_MS;

        /** @return the number of distinct truck ids the generated records cycle through */
        public int getMaxTrucks() {
            return maxTrucks;
        }

        /** @param maxTrucks the number of distinct truck ids the generated records cycle through */
        public void setMaxTrucks(int maxTrucks) {
            this.maxTrucks = maxTrucks;
        }

        /** @return the number of records the reader produces */
        public int getMaxPackages() {
            return maxPackages;
        }

        /** @param maxPackages the number of records the reader produces */
        public void setMaxPackages(int maxPackages) {
            this.maxPackages = maxPackages;
        }

        /** @return the delay, in milliseconds, applied before each generated record */
        public int getRecordDelayMS() {
            return recordDelayMS;
        }

        /** @param recordDelayMS the delay, in milliseconds, applied before each generated record */
        public void setRecordDelayMS(int recordDelayMS) {
            this.recordDelayMS = recordDelayMS;
        }

        @Override
        public String getName() {
            return "CustomPipelineInput";
        }

        /** Returns the parent's Record with this input's three settings appended. */
        @Override
        public Record toRecord() {
            return super.toRecord()
                    .setField("maxTrucks", maxTrucks)
                    .setField("maxPackages", maxPackages)
                    .setField("recordDelayMS", recordDelayMS);
        }

        /**
         * Restores the parent's settings, then this input's settings, from {@code source}.
         * Missing fields fall back to the class defaults.
         */
        @Override
        public CustomPipelineInput fromRecord(Record source) {
            super.fromRecord(source);
            setMaxTrucks(source.getFieldValueAsInteger("maxTrucks", DEFAULT_MAX_TRUCKS));
            setMaxPackages(source.getFieldValueAsInteger("maxPackages", DEFAULT_MAX_PACKAGES));
            setRecordDelayMS(source.getFieldValueAsInteger("recordDelayMS", DEFAULT_RECORD_DELAY_MS));

            return this;
        }

        /** Returns the parent's XML element with this input's three settings added as attributes. */
        @Override
        public Element toXmlElement(Document document) {
            Element element = super.toXmlElement(document);
            setAttribute(element, "maxTrucks", maxTrucks);
            setAttribute(element, "maxPackages", maxPackages);
            setAttribute(element, "recordDelayMS", recordDelayMS);
            return element;
        }

        /**
         * Restores the parent's settings, then this input's settings, from {@code element}.
         * Missing attributes leave the current values unchanged.
         */
        @Override
        public CustomPipelineInput fromXmlElement(Element element) {
            // Mirror toXmlElement()/fromRecord(): restore inherited attributes (e.g. saveLineage)
            // written by super.toXmlElement(); previously they were silently dropped on XML load.
            super.fromXmlElement(element);
            setMaxTrucks(getAttribute(element, "maxTrucks", maxTrucks));
            setMaxPackages(getAttribute(element, "maxPackages", maxPackages));
            setRecordDelayMS(getAttribute(element, "recordDelayMS", recordDelayMS));
            return this;
        }

        /** Creates a reader that generates {@code maxPackages} fake package records. */
        @Override
        public DataReader createDataReader() {
            return new FakePackageReader(maxTrucks, maxPackages, recordDelayMS);
        }

    }

    /**
     * A {@link DataReader} that generates {@code maxPackages} synthetic package records.
     * <p>
     * Each record has a {@code package_id}, a {@code truck_id} of the form {@code truckN} with
     * {@code N} in {@code 0..maxTrucks-1}, and an {@code amount}. If {@code recordDelay} is
     * positive, the reader sleeps that many milliseconds before producing each record.
     */
    public static class FakePackageReader extends DataReader {

        private final int maxTrucks;
        private final long maxPackages;
        private final long recordDelay;
        private long nextPackageId;

        /**
         * @param maxTrucks   number of distinct truck ids to cycle through; must be positive
         * @param maxPackages number of records to produce before {@code readImpl} returns null
         * @param recordDelay milliseconds to sleep before each record; values {@code <= 0} disable the delay
         * @throws IllegalArgumentException if {@code maxTrucks} is not positive
         */
        public FakePackageReader(int maxTrucks, long maxPackages, long recordDelay) {
            if (maxTrucks <= 0) {
                // truck_id is computed modulo maxTrucks; fail fast here rather than with an
                // ArithmeticException in the middle of a pipeline run.
                throw new IllegalArgumentException("maxTrucks must be positive: " + maxTrucks);
            }
            this.maxTrucks = maxTrucks;
            this.maxPackages = maxPackages;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            if (nextPackageId >= maxPackages) {
                return null;
            }

            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }

            long packageId = nextPackageId++;
            // truck_id and amount are derived from the already-incremented counter (packageId + 1),
            // so package 0 maps to truck1 / 1.01. This preserves the example's documented output.
            long ordinal = nextPackageId;

            Record record = new Record();
            record.setField("package_id", packageId);
            record.setField("truck_id", "truck" + ordinal % maxTrucks);
            record.setField("amount", ordinal + 0.01);
            return record;
        }
    }

}

 

Code Walkthrough

To illustrate the concept, a CustomPipelineInput class, which extends the PipelineInput class, is introduced.

    1. This class has three integer fields with default values: maxTrucks, maxPackages and recordDelayMS.
    2. Within this class, the serialization methods are overridden to customize how the input is converted to, and restored from, the Record and XML formats.
    3. "CustomPipelineInput" is returned as the name of this input.
    4. In the overridden toRecord, fromRecord and toXmlElement methods, the corresponding parent methods are called first. In addition to the parent implementations, the three fields from Step 1 are written to the resulting Record or XML element (or, in fromRecord, read back from the source Record).
    5. A FakePackageReader instance is used as the DataReader for this PipelineInput (see createDataReader()).
    6. The FakePackageReader class extends DataReader. It stores maxTrucks, maxPackages and recordDelay as final fields, along with a mutable nextPackageId counter.
    7. Its overridden readImpl() method returns Record instances with "package_id", "truck_id" and "amount" fields. It sleeps recordDelay milliseconds before each one and stops once maxPackages records have been produced.
Then, in the main() method:
  1. An instance of the CustomPipelineInput class is created.
  2. An ExcelPipelineOutput object is created to write the output file custom_package_pipeline_input.xlsx, with field names in the first row.
  3. The two objects from the previous steps are assigned as the input and output of a Pipeline, respectively.
  4. The pipeline is executed with the run() method. Because FakePackageReader is used as the DataReader, records with three fields (package_id, truck_id, amount) are written to the output XLSX file.
  5. Next, the pipeline's toRecord() method is called, which returns the whole pipeline as a single Record instance. As the console output shows, this Record contains two parts. The input (the CreateCustomPipelineInput$CustomPipelineInput class) appears with its field values, and the output (the ExcelPipelineOutput class) appears with its settings.
  6. A new Pipeline instance is created from the Record in the previous step. Because all properties are preserved in the Record, running it writes the same result as Step 4 to the output file.
  7. The pipeline instance is converted to JSON format and displayed in the console.
  8. Next, the Pipeline instance is serialized to XML, and a new Pipeline is restored from that XML. Running it writes the same records to the output file again.
  9. Finally, the pipeline is printed in XML format.

 

Output File

package_id  truck_id  amount
0	    truck1	1.01
1	    truck2	2.01
2	    truck3	3.01
3	    truck4	4.01
4	    truck5	5.01
5	    truck6	6.01
6	    truck7	7.01
7	    truck8	8.01
8	    truck9	9.01
9	    truck0	10.01
10	    truck1	11.01
11	    truck2	12.01
12	    truck3	13.01
13	    truck4	14.01
14	    truck5	15.01
15	    truck6	16.01
16	    truck7	17.01
17	    truck8	18.01
18	    truck9	19.01
19	    truck0	20.01

 

Console Output

14:48:58,504 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
14:48:59,875 DEBUG [main] datapipeline:615 - Job[1,job-1,Mon Jun 19 14:48:59 EEST 2023]::Start
14:49:05,972 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Record (MODIFIED) (has child records) {
    0:[name]:UNDEFINED=[null]
    1:[description]:UNDEFINED=[null]
    2:[input]:RECORD=[
        Record (MODIFIED) (is child record) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineInput$CustomPipelineInput]:String
            1:[saveLineage]:BOOLEAN=[false]:Boolean
            2:[maxTrucks]:INT=[10]:Integer
            3:[maxPackages]:INT=[20]:Integer
            4:[recordDelayMS]:INT=[250]:Integer
        }]:Record
    3:[sourceEntity]:UNDEFINED=[null]
    4:[targetEntity]:UNDEFINED=[null]
    5:[output]:RECORD=[
        Record (MODIFIED) (is child record) (has child records) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput]:String
            1:[providerType]:UNDEFINED=[POI_XSSF]:ExcelDocument$ProviderType$2
            2:[sheetName]:UNDEFINED=[null]
            3:[sheetIndex]:INT=[-1]:Integer
            4:[firstRowIndex]:INT=[0]:Integer
            5:[firstColumnIndex]:INT=[0]:Integer
            6:[autofitColumns]:BOOLEAN=[false]:Boolean
            7:[autoFilterColumns]:BOOLEAN=[false]:Boolean
            8:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean
            9:[fileSink]:RECORD=[
                Record (MODIFIED) (is child record) {
                    0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSink]:String
                    1:[name]:UNDEFINED=[null]
                    2:[path]:STRING=[example/data/output/custom_package_pipeline_input.xlsx]:String
                    3:[append]:BOOLEAN=[false]:Boolean
                }]:Record
        }]:Record
    6:[multithreaded]:BOOLEAN=[true]:Boolean
    7:[actions]:ARRAY of UNDEFINED=[[]]:ArrayValue
}

---------------------------------------------------------------------------------------------------------
14:49:06,041 DEBUG [main] datapipeline:615 - Job[2,job-2,Mon Jun 19 14:49:06 EEST 2023]::Start
14:49:11,159 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as JSON:
{
  "name" : null,
  "description" : null,
  "input" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineInput$CustomPipelineInput",
    "saveLineage" : false,
    "maxTrucks" : 10,
    "maxPackages" : 20,
    "recordDelayMS" : 250
  },
  "sourceEntity" : null,
  "targetEntity" : null,
  "output" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput",
    "providerType" : "POI_XSSF",
    "sheetName" : null,
    "sheetIndex" : -1,
    "firstRowIndex" : 0,
    "firstColumnIndex" : 0,
    "autofitColumns" : false,
    "autoFilterColumns" : false,
    "fieldNamesInFirstRow" : true,
    "fileSink" : {
      "__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSink",
      "name" : null,
      "path" : "example/data/output/custom_package_pipeline_input.xlsx",
      "append" : false
    }
  },
  "multithreaded" : true,
  "actions" : [ ]
}
---------------------------------------------------------------------------------------------------------
14:49:11,319 DEBUG [main] datapipeline:615 - Job[3,job-3,Mon Jun 19 14:49:11 EEST 2023]::Start
14:49:16,392 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as XML:
<pipeline multithreaded="true">
  <pipeline-input __class__="com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineInput$CustomPipelineInput" maxPackages="20" maxTrucks="10" recordDelayMS="250" saveLineage="false"/>
  <pipeline-output __class__="com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput" autoFilterColumns="false" autofitColumns="false" fieldNamesInFirstRow="true" firstColumnIndex="0" firstRowIndex="0" providerType="POI_XSSF" sheetIndex="-1">
    <file-sink __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSink" append="false" path="example/data/output/custom_package_pipeline_input.xlsx"/>
  </pipeline-output>
</pipeline>
Mobile Analytics