Declaratively Transform Records using Schema
This example shows how to use Data Pipeline to transform datasets based on rules and definitions declared in an XML file. Instead of hard-coding mappings, conversions, and validations in Java, you describe them once in a schema file and apply them to any dataset.
During data migration or ETL (Extract, Transform, Load) processes, data often needs to be reshaped and validated to match the target system's schema. This example shows how to declare the required transformations in an XML file so records can be converted accurately as they flow through a pipeline.
Input Files
credit-balance-02.csv
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1998-1-17,A
312,Butler,Gerard,90.00,1000.00,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
312,Pinkett-Smith,Jada,49654.87,100000.00,2006-12-5,A
317,Murray,Bill,789.65,5000.00,2007-2-5,C
317,Murray,Bill,1,5000.00,2007-2-5,A
account-schema-definition.xml
<?xml version="1.0"?>
<schema name="Account_Schema" description="This is test schema.">
    <entities>
        <entity name="SourceAccountEntity" allowExtraFieldsInValidation="true">
            <fields>
                <field name="Account" type="LONG" required="true"/>
                <field name="LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
                <field name="FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
                <field name="Balance" type="DOUBLE" required="false" minimum="0" maximum="100000"/>
                <field name="CreditLimit" type="BIG_DECIMAL" required="false" minimum="0" maximum="1000000"/>
                <field name="AccountCreated" type="DATE" required="true"/>
                <field name="Rating" type="STRING" required="true" allowBlank="false"/>
            </fields>
        </entity>
        <entity name="TargetAccountEntity" allowExtraFieldsInValidation="true">
            <fields>
                <field name="Account_ID" type="STRING" required="true"/>
                <field name="Account_LastName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
                <field name="Account_FirstName" type="STRING" required="true" minimumLength="0" maximumLength="50" allowBlank="false"/>
                <field name="Account_Balance" type="DOUBLE" required="true" minimum="0" maximum="100000"/>
                <field name="Account_CreditLimit" type="BIG_DECIMAL" required="true" minimum="0" maximum="1000000"/>
                <field name="Account_CreatedDate" type="DATE" required="true"/>
                <field name="Account_Rating" type="STRING" required="true" allowBlank="false"/>
                <field name="Account_Status" type="STRING" required="true" allowBlank="false"/>
            </fields>
        </entity>
    </entities>
</schema>
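The same schema can also be assembled programmatically. The sketch below is a minimal, hypothetical equivalent of the first few fields of SourceAccountEntity; the field classes (NumericFieldDef, TextFieldDef) and their fluent setters are assumptions based on the Foundations schema package, so verify the exact names and signatures against your version's Javadoc.

import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;

// Hypothetical sketch: building part of SourceAccountEntity in code instead of XML.
// Field classes and setter names are assumptions -- check the library's Javadoc.
SchemaDef schema = new SchemaDef("Account_Schema")
        .addEntity(new EntityDef("SourceAccountEntity")
                .addField(new NumericFieldDef("Account", FieldType.LONG).setRequired(true))
                .addField(new TextFieldDef("LastName", FieldType.STRING).setRequired(true).setMaximumLength(50))
                .addField(new NumericFieldDef("Balance", FieldType.DOUBLE).setMinimum(0).setMaximum(100000)));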
Java code listing
package com.northconcepts.datapipeline.foundations.examples.schema;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.TransformingReader;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

public class DeclarativelyTransformRecordsUsingSchema {

    public static void main(String... args) throws FileNotFoundException {
        // Load the schema definition from XML and look up the source entity.
        SchemaDef schema = new SchemaDef()
                .fromXml(new FileInputStream("example/data/input/datamapping/account-schema-definition.xml"));
        EntityDef sourceAccountEntity = schema.getEntity("SourceAccountEntity");

        // Read the CSV input, using the first row as field names.
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02.csv"))
                .setFieldNamesInFirstRow(true);

        // Apply the schema's source entity to each record as it is read.
        reader = new TransformingReader(reader)
                .add(new SchemaTransformer(sourceAccountEntity));

        DataWriter writer = new NullWriter();

        Job job = Job.run(reader, writer);

        System.out.println("Records Transferred: " + job.getRecordsTransferred());
        System.out.println("Running Time: " + job.getRunningTimeAsString());
    }

}
Code Walkthrough
- Schema and source entity definitions are loaded from the input file account-schema-definition.xml. Each entity definition in the file lists field names, data types, and constraints such as "required", minimum/maximum length, and value ranges.
- A CSVReader instance is created for the input file credit-balance-02.csv.
- A TransformingReader applies the source entity definition from step 1 to the dataset from the previous step.
- Data is transferred from reader to the NullWriter via the Job.run() method (a variation that prints the records instead is sketched below). See how to compile and run data pipeline jobs.
- The number of records and the running time are printed to the console.
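To inspect the transformed records instead of discarding them, the NullWriter in the listing above can be swapped for a console writer. This is a hypothetical variation, assuming StreamWriter.newSystemOutWriter() from com.northconcepts.datapipeline.core behaves as in other Data Pipeline examples:

import com.northconcepts.datapipeline.core.StreamWriter;

// Hypothetical variation: print each transformed record to standard output
// instead of discarding it with NullWriter.
DataWriter writer = StreamWriter.newSystemOutWriter();
Job job = Job.run(reader, writer);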
CSVReader
Obtains records from a Comma-Separated Value (CSV) or delimited stream. It extends the TextReader class and can be created from a File or Reader object. Passing true to its setFieldNamesInFirstRow() method tells the CSVReader to use the values in the first row of the input as field names.
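For instance, a CSVReader can also read from an in-memory java.io.Reader, which is handy in tests; a minimal sketch, assuming the CSVReader(Reader) constructor:

import java.io.StringReader;

// Minimal sketch: parse CSV held in a string via java.io.StringReader.
String csv = "Account,LastName,FirstName\n101,Reeves,Keanu";
DataReader reader = new CSVReader(new StringReader(csv))
        .setFieldNamesInFirstRow(true);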
Console output
Records Transferred: 6
Running Time: 139 Milliseconds