Validate Root Record Using Schema
This example validates records against rules defined in a schema (entity definition). The rules can enforce data type constraints, null checks, and custom business logic, which helps maintain the integrity and quality of the data.
Typical uses include validating financial transactions against regulatory compliance rules or detecting anomalies in transaction data. Because the rules are expressions attached to the entity definition, they can be tailored to the validation requirements of each dataset.
Java Code
package com.northconcepts.datapipeline.foundations.examples.schema;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.filter.FilterExpression;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaFilter;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.validate.ValidatingReader;

public class ValidateRootRecordUsingSchema {

    public static void main(String[] args) {
        EntityDef entityDef = new EntityDef("customers")
                .addField(new NumericFieldDef("customer_id", FieldType.LONG))
                .addField(new TextFieldDef("customer_name", FieldType.STRING))
                .addField(new TextFieldDef("address", FieldType.STRING))
                .addField(new TextFieldDef("city", FieldType.STRING))
                .addField(new TextFieldDef("country", FieldType.STRING));

        entityDef.addValidation(new FilterExpression("recordContainsNonNullValue(this, 'customer_id')"))
                .addValidation(new FilterExpression("recordContainsNonNullField(this, 'customer_name')"))
                .addValidation(new FilterExpression("getValue(this, 'customer_name', 'no name') != 'Lord Voldemort'"))
                .addValidation(new FilterExpression("recordContainsValue(this, 'address')"))
                .addValidation(new FilterExpression("!recordContainsField(this, 'zipcode')"));

        RecordList records = new RecordList()
                .add(new Record()
                        .setField("customer_id", 1000L)
                        .setField("customer_name", "Harry Potter")
                        .setField("address", "4 Privet Drive")
                        .setField("city", "Hogwarts")
                        .setField("country", "Scotland"))
                .add(new Record()
                        .setField("customer_id", null)
                        .setField("customer_name", "Lord Voldemort")
                        .setField("city", "Burrow")
                        .setField("country", "England")
                        .setField("zipcode", "00000"));

        DataReader reader = new MemoryReader(records);
        reader = new ValidatingReader(reader).add(new SchemaFilter(entityDef));

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job.run(reader, writer);
    }

}
Code Walkthrough
This code defines a schema for a set of customer records, adds validations to the schema, creates a list of sample records, reads the records, validates them against the schema, and writes the results to the console output.
- Import the necessary classes from the DataPipeline library.
- Define a new entity called customers and add five fields to it: customer_id (long), customer_name (string), address (string), city (string), and country (string).
- Add five validations to the customers entity using FilterExpression objects:
  - Check that the customer_id field is not null.
  - Check that the customer_name field is not null.
  - Check that the customer_name field is not equal to "Lord Voldemort".
  - Check that the address field has a value.
  - Check that the zipcode field does not exist.
- Create a RecordList object containing two sample records. The first record satisfies every rule; the second has a null customer_id, the name "Lord Voldemort", no address field, and an extra zipcode field.
- Create a DataReader object using the MemoryReader class and pass it the RecordList object.
- Wrap the reader object in a ValidatingReader.
- Add a SchemaFilter object built from the customers entity to the ValidatingReader.
- Create a DataWriter object using the StreamWriter class and set it to write to the console output.
- Run the job by passing the DataReader and DataWriter objects to the Job.run() method. The ValidatingReader reads each record from the RecordList and validates it against the schema. Records that pass are written to the console output; the first record that fails stops the job with an exception.
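The five rules listed above can also be read as plain predicates over a record. The sketch below is not DataPipeline code: it models a record as a Map from field name to value, and RecordRuleSketch, RULES, and failedRules are hypothetical names introduced only for illustration. It assumes that the two non-null checks and the address check each pass only when the field exists with a non-null value. That assumption is enough to separate the two sample records, but it may not match the library's exact semantics for each function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of the five record-level rules; hypothetical, not DataPipeline's implementation.
class RecordRuleSketch {

    // Rule description -> predicate over a record modeled as field name -> value.
    static final Map<String, Predicate<Map<String, Object>>> RULES = new LinkedHashMap<>();

    static {
        // Assumption: these checks pass only if the field exists AND its value is non-null.
        RULES.put("customer_id is not null", r -> r.get("customer_id") != null);
        RULES.put("customer_name is not null", r -> r.get("customer_name") != null);
        // Mirrors getValue(this, 'customer_name', 'no name'): use a default when the field is absent.
        RULES.put("customer_name is not 'Lord Voldemort'",
                r -> !"Lord Voldemort".equals(r.getOrDefault("customer_name", "no name")));
        RULES.put("address has a value", r -> r.get("address") != null);
        RULES.put("zipcode field does not exist", r -> !r.containsKey("zipcode"));
    }

    // Evaluates every rule and returns the descriptions of the ones that failed.
    static List<String> failedRules(Map<String, Object> record) {
        List<String> failures = new ArrayList<>();
        RULES.forEach((description, rule) -> {
            if (!rule.test(record)) {
                failures.add(description);
            }
        });
        return failures;
    }

    public static void main(String[] args) {
        Map<String, Object> harry = new HashMap<>();
        harry.put("customer_id", 1000L);
        harry.put("customer_name", "Harry Potter");
        harry.put("address", "4 Privet Drive");
        harry.put("city", "Hogwarts");
        harry.put("country", "Scotland");

        Map<String, Object> voldemort = new HashMap<>();
        voldemort.put("customer_id", null); // field exists, value is null
        voldemort.put("customer_name", "Lord Voldemort");
        voldemort.put("city", "Burrow");
        voldemort.put("country", "England");
        voldemort.put("zipcode", "00000");

        System.out.println(failedRules(harry));            // prints []
        System.out.println(failedRules(voldemort).size()); // prints 4
    }
}
```

Under this model the second sample record fails four rules (customer_id, the name check, address, and zipcode) and passes only the customer_name null check, which agrees with the four validation messages reported in the console output.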
Console Output
14:21:36,784 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
14:21:36,836 DEBUG [main] datapipeline:615 - Job[1,job-1,Thu Jun 01 14:21:36 EEST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[customer_id]:LONG=[1000]:Long
    1:[customer_name]:STRING=[Harry Potter]:String
    2:[address]:STRING=[4 Privet Drive]:String
    3:[city]:STRING=[Hogwarts]:String
    4:[country]:STRING=[Scotland]:String
}

-----------------------------------------------
1 records
14:21:36,979 DEBUG [main] datapipeline:664 - job::Failure
Exception in thread "main" com.northconcepts.datapipeline.core.DataException: validation [Ensure record matches "customers" entity definition] failed on record #1
Record (MODIFIED) {
    0:[customer_id]:UNDEFINED=[null]
    1:[customer_name]:STRING=[Lord Voldemort]:String
    2:[city]:STRING=[Burrow]:String
    3:[country]:STRING=[England]:String
    4:[zipcode]:STRING=[00000]:String
}

-------------------------------
EntityDef=[{
  "addMissingOptionalFields" : false,
  "allowExtraFieldsInMapping" : true,
  "allowExtraFieldsInValidation" : true,
  "attributes" : { },
  "fields" : [ {
    "array" : false,
    "attributes" : { },
    "entityQualifiedName" : "customers.customer_id",
    ...]
MemoryReader-1.bufferSize=[0]
MemoryReader-1.closedOn=[null]
MemoryReader-1.description=[null]
MemoryReader-1.exhausted=[false]
MemoryReader-1.id=[1]
MemoryReader-1.lastRecord=[Record (MODIFIED) {
    0:[customer_id]:UNDEFINED=[null]
    1:[customer_name]:STRING=[Lord Voldemort]:String
    2:[city]:STRING=[Burrow]:String
    3:[country]:STRING=[England]:String
    4:[zipcode]:STRING=[00000]:String
}
]
MemoryReader-1.lineageSupported=[false]
MemoryReader-1.name=[MemoryReader-1]
MemoryReader-1.openedOn=[2023.06.01-14:21:36.837]
MemoryReader-1.recordCount=[2]
MemoryReader-1.saveLineage=[false]
MemoryReader-1.state=[OPENED]
MemoryReader-1.thread=[main]
MemoryReader-1.timestamp=[2023.06.01-14:21:36.978]
ValidatingReader-2.bufferSize=[0]
ValidatingReader-2.closedOn=[null]
ValidatingReader-2.currentFilterIndex=[0]
ValidatingReader-2.description=[null]
ValidatingReader-2.discardReasonFieldName=[null]
ValidatingReader-2.discardWriter=[null]
ValidatingReader-2.exceptionOnFailure=[true]
ValidatingReader-2.exhausted=[false]
ValidatingReader-2.filter=[Ensure record matches "customers" entity definition]
ValidatingReader-2.filterClass=[class com.northconcepts.datapipeline.foundations.schema.SchemaFilter]
ValidatingReader-2.id=[2]
ValidatingReader-2.lastRecord=[Record (MODIFIED) {
    0:[customer_id]:LONG=[1000]:Long
    1:[customer_name]:STRING=[Harry Potter]:String
    2:[address]:STRING=[4 Privet Drive]:String
    3:[city]:STRING=[Hogwarts]:String
    4:[country]:STRING=[Scotland]:String
}
]
ValidatingReader-2.lineageSupported=[false]
ValidatingReader-2.name=[ValidatingReader-2]
ValidatingReader-2.openedOn=[2023.06.01-14:21:36.837]
ValidatingReader-2.recordCount=[1]
ValidatingReader-2.recordMessageStackTrace=[true]
ValidatingReader-2.saveLineage=[false]
ValidatingReader-2.state=[OPENED]
ValidatingReader-2.thread=[main]
ValidatingReader-2.timestamp=[2023.06.01-14:21:36.977]
record=[Record (MODIFIED) {
    0:[customer_id]:UNDEFINED=[null]
    1:[customer_name]:STRING=[Lord Voldemort]:String
    2:[city]:STRING=[Burrow]:String
    3:[country]:STRING=[England]:String
    4:[zipcode]:STRING=[00000]:String
}
]
validationMessages.length=[4]
validationMessages[0]=[Record failed validation rule, expected: record satisfies expression: recordContainsNonNullValue(this, 'customer_id')]
validationMessages[1]=[Record failed validation rule, expected: record satisfies expression: getValue(this, 'customer_name', 'no name') != 'Lord Voldemort']
validationMessages[2]=[Record failed validation rule, expected: record satisfies expression: recordContainsValue(this, 'address')]
validationMessages[3]=[Record failed validation rule, expected: record satisfies expression: !recordContainsField(this, 'zipcode')]
validationTargetFields.length=[4]
validationTargetFields[0]=[null]
validationTargetFields[1]=[null]
validationTargetFields[2]=[null]
validationTargetFields[3]=[null]
-------------------------------
    at com.northconcepts.datapipeline.core.DataObject.exception(DataObject.java:126)
    at com.northconcepts.datapipeline.validate.ValidatingReader.discard(ValidatingReader.java:83)
    at com.northconcepts.datapipeline.filter.FilteringReader.interceptRecord(FilteringReader.java:145)
    at com.northconcepts.datapipeline.core.ProxyReader.readImpl(ProxyReader.java:121)
    at com.northconcepts.datapipeline.core.DataReader.read(DataReader.java:172)
    at com.northconcepts.datapipeline.job.Job.run(Job.java:623)
    at com.northconcepts.datapipeline.job.Job.run(Job.java:52)
    at com.northconcepts.datapipeline.foundations.examples.schema.ValidateRootRecordUsingSchema.main(ValidateRootRecordUsingSchema.java:54)
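The output shows the first record written to the console before the job fails on record #1 (record numbering starts at 0). A minimal plain-Java sketch of that fail-fast flow follows. It is a model under stated assumptions rather than the library's implementation: FailFastSketch and copyValid are hypothetical names, records are modeled as Maps, and a single simplified rule stands in for the schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of fail-fast validation; hypothetical, not DataPipeline code.
class FailFastSketch {

    // Copies records to 'out' in order and throws on the first record that fails the rule.
    static void copyValid(List<Map<String, Object>> in,
                          Predicate<Map<String, Object>> rule,
                          List<Map<String, Object>> out) {
        for (int i = 0; i < in.size(); i++) {
            Map<String, Object> record = in.get(i);
            if (!rule.test(record)) {
                // Zero-based index, matching "failed on record #1" for the second record.
                throw new IllegalStateException("validation failed on record #" + i);
            }
            out.add(record);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> harry = Map.of("customer_name", "Harry Potter", "address", "4 Privet Drive");
        Map<String, Object> voldemort = Map.of("customer_name", "Lord Voldemort");
        List<Map<String, Object>> written = new ArrayList<>();

        try {
            // Simplified stand-in rule: the record must contain an address field.
            copyValid(List.of(harry, voldemort), r -> r.containsKey("address"), written);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "validation failed on record #1"
        }
        System.out.println(written.size() + " record(s) written before the failure"); // prints "1 record(s) written before the failure"
    }
}
```

In the real output, the properties exceptionOnFailure=[true] and discardWriter=[null] on ValidatingReader-2 suggest this throw-on-failure behavior is configurable, but the example on this page does not exercise those settings.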