Build DataMappingPipeline Declaratively from XML

This example shows how to use Data Pipeline to build a DataMappingPipeline from an XML file. The XML declares the pipeline input, the source entity, the field mappings, the target entity, and the pipeline output, so the whole transformation is defined in one place and the Java code only has to load and run it.

The pipeline extracts records from the source, transforms them, and loads them into a target entity whose typed, validated fields are suited to reporting and analytics.

 

Input XML File

<data-mapping-pipeline multithreaded="true">
    <pipeline-input allowMultiLineText="false" allowQuoteInField="false" __class__="com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput" endingQuote="&quot;" fieldNamesInFirstRow="true" fieldSeparator="," lineSeparators="\n,\r\n,\r" skipEmptyRows="false" startingQuote="&quot;" trimFields="true">
        <file-source __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSource" path="example/data/input/call-center-inbound-call.csv"/>
    </pipeline-input>
    <source-entity addMissingOptionalFields="false" allowExtraFieldsInMapping="true" allowExtraFieldsInValidation="true" name="Raw">
        <fields>
            <field allowBlank="false" maximumLength="25" name="event_type" required="true" type="STRING"/>
            <field allowBlank="false" name="id" required="true" type="STRING"/>
            <field allowBlank="false" name="agent_id" required="true" type="STRING"/>
            <field allowBlank="false" minimumLength="9" name="phone_number" required="true" type="STRING"/>
            <field allowBlank="false" name="start_time" required="true" type="STRING"/>
            <field allowBlank="false" name="end_time" required="false" type="STRING"/>
            <field allowBlank="false" name="disposition" required="false" type="STRING"/>
        </fields>
    </source-entity>
    <target-entity addMissingOptionalFields="false" allowExtraFieldsInMapping="true" allowExtraFieldsInValidation="true" name="Processed">
        <fields>
            <field allowBlank="false" maximumLength="25" name="Event" required="true" type="STRING"/>
            <field name="Call ID" required="true" type="INT"/>
            <field name="Agent ID" required="true" type="INT"/>
            <field allowBlank="false" minimumLength="9" name="Caller Number" required="true" type="STRING"/>
            <field name="Call Start Time" required="true" type="DATETIME"/>
            <field name="Call End Time" required="true" type="DATETIME"/>
            <field allowBlank="false" defaultValueExpression="'UNKNOWN'" name="Disposition" required="true" type="STRING"/>
        </fields>
    </target-entity>
    <pipeline-output autoFilterColumns="false" autofitColumns="false" __class__="com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput" fieldNamesInFirstRow="true" firstColumnIndex="0" firstRowIndex="0" providerType="POI_XSSF" sheetIndex="-1">
        <file-sink append="false" __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSink" path="data/output/test.xlsx"/>
    </pipeline-output>
    <data-mapping>
        <field-mappings>
            <field-mapping fieldName="Event" sourceExpression="source.event_type"/>
            <field-mapping fieldName="Call ID" sourceExpression="source.id"/>
            <field-mapping fieldName="Agent ID" sourceExpression="toInt(source.agent_id)"/>
            <field-mapping fieldName="Caller Number" sourceExpression="source.phone_number"/>
            <field-mapping fieldName="Call Start Time" sourceExpression="parseDate(source.start_time, 'yyyy-MM-dd HH:mm')"/>
            <field-mapping defaultValueExpression="${target.Call Start Time}" fieldName="Call End Time" sourceExpression="parseDate(source.end_time, 'yyyy-MM-dd HH:mm')"/>
            <field-mapping fieldName="Disposition" sourceExpression="source.disposition"/>
        </field-mappings>
    </data-mapping>
</data-mapping-pipeline>

 

Input CSV File

event_type,id,agent_id,phone_number,start_time,end_time,disposition
STARTED,1,7,(437) 689-5268,2016-03-04 22:39,,
ENDED,1,7,(437) 689-5268,2016-03-04 22:39,2016-03-04 22:39,PRODUCT_QUESTION
STARTED,2,19,(343) 8314-0603,2016-03-04 22:39,,
...

 

Java Code Listing

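package com.northconcepts.datapipeline.foundations.examples.pipeline;

import com.northconcepts.datapipeline.foundations.pipeline.DataMappingPipeline;

import java.io.FileInputStream;

public class BuildDataMappingPipelineDeclarativelyFromXml {

    public static void main(String[] args) throws Throwable {
        // try-with-resources closes the XML stream once the pipeline has finished.
        try (FileInputStream xml = new FileInputStream("example/data/input/pipeline/datamappingpipeline.xml")) {
            // Load the pipeline definition (input, entities, mappings, output) from the XML file.
            DataMappingPipeline pipeline = new DataMappingPipeline().fromXml(xml);

            // Read the CSV input, validate and map each record, and write the Excel output.
            pipeline.run();
        }
    }

}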

 

Code Walkthrough

  1. A DataMappingPipeline instance is created from the rules in the input file datamappingpipeline.xml.
  2. The pipeline is run.

Let's look at some parts of the input file to better understand how the pipeline works:

  1. The pipeline-input tag configures a CsvPipelineInput; its file-source tag uses the LocalFileSource class to point to the input file call-center-inbound-call.csv.
  2. The "Raw" entity is set as the source entity, meaning that incoming data is validated against this entity's rules before it is mapped. All of its fields are of type STRING.
  3. The "Processed" entity is set as the target entity. The mapped data is validated against this entity's rules.
  4. The pipeline-output tag configures an ExcelPipelineOutput; its file-sink tag uses the LocalFileSink class to write the output file test.xlsx.
  5. The data-mapping tag holds the field mappings, each of which computes one target field from a source expression.
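
To make the source-entity step more concrete, here is a rough sketch in plain Java of what the rules declared for the required fields of the "Raw" entity amount to. It does not use the Data Pipeline API. The class SourceRuleSketch and its methods are hypothetical, and the reading of the attributes (required means the field must be present, allowBlank="false" means it may not be blank, minimumLength and maximumLength bound the string length) is an assumption based on their names. The optional fields end_time and disposition are left out because the XML alone does not show how the library treats their empty values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SourceRuleSketch {

    // Hypothetical check for a required, non-blank string field.
    // Pass null for a length bound that the XML does not declare.
    public static void checkRequiredString(Map<String, String> row, String name,
                                           Integer minLength, Integer maxLength,
                                           List<String> errors) {
        String value = row.get(name);
        if (value == null || value.trim().isEmpty()) {
            errors.add(name + ": required and must not be blank");
            return;
        }
        if (minLength != null && value.length() < minLength) {
            errors.add(name + ": shorter than " + minLength + " characters");
        }
        if (maxLength != null && value.length() > maxLength) {
            errors.add(name + ": longer than " + maxLength + " characters");
        }
    }

    // Applies the rules declared for the five required fields of the "Raw" entity.
    public static List<String> validate(Map<String, String> row) {
        List<String> errors = new ArrayList<>();
        checkRequiredString(row, "event_type", null, 25, errors);
        checkRequiredString(row, "id", null, null, errors);
        checkRequiredString(row, "agent_id", null, null, errors);
        checkRequiredString(row, "phone_number", 9, null, errors);
        checkRequiredString(row, "start_time", null, null, errors);
        return errors;
    }

    public static void main(String[] args) {
        // First data row of the sample CSV (optional fields omitted).
        Map<String, String> row = new LinkedHashMap<>();
        row.put("event_type", "STARTED");
        row.put("id", "1");
        row.put("agent_id", "7");
        row.put("phone_number", "(437) 689-5268");
        row.put("start_time", "2016-03-04 22:39");

        // Prints the list of rule violations; an empty list means the row passed.
        System.out.println(validate(row));
    }
}
```

In the real pipeline these checks come from the source-entity definition in the XML, so changing a rule means editing an attribute rather than code like this.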

 

Output XLSX File

Event	Call ID	Agent ID	Caller Number	Call Start Time	Call End Time	Disposition
STARTED	1	7	(437) 689-5268	3/4/2016 22:39	3/4/2016 22:39	UNKNOWN
ENDED	1	7	(437) 689-5268	3/4/2016 22:39	3/4/2016 22:39	PRODUCT_QUESTION
STARTED	2	19	(343) 8314-0603	3/4/2016 22:39	3/4/2016 22:39	UNKNOWN
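
In the STARTED rows above, Call End Time repeats the start time and Disposition is UNKNOWN, even though both values are empty in the CSV. That is the effect of the two defaultValueExpression attributes in the XML. The sketch below reproduces the field mappings for a single record in plain Java, as an illustration only. It does not call the Data Pipeline library; MappingSketch, mapRow, and isMissing are hypothetical names, and treating an empty string as a missing value is an assumption. Call ID is converted to an integer here because the target entity declares it as INT, although the XML maps it without toInt.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

public class MappingSketch {

    // Same pattern string that the XML passes to parseDate(...).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    // Hypothetical helper: treats null or empty text as "no value".
    public static boolean isMissing(String value) {
        return value == null || value.isEmpty();
    }

    // Maps one raw CSV row (keyed by source field name) to the target field names.
    public static Map<String, Object> mapRow(Map<String, String> source) {
        Map<String, Object> target = new LinkedHashMap<>();
        target.put("Event", source.get("event_type"));
        target.put("Call ID", Integer.parseInt(source.get("id")));
        target.put("Agent ID", Integer.parseInt(source.get("agent_id")));
        target.put("Caller Number", source.get("phone_number"));

        LocalDateTime start = LocalDateTime.parse(source.get("start_time"), FORMAT);
        target.put("Call Start Time", start);

        // Mirrors defaultValueExpression="${target.Call Start Time}".
        String end = source.get("end_time");
        target.put("Call End Time",
                isMissing(end) ? start : LocalDateTime.parse(end, FORMAT));

        // Mirrors defaultValueExpression="'UNKNOWN'" on the target field.
        String disposition = source.get("disposition");
        target.put("Disposition", isMissing(disposition) ? "UNKNOWN" : disposition);
        return target;
    }

    public static void main(String[] args) {
        // First data row of the sample CSV: end_time and disposition are empty.
        Map<String, String> row = new LinkedHashMap<>();
        row.put("event_type", "STARTED");
        row.put("id", "1");
        row.put("agent_id", "7");
        row.put("phone_number", "(437) 689-5268");
        row.put("start_time", "2016-03-04 22:39");
        row.put("end_time", "");
        row.put("disposition", "");

        System.out.println(mapRow(row));
    }
}
```

For this row the mapped record has the same value for Call Start Time and Call End Time and a Disposition of UNKNOWN, matching the first row of the output file.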

 
