Read and Transform Data in Parallel

Updated: Aug 10, 2023

This example shows how to process a large dataset by using multiple threads to read and transform records concurrently. Spreading the transformation work across threads can reduce processing time when dealing with large volumes of data.

This approach is useful for big data analytics, ETL (Extract, Transform, Load) processes, and data preprocessing tasks. Reading and transforming data in parallel can speed up processing and analysis, especially where a traditional single-threaded approach would be slow.
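The general fan-out idea (one source of records, several worker threads applying the same transformation) can be sketched with the JDK alone. This is not the Data Pipeline API: the class ParallelTransformSketch and its transformInParallel method are hypothetical, and fixed-size batching is just one simple way to split the work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical JDK-only sketch; not part of the Data Pipeline library.
public class ParallelTransformSketch {

    // Splits the input into fixed-size batches and transforms each batch on a thread pool.
    // Futures are read back in submission order, so the output order matches the input order.
    static <T, R> List<R> transformInParallel(List<T> input, Function<T, R> transform,
                                              int threads, int batchSize) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<R>>> futures = new ArrayList<>();
            for (int start = 0; start < input.size(); start += batchSize) {
                List<T> batch = input.subList(start, Math.min(start + batchSize, input.size()));
                futures.add(pool.submit(() -> {
                    List<R> out = new ArrayList<>(batch.size());
                    for (T item : batch) {
                        out.add(transform.apply(item)); // runs on a worker thread
                    }
                    return out;
                }));
            }
            List<R> results = new ArrayList<>(input.size());
            for (Future<List<R>> future : futures) {
                results.addAll(future.get()); // waits for this batch to finish
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A transformation task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> balances = List.of("90.00", "0", "12.5", "7");
        System.out.println(transformInParallel(balances, Double::parseDouble, 4, 2));
    }
}
```

Reading the futures in submission order keeps the output in input order. A pipeline that sorts its results afterward, as this example does with SortingReader, does not depend on that guarantee.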

 

Input CSV File

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,,,1998-1-17,A
312,Butler,Gerard,90.00,,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
...
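To make the input format concrete, here is a minimal plain-Java sketch of how such rows can be turned into named records: the first row supplies the field names and empty cells become null. The null CreditLimit values in the console output suggest CSVReader treats empty cells the same way, but this sketch is not its implementation; HeaderRowSketch is a hypothetical name, and the naive comma split ignores quoted fields.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; naive CSV handling (no quoted fields or escaped commas).
public class HeaderRowSketch {

    // Uses the first line as field names and maps every empty value to null.
    static List<Map<String, String>> parse(List<String> lines) {
        String[] fieldNames = lines.get(0).split(",", -1);
        List<Map<String, String>> records = new ArrayList<>();
        for (String line : lines.subList(1, lines.size())) {
            String[] values = line.split(",", -1); // limit -1 keeps trailing empty fields
            Map<String, String> record = new LinkedHashMap<>(); // preserves column order
            for (int i = 0; i < fieldNames.length; i++) {
                String value = i < values.length ? values[i] : "";
                record.put(fieldNames[i], value.isEmpty() ? null : value);
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> csv = List.of(
                "Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating",
                "101,Reeves,Keanu,,,1998-1-17,A",
                "312,Butler,Gerard,90.00,,2003-8-6,B");
        for (Map<String, String> record : parse(csv)) {
            System.out.println(record);
        }
    }
}
```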

 

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.AsyncTaskReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.SortingReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.DataReaderDecorator;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.RenameField;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class ReadAndTransformDataInParallel {

    private static final int THREADS = 4;

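    public static void main(String[] args) {

        // Read the CSV file, using the first row as field names
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02-1000000.csv"))
                .setFieldNamesInFirstRow(true);

        // Apply the transformation task to incoming records using THREADS worker threads
        reader = new AsyncTaskReader(reader, createTaskToRunInMultipleThreads(), THREADS);

        // Sort the transformed records by Balance (ascending) and print them to the console
        Job.run(new SortingReader(reader).add("Balance"), StreamWriter.newSystemOutWriter());
    }

    // Builds the transformations that AsyncTaskReader runs in multiple threads
    private static DataReaderDecorator createTaskToRunInMultipleThreads() {
        return (originalDataReader) -> new TransformingReader(originalDataReader)
                .add(new RenameField("Rating", "AccountRating"))
                // Convert Balance from String to Double, replacing nulls with 0.0
                .add(new BasicFieldTransformer("Balance").stringToDouble().nullToValue(0.0));
    }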

}
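The lambda returned by createTaskToRunInMultipleThreads() is a factory: it receives a reader and returns a new TransformingReader that wraps it. The JDK-only sketch below imitates that shape with iterators. DecoratorSketch, transforming and createTask are hypothetical names, and the idea that a factory lets a caller build a separate transformation chain for each worker is an assumption about why AsyncTaskReader takes a DataReaderDecorator, not something this page states.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the decorator-factory shape; not the Data Pipeline API.
public class DecoratorSketch {

    // Wraps a source iterator so each element is transformed lazily as it is pulled,
    // loosely analogous to a TransformingReader wrapping another reader.
    static <T> Iterator<T> transforming(Iterator<T> source, UnaryOperator<T> step) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public T next() {
                return step.apply(source.next());
            }
        };
    }

    // Hypothetical stand-in for createTaskToRunInMultipleThreads(): each call to the
    // returned function builds a fresh chain of decorators around the given source.
    static Function<Iterator<String>, Iterator<String>> createTask() {
        return source -> transforming(transforming(source, String::trim), String::toUpperCase);
    }

    static List<String> drain(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        Function<Iterator<String>, Iterator<String>> task = createTask();
        // The same factory applied to two independent sources yields two independent chains.
        System.out.println(drain(task.apply(List.of(" a", "b ").iterator())));
        System.out.println(drain(task.apply(List.of("c").iterator())));
    }
}
```

Because nothing is computed until a consumer pulls from the returned iterator, building the chain is cheap, and two chains built from the same factory share no mutable state.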


 

Code Walkthrough

  1. A CSVReader is created to read the input file credit-balance-02-1000000.csv.
  2. The setFieldNamesInFirstRow(true) method tells the reader to use the values in the first row as field names.
  3. The reader is then wrapped in an AsyncTaskReader, a proxy that uses multiple threads (four, set by the THREADS constant) to process incoming records. The work applied to the records is defined by the DataReaderDecorator passed to its constructor.
  4. The createTaskToRunInMultipleThreads() method returns that DataReaderDecorator, a lambda that wraps the reader it receives in a TransformingReader. In this example, the "Rating" field is renamed to "AccountRating", and "Balance" is converted from a string to a double, with null values replaced by 0.0.
  5. Next, a SortingReader sorts the transformed records by the Balance field in ascending order.
  6. Finally, Job.run() transfers the data from the SortingReader to StreamWriter.newSystemOutWriter(), which writes each record to the console. See how to compile and run data pipeline jobs.
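The effect of the two transformations and the sort can be checked by hand on the three sample rows. The sketch below does this in plain Java with Map-based records and a reduced set of fields; TransformAndSortSketch and its methods are hypothetical and only mirror the behavior described in steps 4 and 5, not how RenameField, BasicFieldTransformer or SortingReader are implemented.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; mirrors the described behavior only.
public class TransformAndSortSketch {

    // Builds a record with a reduced set of the input fields; a null balance stands for an empty cell.
    static Map<String, Object> record(String account, String lastName, String balance, String rating) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("Account", account);
        r.put("LastName", lastName);
        r.put("Balance", balance);
        r.put("Rating", rating);
        return r;
    }

    // Renames "Rating" to "AccountRating" at the same position and converts
    // "Balance" from String to Double, replacing null with 0.0.
    static Map<String, Object> transform(Map<String, Object> in) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : in.entrySet()) {
            String name = e.getKey().equals("Rating") ? "AccountRating" : e.getKey();
            Object value = e.getValue();
            if (name.equals("Balance")) {
                value = value == null ? 0.0 : Double.parseDouble((String) value);
            }
            out.put(name, value);
        }
        return out;
    }

    // Transforms every record, then sorts ascending by Balance.
    // List.sort is stable, so records with equal balances keep their input order.
    static List<Map<String, Object>> transformAndSort(List<Map<String, Object>> records) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> r : records) {
            out.add(transform(r));
        }
        out.sort(Comparator.comparingDouble((Map<String, Object> r) -> (Double) r.get("Balance")));
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = List.of(
                record("101", "Reeves", null, "A"),
                record("312", "Butler", "90.00", "B"),
                record("101", "Hewitt", "0", "B"));
        for (Map<String, Object> r : transformAndSort(input)) {
            System.out.println(r);
        }
    }
}
```

Reeves and Hewitt both end up with a balance of 0.0. This sketch keeps them in input order (Reeves first), while the console output lists Hewitt first, so the relative order of records with equal sort keys should not be relied on.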

 

Console Output

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:DOUBLE=[0.0]:Double
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[AccountCreated]:STRING=[1985-5-25]:String
    6:[AccountRating]:STRING=[B]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:DOUBLE=[0.0]:Double
    4:[CreditLimit]:STRING=[null]
    5:[AccountCreated]:STRING=[1998-1-17]:String
    6:[AccountRating]:STRING=[A]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:DOUBLE=[90.0]:Double
    4:[CreditLimit]:STRING=[null]
    5:[AccountCreated]:STRING=[2003-8-6]:String
    6:[AccountRating]:STRING=[B]:String
}
...