Use SchemaFilter to Validate Records in a Pipeline
Updated: Jun 23, 2023
This example shows how to use DataPipeline to read records from a CSV file and validate specific fields against rules defined in a schema. Only the records that meet the validation criteria are passed on to the output.
Input CSV file
Handle,Title,Body (HTML),Vendor,Type,Tags,Published,Option1 Name,Option1 Value,Option2 Name,Option2 Value,Option3 Name,Option3 Value,Variant SKU,Variant Grams,Variant Inventory Tracker,Variant Inventory Qty,Variant Inventory Policy,Variant Fulfillment Service,Variant Price,Variant Compare At Price,Variant Requires Shipping,Variant Taxable,Variant Barcode,Image Src,Image Position,Image Alt Text,Gift Card,SEO Title,SEO Description,Google Shopping / Google Product Category,Google Shopping / Gender,Google Shopping / Age Group,Google Shopping / MPN,Google Shopping / AdWords Grouping,Google Shopping / AdWords Labels,Google Shopping / Condition,Google Shopping / Custom Product,Google Shopping / Custom Label 0,Google Shopping / Custom Label 1,Google Shopping / Custom Label 2,Google Shopping / Custom Label 3,Google Shopping / Custom Label 4,Variant Image,Variant Weight Unit,Variant Tax Code
chain-bracelet,7 Shakra Bracelet,"7 chakra bracelet, in blue or black.",Company 123,Bracelet,Beads,true,Color,Blue,,,,,,0,,1,deny,manual,42.99,44.99,true,true,,https://burst.shopifycdn.com/photos/7-chakra-bracelet_925x.jpg,1,,false,,,,,,,,,,,,,,,,https://burst.shopifycdn.com/photos/navy-blue-chakra-bracelet_925x.jpg,kg,
chain-bracelet,,,,,,,,Black,,,,,,0,,0,deny,manual,42.99,44.99,true,true,,https://burst.shopifycdn.com/photos/navy-blue-chakra-bracelet_925x.jpg,2,,,,,,,,,,,,,,,,,,https://burst.shopifycdn.com/photos/7-chakra-bracelet_925x.jpg,kg,
leather-anchor,Anchor Bracelet Mens,Black leather bracelet with gold or silver anchor for men.,Company 123,Bracelet,"Anchor, Gold, Leather, Silver",true,Color,Gold,,,,,,0,,1,deny,manual,69.99,85,true,true,,https://burst.shopifycdn.com/photos/anchor-bracelet-mens_925x.jpg,1,,false,,,,,,,,,,,,,,,,https://burst.shopifycdn.com/photos/anchor-bracelet-mens_925x.jpg,kg,
...
Java Code Listing
package com.northconcepts.datapipeline.foundations.examples.schema;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.foundations.schema.BooleanFieldDef;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.SchemaFilter;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.SelectFields;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.validate.ValidatingReader;

public class UseSchemaFilterToValidateRecordsInAPipeline {

    public static void main(String[] args) {
        // Schema: the validation rules each record is checked against
        EntityDef entityDef = new EntityDef().setName("Jewelry")
                .addField(new NumericFieldDef("Variant Price", FieldType.DOUBLE).setMinimum(5).setMaximum(500).setRequired(true))
                .addField(new BooleanFieldDef("Variant Taxable", FieldType.BOOLEAN))
                .addField(new TextFieldDef("Title", FieldType.STRING).setMaximumLength(256))
                .addField(new TextFieldDef("Option1 Value", FieldType.STRING));

        // Read the CSV file, taking field names from the first row
        DataReader reader = new CSVReader(new File("data/input/jewelry.csv"))
                .setAllowMultiLineText(true)
                .setFieldNamesInFirstRow(true);

        // Convert the text values to typed values and keep only the four schema fields
        reader = new TransformingReader(reader)
                .add(
                        new BasicFieldTransformer("Variant Price").nullToValue(100d).stringToDouble(),
                        new BasicFieldTransformer("Variant Taxable").stringToBoolean())
                .add(new SelectFields("Title", "Variant Price", "Variant Taxable", "Option1 Value"));

        // Validate each record against the schema
        reader = new ValidatingReader(reader).add(new SchemaFilter(entityDef));

        // Write the validated records to the console
        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job.run(reader, writer);
    }

}
Code walkthrough
- An EntityDef object called "Jewelry" is created with four fields. Each field declares a data type; "Variant Price" is also required and limited to a value range, and "Title" is limited to 256 characters.
- A CSVReader is created for the input file jewelry.csv.
- The CSVReader.setFieldNamesInFirstRow(true) method is invoked to specify that the values in the first row should be used as field names.
- A TransformingReader instance is created to change the data types and order of the columns in the input data reader:
- The "Variant Price" field is parsed to double type. If its value is null, 100 is assigned as a double value.
- The "Variant Taxable" field is parsed to boolean type.
- Only the following fields are kept, in this order: "Title", "Variant Price", "Variant Taxable", "Option1 Value".
- A ValidatingReader is created to apply a schema filter, built from the entity created in the first step, to the incoming data.
- Data is transferred from the ValidatingReader to the StreamWriter (System.out) via the Job.run() method.
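The transformation step in the walkthrough can be pictured without the library. The following is a minimal plain-Java sketch, not DataPipeline's implementation: the class TransformSketch and its helpers toPrice, toBoolean and transform are hypothetical names, and Boolean.parseBoolean is used as a stand-in, so the text values the library accepts as booleans may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TransformSketch {

    // Hypothetical helper: stands in for nullToValue(100d) followed by stringToDouble().
    static Double toPrice(String raw) {
        if (raw == null) {
            return 100d; // default used when the price is missing
        }
        return Double.parseDouble(raw.trim());
    }

    // Hypothetical helper: stands in for stringToBoolean(); a null value stays null.
    static Boolean toBoolean(String raw) {
        return raw == null ? null : Boolean.parseBoolean(raw.trim());
    }

    // Keeps only the four selected fields, in the selected order, with typed values.
    static Map<String, Object> transform(Map<String, String> row) {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("Title", row.get("Title"));
        out.put("Variant Price", toPrice(row.get("Variant Price")));
        out.put("Variant Taxable", toBoolean(row.get("Variant Taxable")));
        out.put("Option1 Value", row.get("Option1 Value"));
        return out;
    }

    public static void main(String[] args) {
        // Second data row of the sample file, reduced to a few of its columns
        Map<String, String> row = new LinkedHashMap<>();
        row.put("Handle", "chain-bracelet");
        row.put("Title", null);
        row.put("Option1 Value", "Black");
        row.put("Variant Price", "42.99");
        row.put("Variant Taxable", "true");

        // prints {Title=null, Variant Price=42.99, Variant Taxable=true, Option1 Value=Black}
        System.out.println(transform(row));
    }
}
```

Converting the values before validation matters because the schema declares "Variant Price" as DOUBLE and "Variant Taxable" as BOOLEAN, while a CSV file only supplies text.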
Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Title]:STRING=[7 Shakra Bracelet]:String
    1:[Variant Price]:DOUBLE=[42.99]:Double
    2:[Variant Taxable]:BOOLEAN=[true]:Boolean
    3:[Option1 Value]:STRING=[Blue]:String
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Title]:STRING=[null]
    1:[Variant Price]:DOUBLE=[42.99]:Double
    2:[Variant Taxable]:BOOLEAN=[true]:Boolean
    3:[Option1 Value]:STRING=[Black]:String
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Title]:STRING=[Anchor Bracelet Mens]:String
    1:[Variant Price]:DOUBLE=[69.99]:Double
    2:[Variant Taxable]:BOOLEAN=[true]:Boolean
    3:[Option1 Value]:STRING=[Gold]:String
}
...
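All of the records shown pass validation, including the one with a null title, because "Title" is not a required field. A plain-Java sketch of the checks shows why. It is an illustration only, not how SchemaFilter is implemented: the class SchemaRulesSketch and its validate method are hypothetical, 5 and 500 are taken as the intended price range, and treating both bounds as inclusive is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class SchemaRulesSketch {

    // Assumed bounds for "Variant Price" (inclusive here, which is an assumption)
    static final double MIN_PRICE = 5;
    static final double MAX_PRICE = 500;
    static final int MAX_TITLE_LENGTH = 256;

    // Returns the rule violations for one record; an empty list means the record passes.
    static List<String> validate(String title, Double price) {
        List<String> errors = new ArrayList<>();
        if (price == null) {
            errors.add("Variant Price is required");
        } else if (price < MIN_PRICE || price > MAX_PRICE) {
            errors.add("Variant Price is outside " + MIN_PRICE + " to " + MAX_PRICE);
        }
        // Title is optional, so only a non-null value is length-checked
        if (title != null && title.length() > MAX_TITLE_LENGTH) {
            errors.add("Title is longer than " + MAX_TITLE_LENGTH + " characters");
        }
        return errors;
    }

    public static void main(String[] args) {
        // The three records from the output above
        System.out.println(validate("7 Shakra Bracelet", 42.99));
        System.out.println(validate(null, 42.99));
        System.out.println(validate("Anchor Bracelet Mens", 69.99));

        // A made-up record that would fail the range rule
        System.out.println(validate("Hypothetical Necklace", 600d));
    }
}
```

The first three calls return empty lists, and the made-up 600 price returns one violation.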