Use Predicates in Filters

Data Pipeline supports filtering records using either built-in filters or your own custom logic.

This example demonstrates how to use a Java lambda expression as a Predicate to create a user-defined filter that selects records from a CSV file and writes them to the console. The filter is applied with FilteringReader, which passes through only the records that match the predicate.
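If you are new to java.util.function.Predicate, the plain-JDK sketch below (no Data Pipeline dependency, class name and predicates are illustrative) shows how a single-condition predicate like the one used in this example can be combined with or() and inverted with negate():

```java
import java.util.function.Predicate;

public class PredicateBasics {

    // Same condition as this example's filter: the rating must be "A"
    static final Predicate<String> IS_RATING_A = rating -> rating.equals("A");

    // Predicates compose: or() widens the condition, negate() inverts it
    static final Predicate<String> IS_A_OR_B = IS_RATING_A.or(r -> r.equals("B"));
    static final Predicate<String> IS_NOT_A = IS_RATING_A.negate();

    public static void main(String[] args) {
        System.out.println(IS_RATING_A.test("A")); // true
        System.out.println(IS_A_OR_B.test("B"));   // true
        System.out.println(IS_A_OR_B.test("C"));   // false
        System.out.println(IS_NOT_A.test("C"));    // true
    }
}
```

Any predicate built this way, simple or composed, can be handed to the filtering mechanism shown next.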


Input CSV File

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C

Java Code

package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.filter.Filter;
import com.northconcepts.datapipeline.filter.FilteringReader;
import com.northconcepts.datapipeline.job.Job;

import java.io.File;

public class UsePredicatesInFilters {

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);

        FilteringReader filteringReader = new FilteringReader(reader);
        filteringReader.add(Filter.of(record -> record.getField("Rating").getValue().equals("A")));

        DataWriter writer = new StreamWriter(System.out);

        Job.run(filteringReader, writer);
    }
}

Code Walkthrough

  1. CSVReader is created using the file path of the input file credit-balance-01.csv, with setFieldNamesInFirstRow(true) so the header row supplies the field names.
  2. A new FilteringReader is created to wrap the CSVReader.
  3. A predicate-based filter is added using Filter.of(). The lambda returns true only for records where the Rating field is "A".
  4. Job.run(filteringReader, writer) transfers the matching records from the FilteringReader to the console. See how to compile and run data pipeline jobs.
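To make the record-selection logic in steps 2–3 concrete without the Data Pipeline library, here is a plain-JDK sketch that applies the same Rating-equals-"A" condition to the sample rows above. The FilterSketch class and its Map-based rows are illustrative stand-ins for Data Pipeline's Record objects, not part of the library:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSketch {

    // Each map stands in for one Record read from the input CSV
    static final List<Map<String, String>> ROWS = List.of(
            Map.of("Account", "101", "LastName", "Reeves", "Rating", "A"),
            Map.of("Account", "312", "LastName", "Butler", "Rating", "B"),
            Map.of("Account", "868", "LastName", "Hewitt", "Rating", "B"),
            Map.of("Account", "761", "LastName", "Pinkett-Smith", "Rating", "A"),
            Map.of("Account", "317", "LastName", "Murray", "Rating", "C"));

    // The same condition as the Filter.of(...) lambda in the example
    static final Predicate<Map<String, String>> RATING_IS_A =
            row -> row.get("Rating").equals("A");

    // Keep only the rows that match the predicate, preserving input order
    static List<Map<String, String>> filter(List<Map<String, String>> rows,
                                            Predicate<Map<String, String>> predicate) {
        return rows.stream().filter(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        filter(ROWS, RATING_IS_A).forEach(System.out::println);
    }
}
```

As with FilteringReader, only the two "A"-rated accounts (101 and 761) survive the filter, matching the console output below.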

Console Output

-----------------------------------------------
0 - Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}

-----------------------------------------------
1 - Record {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}

-----------------------------------------------
2 records
