Transform Records using Schema
Updated: Aug 10, 2023
This example shows how to process and transform records based on a predefined schema definition. The schema gives you a flexible, customizable framework to map, convert, and restructure data from one format to another. It is useful in data integration projects, ETL (Extract, Transform, Load) processes, data migration tasks, and data preparation for analytics or reporting. By applying schema-based transformations, you can standardize, clean, and reshape your data to meet specific requirements or align with target systems.
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.schema;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.foundations.schema.BooleanFieldDef;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.foundations.schema.TemporalFieldDef;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class TransformRecordsUsingSchema {

    public static void main(String[] args) {
        EntityDef entityDef = new EntityDef();
        entityDef.addField(new TextFieldDef("name", FieldType.STRING).setRequired(true).setAllowBlank(false).setMaximumLength(100));
        entityDef.addField(new NumericFieldDef("age", FieldType.INT).setRequired(true).setMinimum(25).setMaximum(75));
        entityDef.addField(new NumericFieldDef("balance", FieldType.BIG_DECIMAL));
        entityDef.addField(new BooleanFieldDef("active", FieldType.BOOLEAN).setAllowedValues(null));
        entityDef.addField(new TemporalFieldDef("lastUpdated", FieldType.DATE).setPattern("yyyy-MM-dd"));

        Record record1 = new Record();
        record1.addField("name", "John Smith");
        record1.addField("age", "72");
        record1.addField("balance", "31.05");
        record1.addField("active", "true"); // "yes" and non-zero numbers also map to true
        record1.addField("lastUpdated", "2019-12-19");

        Record record2 = new Record();
        record2.addField("name", "Jane Powers");
        record2.addField("age", "26");
        record2.addField("balance", null);
        record2.addField("active", "false"); // "yes" and non-zero numbers also map to true
        record2.addField("lastUpdated", "2020-10-30");

        DataReader reader = new MemoryReader(new RecordList(record1, record2));
        reader = new TransformingReader(reader).add(new SchemaTransformer(entityDef));

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job.run(reader, writer);
    }

}
Code Walkthrough
- A new EntityDef instance is created with five fields. Data types and validation rules (constraints) are also specified in this entity definition.
- Two Records are then created with the same field names as the EntityDef. They contain different values for each field.
- MemoryReader is used to obtain records from an in-memory RecordList, in this case record1 and record2 (a file-based variation using the same schema is sketched after this list).
- TransformingReader is created to apply changes to the incoming data. Its internal SchemaTransformer uses the entityDef definitions and rules to convert each field value to its declared type and validate it (the boolean conversion noted in the code comments is also sketched below).
- Data is transferred from reader to StreamWriter.newSystemOutWriter() via the Job.run() method. See how to compile and run data pipeline jobs.
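The schema is not tied to in-memory data; it can be attached to any DataReader. The sketch below swaps MemoryReader for a CSVReader. The file path, the reduced field list, and the class name are assumptions made for illustration, not part of the example above.

package com.northconcepts.datapipeline.foundations.examples.schema;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class TransformCsvRecordsUsingSchema {

    public static void main(String[] args) {
        // Same style of entity definition as above, trimmed to two fields
        EntityDef entityDef = new EntityDef();
        entityDef.addField(new TextFieldDef("name", FieldType.STRING).setRequired(true));
        entityDef.addField(new NumericFieldDef("age", FieldType.INT).setMinimum(25).setMaximum(75));

        // Hypothetical CSV file whose header row matches the schema's field names
        CSVReader csvReader = new CSVReader(new File("example/data/input/users.csv"));
        csvReader.setFieldNamesInFirstRow(true);

        // Apply the schema-driven conversion and validation, exactly as with MemoryReader
        DataReader reader = new TransformingReader(csvReader).add(new SchemaTransformer(entityDef));

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job.run(reader, writer);
    }

}

Because every reader implements the same DataReader interface, the TransformingReader and SchemaTransformer lines are unchanged from the main listing.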
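The inline comments in the listing note that "yes" and non-zero numbers are also converted to Boolean true. The minimal sketch below illustrates that remark using only classes already shown above; the field value and class name are hypothetical, and the expected result reflects the code comment rather than a separate guarantee.

package com.northconcepts.datapipeline.foundations.examples.schema;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.foundations.schema.BooleanFieldDef;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaTransformer;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class TransformBooleanValuesUsingSchema {

    public static void main(String[] args) {
        // Schema with a single boolean field
        EntityDef entityDef = new EntityDef();
        entityDef.addField(new BooleanFieldDef("active", FieldType.BOOLEAN));

        Record record = new Record();
        record.addField("active", "yes"); // per the listing's comment, this should map to Boolean true

        DataReader reader = new MemoryReader(new RecordList(record));
        reader = new TransformingReader(reader).add(new SchemaTransformer(entityDef));

        Job.run(reader, StreamWriter.newSystemOutWriter());
    }

}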
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[name]:STRING=[John Smith]:String
    1:[age]:INT=[72]:Integer
    2:[balance]:BIG_DECIMAL=[31.05]:BigDecimal
    3:[active]:BOOLEAN=[true]:Boolean
    4:[lastUpdated]:DATE=[2019-12-19]:Date
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[name]:STRING=[Jane Powers]:String
    1:[age]:INT=[26]:Integer
    2:[balance]:BIG_DECIMAL=[null]
    3:[active]:BOOLEAN=[false]:Boolean
    4:[lastUpdated]:DATE=[2020-10-30]:Date
}

-----------------------------------------------
2 records