Lookup Using RecordList
Updated: Nov 22, 2025
Data processing often requires enriching a primary data stream with information from a secondary data source, much like a JOIN operation in a database. Data Pipeline provides a flexible way to perform these lookups using transformers.
This example demonstrates how to enrich a stream of sales orders with product details. It uses a LookupTransformer in conjunction with a RecordListLookup to merge product names and prices into each sales order record based on a matching product ID. The lookup data (product details) is held in an in-memory RecordList.
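To make the JOIN analogy concrete before turning to the library code, the following is a minimal, library-independent sketch in plain Java. It indexes the lookup rows by key in a HashMap and copies the requested fields into each matching order. The class HashJoinSketch and its methods indexBy, enrich, and row are hypothetical names chosen for illustration; they are not part of Data Pipeline and make no claim about how the library is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Library-independent sketch of a keyed lookup (behaves like a left outer join).
// All names here are hypothetical; none of this is Data Pipeline API.
class HashJoinSketch {

    // Index the lookup rows by the value of their key field.
    static Map<Object, Map<String, Object>> indexBy(List<Map<String, Object>> rows, String keyField) {
        Map<Object, Map<String, Object>> index = new HashMap<>();
        for (Map<String, Object> row : rows) {
            index.put(row.get(keyField), row);
        }
        return index;
    }

    // Copy the requested fields from the matching lookup row into a copy of each stream row.
    // Rows without a match are copied through without the extra fields.
    static List<Map<String, Object>> enrich(List<Map<String, Object>> stream, String streamKey,
            Map<Object, Map<String, Object>> index, List<String> valueFields) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> row : stream) {
            Map<String, Object> out = new LinkedHashMap<>(row);
            Map<String, Object> match = index.get(row.get(streamKey));
            if (match != null) {
                for (String field : valueFields) {
                    out.put(field, match.get(field));
                }
            }
            result.add(out);
        }
        return result;
    }

    // Small helper that builds an ordered row from alternating field names and values.
    static Map<String, Object> row(Object... keyValues) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            row.put((String) keyValues[i], keyValues[i + 1]);
        }
        return row;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> products = List.of(
                row("id", 1, "name", "Laptop", "price", 1200.00),
                row("id", 3, "name", "Keyboard", "price", 75.00));
        List<Map<String, Object>> orders = List.of(
                row("order_id", 101, "product_id", 1, "quantity", 2),
                row("order_id", 104, "product_id", 99, "quantity", 10));

        Map<Object, Map<String, Object>> index = indexBy(products, "id");
        for (Map<String, Object> enriched : enrich(orders, "product_id", index, List.of("name", "price"))) {
            System.out.println(enriched);
        }
    }
}
```

The index is built once and each order costs a single map probe, which is why this shape suits a small lookup table that fits in memory.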
Java Code
package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.FieldList;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.transform.lookup.LookupTransformer;
import com.northconcepts.datapipeline.transform.lookup.RecordListLookup;

public class LookupUsingRecordList {

    public static void main(String[] args) {
        // Primary data stream: sales orders
        RecordList salesOrders = new RecordList();
        salesOrders.add(new Record().setField("order_id", 101).setField("product_id", 1).setField("quantity", 2));
        salesOrders.add(new Record().setField("order_id", 102).setField("product_id", 3).setField("quantity", 5));
        salesOrders.add(new Record().setField("order_id", 103).setField("product_id", 1).setField("quantity", 1));
        salesOrders.add(new Record().setField("order_id", 104).setField("product_id", 99).setField("quantity", 10)); // No matching product
        DataReader reader = new MemoryReader(salesOrders);

        // Lookup data: product details
        RecordList productData = new RecordList();
        productData.add(new Record().setField("id", 1).setField("name", "Laptop").setField("price", 1200.00));
        productData.add(new Record().setField("id", 2).setField("name", "Mouse").setField("price", 25.50));
        productData.add(new Record().setField("id", 3).setField("name", "Keyboard").setField("price", 75.00));

        // Lookup keyed on "id" that returns "name" and "price"
        FieldList keyFieldsInLookup = new FieldList("id");
        FieldList valueFieldsToReturn = new FieldList("name", "price");
        RecordListLookup productLookup = new RecordListLookup(productData, keyFieldsInLookup, valueFieldsToReturn);

        // Transformer that searches the lookup using each order's "product_id"
        FieldList keyFieldsInStream = new FieldList("product_id");
        LookupTransformer lookupTransformer = new LookupTransformer(keyFieldsInStream, productLookup);
        lookupTransformer.setAllowNoResults(true); // Prevent job failure on no match.

        // Apply the transformer and print the enriched records to the console
        DataReader transformingReader = new TransformingReader(reader).add(lookupTransformer);
        DataWriter writer = new StreamWriter(System.out);
        Job.run(transformingReader, writer);
    }

}
Code Walkthrough
- Two in-memory datasets are created using RecordList: salesOrders (the primary data stream) and productData (the lookup data).
- A MemoryReader is created to read records from the salesOrders list.
- A RecordListLookup is configured. It's initialized with productData and instructed to use the "id" field as the lookup key and to return the "name" and "price" fields when a match is found. Internally, it builds a hash map for efficient lookups.
- A LookupTransformer is created. It's configured to use the "product_id" field from each incoming record as the key to search for in productLookup.
- The method setAllowNoResults(true) is called on the transformer. This is important because it prevents the job from failing if a product_id from the sales orders (like 99 in the example) does not exist in the product data. The record simply passes through without being enriched.
- The MemoryReader is wrapped in a TransformingReader, and the lookupTransformer is added to it.
- Finally, Job.run() executes the pipeline, passing the transformed records to a StreamWriter, which prints them to the console.
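The difference between failing on a missing key and passing the record through can be sketched in plain Java. This is only an illustration of the two policies described for setAllowNoResults; the class NoResultPolicySketch, the method lookup, and the choice of NoSuchElementException are assumptions made for the sketch, and the exception Data Pipeline actually raises may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical illustration of an "allow no results" switch; not Data Pipeline source code.
class NoResultPolicySketch {

    // Merge the matching lookup row into the record, or apply the no-match policy.
    static Map<String, Object> lookup(Map<String, Object> record, String keyField,
            Map<Object, Map<String, Object>> index, boolean allowNoResults) {
        Object key = record.get(keyField);
        Map<String, Object> match = index.get(key);
        if (match == null) {
            if (!allowNoResults) {
                // Strict policy: a missing key is treated as an error (exception type is an assumption).
                throw new NoSuchElementException("no lookup result for " + keyField + "=" + key);
            }
            return record; // Lenient policy: pass the record through unchanged.
        }
        Map<String, Object> enriched = new LinkedHashMap<>(record);
        enriched.putAll(match);
        return enriched;
    }

    public static void main(String[] args) {
        Map<Object, Map<String, Object>> index = Map.of(1, Map.of("name", "Laptop"));
        Map<String, Object> order = Map.of("order_id", 104, "product_id", 99);

        // Lenient: the unmatched order comes back without a "name" field.
        System.out.println(lookup(order, "product_id", index, true).containsKey("name"));

        // Strict: the same order raises an exception instead.
        try {
            lookup(order, "product_id", index, false);
        } catch (NoSuchElementException e) {
            System.out.println("strict mode failed: " + e.getMessage());
        }
    }
}
```

The lenient policy suits cases where unmatched records are expected and handled downstream; the strict policy is the safer choice when a missing key indicates bad reference data.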
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
0:[order_id]:INT=[101]:Integer
1:[product_id]:INT=[1]:Integer
2:[quantity]:INT=[2]:Integer
3:[name]:STRING=[Laptop]:String
4:[price]:DOUBLE=[1200.0]:Double
}
-----------------------------------------------
1 - Record (MODIFIED) {
0:[order_id]:INT=[102]:Integer
1:[product_id]:INT=[3]:Integer
2:[quantity]:INT=[5]:Integer
3:[name]:STRING=[Keyboard]:String
4:[price]:DOUBLE=[75.0]:Double
}
-----------------------------------------------
2 - Record (MODIFIED) {
0:[order_id]:INT=[103]:Integer
1:[product_id]:INT=[1]:Integer
2:[quantity]:INT=[1]:Integer
3:[name]:STRING=[Laptop]:String
4:[price]:DOUBLE=[1200.0]:Double
}
-----------------------------------------------
3 - Record (MODIFIED) {
0:[order_id]:INT=[104]:Integer
1:[product_id]:INT=[99]:Integer
2:[quantity]:INT=[10]:Integer
}
-----------------------------------------------
4 records
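As the output shows, order 104 carries no name or price field, so any step that consumes the enriched records has to tolerate their absence. A minimal sketch of such a guard in plain Java follows, using a Map in place of a Data Pipeline Record; the class LineTotalSketch and the method lineTotal are hypothetical and exist only for this illustration.

```java
import java.util.Map;
import java.util.OptionalDouble;

// Hypothetical downstream step: compute quantity * price only when both fields are present.
class LineTotalSketch {

    static OptionalDouble lineTotal(Map<String, Object> record) {
        Object price = record.get("price");
        Object quantity = record.get("quantity");
        if (!(price instanceof Number) || !(quantity instanceof Number)) {
            return OptionalDouble.empty(); // Unmatched record: no price was merged in.
        }
        return OptionalDouble.of(((Number) price).doubleValue() * ((Number) quantity).doubleValue());
    }

    public static void main(String[] args) {
        Map<String, Object> matched = Map.of("order_id", 101, "quantity", 2, "price", 1200.0);
        Map<String, Object> unmatched = Map.of("order_id", 104, "quantity", 10);

        System.out.println(lineTotal(matched));
        System.out.println(lineTotal(unmatched));
    }
}
```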
