Pipe a Writer to a Reader
This example shows you how to pipe a writer to a reader using the PipedReader and PipedWriter classes.
The demo code creates a new thread which writes data to a PipedWriter that is connected to a PipedReader. In the main thread, the PipedReader is then retrieved and used to display data written to the PipedWriter.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.PipedReader;
import com.northconcepts.datapipeline.core.PipedWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
public class PipeAWriterToAReader {
public static void main(String[] args) {
RecordProducer producer = new RecordProducer();
producer.start();
DataReader reader = producer.getReader();
DataWriter writer = new StreamWriter(System.out);
Job.run(reader, writer);
}
public static class RecordProducer extends Thread {
private final PipedReader reader = new PipedReader();
private final PipedWriter writer = new PipedWriter(reader);
public RecordProducer() {
}
public PipedReader getReader() {
return reader;
}
@Override
public void run() {
writer.open();
try {
for (int i = 0; i < 20; i++) {
Record r = new Record();
r.setField("key", "key-" + i);
r.setField("value", "value-" + i);
writer.write(r);
}
} finally {
writer.close();
}
}
}
}
Code Walkthrough - RecordProducer class
RecordProducerclass is defined which extends Thread class.- Private field corresponding to PipedReader is defined and initialised.
- Another private field corresponding to PipedWriter is defined and initialised using the PipedReader.
- A for loop is defined inside the run method.
- In each iteration of the loop, a Record object is created with some test data.
- The Record is written to the PipedWriter.
- After the for loop completes, the writer is closed in a finally block.
Code walkthrough - PipeAWriterToAReader class
- An instance of
RecordProduceris created. - The start method is invoked which kicks off a new thread.
- The PipedReader is retrieved from the
RecordProducer. - A new StreamWriter is created corresponding to the standard console using System.out.
- Data is transferred from the PipedReader to StreamWriter via JobTemplate.DEFAULT.transfer method.
How data piping works
In the main method, an instance of RecordProducer is created which spawns off a new thread that begins writing records to the
PipedWriter.
The PipedReader connected to this
PipedWriter is retrieved and data from the
PipedReader is transferred to a new
StreamWriter via the
JobTemplate.DEFAULT.transfer method.
This shows how the data which was written to the PipedWriter
in a separate thread can be retrieved from the PipedReader
in the main thread.
Console output
11:00:47,423 DEBUG [main] datapipeline:60 - job::Start
-----------------------------------------------
0 - Record (MODIFIED) {
0:[key]:STRING=[key-0]:String
1:[value]:STRING=[value-0]:String
}
-----------------------------------------------
1 - Record (MODIFIED) {
0:[key]:STRING=[key-1]:String
1:[value]:STRING=[value-1]:String
}
-----------------------------------------------
2 - Record (MODIFIED) {
0:[key]:STRING=[key-2]:String
1:[value]:STRING=[value-2]:String
}
-----------------------------------------------
3 - Record (MODIFIED) {
0:[key]:STRING=[key-3]:String
1:[value]:STRING=[value-3]:String
}
-----------------------------------------------
4 - Record (MODIFIED) {
0:[key]:STRING=[key-4]:String
1:[value]:STRING=[value-4]:String
}
-----------------------------------------------
5 - Record (MODIFIED) {
0:[key]:STRING=[key-5]:String
1:[value]:STRING=[value-5]:String
}
-----------------------------------------------
6 - Record (MODIFIED) {
0:[key]:STRING=[key-6]:String
1:[value]:STRING=[value-6]:String
}
-----------------------------------------------
7 - Record (MODIFIED) {
0:[key]:STRING=[key-7]:String
1:[value]:STRING=[value-7]:String
}
-----------------------------------------------
8 - Record (MODIFIED) {
0:[key]:STRING=[key-8]:String
1:[value]:STRING=[value-8]:String
}
-----------------------------------------------
9 - Record (MODIFIED) {
0:[key]:STRING=[key-9]:String
1:[value]:STRING=[value-9]:String
}
-----------------------------------------------
10 - Record (MODIFIED) {
0:[key]:STRING=[key-10]:String
1:[value]:STRING=[value-10]:String
}
-----------------------------------------------
11 - Record (MODIFIED) {
0:[key]:STRING=[key-11]:String
1:[value]:STRING=[value-11]:String
}
-----------------------------------------------
12 - Record (MODIFIED) {
0:[key]:STRING=[key-12]:String
1:[value]:STRING=[value-12]:String
}
-----------------------------------------------
13 - Record (MODIFIED) {
0:[key]:STRING=[key-13]:String
1:[value]:STRING=[value-13]:String
}
-----------------------------------------------
14 - Record (MODIFIED) {
0:[key]:STRING=[key-14]:String
1:[value]:STRING=[value-14]:String
}
-----------------------------------------------
15 - Record (MODIFIED) {
0:[key]:STRING=[key-15]:String
1:[value]:STRING=[value-15]:String
}
-----------------------------------------------
16 - Record (MODIFIED) {
0:[key]:STRING=[key-16]:String
1:[value]:STRING=[value-16]:String
}
-----------------------------------------------
17 - Record (MODIFIED) {
0:[key]:STRING=[key-17]:String
1:[value]:STRING=[value-17]:String
}
-----------------------------------------------
18 - Record (MODIFIED) {
0:[key]:STRING=[key-18]:String
1:[value]:STRING=[value-18]:String
}
-----------------------------------------------
19 - Record (MODIFIED) {
0:[key]:STRING=[key-19]:String
1:[value]:STRING=[value-19]:String
}
-----------------------------------------------
11:00:47,437 DEBUG [main] datapipeline:72 - job::Success
20 records
