Pipe a Writer to a Reader
This example shows you how to pipe a writer to a reader using the PipedReader and PipedWriter classes.
The demo code creates a new thread which writes data to a PipedWriter that is connected to a PipedReader. In the main thread, the PipedReader is then retrieved and used to display data written to the PipedWriter.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.PipedReader;
import com.northconcepts.datapipeline.core.PipedWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;

public class PipeAWriterToAReader {

    public static void main(String[] args) {
        RecordProducer producer = new RecordProducer();
        producer.start();

        DataReader reader = producer.getReader();
        DataWriter writer = new StreamWriter(System.out);

        Job.run(reader, writer);
    }

    public static class RecordProducer extends Thread {

        private final PipedReader reader = new PipedReader();
        private final PipedWriter writer = new PipedWriter(reader);

        public RecordProducer() {
        }

        public PipedReader getReader() {
            return reader;
        }

        @Override
        public void run() {
            writer.open();
            try {
                for (int i = 0; i < 20; i++) {
                    Record r = new Record();
                    r.setField("key", "key-" + i);
                    r.setField("value", "value-" + i);
                    writer.write(r);
                }
            } finally {
                writer.close();
            }
        }
    }
}
Code walkthrough - RecordProducer class
- The RecordProducer class is defined; it extends Thread.
- A private PipedReader field is defined and initialised.
- A private PipedWriter field is defined and initialised with that PipedReader, which connects the two.
- The run method opens the PipedWriter and then runs a for loop of 20 iterations.
- In each iteration of the loop, a Record object is created with test data in its key and value fields.
- The Record is written to the PipedWriter.
- The writer is closed in a finally block, so it is closed whether the loop completes normally or throws.
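The same producer pattern can be tried without the Data Pipeline library. The sketch below is an analogue only: it uses the JDK's java.io.PipedReader and java.io.PipedWriter, which are character-stream classes that share names with, but are distinct from, the record-oriented Data Pipeline classes in the listing. LineProducer, readAll and the key=value line format are hypothetical names invented for this illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class JdkPipeSketch {

    /** Hypothetical stand-in for RecordProducer, built on the JDK's character pipes. */
    static class LineProducer extends Thread {
        private final PipedReader reader = new PipedReader();
        private final PipedWriter writer;
        private final int count;

        LineProducer(int count) {
            this.count = count;
            try {
                // Connecting the writer to the reader is what forms the pipe.
                this.writer = new PipedWriter(reader);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        PipedReader getReader() {
            return reader;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < count; i++) {
                    writer.write("key-" + i + "=value-" + i + "\n");
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                // Closing the writer is what lets the reading thread see end-of-stream.
                try {
                    writer.close();
                } catch (IOException ignored) {
                    // Nothing useful to do if close itself fails in this sketch.
                }
            }
        }
    }

    /** Starts a producer thread and reads every line it writes on the calling thread. */
    static List<String> readAll(int count) {
        LineProducer producer = new LineProducer(count);
        producer.start();
        List<String> lines = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(producer.getReader())) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        readAll(3).forEach(System.out::println);
    }
}
```

In this JDK analogue, omitting the close in the finally block would leave the reading thread waiting for more data or failing with an IOException once the writer thread ends, which illustrates why the walkthrough closes the writer in a finally block.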
Code walkthrough - PipeAWriterToAReader class
- An instance of RecordProducer is created.
- The start method is invoked, which kicks off a new thread.
- The PipedReader is retrieved from the RecordProducer.
- A new StreamWriter is created that writes to the console through System.out.
- Data is transferred from the PipedReader to the StreamWriter by the Job.run method.
How data piping works
In the main method, an instance of RecordProducer is created and started, which spawns a new thread that begins writing records to the PipedWriter. The PipedReader connected to this PipedWriter is retrieved, and the Job.run method transfers its data to a new StreamWriter. This shows how data written to the PipedWriter in a separate thread can be read from the PipedReader in the main thread.
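One way to picture the hand-off between the two threads is a bounded queue with an end-of-stream marker. The sketch below is a conceptual illustration under that assumption, not Data Pipeline's actual implementation: QueuePipeSketch, its write, close and read methods, the queue capacity of 8, and the use of a Map as a stand-in for Record are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueuePipeSketch {

    // Sentinel marking end-of-stream; it is compared by identity, never by content.
    private static final Map<String, String> END = new LinkedHashMap<>();

    // Bounded queue: the writing thread blocks when the reading thread falls behind.
    private final BlockingQueue<Map<String, String>> queue = new ArrayBlockingQueue<>(8);

    /** Writer side: hands one record to the reading thread. */
    void write(Map<String, String> record) {
        put(record);
    }

    /** Writer side: signals that no more records will arrive. */
    void close() {
        put(END);
    }

    /** Reader side: blocks until a record arrives and returns null at end-of-stream. */
    Map<String, String> read() {
        try {
            Map<String, String> next = queue.take();
            return next == END ? null : next;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
    }

    private void put(Map<String, String> item) {
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while writing", e);
        }
    }

    /** Writes count records on a new thread and collects them on the calling thread. */
    static List<String> run(int count) {
        QueuePipeSketch pipe = new QueuePipeSketch();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    Map<String, String> record = new LinkedHashMap<>();
                    record.put("key", "key-" + i);
                    record.put("value", "value-" + i);
                    pipe.write(record);
                }
            } finally {
                pipe.close(); // always signal end-of-stream, even after a failure
            }
        });
        producer.start();

        List<String> received = new ArrayList<>();
        Map<String, String> record;
        while ((record = pipe.read()) != null) {
            received.add(record.toString());
        }
        return received;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

The bounded queue is a deliberate choice in this sketch: it keeps a fast producer from accumulating an unbounded backlog while the consumer is still writing earlier records out.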
Console output
11:00:47,423 DEBUG [main] datapipeline:60 - job::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[key]:STRING=[key-0]:String
    1:[value]:STRING=[value-0]:String
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[key]:STRING=[key-1]:String
    1:[value]:STRING=[value-1]:String
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[key]:STRING=[key-2]:String
    1:[value]:STRING=[value-2]:String
}
-----------------------------------------------
3 - Record (MODIFIED) {
    0:[key]:STRING=[key-3]:String
    1:[value]:STRING=[value-3]:String
}
-----------------------------------------------
4 - Record (MODIFIED) {
    0:[key]:STRING=[key-4]:String
    1:[value]:STRING=[value-4]:String
}
-----------------------------------------------
5 - Record (MODIFIED) {
    0:[key]:STRING=[key-5]:String
    1:[value]:STRING=[value-5]:String
}
-----------------------------------------------
6 - Record (MODIFIED) {
    0:[key]:STRING=[key-6]:String
    1:[value]:STRING=[value-6]:String
}
-----------------------------------------------
7 - Record (MODIFIED) {
    0:[key]:STRING=[key-7]:String
    1:[value]:STRING=[value-7]:String
}
-----------------------------------------------
8 - Record (MODIFIED) {
    0:[key]:STRING=[key-8]:String
    1:[value]:STRING=[value-8]:String
}
-----------------------------------------------
9 - Record (MODIFIED) {
    0:[key]:STRING=[key-9]:String
    1:[value]:STRING=[value-9]:String
}
-----------------------------------------------
10 - Record (MODIFIED) {
    0:[key]:STRING=[key-10]:String
    1:[value]:STRING=[value-10]:String
}
-----------------------------------------------
11 - Record (MODIFIED) {
    0:[key]:STRING=[key-11]:String
    1:[value]:STRING=[value-11]:String
}
-----------------------------------------------
12 - Record (MODIFIED) {
    0:[key]:STRING=[key-12]:String
    1:[value]:STRING=[value-12]:String
}
-----------------------------------------------
13 - Record (MODIFIED) {
    0:[key]:STRING=[key-13]:String
    1:[value]:STRING=[value-13]:String
}
-----------------------------------------------
14 - Record (MODIFIED) {
    0:[key]:STRING=[key-14]:String
    1:[value]:STRING=[value-14]:String
}
-----------------------------------------------
15 - Record (MODIFIED) {
    0:[key]:STRING=[key-15]:String
    1:[value]:STRING=[value-15]:String
}
-----------------------------------------------
16 - Record (MODIFIED) {
    0:[key]:STRING=[key-16]:String
    1:[value]:STRING=[value-16]:String
}
-----------------------------------------------
17 - Record (MODIFIED) {
    0:[key]:STRING=[key-17]:String
    1:[value]:STRING=[value-17]:String
}
-----------------------------------------------
18 - Record (MODIFIED) {
    0:[key]:STRING=[key-18]:String
    1:[value]:STRING=[value-18]:String
}
-----------------------------------------------
19 - Record (MODIFIED) {
    0:[key]:STRING=[key-19]:String
    1:[value]:STRING=[value-19]:String
}
-----------------------------------------------
11:00:47,437 DEBUG [main] datapipeline:72 - job::Success 20 records