Capture Data Not Joined in a Lookup

Updated: May 30, 2022

This example shows how to use DataPipeline to capture records that are not joined in a lookup, that is, records whose key has no match in the lookup source.

A lookup searches another data source for values to merge into each record passing through a LookupTransformer. It is the streaming equivalent of a join.

A lookup works much like a SQL join: given, for example, a product id, you retrieve all of the data associated with that id from another table.
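Conceptually, when records without a match are allowed through (as this example configures), a lookup behaves like a SQL LEFT OUTER JOIN evaluated one record at a time: the lookup source is indexed by its key, and each streaming record is enriched with the result field when a match exists. The plain-Java sketch below illustrates only that idea with a HashMap-style index; it does not use DataPipeline classes, and the names LookupJoinSketch, enrich, and row are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of lookup-as-join semantics; not DataPipeline code.
class LookupJoinSketch {

    // For each streaming record, look up its "Rating" in the index and, when a
    // match exists, append "rating_description". Unmatched records pass through
    // unchanged, like rows on the left side of a SQL LEFT OUTER JOIN.
    static List<Map<String, String>> enrich(List<Map<String, String>> records,
                                            Map<String, String> ratingIndex) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> input : records) {
            Map<String, String> enriched = new LinkedHashMap<>(input);
            String description = ratingIndex.get(input.get("Rating"));
            if (description != null) {
                enriched.put("rating_description", description);
            }
            out.add(enriched);
        }
        return out;
    }

    // Builds a small record with an insertion-ordered field list.
    static Map<String, String> row(String account, String rating) {
        Map<String, String> r = new LinkedHashMap<>();
        r.put("Account", account);
        r.put("Rating", rating);
        return r;
    }

    public static void main(String[] args) {
        // Index of rating-table-01.csv: rating_code -> rating_description
        Map<String, String> ratingIndex = Map.of("A", "Class A", "B", "Class B", "C", "Class C");
        List<Map<String, String>> records = List.of(row("101", "A"), row("317", "D"));
        for (Map<String, String> r : enrich(records, ratingIndex)) {
            System.out.println(r);
        }
        // {Account=101, Rating=A, rating_description=Class A}
        // {Account=317, Rating=D}
    }
}
```

The second record keeps its fields but gains nothing, which is exactly the kind of record this article sets out to capture.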

Input CSV files

credit-balance.csv

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000,17-01-1998,A
312,Butler,Gerard,90,1000,06-08-2003,B
101,Hewitt,Jennifer Love,0,17000,25-05-1985,B
312,Pinkett-Smith,Jada,49654.87,100000,05-12-2006,A
317,Murray,Bill,789.65,5000,05-02-2007,C
317,Murray,Bill,1,5000,05-02-2007,D


rating-table-01.csv

rating_code,rating_description
A,Class A
B,Class B
C,Class C
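Comparing the key columns of these two files shows in advance which records the lookup cannot join: any Rating value that never appears as a rating_code. The standalone Java sketch below performs that check with the CSV contents inlined as strings. It assumes a naive comma split with no quoted fields, which holds for these two files; UnmatchedKeySketch, columnValues, and unmatchedKeys are hypothetical names, not part of DataPipeline.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-check, independent of DataPipeline.
class UnmatchedKeySketch {

    // Distinct values of the named column, in order of first appearance.
    // Naive parsing: splits on commas, so quoted fields are not supported.
    static Set<String> columnValues(String csv, String column) {
        String[] lines = csv.strip().split("\n");
        List<String> header = Arrays.asList(lines[0].split(","));
        int index = header.indexOf(column);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown column: " + column);
        }
        Set<String> values = new LinkedHashSet<>();
        for (int i = 1; i < lines.length; i++) {
            values.add(lines[i].split(",")[index]);
        }
        return values;
    }

    // Keys that occur in the main data but not in the lookup table.
    static Set<String> unmatchedKeys(Set<String> dataKeys, Set<String> lookupKeys) {
        Set<String> missing = new LinkedHashSet<>(dataKeys);
        missing.removeAll(lookupKeys);
        return missing;
    }

    public static void main(String[] args) {
        String creditBalance = String.join("\n",
                "Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating",
                "101,Reeves,Keanu,9315.45,10000,17-01-1998,A",
                "312,Butler,Gerard,90,1000,06-08-2003,B",
                "101,Hewitt,Jennifer Love,0,17000,25-05-1985,B",
                "312,Pinkett-Smith,Jada,49654.87,100000,05-12-2006,A",
                "317,Murray,Bill,789.65,5000,05-02-2007,C",
                "317,Murray,Bill,1,5000,05-02-2007,D");
        String ratingTable = String.join("\n",
                "rating_code,rating_description",
                "A,Class A",
                "B,Class B",
                "C,Class C");

        Set<String> missing = unmatchedKeys(
                columnValues(creditBalance, "Rating"),
                columnValues(ratingTable, "rating_code"));
        System.out.println("Ratings with no match: " + missing); // Ratings with no match: [D]
    }
}
```

Only rating D is missing from the rating table, so exactly one record (the second Bill Murray row) should end up in the not-joined list.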

Java code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.util.List;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.FieldList;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryWriter;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.transform.lookup.DataReaderLookup;
import com.northconcepts.datapipeline.transform.lookup.Lookup;
import com.northconcepts.datapipeline.transform.lookup.LookupTransformer;

public class CaptureDataNotJoinedInLookup {

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance.csv"))
                .setFieldNamesInFirstRow(true);

        /*
         * This lookup matches [credit-balance.csv].[Rating] to
         * [rating-table-01.csv].[rating_code] to return
         * [rating-table-01.csv].[rating_description]
         */
        Lookup lookup = new DataReaderLookup(
                new CSVReader(new File("example/data/input/rating-table-01.csv"))
                        .setFieldNamesInFirstRow(true),
                new FieldList("rating_code"),
                new FieldList("rating_description"));

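        // Collects the records that have no match in the rating table
        MemoryWriter discardWriter = new MemoryWriter();

        // discardWriter is the destination for records discarded by this reader's transformers;
        // setAllowNoResults(true) lets records without a matching rating pass through
        reader = new TransformingReader(reader, discardWriter)
                .add(new DiscardLookupTransformer(new FieldList("Rating"), lookup)
                        .setAllowNoResults(true));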

        Job.run(reader, new StreamWriter(System.out));
        
        System.out.println("==================================================================================");
        System.out.println("Not Joined Records: " + discardWriter.getRecordList());
    }

    private static class DiscardLookupTransformer extends LookupTransformer {

        public DiscardLookupTransformer(FieldList fields, Lookup lookup) {
            super(fields, lookup);
        }
        
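        @Override
        protected RecordList noResults(Record originalRecord, List arguments) {
            // Called when the record's Rating value has no matching rating_code
            if (isAllowNoResults()) {
                // Copy the unmatched record to the TransformingReader's discard writer
                getReader().getDiscardWriter().write(originalRecord);
            }
            // Keep the default handling for the record itself
            return super.noResults(originalRecord, arguments);
        }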
    }
    
}

Code walkthrough

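  1. Two CSVReader instances are created: one for credit-balance.csv (the main input) and one for rating-table-01.csv (the lookup source). setFieldNamesInFirstRow(true) tells each reader to take field names from the first line.
  2. DataReaderLookup accepts three arguments: the reader for the lookup source, parameterFields (the lookup fields matched against the incoming record's values, here rating_code), and resultFields (the fields added to each matched record, here rating_description).
  3. A MemoryWriter is created to collect the records that are not joined in an in-memory RecordList.
  4. TransformingReader is a proxy that applies transformations to records passing through it. Its constructor accepts the DataReader to transform and the DataWriter that discarded records are sent to, here the MemoryWriter.
  5. DiscardLookupTransformer extends LookupTransformer and overrides noResults(), which is called when a record's Rating value has no matching rating_code. If allowNoResults is enabled, it writes the original record to the TransformingReader's discard writer and then delegates to the superclass, so the record still continues through the pipeline, just without a rating_description field. setAllowNoResults(true) is therefore required both to let unmatched records through and to capture them.
  6. Job.run() transfers data from the reader to a StreamWriter that prints each record to System.out. After the job finishes, the records captured by the MemoryWriter are printed using getRecordList().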
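The capture technique in DiscardLookupTransformer amounts to a side channel: when a key has no match, the record continues down the main pipeline unchanged and a copy also goes to a separate sink. The sketch below reproduces that flow in plain Java, with a List standing in for the MemoryWriter discard writer and records trimmed to three columns for brevity. It illustrates the pattern only and is not DataPipeline code; CaptureUnmatchedSketch, lookupAndCapture, and row are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration of the capture pattern; not DataPipeline code.
class CaptureUnmatchedSketch {

    // Matched records gain "rating_description". Unmatched records continue to the
    // main output unchanged AND are copied to notJoined, which plays the role of
    // the MemoryWriter passed to TransformingReader as its discard writer.
    static List<Map<String, String>> lookupAndCapture(List<Map<String, String>> records,
                                                      Map<String, String> ratingIndex,
                                                      List<Map<String, String>> notJoined) {
        List<Map<String, String>> output = new ArrayList<>();
        for (Map<String, String> input : records) {
            Map<String, String> result = new LinkedHashMap<>(input);
            String description = ratingIndex.get(input.get("Rating"));
            if (description == null) {
                notJoined.add(input); // capture the original, un-enriched record
            } else {
                result.put("rating_description", description);
            }
            output.add(result);
        }
        return output;
    }

    // Builds a record trimmed to three of the credit-balance.csv columns.
    static Map<String, String> row(String account, String lastName, String rating) {
        Map<String, String> r = new LinkedHashMap<>();
        r.put("Account", account);
        r.put("LastName", lastName);
        r.put("Rating", rating);
        return r;
    }

    public static void main(String[] args) {
        Map<String, String> ratingIndex = Map.of("A", "Class A", "B", "Class B", "C", "Class C");
        List<Map<String, String>> records = List.of(
                row("101", "Reeves", "A"),
                row("312", "Butler", "B"),
                row("101", "Hewitt", "B"),
                row("312", "Pinkett-Smith", "A"),
                row("317", "Murray", "C"),
                row("317", "Murray", "D"));

        List<Map<String, String>> notJoined = new ArrayList<>();
        List<Map<String, String>> output = lookupAndCapture(records, ratingIndex, notJoined);

        System.out.println(output.size() + " records");          // 6 records
        System.out.println("Not Joined Records: " + notJoined); // [{Account=317, LastName=Murray, Rating=D}]
    }
}
```

Passing the sink in, rather than returning it, mirrors how the example hands the MemoryWriter to TransformingReader up front, so the caller decides where unmatched records go.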

Output

-----------------------------------------------
0 - Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000]:String
    5:[AccountCreated]:STRING=[17-01-1998]:String
    6:[Rating]:STRING=[A]:String
    7:[rating_description]:STRING=[Class A]:String
}

-----------------------------------------------
1 - Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:STRING=[90]:String
    4:[CreditLimit]:STRING=[1000]:String
    5:[AccountCreated]:STRING=[06-08-2003]:String
    6:[Rating]:STRING=[B]:String
    7:[rating_description]:STRING=[Class B]:String
}

-----------------------------------------------
2 - Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:STRING=[0]:String
    4:[CreditLimit]:STRING=[17000]:String
    5:[AccountCreated]:STRING=[25-05-1985]:String
    6:[Rating]:STRING=[B]:String
    7:[rating_description]:STRING=[Class B]:String
}

-----------------------------------------------
3 - Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000]:String
    5:[AccountCreated]:STRING=[05-12-2006]:String
    6:[Rating]:STRING=[A]:String
    7:[rating_description]:STRING=[Class A]:String
}

-----------------------------------------------
4 - Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[789.65]:String
    4:[CreditLimit]:STRING=[5000]:String
    5:[AccountCreated]:STRING=[05-02-2007]:String
    6:[Rating]:STRING=[C]:String
    7:[rating_description]:STRING=[Class C]:String
}

-----------------------------------------------
5 - Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[1]:String
    4:[CreditLimit]:STRING=[5000]:String
    5:[AccountCreated]:STRING=[05-02-2007]:String
    6:[Rating]:STRING=[D]:String
}

-----------------------------------------------
6 records
14:17:47,247 DEBUG [main] datapipeline:661 - job::Success
==================================================================================
Not Joined Records: RecordList [records=[Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[1]:String
    4:[CreditLimit]:STRING=[5000]:String
    5:[AccountCreated]:STRING=[05-02-2007]:String
    6:[Rating]:STRING=[D]:String
}
]]