Data Pipeline now includes a new AsyncMultiReader endpoint that lets you read from multiple DataReaders in parallel. Here’s how it works.
Behind the scenes, AsyncMultiReader uses separate threads to load records from each data source into its internal buffer. It takes care of all the threading issues — blocking, failures, and shutdown — to keep your code simple.
Quick Overview
AsyncMultiReader is very easy to use in your own applications if you’re already familiar with how Data Pipeline works. If you’re just beginning, a good place to start is the “how to compile and run a job” example.
DataReader reader = new AsyncMultiReader(reader1, reader2, reader3);
DataWriter writer = new CSVWriter(new FileWriter("multi.csv"));
JobTemplate.DEFAULT.transfer(reader, writer);
Your first step is to create an instance of AsyncMultiReader, passing in your source readers, or you can use its add(DataReader…) method after construction. After that, it’s just like using any other DataReader to transfer or migrate data. You can add operators on top of the reader and then create the target DataWriter and run the transfer.
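For example, a minimal sketch of the add-after-construction style might look like this; reader1 and reader2 stand in for whatever source readers you already have:

AsyncMultiReader multiReader = new AsyncMultiReader();
multiReader.add(reader1, reader2);  // varargs: add any number of sources

DataWriter writer = new CSVWriter(new FileWriter("multi.csv"));
JobTemplate.DEFAULT.transfer(multiReader, writer);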
Slow Reader Example
Here’s a longer example showing how to use AsyncMultiReader to speed up data transfers.
A good place to use this reader is when your incoming data source is slower than your target, but allows multiple concurrent connections. We’ll create an intentionally slow, fake DataReader to simulate this behaviour.
public static class SlowReader extends DataReader {

    private final String name;
    private final long elapsedTime;
    private final long recordDelay;
    private final boolean fail;

    public SlowReader(String name, long elapsedTime, long recordDelay, boolean fail) {
        this.name = name;
        this.elapsedTime = elapsedTime;
        this.recordDelay = recordDelay;
        this.fail = fail;
    }

    @Override
    protected Record readImpl() throws Throwable {
        if (getOpenElapsedTime() >= elapsedTime) {
            if (fail) {
                throw exception("forced to fail");
            }
            return null;
        }

        Thread.sleep(recordDelay);

        Record record = new Record();
        record.setField("id", getRecordCount() + 1);
        record.setField("time", new Date());
        record.setField("name", name + "-" + getRecordCount());
        record.setField("overflow", Util.repeat("-", 8 * 1024));  // overflow I/O buffers, hopefully
        return record;
    }

    @Override
    public DataException addExceptionProperties(DataException exception) {
        exception.set("SlowReader.name", name);
        exception.set("SlowReader.elapsedTime", elapsedTime);
        exception.set("SlowReader.recordDelay", recordDelay);
        exception.set("SlowReader.fail", fail);
        return super.addExceptionProperties(exception);
    }

}
Our custom SlowReader class accepts several parameters to determine its behaviour:
- name — the unique name added to each record. Helps us to distinguish records coming from different slow readers.
- elapsedTime — the length of time this instance should run before terminating.
- recordDelay — the length of time to wait before returning a new record.
- fail — indicates if this instance should terminate by throwing an exception or just end normally.
You might have noticed an overflow field in the record. This is to help flush any I/O buffering of the output that might stop us from seeing output in real-time. You definitely don’t want to do this in real life.
Creating a custom DataReader
Custom DataReaders are very easy to create. Just extend the DataReader class and implement its readImpl() method. In most cases, you’ll also want to implement the open() and close() methods, but we can skip those in this simple example.
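If your reader does need to manage a resource, a minimal sketch of the open/close pattern might look like the following. It assumes open() and close() are overridable public methods and that calling the super versions is required; the list-backed “source” is purely illustrative:

// Illustrative reader that acquires its source in open() and releases it
// in close(); assumes open()/close() can be overridden and that
// super.open()/super.close() must be called for the framework's bookkeeping.
public static class StringListReader extends DataReader {

    private final List<String> values;
    private Iterator<String> iterator;

    public StringListReader(List<String> values) {
        this.values = values;
    }

    @Override
    public void open() {
        super.open();
        iterator = values.iterator();  // acquire the "resource"
    }

    @Override
    protected Record readImpl() throws Throwable {
        if (!iterator.hasNext()) {
            return null;  // null signals end of data
        }
        Record record = new Record();
        record.setField("value", iterator.next());
        return record;
    }

    @Override
    public void close() {
        iterator = null;  // release the "resource"
        super.close();
    }
}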
You’ll also want to implement the addExceptionProperties() method to help with exception reporting. This method uses dynamic exception fields to expose the internal state when things go wrong. It’s called by the framework whenever this reader uses one of its overloaded exception() methods.
Putting it all together
DataReader reader = new AsyncMultiReader()
    .add(new SlowReader("orange", 5L * 1000L, 50L, false))
    .add(new SlowReader("red", 5L * 1000L, 50L, false))
    .add(new SlowReader("brown", 3L * 1000L, 40L, true))
    .add(new SlowReader("yellow", 2L * 1000L, 20L, true))
    .add(new SlowReader("yellow2", 2L * 1000L, 20L, true))
    //.setFailOnException(false)
    ;

DataWriter writer = new CSVWriter(new OutputStreamWriter(System.out));

JobTemplate.DEFAULT.transfer(reader, writer);
Our final bit of code wraps 5 slow readers in an AsyncMultiReader and writes them all as CSV to the console. This example will fail with an exception in yellow or yellow2, but you can play around with the parameters to see what happens.
Where do exceptions go?
This brings us to an important question. If AsyncMultiReader uses separate threads to load data, what happens to exceptions thrown in those threads?
The answer is that it depends. By default, an exception thrown in any reader thread is re-thrown by AsyncMultiReader the next time its read() method is called. This stops the main transfer, gracefully terminates the other reader threads, and closes their data sources.
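For example, assuming DataException is unchecked (its use in addExceptionProperties() above suggests it is the framework’s standard exception type), you could catch the re-thrown failure around the transfer:

try {
    JobTemplate.DEFAULT.transfer(reader, writer);
} catch (DataException e) {
    // The dynamic fields set in addExceptionProperties(), such as
    // SlowReader.name, travel with the exception to aid diagnosis.
    e.printStackTrace();
}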
You can disable this behaviour by calling AsyncMultiReader.setFailOnException(false) before the transfer starts. With this flag disabled, the transfer will continue to run and end successfully, even if one or all slow readers fail.
In either scenario, you can override the AsyncMultiReader.onThreadFinished(ReaderThread) callback to be notified, on the reader’s own thread, when each reader ends. If ReaderThread.getException() returns null, it means that thread ended normally. Whatever you do, be sure to call super.onThreadFinished() so everything gets cleaned up properly.
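A rough sketch of that callback is below; the method’s visibility and the exact ReaderThread type are assumptions, so check them against your version of the library:

// Sketch only: logs each reader thread's outcome, then defers to the
// framework; the protected visibility shown here is an assumption.
DataReader reader = new AsyncMultiReader() {
    @Override
    protected void onThreadFinished(ReaderThread readerThread) {
        if (readerThread.getException() != null) {
            System.err.println("source failed: " + readerThread.getException());
        }
        super.onThreadFinished(readerThread);  // required for proper cleanup
    }
}
.add(new SlowReader("orange", 5L * 1000L, 50L, false))
.add(new SlowReader("yellow", 2L * 1000L, 20L, true));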
Happy coding!