Build DataMappingPipeline Declaratively from XML
Updated: Jun 26, 2023
This example shows how to use Data Pipeline to run a data-mapping pipeline whose source entity, field mappings, and target entity are all declared in an input XML file. This declarative approach provides a structured, customizable way to define and execute complex data transformation workflows.
By processing data through the pipeline, users can produce refined, structured datasets suitable for reporting and analytics. The library extracts, transforms, and loads data into target entities optimized for analysis, supporting data-driven decision-making.
Input XML File
<data-mapping-pipeline multithreaded="true">
    <pipeline-input allowMultiLineText="false" allowQuoteInField="false" __class__="com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput" endingQuote="&quot;" fieldNamesInFirstRow="true" fieldSeparator="," lineSeparators="\n,\r\n,\r" skipEmptyRows="false" startingQuote="&quot;" trimFields="true">
        <file-source __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSource" path="example/data/input/call-center-inbound-call.csv"/>
    </pipeline-input>
    <source-entity addMissingOptionalFields="false" allowExtraFieldsInMapping="true" allowExtraFieldsInValidation="true" name="Raw">
        <fields>
            <field allowBlank="false" maximumLength="25" name="event_type" required="true" type="STRING"/>
            <field allowBlank="false" name="id" required="true" type="STRING"/>
            <field allowBlank="false" name="agent_id" required="true" type="STRING"/>
            <field allowBlank="false" minimumLength="9" name="phone_number" required="true" type="STRING"/>
            <field allowBlank="false" name="start_time" required="true" type="STRING"/>
            <field allowBlank="false" name="end_time" required="false" type="STRING"/>
            <field allowBlank="false" name="disposition" required="false" type="STRING"/>
        </fields>
    </source-entity>
    <target-entity addMissingOptionalFields="false" allowExtraFieldsInMapping="true" allowExtraFieldsInValidation="true" name="Processed">
        <fields>
            <field allowBlank="false" maximumLength="25" name="Event" required="true" type="STRING"/>
            <field name="Call ID" required="true" type="INT"/>
            <field name="Agent ID" required="true" type="INT"/>
            <field allowBlank="false" minimumLength="9" name="Caller Number" required="true" type="STRING"/>
            <field name="Call Start Time" required="true" type="DATETIME"/>
            <field name="Call End Time" required="true" type="DATETIME"/>
            <field allowBlank="false" defaultValueExpression="'UNKNOWN'" name="Disposition" required="true" type="STRING"/>
        </fields>
    </target-entity>
    <pipeline-output autoFilterColumns="false" autofitColumns="false" __class__="com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput" fieldNamesInFirstRow="true" firstColumnIndex="0" firstRowIndex="0" providerType="POI_XSSF" sheetIndex="-1">
        <file-sink append="false" __class__="com.northconcepts.datapipeline.foundations.file.LocalFileSink" path="data/output/test.xlsx"/>
    </pipeline-output>
    <data-mapping>
        <field-mappings>
            <field-mapping fieldName="Event" sourceExpression="source.event_type"/>
            <field-mapping fieldName="Call ID" sourceExpression="source.id"/>
            <field-mapping fieldName="Agent ID" sourceExpression="toInt(source.agent_id)"/>
            <field-mapping fieldName="Caller Number" sourceExpression="source.phone_number"/>
            <field-mapping fieldName="Call Start Time" sourceExpression="parseDate(source.start_time, 'yyyy-MM-dd HH:mm')"/>
            <field-mapping defaultValueExpression="${target.Call Start Time}" fieldName="Call End Time" sourceExpression="parseDate(source.end_time, 'yyyy-MM-dd HH:mm')"/>
            <field-mapping fieldName="Disposition" sourceExpression="source.disposition"/>
        </field-mappings>
    </data-mapping>
</data-mapping-pipeline>
Input CSV File
event_type,id,agent_id,phone_number,start_time,end_time,disposition
STARTED,1,7,(437) 689-5268,2016-03-04 22:39,,
ENDED,1,7,(437) 689-5268,2016-03-04 22:39,2016-03-04 22:39,PRODUCT_QUESTION
STARTED,2,19,(343) 8314-0603,2016-03-04 22:39,,
...
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.pipeline;

import com.northconcepts.datapipeline.foundations.pipeline.DataMappingPipeline;

import java.io.FileInputStream;

public class BuildDataMappingPipelineDeclarativelyFromXml {

    public static void main(String[] args) throws Throwable {
        DataMappingPipeline pipeline = new DataMappingPipeline()
                .fromXml(new FileInputStream("example/data/input/pipeline/datamappingpipeline.xml"));
        pipeline.run();
    }

}
Code Walkthrough
- A DataMappingPipeline instance is created with rules imported from the input file datamappingpipeline.xml.
- The pipeline is run.
Let's analyze some parts of the input file to better understand how the pipeline works:
- The LocalFileSource class, declared in the file-source tag, points to the input file call-center-inbound-call.csv.
- The "Raw" entity is set as the source entity, meaning the incoming data is validated against this entity's rules before it is mapped. All of its fields are of type STRING.
- The "Processed" entity is set as the target entity. The mapped data is validated against this entity's rules.
- The LocalFileSink class, declared in the file-sink tag, points to the new output file test.xlsx.
- The data-mapping tag declares the field mapping rules.
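One mapping worth a closer look is "Call End Time": it combines parseDate with a defaultValueExpression that falls back to the already-mapped "Call Start Time" when end_time is blank (as in the STARTED records of the input CSV). The sketch below mirrors that behavior in plain JDK code using java.time; it is only an illustration of the mapping's logic, not the Data Pipeline expression engine, and the class and method names are hypothetical:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CallEndTimeSketch {

    // Same pattern as the parseDate(...) expressions in the XML mapping.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    // Mirrors the "Call End Time" mapping: parse end_time when present,
    // otherwise fall back to the already-mapped "Call Start Time" value
    // (defaultValueExpression="${target.Call Start Time}").
    static LocalDateTime mapEndTime(String endTime, LocalDateTime callStartTime) {
        if (endTime == null || endTime.trim().isEmpty()) {
            return callStartTime;
        }
        return LocalDateTime.parse(endTime, FORMAT);
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.parse("2016-03-04 22:39", FORMAT);
        System.out.println(mapEndTime("", start));                  // blank end_time: falls back to start time
        System.out.println(mapEndTime("2016-03-04 22:39", start));  // explicit end_time: parsed directly
    }
}
```

This is why the STARTED rows in the output below show a Call End Time even though their end_time column in the CSV is empty.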
Output XLSX File
Event    Call ID  Agent ID  Caller Number    Call Start Time  Call End Time   Disposition
STARTED  1        7         (437) 689-5268   3/4/2016 22:39   3/4/2016 22:39  UNKNOWN
ENDED    1        7         (437) 689-5268   3/4/2016 22:39   3/4/2016 22:39  PRODUCT_QUESTION
STARTED  2        19        (343) 8314-0603  3/4/2016 22:39   3/4/2016 22:39  UNKNOWN