Capture Data Not Joined in a Lookup
Updated: May 30, 2022
This example shows how to use DataPipeline to capture records that are not joined in a lookup.
A lookup searches another data source for data to merge with each record passing through the lookup's transformer. It is the streaming version of a join.
A lookup works much like a SQL query where you have, for example, the id of a product and want to retrieve all the data associated with that id.
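To make the join analogy concrete, here is a minimal plain-Java sketch of the idea: each record's key is looked up in a table and any matching fields are merged in. It does not use DataPipeline at all; the class `LookupJoinSketch` and its `join` and `record` helpers are hypothetical names introduced only for illustration, and the rating values are taken from rating-table-01.csv.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names come from DataPipeline.
class LookupJoinSketch {

    // Lookup table: rating_code -> rating_description (values from rating-table-01.csv).
    static final Map<String, String> RATINGS = Map.of(
            "A", "Class A",
            "B", "Class B",
            "C", "Class C");

    // Joins each record to the lookup table on its "Rating" field.
    // A record with no match passes through unchanged, like a SQL left join.
    static List<Map<String, String>> join(List<Map<String, String>> records) {
        List<Map<String, String>> joined = new ArrayList<>();
        for (Map<String, String> record : records) {
            Map<String, String> out = new LinkedHashMap<>(record);
            String description = RATINGS.get(record.get("Rating"));
            if (description != null) {
                out.put("rating_description", description);
            }
            joined.add(out);
        }
        return joined;
    }

    // Builds a two-field record; the real input file has more columns.
    static Map<String, String> record(String account, String rating) {
        Map<String, String> r = new LinkedHashMap<>();
        r.put("Account", account);
        r.put("Rating", rating);
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, String>> input = List.of(record("101", "A"), record("317", "D"));
        // prints [{Account=101, Rating=A, rating_description=Class A}, {Account=317, Rating=D}]
        System.out.println(join(input));
    }
}
```

The second record has rating D, which is absent from the table, so it comes out without a rating_description field. Records like that one are what this example sets out to capture.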
Input CSV file
credit-balance.csv
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000,17-01-1998,A
312,Butler,Gerard,90,1000,06-08-2003,B
101,Hewitt,Jennifer Love,0,17000,25-05-1985,B
312,Pinkett-Smith,Jada,49654.87,100000,05-12-2006,A
317,Murray,Bill,789.65,5000,05-02-2007,C
317,Murray,Bill,1,5000,05-02-2007,D
rating-table-01.csv
rating_code,rating_description
A,Class A
B,Class B
C,Class C
Java code listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.util.List;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.FieldList;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryWriter;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.transform.lookup.DataReaderLookup;
import com.northconcepts.datapipeline.transform.lookup.Lookup;
import com.northconcepts.datapipeline.transform.lookup.LookupTransformer;

public class CaptureDataNotJoinedInLookup {

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance.csv"))
                .setFieldNamesInFirstRow(true);

        /*
         * This lookup matches [credit-balance.csv].[Rating] to
         * [rating-table-01.csv].[rating_code] to return
         * [rating-table-01.csv].[rating_description]
         */
        Lookup lookup = new DataReaderLookup(
                new CSVReader(new File("example/data/input/rating-table-01.csv"))
                        .setFieldNamesInFirstRow(true),
                new FieldList("rating_code"),
                new FieldList("rating_description"));

        // Collects the records that found no match in the lookup.
        MemoryWriter discardWriter = new MemoryWriter();

        reader = new TransformingReader(reader, discardWriter)
                .add(new DiscardLookupTransformer(new FieldList("Rating"), lookup)
                        .setAllowNoResults(true));

        Job.run(reader, new StreamWriter(System.out));

        System.out.println("==================================================================================");
        System.out.println("Not Joined Records: " + discardWriter.getRecordList());
    }

    private static class DiscardLookupTransformer extends LookupTransformer {

        public DiscardLookupTransformer(FieldList fields, Lookup lookup) {
            super(fields, lookup);
        }

        // The generic parameter is assumed to be List<?>; check LookupTransformer.noResults
        // in your DataPipeline version if this override does not compile.
        @Override
        protected RecordList noResults(Record originalRecord, List<?> arguments) {
            if (isAllowNoResults()) {
                // Send a copy of the unmatched record to the discard writer.
                getReader().getDiscardWriter().write(originalRecord);
            }
            return super.noResults(originalRecord, arguments);
        }
    }
}
Code walkthrough
- Two CSVReader instances are created from the file paths of credit-balance.csv and rating-table-01.csv, respectively.
- DataReaderLookup accepts three arguments: the reader over the lookup file, parameterFields (the fields to match against), and resultFields (the fields to return).
- MemoryWriter is used to write records to an in-memory RecordList.
- TransformingReader is a proxy that applies transformations to records passing through. It accepts the DataReader to transform and the DataWriter to send discarded records to.
- Data is transferred from the reader to StreamWriter via the Job.run() method.
- DiscardLookupTransformer extends LookupTransformer and overrides noResults() so that any record with no match in the lookup is written to the discard writer (the MemoryWriter). Because setAllowNoResults(true) is set, the unmatched record still continues on to the output.
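The override at the heart of this walkthrough is an instance of the template-method pattern: the base transformer calls a protected hook when a lookup finds nothing, and a subclass uses that hook to record the miss. The sketch below shows only that pattern in plain Java. `SimpleLookupTransformer`, `DiscardingLookupTransformer`, and the `discarded` list are hypothetical stand-ins, not DataPipeline classes, and the real library's internals may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins that mirror the shape of the example; not DataPipeline classes.
class DiscardHookSketch {

    // Base transformer: looks up a key and calls noResults() when nothing matches.
    static class SimpleLookupTransformer {
        private final Map<String, String> table;

        SimpleLookupTransformer(Map<String, String> table) {
            this.table = table;
        }

        final String transform(String key) {
            String result = table.get(key);
            return result != null ? result : noResults(key);
        }

        // Hook for subclasses; the default lets the key through with no result.
        protected String noResults(String key) {
            return null;
        }
    }

    // Subclass: remembers every unmatched key, then defers to the default behaviour.
    static class DiscardingLookupTransformer extends SimpleLookupTransformer {
        final List<String> discarded = new ArrayList<>();

        DiscardingLookupTransformer(Map<String, String> table) {
            super(table);
        }

        @Override
        protected String noResults(String key) {
            discarded.add(key);
            return super.noResults(key);
        }
    }

    public static void main(String[] args) {
        DiscardingLookupTransformer transformer = new DiscardingLookupTransformer(
                Map.of("A", "Class A", "B", "Class B", "C", "Class C"));
        // Ratings in the same order as the rows of credit-balance.csv.
        for (String rating : List.of("A", "B", "B", "A", "C", "D")) {
            System.out.println(rating + " -> " + transformer.transform(rating));
        }
        System.out.println("Not joined: " + transformer.discarded); // prints Not joined: [D]
    }
}
```

Calling `super.noResults(...)` after recording the miss keeps the base class's behaviour intact, which is the same choice the DiscardLookupTransformer in the listing makes.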
Output
-----------------------------------------------
0 - Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000]:String
    5:[AccountCreated]:STRING=[17-01-1998]:String
    6:[Rating]:STRING=[A]:String
    7:[rating_description]:STRING=[Class A]:String
}
-----------------------------------------------
1 - Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:STRING=[90]:String
    4:[CreditLimit]:STRING=[1000]:String
    5:[AccountCreated]:STRING=[06-08-2003]:String
    6:[Rating]:STRING=[B]:String
    7:[rating_description]:STRING=[Class B]:String
}
-----------------------------------------------
2 - Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:STRING=[0]:String
    4:[CreditLimit]:STRING=[17000]:String
    5:[AccountCreated]:STRING=[25-05-1985]:String
    6:[Rating]:STRING=[B]:String
    7:[rating_description]:STRING=[Class B]:String
}
-----------------------------------------------
3 - Record {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000]:String
    5:[AccountCreated]:STRING=[05-12-2006]:String
    6:[Rating]:STRING=[A]:String
    7:[rating_description]:STRING=[Class A]:String
}
-----------------------------------------------
4 - Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[789.65]:String
    4:[CreditLimit]:STRING=[5000]:String
    5:[AccountCreated]:STRING=[05-02-2007]:String
    6:[Rating]:STRING=[C]:String
    7:[rating_description]:STRING=[Class C]:String
}
-----------------------------------------------
5 - Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[1]:String
    4:[CreditLimit]:STRING=[5000]:String
    5:[AccountCreated]:STRING=[05-02-2007]:String
    6:[Rating]:STRING=[D]:String
}
-----------------------------------------------
6 records
14:17:47,247 DEBUG [main] datapipeline:661 - job::Success
==================================================================================
Not Joined Records: RecordList [records=[Record {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:STRING=[1]:String
    4:[CreditLimit]:STRING=[5000]:String
    5:[AccountCreated]:STRING=[05-02-2007]:String
    6:[Rating]:STRING=[D]:String
}
]]