Lookup Using RecordList

Data processing often requires enriching a primary data stream with information from a secondary data source, much like a JOIN operation in a database. Data Pipeline performs these lookups with transformers.

This example demonstrates how to enrich a stream of sales orders with product details. It uses a LookupTransformer in conjunction with a RecordListLookup to merge product names and prices into each sales order record based on a matching product ID. The lookup data (product details) is held in an in-memory RecordList.

Java Code

package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.FieldList;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.transform.lookup.LookupTransformer;
import com.northconcepts.datapipeline.transform.lookup.RecordListLookup;

public class LookupUsingRecordList {

    public static void main(String[] args) {
        // Primary data stream: sales orders that reference products by product_id.
        RecordList salesOrders = new RecordList();
        salesOrders.add(new Record().setField("order_id", 101).setField("product_id", 1).setField("quantity", 2));
        salesOrders.add(new Record().setField("order_id", 102).setField("product_id", 3).setField("quantity", 5));
        salesOrders.add(new Record().setField("order_id", 103).setField("product_id", 1).setField("quantity", 1));
        salesOrders.add(new Record().setField("order_id", 104).setField("product_id", 99).setField("quantity", 10)); // No matching product

        DataReader reader = new MemoryReader(salesOrders);

        // Lookup data: product details keyed by id.
        RecordList productData = new RecordList();
        productData.add(new Record().setField("id", 1).setField("name", "Laptop").setField("price", 1200.00));
        productData.add(new Record().setField("id", 2).setField("name", "Mouse").setField("price", 25.50));
        productData.add(new Record().setField("id", 3).setField("name", "Keyboard").setField("price", 75.00));

        // Match on "id" in the lookup data and return "name" and "price".
        FieldList keyFieldsInLookup = new FieldList("id");
        FieldList valueFieldsToReturn = new FieldList("name", "price");
        RecordListLookup productLookup = new RecordListLookup(productData, keyFieldsInLookup, valueFieldsToReturn);

        // Use "product_id" from each incoming record as the key to look up.
        FieldList keyFieldsInStream = new FieldList("product_id");
        LookupTransformer lookupTransformer = new LookupTransformer(keyFieldsInStream, productLookup);
        lookupTransformer.setAllowNoResults(true); // Prevent job failure on no match.

        // Apply the lookup to every record read from the sales orders.
        DataReader transformingReader = new TransformingReader(reader).add(lookupTransformer);

        // Print the enriched records to the console.
        DataWriter writer = new StreamWriter(System.out);

        Job.run(transformingReader, writer);
    }
}

Code Walkthrough

  1. Two in-memory datasets are created using RecordList: salesOrders (the primary data stream) and productData (the lookup data).
  2. A MemoryReader is created to read records from the salesOrders list.
  3. A RecordListLookup is configured with productData. It uses the "id" field as the lookup key and returns the "name" and "price" fields when a match is found. Internally, it builds a hash map for efficient lookups.
  4. A LookupTransformer is created. It's configured to use the "product_id" field from each incoming record as the key to search for in the productLookup.
  5. setAllowNoResults(true) is called on the transformer so that the job does not fail when a product_id from the sales orders (like 99 in the example) has no match in the product data. Such a record passes through without being enriched, as the last record in the console output shows.
  6. The MemoryReader is wrapped in a TransformingReader, and the lookupTransformer is added to it.
  7. Finally, Job.run() executes the pipeline, passing the transformed records to a StreamWriter which prints them to the console.
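
The hash-based lookup in step 3 and the no-match behavior in step 5 can be illustrated without the library. The following is a minimal plain-Java sketch, not Data Pipeline's actual implementation: the class HashLookupSketch and its methods buildIndex, enrich, and row are hypothetical names introduced here, and records are modeled as ordinary maps rather than Record objects.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of the Data Pipeline API.
public class HashLookupSketch {

    // Index the lookup rows by their key field so each probe is a single hash lookup.
    static Map<Object, Map<String, Object>> buildIndex(List<Map<String, Object>> rows, String keyField) {
        Map<Object, Map<String, Object>> index = new HashMap<>();
        for (Map<String, Object> row : rows) {
            index.put(row.get(keyField), row);
        }
        return index;
    }

    // Copy the requested value fields from the matching lookup row into the stream row.
    // When allowNoResults is true, an unmatched row passes through unchanged;
    // otherwise a miss is reported as an error.
    static Map<String, Object> enrich(Map<String, Object> row, String streamKey,
            Map<Object, Map<String, Object>> index, List<String> valueFields, boolean allowNoResults) {
        Map<String, Object> match = index.get(row.get(streamKey));
        if (match == null) {
            if (allowNoResults) {
                return row;
            }
            throw new IllegalStateException("No lookup result for " + streamKey + "=" + row.get(streamKey));
        }
        Map<String, Object> enriched = new LinkedHashMap<>(row);
        for (String field : valueFields) {
            enriched.put(field, match.get(field));
        }
        return enriched;
    }

    // Build an insertion-ordered row from alternating field names and values.
    static Map<String, Object> row(Object... keyValues) {
        Map<String, Object> r = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            r.put((String) keyValues[i], keyValues[i + 1]);
        }
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> products = List.of(
                row("id", 1, "name", "Laptop", "price", 1200.00),
                row("id", 3, "name", "Keyboard", "price", 75.00));
        Map<Object, Map<String, Object>> index = buildIndex(products, "id");
        List<String> valueFields = List.of("name", "price");

        // Matched: prints {order_id=101, product_id=1, quantity=2, name=Laptop, price=1200.0}
        System.out.println(enrich(row("order_id", 101, "product_id", 1, "quantity", 2),
                "product_id", index, valueFields, true));

        // Unmatched: prints {order_id=104, product_id=99, quantity=10}
        System.out.println(enrich(row("order_id", 104, "product_id", 99, "quantity", 10),
                "product_id", index, valueFields, true));
    }
}
```

Passing false as the last argument makes the unmatched order throw instead of passing through, which mirrors the failure that setAllowNoResults(true) is described as preventing in step 5.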

Console Output

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[order_id]:INT=[101]:Integer
    1:[product_id]:INT=[1]:Integer
    2:[quantity]:INT=[2]:Integer
    3:[name]:STRING=[Laptop]:String
    4:[price]:DOUBLE=[1200.0]:Double
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[order_id]:INT=[102]:Integer
    1:[product_id]:INT=[3]:Integer
    2:[quantity]:INT=[5]:Integer
    3:[name]:STRING=[Keyboard]:String
    4:[price]:DOUBLE=[75.0]:Double
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[order_id]:INT=[103]:Integer
    1:[product_id]:INT=[1]:Integer
    2:[quantity]:INT=[1]:Integer
    3:[name]:STRING=[Laptop]:String
    4:[price]:DOUBLE=[1200.0]:Double
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[order_id]:INT=[104]:Integer
    1:[product_id]:INT=[99]:Integer
    2:[quantity]:INT=[10]:Integer
}

-----------------------------------------------
4 records