Build DataMappingPipeline Programmatically
Updated: Jun 17, 2023
This example shows how to use Data Pipeline to process data through a pipeline by applying a source entity, a data mapping, and a target entity to produce the desired output. It provides a structured and customizable approach to data transformation, allowing users to define and execute complex data processing workflows.
By running data through the pipeline, users can generate refined, structured datasets suitable for reporting and analytics. The library handles the extraction, transformation, and loading of data into target entities that are optimized for reporting and analysis, supporting data-driven decision-making.
Input CSV File
event_type,id,agent_id,phone_number,start_time,end_time,disposition
STARTED,1,7,(437) 689-5268,2016-03-04 22:39,,
ENDED,1,7,(437) 689-5268,2016-03-04 22:39,2016-03-04 22:39,PRODUCT_QUESTION
STARTED,2,19,(343) 8314-0603,2016-03-04 22:39,,
...
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.pipeline;

import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.foundations.datamapping.DataMapping;
import com.northconcepts.datapipeline.foundations.datamapping.FieldMapping;
import com.northconcepts.datapipeline.foundations.file.LocalFileSink;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.DataMappingPipeline;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;
import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;

public class BuildDataMappingPipelineProgrammatically {

    public static void main(String[] args) {
        // Create validation entities for the mapping data
        SchemaDef schema = new SchemaDef()
            .addEntity(new EntityDef().setName("Raw")
                .addField(new TextFieldDef("event_type", FieldType.STRING).setRequired(true).setMaximumLength(25))
                .addField(new TextFieldDef("id", FieldType.STRING).setRequired(true)))
            .addEntity(new EntityDef().setName("Processed")
                .addField(new TextFieldDef("Event", FieldType.STRING).setRequired(true).setMaximumLength(25))
                .addField(new NumericFieldDef("Call ID", FieldType.INT).setRequired(true)));

        // Map data
        DataMapping mapping = new DataMapping()
            .addFieldMapping(new FieldMapping("Event", "source.event_type"))
            .addFieldMapping(new FieldMapping("Call ID", "source.id"));

        LocalFileSource source = new LocalFileSource().setPath("example/data/input/call-center-inbound-call.csv");
        LocalFileSink sink = new LocalFileSink().setPath("data/output/test.xlsx");

        // Build DataMappingPipeline with source and target entities
        DataMappingPipeline pipeline = new DataMappingPipeline();
        pipeline.setInput(new CsvPipelineInput().setFileSource(source).setFieldNamesInFirstRow(true));
        pipeline.setSourceEntity(schema.getEntity("Raw"));
        pipeline.setDataMapping(mapping);
        pipeline.setTargetEntity(schema.getEntity("Processed"));
        pipeline.setOutput(new ExcelPipelineOutput().setFileSink(sink).setFieldNamesInFirstRow(true));

        // Run the pipeline
        pipeline.run();
    }

}
Code Walkthrough
- A new schema with two entities is created.
- The first entity is called "Raw" and has two fields: event_type and id.
- The second entity is named "Processed" and has the following fields: Event, Call ID.
- A DataMapping is created to hold the field mapping rules. The addFieldMapping() method accepts a FieldMapping object with two String arguments: targetFieldName - the name of the field in the target record, and sourceExpression - a custom expression that holds the formatting logic for the target field (a brief sketch follows this walkthrough).
- Fields in the source record can be accessed with the dot (.) operator: source.id.
- A LocalFileSource instance is created for the input file call-center-inbound-call.csv.
- A LocalFileSink instance is created for the new output file test.xlsx.
- DataMappingPipeline applies the mapping to the CSV pipeline input created from the LocalFileSource instance.
- The "Raw" entity is set as a source entity, meaning that data is validated for this entity rules before it is mapped.
- Data mapping is applied in the pipeline.
- The "Processed" entity is set as a target entity. The mapped data is validated for this entity's rules.
- Data is written to the ExcelPipelineOutput, which is based on the LocalFileSink instance created earlier.
- The pipeline is run.
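For illustration, the snippet below sketches how additional field mappings could carry formatting logic in their source expressions. The "Agent" and "Call Label" target field names are hypothetical and not part of the example above, and the sketch assumes the mapping expression language accepts Java-style string concatenation with "+".

// Hypothetical extra mappings, shown only to illustrate sourceExpression usage
DataMapping mapping = new DataMapping()
    .addFieldMapping(new FieldMapping("Event", "source.event_type"))
    .addFieldMapping(new FieldMapping("Call ID", "source.id"))
    // copy a source field straight through to a differently named target field
    .addFieldMapping(new FieldMapping("Agent", "source.agent_id"))
    // assumes the expression language supports string concatenation with "+"
    .addFieldMapping(new FieldMapping("Call Label", "source.event_type + \" #\" + source.id"));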
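A second sketch shows how the target entity's validation rules could be tightened. It reuses only the setters already shown in the listing (setRequired, setMaximumLength); the "Disposition" field is hypothetical and is not produced by the mapping above, so records would fail validation against "Processed" unless a matching field mapping were added.

// Hypothetical target entity with an extra required, length-limited field
EntityDef processed = new EntityDef().setName("Processed")
    .addField(new TextFieldDef("Event", FieldType.STRING).setRequired(true).setMaximumLength(25))
    .addField(new NumericFieldDef("Call ID", FieldType.INT).setRequired(true))
    .addField(new TextFieldDef("Disposition", FieldType.STRING).setRequired(true).setMaximumLength(50));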
Output XLSX File
Event     Call ID
STARTED   1
ENDED     1
STARTED   2
...