Read and Transform Data in Parallel
Updated: Aug 10, 2023
This example shows how to use multiple threads to read and transform a large dataset concurrently. Running the transformation on several threads can reduce processing time when working with large volumes of data.
This approach can be used for big data analytics, ETL (Extract, Transform, Load) processes, and data preprocessing tasks, especially where a single-threaded pipeline would be too slow.
Input CSV File
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,,,1998-1-17,A
312,Butler,Gerard,90.00,,2003-8-6,B
101,Hewitt,Jennifer Love,0,17000.00,1985-5-25,B
...
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.AsyncTaskReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.SortingReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.DataReaderDecorator;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.RenameField;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class ReadAndTransformDataInParallel {

    private static final int THREADS = 4;

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-02-1000000.csv"))
                .setFieldNamesInFirstRow(true);

        reader = new AsyncTaskReader(reader, createTaskToRunInMultipleThreads(), THREADS);

        Job.run(new SortingReader(reader).add("Balance"), StreamWriter.newSystemOutWriter());
    }

    private static DataReaderDecorator createTaskToRunInMultipleThreads() {
        return (originalDataReader) -> new TransformingReader(originalDataReader)
                .add(new RenameField("Rating", "AccountRating"))
                .add(new BasicFieldTransformer("Balance").stringToDouble().nullToValue(0.0));
    }
}
Code Walkthrough
- A CSVReader is created for the input file credit-balance-02-1000000.csv.
- The setFieldNamesInFirstRow(true) method is invoked to specify that the values in the first row should be used as field names.
- AsyncTaskReader is a proxy that uses multiple threads (four in this example) to process incoming data; the work applied to the data is specified by a DataReaderDecorator.
- The createTaskToRunInMultipleThreads() method returns a DataReaderDecorator that specifies the actions to apply to the data. In this example, the "Rating" field is renamed to "AccountRating", and "Balance" is parsed from a string to a double, with null values replaced by 0.0.
- Next, SortingReader sorts the incoming data by the specified field ("Balance") in ascending order.
- Finally, Job.run() transfers the data from the SortingReader to StreamWriter.newSystemOutWriter(). See how to compile and run data pipeline jobs.
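For readers who want to see the shape of the technique without the library, the steps in the walkthrough can be approximated with the JDK alone. This is a minimal sketch, not how AsyncTaskReader is implemented: it assumes records are plain maps of field name to string value, treats both null and empty Balance values as 0.0, and uses a fixed thread pool. The class ParallelTransformSketch and its methods record, transform, and transformAndSort are hypothetical names introduced for illustration only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogue of the example above; none of this is DataPipeline API.
public class ParallelTransformSketch {

    private static final int THREADS = 4;

    // Hypothetical helper: builds a record with a subset of the example's fields.
    public static Map<String, String> record(String account, String lastName, String balance, String rating) {
        Map<String, String> r = new LinkedHashMap<>();
        r.put("Account", account);
        r.put("LastName", lastName);
        r.put("Balance", balance);
        r.put("Rating", rating);
        return r;
    }

    // Per-record work: rename "Rating" to "AccountRating" and parse "Balance"
    // to a Double, substituting 0.0 when the value is null or empty (assumption).
    public static Map<String, Object> transform(Map<String, String> in) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : in.entrySet()) {
            String name = e.getKey();
            String value = e.getValue();
            if (name.equals("Rating")) {
                out.put("AccountRating", value);
            } else if (name.equals("Balance")) {
                out.put(name, (value == null || value.isEmpty()) ? 0.0 : Double.parseDouble(value));
            } else {
                out.put(name, value);
            }
        }
        return out;
    }

    // Runs transform() on a pool of THREADS worker threads, collects the results
    // in submission order, then sorts them by Balance in ascending order.
    public static List<Map<String, Object>> transformAndSort(List<Map<String, String>> records) {
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        try {
            List<Future<Map<String, Object>>> futures = new ArrayList<>();
            for (Map<String, String> r : records) {
                Callable<Map<String, Object>> task = () -> transform(r);
                futures.add(pool.submit(task));
            }
            List<Map<String, Object>> result = new ArrayList<>();
            for (Future<Map<String, Object>> f : futures) {
                result.add(f.get());
            }
            result.sort(Comparator.comparingDouble((Map<String, Object> r) -> (Double) r.get("Balance")));
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while transforming records", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A transform task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Map<String, String>> input = new ArrayList<>();
        input.add(record("101", "Reeves", null, "A"));
        input.add(record("312", "Butler", "90.00", "B"));
        input.add(record("101", "Hewitt", "0", "B"));
        for (Map<String, Object> r : transformAndSort(input)) {
            System.out.println(r);
        }
    }
}
```

Because the sketch collects results in submission order and List.sort is stable, records with equal balances keep their input order here. The library example makes no such promise in this text, so the order of tied records in its console output may differ.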
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:DOUBLE=[0.0]:Double
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[AccountCreated]:STRING=[1985-5-25]:String
    6:[AccountRating]:STRING=[B]:String
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:DOUBLE=[0.0]:Double
    4:[CreditLimit]:STRING=[null]
    5:[AccountCreated]:STRING=[1998-1-17]:String
    6:[AccountRating]:STRING=[A]:String
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:DOUBLE=[90.0]:Double
    4:[CreditLimit]:STRING=[null]
    5:[AccountCreated]:STRING=[2003-8-6]:String
    6:[AccountRating]:STRING=[B]:String
}
...