How to read data in parallel using AsyncMultiReader

Data Pipeline now includes a new AsyncMultiReader endpoint that lets you read from multiple DataReaders in parallel.  Here’s how it works.

Behind the scenes, AsyncMultiReader uses separate threads to load records from each data source into its internal buffer.  It takes care of all the threading issues — blocking, failures, and shutdown — to keep your code simple.
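To make the idea concrete, here is a minimal sketch of the general pattern using only the JDK: one thread per source pushes records into a shared, bounded buffer while a single consumer drains it. This is not AsyncMultiReader's actual implementation; the class name FanInSketch, the method readAll, and the buffer size are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not part of Data Pipeline.
class FanInSketch {

    /** Reads every source on its own thread and returns all records in arrival order. */
    static List<String> readAll(List<List<String>> sources) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(4); // small shared buffer
        AtomicInteger running = new AtomicInteger(sources.size());

        for (List<String> source : sources) {
            Thread worker = new Thread(() -> {
                try {
                    for (String record : source) {
                        buffer.put(record); // blocks while the buffer is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet(); // signal that this source is done
                }
            });
            worker.setDaemon(true);
            worker.start();
        }

        List<String> result = new ArrayList<>();
        try {
            while (true) {
                boolean done = running.get() == 0; // sample before polling
                String record = buffer.poll(20, TimeUnit.MILLISECONDS);
                if (record != null) {
                    result.add(record);
                } else if (done) {
                    return result; // all workers ended and the buffer is drained
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
    }

    public static void main(String[] args) {
        List<String> records = readAll(List.of(
                List.of("orange-1", "orange-2", "orange-3"),
                List.of("yellow-1", "yellow-2")));
        System.out.println(records.size() + " records: " + records);
    }
}
```

Records from different sources may interleave in any order, but each source's own records keep their relative order because every source has exactly one producer thread.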

Quick Overview

AsyncMultiReader is very easy to use in your own applications if you’re already familiar with how Data Pipeline works.  If you’re just beginning, a good place to start is the “how to compile and run a job” example.

Your first step is to create an instance of AsyncMultiReader, passing in your source readers; alternatively, add them after construction with its add(DataReader…) method.  After that, it’s just like using any other DataReader to transfer or migrate data: you can add operators on top of the reader, create the target DataWriter, and run the transfer.

Slow Reader Example

Here’s a longer example showing how to use AsyncMultiReader to speed up data transfers.

A good place to use this reader is when your incoming data source is slower than your target but allows multiple concurrent connections.  We’ll create an intentionally slow, fake DataReader to simulate this behaviour.

Our custom SlowReader class accepts several parameters to determine its behaviour:

  • name — the unique name added to each record.  Helps us to distinguish records coming from different slow readers.
  • elapsedTime — the length of time this instance should run before terminating.
  • recordDelay — the length of time to wait before returning a new record.
  • fail —  indicates if this instance should terminate by throwing an exception or just end normally.

You might have noticed an overflow field in the record.  This is to help flush any I/O buffering of the output that might stop us from seeing output in real-time.  You definitely don’t want to do this in real life.

Creating a custom DataReader

Custom DataReaders are very easy to create.  Just extend the DataReader class and implement its readImpl() method.  In most cases, you’ll also want to implement the open() and close() methods, but we can skip those in this simple example.

You’ll also want to implement the addExceptionProperties method to help with exception reporting. This method uses dynamic exception fields to expose the internal state when things go wrong.  The method is called by the framework whenever this reader uses one of its overloaded exception() methods.
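As a rough, self-contained sketch of the shape described above, the following uses a small stand-in base class rather than the real DataReader so that it runs without the library on the classpath. SimpleReader, its Map-based records, and the way exception properties are collected are assumptions for illustration; only the names readImpl, addExceptionProperties, and the four SlowReader parameters come from the text.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; SimpleReader is a stand-in, not the library's DataReader.
class SlowReaderSketch {

    abstract static class SimpleReader {
        /** Returns the next record, or null at end of data. */
        protected abstract Map<String, Object> readImpl() throws Exception;

        /** Lets subclasses expose their internal state when a read fails. */
        protected void addExceptionProperties(Map<String, Object> properties) {
        }

        public final Map<String, Object> read() {
            try {
                return readImpl();
            } catch (Exception e) {
                Map<String, Object> properties = new LinkedHashMap<>();
                addExceptionProperties(properties);
                throw new IllegalStateException(e.getMessage() + " " + properties, e);
            }
        }
    }

    static class SlowReader extends SimpleReader {
        private final String name;        // added to each record
        private final long elapsedTime;   // how long to run, in milliseconds
        private final long recordDelay;   // wait before each record, in milliseconds
        private final boolean fail;       // end with an exception instead of normally
        private long startTime = -1;
        private long sequence;

        SlowReader(String name, long elapsedTime, long recordDelay, boolean fail) {
            this.name = name;
            this.elapsedTime = elapsedTime;
            this.recordDelay = recordDelay;
            this.fail = fail;
        }

        @Override
        protected Map<String, Object> readImpl() throws Exception {
            if (startTime < 0) {
                startTime = System.currentTimeMillis(); // clock starts on the first read
            }
            if (System.currentTimeMillis() - startTime >= elapsedTime) {
                if (fail) {
                    throw new Exception("forced failure");
                }
                return null; // normal end of data
            }
            Thread.sleep(recordDelay);
            Map<String, Object> record = new LinkedHashMap<>();
            record.put("name", name);
            record.put("sequence", ++sequence);
            return record;
        }

        @Override
        protected void addExceptionProperties(Map<String, Object> properties) {
            properties.put("name", name);
            properties.put("sequence", sequence);
        }
    }

    public static void main(String[] args) {
        SlowReader reader = new SlowReader("orange", 100, 20, false);
        Map<String, Object> record;
        while ((record = reader.read()) != null) {
            System.out.println(record);
        }
    }
}
```

The point of the exception hook is that a failure report carries the reader's name and position, which matters once several readers are running at the same time.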

Putting it all together

Our final bit of code wraps five slow readers in an AsyncMultiReader and writes them all as CSV to the console.  This example will fail with an exception in the reader named yellow or yellow2, but you can play around with the parameters to see what happens.

Where do exceptions go?

This brings us to an important question.  If AsyncMultiReader uses separate threads to load data, what happens to exceptions thrown in those threads?

The answer is: it depends.  By default, an exception thrown in any reader thread is re-thrown by AsyncMultiReader the next time its read() method is called.  This stops the main transfer, gracefully terminates the other reader threads, and closes their data sources.

You can disable this behaviour by calling AsyncMultiReader.setFailOnException(false) before the transfer starts.  With this flag disabled, the transfer will continue to run and end successfully, even if one or all of the slow readers fail.
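The two policies can be sketched in plain Java as follows. This is a simplified stand-in, not the library's code: MultiSource, source, and drain are hypothetical names, and the sketch leaves out the graceful shutdown of the other threads. A worker's failure is remembered, and read() either re-throws it on the next call or ignores it, depending on a failOnException flag.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of Data Pipeline.
class FailurePolicySketch {

    static class MultiSource {
        private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        private final AtomicReference<Throwable> failure = new AtomicReference<>();
        private final AtomicInteger running;
        private final boolean failOnException;

        MultiSource(boolean failOnException, List<Supplier<String>> sources) {
            this.failOnException = failOnException;
            this.running = new AtomicInteger(sources.size());
            for (Supplier<String> source : sources) {
                Thread worker = new Thread(() -> {
                    try {
                        String record;
                        while ((record = source.get()) != null) {
                            buffer.add(record);
                        }
                    } catch (Throwable e) {
                        failure.compareAndSet(null, e); // remember the first failure
                    } finally {
                        running.decrementAndGet();
                    }
                });
                worker.setDaemon(true);
                worker.start();
            }
        }

        /** Returns the next record, or null once every source has ended. */
        String read() {
            try {
                while (true) {
                    boolean done = running.get() == 0; // sample before checking failure
                    Throwable e = failure.get();
                    if (failOnException && e != null) {
                        throw new IllegalStateException("reader thread failed", e);
                    }
                    String record = buffer.poll(20, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        return record;
                    }
                    if (done) {
                        return null;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while reading", e);
            }
        }
    }

    /** A fake source that yields count records, then ends normally or by throwing. */
    static Supplier<String> source(String name, int count, boolean fail) {
        AtomicInteger sent = new AtomicInteger();
        return () -> {
            if (sent.get() < count) {
                return name + "-" + sent.incrementAndGet();
            }
            if (fail) {
                throw new IllegalStateException(name + " failed");
            }
            return null;
        };
    }

    /** Reads until end of data and returns the number of records seen. */
    static int drain(MultiSource reader) {
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        MultiSource tolerant = new MultiSource(false,
                List.of(source("orange", 3, false), source("yellow", 2, true)));
        System.out.println("records read with flag disabled: " + drain(tolerant));
    }
}
```

With the flag disabled, every record that was produced before the failure still reaches the consumer; with it enabled, the consumer's own thread sees the exception, so ordinary try/catch around the transfer is enough to handle it.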

In either scenario, you can override the AsyncMultiReader.onTheadFinished(ReaderThread) callback to be notified, on the reader thread itself, as each thread ends.  If ReaderThread.getException() returns null, that thread ended normally.  Whatever you do, call super.onTheadFinished() so that everything gets cleaned up properly.
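Here is a small sketch of why the call to super matters, again in plain Java with made-up names (Runner, onFinished, and runAll are assumptions, not library API). The base class does its own bookkeeping inside the callback, so an override that skips super would leave runAll waiting forever.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration only; not part of Data Pipeline.
class FinishHookSketch {

    static class Runner {
        private CountDownLatch remaining = new CountDownLatch(0);

        /** Called on the worker thread when it ends; exception is null on a normal exit. */
        protected void onFinished(String name, Throwable exception) {
            remaining.countDown(); // base-class bookkeeping that overrides must not skip
        }

        /** Runs each task on its own thread and waits until all have reported in. */
        void runAll(Map<String, Runnable> tasks) {
            remaining = new CountDownLatch(tasks.size());
            tasks.forEach((name, task) -> new Thread(() -> {
                Throwable error = null;
                try {
                    task.run();
                } catch (Throwable e) {
                    error = e;
                }
                onFinished(name, error);
            }, name).start());
            try {
                remaining.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs one healthy and one failing task and returns the sorted outcome log. */
    static List<String> demo() {
        Queue<String> log = new ConcurrentLinkedQueue<>();
        Runner runner = new Runner() {
            @Override
            protected void onFinished(String name, Throwable exception) {
                log.add(name + (exception == null ? ":ok" : ":failed"));
                super.onFinished(name, exception); // keep the base-class cleanup
            }
        };
        Map<String, Runnable> tasks = new LinkedHashMap<>();
        tasks.put("orange", () -> { });
        tasks.put("yellow", () -> { throw new IllegalStateException("boom"); });
        runner.runAll(tasks);

        List<String> sorted = new ArrayList<>(log);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the callback runs on the worker thread, anything it touches (the log in this sketch) needs to be safe for concurrent use.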


Happy coding!

About Dele Taylor

We make Data Pipeline — a lightweight ETL framework for Java. Use it to filter, transform, and aggregate data on-the-fly in your web, mobile, and desktop apps. Learn more about it at
