Use Multi Threading in a Single Job

Updated: Aug 10, 2023

This example shows how to add multi-threading to a single job. AsyncReader reads the input on a separate thread while the rest of the job processes and writes records, so reading and writing can overlap without any thread management in your own code.

For demo purposes, this example reads a CSV file via AsyncReader and writes its contents to a new CSV file. AsyncReader can wrap other readers as well.

 

AsyncReader

AsyncReader handles all its own threading. Normally, when a method like JdbcReader.read() or CSVReader.read() is called, the program waits while the data is read from the database or file. Wrapping such a reader in an AsyncReader lets the reads happen ahead of time in a separate thread, with the data held in a buffer until AsyncReader.read() is called. The setMaxBufferSizeInBytes method limits the size of that buffer.
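The read-ahead idea described above can be sketched with the standard library alone. This is a minimal illustration, not AsyncReader's actual implementation: the PrefetchingReader class and its maxBufferedRecords parameter are hypothetical, and the buffer here is bounded by record count rather than by bytes to keep the sketch short.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical read-ahead wrapper; illustrates the idea, not AsyncReader's internals.
class PrefetchingReader<T> {

    private static final Object END = new Object(); // end-of-input marker

    private final BlockingQueue<Object> buffer;
    private volatile RuntimeException failure;      // error raised by the source, if any
    private boolean finished;

    PrefetchingReader(Iterator<T> source, int maxBufferedRecords) {
        buffer = new ArrayBlockingQueue<>(maxBufferedRecords);
        Thread producer = new Thread(() -> {
            try {
                try {
                    while (source.hasNext()) {
                        buffer.put(source.next());  // blocks while the buffer is full
                    }
                } catch (RuntimeException e) {
                    failure = e;                    // rethrown later on the reading thread
                } finally {
                    buffer.put(END);                // always unblock the consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "prefetch");
        producer.setDaemon(true);
        producer.start();
    }

    // Returns the next record from the buffer, or null at end of input.
    @SuppressWarnings("unchecked")
    T read() {
        if (finished) {
            return null;
        }
        try {
            Object item = buffer.take();            // waits only if the buffer is empty
            if (item == END) {
                finished = true;
                if (failure != null) {
                    throw failure;
                }
                return null;
            }
            return (T) item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
    }

    public static void main(String[] args) {
        List<String> rows = List.of("101,Reeves", "312,Butler", "868,Hewitt");
        PrefetchingReader<String> reader = new PrefetchingReader<>(rows.iterator(), 2);
        for (String row = reader.read(); row != null; row = reader.read()) {
            System.out.println(row);
        }
    }
}
```

The caller only ever sees read(); the producer thread fills the buffer in the background and pauses whenever the buffer is full, which is the same division of work the paragraph above describes.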

 

Input CSV file

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C

 

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import org.apache.log4j.Logger;

import com.northconcepts.datapipeline.core.AsyncReader;
import com.northconcepts.datapipeline.core.DataEndpoint;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;

public class UseMultiThreadingInASingleJob {
    
    public static final Logger log = DataEndpoint.log; 

    public static void main(String[] args) throws Throwable {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
            .setFieldNamesInFirstRow(true);

        DataWriter writer = new CSVWriter(new File("example/data/output/credit-balance-04.csv"))
            .setFieldNamesInFirstRow(true);

        // buffers up to 10 MB using another thread;
        // asyncReader.read() will pull from this buffer
        AsyncReader asyncReader = new AsyncReader(reader)
            .setMaxBufferSizeInBytes(1024 * 1024 * 10);
    
        Job.run(asyncReader, writer);
        
        log.info("PeakBufferSize: " + asyncReader.getPeakBufferSizeInBytes());
    }

}

 

Code Walkthrough

  1. A CSVReader is created corresponding to the input file credit-balance-01.csv.
  2. A CSVWriter is created corresponding to the output CSV file credit-balance-04.csv.
  3. A new AsyncReader is created using the CSVReader.
  4. The setMaxBufferSizeInBytes method is invoked to set the size of the AsyncReader's internal buffer. In the given example, it is set to 10 MB.
  5. Data is transferred from the input CSV file to the output CSV file via Job.run, which reads from the AsyncReader and writes to the CSVWriter.
  6. The getPeakBufferSizeInBytes method is used after the transfer to illustrate how much of the buffer was used.
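To make steps 4 and 6 more concrete, here is a rough sketch of how a byte limit and a peak-usage counter can work together. The ByteBoundedBuffer class is hypothetical and is not part of Data Pipeline. This example does not say how the library measures a record's size, so the UTF-8 length used here is an assumption made purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical byte-bounded buffer; shows limit and peak accounting only.
class ByteBoundedBuffer {

    private final Deque<String> records = new ArrayDeque<>();
    private final long maxBytes;
    private long currentBytes;
    private long peakBytes;

    ByteBoundedBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Assumption: a record's size is its UTF-8 encoded length.
    private static long sizeOf(String record) {
        return record.getBytes(StandardCharsets.UTF_8).length;
    }

    // Adds the record if it fits within the limit; returns false otherwise.
    // A background reader thread would wait and retry when this returns false.
    synchronized boolean offer(String record) {
        long size = sizeOf(record);
        // An empty buffer always accepts one record, so an oversized record cannot stall.
        if (!records.isEmpty() && currentBytes + size > maxBytes) {
            return false;
        }
        records.addLast(record);
        currentBytes += size;
        peakBytes = Math.max(peakBytes, currentBytes);
        return true;
    }

    // Removes and returns the oldest record, or null if the buffer is empty.
    synchronized String poll() {
        String record = records.pollFirst();
        if (record != null) {
            currentBytes -= sizeOf(record);
        }
        return record;
    }

    // Largest number of bytes held at any one time.
    synchronized long getPeakBytes() {
        return peakBytes;
    }
}
```

A peak value far below the limit, like the 558 bytes reported in the console output for this small input file, suggests the configured limit was never a constraint for that run.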

 

Console Output

13:35:45,363 DEBUG [main] datapipeline:60 - job::Start
13:35:45,375 DEBUG [main] datapipeline:72 - job::Success
13:35:45,376  INFO [main] datapipeline:40 - PeakBufferSize: 558

 

Output CSV file

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C