Use Multi Threading in a Single Job

This example shows you how to incorporate multi-threading into a single job using the AsyncReader class.

For demonstration purposes, this example reads a CSV file via AsyncReader and writes its contents to a new CSV file. However, AsyncReader can wrap other input types as well.

Input CSV file

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C

Java Code Listing

/*
 * Copyright (c) 2006-2018 North Concepts Inc.  All rights reserved.
 * Proprietary and Confidential.  Use is subject to license terms.
 *
 * http://northconcepts.com/data-pipeline/licensing/
 *
 */
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import org.apache.log4j.Logger;

import com.northconcepts.datapipeline.core.AsyncReader;
import com.northconcepts.datapipeline.core.DataEndpoint;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;

public class UseMultiThreadingInASingleJob {
    
    public static final Logger log = DataEndpoint.log; 

    public static void main(String[] args) throws Throwable {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
            .setFieldNamesInFirstRow(true);

        DataWriter writer = new CSVWriter(new File("example/data/output/credit-balance-04.csv"))
            .setFieldNamesInFirstRow(true);

        // buffers up to 10 MB using another thread;
        // asyncReader.read() will pull from this buffer
        AsyncReader asyncReader = new AsyncReader(reader)
            .setMaxBufferSizeInBytes(1024 * 1024 * 10);
    
        Job.run(asyncReader, writer);
        
        log.info("PeakBufferSize: " + asyncReader.getPeakBufferSizeInBytes());
    }

}

Code Walkthrough

  1. A CSVReader is created corresponding to the input file credit-balance-01.csv.
  2. A CSVWriter is created corresponding to the output CSV file credit-balance-04.csv.
  3. A new AsyncReader is created using the CSVReader.
  4. The setMaxBufferSizeInBytes method is invoked to set the size of the AsyncReader's internal buffer.
  5. Data is transferred from the input CSV file to the output CSV file via Job.run(asyncReader, writer).
  6. The getPeakBufferSizeInBytes method is used after the transfer to illustrate how much of the buffer was actually used.

AsyncReader

AsyncReader handles all of its own threading. Normally, when a method like JdbcReader.read() or CSVReader.read() is called, the program waits while the data is actually read from the database or file. Wrapping an AsyncReader around one of these readers allows reads to happen proactively on a separate thread, with the records stored in a buffer until the AsyncReader.read method is called. The setMaxBufferSizeInBytes method limits the size of that buffer.
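The producer-consumer pattern behind this behavior can be sketched with standard Java concurrency utilities. The code below is a conceptual illustration only, not the library's internals: a background thread reads ahead from a source and parks records in a bounded buffer, which plays the role that AsyncReader's byte-limited buffer plays in the example above (class and method names here are invented for illustration).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Conceptual sketch of background-thread buffering (hypothetical class,
// not part of Data Pipeline): a producer thread reads ahead while the
// consumer pulls already-buffered records.
public class AsyncBufferSketch {

    // sentinel marking the end of input
    static final String EOF = "\u0000EOF";

    static List<String> runPipeline(List<String> source, int bufferCapacity)
            throws InterruptedException {
        // bounded buffer, analogous to setMaxBufferSizeInBytes
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(bufferCapacity);

        // producer: reads from the (potentially slow) source on its own thread
        Thread producer = new Thread(() -> {
            try {
                for (String record : source) {
                    buffer.put(record); // blocks when the buffer is full
                }
                buffer.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // consumer: pulls buffered records, like asyncReader.read()
        List<String> consumed = new ArrayList<>();
        String record;
        while (!(record = buffer.take()).equals(EOF)) {
            consumed.add(record);
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [row-1, row-2, row-3]
        System.out.println(runPipeline(List.of("row-1", "row-2", "row-3"), 2));
    }
}
```

Because the buffer is bounded, a fast producer cannot run arbitrarily far ahead of a slow consumer; it blocks once the buffer fills, which is the same back-pressure that setMaxBufferSizeInBytes provides in the real AsyncReader.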

Console Output

13:35:45,363 DEBUG [main] datapipeline:60 - job::Start
13:35:45,375 DEBUG [main] datapipeline:72 - job::Success
13:35:45,376  INFO [main] datapipeline:40 - PeakBufferSize: 558

Output CSV file

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C