Use Functions In Transformers

Data Pipeline supports transforming records using either built-in transformers or your own custom logic.

This example shows how to use a Java lambda function as a user-defined transformer that modifies records on the fly as they are read from a CSV file and written to the console. The transformation is applied by TransformingReader, which passes each record through the transformer as it is read.

Input CSV File

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C

Java Code

package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.transform.Transformer;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.core.Field;

import java.io.File;

public class UseFunctionsInTransformers {

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);

        reader = new TransformingReader(reader)
                .add(Transformer.of(record -> {
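                    Field creditLimit = record.getField("CreditLimit");
                    // the CSV value is read as a string, so parse it before doing arithmetic,
                    // then increase everyone's limit by 10%
                    double newValue = Double.parseDouble(creditLimit.getValueAsString()) * 1.10;
                    // write the new numeric value back into the same field
                    creditLimit.setValue(newValue);
                    return true;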
                }));

        DataWriter writer = new StreamWriter(System.out);

        Job.run(reader, writer);
    }
}

Code Walkthrough

  1. A CSVReader is created for the input file credit-balance-01.csv, with setFieldNamesInFirstRow(true) so that the first row supplies the field names.
  2. The CSVReader is wrapped in a new TransformingReader, so each record is modified as it flows through the reader.
  3. A custom transformer is created from a lambda using Transformer.of() and added to the TransformingReader. The lambda retrieves the CreditLimit field, parses its string value, increases it by 10%, and writes the new value back to the record.
  4. A StreamWriter is created to write records to the console (System.out).
  5. Job.run(reader, writer) is used to transfer the data from the TransformingReader to the console. See how to compile and run data pipeline jobs.
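
For readers who want to see the pattern from the walkthrough above without the library on the classpath, here is a minimal plain-Java sketch. It does not use the Data Pipeline API: the class FunctionTransformerSketch, the constant RAISE_CREDIT_LIMIT, and the helpers transformAll and record are all hypothetical names invented for illustration, and records are modeled as simple maps. It also uses BigDecimal instead of double so the printed amounts stay exact, which is a choice of this sketch rather than something the original example does. The boolean return value of the original lambda is omitted here.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class FunctionTransformerSketch {

    // Hypothetical stand-in for a user-defined transformer: a lambda that
    // updates the CreditLimit value of a record in place (a 10% increase).
    static final Consumer<Map<String, String>> RAISE_CREDIT_LIMIT = record -> {
        // Values arrive as strings, as they would from a CSV file, so parse first.
        BigDecimal limit = new BigDecimal(record.get("CreditLimit"));
        BigDecimal raised = limit.multiply(new BigDecimal("1.10"))
                .setScale(2, RoundingMode.HALF_UP);
        record.put("CreditLimit", raised.toPlainString());
    };

    // Hypothetical stand-in for a transforming reader: applies the function
    // to every record, in order, and returns the same (now modified) records.
    static List<Map<String, String>> transformAll(
            List<Map<String, String>> records,
            Consumer<Map<String, String>> transformer) {
        for (Map<String, String> record : records) {
            transformer.accept(record);
        }
        return records;
    }

    // Builds a two-field record; LinkedHashMap keeps the field order stable.
    static Map<String, String> record(String account, String creditLimit) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("Account", account);
        fields.put("CreditLimit", creditLimit);
        return fields;
    }

    public static void main(String[] args) {
        List<Map<String, String>> records = new ArrayList<>();
        records.add(record("101", "10000.00"));
        records.add(record("312", "1000.00"));

        for (Map<String, String> r : transformAll(records, RAISE_CREDIT_LIMIT)) {
            System.out.println(r);
        }
    }
}
```

Keeping the transformation as a standalone function value, separate from the code that iterates over records, mirrors the split between the lambda passed to Transformer.of() and the TransformingReader that applies it.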

Console Output

-----------------------------------------------
0 - Record {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:STRING=[9315.45]:String
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[AccountCreated]:STRING=[1/17/1998]:String
    6:[Rating]:STRING=[A]:String
}

-----------------------------------------------
1 - Record {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:STRING=[49654.87]:String
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[AccountCreated]:STRING=[12/5/2006]:String
    6:[Rating]:STRING=[A]:String
}

-----------------------------------------------
2 records
