Use Multi Threading in a Single Job
This example shows how to add multi-threading to a single program. AsyncReader simplifies concurrent execution, allowing multiple tasks to run simultaneously and improving overall performance and throughput.
For demo purposes, this example reads a CSV file via AsyncReader and writes its contents to a new CSV file. However, AsyncReader can wrap other input types as well.
AsyncReader
AsyncReader handles all its own threading. Normally, when something like JdbcReader.read() or CSVReader.read() is called, the program waits while the data is read from the database or file. Adding an AsyncReader on top of them allows the reads to happen proactively in a separate thread, with the data stored in a buffer until the AsyncReader.read method is called. The setMaxBufferSizeInBytes method limits the size of that buffer.
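For example, the same wrapping can be applied to a database read. The following is a minimal, illustrative sketch only: the class name, connection URL, credentials, and the account table are assumptions for demonstration and are not part of this example's data.
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;

import com.northconcepts.datapipeline.core.AsyncReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;

public class UseAsyncReaderWithJdbc {

    public static void main(String[] args) throws Throwable {
        // hypothetical connection details -- replace with your own database
        Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:example", "sa", "");

        // the "account" table is an assumption for illustration
        DataReader reader = new JdbcReader(connection, "SELECT * FROM account");

        // database reads now happen ahead of time on a separate thread,
        // buffered up to 1 MB until asyncReader.read() consumes them
        AsyncReader asyncReader = new AsyncReader(reader)
                .setMaxBufferSizeInBytes(1024 * 1024);

        Job.run(asyncReader, new CSVWriter(new File("example/data/output/account.csv"))
                .setFieldNamesInFirstRow(true));
    }

}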
Input CSV file
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import org.apache.log4j.Logger;

import com.northconcepts.datapipeline.core.AsyncReader;
import com.northconcepts.datapipeline.core.DataEndpoint;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;

public class UseMultiThreadingInASingleJob {

    public static final Logger log = DataEndpoint.log;

    public static void main(String[] args) throws Throwable {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);

        DataWriter writer = new CSVWriter(new File("example/data/output/credit-balance-04.csv"))
                .setFieldNamesInFirstRow(true);

        // buffers up to 10 MB using another thread;
        // asyncReader.read() will pull from this buffer
        AsyncReader asyncReader = new AsyncReader(reader)
                .setMaxBufferSizeInBytes(1024 * 1024 * 10);

        Job.run(asyncReader, writer);

        log.info("PeakBufferSize: " + asyncReader.getPeakBufferSizeInBytes());
    }

}
Code Walkthrough
- A CSVReader is created corresponding to the input file credit-balance-01.csv.
- A CSVWriter is created corresponding to the output CSV file credit-balance-04.csv.
- A new AsyncReader is created to wrap the CSVReader.
- The setMaxBufferSizeInBytes method is invoked to set the size of the AsyncReader's internal buffer; in this example, it is set to 10 MB.
- Data is transferred from the input CSV file to the output CSV file via Job.run (an equivalent manual read loop is sketched after this list).
- The getPeakBufferSizeInBytes method is called after the transfer to show how much of the buffer was actually used.
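Job.run opens both endpoints, pumps records across, and closes them for you. If you need record-by-record control instead, you can drive the AsyncReader yourself. The following is a minimal sketch assuming the standard DataReader open/read/close contract:
// replaces the Job.run(asyncReader, writer) call in the listing above;
// Record is com.northconcepts.datapipeline.core.Record
asyncReader.open();
writer.open();
try {
    Record record;
    // each read() pulls the next record from the AsyncReader's buffer,
    // blocking until the background thread supplies one; null signals end of input
    while ((record = asyncReader.read()) != null) {
        writer.write(record);
    }
} finally {
    writer.close();
    asyncReader.close();
}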
Console Output
13:35:45,363 DEBUG [main] datapipeline:60 - job::Start
13:35:45,375 DEBUG [main] datapipeline:72 - job::Success
13:35:45,376 INFO [main] datapipeline:40 - PeakBufferSize: 558
Output CSV file
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C