Declaratively Map Data Using Positions

This example shows how to use Data Pipeline to read a CSV file that has no header row, name and validate its columns using a source entity declared in an XML schema file, and then map those columns to target fields using rules declared in a second XML file. Each field in the source entity declares a position, which ties the field to a column index in the CSV data.

Because the source entity also declares types and constraints (required fields, minimum and maximum values, length limits), the same setup can support data validation and cleansing: input records are checked against the expected structure and format before the mapping rules are applied.

Input Files

account-schema-definition-position.xml

<?xml version="1.0"?>
<schema name="Account_Schema" description="This is test schema.">
  <entities>
    <entity name="SourceAccountEntity" allowExtraFieldsInValidation="true">
      <fields>
        <field name="Balance" type="DOUBLE" required="false" minimum="0" maximum="100000" position="3"/>
        <field name="Rating" type="STRING" required="true" allowBlank="false" position="6"/>
        <field name="LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false" position="1"/>
        <field name="FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false" position="2"/>
        <field name="CreditLimit" type="BIG_DECIMAL" required="false" minimum="0" maximum="1000000" position="4"/>
        <field name="AccountCreated" type="DATE" required="true" position="5"/>
        <field name="Account" type="LONG" required="true" position="0"/>
      </fields>
    </entity>
  </entities>
</schema>
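
To illustrate what the position attribute expresses, here is a minimal sketch in plain Java. It is not Data Pipeline's implementation; the class PositionNamingSketch and the method nameByPosition are hypothetical names used only for this illustration. It assigns each schema field name to the value found at that field's position in a header-less row.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Data Pipeline API.
public class PositionNamingSketch {

    // Assigns a field name to each positional value using a position -> name lookup.
    public static Map<String, String> nameByPosition(Map<Integer, String> positions, String[] values) {
        Map<String, String> record = new LinkedHashMap<>();
        // TreeMap iterates by ascending position, regardless of declaration order
        for (Map.Entry<Integer, String> entry : new TreeMap<>(positions).entrySet()) {
            int index = entry.getKey();
            // Assumption: a missing or empty column is treated as null
            String value = index < values.length && !values[index].isEmpty() ? values[index] : null;
            record.put(entry.getValue(), value);
        }
        return record;
    }

    public static void main(String[] args) {
        // Same declaration order as the schema file; the position decides the column
        Map<Integer, String> positions = new LinkedHashMap<>();
        positions.put(3, "Balance");
        positions.put(6, "Rating");
        positions.put(1, "LastName");
        positions.put(2, "FirstName");
        positions.put(4, "CreditLimit");
        positions.put(5, "AccountCreated");
        positions.put(0, "Account");

        // The -1 limit keeps empty columns when splitting
        String[] values = "312,Butler,Gerard,90.00,,2003-8-6,B".split(",", -1);
        System.out.println(nameByPosition(positions, values));
    }
}
```

The order in which fields are declared in the schema does not matter here; only the position value decides which column a field name is attached to.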

credit-balance-mapping-2.xml

<?xml version="1.0"?>
<data-mapping>
  <field-mappings>
    <field-mapping fieldName="Account_ID" sourceExpression="'A' + source.Account"/>
    <field-mapping fieldName="Account_LastName" sourceExpression="source.LastName"/>
    <field-mapping fieldName="Account_FirstName" sourceExpression="source.FirstName"/>
    <field-mapping fieldName="Account_Balance" sourceExpression="coalesce(source.Balance, 10)"/>
    <field-mapping fieldName="Account_CreditLimit" sourceExpression="coalesce(source.CreditLimit * 1.10, 20)"/>
    <field-mapping fieldName="Account_CreatedDate" sourceExpression="source.AccountCreated"/>
    <field-mapping fieldName="Account_Rating" sourceExpression="source.Rating"/>
  </field-mappings>
</data-mapping>
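
The source expressions above can be read roughly as in the following plain Java sketch. This is an approximation under stated assumptions, not the expression engine Data Pipeline uses: the class MappingExpressionSketch and the helpers coalesce and times are hypothetical, and the sketch assumes that arithmetic on a missing value yields null, which is what the fallback value 20 in the console output suggests. Result types and decimal scale in the real library may differ.

```java
import java.math.BigDecimal;

// Hypothetical illustration only; not part of the Data Pipeline API.
public class MappingExpressionSketch {

    // Returns the first non-null argument, or null if every argument is null
    public static Object coalesce(Object... values) {
        for (Object value : values) {
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    // Assumption: multiplying a missing (null) value yields null instead of failing
    public static BigDecimal times(BigDecimal value, String factor) {
        return value == null ? null : value.multiply(new BigDecimal(factor));
    }

    public static void main(String[] args) {
        long account = 101L;
        Double balance = null;          // empty Balance column
        BigDecimal creditLimit = null;  // empty CreditLimit column

        // 'A' + source.Account
        System.out.println("A" + account);
        // coalesce(source.Balance, 10)
        System.out.println(coalesce(balance, 10L));
        // coalesce(source.CreditLimit * 1.10, 20)
        System.out.println(coalesce(times(creditLimit, "1.10"), 20L));
    }
}
```

With both optional columns empty, as in the first sample row, the two coalesce calls fall back to their default values.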

credit-balance-02-100000-no-header.csv

101,Reeves,Keanu,,,1998-1-17,A
312,Butler,Gerard,90.00,,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
...
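
The dates in this file are not zero-padded (for example 1998-1-17), while the console output of this example shows them as 1998-01-17. The sketch below uses the standard java.time API to show how such values can be parsed; it is an illustration only and does not show how Data Pipeline converts the column. The class name UnpaddedDateSketch is hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration only; not part of the Data Pipeline API.
public class UnpaddedDateSketch {

    // Single-letter M and d accept one- or two-digit month and day values
    private static final DateTimeFormatter UNPADDED = DateTimeFormatter.ofPattern("uuuu-M-d");

    public static LocalDate parse(String text) {
        return LocalDate.parse(text, UNPADDED);
    }

    public static void main(String[] args) {
        System.out.println(parse("1998-1-17")); // prints 1998-01-17
        System.out.println(parse("2003-8-6"));  // prints 2003-08-06
    }
}
```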

Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.datamapping;

import com.northconcepts.datapipeline.core.AsyncReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.foundations.datamapping.DataMapping;
import com.northconcepts.datapipeline.foundations.datamapping.DataMappingReader;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.TransformingReader;

import java.io.File;
import java.io.FileInputStream;

public class DeclarativelyMapDataUsingPositions {

    public static void main(String... args) throws Throwable {
        // Read the CSV file; it has no header row, so fields are identified by position
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02-100000-no-header.csv"))
                .setFieldNamesInFirstRow(false);
        
        // Load the schema and look up the source entity that declares field names, types, and positions
        SchemaDef schema = new SchemaDef()
                .fromXml(new FileInputStream(new File("example/data/input/datamapping/account-schema-definition-position.xml")));
        EntityDef sourceAccountEntity = schema.getEntity("SourceAccountEntity");

        // Apply the source entity to each incoming record
        reader = new TransformingReader(reader)
                .add(new SchemaTransformer(sourceAccountEntity));
        
//        reader = new AsyncReader(reader);  // Threading

        // Load the field mapping rules and apply them to each record
        DataMapping mapping = new DataMapping()
                .fromXml(new FileInputStream(new File("example/data/input/datamapping/credit-balance-mapping-2.xml")));
        reader = new DataMappingReader(reader, mapping);

//        DataWriter writer = new NullWriter();
        // Write the mapped records to the console
        DataWriter writer = StreamWriter.newSystemOutWriter();
        
        // Run the transfer and report basic statistics
        Job job = Job.run(reader, writer);
        System.out.println("Records Transferred: " + job.getRecordsTransferred());
        System.out.println("Running Time: " + job.getRunningTimeAsString());
    }
}

Code Walkthrough

  1. A CSVReader is created for the input file credit-balance-02-100000-no-header.csv, with setFieldNamesInFirstRow(false) because the file has no header row.
  2. The schema is loaded from the input file account-schema-definition-position.xml, and the source entity SourceAccountEntity is retrieved from it.
  3. A TransformingReader is created with a SchemaTransformer that applies the field names and validation rules of the source entity retrieved in the previous step.
  4. A DataMapping is created from the field mapping rules in the input file credit-balance-mapping-2.xml.
  5. A DataMappingReader is created to apply the mapping to the incoming records.
  6. Data is transferred from the reader to StreamWriter.newSystemOutWriter() via Job.run().

Console Output

 
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account_ID]:STRING=[A101]:String
    1:[Account_LastName]:STRING=[Reeves]:String
    2:[Account_FirstName]:STRING=[Keanu]:String
    3:[Account_Balance]:LONG=[10]:Long
    4:[Account_CreditLimit]:LONG=[20]:Long
    5:[Account_CreatedDate]:DATE=[1998-01-17]:Date
    6:[Account_Rating]:STRING=[A]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account_ID]:STRING=[A312]:String
    1:[Account_LastName]:STRING=[Butler]:String
    2:[Account_FirstName]:STRING=[Gerard]:String
    3:[Account_Balance]:DOUBLE=[90.0]:Double
    4:[Account_CreditLimit]:LONG=[20]:Long
    5:[Account_CreatedDate]:DATE=[2003-08-06]:Date
    6:[Account_Rating]:STRING=[B]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Account_ID]:STRING=[A101]:String
    1:[Account_LastName]:STRING=[Hewitt]:String
    2:[Account_FirstName]:STRING=[Jennifer Love]:String
    3:[Account_Balance]:DOUBLE=[0.0]:Double
    4:[Account_CreditLimit]:BIG_DECIMAL=[18700]:BigDecimal
    5:[Account_CreatedDate]:DATE=[1985-05-25]:Date
    6:[Account_Rating]:STRING=[B]:String
}
...