Create Custom Pipeline Action
This example shows you how to extend the functionality of your data processing pipelines by defining custom actions to be applied during data transformation. These custom actions can encompass various data manipulations, such as converting column values to uppercase, parsing data, enriching records, and more.
Custom actions can be used for data cleansing and standardization, for enriching data with additional information from external sources, and for data augmentation in machine learning tasks.
Input CSV File
Country,Country code
Afghanistan,AF
Egypt,EG
Åland Islands,AX
Albania,AL
...
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.pipeline; import static com.northconcepts.datapipeline.core.XmlSerializable.getAttribute; import static com.northconcepts.datapipeline.core.XmlSerializable.getChildElement; import static com.northconcepts.datapipeline.core.XmlSerializable.getChildElements; import static com.northconcepts.datapipeline.core.XmlSerializable.setAttribute; import java.util.ArrayList; import java.util.List; import org.w3c.dom.Document; import org.w3c.dom.Element; import com.northconcepts.datapipeline.core.ArrayValue; import com.northconcepts.datapipeline.core.DataReader; import com.northconcepts.datapipeline.core.Record; import com.northconcepts.datapipeline.foundations.file.LocalFileSink; import com.northconcepts.datapipeline.foundations.file.LocalFileSource; import com.northconcepts.datapipeline.foundations.pipeline.Pipeline; import com.northconcepts.datapipeline.foundations.pipeline.action.PipelineAction; import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput; import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput; import com.northconcepts.datapipeline.internal.lang.Util; import com.northconcepts.datapipeline.transform.BasicFieldTransformer; import com.northconcepts.datapipeline.transform.TransformingReader; public class CreateCustomPipelineAction { public static void main(String[] args) { CsvPipelineInput pipelineInput = new CsvPipelineInput() .setFileSource(new LocalFileSource().setPath("example/data/input/countries_with_country-code.csv")) .setFieldNamesInFirstRow(true); ExcelPipelineOutput pipelineOutput = new ExcelPipelineOutput() .setFileSink(new LocalFileSink().setPath("example/data/output/countries_with_country-code.xlsx")) .setFieldNamesInFirstRow(true); Pipeline pipeline = new Pipeline() .setInput(pipelineInput) .setOutput(pipelineOutput) .addAction(new UpcaseFieldsAction().addField("Country")); pipeline.run(); 
System.out.println("---------------------------------------------------------------------------------------------------------"); Record record = pipeline.toRecord(); System.out.println(record); System.out.println("---------------------------------------------------------------------------------------------------------"); pipeline = new Pipeline().fromRecord(record); pipeline.run(); System.out.println("---------------------------------------------------------------------------------------------------------"); System.out.println("Pipeline as JSON:"); System.out.println(Util.formatJson(pipeline.toJson())); System.out.println("---------------------------------------------------------------------------------------------------------"); pipeline = new Pipeline().fromXml(pipeline.toXml()); pipeline.run(); System.out.println("---------------------------------------------------------------------------------------------------------"); System.out.println("Pipeline as XML:"); System.out.println(pipeline.toXml()); } public static class UpcaseFieldsAction extends PipelineAction { private final ListfieldNameList = new ArrayList<>(); public UpcaseFieldsAction() { super("transform-upcase-fields", "UpperCase Fields"); } @Override public DataReader apply(DataReader reader) throws Throwable { TransformingReader transformingReader = new TransformingReader(reader) .add(new BasicFieldTransformer(fieldNameList).upperCase()); return transformingReader; } public UpcaseFieldsAction addField(String fieldName) { fieldNameList.add(fieldName); return this; } @Override public Record toRecord() { Record record = super.toRecord(); ArrayValue array = new ArrayValue(); for (String fieldName : fieldNameList) { array.addValue(fieldName); } record.setField("fieldNameList", array); return record; } @Override public UpcaseFieldsAction fromRecord(Record source) { super.fromRecord(source); if (source.containsField("fieldNameList")) { ArrayValue array = source.getField("fieldNameList").getValueAsArray(); for 
(int i = 0; i < array.size(); i++) { addField(array.getValueAsString(i)); } } return this; } @Override public Element toXmlElement(Document document) { Element element = super.toXmlElement(document); Element fieldsNameListElement = document.createElement("fieldNameList"); for(String fieldName : fieldNameList) { Element fieldNameElement = document.createElement("field"); setAttribute(fieldNameElement, "name", fieldName); fieldsNameListElement.appendChild(fieldNameElement); } element.appendChild(fieldsNameListElement); return element; } @Override public UpcaseFieldsAction fromXmlElement(Element element) { super.fromXmlElement(element); Element fieldsNameListElement = getChildElement(element, "fieldNameList"); if(fieldsNameListElement != null) { List fieldsElementList = getChildElements(fieldsNameListElement, "field"); for(Element field : fieldsElementList) { fieldNameList.add(getAttribute(field, "name")); } } return this; } } }
Code Walkthrough
- A `CsvPipelineInput` instance is created to read data from the input file `countries_with_country-code.csv`.
- The `setFieldNamesInFirstRow(true)` method is invoked on the input to specify that the values in the first row should be used as field names. It is also invoked on the output so that the field names are written as the first row.
- An `ExcelPipelineOutput` instance is created to specify the output Excel file `countries_with_country-code.xlsx` and its path.
- A `Pipeline` instance is created and its input and output are set. The custom pipeline action `UpcaseFieldsAction` is added as an action of the pipeline.
- The pipeline is run: the input data is processed by the specified action and written to the output file.
- The pipeline is converted to a Record and rebuilt from it, then converted to XML and rebuilt from that; each rebuilt pipeline is run again. The Record, JSON, and XML representations of the pipeline are printed to the console.
UpcaseFieldsAction
This class extends `PipelineAction` and is added to the pipeline as a custom action. The transformation itself — converting the values of the specified fields to uppercase — is implemented in the `apply()` method. In addition, the `toRecord()`, `fromRecord()`, `toXmlElement()`, and `fromXmlElement()` methods are overridden so that the action's list of field names is saved and restored when the pipeline is converted to and from a Record or XML.
Output XLSX File
| Country | Country code |
|---|---|
| AFGHANISTAN | AF |
| EGYPT | EG |
| ÅLAND ISLANDS | AX |
| ALBANIA | AL |
...
Console Output
--------------------------------------------------------------------------------------------------------- Record (MODIFIED) (has child records) { 0:[name]:UNDEFINED=[null] 1:[description]:UNDEFINED=[null] 2:[input]:RECORD=[ Record (MODIFIED) (is child record) (has child records) { 0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput]:String 1:[saveLineage]:BOOLEAN=[false]:Boolean 2:[fieldSeparator]:STRING=[,]:String 3:[startingQuote]:STRING=["]:String 4:[endingQuote]:STRING=["]:String 5:[lineSeparators]:STRING=[\n,\r\n,\r]:String 6:[allowMultiLineText]:BOOLEAN=[false]:Boolean 7:[allowQuoteInField]:BOOLEAN=[false]:Boolean 8:[trimFields]:BOOLEAN=[true]:Boolean 9:[skipEmptyRows]:BOOLEAN=[false]:Boolean 10:[charset]:STRING=[UTF-8]:String 11:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean 12:[fileSource]:RECORD=[ Record (MODIFIED) (is child record) { 0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSource]:String 1:[name]:UNDEFINED=[null] 2:[path]:STRING=[example/data/input/countries_with_country-code.csv]:String }]:Record }]:Record 3:[sourceEntity]:UNDEFINED=[null] 4:[targetEntity]:UNDEFINED=[null] 5:[output]:RECORD=[ Record (MODIFIED) (is child record) (has child records) { 0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput]:String 1:[providerType]:UNDEFINED=[POI_XSSF]:ExcelDocument$ProviderType$2 2:[sheetName]:UNDEFINED=[null] 3:[sheetIndex]:INT=[-1]:Integer 4:[firstRowIndex]:INT=[0]:Integer 5:[firstColumnIndex]:INT=[0]:Integer 6:[autofitColumns]:BOOLEAN=[false]:Boolean 7:[autoFilterColumns]:BOOLEAN=[false]:Boolean 8:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean 9:[fileSink]:RECORD=[ Record (MODIFIED) (is child record) { 0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSink]:String 1:[name]:UNDEFINED=[null] 2:[path]:STRING=[example/data/output/countries_with_country-code.xlsx]:String 3:[append]:BOOLEAN=[false]:Boolean 
}]:Record }]:Record 6:[multithreaded]:BOOLEAN=[true]:Boolean 7:[actions]:ARRAY of RECORD=[[ Record (MODIFIED) (is child record) { 0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineAction$UpcaseFieldsAction]:String 1:[description]:UNDEFINED=[null] 2:[fieldNameList]:ARRAY of STRING=[[Country]]:ArrayValue }]]:ArrayValue } --------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------- Pipeline as JSON: { "name" : null, "description" : null, "input" : { "__class__" : "com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput", "saveLineage" : false, "fieldSeparator" : ",", "startingQuote" : "\"", "endingQuote" : "\"", "lineSeparators" : "\\n,\\r\\n,\\r", "allowMultiLineText" : false, "allowQuoteInField" : false, "trimFields" : true, "skipEmptyRows" : false, "charset" : "UTF-8", "fieldNamesInFirstRow" : true, "fileSource" : { "__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSource", "name" : null, "path" : "example/data/input/countries_with_country-code.csv" } }, "sourceEntity" : null, "targetEntity" : null, "output" : { "__class__" : "com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput", "providerType" : "POI_XSSF", "sheetName" : null, "sheetIndex" : -1, "firstRowIndex" : 0, "firstColumnIndex" : 0, "autofitColumns" : false, "autoFilterColumns" : false, "fieldNamesInFirstRow" : true, "fileSink" : { "__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSink", "name" : null, "path" : "example/data/output/countries_with_country-code.xlsx", "append" : false } }, "multithreaded" : true, "actions" : [ { "__class__" : "com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineAction$UpcaseFieldsAction", "description" : null, "fieldNameList" : [ "Country" ] } ] } 
--------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------- Pipeline as XML: <pipeline multithreaded="true"> <pipeline-input __class__="com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput" allowMultiLineText="false" allowQuoteInField="false" charset="UTF-8" endingQuote=""" fieldNamesInFirstRow="true" fieldSeparator="," lineSeparators="\n,\r\n,\r" saveLineage="false" skipEmptyRows="false" startingQuote=""" trimFields="true"> <file-source __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSource" path="example/data/input/countries_with_country-code.csv"/> </pipeline-input> <pipeline-output __class__="com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput" autoFilterColumns="false" autofitColumns="false" fieldNamesInFirstRow="true" firstColumnIndex="0" firstRowIndex="0" providerType="POI_XSSF" sheetIndex="-1"> <file-sink __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSink" append="false" path="example/data/output/countries_with_country-code.xlsx"/> </pipeline-output> <actions> <action __class__="com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineAction$UpcaseFieldsAction"> <fieldNameList> <field name="Country"/> </fieldNameList> </action> </actions> </pipeline>