Validate Root Record Using Schema

Updated: Jun 27, 2023

This example validates records against rules declared on an entity definition (EntityDef) in a schema. The rules can enforce field data types, reject null or missing values, and apply custom business logic written as filter expressions.

Typical uses include checking financial transactions against regulatory compliance rules and flagging anomalies in transaction data. Because each validation is an ordinary filter expression attached to the entity definition, the rule set can be adapted to the requirements of a particular dataset.

Java Code

package com.northconcepts.datapipeline.foundations.examples.schema;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.filter.FilterExpression;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaFilter;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.validate.ValidatingReader;

public class ValidateRootRecordUsingSchema {

    public static void main(String[] args) {
        EntityDef entityDef =  new EntityDef("customers")
                .addField(new NumericFieldDef("customer_id", FieldType.LONG))
                .addField(new TextFieldDef("customer_name", FieldType.STRING))
                .addField(new TextFieldDef("address", FieldType.STRING))
                .addField(new TextFieldDef("city", FieldType.STRING))
                .addField(new TextFieldDef("country", FieldType.STRING));

        entityDef.addValidation(new FilterExpression("recordContainsNonNullValue(this, 'customer_id')"))
                .addValidation(new FilterExpression("recordContainsNonNullField(this, 'customer_name')"))
                .addValidation(new FilterExpression("getValue(this, 'customer_name', 'no name') != 'Lord Voldemort'"))
                .addValidation(new FilterExpression("recordContainsValue(this, 'address')"))
                .addValidation(new FilterExpression("!recordContainsField(this, 'zipcode')"));

        RecordList records = new RecordList()
                .add(new Record()
                        .setField("customer_id", 1000L)
                        .setField("customer_name", "Harry Potter")
                        .setField("address", "4 Privet Drive")
                        .setField("city", "Hogwarts")
                        .setField("country", "Scotland"))
                .add(new Record()
                        .setField("customer_id", null)
                        .setField("customer_name", "Lord Voldemort")
                        .setField("city", "Burrow")
                        .setField("country", "England")
                        .setField("zipcode", "00000"));


        DataReader reader = new MemoryReader(records);
        reader = new ValidatingReader(reader).add(new SchemaFilter(entityDef));

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job.run(reader, writer);
    }

}


Code Walkthrough

This code defines an entity definition for customer records, attaches validation rules to it, builds two sample records in memory, and runs them through a validating reader. The first record passes and is written to the console; the second breaks four of the five rules, so the job stops with a DataException.

  1. Import necessary classes from the DataPipeline library.
  2. Define a new entity called customers and add five fields to it: customer_id (long), customer_name (string), address (string), city (string), and country (string).
  3. Add five validations to the customers entity using FilterExpression objects:
    1. Check that the customer_id field is not null.
    2. Check that the customer_name field is not null.
    3. Check that the customer_name field is not equal to "Lord Voldemort".
    4. Check that the address field has a value.
    5. Check that the zipcode field does not exist.
  4. Create a RecordList object containing two sample records. The first record satisfies every rule; the second has a null customer_id, the name "Lord Voldemort", no address field, and an extra zipcode field.
  5. Create a DataReader object using the MemoryReader class and pass it the RecordList object.
  6. Wrap that reader in a ValidatingReader.
  7. Add a SchemaFilter built from the customers entity definition to the ValidatingReader.
  8. Create a DataWriter object using the StreamWriter class and set it to write to the console output.
  9. Run the job by passing the DataReader and DataWriter objects to the Job.run() method. The ValidatingReader reads each record from the MemoryReader and validates it against the schema. Records that pass are written to the console; the first record that fails raises a DataException listing every rule it broke.
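
The five rules can also be read as plain predicates over a record. The sketch below is an illustration in standard Java only. It does not use the DataPipeline API: the class RuleSketch and its helper methods are hypothetical, and the meaning given to each expression (for example, treating recordContainsValue as "field present and non-null") is an assumption inferred from the console output, not taken from the library's documentation. It shows why the second sample record breaks four rules rather than five: customer_name is present and non-null, so the second rule still passes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java illustration only; none of these types come from DataPipeline.
class RuleSketch {

    // Assumed meaning of the five expressions, modelled on a Map-backed record.
    static Map<String, Predicate<Map<String, Object>>> rules() {
        Map<String, Predicate<Map<String, Object>>> rules = new LinkedHashMap<>();
        rules.put("customer_id has a non-null value",
                r -> r.get("customer_id") != null);
        rules.put("customer_name is a non-null field",
                r -> r.containsKey("customer_name") && r.get("customer_name") != null);
        rules.put("customer_name is not 'Lord Voldemort'",
                r -> !"Lord Voldemort".equals(r.getOrDefault("customer_name", "no name")));
        rules.put("address has a value",
                r -> r.get("address") != null);
        rules.put("zipcode field is absent",
                r -> !r.containsKey("zipcode"));
        return rules;
    }

    // Returns the label of every rule the record breaks (empty list = valid).
    static List<String> failedRules(Map<String, Object> record) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, Object>>> rule : rules().entrySet()) {
            if (!rule.getValue().test(record)) {
                failed.add(rule.getKey());
            }
        }
        return failed;
    }

    // First sample record from the example.
    static Map<String, Object> harry() {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("customer_id", 1000L);
        r.put("customer_name", "Harry Potter");
        r.put("address", "4 Privet Drive");
        r.put("city", "Hogwarts");
        r.put("country", "Scotland");
        return r;
    }

    // Second sample record: null id, banned name, no address, extra zipcode.
    static Map<String, Object> voldemort() {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("customer_id", null);
        r.put("customer_name", "Lord Voldemort");
        r.put("city", "Burrow");
        r.put("country", "England");
        r.put("zipcode", "00000");
        return r;
    }

    public static void main(String[] args) {
        System.out.println(failedRules(harry()));      // prints []
        System.out.println(failedRules(voldemort()));  // prints the four broken rule labels
    }
}
```

The four labels returned for the second record correspond one-to-one with the four entries under validationMessages in the console output.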

Console Output

14:21:36,784 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
14:21:36,836 DEBUG [main] datapipeline:615 - Job[1,job-1,Thu Jun 01 14:21:36 EEST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[customer_id]:LONG=[1000]:Long
    1:[customer_name]:STRING=[Harry Potter]:String
    2:[address]:STRING=[4 Privet Drive]:String
    3:[city]:STRING=[Hogwarts]:String
    4:[country]:STRING=[Scotland]:String
}

-----------------------------------------------
1 records
14:21:36,979 DEBUG [main] datapipeline:664 - job::Failure
Exception in thread "main" com.northconcepts.datapipeline.core.DataException:  validation [Ensure record matches "customers" entity definition] failed on record #1 Record (MODIFIED) {
    0:[customer_id]:UNDEFINED=[null]
    1:[customer_name]:STRING=[Lord Voldemort]:String
    2:[city]:STRING=[Burrow]:String
    3:[country]:STRING=[England]:String
    4:[zipcode]:STRING=[00000]:String
}

-------------------------------
EntityDef=[{
  "addMissingOptionalFields" : false,
  "allowExtraFieldsInMapping" : true,
  "allowExtraFieldsInValidation" : true,
  "attributes" : { },
  "fields" : [ {
    "array" : false,
    "attributes" : { },
    "entityQualifiedName" : "customers.customer_id",
...]
MemoryReader-1.bufferSize=[0]
MemoryReader-1.closedOn=[null]
MemoryReader-1.description=[null]
MemoryReader-1.exhausted=[false]
MemoryReader-1.id=[1]
MemoryReader-1.lastRecord=[Record (MODIFIED) {
    0:[customer_id]:UNDEFINED=[null]
    1:[customer_name]:STRING=[Lord Voldemort]:String
    2:[city]:STRING=[Burrow]:String
    3:[country]:STRING=[England]:String
    4:[zipcode]:STRING=[00000]:String
}
]
MemoryReader-1.lineageSupported=[false]
MemoryReader-1.name=[MemoryReader-1]
MemoryReader-1.openedOn=[2023.06.01-14:21:36.837]
MemoryReader-1.recordCount=[2]
MemoryReader-1.saveLineage=[false]
MemoryReader-1.state=[OPENED]
MemoryReader-1.thread=[main]
MemoryReader-1.timestamp=[2023.06.01-14:21:36.978]
ValidatingReader-2.bufferSize=[0]
ValidatingReader-2.closedOn=[null]
ValidatingReader-2.currentFilterIndex=[0]
ValidatingReader-2.description=[null]
ValidatingReader-2.discardReasonFieldName=[null]
ValidatingReader-2.discardWriter=[null]
ValidatingReader-2.exceptionOnFailure=[true]
ValidatingReader-2.exhausted=[false]
ValidatingReader-2.filter=[Ensure record matches "customers" entity definition]
ValidatingReader-2.filterClass=[class com.northconcepts.datapipeline.foundations.schema.SchemaFilter]
ValidatingReader-2.id=[2]
ValidatingReader-2.lastRecord=[Record (MODIFIED) {
    0:[customer_id]:LONG=[1000]:Long
    1:[customer_name]:STRING=[Harry Potter]:String
    2:[address]:STRING=[4 Privet Drive]:String
    3:[city]:STRING=[Hogwarts]:String
    4:[country]:STRING=[Scotland]:String
}
]
ValidatingReader-2.lineageSupported=[false]
ValidatingReader-2.name=[ValidatingReader-2]
ValidatingReader-2.openedOn=[2023.06.01-14:21:36.837]
ValidatingReader-2.recordCount=[1]
ValidatingReader-2.recordMessageStackTrace=[true]
ValidatingReader-2.saveLineage=[false]
ValidatingReader-2.state=[OPENED]
ValidatingReader-2.thread=[main]
ValidatingReader-2.timestamp=[2023.06.01-14:21:36.977]
record=[Record (MODIFIED) {
    0:[customer_id]:UNDEFINED=[null]
    1:[customer_name]:STRING=[Lord Voldemort]:String
    2:[city]:STRING=[Burrow]:String
    3:[country]:STRING=[England]:String
    4:[zipcode]:STRING=[00000]:String
}
]
validationMessages.length=[4]
validationMessages[0]=[Record failed validation rule, expected: record satisfies expression: recordContainsNonNullValue(this, 'customer_id')]
validationMessages[1]=[Record failed validation rule, expected: record satisfies expression: getValue(this, 'customer_name', 'no name') != 'Lord Voldemort']
validationMessages[2]=[Record failed validation rule, expected: record satisfies expression: recordContainsValue(this, 'address')]
validationMessages[3]=[Record failed validation rule, expected: record satisfies expression: !recordContainsField(this, 'zipcode')]
validationTargetFields.length=[4]
validationTargetFields[0]=[null]
validationTargetFields[1]=[null]
validationTargetFields[2]=[null]
validationTargetFields[3]=[null]
-------------------------------
	at com.northconcepts.datapipeline.core.DataObject.exception(DataObject.java:126)
	at com.northconcepts.datapipeline.validate.ValidatingReader.discard(ValidatingReader.java:83)
	at com.northconcepts.datapipeline.filter.FilteringReader.interceptRecord(FilteringReader.java:145)
	at com.northconcepts.datapipeline.core.ProxyReader.readImpl(ProxyReader.java:121)
	at com.northconcepts.datapipeline.core.DataReader.read(DataReader.java:172)
	at com.northconcepts.datapipeline.job.Job.run(Job.java:623)
	at com.northconcepts.datapipeline.job.Job.run(Job.java:52)
	at com.northconcepts.datapipeline.foundations.examples.schema.ValidateRootRecordUsingSchema.main(ValidateRootRecordUsingSchema.java:54)
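
The output above reflects a fail-fast run: one record is written, then the job aborts on record #1 with all of that record's broken rules attached to the exception. A minimal sketch of that control flow in standard Java follows. It is not DataPipeline code: FailFastSketch, ValidationFailure, and run are hypothetical names, and the sketch does not model the ValidatingReader settings visible in the dump (such as exceptionOnFailure or discardWriter).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java illustration only; none of these types come from DataPipeline.
class FailFastSketch {

    // Hypothetical stand-in for the exception raised by the real job.
    static class ValidationFailure extends RuntimeException {
        final int recordIndex;
        final List<String> messages;

        ValidationFailure(int recordIndex, List<String> messages) {
            super("validation failed on record #" + recordIndex + ": " + messages);
            this.recordIndex = recordIndex;
            this.messages = messages;
        }
    }

    // Copies records to `written` in order and stops at the first record
    // that breaks any rule.
    static void run(List<Map<String, Object>> records,
                    Map<String, Predicate<Map<String, Object>>> rules,
                    List<Map<String, Object>> written) {
        for (int i = 0; i < records.size(); i++) {
            Map<String, Object> record = records.get(i);
            List<String> messages = new ArrayList<>();
            for (Map.Entry<String, Predicate<Map<String, Object>>> rule : rules.entrySet()) {
                if (!rule.getValue().test(record)) {
                    messages.add(rule.getKey()); // gather every broken rule, not just the first
                }
            }
            if (!messages.isEmpty()) {
                throw new ValidationFailure(i, messages);
            }
            written.add(record);
        }
    }

    public static void main(String[] args) {
        Map<String, Predicate<Map<String, Object>>> rules = new LinkedHashMap<>();
        rules.put("customer_id is non-null", r -> r.get("customer_id") != null);
        rules.put("zipcode is absent", r -> !r.containsKey("zipcode"));

        Map<String, Object> good = new LinkedHashMap<>();
        good.put("customer_id", 1000L);
        Map<String, Object> bad = new LinkedHashMap<>();
        bad.put("customer_id", null);
        bad.put("zipcode", "00000");

        List<Map<String, Object>> written = new ArrayList<>();
        try {
            run(Arrays.asList(good, bad), rules, written);
        } catch (ValidationFailure e) {
            System.out.println(written.size() + " record(s) written before failure");
            System.out.println(e.getMessage());
        }
    }
}
```

Collecting all messages for the failing record before throwing mirrors the validationMessages list in the dump, where a single exception reports four broken rules.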