Read a CSV File Using Multithreading

Updated: Aug 28, 2023

This example shows how to read a CSV file on a separate thread in Java by wrapping a CSVReader in an AsyncReader, so that reading the file overlaps with downstream processing. This is useful when fast extraction and transformation of CSV data is essential, such as in ETL pipelines, data warehousing, and real-time analytics.

 

Input CSV file

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C

 

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.AsyncReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.jdbc.JdbcWriter;
import com.northconcepts.datapipeline.job.Job;

import java.io.File;
import java.sql.Connection;

public class ReadACsvFileUsingMultithreading {
    public static void main(String[] args) throws Throwable {
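        // Read the CSV file, taking field names from its first row
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);
        // Read ahead on a separate thread into a buffer capped at 10 MB
        reader = new AsyncReader(reader).setMaxBufferSizeInBytes(1024 * 1024 * 10);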

        DB db = new DB();
        db.execute("CREATE TABLE credit_balance (" +
                " account INT NOT NULL," +
                " lastname VARCHAR(32)," +
                " firstname VARCHAR(32)," +
                " balance DECIMAL," +
                " creditlimit DECIMAL," +
                " accountcreated  VARCHAR(12)," +
                " rating VARCHAR(5)," +
                " PRIMARY KEY (account));");

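        // Write each record to the credit_balance table over JDBC
        Connection connection = db.getConnection();
        DataWriter writer = new JdbcWriter(connection, "credit_balance");

        // Transfer all records from the reader to the writer
        Job.run(reader, writer);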
    }
}

 

Code Walkthrough

  1. A CSVReader is created with the path to the input file credit-balance-01.csv, and setFieldNamesInFirstRow(true) tells it to take field names from the first row.
  2. A new AsyncReader is created that wraps the CSVReader.
  3. The setMaxBufferSizeInBytes method is invoked to set the maximum size of the AsyncReader's internal buffer. In this example, it is set to 10 MB (1024 * 1024 * 10 bytes).
  4. A DB instance is created, which initializes an HSQL database.
  5. A query is executed to create the credit_balance table.
  6. A Connection to the database is obtained and passed to the JdbcWriter, which writes to the credit_balance table.
  7. Job.run(reader, writer) transfers the data from the AsyncReader to the JdbcWriter. See how to compile and run data pipeline jobs.

 

CSVReader

CSVReader is an input reader that can be used to read CSV files. It is a subclass of TextReader and inherits open, close, and other methods from it. The CSVReader.setFieldNamesInFirstRow(true) method causes the CSVReader to use the names in the first row of the input data as field names. If this method is not invoked, the fields are named A1, A2, etc., similar to MS Excel. If field names need to be changed, a rename transformation can be added on top of the CSVReader or any other reader (see Rename a field for an example).
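
To make the effect of setFieldNamesInFirstRow more concrete, here is a minimal plain-Java sketch of the same idea. It is not DataPipeline's implementation: HeaderRowSketch and parse are hypothetical names, the A1, A2 placeholder names simply follow the description above, and the naive comma split does not handle quoted fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of DataPipeline.
final class HeaderRowSketch {

    // Turns CSV lines into records keyed by field name.
    // Assumption: fields never contain commas or quotes.
    static List<Map<String, String>> parse(List<String> lines, boolean fieldNamesInFirstRow) {
        List<Map<String, String>> records = new ArrayList<>();
        if (lines.isEmpty()) {
            return records;
        }

        String[] names = null;
        int firstDataRow = 0;
        if (fieldNamesInFirstRow) {
            names = lines.get(0).split(",", -1); // first row supplies the field names
            firstDataRow = 1;                    // and is not treated as data
        }

        for (int row = firstDataRow; row < lines.size(); row++) {
            String[] values = lines.get(row).split(",", -1);
            Map<String, String> record = new LinkedHashMap<>(); // keeps column order
            for (int col = 0; col < values.length; col++) {
                // Fall back to placeholder names A1, A2, ... when no header name exists
                String name = (names != null && col < names.length) ? names[col] : "A" + (col + 1);
                record.put(name, values[col]);
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Account,LastName,FirstName",
                "101,Reeves,Keanu");
        System.out.println(parse(lines, true));  // header row used as field names
        System.out.println(parse(lines, false)); // header row treated as data
    }
}
```

With the flag set, the header row is consumed and each value can be looked up by its column name. Without it, the header row comes back as an ordinary record under the placeholder names.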

 

AsyncReader

AsyncReader handles all of its own threading. Normally, when a method such as JdbcReader.read() or CSVReader.read() is called, the program waits while the data is read from the database or file. Adding an AsyncReader on top of such a reader lets the reads happen proactively in a separate thread, with the data stored in a buffer until the AsyncReader.read method is called. The setMaxBufferSizeInBytes method limits the size of that buffer.
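
The read-ahead behaviour described here can be pictured as a bounded queue filled by a background thread. The sketch below uses only java.util.concurrent and is not how AsyncReader is actually implemented: ReadAheadSketch, readAll, and maxBufferedRecords are hypothetical names, and the buffer is capped by record count rather than by bytes to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical read-ahead wrapper; illustrates the idea only, not DataPipeline's AsyncReader.
final class ReadAheadSketch {

    // Unique end-of-input marker, compared by identity so it cannot collide with real data.
    private static final String END = new String("END");

    private final BlockingQueue<String> buffer;
    private boolean finished; // touched only by the consuming thread

    ReadAheadSketch(Iterator<String> source, int maxBufferedRecords) {
        buffer = new ArrayBlockingQueue<>(maxBufferedRecords);
        Thread producer = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    buffer.put(source.next()); // blocks while the buffer is full
                }
                buffer.put(END); // signal that the source is exhausted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop reading ahead
            }
        }, "read-ahead");
        producer.setDaemon(true); // do not keep the JVM alive for this sketch
        producer.start();
    }

    // Returns the next record, or null once the source is exhausted.
    String read() {
        if (finished) {
            return null;
        }
        try {
            String record = buffer.take(); // waits only if nothing is buffered yet
            if (record == END) {
                finished = true;
                return null;
            }
            return record;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a record", e);
        }
    }

    // Convenience loop: drains the reader into a list, preserving order.
    static List<String> readAll(List<String> lines, int maxBufferedRecords) {
        ReadAheadSketch reader = new ReadAheadSketch(lines.iterator(), maxBufferedRecords);
        List<String> out = new ArrayList<>();
        String record;
        while ((record = reader.read()) != null) {
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("101,Reeves,Keanu", "312,Butler,Gerard", "317,Murray,Bill");
        readAll(lines, 2).forEach(System.out::println);
    }
}
```

When the queue is full, the background thread blocks in put until the consumer catches up, which is the role the size limit plays in this sketch. Propagating errors from the background thread to the caller is left out for brevity.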
