Handle Filtered Records
Updated: Jul 5, 2023
This example shows how to use DataPipeline to handle filtered records. A filter rule is applied to the incoming data; records that fail the rule are sent to a separate writer, and each one gets an additional field that stores the reason for the failure. This lets you manage and analyze the discarded data according to the rules you defined.
Organizations often need to segment their data by specific criteria or conditions. Filtering out the records that do not meet those criteria produces targeted data subsets for further analysis or processing.
Input FW file
Account LastName        FirstName       Balance     CreditLimit   AccountCreated  Rating
101     Reeves          Keanu           9315.45     10000.00      1/17/1998       A
312     Butler          Gerard          90.00       1000.00       8/6/2003        B
868     Hewitt          Jennifer Love   0           17000.00      5/25/1985       B
761     Pinkett-Smith   Jada            49654.87    100000.00     12/5/2006       A
317     Murray          Bill            789.65      5000.00       2/5/2007        C
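To make the fixed-width layout concrete, here is a minimal plain-Java sketch of how a line can be cut into fields by column width. It is not DataPipeline's implementation. The widths (8, 16, 16, 12, 14, 16, 7) are the ones passed to the reader in the Java listing of this example; the class name FixedWidthSketch, the slice helper, and the KEEP array are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class FixedWidthSketch {

    // Column widths taken from the listing in this example:
    // Account, LastName, FirstName, Balance, CreditLimit, AccountCreated, Rating.
    static final int[] WIDTHS = {8, 16, 16, 12, 14, 16, 7};

    // Hypothetical flag array: the last three columns are skipped, as in the listing.
    static final boolean[] KEEP = {true, true, true, true, false, false, false};

    // Cuts one line into trimmed values, one per kept column.
    static List<String> slice(String line) {
        List<String> values = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < WIDTHS.length; i++) {
            int end = Math.min(start + WIDTHS[i], line.length());
            if (KEEP[i]) {
                // A short line yields an empty value instead of an exception.
                values.add(start < line.length() ? line.substring(start, end).trim() : "");
            }
            start += WIDTHS[i];
        }
        return values;
    }

    public static void main(String[] args) {
        // Build one padded line so the column positions are unambiguous.
        String line = String.format("%-8s%-16s%-16s%-12s%-14s%-16s%-7s",
                "868", "Hewitt", "Jennifer Love", "0", "17000.00", "5/25/1985", "B");
        System.out.println(slice(line)); // prints [868, Hewitt, Jennifer Love, 0]
    }
}
```

Because the fields are located by position rather than by a delimiter, a value such as "Jennifer Love" can contain spaces without being split.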
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.filter.FilterExpression;
import com.northconcepts.datapipeline.filter.FilteringReader;
import com.northconcepts.datapipeline.fixedwidth.FixedWidthReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.memory.MemoryWriter;
import com.northconcepts.datapipeline.transform.SetCalculatedField;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class HandleFilteredRecords {

    public static void main(String[] args) {
        // save discarded records in memory, but can be any DataWriter
        MemoryWriter discardWriter = new MemoryWriter();

        // add a "failed_filter_rule" field to each record containing the reason it was discarded
        String discardReasonFieldName = "failed_filter_rule";

        DataReader reader = new FixedWidthReader(new File("example/data/input/credit-balance-01.fw"))
                .addFields(8)
                .addFields(16)
                .addFields(16)
                .addFields(12)
                .skipField(14)  // ignore CreditLimit field
                .skipField(16)  // ignore AccountCreated field
                .skipField(7)   // ignore Rating field
                .setFieldNamesInFirstRow(true);

        // Since Balance field is a string, need to convert it to double
        reader = new TransformingReader(reader)
                .add(new SetCalculatedField("Balance", "parseDouble(Balance)"));

        reader = new FilteringReader(reader, discardWriter, discardReasonFieldName)
                .add(new FilterExpression("Balance >= 1000"));

        DataWriter writer = new StreamWriter(System.out);

        // write the filtered records to STDOUT
        Job.run(reader, writer);

        // write the discarded records to STDOUT
        System.out.println("\n---- The discarded records ----");
        Job.run(new MemoryReader(discardWriter.getRecordList()), writer);
    }

}
Code walkthrough
- A MemoryWriter instance is created to hold the discarded records in memory. Any other DataWriter could be used instead.
- Each discarded record gets an extra field that holds the failure message. Its name, "failed_filter_rule", is stored in the discardReasonFieldName String.
- FixedWidthReader reads the data from the credit-balance-01.fw file. The numbers passed to the addFields() and skipField() methods are the widths of the fields; skipped fields are left out of the records.
- TransformingReader is used with SetCalculatedField to convert the "Balance" field from a String to a Double.
- FilteringReader performs the filtering. In this example it takes data from reader, keeps the records whose Balance is at least 1000, and sends the ones that fail the filter to discardWriter with an additional field giving the reason for the failure.
- Job.run() transfers the data from reader to writer.
- The discarded records are then read back with a MemoryReader and written to the same StreamWriter(System.out) instance so that they appear in the console.
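The filter-and-discard step described in the walkthrough can be sketched in plain Java without the library. This is only an illustration of the idea, not how FilteringReader is implemented: the class DiscardingFilterSketch, its filter and account helpers, and the wording of the reason message are all assumptions made for this sketch. Only the rule text "Balance >= 1000", the field name "failed_filter_rule", and the account data come from the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class DiscardingFilterSketch {

    // Returns the records that pass the rule. Records that fail are copied into
    // `discarded` with one extra field naming the rule they failed.
    static List<Map<String, Object>> filter(
            List<Map<String, Object>> input,
            String ruleText,
            Predicate<Map<String, Object>> rule,
            List<Map<String, Object>> discarded,
            String reasonField) {
        List<Map<String, Object>> kept = new ArrayList<>();
        for (Map<String, Object> record : input) {
            if (rule.test(record)) {
                kept.add(record);
            } else {
                Map<String, Object> copy = new LinkedHashMap<>(record);
                // Hypothetical message format; the library writes its own text.
                copy.put(reasonField, "failed rule: " + ruleText);
                discarded.add(copy);
            }
        }
        return kept;
    }

    // Builds a tiny record; the balance is parsed from text, mirroring the
    // String-to-double conversion step in the listing.
    static Map<String, Object> account(String id, String balanceText) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("Account", id);
        record.put("Balance", Double.parseDouble(balanceText));
        return record;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = List.of(
                account("101", "9315.45"), account("312", "90.00"), account("868", "0"),
                account("761", "49654.87"), account("317", "789.65"));

        List<Map<String, Object>> discarded = new ArrayList<>();
        List<Map<String, Object>> kept = filter(input, "Balance >= 1000",
                r -> (Double) r.get("Balance") >= 1000, discarded, "failed_filter_rule");

        System.out.println("kept: " + kept);
        System.out.println("discarded: " + discarded);
    }
}
```

Passing the discard list in as a parameter mirrors the design in the listing, where the discard destination is supplied by the caller and can therefore be any kind of sink.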
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:DOUBLE=[9315.45]:Double
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:DOUBLE=[49654.87]:Double
}
-----------------------------------------------
2 records

---- The discarded records ----
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:DOUBLE=[90.0]:Double
    4:[failed_filter_rule]:STRING=[record satisfies expression: Balance >= 1000]:String
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:DOUBLE=[0.0]:Double
    4:[failed_filter_rule]:STRING=[record satisfies expression: Balance >= 1000]:String
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:DOUBLE=[789.65]:Double
    4:[failed_filter_rule]:STRING=[record satisfies expression: Balance >= 1000]:String
}
-----------------------------------------------
3 records