Write my own Data Writer

Updated: Aug 24, 2023

This example shows how to create a custom DataWriter by extending the DataWriter class, and how to intercept records on their way to it with a ProxyWriter subclass. Together, these let you implement data-writing logic tailored to your own use cases and requirements.

This is useful when the built-in writers do not fit a particular data format, destination, or processing requirement. For example, you may want to write data to a non-standard file format, integrate with a proprietary data storage system, or apply additional transformations to records before they reach the final output.

 

Input CSV File

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C

 

Java Code Listing

WriteMyOwnDataWriter.java

package com.northconcepts.datapipeline.examples.cookbook.customization;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;

public class WriteMyOwnDataWriter {
    
    public static void main(String[] args) throws Throwable {
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
            .setFieldNamesInFirstRow(true);
    
        DataWriter writer = new MyDataWriter();
        writer = new MyProxyWriter(writer);

        Job.run(reader, writer);
    }
    
}

 

MyDataWriter.java

package com.northconcepts.datapipeline.examples.cookbook.customization;

import com.northconcepts.datapipeline.core.DataException;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;

public class MyDataWriter extends DataWriter {
    
    @Override
    public void open() throws DataException {
        super.open();
        // open the data sink (file, connection, etc.) here
    }

    @Override
    public void close() throws DataException {
        // close the data sink here, before the writer itself is closed
        super.close();
    }
    
    @Override
    protected void writeImpl(Record record) throws Throwable {
        // write the record to the data sink here
    }
    
}
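
To make the open/write/close lifecycle of the skeleton above more concrete, here is a minimal, library-free sketch. SketchWriter, PipeDelimitedWriter, and the Map-based record are hypothetical stand-ins invented for illustration; they are not Data Pipeline classes. In a real MyDataWriter, the equivalent logic would go in open(), writeImpl(Record), and close().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the role DataWriter plays; NOT the Data Pipeline API.
abstract class SketchWriter {
    private boolean opened;

    public void open() { opened = true; }

    public void close() { opened = false; }

    // Guards the lifecycle, then delegates to the subclass hook.
    public final void write(Map<String, String> record) {
        if (!opened) {
            throw new IllegalStateException("writer is not open");
        }
        writeImpl(record);
    }

    protected abstract void writeImpl(Map<String, String> record);
}

// Writes each record as one pipe-delimited line into an in-memory sink.
class PipeDelimitedWriter extends SketchWriter {
    private final StringBuilder out;
    private int recordCount;

    PipeDelimitedWriter(StringBuilder out) { this.out = out; }

    @Override
    public void open() {
        super.open();
        recordCount = 0;   // a real writer would acquire its file or connection here
    }

    @Override
    protected void writeImpl(Map<String, String> record) {
        out.append(String.join("|", record.values())).append('\n');
        recordCount++;
    }

    int getRecordCount() { return recordCount; }
}

class PipeDelimitedWriterDemo {
    public static void main(String[] args) {
        StringBuilder sink = new StringBuilder();
        SketchWriter writer = new PipeDelimitedWriter(sink);

        // LinkedHashMap keeps the fields in insertion order.
        Map<String, String> record = new LinkedHashMap<>();
        record.put("Account", "101");
        record.put("LastName", "Reeves");
        record.put("Rating", "A");

        writer.open();
        writer.write(record);
        writer.close();

        System.out.print(sink);   // prints 101|Reeves|A
    }
}
```

The base class owns the lifecycle check and the subclass only supplies the per-record logic, which is the same division of labor the skeleton suggests with its public open()/close() methods and protected writeImpl hook.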

 

MyProxyWriter.java

package com.northconcepts.datapipeline.examples.cookbook.customization;

import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.ProxyWriter;
import com.northconcepts.datapipeline.core.Record;

public class MyProxyWriter extends ProxyWriter {
    
    public MyProxyWriter(DataWriter nestedDataWriter) {
        super(nestedDataWriter);
    }
    
    @Override
    protected Record interceptRecord(Record record) throws Throwable {
        // Transform the record here before it reaches the nested writer.
        // Return null to exclude this record.
        // Use this.push(Record) and Record.clone() to add
        // new records to this stream.
        return super.interceptRecord(record);
    }

}
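
The comments in interceptRecord describe two options: return a (possibly modified) record to pass it on, or return null to exclude it. The following library-free sketch illustrates that idea with hypothetical stand-in types (RecordSink, InterceptingSink, RatingFilterSink, and a Map-based record); none of them are Data Pipeline classes, and the filtering rule is only an example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-ins; NOT the Data Pipeline API.
interface RecordSink {
    void write(Map<String, String> record);
}

// Collects whatever reaches the end of the chain.
class ListSink implements RecordSink {
    final List<Map<String, String>> written = new ArrayList<>();

    @Override
    public void write(Map<String, String> record) {
        written.add(record);
    }
}

// Wraps another sink and may change or drop each record before passing it on.
abstract class InterceptingSink implements RecordSink {
    private final RecordSink nested;

    InterceptingSink(RecordSink nested) { this.nested = nested; }

    @Override
    public final void write(Map<String, String> record) {
        Map<String, String> result = intercept(record);
        if (result != null) {          // null means "exclude this record"
            nested.write(result);
        }
    }

    protected abstract Map<String, String> intercept(Map<String, String> record);
}

// Example rule: drop records rated "C" and upper-case LastName on the rest.
class RatingFilterSink extends InterceptingSink {
    RatingFilterSink(RecordSink nested) { super(nested); }

    @Override
    protected Map<String, String> intercept(Map<String, String> record) {
        if ("C".equals(record.get("Rating"))) {
            return null;
        }
        Map<String, String> copy = new LinkedHashMap<>(record);   // leave the input untouched
        copy.computeIfPresent("LastName", (key, value) -> value.toUpperCase(Locale.ROOT));
        return copy;
    }
}

class RatingFilterDemo {
    static Map<String, String> record(String lastName, String rating) {
        Map<String, String> r = new LinkedHashMap<>();
        r.put("LastName", lastName);
        r.put("Rating", rating);
        return r;
    }

    public static void main(String[] args) {
        ListSink sink = new ListSink();
        RecordSink chain = new RatingFilterSink(sink);

        chain.write(record("Reeves", "A"));
        chain.write(record("Murray", "C"));   // excluded by the filter

        System.out.println(sink.written);     // prints [{LastName=REEVES, Rating=A}]
    }
}
```

Because the wrapper and the final sink share one interface, the calling code does not need to know whether it is writing to the sink directly or through one or more interceptors.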

 

Code Walkthrough

  1. A CSVReader is created for the input file credit-balance-01.csv.
  2. The setFieldNamesInFirstRow(true) method is invoked so that the values in the first row are used as field names.
  3. A MyDataWriter instance is created and then wrapped in a MyProxyWriter instance. Both classes are skeletons meant to be customized: MyDataWriter writes each record to its destination in writeImpl(Record), and MyProxyWriter can transform or exclude records in interceptRecord(Record) before they reach the nested writer.
  4. Job.run(reader, writer) transfers the data from the reader to the writer. See how to compile and run data pipeline jobs.

 
