Build DataMappingPipeline Programmatically

This example shows how to use Data Pipeline to build a DataMappingPipeline programmatically, applying a source entity, a data mapping, and a target entity to produce the desired output. It provides a structured, customizable way to define and run complex data transformation workflows.

Running data through the pipeline produces refined, structured datasets suitable for reporting and analytics. The library extracts, transforms, and loads data into target entities optimized for reporting and analysis, supporting data-driven decision-making.

Input CSV File

event_type,id,agent_id,phone_number,start_time,end_time,disposition
STARTED,1,7,(437) 689-5268,2016-03-04 22:39,,
ENDED,1,7,(437) 689-5268,2016-03-04 22:39,2016-03-04 22:39,PRODUCT_QUESTION
STARTED,2,19,(343) 8314-0603,2016-03-04 22:39,,
...

Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.pipeline;

import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.foundations.datamapping.DataMapping;
import com.northconcepts.datapipeline.foundations.datamapping.FieldMapping;
import com.northconcepts.datapipeline.foundations.file.LocalFileSink;
import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.DataMappingPipeline;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;
import com.northconcepts.datapipeline.foundations.pipeline.output.ExcelPipelineOutput;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;

public class BuildDataMappingPipelineProgrammatically {

    public static void main(String[] args) {

        //Create schema entities to validate data before and after mapping
        SchemaDef schema = new SchemaDef()
            .addEntity(new EntityDef().setName("Raw")
                .addField(new TextFieldDef("event_type", FieldType.STRING).setRequired(true).setMaximumLength(25))
                .addField(new TextFieldDef("id", FieldType.STRING).setRequired(true)))
            .addEntity(new EntityDef().setName("Processed")
                .addField(new TextFieldDef("Event", FieldType.STRING).setRequired(true).setMaximumLength(25))
                .addField(new NumericFieldDef("Call ID", FieldType.INT).setRequired(true)));

        //Map data
        DataMapping mapping = new DataMapping()
                .addFieldMapping(new FieldMapping("Event", "source.event_type"))
                .addFieldMapping(new FieldMapping("Call ID", "source.id"));

        //Point the source at the input CSV file and the sink at the Excel output file
        LocalFileSource source = new LocalFileSource().setPath("example/data/input/call-center-inbound-call.csv");
        LocalFileSink sink = new LocalFileSink().setPath("data/output/test.xlsx");

        //Build DataMappingPipeline with source and target entities
        DataMappingPipeline pipeline = new DataMappingPipeline();
        pipeline.setInput(new CsvPipelineInput().setFileSource(source).setFieldNamesInFirstRow(true));
        pipeline.setSourceEntity(schema.getEntity("Raw"));
        pipeline.setDataMapping(mapping);
        pipeline.setTargetEntity(schema.getEntity("Processed"));
        pipeline.setOutput(new ExcelPipelineOutput().setFileSink(sink).setFieldNamesInFirstRow(true));

        //Run the pipeline
        pipeline.run();
    }

}

Code Walkthrough

  1. A new schema with two entities is created.
  2. The first entity is called "Raw" and has two fields: event_type and id.
  3. The second entity is named "Processed" and has the following fields: Event, Call ID.
  4. A DataMapping is created to hold the field mapping rules.
  5. The addFieldMapping() method accepts a FieldMapping object that takes two String arguments:
    1. targetFieldName - the name of the field in the target record.
    2. sourceExpression - an expression that computes the target field's value from the source record.
  6. Fields in the source record are accessed with the dot (.) operator, for example source.id (see the sketch after this list).
  7. A LocalFileSource instance is created for the input file call-center-inbound-call.csv.
  8. A LocalFileSink instance is created for the new output file test.xlsx.
  9. A DataMappingPipeline is created and its input is set to a CsvPipelineInput built from the LocalFileSource instance.
  10. The "Raw" entity is set as the source entity, meaning incoming data is validated against this entity's rules before it is mapped.
  11. The data mapping is applied in the pipeline.
  12. The "Processed" entity is set as the target entity; the mapped data is validated against this entity's rules.
  13. Data is written to an ExcelPipelineOutput built from the LocalFileSink instance created in step 8.
  14. The pipeline is run.
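
The field mapping is not limited to copying values one-to-one. As a hypothetical extension of the listing above (reusing its imports and variables), the sketch below carries additional CSV columns into the target and joins two source fields into a single target field. The extra target field names and the use of + to concatenate strings inside a mapping expression are assumptions for illustration, not part of the original example; any new target field would also need a matching field definition in the "Processed" entity so that validation in step 12 still passes.

//Hypothetical extended mapping: the extra target fields (Agent ID, Disposition,
//Agent Phone) are illustrative and would need matching field definitions in the
//"Processed" entity before target validation will accept them.
DataMapping extendedMapping = new DataMapping()
        .addFieldMapping(new FieldMapping("Event", "source.event_type"))
        .addFieldMapping(new FieldMapping("Call ID", "source.id"))
        .addFieldMapping(new FieldMapping("Agent ID", "source.agent_id"))
        .addFieldMapping(new FieldMapping("Disposition", "source.disposition"))
        //Assumes the mapping expression language allows "+" to join two source fields
        .addFieldMapping(new FieldMapping("Agent Phone", "source.agent_id + \" \" + source.phone_number"));

pipeline.setDataMapping(extendedMapping);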

Output XLSX File

Event	Call ID
STARTED	1
ENDED	1
STARTED	2
...