Pipe a Writer to a Reader

Updated: Feb 21, 2022

This example shows you how to pipe a writer to a reader using the PipedReader and PipedWriter classes.

The demo code creates a new thread which writes data to a PipedWriter that is connected to a PipedReader. In the main thread, the PipedReader is then retrieved and used to display data written to the PipedWriter.

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.PipedReader;
import com.northconcepts.datapipeline.core.PipedWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;

public class PipeAWriterToAReader {
    
    public static void main(String[] args) {
        RecordProducer producer = new RecordProducer();
        producer.start();
        
        DataReader reader = producer.getReader();
        DataWriter writer = new StreamWriter(System.out);        
        
        Job.run(reader, writer);
    }


    public static class RecordProducer extends Thread {
        
        private final PipedReader reader = new PipedReader();
        private final PipedWriter writer = new PipedWriter(reader);

        public RecordProducer() {
        }
        
        public PipedReader getReader() {
            return reader;
        }
        
        @Override
        public void run() {
            writer.open();
            try {
                for (int i = 0; i < 20; i++) {
                    Record r = new Record();
                    r.setField("key", "key-" + i);
                    r.setField("value", "value-" + i);
                    writer.write(r);
                }
            } finally {
                writer.close();
            }
        }
        
    }
    
}

Code Walkthrough - RecordProducer class

  1. RecordProducer class is defined which extends Thread class.
  2. Private field corresponding to PipedReader is defined and initialised.
  3. Another private field corresponding to PipedWriter is defined and initialised using the PipedReader.
  4. A for loop is defined inside the run method.
  5. In each iteration of the loop, a Record object is created with some test data.
  6. The Record is written to the PipedWriter.
  7. After the for loop completes, the writer is closed in a finally block.

Code walkthrough - PipeAWriterToAReader class

  1. An instance of RecordProducer is created.
  2. The start method is invoked which kicks off a new thread.
  3. The PipedReader is retrieved from the RecordProducer.
  4. A new StreamWriter is created corresponding to the standard console using System.out.
  5. Data is transferred from the PipedReader to StreamWriter via JobTemplate.DEFAULT.transfer method.

How data piping works

In the main method, an instance of RecordProducer is created which spawns off a new thread that begins writing records to the PipedWriter. The PipedReader connected to this PipedWriter is retrieved and data from the PipedReader is transferred to a new StreamWriter via the JobTemplate.DEFAULT.transfer method. This shows how the data which was written to the PipedWriter in a separate thread can be retrieved from the PipedReader in the main thread.

Console output

11:00:47,423 DEBUG [main] datapipeline:60 - job::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[key]:STRING=[key-0]:String
    1:[value]:STRING=[value-0]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[key]:STRING=[key-1]:String
    1:[value]:STRING=[value-1]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[key]:STRING=[key-2]:String
    1:[value]:STRING=[value-2]:String
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[key]:STRING=[key-3]:String
    1:[value]:STRING=[value-3]:String
}

-----------------------------------------------
4 - Record (MODIFIED) {
    0:[key]:STRING=[key-4]:String
    1:[value]:STRING=[value-4]:String
}

-----------------------------------------------
5 - Record (MODIFIED) {
    0:[key]:STRING=[key-5]:String
    1:[value]:STRING=[value-5]:String
}

-----------------------------------------------
6 - Record (MODIFIED) {
    0:[key]:STRING=[key-6]:String
    1:[value]:STRING=[value-6]:String
}

-----------------------------------------------
7 - Record (MODIFIED) {
    0:[key]:STRING=[key-7]:String
    1:[value]:STRING=[value-7]:String
}

-----------------------------------------------
8 - Record (MODIFIED) {
    0:[key]:STRING=[key-8]:String
    1:[value]:STRING=[value-8]:String
}

-----------------------------------------------
9 - Record (MODIFIED) {
    0:[key]:STRING=[key-9]:String
    1:[value]:STRING=[value-9]:String
}

-----------------------------------------------
10 - Record (MODIFIED) {
    0:[key]:STRING=[key-10]:String
    1:[value]:STRING=[value-10]:String
}

-----------------------------------------------
11 - Record (MODIFIED) {
    0:[key]:STRING=[key-11]:String
    1:[value]:STRING=[value-11]:String
}

-----------------------------------------------
12 - Record (MODIFIED) {
    0:[key]:STRING=[key-12]:String
    1:[value]:STRING=[value-12]:String
}

-----------------------------------------------
13 - Record (MODIFIED) {
    0:[key]:STRING=[key-13]:String
    1:[value]:STRING=[value-13]:String
}

-----------------------------------------------
14 - Record (MODIFIED) {
    0:[key]:STRING=[key-14]:String
    1:[value]:STRING=[value-14]:String
}

-----------------------------------------------
15 - Record (MODIFIED) {
    0:[key]:STRING=[key-15]:String
    1:[value]:STRING=[value-15]:String
}

-----------------------------------------------
16 - Record (MODIFIED) {
    0:[key]:STRING=[key-16]:String
    1:[value]:STRING=[value-16]:String
}

-----------------------------------------------
17 - Record (MODIFIED) {
    0:[key]:STRING=[key-17]:String
    1:[value]:STRING=[value-17]:String
}

-----------------------------------------------
18 - Record (MODIFIED) {
    0:[key]:STRING=[key-18]:String
    1:[value]:STRING=[value-18]:String
}

-----------------------------------------------
19 - Record (MODIFIED) {
    0:[key]:STRING=[key-19]:String
    1:[value]:STRING=[value-19]:String
}

-----------------------------------------------
11:00:47,437 DEBUG [main] datapipeline:72 - job::Success
20 records
Mobile Analytics