Handle Transforming Reader Failures

This example shows how to handle failures that occur while transforming data with the TransformingReader class. TransformingReader applies changes to incoming records, and the example shows how to capture the records whose transformation fails, along with the reason for each failure.

Many data processing tasks involve applying business rules to the data. TransformingReader executes these rules, and DataPipeline can send records that fail during rule execution to a separate discard writer, together with a field describing the failure. Capturing the reason for failure helps you enforce business rules and identify violations or inconsistencies in the data.
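
The idea of routing failed records to a separate sink along with the failure reason can be illustrated in plain Java, without the DataPipeline library. The following is only a minimal sketch of the pattern, not the library's implementation: the class name DiscardSketch, the renameField and transform helpers, and the use of a Map to stand in for a record are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a plain-Java stand-in for "transform, or discard with a reason".
// None of these names come from the DataPipeline API.
public class DiscardSketch {

    // Renames a field, failing if the target field name is already taken.
    public static void renameField(Map<String, String> record, String from, String to) {
        if (record.containsKey(to)) {
            throw new IllegalStateException("field [" + to + "] already exists");
        }
        record.put(to, record.remove(from));
    }

    // Applies the rename to a copy of each record. Records that fail are added
    // to 'discarded' unchanged, plus an extra field holding the failure message.
    public static List<Map<String, String>> transform(List<Map<String, String>> input,
            List<Map<String, String>> discarded, String reasonField) {
        List<Map<String, String>> accepted = new ArrayList<>();
        for (Map<String, String> record : input) {
            Map<String, String> copy = new LinkedHashMap<>(record);
            try {
                renameField(copy, "first_name", "last_name");
                accepted.add(copy);
            } catch (RuntimeException e) {
                Map<String, String> failed = new LinkedHashMap<>(record);
                failed.put(reasonField, e.getMessage());
                discarded.add(failed);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<String, String> withLastName = new LinkedHashMap<>();
        withLastName.put("first_name", "Brian");
        withLastName.put("last_name", "Mulliner");

        // No last_name field here, so the rename succeeds for this record.
        Map<String, String> withoutLastName = new LinkedHashMap<>();
        withoutLastName.put("first_name", "Johnny");

        List<Map<String, String>> discarded = new ArrayList<>();
        List<Map<String, String>> accepted = transform(
                Arrays.asList(withLastName, withoutLastName), discarded, "rename_field_exception");

        System.out.println("accepted:  " + accepted);
        System.out.println("discarded: " + discarded);
    }
}
```

The failed record keeps its original fields and gains one extra field with the failure message, which mirrors the shape of the discarded records in the console output of this example.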

 

Input XML File

<records>
  <record id="1" first_name="Brian" last_name="Mulliner" hired_on="Thu Jun 18 21:11:17 EDT 2015"></record>
  <record id="2" first_name="Johnny" last_name="Blue" hired_on="Wed Apr 18 21:11:17 EDT 2007"></record>
  <record id="3" first_name="Bula" last_name="Klass" hired_on="Mon Jun 18 21:11:17 EDT 2012"></record>
  <record id="4" first_name="Belinda" last_name="Payor" hired_on="Mon Nov 18 21:11:17 EST 2013"></record>
  <record id="5" first_name="Annmarie" last_name="Remick" hired_on="Tue Sep 18 21:11:17 EDT 2012"></record>
  <record id="6" first_name="Winona" last_name="Arendz" hired_on="Sat Oct 18 21:11:17 EDT 2014"></record>
  <record id="7" first_name="Jeremiah" last_name="Hoelter" hired_on="Mon May 18 21:11:17 EDT 2015"></record>
  <record id="8" first_name="Clayton" last_name="Brucki" hired_on="Tue Feb 18 21:11:17 EST 2014"></record>
  <record id="9" first_name="Lorena" last_name="Decena" hired_on="Tue Nov 18 21:11:17 EST 2014"></record>
  <record id="10" first_name="Cecile" last_name="Fauntleroy" hired_on="Tue Mar 18 21:11:17 EDT 2014"></record>
  <record id="11" first_name="Cherryl" last_name="Creveling" hired_on="Fri Nov 18 21:11:17 EST 2011"></record>
  <record id="12" first_name="Preston" last_name="Penson" hired_on="Sun Jun 18 21:11:17 EDT 2006"></record>
  <record id="13" first_name="Steven" last_name="Pitsenbarger" hired_on="Sun Feb 18 21:11:17 EST 2007"></record>
  <record id="14" first_name="Beaulah" last_name="Graciano" hired_on="Mon Aug 18 21:11:17 EDT 2014"></record>
  <record id="15" first_name="Nadine" last_name="Woollen" hired_on="Mon Nov 18 21:11:17 EST 2013"></record>
  <record id="16" first_name="Jermaine" last_name="Trocchio" hired_on="Sun Apr 18 21:11:17 EDT 2010"></record>
  <record id="17" first_name="Ladawn" last_name="Laroia" hired_on="Fri May 18 21:11:17 EDT 2012"></record>
  <record id="18" first_name="Sharda" last_name="Ackiss" hired_on="Thu Aug 18 21:11:17 EDT 2011"></record>
  <record id="19" first_name="Herlinda" last_name="Moskwa" hired_on="Tue Nov 18 21:11:17 EST 2008"></record>
  <record id="20" first_name="Bob" last_name="Vincik" hired_on="Fri May 18 21:11:17 EDT 2012"></record>
</records>  

 

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.filter.FieldFilter;
import com.northconcepts.datapipeline.filter.rule.PatternMatch;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.memory.MemoryWriter;
import com.northconcepts.datapipeline.transform.RenameField;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.xml.XmlReader;

public class HandleTransformingReaderFailures {

    public static void main(String[] args) {
        // save discarded records in memory; any DataWriter could be used instead
        MemoryWriter discardWriter = new MemoryWriter();

        // add a "rename_field_exception" field to each record containing the reason it was discarded 
        String discardReasonFieldName = "rename_field_exception";
        
        DataReader reader = new XmlReader(new File("example/data/input/call-center-agents-2.xml"))
                .addField("id", "//record/@id")
                .addField("first_name", "//record/@first_name")
                .addField("last_name", "//record/@last_name")
                .addField("hired_on", "//record/@hired_on")
                .addRecordBreak("//record");
        
        // rename the field "first_name" to "last_name" for first names starting with 'B';
        // this throws an exception because the "last_name" field already exists,
        // so discardWriter will receive all records with first names starting with 'B'
        reader = new TransformingReader(reader, discardWriter, discardReasonFieldName)
                .setCondition(new FieldFilter("first_name", new PatternMatch("B.*")))
                .add(new RenameField("first_name", "last_name"));
        
        DataWriter writer = new StreamWriter(System.out);
        
        // write the records that were not discarded to STDOUT
        Job.run(reader, writer);
        
        // write the discarded records to STDOUT
        System.out.println("\n---- The discarded records ----");
        Job.run(new MemoryReader(discardWriter.getRecordList()), new StreamWriter(System.out));

    }

}

 

Code Walkthrough

  1. A MemoryWriter instance is created to hold discarded records in memory. Any other DataWriter could be used instead.
  2. The discardReasonFieldName string, "rename_field_exception", names the extra field that is added to each discarded record to hold the failure message.
  3. XmlReader reads the data from the call-center-agents-2.xml file. The arguments to each addField() call are, in order, the name of the output record field and the XPath expression that locates its value in the input file.
  4. TransformingReader wraps the XmlReader and receives the discard writer and the discard reason field name. The condition set with setCondition() limits the transformation to records whose "first_name" starts with the letter "B".
  5. RenameField attempts to rename "first_name" to "last_name". Because the records already contain a "last_name" field, an exception is thrown and the record is sent to the discard writer along with the failure message.
  6. Data is transferred from the reader to the writer via the Job.run() method. See how to compile and run data pipeline jobs.
  7. Both the records that were not discarded and the discarded records are written to StreamWriter(System.out) instances so that they appear in the console.
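
To see which records the condition in step 4 selects, the pattern can be checked with java.util.regex alone. This sketch assumes that PatternMatch tests the expression against the whole field value, as Matcher.matches() does; that is an assumption about the library, although it is consistent with the console output of this example. The class name ConditionSketch, the matchingIds helper, and the hard-coded list of first names (taken from the input file, in id order) are for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch: checks which first names match "B.*" using only the JDK.
public class ConditionSketch {

    // Returns the 1-based positions (record ids here) of the names that match the regex in full.
    public static List<Integer> matchingIds(List<String> firstNames, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < firstNames.size(); i++) {
            if (pattern.matcher(firstNames.get(i)).matches()) {
                ids.add(i + 1);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // First names from the input XML file, ordered by record id 1..20.
        List<String> firstNames = Arrays.asList(
                "Brian", "Johnny", "Bula", "Belinda", "Annmarie", "Winona", "Jeremiah",
                "Clayton", "Lorena", "Cecile", "Cherryl", "Preston", "Steven", "Beaulah",
                "Nadine", "Jermaine", "Ladawn", "Sharda", "Herlinda", "Bob");

        // prints [1, 3, 4, 14, 20]
        System.out.println(matchingIds(firstNames, "B.*"));
    }
}
```

The five ids printed are the same as the ids of the five discarded records in the console output. Record 8 (Clayton Brucki) is not selected, because the condition is applied to "first_name" only.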

 

Console Output

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[id]:STRING=[2]:String
    1:[first_name]:STRING=[Johnny]:String
    2:[last_name]:STRING=[Blue]:String
    3:[hired_on]:STRING=[Wed Apr 18 21:11:17 EDT 2007]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[id]:STRING=[5]:String
    1:[first_name]:STRING=[Annmarie]:String
    2:[last_name]:STRING=[Remick]:String
    3:[hired_on]:STRING=[Tue Sep 18 21:11:17 EDT 2012]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[id]:STRING=[6]:String
    1:[first_name]:STRING=[Winona]:String
    2:[last_name]:STRING=[Arendz]:String
    3:[hired_on]:STRING=[Sat Oct 18 21:11:17 EDT 2014]:String
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[id]:STRING=[7]:String
    1:[first_name]:STRING=[Jeremiah]:String
    2:[last_name]:STRING=[Hoelter]:String
    3:[hired_on]:STRING=[Mon May 18 21:11:17 EDT 2015]:String
}

-----------------------------------------------
4 - Record (MODIFIED) {
    0:[id]:STRING=[8]:String
    1:[first_name]:STRING=[Clayton]:String
    2:[last_name]:STRING=[Brucki]:String
    3:[hired_on]:STRING=[Tue Feb 18 21:11:17 EST 2014]:String
}

-----------------------------------------------
5 - Record (MODIFIED) {
    0:[id]:STRING=[9]:String
    1:[first_name]:STRING=[Lorena]:String
    2:[last_name]:STRING=[Decena]:String
    3:[hired_on]:STRING=[Tue Nov 18 21:11:17 EST 2014]:String
}

-----------------------------------------------
6 - Record (MODIFIED) {
    0:[id]:STRING=[10]:String
    1:[first_name]:STRING=[Cecile]:String
    2:[last_name]:STRING=[Fauntleroy]:String
    3:[hired_on]:STRING=[Tue Mar 18 21:11:17 EDT 2014]:String
}

-----------------------------------------------
7 - Record (MODIFIED) {
    0:[id]:STRING=[11]:String
    1:[first_name]:STRING=[Cherryl]:String
    2:[last_name]:STRING=[Creveling]:String
    3:[hired_on]:STRING=[Fri Nov 18 21:11:17 EST 2011]:String
}

-----------------------------------------------
8 - Record (MODIFIED) {
    0:[id]:STRING=[12]:String
    1:[first_name]:STRING=[Preston]:String
    2:[last_name]:STRING=[Penson]:String
    3:[hired_on]:STRING=[Sun Jun 18 21:11:17 EDT 2006]:String
}

-----------------------------------------------
9 - Record (MODIFIED) {
    0:[id]:STRING=[13]:String
    1:[first_name]:STRING=[Steven]:String
    2:[last_name]:STRING=[Pitsenbarger]:String
    3:[hired_on]:STRING=[Sun Feb 18 21:11:17 EST 2007]:String
}

-----------------------------------------------
10 - Record (MODIFIED) {
    0:[id]:STRING=[15]:String
    1:[first_name]:STRING=[Nadine]:String
    2:[last_name]:STRING=[Woollen]:String
    3:[hired_on]:STRING=[Mon Nov 18 21:11:17 EST 2013]:String
}

-----------------------------------------------
11 - Record (MODIFIED) {
    0:[id]:STRING=[16]:String
    1:[first_name]:STRING=[Jermaine]:String
    2:[last_name]:STRING=[Trocchio]:String
    3:[hired_on]:STRING=[Sun Apr 18 21:11:17 EDT 2010]:String
}

-----------------------------------------------
12 - Record (MODIFIED) {
    0:[id]:STRING=[17]:String
    1:[first_name]:STRING=[Ladawn]:String
    2:[last_name]:STRING=[Laroia]:String
    3:[hired_on]:STRING=[Fri May 18 21:11:17 EDT 2012]:String
}

-----------------------------------------------
13 - Record (MODIFIED) {
    0:[id]:STRING=[18]:String
    1:[first_name]:STRING=[Sharda]:String
    2:[last_name]:STRING=[Ackiss]:String
    3:[hired_on]:STRING=[Thu Aug 18 21:11:17 EDT 2011]:String
}

-----------------------------------------------
14 - Record (MODIFIED) {
    0:[id]:STRING=[19]:String
    1:[first_name]:STRING=[Herlinda]:String
    2:[last_name]:STRING=[Moskwa]:String
    3:[hired_on]:STRING=[Tue Nov 18 21:11:17 EST 2008]:String
}

-----------------------------------------------
15 records

---- The discarded records ----
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[id]:STRING=[1]:String
    1:[first_name]:STRING=[Brian]:String
    2:[last_name]:STRING=[Mulliner]:String
    3:[hired_on]:STRING=[Thu Jun 18 21:11:17 EDT 2015]:String
    4:[rename_field_exception]:STRING=[transformation [if condition(first_name field: (value's pattern matches B.*)) then transform(renaming first_name to last_name)] ...296]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[id]:STRING=[3]:String
    1:[first_name]:STRING=[Bula]:String
    2:[last_name]:STRING=[Klass]:String
    3:[hired_on]:STRING=[Mon Jun 18 21:11:17 EDT 2012]:String
    4:[rename_field_exception]:STRING=[transformation [if condition(first_name field: (value's pattern matches B.*)) then transform(renaming first_name to last_name)] ...296]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[id]:STRING=[4]:String
    1:[first_name]:STRING=[Belinda]:String
    2:[last_name]:STRING=[Payor]:String
    3:[hired_on]:STRING=[Mon Nov 18 21:11:17 EST 2013]:String
    4:[rename_field_exception]:STRING=[transformation [if condition(first_name field: (value's pattern matches B.*)) then transform(renaming first_name to last_name)] ...296]:String
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[id]:STRING=[14]:String
    1:[first_name]:STRING=[Beaulah]:String
    2:[last_name]:STRING=[Graciano]:String
    3:[hired_on]:STRING=[Mon Aug 18 21:11:17 EDT 2014]:String
    4:[rename_field_exception]:STRING=[transformation [if condition(first_name field: (value's pattern matches B.*)) then transform(renaming first_name to last_name)] ...297]:String
}

-----------------------------------------------
4 - Record (MODIFIED) {
    0:[id]:STRING=[20]:String
    1:[first_name]:STRING=[Bob]:String
    2:[last_name]:STRING=[Vincik]:String
    3:[hired_on]:STRING=[Fri May 18 21:11:17 EDT 2012]:String
    4:[rename_field_exception]:STRING=[transformation [if condition(first_name field: (value's pattern matches B.*)) then transform(renaming first_name to last_name)] ...297]:String
}

-----------------------------------------------
5 records