Capture Data Not Joined in a Lookup
Updated: May 30, 2022
This example shows how to use DataPipeline to capture records that are not joined in a lookup.
A lookup searches another data source for values to merge into each record that passes through its transformer; it is the streaming equivalent of a join.
A lookup works much like a SQL join: given, for example, a product ID, you can retrieve all the data associated with that ID.
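The following is a minimal plain-Java sketch of what a lookup does conceptually. It makes the join analogy concrete under two assumptions: records are modeled as Map<String, String>, and the lookup table fits in memory. The names LookupJoinSketch, buildIndex, join and record are hypothetical and are not part of the DataPipeline API. The sketch only illustrates left-join behavior: the lookup source is indexed once, and then each input record is visited once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of a streaming lookup (a hash-based left join).
// Records are modeled as Map<String, String>; this is NOT the DataPipeline API.
public class LookupJoinSketch {

    // Index the lookup source once: rating_code -> rating_description.
    public static Map<String, String> buildIndex(List<String[]> lookupRows) {
        Map<String, String> index = new LinkedHashMap<>();
        for (String[] row : lookupRows) {
            index.put(row[0], row[1]);
        }
        return index;
    }

    // Visit each input record once, copying it and adding the looked-up field on a match.
    // Records without a match pass through unchanged, like a SQL LEFT JOIN.
    public static List<Map<String, String>> join(List<Map<String, String>> input,
                                                 Map<String, String> index) {
        List<Map<String, String>> output = new ArrayList<>();
        for (Map<String, String> record : input) {
            Map<String, String> out = new LinkedHashMap<>(record);
            String description = index.get(record.get("Rating"));
            if (description != null) {
                out.put("rating_description", description);
            }
            output.add(out);
        }
        return output;
    }

    // Helper that builds a record with a stable field order.
    public static Map<String, String> record(String account, String rating) {
        Map<String, String> r = new LinkedHashMap<>();
        r.put("Account", account);
        r.put("Rating", rating);
        return r;
    }

    public static void main(String[] args) {
        Map<String, String> index = buildIndex(List.of(
                new String[] {"A", "Class A"},
                new String[] {"B", "Class B"},
                new String[] {"C", "Class C"}));
        List<Map<String, String>> input = List.of(record("101", "A"), record("317", "D"));
        // prints [{Account=101, Rating=A, rating_description=Class A}, {Account=317, Rating=D}]
        System.out.println(join(input, index));
    }
}
```

The record with rating D has no entry in the index, so it comes out without a rating_description field. This is the same shape as the last record in the output of the listing below.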
Input CSV file
credit-balance.csv
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000,17-01-1998,A
312,Butler,Gerard,90,1000,06-08-2003,B
101,Hewitt,Jennifer Love,0,17000,25-05-1985,B
312,Pinkett-Smith,Jada,49654.87,100000,05-12-2006,A
317,Murray,Bill,789.65,5000,05-02-2007,C
317,Murray,Bill,1,5000,05-02-2007,D
rating-table-01.csv
rating_code,rating_description
A,Class A
B,Class B
C,Class C
Java code listing
package com.northconcepts.datapipeline.examples.cookbook;
import java.io.File;
import java.util.List;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.FieldList;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryWriter;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.transform.lookup.DataReaderLookup;
import com.northconcepts.datapipeline.transform.lookup.Lookup;
import com.northconcepts.datapipeline.transform.lookup.LookupTransformer;
public class CaptureDataNotJoinedInLookup {

    public static void main(String[] args) {
        // Main input: one record per account
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance.csv"))
                .setFieldNamesInFirstRow(true);

        /*
         * This lookup matches [credit-balance.csv].[Rating] to
         * [rating-table-01.csv].[rating_code] to return
         * [rating-table-01.csv].[rating_description]
         */
        Lookup lookup = new DataReaderLookup(
                new CSVReader(new File("example/data/input/rating-table-01.csv"))
                        .setFieldNamesInFirstRow(true),
                new FieldList("rating_code"),
                new FieldList("rating_description"));

        // Collects the records that have no match in the lookup
        MemoryWriter discardWriter = new MemoryWriter();

        reader = new TransformingReader(reader, discardWriter)
                .add(new DiscardLookupTransformer(new FieldList("Rating"), lookup)
                        .setAllowNoResults(true));

        // Print every record, joined or not, to the console
        Job.run(reader, new StreamWriter(System.out));

        System.out.println("==================================================================================");
        System.out.println("Not Joined Records: " + discardWriter.getRecordList());
    }

    private static class DiscardLookupTransformer extends LookupTransformer {

        public DiscardLookupTransformer(FieldList fields, Lookup lookup) {
            super(fields, lookup);
        }

        // Called when the lookup finds no match for a record
        @Override
        protected RecordList noResults(Record originalRecord, List<?> arguments) {
            if (isAllowNoResults()) {
                // Send the unmatched record to the TransformingReader's discard writer
                getReader().getDiscardWriter().write(originalRecord);
            }
            return super.noResults(originalRecord, arguments);
        }
    }
}
Code walkthrough
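- Two CSVReader instances are created from the file paths credit-balance.csv and rating-table-01.csv, respectively.
- DataReaderLookup accepts three arguments: the reader that supplies the lookup data, parameterFields (the fields to match on) and resultFields (the fields whose values are returned).
- MemoryWriter writes records to an in-memory RecordList; here it collects the records that are not joined.
- TransformingReader is a proxy that applies transformations to the records passing through it. It accepts the DataReader to transform and the DataWriter to send discarded records to.
- The Job.run() method transfers data from the reader to a StreamWriter, which prints it to the console.
- DiscardLookupTransformer extends LookupTransformer and overrides noResults(), which is called when a record's Rating has no match in the lookup. Because setAllowNoResults(true) is set, the unmatched record continues through the pipeline unchanged (record 5 in the output). It is also written to the TransformingReader's discard writer.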
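The noResults() override in DiscardLookupTransformer follows the template-method pattern. The base class calls a protected hook when a lookup misses, and the subclass adds its own handling before delegating back to the parent. The following is a simplified, hypothetical model of that pattern in plain Java, not DataPipeline code. SimpleLookupStep, DiscardingLookupStep and the Consumer-based discard sink are illustrative stand-ins for LookupTransformer, DiscardLookupTransformer and the discard writer. The model also assumes, for illustration only, that a miss raises an error when misses are not allowed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of the "override noResults() to discard" pattern.
// None of these classes exist in DataPipeline; they only mirror the listing's structure.
public class DiscardHookSketch {

    // Base step: adds rating_description to a record, or calls the noResults() hook on a miss.
    public static class SimpleLookupStep {
        private final Map<String, String> index;
        private final boolean allowNoResults;

        public SimpleLookupStep(Map<String, String> index, boolean allowNoResults) {
            this.index = index;
            this.allowNoResults = allowNoResults;
        }

        public boolean isAllowNoResults() {
            return allowNoResults;
        }

        public Map<String, String> apply(Map<String, String> record) {
            String description = index.get(record.get("Rating"));
            if (description == null) {
                noResults(record);
                return record;  // an allowed miss passes through unchanged
            }
            Map<String, String> enriched = new LinkedHashMap<>(record);
            enriched.put("rating_description", description);
            return enriched;
        }

        // Assumed default miss handling for this model: fail unless misses are allowed.
        protected void noResults(Map<String, String> record) {
            if (!allowNoResults) {
                throw new IllegalStateException("no lookup result for " + record);
            }
        }
    }

    // Overrides the hook to copy each allowed miss to a discard sink, then delegates to super.
    public static class DiscardingLookupStep extends SimpleLookupStep {
        private final Consumer<Map<String, String>> discardSink;

        public DiscardingLookupStep(Map<String, String> index, boolean allowNoResults,
                                    Consumer<Map<String, String>> discardSink) {
            super(index, allowNoResults);
            this.discardSink = discardSink;
        }

        @Override
        protected void noResults(Map<String, String> record) {
            if (isAllowNoResults()) {
                discardSink.accept(record);
            }
            super.noResults(record);
        }
    }

    // Runs every record through the step and returns the ones that were not joined.
    public static List<Map<String, String>> notJoined(List<Map<String, String>> records) {
        List<Map<String, String>> discarded = new ArrayList<>();
        SimpleLookupStep step = new DiscardingLookupStep(
                Map.of("A", "Class A", "B", "Class B", "C", "Class C"), true, discarded::add);
        for (Map<String, String> record : records) {
            step.apply(record);
        }
        return discarded;
    }

    public static void main(String[] args) {
        List<Map<String, String>> records = List.of(
                Map.of("Account", "317", "Rating", "C"),
                Map.of("Account", "317", "Rating", "D"));
        System.out.println(notJoined(records).size());  // 1
    }
}
```

With the sample ratings, only the record rated D reaches the discard sink, which matches the single entry under "Not Joined Records" in the output below.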
Output
-----------------------------------------------
0 - Record {
0:[Account]:STRING=[101]:String
1:[LastName]:STRING=[Reeves]:String
2:[FirstName]:STRING=[Keanu]:String
3:[Balance]:STRING=[9315.45]:String
4:[CreditLimit]:STRING=[10000]:String
5:[AccountCreated]:STRING=[17-01-1998]:String
6:[Rating]:STRING=[A]:String
7:[rating_description]:STRING=[Class A]:String
}
-----------------------------------------------
1 - Record {
0:[Account]:STRING=[312]:String
1:[LastName]:STRING=[Butler]:String
2:[FirstName]:STRING=[Gerard]:String
3:[Balance]:STRING=[90]:String
4:[CreditLimit]:STRING=[1000]:String
5:[AccountCreated]:STRING=[06-08-2003]:String
6:[Rating]:STRING=[B]:String
7:[rating_description]:STRING=[Class B]:String
}
-----------------------------------------------
2 - Record {
0:[Account]:STRING=[101]:String
1:[LastName]:STRING=[Hewitt]:String
2:[FirstName]:STRING=[Jennifer Love]:String
3:[Balance]:STRING=[0]:String
4:[CreditLimit]:STRING=[17000]:String
5:[AccountCreated]:STRING=[25-05-1985]:String
6:[Rating]:STRING=[B]:String
7:[rating_description]:STRING=[Class B]:String
}
-----------------------------------------------
3 - Record {
0:[Account]:STRING=[312]:String
1:[LastName]:STRING=[Pinkett-Smith]:String
2:[FirstName]:STRING=[Jada]:String
3:[Balance]:STRING=[49654.87]:String
4:[CreditLimit]:STRING=[100000]:String
5:[AccountCreated]:STRING=[05-12-2006]:String
6:[Rating]:STRING=[A]:String
7:[rating_description]:STRING=[Class A]:String
}
-----------------------------------------------
4 - Record {
0:[Account]:STRING=[317]:String
1:[LastName]:STRING=[Murray]:String
2:[FirstName]:STRING=[Bill]:String
3:[Balance]:STRING=[789.65]:String
4:[CreditLimit]:STRING=[5000]:String
5:[AccountCreated]:STRING=[05-02-2007]:String
6:[Rating]:STRING=[C]:String
7:[rating_description]:STRING=[Class C]:String
}
-----------------------------------------------
5 - Record {
0:[Account]:STRING=[317]:String
1:[LastName]:STRING=[Murray]:String
2:[FirstName]:STRING=[Bill]:String
3:[Balance]:STRING=[1]:String
4:[CreditLimit]:STRING=[5000]:String
5:[AccountCreated]:STRING=[05-02-2007]:String
6:[Rating]:STRING=[D]:String
}
-----------------------------------------------
6 records
14:17:47,247 DEBUG [main] datapipeline:661 - job::Success
==================================================================================
Not Joined Records: RecordList [records=[Record {
0:[Account]:STRING=[317]:String
1:[LastName]:STRING=[Murray]:String
2:[FirstName]:STRING=[Bill]:String
3:[Balance]:STRING=[1]:String
4:[CreditLimit]:STRING=[5000]:String
5:[AccountCreated]:STRING=[05-02-2007]:String
6:[Rating]:STRING=[D]:String
}
]]
