Declaratively Map Data with Source and Target Schema

This example shows how to use Data Pipeline to convert and validate records against source and target entity definitions using declarative mapping instructions. Input data is transformed into the target format according to rules defined in an XML mapping file.

Potential use cases include integrating and transforming data from diverse sources, such as government agencies, to support centralized data analysis, policy-making, and reporting.

Input Files

account-schema-definition.xml

    <?xml version="1.0"?>
    <schema name="Account_Schema" description="This is test schema.">
      <entities>
        <entity name="SourceAccountEntity" allowExtraFieldsInValidation="true">
          <fields>
            <field name="Account" type="LONG" required="true"/>
            <field name="LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
            <field name="FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
            <field name="Balance" type="DOUBLE" required="false" minimum="0" maximum="100000"/>
            <field name="CreditLimit" type="BIG_DECIMAL" required="false" minimum="0" maximum="1000000"/>
            <field name="AccountCreated" type="DATE" required="true"/>
            <field name="Rating" type="STRING" required="true" allowBlank="false"/>
          </fields>
        </entity>
        <entity name="TargetAccountEntity" allowExtraFieldsInValidation="true">
          <fields>
            <field name="Account_ID" type="STRING" required="true"/>
            <field name="Account_LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
            <field name="Account_FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
            <field name="Account_Balance" type="DOUBLE" required="true" minimum="0" maximum="100000"/>
            <field name="Account_CreditLimit" type="BIG_DECIMAL" required="true" minimum="0" maximum="1000000"/>
            <field name="Account_CreatedDate" type="DATE" required="true"/>
            <field name="Account_Rating" type="STRING" required="true" allowBlank="false"/>
            <field name="Account_Status" type="STRING" required="true" allowBlank="false"/>
          </fields>
        </entity>
      </entities>
    </schema>
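
To make the field attributes in the schema above more concrete, here is a minimal plain-Java sketch of the kind of checks that rules such as required, allowBlank, minimumLength/maximumLength, and minimum/maximum describe. It does not use or reproduce Data Pipeline's validation code. The class and method names are hypothetical, and the exact semantics (for example, that a missing optional value skips the range check) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class FieldRuleSketch {

    // Hypothetical helper: checks a STRING value against rules like those on LastName.
    static List<String> checkString(String name, String value, boolean required,
                                    boolean allowBlank, int minLength, int maxLength) {
        List<String> errors = new ArrayList<>();
        if (value == null) {
            // Assumption: a missing value only fails when the field is required.
            if (required) {
                errors.add(name + " is required");
            }
            return errors;
        }
        if (!allowBlank && value.trim().isEmpty()) {
            errors.add(name + " must not be blank");
        }
        if (value.length() < minLength || value.length() > maxLength) {
            errors.add(name + " length must be between " + minLength + " and " + maxLength);
        }
        return errors;
    }

    // Hypothetical helper: checks a numeric value against rules like those on Balance.
    static List<String> checkNumber(String name, Double value, boolean required,
                                    double min, double max) {
        List<String> errors = new ArrayList<>();
        if (value == null) {
            if (required) {
                errors.add(name + " is required");
            }
            return errors;
        }
        if (value < min || value > max) {
            errors.add(name + " must be between " + min + " and " + max);
        }
        return errors;
    }

    public static void main(String[] args) {
        // LastName: required, not blank, 0 to 50 characters.
        System.out.println(checkString("LastName", "Reeves", true, false, 0, 50));
        System.out.println(checkString("LastName", "   ", true, false, 0, 50));
        // Balance: optional in the source entity, 0 to 100000.
        System.out.println(checkNumber("Balance", null, false, 0, 100000));
        System.out.println(checkNumber("Balance", 250000.0, false, 0, 100000));
    }
}
```

Note that Balance and CreditLimit are optional in SourceAccountEntity but required in TargetAccountEntity, which is why the mapping file supplies fallback values for them.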

credit-balance-mapping.xml

    <?xml version="1.0"?>
    <data-mapping>
      <field-mappings>
        <field-mapping fieldName="Account_ID" sourceExpression="'A' + source.Account"/>
        <field-mapping fieldName="Account_LastName" sourceExpression="${source.LastName}"/>
        <field-mapping fieldName="Account_FirstName" sourceExpression="source.FirstName"/>
        <field-mapping fieldName="Account_Balance" sourceExpression="coalesce(source.Balance, 10)"/>
        <field-mapping fieldName="Account_CreditLimit" sourceExpression="coalesce(source.CreditLimit * 1.10, 20)"/>
        <field-mapping fieldName="Account_CreatedDate" sourceExpression="source.AccountCreated"/>
        <field-mapping fieldName="Account_Rating" sourceExpression="source.Rating"/>
        <field-mapping fieldName="Account_Status" sourceExpression="lookup(0, statusLookup, source.Rating)"/>
      </field-mappings>
    </data-mapping>

credit-balance-02-100000.csv

    Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
    101,Reeves,Keanu,,,1998-1-17,A
    312,Butler,Gerard,90.00,,2003-8-6,B
    101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
    ...

Java Code Listing

    package com.northconcepts.datapipeline.foundations.examples.datamapping;

    import java.io.File;
    import java.io.FileInputStream;

    import com.northconcepts.datapipeline.core.DataReader;
    import com.northconcepts.datapipeline.core.DataWriter;
    import com.northconcepts.datapipeline.core.FieldList;
    import com.northconcepts.datapipeline.core.NullWriter;
    import com.northconcepts.datapipeline.core.StreamWriter;
    import com.northconcepts.datapipeline.csv.CSVReader;
    import com.northconcepts.datapipeline.foundations.datamapping.DataMapping;
    import com.northconcepts.datapipeline.foundations.datamapping.DataMappingReader;
    import com.northconcepts.datapipeline.foundations.schema.EntityDef;
    import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
    import com.northconcepts.datapipeline.job.Job;
    import com.northconcepts.datapipeline.transform.lookup.BasicLookup;
    import com.northconcepts.datapipeline.transform.lookup.Lookup;

    public class DeclarativelyMapDataWithSourceAndTargetSchema {

        public static void main(String... args) throws Throwable {
            // Load source & target schema
            SchemaDef schema = new SchemaDef()
                    .fromXml(new FileInputStream(new File("example/data/input/datamapping/account-schema-definition.xml")));
            EntityDef sourceAccountEntity = schema.getEntity("SourceAccountEntity");
            EntityDef targetAccountEntity = schema.getEntity("TargetAccountEntity");

            // Create hard-coded lookup from Rating code to status
            Lookup statusLookup = new BasicLookup(new FieldList("status"))
                    .add("A", "Updated")
                    .add("B", "Late")
                    .add("C", "Overdue")
                    .add("D", "Default");

            // Load the mapping rules and bind the entities and the lookup to them
            DataMapping mapping = new DataMapping()
                    .fromXml(new FileInputStream("example/data/input/datamapping/credit-balance-mapping.xml"))
                    .setSourceEntity(sourceAccountEntity)
                    .setTargetEntity(targetAccountEntity)
                    .setValue("statusLookup", statusLookup);

            // Read the 100,000-record input; for 1 million records use credit-balance-02-1000000.csv
            DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02-100000.csv"))
                    .setFieldNamesInFirstRow(true);
            reader = new DataMappingReader(reader, mapping);

            // Discard the mapped records; switch to the line below to print them instead
            DataWriter writer = new NullWriter();
            // DataWriter writer = StreamWriter.newSystemOutWriter();

            Job job = Job.run(reader, writer);
            System.out.println("Records Transferred: " + job.getRecordsTransferred());
            System.out.println("Running Time: " + job.getRunningTimeAsString());
        }
    }

Code Walkthrough

  1. The schema is loaded from the input file account-schema-definition.xml, and the source and target entities are retrieved from it by name.
  2. A BasicLookup is created that maps each Rating code (A to D) to a status value, returned in a field named status.
  3. A DataMapping is loaded from the input file credit-balance-mapping.xml. The source and target entities from step 1 are attached to it, and the lookup from step 2 is registered under the name statusLookup so the mapping expressions can reference it.
  4. A CSVReader is created for the input file credit-balance-02-100000.csv.
  5. CSVReader.setFieldNamesInFirstRow(true) is called so that the values in the first row are used as field names.
  6. A DataMappingReader wraps the CSVReader and applies the mapping to each input record.
  7. Job.run() transfers the data from the reader to a NullWriter, which discards every record it receives. To print the mapped records instead, use the commented-out StreamWriter.newSystemOutWriter() line.
  8. The number of transferred records and the running time are printed to the console.
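
The shape of steps 4 to 8 (a reader wrapped by a mapping stage, drained into a sink that discards records while counting them) can be sketched in plain Java without the library. This is only an illustration of the pattern, not Data Pipeline's implementation. The names PipelineSketch, mapped, and drain are hypothetical stand-ins for DataMappingReader, NullWriter, and Job.run().

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class PipelineSketch {

    // Hypothetical stand-in for a mapping reader: wraps an upstream iterator and
    // applies the mapping function to each record as it is pulled through.
    static <S, T> Iterator<T> mapped(Iterator<S> upstream, Function<S, T> mapping) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return upstream.hasNext();
            }

            @Override
            public T next() {
                return mapping.apply(upstream.next());
            }
        };
    }

    // Hypothetical stand-in for a discarding writer: consumes every record,
    // keeps nothing, and returns how many records passed through.
    static long drain(Iterator<?> reader) {
        long count = 0;
        while (reader.hasNext()) {
            reader.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Two in-memory rows standing in for the CSV input.
        List<Map<String, String>> rows = List.of(
                Map.of("Account", "101"),
                Map.of("Account", "312"));

        // Wrap the source in a mapping stage, as the example wraps its CSVReader.
        Iterator<Map<String, String>> reader = mapped(
                rows.iterator(),
                row -> Map.of("Account_ID", "A" + row.get("Account")));

        long start = System.nanoTime();
        long transferred = drain(reader);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println("Records Transferred: " + transferred);
        System.out.println("Running Time: " + elapsedMillis + " ms");
    }
}
```

Because the sink discards its input, the measured time reflects reading and mapping only, which is presumably why the example uses NullWriter when reporting running time.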

Console output

    Records Transferred: 100000
    Running Time: 2 Seconds, 938 Milliseconds

Example Records

    -----------------------------------------------
    0 - Record (MODIFIED) {
        0:[Account_ID]:STRING=[A101]:String
        1:[Account_LastName]:STRING=[Reeves]:String
        2:[Account_FirstName]:STRING=[Keanu]:String
        3:[Account_Balance]:DOUBLE=[10.0]:Double
        4:[Account_CreditLimit]:BIG_DECIMAL=[20]:BigDecimal
        5:[Account_CreatedDate]:DATE=[1998-01-17]:Date
        6:[Account_Rating]:STRING=[A]:String
        7:[Account_Status]:STRING=[Updated]:String
    }

    -----------------------------------------------
    1 - Record (MODIFIED) {
        0:[Account_ID]:STRING=[A312]:String
        1:[Account_LastName]:STRING=[Butler]:String
        2:[Account_FirstName]:STRING=[Gerard]:String
        3:[Account_Balance]:DOUBLE=[90.0]:Double
        4:[Account_CreditLimit]:BIG_DECIMAL=[20]:BigDecimal
        5:[Account_CreatedDate]:DATE=[2003-08-06]:Date
        6:[Account_Rating]:STRING=[B]:String
        7:[Account_Status]:STRING=[Late]:String
    }

    -----------------------------------------------
    2 - Record (MODIFIED) {
        0:[Account_ID]:STRING=[A101]:String
        1:[Account_LastName]:STRING=[Hewitt]:String
        2:[Account_FirstName]:STRING=[Jennifer Love]:String
        3:[Account_Balance]:DOUBLE=[0.0]:Double
        4:[Account_CreditLimit]:BIG_DECIMAL=[18700]:BigDecimal
        5:[Account_CreatedDate]:DATE=[1985-05-25]:Date
        6:[Account_Rating]:STRING=[B]:String
        7:[Account_Status]:STRING=[Late]:String
    }
    ...
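
The values in the example records follow from the expressions in credit-balance-mapping.xml. The plain-Java sketch below re-derives a few of them by hand. It is not how Data Pipeline evaluates expressions internally, and the class and method names are hypothetical. It also assumes that multiplying a missing CreditLimit yields null so that the coalesce fallback applies, which is what the records above suggest.

```java
import java.math.BigDecimal;
import java.util.Map;

public class MappingValuesSketch {

    // Rating code to status, matching the hard-coded lookup in the example.
    static final Map<String, String> STATUS = Map.of(
            "A", "Updated", "B", "Late", "C", "Overdue", "D", "Default");

    // Returns value when it is present, otherwise the fallback.
    static <T> T coalesce(T value, T fallback) {
        return value != null ? value : fallback;
    }

    // 'A' + source.Account
    static String accountId(long account) {
        return "A" + account;
    }

    // coalesce(source.Balance, 10)
    static Double balance(Double sourceBalance) {
        return coalesce(sourceBalance, 10.0);
    }

    // coalesce(source.CreditLimit * 1.10, 20)
    // Assumption: a null CreditLimit makes the product null, so the fallback is used.
    static BigDecimal creditLimit(BigDecimal sourceLimit) {
        BigDecimal raised = sourceLimit == null
                ? null
                : sourceLimit.multiply(new BigDecimal("1.10"));
        return coalesce(raised, new BigDecimal("20"));
    }

    // lookup(0, statusLookup, source.Rating)
    static String status(String rating) {
        return STATUS.get(rating);
    }

    public static void main(String[] args) {
        // First input row: 101,Reeves,Keanu,,,1998-1-17,A
        System.out.println(accountId(101L));
        System.out.println(balance(null));
        System.out.println(creditLimit(null));
        System.out.println(status("A"));

        // Third input row: 101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
        System.out.println(balance(0.0));
        System.out.println(creditLimit(new BigDecimal("17000.00")).stripTrailingZeros().toPlainString());
        System.out.println(status("B"));
    }
}
```

The third row shows the 10 percent increase in action: a CreditLimit of 17000.00 becomes 18700, while rows with no CreditLimit fall back to 20.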