Declaratively Transform Records using Schema

This example shows how to use Data Pipeline to transform datasets based on rules and definitions provided in an XML file. It lets users define custom mappings, conversions, and manipulations for their data declaratively, without hand-coding each transformation.

During data migration or ETL (Extract, Transform, Load) processes, data often needs to be transformed to match the target system or schema. This example shows how to define the required transformations in an XML file, helping ensure data is migrated or loaded accurately.

Input Files

credit-balance-02.csv

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A
312,Butler,Gerard,90.00,1000.00,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A
317,Murray,Bill,789.65,5000.00,2007-2-5,C
317,Murray,Bill,1,5000.00,2007-2-5,A

account-schema-definition.xml

<?xml version="1.0"?>
<schema name="Account_Schema" description="This is a test schema.">
  <entities>
    <entity name="SourceAccountEntity" allowExtraFieldsInValidation="true">
      <fields>
        <field name="Account" type="LONG" required="true"/>
        <field name="LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
        <field name="FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
        <field name="Balance" type="DOUBLE" required="false" minimum="0" maximum="100000"/>
        <field name="CreditLimit" type="BIG_DECIMAL" required="false" minimum="0" maximum="1000000"/>
        <field name="AccountCreated" type="DATE" required="true"/>
        <field name="Rating" type="STRING" required="true" allowBlank="false"/>
      </fields>
    </entity>
    <entity name="TargetAccountEntity" allowExtraFieldsInValidation="true">
      <fields>
        <field name="Account_ID" type="STRING" required="true"/>
        <field name="Account_LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
        <field name="Account_FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
        <field name="Account_Balance" type="DOUBLE" required="true" minimum="0" maximum="100000"/>
        <field name="Account_CreditLimit" type="BIG_DECIMAL" required="true" minimum="0" maximum="1000000"/>
        <field name="Account_CreatedDate" type="DATE" required="true"/>
        <field name="Account_Rating" type="STRING" required="true" allowBlank="false"/>
        <field name="Account_Status" type="STRING" required="true" allowBlank="false"/>
      </fields>
    </entity>
  </entities>
</schema>
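
Conceptually, applying the SourceAccountEntity definition to an incoming record means coercing each raw CSV string to its declared type and checking it against the declared constraints. The plain-Java sketch below illustrates that idea for three of the fields; it is only a conceptual illustration, not the library's internal implementation.

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class FieldCoercionSketch {

    public static void main(String[] args) {
        // Raw values arrive from the CSV reader as strings.
        String rawAccount = "101";
        String rawBalance = "9315.45";
        String rawCreated = "1998-1-17";

        // Coerce each value to the type declared in the schema.
        long account = Long.parseLong(rawAccount);        // type="LONG"
        double balance = Double.parseDouble(rawBalance);  // type="DOUBLE"
        LocalDate created = LocalDate.parse(rawCreated,
                DateTimeFormatter.ofPattern("yyyy-M-d")); // type="DATE"

        // Enforce the declared range constraint on Balance.
        if (balance < 0 || balance > 100000) {            // minimum="0" maximum="100000"
            throw new IllegalArgumentException("Balance out of range: " + balance);
        }

        System.out.println(account + ", " + balance + ", " + created);
    }
}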


Java Code Listing

package com.northconcepts.datapipeline.foundations.examples.schema;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.TransformingReader;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

public class DeclarativelyTransformRecordsUsingSchema {

    public static void main(String... args) throws FileNotFoundException {
        SchemaDef schema = new SchemaDef()
            .fromXml(new FileInputStream("example/data/input/datamapping/account-schema-definition.xml"));
        EntityDef sourceAccountEntity = schema.getEntity("SourceAccountEntity");

        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02.csv"))
            .setFieldNamesInFirstRow(true);

        reader = new TransformingReader(reader)
            .add(new SchemaTransformer(sourceAccountEntity));

        DataWriter writer = new NullWriter();

        Job job = Job.run(reader, writer);
        System.out.println("Records Transferred: " + job.getRecordsTransferred());
        System.out.println("Running Time: " + job.getRunningTimeAsString());
    }
}


Code Walkthrough

  1. The schema and source entity definitions are loaded from the input file account-schema-definition.xml. Each entity definition in the file specifies field names, data types, and constraints such as required, minimum/maximum length, and minimum/maximum value.
  2. A CSVReader instance is created for the input file credit-balance-02.csv.
  3. A TransformingReader converts the dataset from the previous step based on the source entity definition obtained in step 1.
  4. Data is transferred from the reader to the NullWriter via the Job.run() method (a sketch showing how to write the transformed records to a file instead follows this list). See how to compile and run data pipeline jobs.
  5. The number of records transferred and the running time are printed to the console.
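
As noted in step 4, the transformed records are discarded by NullWriter in this example. To keep them instead, the writer can be swapped for any other DataWriter. A minimal sketch, assuming a hypothetical output path example/data/output/credit-balance-transformed.csv:

// Additional imports:
import java.io.FileWriter;
import java.io.IOException;
import com.northconcepts.datapipeline.csv.CSVWriter;

// Replace "DataWriter writer = new NullWriter();" with the following
// (main must then declare "throws IOException", since FileWriter can fail):
DataWriter writer = new CSVWriter(new FileWriter("example/data/output/credit-balance-transformed.csv"))
    .setFieldNamesInFirstRow(true);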


CSVReader

Obtains records from a Comma-Separated Values (CSV) or delimited stream. It extends the TextReader class and can be created using a File or Reader object. Passing true to the setFieldNamesInFirstRow() method enables the CSVReader to use the names specified in the first row of the input data as field names.
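
For instance, a CSVReader can be built from a java.io.Reader just as well as from a File. A brief sketch using a BufferedReader over the same input file:

import java.io.BufferedReader;
import java.io.FileReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.csv.CSVReader;

// Wrap the file in a buffered java.io.Reader and read the header
// row as field names, exactly as in the listing above.
DataReader reader = new CSVReader(new BufferedReader(new FileReader("example/data/input/credit-balance-02.csv")))
    .setFieldNamesInFirstRow(true);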


Console Output

Records Transferred: 6
Running Time: 139 Milliseconds