Write my own Data Reader

Updated: Aug 24, 2023

This example shows how to write a custom DataReader that replaces or extends DataPipeline's default data reading behavior. By implementing your own reading logic, you can accommodate additional data sources and formats and integrate them with your existing data pipelines and applications.

Use this approach when the DataReaders provided by DataPipeline do not support a specific data source, or when the data needs additional preprocessing before ingestion. For example, your data may be stored as CSV, JSON, or in a database, with unusual file structures or schema variations that the reading process has to handle. Supplying your own DataReader lets you retrieve data from such sources and pass it on for downstream processing and analysis.

 

Java Code Listing

WriteMyOwnDataReader.java

package com.northconcepts.datapipeline.examples.cookbook.customization;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;

public class WriteMyOwnDataReader {
    
    public static void main(String[] args) throws Throwable {
        DataReader reader = new MyDataReader();
        reader = new MyProxyReader(reader);
        Job.run(reader, new StreamWriter(System.out));
    }
    
}

 

MyDataReader.java

package com.northconcepts.datapipeline.examples.cookbook.customization;

import com.northconcepts.datapipeline.core.DataException;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Record;

public class MyDataReader extends DataReader {
    
    @Override
    public void open() throws DataException {
        super.open();
        // open the data source here
    }

    @Override
    public void close() throws DataException {
        // close the data source here
        super.close();
    }
    
    @Override
    protected Record readImpl() throws Throwable {
        // build and return a new Record instance for each row of data here
        // return null to signal end-of-file
        return null;
    }
    
}
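
The listing above leaves readImpl() empty, so the following is a minimal sketch of the same open / readImpl / close contract with the blanks filled in. To stay runnable without the DataPipeline jar, it uses hypothetical stand-in types: SketchReader stands in for DataReader and a Map stands in for Record. None of these names are part of the DataPipeline API, and the in-memory "name, age" lines are an assumed data source chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for DataReader (NOT the DataPipeline class).
abstract class SketchReader {
    private boolean opened;

    public void open() { opened = true; }

    public void close() { opened = false; }

    // Returns the next record, or null once the source is exhausted.
    public final Map<String, Object> read() {
        if (!opened) {
            throw new IllegalStateException("reader is not open");
        }
        return readImpl();
    }

    protected abstract Map<String, Object> readImpl();
}

// Custom reader over an assumed in-memory source of "name, age" lines.
class InMemoryLineReader extends SketchReader {
    private final List<String> lines;
    private Iterator<String> iterator;

    InMemoryLineReader(List<String> lines) { this.lines = lines; }

    @Override
    public void open() {
        super.open();
        iterator = lines.iterator(); // "open the data source here"
    }

    @Override
    public void close() {
        iterator = null;             // "close the data source here"
        super.close();
    }

    @Override
    protected Map<String, Object> readImpl() {
        if (!iterator.hasNext()) {
            return null;             // end-of-file
        }
        String[] parts = iterator.next().split(",");
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", parts[0].trim());
        record.put("age", Integer.parseInt(parts[1].trim()));
        return record;               // a new record instance per call
    }
}

class CustomReaderSketch {
    // Drains a reader into a list, always closing it afterwards.
    static List<Map<String, Object>> readAll(SketchReader reader) {
        List<Map<String, Object>> records = new ArrayList<>();
        reader.open();
        try {
            Map<String, Object> record;
            while ((record = reader.read()) != null) {
                records.add(record);
            }
        } finally {
            reader.close();
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("Ada, 36", "Linus, 54");
        System.out.println(readAll(new InMemoryLineReader(source)));
    }
}
```

In a real implementation, the same three hooks would extend DataReader as in MyDataReader.java, with readImpl() returning Record instances instead of maps.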

 

MyProxyReader.java

package com.northconcepts.datapipeline.examples.cookbook.customization;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.ProxyReader;
import com.northconcepts.datapipeline.core.Record;

public class MyProxyReader extends ProxyReader {
    
    public MyProxyReader(DataReader nestedDataReader) {
        super(nestedDataReader);
    }
    
    @Override
    protected Record interceptRecord(Record record) throws Throwable {
        // transform the incoming Record here
        // return null to exclude this record from the stream
        // use this.push(Record) and Record.clone() to add
        // new records to this stream
        return super.interceptRecord(record);
    }

}
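
The comments in interceptRecord describe two behaviors: transforming a record and returning null to exclude it. The sketch below illustrates that pattern in isolation, assuming that "exclude" means the proxy moves on to the next record from the nested reader. It uses hypothetical stand-in types (RowSource, RowProxy, plain strings as records) rather than the real ProxyReader and Record classes, and it does not model push(Record).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for a reader: returns null at end of data.
interface RowSource {
    String next();
}

// Simple in-memory source used as the nested reader.
class ListRowSource implements RowSource {
    private final Deque<String> rows;

    ListRowSource(List<String> rows) { this.rows = new ArrayDeque<>(rows); }

    @Override
    public String next() { return rows.poll(); } // null when empty
}

// Hypothetical stand-in for ProxyReader (NOT the DataPipeline class).
abstract class RowProxy implements RowSource {
    private final RowSource nested;

    RowProxy(RowSource nested) { this.nested = nested; }

    @Override
    public final String next() {
        String row;
        while ((row = nested.next()) != null) {
            String result = intercept(row);
            if (result != null) {
                return result;   // pass the (possibly transformed) row on
            }
            // null from intercept: row is excluded, keep reading
        }
        return null;             // nested source is exhausted
    }

    // Return the row (transformed or not), or null to exclude it.
    protected abstract String intercept(String row);
}

// Drops blank rows and upper-cases the rest.
class NonBlankUpperCaseProxy extends RowProxy {
    NonBlankUpperCaseProxy(RowSource nested) { super(nested); }

    @Override
    protected String intercept(String row) {
        String trimmed = row.trim();
        if (trimmed.isEmpty()) {
            return null;
        }
        return trimmed.toUpperCase(Locale.ROOT);
    }
}

class ProxySketch {
    // Collects every row a source produces.
    static List<String> drain(RowSource source) {
        List<String> out = new ArrayList<>();
        String row;
        while ((row = source.next()) != null) {
            out.add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        RowSource source = new ListRowSource(Arrays.asList("alpha", "   ", " beta "));
        System.out.println(drain(new NonBlankUpperCaseProxy(source)));
    }
}
```

Keeping the filtering and transformation in a wrapper leaves the underlying reader unchanged, which is the same reason MyProxyReader wraps MyDataReader rather than modifying it.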

 

Code Walkthrough

  1. A MyDataReader instance is created and then wrapped in a MyProxyReader instance. Both classes are templates that you customize to match your own requirements.
  2. Job.run(reader, writer) transfers the data from the reader to the writer. See how to compile and run data pipeline jobs.

 
