Handle Filtered Records

In this example, you will learn how to use DataPipeline to handle the records rejected by a filter. The example applies a filter rule to the incoming data, captures the records that fail it, and adds a field to each captured record stating the reason for the failure. The discarded records can then be reviewed or processed separately instead of simply being dropped.

Organizations often need to segment their data by specific criteria or conditions. Filtering out the records that do not meet those criteria produces targeted data subsets for further analysis or processing.

 

Input FW file

Account LastName        FirstName       Balance     CreditLimit   AccountCreated  Rating 
101     Reeves          Keanu           9315.45     10000.00      1/17/1998       A      
312     Butler          Gerard          90.00       1000.00       8/6/2003        B      
868     Hewitt          Jennifer Love   0           17000.00      5/25/1985       B      
761     Pinkett-Smith   Jada            49654.87    100000.00     12/5/2006       A      
317     Murray          Bill            789.65      5000.00       2/5/2007        C      
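
The columns in this file are fixed-width, and the widths passed to the reader in the listing below (8, 16, 16, 12, 14, 16, 7) match the layout shown above. As a rough illustration of what those widths mean, here is a minimal plain-Java sketch that cuts one line into column values. The class FixedWidthSketch and its split() method are hypothetical names made up for this illustration; they are not part of DataPipeline and do not show how FixedWidthReader works internally.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; not part of DataPipeline.
class FixedWidthSketch {

    // Column widths of the sample file, in file order:
    // Account, LastName, FirstName, Balance, CreditLimit, AccountCreated, Rating
    static final int[] WIDTHS = {8, 16, 16, 12, 14, 16, 7};

    // Cuts one line into trimmed column values, one value per width.
    // Lines shorter than the total width yield empty values for the missing columns.
    static List<String> split(String line, int[] widths) {
        List<String> values = new ArrayList<>();
        int start = 0;
        for (int width : widths) {
            int from = Math.min(start, line.length());
            int to = Math.min(start + width, line.length());
            values.add(line.substring(from, to).trim());
            start += width;
        }
        return values;
    }

    public static void main(String[] args) {
        String line = "868     Hewitt          Jennifer Love   0           17000.00      5/25/1985       B      ";
        // Each value is taken by position, so "Jennifer Love" stays a single value
        // even though it contains a space.
        System.out.println(split(line, WIDTHS));
    }
}
```

Because values are located by position rather than by a delimiter, a width that is off by even one character shifts every column after it.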

 

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.filter.FilterExpression;
import com.northconcepts.datapipeline.filter.FilteringReader;
import com.northconcepts.datapipeline.fixedwidth.FixedWidthReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.memory.MemoryWriter;
import com.northconcepts.datapipeline.transform.SetCalculatedField;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class HandleFilteredRecords {

    public static void main(String[] args) {
        // hold discarded records in memory; any other DataWriter could be used instead
        MemoryWriter discardWriter = new MemoryWriter();
        
        // add a "failed_filter_rule" field to each record containing the reason it was discarded 
        String discardReasonFieldName = "failed_filter_rule";
        
        DataReader reader = new FixedWidthReader(new File("example/data/input/credit-balance-01.fw"))
                .addFields(8)
                .addFields(16)
                .addFields(16)
                .addFields(12)
                .skipField(14)  // ignore CreditLimit field
                .skipField(16)  // ignore AccountCreated field
                .skipField(7)   // ignore Rating field
                .setFieldNamesInFirstRow(true);
        
        // Balance is read as a string, so convert it to a double before filtering
        reader = new TransformingReader(reader)
                .add(new SetCalculatedField("Balance", "parseDouble(Balance)"));
        
        reader = new FilteringReader(reader, discardWriter, discardReasonFieldName)
                .add(new FilterExpression("Balance >= 1000"));

        DataWriter writer = new StreamWriter(System.out);
        
        // write the records that passed the filter to STDOUT
        Job.run(reader, writer);
        
        // write the discarded records to STDOUT
        System.out.println("\n---- The discarded records ----");
        Job.run(new MemoryReader(discardWriter.getRecordList()), writer);
    }
}

 

Code walkthrough

  1. A MemoryWriter instance is created to hold the discarded records in memory.
  2. The discardReasonFieldName string holds the name of the extra field, "failed_filter_rule", that is added to each discarded record to store the reason it was rejected.
  3. FixedWidthReader is used to read the data from the credit-balance-01.fw file. The number passed to each addFields() and skipField() call is the width of that field; skipped fields are left out of the records.
  4. TransformingReader is used with SetCalculatedField to convert the "Balance" field from a String to a Double.
  5. Filtering is implemented with FilteringReader. In this example, it takes data from reader, keeps the records whose balance is at least 1000, and sends the ones that fail the filter to discardWriter with an additional field giving the reason for the failure.
  6. Data is transferred from the reader to the writer via the Job.run() method.
  7. The discarded records are read back from discardWriter through a MemoryReader and written to the same StreamWriter(System.out) instance so that they are shown in the console.
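
The keep-or-discard split described in the steps above can be sketched in plain Java without the library. This is only an illustration of the pattern under simplifying assumptions: records are modeled as maps, and the class DiscardSketch, its filter() and account() methods, and the "failed rule: ..." message text are all hypothetical. It does not reproduce FilteringReader's implementation or its exact reason message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java illustration of the keep/discard split; not part of DataPipeline.
class DiscardSketch {

    // Returns the records that pass the rule. Every other record is copied into
    // 'discarded' with an extra field naming the rule it failed.
    static List<Map<String, Object>> filter(
            List<Map<String, Object>> records,
            Predicate<Map<String, Object>> rule,
            String ruleText,
            String reasonField,
            List<Map<String, Object>> discarded) {
        List<Map<String, Object>> kept = new ArrayList<>();
        for (Map<String, Object> record : records) {
            if (rule.test(record)) {
                kept.add(record);
            } else {
                // Copy first so the caller's input record is left unchanged.
                Map<String, Object> copy = new LinkedHashMap<>(record);
                copy.put(reasonField, "failed rule: " + ruleText);
                discarded.add(copy);
            }
        }
        return kept;
    }

    // Builds a two-field record for the demo.
    static Map<String, Object> account(String id, double balance) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("Account", id);
        record.put("Balance", balance);
        return record;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = Arrays.asList(
                account("101", 9315.45), account("312", 90.00), account("868", 0),
                account("761", 49654.87), account("317", 789.65));
        List<Map<String, Object>> discarded = new ArrayList<>();
        List<Map<String, Object>> kept = filter(input,
                r -> (Double) r.get("Balance") >= 1000, "Balance >= 1000",
                "failed_filter_rule", discarded);
        System.out.println("kept: " + kept);
        System.out.println("discarded: " + discarded);
    }
}
```

Passing the discard destination in as a parameter mirrors the design of the listing, where the discard writer is supplied by the caller and could just as well write to a file as to memory.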

 

Console Output

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:DOUBLE=[9315.45]:Double
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:DOUBLE=[49654.87]:Double
}

-----------------------------------------------
2 records

---- The discarded records ----
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:DOUBLE=[90.0]:Double
    4:[failed_filter_rule]:STRING=[record satisfies expression: Balance >= 1000]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:DOUBLE=[0.0]:Double
    4:[failed_filter_rule]:STRING=[record satisfies expression: Balance >= 1000]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:DOUBLE=[789.65]:Double
    4:[failed_filter_rule]:STRING=[record satisfies expression: Balance >= 1000]:String
}

-----------------------------------------------
3 records