Create Custom Pipeline Input
This example provides a flexible and customizable data pipeline by letting you define a custom input class. By creating a custom input class, users can tailor the behavior of the pipeline's input, including how its settings are saved to a record or XML and how they are recovered from that record or XML format.
Real-life use cases for this library include data ingestion, data integration, and data processing scenarios. Users can define their own logic and rules in the custom input class to handle specific data formats, perform data validation, or apply data transformations before feeding the data into the pipeline. This flexibility lets users integrate diverse data sources and adapt the input processing to their specific needs, enabling efficient and reliable data processing workflows.
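In practice, a custom input is a subclass of PipelineInput that overrides a handful of methods: a name, the record and XML serialization hooks, and a factory method for the DataReader that actually supplies records. The condensed sketch below summarizes that skeleton; the rowCount field is purely illustrative, and the full, runnable version follows in the Java code section.

// Condensed sketch of the override points (illustrative only; see the full example below).
// Assumes the same imports as the full example (PipelineInput, Record, DataReader,
// XmlSerializable.getAttribute/setAttribute, org.w3c.dom.Document/Element).
public class MyInput extends PipelineInput {

    private int rowCount = 100;                      // illustrative setting

    @Override
    public String getName() { return "MyInput"; }

    @Override
    public Record toRecord() {                       // save settings to a Record
        return super.toRecord().setField("rowCount", rowCount);
    }

    @Override
    public MyInput fromRecord(Record source) {       // restore settings from a Record
        super.fromRecord(source);
        rowCount = source.getFieldValueAsInteger("rowCount", 100);
        return this;
    }

    @Override
    public Element toXmlElement(Document document) { // save settings as XML attributes
        Element element = super.toXmlElement(document);
        setAttribute(element, "rowCount", rowCount);
        return element;
    }

    @Override
    public MyInput fromXmlElement(Element element) { // restore settings from XML attributes
        rowCount = getAttribute(element, "rowCount", rowCount);
        return this;
    }

    @Override
    public DataReader createDataReader() {           // the reader that feeds the pipeline
        return new FakePackageReader(10, rowCount, 0); // FakePackageReader is defined below
    }
}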
Java Code
package com.northconcepts.datapipeline.foundations.examples.pipeline;

import static com.northconcepts.datapipeline.core.XmlSerializable.getAttribute;
import static com.northconcepts.datapipeline.core.XmlSerializable.setAttribute;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.foundations.file.LocalFileSink;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.PipelineInput;
import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput;
import com.northconcepts.datapipeline.internal.lang.Util;

public class CreateCustomPipelineInput {

    public static void main(String[] args) {
        CustomPipelineInput pipelineInput = new CustomPipelineInput();

        ExcelPipelineOutput pipelineOutput = new ExcelPipelineOutput()
                .setFileSink(new LocalFileSink().setPath("example/data/output/custom_package_pipeline_input.xlsx"))
                .setFieldNamesInFirstRow(true);

        // Run the pipeline directly: read fake packages and write them to Excel.
        Pipeline pipeline = new Pipeline()
                .setInput(pipelineInput)
                .setOutput(pipelineOutput);
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Serialize the pipeline (input, output, and their settings) to a single Record.
        Record record = pipeline.toRecord();
        System.out.println(record);

        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Rebuild the pipeline from that Record and run it again.
        pipeline = new Pipeline().fromRecord(record);
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        System.out.println("Pipeline as JSON:");
        System.out.println(Util.formatJson(pipeline.toJson()));

        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Round-trip the pipeline through XML and run it a third time.
        pipeline = new Pipeline().fromXml(pipeline.toXml());
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        System.out.println("Pipeline as XML:");
        System.out.println(pipeline.toXml());
    }

    public static class CustomPipelineInput extends PipelineInput {

        private int maxTrucks = 10;
        private int maxPackages = 20;
        private int recordDelayMS = 250;

        public int getMaxTrucks() {
            return maxTrucks;
        }

        public void setMaxTrucks(int maxTrucks) {
            this.maxTrucks = maxTrucks;
        }

        public int getMaxPackages() {
            return maxPackages;
        }

        public void setMaxPackages(int maxPackages) {
            this.maxPackages = maxPackages;
        }

        public int getRecordDelayMS() {
            return recordDelayMS;
        }

        public void setRecordDelayMS(int recordDelayMS) {
            this.recordDelayMS = recordDelayMS;
        }

        @Override
        public String getName() {
            return "CustomPipelineInput";
        }

        // Save this input's settings alongside the base class state.
        @Override
        public Record toRecord() {
            return super.toRecord()
                    .setField("maxTrucks", maxTrucks)
                    .setField("maxPackages", maxPackages)
                    .setField("recordDelayMS", recordDelayMS);
        }

        @Override
        public CustomPipelineInput fromRecord(Record source) {
            super.fromRecord(source);
            setMaxTrucks(source.getFieldValueAsInteger("maxTrucks", 10));
            setMaxPackages(source.getFieldValueAsInteger("maxPackages", 20));
            setRecordDelayMS(source.getFieldValueAsInteger("recordDelayMS", 250));
            return this;
        }

        @Override
        public Element toXmlElement(Document document) {
            Element element = super.toXmlElement(document);
            setAttribute(element, "maxTrucks", maxTrucks);
            setAttribute(element, "maxPackages", maxPackages);
            setAttribute(element, "recordDelayMS", recordDelayMS);
            return element;
        }

        @Override
        public CustomPipelineInput fromXmlElement(Element element) {
            setMaxTrucks(getAttribute(element, "maxTrucks", maxTrucks));
            setMaxPackages(getAttribute(element, "maxPackages", maxPackages));
            setRecordDelayMS(getAttribute(element, "recordDelayMS", recordDelayMS));
            return this;
        }

        // Supplies the DataReader whose records flow into the pipeline.
        @Override
        public DataReader createDataReader() {
            return new FakePackageReader(maxTrucks, maxPackages, recordDelayMS);
        }
    }

    public static class FakePackageReader extends DataReader {

        private final int maxTrucks;
        private final long maxPackages;
        private long nextPackageId;
        private final long recordDelay;

        public FakePackageReader(int maxTrucks, long maxPackages, long recordDelay) {
            this.maxTrucks = maxTrucks;
            this.maxPackages = maxPackages;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            // Returning null signals end of input once maxPackages records have been produced.
            if (nextPackageId >= maxPackages) {
                return null;
            }
            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }
            Record record = new Record();
            record.setField("package_id", nextPackageId++);
            record.setField("truck_id", "truck" + nextPackageId % maxTrucks);
            record.setField("amount", nextPackageId + 0.01);
            return record;
        }
    }
}
Code Walkthrough
- To illustrate the concept, a CustomPipelineInput class which extends PipelineInput is introduced.
  - This class has three integer fields with default values: maxTrucks, maxPackages, and recordDelayMS.
  - Within this class, the data transformation methods are overridden to customize how the input is serialized to and deserialized from specific formats.
  - "CustomPipelineInput" is returned as the name of this object.
  - In the overridden toRecord, fromRecord, and toXmlElement methods, the corresponding parent methods are called; in addition to the parent implementations, the three attribute fields above are written to or read back from the result. The fromXmlElement method restores the same fields from the element's attributes.
  - A FakePackageReader instance is used as the DataReader for this PipelineInput.
- The FakePackageReader class extends DataReader. It holds maxTrucks, maxPackages, and recordDelay as constant (final) fields, plus a nextPackageId counter. According to its overridden readImpl() method, it returns Record instances with "package_id", "truck_id", and "amount" fields until maxPackages records have been produced (a sketch of an alternative reader follows this walkthrough).
- An instance of the CustomPipelineInput class is created.
- An ExcelPipelineOutput object is instantiated to write to the output file custom_package_pipeline_input.xlsx.
- The two objects from the previous steps are assigned as the input and output of a Pipeline.
- The pipeline is executed with the run() method. As mentioned above, FakePackageReader is used as the DataReader, so records with three fields (package_id, truck_id, amount) are written to the output XLSX file.
- Next, the pipeline's toRecord() method is called. It returns the pipeline as a single Record instance. As can be seen in the console, the Record holds the input (the CreateCustomPipelineInput$CustomPipelineInput class) with its attribute fields, and the output (the ExcelPipelineOutput class).
- A new Pipeline instance is created from the Record object of the previous step. Because all the properties are stored in that record, the same result as the first run is written to the output file.
- The pipeline instance is converted to JSON format and displayed in the console.
- Next, the Pipeline instance is serialized to XML and recovered from that XML format. The records returned when the pipeline runs are again exported to the output file.
- The pipeline is printed in its XML-formatted version.
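The same pattern carries over to real data sources, since createDataReader() may return any DataReader implementation. The sketch below is not part of the example: it serves records from an in-memory array instead of generating fake packages, and the class name and values are illustrative.

// Sketch: a DataReader over in-memory data, following the same readImpl()
// contract as FakePackageReader. Names and values are illustrative.
public static class InMemoryPackageReader extends DataReader {

    private final String[] truckIds;  // one entry per package to emit
    private int index;

    public InMemoryPackageReader(String... truckIds) {
        this.truckIds = truckIds;
    }

    @Override
    protected Record readImpl() throws Throwable {
        if (index >= truckIds.length) {
            return null;                      // null signals end of input to the pipeline
        }
        Record record = new Record();
        record.setField("package_id", index);
        record.setField("truck_id", truckIds[index]);
        record.setField("amount", (index + 1) * 10.0);
        index++;
        return record;
    }
}

Returning such a reader from a custom input's createDataReader() would leave the rest of the example, including the record, JSON, and XML round-trips, unchanged.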
Output File
package_id   truck_id   amount
0            truck1     1.01
1            truck2     2.01
2            truck3     3.01
3            truck4     4.01
4            truck5     5.01
5            truck6     6.01
6            truck7     7.01
7            truck8     8.01
8            truck9     9.01
9            truck0     10.01
10           truck1     11.01
11           truck2     12.01
12           truck3     13.01
13           truck4     14.01
14           truck5     15.01
15           truck6     16.01
16           truck7     17.01
17           truck8     18.01
18           truck9     19.01
19           truck0     20.01
Console Output
14:48:58,504 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
14:48:59,875 DEBUG [main] datapipeline:615 - Job[1,job-1,Mon Jun 19 14:48:59 EEST 2023]::Start
14:49:05,972 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Record (MODIFIED) (has child records) {
    0:[name]:UNDEFINED=[null]
    1:[description]:UNDEFINED=[null]
    2:[input]:RECORD=[
        Record (MODIFIED) (is child record) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineInput$CustomPipelineInput]:String
            1:[saveLineage]:BOOLEAN=[false]:Boolean
            2:[maxTrucks]:INT=[10]:Integer
            3:[maxPackages]:INT=[20]:Integer
            4:[recordDelayMS]:INT=[250]:Integer
        }]:Record
    3:[sourceEntity]:UNDEFINED=[null]
    4:[targetEntity]:UNDEFINED=[null]
    5:[output]:RECORD=[
        Record (MODIFIED) (is child record) (has child records) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput]:String
            1:[providerType]:UNDEFINED=[POI_XSSF]:ExcelDocument$ProviderType$2
            2:[sheetName]:UNDEFINED=[null]
            3:[sheetIndex]:INT=[-1]:Integer
            4:[firstRowIndex]:INT=[0]:Integer
            5:[firstColumnIndex]:INT=[0]:Integer
            6:[autofitColumns]:BOOLEAN=[false]:Boolean
            7:[autoFilterColumns]:BOOLEAN=[false]:Boolean
            8:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean
            9:[fileSink]:RECORD=[
                Record (MODIFIED) (is child record) {
                    0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSink]:String
                    1:[name]:UNDEFINED=[null]
                    2:[path]:STRING=[example/data/output/custom_package_pipeline_input.xlsx]:String
                    3:[append]:BOOLEAN=[false]:Boolean
                }]:Record
        }]:Record
    6:[multithreaded]:BOOLEAN=[true]:Boolean
    7:[actions]:ARRAY of UNDEFINED=[[]]:ArrayValue
}
---------------------------------------------------------------------------------------------------------
14:49:06,041 DEBUG [main] datapipeline:615 - Job[2,job-2,Mon Jun 19 14:49:06 EEST 2023]::Start
14:49:11,159 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as JSON:
{
  "name" : null,
  "description" : null,
  "input" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineInput$CustomPipelineInput",
    "saveLineage" : false,
    "maxTrucks" : 10,
    "maxPackages" : 20,
    "recordDelayMS" : 250
  },
  "sourceEntity" : null,
  "targetEntity" : null,
  "output" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput",
    "providerType" : "POI_XSSF",
    "sheetName" : null,
    "sheetIndex" : -1,
    "firstRowIndex" : 0,
    "firstColumnIndex" : 0,
    "autofitColumns" : false,
    "autoFilterColumns" : false,
    "fieldNamesInFirstRow" : true,
    "fileSink" : {
      "__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSink",
      "name" : null,
      "path" : "example/data/output/custom_package_pipeline_input.xlsx",
      "append" : false
    }
  },
  "multithreaded" : true,
  "actions" : [ ]
}
---------------------------------------------------------------------------------------------------------
14:49:11,319 DEBUG [main] datapipeline:615 - Job[3,job-3,Mon Jun 19 14:49:11 EEST 2023]::Start
14:49:16,392 DEBUG [main] datapipeline:661 - job::Success
---------------------------------------------------------------------------------------------------------
Pipeline as XML:
<pipeline multithreaded="true">
    <pipeline-input __class__="com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineInput$CustomPipelineInput" maxPackages="20" maxTrucks="10" recordDelayMS="250" saveLineage="false"/>
    <pipeline-output __class__="com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput" autoFilterColumns="false" autofitColumns="false" fieldNamesInFirstRow="true" firstColumnIndex="0" firstRowIndex="0" providerType="POI_XSSF" sheetIndex="-1">
        <file-sink __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSink" append="false" path="example/data/output/custom_package_pipeline_input.xlsx"/>
    </pipeline-output>
</pipeline>