Write my own Filter or Validator
Updated: Aug 24, 2023
This example shows how to write a custom Filter in DataPipeline by extending the Filter class, and how to reuse that same class as a validation rule. A custom filter gives you direct control over which records are kept or excluded in your processing pipeline.
Typical uses include narrowing data extraction from large datasets, applying custom inclusion criteria, and enforcing data quality checks for specific use cases.
Input CSV File
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C
Java Code Listing
WriteMyOwnFilterOrValidator.java
package com.northconcepts.datapipeline.examples.cookbook.customization;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.filter.FilteringReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.validate.ValidatingReader;

public class WriteMyOwnFilterOrValidator {

    public static void main(String[] args) throws Throwable {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);

        // convert balance from string to double
        reader = new TransformingReader(reader)
                .add(new BasicFieldTransformer("Balance").stringToDouble());

        // ensure all balances are 0.0 or more
        reader = new ValidatingReader(reader)
                .setExceptionOnFailure(true)
                .add(new MyFilter(0.0));

        // retain balances of $1000.00 or more
        reader = new FilteringReader(reader)
                .add(new MyFilter(1000.0));

        Job.run(reader, new StreamWriter(System.out));
    }

}
MyFilter.java
package com.northconcepts.datapipeline.examples.cookbook.customization;

import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.filter.Filter;

public class MyFilter extends Filter {

    private final double minimumBalance;

    public MyFilter(double minimumBalance) {
        this.minimumBalance = minimumBalance;
    }

    public boolean allow(Record record) {
        return record.getField("Balance").getValueAsDouble() >= minimumBalance;
    }

    public String toString() {
        return "Balance >= " + minimumBalance;
    }

}
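The listing uses one rule class in two roles, which can be easier to see in isolation. The following is a minimal plain-Java sketch and does not use the DataPipeline API: RuleReuseSketch, minimumBalance, validate, and filter are hypothetical names introduced only for illustration. It assumes, as the setExceptionOnFailure(true) call suggests, that a failed validation raises an exception, while a failed filter check simply drops the record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.DoublePredicate;

public class RuleReuseSketch {

    // Hypothetical stand-in for MyFilter: accepts balances at or above a minimum.
    public static DoublePredicate minimumBalance(double minimum) {
        return balance -> balance >= minimum;
    }

    // Validator role (assumed behavior): the first rejected value aborts processing.
    public static void validate(List<Double> balances, DoublePredicate rule) {
        for (double balance : balances) {
            if (!rule.test(balance)) {
                throw new IllegalStateException("validation failed for balance " + balance);
            }
        }
    }

    // Filter role: rejected values are skipped and processing continues.
    public static List<Double> filter(List<Double> balances, DoublePredicate rule) {
        List<Double> kept = new ArrayList<>();
        for (double balance : balances) {
            if (rule.test(balance)) {
                kept.add(balance);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Made-up balances; the negative value is not part of the example's input file.
        List<Double> balances = Arrays.asList(9315.45, -20.0, 1000.0);

        // As a filter, the rule silently drops -20.0 and keeps the boundary value 1000.0.
        System.out.println(filter(balances, minimumBalance(1000.0)));

        // As a validator, the same kind of rule stops at -20.0.
        try {
            validate(balances, minimumBalance(0.0));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the comparison is >=, a balance exactly equal to the threshold is accepted in both roles.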
Code Walkthrough
- CSVReader is created for the input file credit-balance-01.csv.
- The setFieldNamesInFirstRow(true) method is invoked to specify that the values in the first row should be used as field names.
- TransformingReader is created to sequentially apply changes to the incoming data.
- The "Balance" field is converted from string to double using BasicFieldTransformer.
- MyFilter is a custom class that applies a validation or filtering rule to each record. It is first used with a ValidatingReader instance to ensure that the balance of every record is greater than or equal to 0.
- Next, MyFilter is used with FilteringReader to retain only the records with a balance of at least 1000.
- Job.run() is used to transfer the data from reader to StreamWriter(System.out). See how to compile and run data pipeline jobs.
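The steps above can be traced end to end on the sample input with a short plain-Java sketch. It is not the DataPipeline implementation: PipelineFlowSketch and run are hypothetical names, the CSV is inlined rather than read from a file, and the naive comma split works only because the sample data has no quoted fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PipelineFlowSketch {

    // The five-row input from this example, inlined so the sketch needs no file.
    public static final String CSV = String.join("\n",
            "Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating",
            "101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A",
            "312,Butler,Gerard,90.00,1000.00,8/6/2003,B",
            "868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B",
            "761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A",
            "317,Murray,Bill,789.65,5000.00,2/5/2007,C");

    // Returns the Account values that survive the validate-then-filter steps.
    public static List<String> run(String csv, double validMinimum, double keepMinimum) {
        String[] lines = csv.split("\n");

        // Step 1: the first row supplies the field names.
        List<String> fieldNames = Arrays.asList(lines[0].split(","));
        int accountIndex = fieldNames.indexOf("Account");
        int balanceIndex = fieldNames.indexOf("Balance");

        List<String> kept = new ArrayList<>();
        for (int i = 1; i < lines.length; i++) {
            String[] values = lines[i].split(",");

            // Step 2: convert Balance from string to double.
            double balance = Double.parseDouble(values[balanceIndex]);

            // Step 3: validation fails fast on the first bad record (assumed behavior).
            if (balance < validMinimum) {
                throw new IllegalStateException(
                        "record " + (i - 1) + " failed rule: Balance >= " + validMinimum);
            }

            // Step 4: filtering keeps only records at or above the threshold.
            if (balance >= keepMinimum) {
                kept.add(values[accountIndex]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Same thresholds as the listing: valid if >= 0.0, kept if >= 1000.0.
        System.out.println(run(CSV, 0.0, 1000.0));
    }
}
```

With the thresholds from the listing, every row passes validation and only accounts 101 and 761 pass the filter, so the sketch prints [101, 761].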
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:DOUBLE=[9315.45]:Double
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:DOUBLE=[49654.87]:Double
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}
-----------------------------------------------
2 records