Create Custom Pipeline Action

This example shows you how to extend the functionality of your data processing pipelines by defining custom actions to be applied during data transformation. These custom actions can encompass various data manipulations, such as converting column values to uppercase, parsing data, enriching records, and more.

This can be used for data cleansing and standardization, data enrichment with additional information from external sources, and data augmentation for machine learning tasks.

 

Input CSV File

Country,Country code
Afghanistan,AF
Egypt,EG
Åland Islands,AX
Albania,AL
...

 

Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.pipeline;

import static com.northconcepts.datapipeline.core.XmlSerializable.getAttribute;
import static com.northconcepts.datapipeline.core.XmlSerializable.getChildElement;
import static com.northconcepts.datapipeline.core.XmlSerializable.getChildElements;
import static com.northconcepts.datapipeline.core.XmlSerializable.setAttribute;

import java.util.ArrayList;
import java.util.List;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

import com.northconcepts.datapipeline.core.ArrayValue;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.foundations.file.LocalFileSink;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.action.PipelineAction;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;
import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput;
import com.northconcepts.datapipeline.internal.lang.Util;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.TransformingReader;

/**
 * Example showing how to plug a custom {@link PipelineAction} into a {@link Pipeline} and make it
 * survive serialization to and from {@link Record} and XML.
 */
public class CreateCustomPipelineAction {

    /**
     * Builds a CSV-to-Excel pipeline that upper-cases the {@code Country} field, runs it, and then
     * demonstrates serialization. The pipeline is round-tripped through a {@link Record} (and run
     * again), printed as JSON, round-tripped through XML (and run again), and finally printed as XML.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CsvPipelineInput pipelineInput = new CsvPipelineInput()
                .setFileSource(new LocalFileSource().setPath("example/data/input/countries_with_country-code.csv"))
                .setFieldNamesInFirstRow(true);

        ExcelPipelineOutput pipelineOutput = new ExcelPipelineOutput()
                .setFileSink(new LocalFileSink().setPath("example/data/output/countries_with_country-code.xlsx"))
                .setFieldNamesInFirstRow(true);

        Pipeline pipeline = new Pipeline()
                .setInput(pipelineInput)
                .setOutput(pipelineOutput)
                .addAction(new UpcaseFieldsAction().addField("Country"));

        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Round-trip through Record: the custom action's toRecord()/fromRecord() are exercised here.
        Record record = pipeline.toRecord();
        System.out.println(record);

        System.out.println("---------------------------------------------------------------------------------------------------------");

        pipeline = new Pipeline().fromRecord(record);
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        System.out.println("Pipeline as JSON:");
        System.out.println(Util.formatJson(pipeline.toJson()));

        System.out.println("---------------------------------------------------------------------------------------------------------");

        // Round-trip through XML: the custom action's toXmlElement()/fromXmlElement() are exercised here.
        pipeline = new Pipeline().fromXml(pipeline.toXml());
        pipeline.run();

        System.out.println("---------------------------------------------------------------------------------------------------------");

        System.out.println("Pipeline as XML:");
        System.out.println(pipeline.toXml());

    }

    /**
     * Custom pipeline action that upper-cases the values of a configurable list of fields.
     *
     * <p>The list of field names is written by {@link #toRecord()} and {@link #toXmlElement(Document)}
     * and read back by {@link #fromRecord(Record)} and {@link #fromXmlElement(Element)}, so the
     * action's configuration is preserved when the pipeline is serialized.
     */
    public static class UpcaseFieldsAction extends PipelineAction {

        /** Names of the fields whose values are upper-cased, in insertion order. */
        private final List<String> fieldNameList = new ArrayList<>();

        public UpcaseFieldsAction() {
            super("transform-upcase-fields", "UpperCase Fields");
        }

        /**
         * Wraps {@code reader} in a {@link TransformingReader} that applies
         * {@link BasicFieldTransformer#upperCase()} to the configured fields.
         *
         * @param reader the upstream reader
         * @return the transforming reader wrapping {@code reader}
         */
        @Override
        public DataReader apply(DataReader reader) throws Throwable {
            TransformingReader transformingReader = new TransformingReader(reader)
                    .add(new BasicFieldTransformer(fieldNameList).upperCase());
            return transformingReader;
        }

        /**
         * Adds a field whose values should be upper-cased.
         *
         * @param fieldName the field name to add
         * @return this action, for chaining
         */
        public UpcaseFieldsAction addField(String fieldName) {
            fieldNameList.add(fieldName);
            return this;
        }

        /**
         * Serializes this action. Starts from the base class's record and adds a
         * {@code fieldNameList} array holding the configured field names.
         */
        @Override
        public Record toRecord() {
            Record record = super.toRecord();

            ArrayValue array = new ArrayValue();
            for (String fieldName : fieldNameList) {
                array.addValue(fieldName);
            }
            record.setField("fieldNameList", array);
            return record;
        }

        /**
         * Restores this action from a record produced by {@link #toRecord()}.
         *
         * <p>If the record contains {@code fieldNameList}, it replaces any field names already held,
         * so deserializing into the same instance twice does not duplicate entries.
         *
         * @param source the serialized form
         * @return this action, for chaining
         */
        @Override
        public UpcaseFieldsAction fromRecord(Record source) {
            super.fromRecord(source);

            if (source.containsField("fieldNameList")) {
                fieldNameList.clear();
                ArrayValue array = source.getField("fieldNameList").getValueAsArray();
                for (int i = 0; i < array.size(); i++) {
                    addField(array.getValueAsString(i));
                }
            }
            return this;
        }

        /**
         * Serializes this action to XML. Starts from the base class's element and appends
         * {@code <fieldNameList><field name="..."/>...</fieldNameList>}.
         */
        @Override
        public Element toXmlElement(Document document) {
            Element element = super.toXmlElement(document);

            Element fieldNameListElement = document.createElement("fieldNameList");
            for (String fieldName : fieldNameList) {
                Element fieldElement = document.createElement("field");
                setAttribute(fieldElement, "name", fieldName);
                fieldNameListElement.appendChild(fieldElement);
            }
            element.appendChild(fieldNameListElement);

            return element;
        }

        /**
         * Restores this action from XML produced by {@link #toXmlElement(Document)}.
         *
         * <p>If a {@code fieldNameList} child element is present, its {@code field/@name} values
         * replace any field names already held.
         *
         * @param element the serialized form
         * @return this action, for chaining
         */
        @Override
        public UpcaseFieldsAction fromXmlElement(Element element) {
            super.fromXmlElement(element);

            Element fieldNameListElement = getChildElement(element, "fieldNameList");
            if (fieldNameListElement != null) {
                fieldNameList.clear();
                List<Element> fieldElements = getChildElements(fieldNameListElement, "field");
                for (Element fieldElement : fieldElements) {
                    addField(getAttribute(fieldElement, "name"));
                }
            }

            return this;
        }

    }

}

 

Code Walkthrough

  1. A CsvPipelineInput instance is created to read data from the input file countries_with_country-code.csv.
  2. The setFieldNamesInFirstRow(true) method is invoked to specify that the values in the first row should be used as field names.
  3. An ExcelPipelineOutput instance is created to write the results to the Excel file countries_with_country-code.xlsx at the specified path.
  4. A Pipeline instance is created with the input and output declared above. The custom pipeline action UpcaseFieldsAction, configured to upper-case the Country field, is added to it.
  5. The pipeline is run: the input data is processed by the specified action and written to the output file.
  6. The pipeline is converted to a Record, printed, reloaded from that Record with fromRecord(), and run again. It is then printed as JSON. Finally, it is round-tripped through XML with toXml() and fromXml(), run again, and printed as XML.

 

UpcaseFieldsAction

This class extends PipelineAction and is added to the pipeline as a custom action. The logic that converts the values of the specified fields to upper case is implemented in the apply() method. In addition, the toRecord(), fromRecord(), toXmlElement(), and fromXmlElement() methods are overridden. As a result, the action's list of field names is saved and restored when the pipeline is converted to and from a Record or XML, and it also appears in the JSON output.

 

Output XLSX File

Country	        Country code
AFGHANISTAN	AF
EGYPT    	EG
ÅLAND ISLANDS	AX
ALBANIA 	AL
...

 

Console Output

---------------------------------------------------------------------------------------------------------
Record (MODIFIED) (has child records) {
    0:[name]:UNDEFINED=[null]
    1:[description]:UNDEFINED=[null]
    2:[input]:RECORD=[
        Record (MODIFIED) (is child record) (has child records) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput]:String
            1:[saveLineage]:BOOLEAN=[false]:Boolean
            2:[fieldSeparator]:STRING=[,]:String
            3:[startingQuote]:STRING=["]:String
            4:[endingQuote]:STRING=["]:String
            5:[lineSeparators]:STRING=[\n,\r\n,\r]:String
            6:[allowMultiLineText]:BOOLEAN=[false]:Boolean
            7:[allowQuoteInField]:BOOLEAN=[false]:Boolean
            8:[trimFields]:BOOLEAN=[true]:Boolean
            9:[skipEmptyRows]:BOOLEAN=[false]:Boolean
            10:[charset]:STRING=[UTF-8]:String
            11:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean
            12:[fileSource]:RECORD=[
                Record (MODIFIED) (is child record) {
                    0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSource]:String
                    1:[name]:UNDEFINED=[null]
                    2:[path]:STRING=[example/data/input/countries_with_country-code.csv]:String
                }]:Record
        }]:Record
    3:[sourceEntity]:UNDEFINED=[null]
    4:[targetEntity]:UNDEFINED=[null]
    5:[output]:RECORD=[
        Record (MODIFIED) (is child record) (has child records) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput]:String
            1:[providerType]:UNDEFINED=[POI_XSSF]:ExcelDocument$ProviderType$2
            2:[sheetName]:UNDEFINED=[null]
            3:[sheetIndex]:INT=[-1]:Integer
            4:[firstRowIndex]:INT=[0]:Integer
            5:[firstColumnIndex]:INT=[0]:Integer
            6:[autofitColumns]:BOOLEAN=[false]:Boolean
            7:[autoFilterColumns]:BOOLEAN=[false]:Boolean
            8:[fieldNamesInFirstRow]:BOOLEAN=[true]:Boolean
            9:[fileSink]:RECORD=[
                Record (MODIFIED) (is child record) {
                    0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.file.LocalFileSink]:String
                    1:[name]:UNDEFINED=[null]
                    2:[path]:STRING=[example/data/output/countries_with_country-code.xlsx]:String
                    3:[append]:BOOLEAN=[false]:Boolean
                }]:Record
        }]:Record
    6:[multithreaded]:BOOLEAN=[true]:Boolean
    7:[actions]:ARRAY of RECORD=[[
        Record (MODIFIED) (is child record) {
            0:[__class__]:STRING=[com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineAction$UpcaseFieldsAction]:String
            1:[description]:UNDEFINED=[null]
            2:[fieldNameList]:ARRAY of STRING=[[Country]]:ArrayValue
        }]]:ArrayValue
}

---------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------
Pipeline as JSON:
{
  "name" : null,
  "description" : null,
  "input" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput",
    "saveLineage" : false,
    "fieldSeparator" : ",",
    "startingQuote" : "\"",
    "endingQuote" : "\"",
    "lineSeparators" : "\\n,\\r\\n,\\r",
    "allowMultiLineText" : false,
    "allowQuoteInField" : false,
    "trimFields" : true,
    "skipEmptyRows" : false,
    "charset" : "UTF-8",
    "fieldNamesInFirstRow" : true,
    "fileSource" : {
      "__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSource",
      "name" : null,
      "path" : "example/data/input/countries_with_country-code.csv"
    }
  },
  "sourceEntity" : null,
  "targetEntity" : null,
  "output" : {
    "__class__" : "com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput",
    "providerType" : "POI_XSSF",
    "sheetName" : null,
    "sheetIndex" : -1,
    "firstRowIndex" : 0,
    "firstColumnIndex" : 0,
    "autofitColumns" : false,
    "autoFilterColumns" : false,
    "fieldNamesInFirstRow" : true,
    "fileSink" : {
      "__class__" : "com.northconcepts.datapipeline.foundations.file.LocalFileSink",
      "name" : null,
      "path" : "example/data/output/countries_with_country-code.xlsx",
      "append" : false
    }
  },
  "multithreaded" : true,
  "actions" : [ {
    "__class__" : "com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineAction$UpcaseFieldsAction",
    "description" : null,
    "fieldNameList" : [ "Country" ]
  } ]
}
---------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------
Pipeline as XML:
<pipeline multithreaded="true">
  <pipeline-input __class__="com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput" allowMultiLineText="false" allowQuoteInField="false" charset="UTF-8" endingQuote="&quot;" fieldNamesInFirstRow="true" fieldSeparator="," lineSeparators="\n,\r\n,\r" saveLineage="false" skipEmptyRows="false" startingQuote="&quot;" trimFields="true">
    <file-source __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSource" path="example/data/input/countries_with_country-code.csv"/>
  </pipeline-input>
  <pipeline-output __class__="com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput" autoFilterColumns="false" autofitColumns="false" fieldNamesInFirstRow="true" firstColumnIndex="0" firstRowIndex="0" providerType="POI_XSSF" sheetIndex="-1">
    <file-sink __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSink" append="false" path="example/data/output/countries_with_country-code.xlsx"/>
  </pipeline-output>
  <actions>
    <action __class__="com.northconcepts.datapipeline.foundations.examples.pipeline.CreateCustomPipelineAction$UpcaseFieldsAction">
      <fieldNameList>
        <field name="Country"/>
      </fieldNameList>
    </action>
  </actions>
</pipeline>

Mobile Analytics