Declaratively Map Data

This example shows how to use Data Pipeline to convert and validate data based on source and target entity definitions, using declarative mapping instructions. Input data is transformed to the desired target format according to the rules specified in the mapping file.

Potential use cases include mapping and converting financial transaction data from different banks or accounting systems to a common format, enabling accurate financial reporting and analysis.

Input Files

account-schema-definition.xml

<?xml version="1.0"?>
<schema name="Account_Schema" description="This is test schema.">
  <entities>
    <entity name="SourceAccountEntity" allowExtraFieldsInValidation="true">
      <fields>
        <field name="Account" type="LONG" required="true"/>
        <field name="LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
        <field name="FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
        <field name="Balance" type="DOUBLE" required="false" minimum="0" maximum="100000"/>
        <field name="CreditLimit" type="BIG_DECIMAL" required="false" minimum="0" maximum="1000000"/>
        <field name="AccountCreated" type="DATE" required="true"/>
        <field name="Rating" type="STRING" required="true" allowBlank="false"/>
      </fields>
    </entity>
    <entity name="TargetAccountEntity" allowExtraFieldsInValidation="true">
      <fields>
        <field name="Account_ID" type="STRING" required="true"/>
        <field name="Account_LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
        <field name="Account_FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
        <field name="Account_Balance" type="DOUBLE" required="true" minimum="0" maximum="100000"/>
        <field name="Account_CreditLimit" type="BIG_DECIMAL" required="true" minimum="0" maximum="1000000"/>
        <field name="Account_CreatedDate" type="DATE" required="true"/>
        <field name="Account_Rating" type="STRING" required="true" allowBlank="false"/>
        <field name="Account_Status" type="STRING" required="true" allowBlank="false"/>
      </fields>
    </entity>
  </entities>
</schema>
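
The same entity definitions can also be built in Java instead of being loaded from XML. The fragment below is a minimal, partial sketch of SourceAccountEntity using the Foundations schema classes; the NumericFieldDef/TextFieldDef/TemporalFieldDef builders and their setters are assumptions based on that API, so verify the exact names and signatures against your Data Pipeline version.

// Partial, code-only sketch of SourceAccountEntity (builder classes and setters assumed).
// Requires com.northconcepts.datapipeline.core.FieldType and the Foundations schema classes
// SchemaDef, EntityDef, NumericFieldDef, TextFieldDef, and TemporalFieldDef.
SchemaDef schema = new SchemaDef("Account_Schema")
        .addEntity(new EntityDef("SourceAccountEntity")
                .addField(new NumericFieldDef("Account", FieldType.LONG).setRequired(true))
                .addField(new TextFieldDef("LastName", FieldType.STRING).setMaximumLength(50).setAllowBlank(false).setRequired(true))
                .addField(new NumericFieldDef("Balance", FieldType.DOUBLE).setMinimum(0).setMaximum(100000))
                .addField(new TemporalFieldDef("AccountCreated", FieldType.DATE).setRequired(true)));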

credit-balance-mapping.xml

<?xml version="1.0"?>
<data-mapping>
  <field-mappings>
    <field-mapping fieldName="Account_ID" sourceExpression="'A' + source.Account"/>
    <field-mapping fieldName="Account_LastName" sourceExpression="${source.LastName}"/>
    <field-mapping fieldName="Account_FirstName" sourceExpression="source.FirstName"/>
    <field-mapping fieldName="Account_Balance" sourceExpression="coalesce(source.Balance, 10)"/>
    <field-mapping fieldName="Account_CreditLimit" sourceExpression="coalesce(source.CreditLimit * 1.10, 20)"/>
    <field-mapping fieldName="Account_CreatedDate" sourceExpression="source.AccountCreated"/>
    <field-mapping fieldName="Account_Rating" sourceExpression="source.Rating"/>
    <field-mapping fieldName="Account_Status" sourceExpression="lookup(0, statusLookup, source.Rating)"/>
  </field-mappings>
</data-mapping>
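
The same field rules can also be declared in code rather than XML. The snippet below is a minimal sketch assuming DataMapping exposes a chainable addFieldMapping(targetFieldName, sourceExpression) method, as used in the library's code-based mapping examples; the expressions are the same ones shown in the XML above.

// Programmatic sketch of part of credit-balance-mapping.xml (addFieldMapping signature assumed).
DataMapping mapping = new DataMapping()
        .addFieldMapping("Account_ID", "'A' + source.Account")
        .addFieldMapping("Account_LastName", "source.LastName")
        .addFieldMapping("Account_Balance", "coalesce(source.Balance, 10)")
        .addFieldMapping("Account_Status", "lookup(0, statusLookup, source.Rating)");
mapping.setValue("statusLookup", statusLookup);  // same lookup binding shown in the listing below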

credit-balance-02-100000.csv

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,,,1998-1-17,A
312,Butler,Gerard,90.00,,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
...
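
Before running the full pipeline, the raw input can be previewed on its own using only classes that already appear in the listing below (CSVReader, StreamWriter, Job). Note that this prints every record, so it is best suited to a small sample of the file.

// Quick preview of the raw input: stream the CSV straight to System.out.
// StreamWriter is in com.northconcepts.datapipeline.core.
DataReader preview = new CSVReader(new File("example/data/input/credit-balance-02-100000.csv"))
        .setFieldNamesInFirstRow(true);
Job.run(preview, StreamWriter.newSystemOutWriter());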

Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.datamapping;

import java.io.File;
import java.io.FileInputStream;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.FieldList;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.foundations.datamapping.DataMapping;
import com.northconcepts.datapipeline.foundations.datamapping.DataMappingReader;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.transform.lookup.BasicLookup;
import com.northconcepts.datapipeline.transform.lookup.Lookup;

public class DeclarativelyMapData {

    public static void main(String... args) throws Throwable {
        // Load source & target schema
        SchemaDef schema = new SchemaDef()
                .fromXml(new FileInputStream(new File("example/data/input/datamapping/account-schema-definition.xml")));
        EntityDef sourceAccountEntity = schema.getEntity("SourceAccountEntity");
        EntityDef targetAccountEntity = schema.getEntity("TargetAccountEntity");

        // Create hard-coded lookup
        Lookup statusLookup = new BasicLookup(new FieldList("status"))
            .add("A", "Updated")
            .add("B", "Late")
            .add("C", "Overdue")
            .add("D", "Default");

        // Define job
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02-100000.csv"))  // 1mm -> credit-balance-02-1000000.csv
                .setFieldNamesInFirstRow(true);
        
        reader = new TransformingReader(reader)
                .add(new SchemaTransformer(sourceAccountEntity));
        
//        reader = new AsyncReader(reader);  // Threading
        
        DataMapping mapping = new DataMapping()
                .fromXml(new FileInputStream("example/data/input/datamapping/credit-balance-mapping.xml"));
        mapping.setValue("statusLookup", statusLookup);
        reader = new DataMappingReader(reader, mapping);

//        reader = new AsyncReader(reader);  // Threading
        
        reader = new TransformingReader(reader)
                .add(new SchemaTransformer(targetAccountEntity));
        
//        reader = new AsyncReader(reader);  // Threading

        DataWriter writer = new NullWriter();
//        DataWriter writer = StreamWriter.newSystemOutWriter();
        
        Job job = Job.run(reader, writer);
        System.out.println("Records Transferred: " + job.getRecordsTransferred());
        System.out.println("Running Time: " + job.getRunningTimeAsString());
    }
}
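
The commented-out AsyncReader lines mark the points where background threading can be inserted so that CSV parsing, mapping, and target validation run on separate threads. A minimal sketch of enabling the first one is shown below; the package location of AsyncReader is an assumption to confirm against your Data Pipeline version.

// import com.northconcepts.datapipeline.multithreading.AsyncReader;  // assumed package
reader = new TransformingReader(reader)
        .add(new SchemaTransformer(sourceAccountEntity));
reader = new AsyncReader(reader);  // buffer records so the mapping stage reads from its own thread
reader = new DataMappingReader(reader, mapping);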

Code Walkthrough

  1. The schema is loaded from the input file account-schema-definition.xml, and the source and target entities (SourceAccountEntity and TargetAccountEntity) are obtained from it.
  2. A BasicLookup is created to translate ratings into statuses. It holds a single field, status, and four rows (one per rating).
  3. A CSVReader is created for the input file credit-balance-02-100000.csv.
  4. CSVReader.setFieldNamesInFirstRow(true) is invoked to indicate that the first row contains the field names.
  5. Records from the input file are converted and validated against the source entity using a TransformingReader with a SchemaTransformer.
  6. A DataMapping is created from the field mapping rules in credit-balance-mapping.xml. The lookup declared in step 2 is registered on the mapping as statusLookup.
  7. A DataMappingReader is created to apply the mapping to each incoming record.
  8. The mapped data is then converted and validated against the target entity using a second SchemaTransformer.
  9. Job.run() transfers the records from the reader to a NullWriter, which simply discards them (see the sketch after this list for writing the output to a file instead).
  10. The number of transferred records and the running time are printed to the console.
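
To keep the mapped output instead of discarding it, NullWriter can be replaced with a file-based writer. The sketch below uses CSVWriter with an illustrative output path; the FileWriter-based constructor and the setFieldNamesInFirstRow call are assumptions to check against the CSVWriter API.

// Requires com.northconcepts.datapipeline.csv.CSVWriter; the output path is illustrative.
DataWriter writer = new CSVWriter(new java.io.FileWriter("example/data/output/credit-balance-mapped.csv"))
        .setFieldNamesInFirstRow(true);
Job job = Job.run(reader, writer);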

Console Output

Records Transferred: 100000
Running Time: 2 Seconds, 938 Milliseconds

Example Records

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account_ID]:STRING=[A101]:String
    1:[Account_LastName]:STRING=[Reeves]:String
    2:[Account_FirstName]:STRING=[Keanu]:String
    3:[Account_Balance]:DOUBLE=[10.0]:Double
    4:[Account_CreditLimit]:BIG_DECIMAL=[20]:BigDecimal
    5:[Account_CreatedDate]:DATE=[1998-01-17]:Date
    6:[Account_Rating]:STRING=[A]:String
    7:[Account_Status]:STRING=[Updated]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account_ID]:STRING=[A312]:String
    1:[Account_LastName]:STRING=[Butler]:String
    2:[Account_FirstName]:STRING=[Gerard]:String
    3:[Account_Balance]:DOUBLE=[90.0]:Double
    4:[Account_CreditLimit]:BIG_DECIMAL=[20]:BigDecimal
    5:[Account_CreatedDate]:DATE=[2003-08-06]:Date
    6:[Account_Rating]:STRING=[B]:String
    7:[Account_Status]:STRING=[Late]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Account_ID]:STRING=[A101]:String
    1:[Account_LastName]:STRING=[Hewitt]:String
    2:[Account_FirstName]:STRING=[Jennifer Love]:String
    3:[Account_Balance]:DOUBLE=[0.0]:Double
    4:[Account_CreditLimit]:BIG_DECIMAL=[18700]:BigDecimal
    5:[Account_CreatedDate]:DATE=[1985-05-25]:Date
    6:[Account_Rating]:STRING=[B]:String
    7:[Account_Status]:STRING=[Late]:String
}
...

 
