Use Multi Threading in a Single Job
This example shows you how to incorporate multi-threading in a single job using the AsyncReader class.
For demonstration purposes, this example reads a CSV file via AsyncReader and writes its contents to a new CSV file. However, AsyncReader can be wrapped around other input types as well.
Input CSV file
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C
Java Code Listing
/*
 * Copyright (c) 2006-2022 North Concepts Inc. All rights reserved.
 * Proprietary and Confidential. Use is subject to license terms.
 *
 * https://northconcepts.com/data-pipeline/licensing/
 */
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import org.apache.log4j.Logger;

import com.northconcepts.datapipeline.core.AsyncReader;
import com.northconcepts.datapipeline.core.DataEndpoint;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;

public class UseMultiThreadingInASingleJob {

    public static final Logger log = DataEndpoint.log;

    public static void main(String[] args) throws Throwable {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
                .setFieldNamesInFirstRow(true);

        DataWriter writer = new CSVWriter(new File("example/data/output/credit-balance-04.csv"))
                .setFieldNamesInFirstRow(true);

        // buffers up to 10 MB using another thread;
        // asyncReader.read() will pull from this buffer
        AsyncReader asyncReader = new AsyncReader(reader)
                .setMaxBufferSizeInBytes(1024 * 1024 * 10);

        Job.run(asyncReader, writer);

        log.info("PeakBufferSize: " + asyncReader.getPeakBufferSizeInBytes());
    }

}
Code Walkthrough
- A CSVReader is created corresponding to the input file credit-balance-01.csv.
- A CSVWriter is created corresponding to the output CSV file credit-balance-04.csv.
- A new AsyncReader is created using the CSVReader.
- The setMaxBufferSizeInBytes method is invoked to set the size of the AsyncReader's internal buffer.
- Data is transferred from the input CSV file to the output CSV file via Job.run(asyncReader, writer) (a manual read-loop alternative is sketched after this list).
- The getPeakBufferSizeInBytes method is used after the transfer to illustrate how much of the buffer was actually used.
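To make the buffering visible, the Job.run call can be replaced with a manual record loop. The snippet below is not part of the original example; it is a sketch that assumes DataPipeline's usual open/read/write/close methods on DataReader and DataWriter. Each asyncReader.read() call pulls a record from the in-memory buffer while the wrapped CSVReader keeps filling that buffer on its own thread.

        // hypothetical replacement for the Job.run(asyncReader, writer) line above;
        // Record is com.northconcepts.datapipeline.core.Record
        asyncReader.open();
        writer.open();
        try {
            Record record;
            while ((record = asyncReader.read()) != null) {
                writer.write(record);
            }
        } finally {
            writer.close();
            asyncReader.close();
        }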
AsyncReader
AsyncReader handles all its own threading. Normally, when something like JdbcReader.read() or CSVReader.read() is called, the program waits while the data is actually read from the database or file. Adding an AsyncReader on top of them allows the reads to proactively happen in a separate thread, with the data stored in a buffer until the AsyncReader.read() method is called. The setMaxBufferSizeInBytes method is used to limit the size of that buffer.
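Because AsyncReader wraps any DataReader, the same buffering applies to slower sources such as databases. The class below is only a sketch, not part of the original example: the class name, connection URL, table, and output file are made up, and it assumes JdbcReader accepts a java.sql.Connection plus a SQL query (check the JdbcReader documentation for the exact constructor).

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;

import com.northconcepts.datapipeline.core.AsyncReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;

public class UseMultiThreadingWithAJdbcReader {

    public static void main(String[] args) throws Throwable {
        // hypothetical connection details -- replace with your own database
        Connection connection = DriverManager.getConnection("jdbc:hsqldb:mem:example", "sa", "");

        // the wrapped JdbcReader blocks on the database; AsyncReader keeps
        // pre-fetching records into its buffer on a separate thread
        DataReader reader = new AsyncReader(new JdbcReader(connection, "SELECT * FROM account"))
                .setMaxBufferSizeInBytes(1024 * 1024 * 10);

        DataWriter writer = new CSVWriter(new File("example/data/output/account.csv"))
                .setFieldNamesInFirstRow(true);

        Job.run(reader, writer);

        connection.close();
    }

}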
Console Output
13:35:45,363 DEBUG [main] datapipeline:60 - job::Start
13:35:45,375 DEBUG [main] datapipeline:72 - job::Success
13:35:45,376 INFO [main] datapipeline:40 - PeakBufferSize: 558
Output CSV file
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C