Handle Transforming Writer Failures

In this example, you will learn how to use DataPipeline to handle failures that occur when a TransformingWriter modifies data. Records whose transformation fails neither stop the job nor get lost. Instead, they are routed to a separate "discard" writer together with a field describing why they failed, while the remaining records continue to the main writer.

This approach can be integrated into automated data processing pipelines or systems to automatically detect and isolate failures during data modification operations. Processing continues uninterrupted, the failed records are kept aside for later inspection, and the impact of failures on the overall data flow is minimized.

Input FW file

Account LastName        FirstName       Balance     CreditLimit   AccountCreated  Rating 
101     Reeves          Keanu           9315.45     10000.00      1/17/1998       A      
312     Butler          Gerard          90.00       1000.00       8/6/2003        B      
868     Hewitt          Jennifer Love   0           17000.00      5/25/1985       B      
761     Pinkett-Smith   Jada            49654.87    100000.00     12/5/2006       A      
317     Murray          Bill            789.65      5000.00       2/5/2007        C      
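To make the column layout concrete, here is a plain-Java sketch of how one line of this file splits into fields. It is not DataPipeline code: the class FixedWidthSplitter and its split() method are hypothetical, and the widths (8, 16, 16, 12, 14, 16, 7) are taken from the addFields()/skipField() calls in the listing below, with the AccountCreated column skipped the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of DataPipeline: splits one fixed-width line
// into trimmed fields using the widths from the FixedWidthReader setup.
final class FixedWidthSplitter {

    // Widths of Account, LastName, FirstName, Balance, CreditLimit,
    // AccountCreated and Rating, in file order.
    static final int[] WIDTHS = {8, 16, 16, 12, 14, 16, 7};

    // Position of AccountCreated, which the example skips.
    static final int SKIPPED_COLUMN = 5;

    static List<String> split(String line, int[] widths, int skippedColumn) {
        List<String> fields = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < widths.length; i++) {
            // Clamp both ends so lines whose trailing spaces were stripped still work.
            int begin = Math.min(start, line.length());
            int end = Math.min(start + widths[i], line.length());
            if (i != skippedColumn) {
                fields.add(line.substring(begin, end).trim());
            }
            start += widths[i];
        }
        return fields;
    }

    public static void main(String[] args) {
        // Rebuild the first data row with the same padding as the input file.
        String line = String.format("%-8s%-16s%-16s%-12s%-14s%-16s%-7s",
                "101", "Reeves", "Keanu", "9315.45", "10000.00", "1/17/1998", "A");
        System.out.println(split(line, WIDTHS, SKIPPED_COLUMN));
        // prints [101, Reeves, Keanu, 9315.45, 10000.00, A]
    }
}
```

Every field comes back as a trimmed String, which is why the example later has to convert Balance to a number explicitly.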

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.text.DecimalFormat;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.filter.FilterExpression;
import com.northconcepts.datapipeline.fixedwidth.FixedWidthReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.memory.MemoryWriter;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.SetCalculatedField;
import com.northconcepts.datapipeline.transform.TransformingWriter;

public class HandleTransformingWriterFailures {

    public static void main(String[] args) {
        // save discarded/undiscarded records in memory, but can be any DataWriter
        MemoryWriter discardWriter = new MemoryWriter();
        MemoryWriter undiscardWriter = new MemoryWriter();
        
        // each discarded record gets a "failed_stringToDouble_rule" field containing the reason it was discarded
        String discardReasonFieldName = "failed_stringToDouble_rule";
        
        // convert the "Rating" value to a Double; ratings such as "A" are not numbers, so this throws an exception
        BasicFieldTransformer transformer = new BasicFieldTransformer("Rating");
        transformer.stringToDouble(new DecimalFormat());
        
        DataReader reader = new FixedWidthReader(new File("example/data/input/credit-balance-01.fw"))
                .addFields(8)
                .addFields(16)
                .addFields(16)
                .addFields(12)
                .addFields(14)
                .skipField(16)  // ignore AccountCreated field
                .addFields(7)
                .setFieldNamesInFirstRow(true);
        
        // records with "Balance > 100" are transformed; the transformation fails, so they are sent to discardWriter
        DataWriter writer = new TransformingWriter(undiscardWriter, discardWriter, discardReasonFieldName)
                .setCondition(new FilterExpression("Balance > 100"))
                .add(transformer);
        
        // Balance is read as a String, so this outer writer converts it to a double before the condition above is evaluated
        writer = new TransformingWriter(writer)
                .add(new SetCalculatedField("Balance", "parseDouble(Balance)"));
        
        Job.run(reader, writer);
        
        // write the undiscarded records to STDOUT
        System.out.println("\n---- The undiscarded records ----");
        Job.run(new MemoryReader(undiscardWriter.getRecordList()), new StreamWriter(System.out));

        // write the discarded records to STDOUT
        System.out.println("\n---- The discarded records ----");
        Job.run(new MemoryReader(discardWriter.getRecordList()), new StreamWriter(System.out));
        
    }

}

Code walkthrough

  1. Two MemoryWriter instances are created to save discarded and undiscarded records. 
  2. Each discarded record needs an additional field to hold the failure message, so the name of that field, "failed_stringToDouble_rule", is stored in discardReasonFieldName.
  3. BasicFieldTransformer is created to parse the "Rating" field from String to Double type.
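  4. FixedWidthReader reads the data from the credit-balance-01.fw file. The numbers passed to the addFields() and skipField() methods are the widths of the fields in characters; skipField(16) skips the AccountCreated column, and setFieldNamesInFirstRow(true) takes the field names from the header line.
  5. A TransformingWriter routes records to undiscardWriter or discardWriter. Its condition, FilterExpression("Balance > 100"), means the transformer from step 3 is applied only to records with a balance greater than 100. When that transformation fails, the record is sent to discardWriter with the failure reason stored in the field named in step 2; all other records go to undiscardWriter.
  6. A second TransformingWriter wraps the first one and uses SetCalculatedField to convert the Balance field from String to Double. Because it is the outer writer, this conversion runs before the "Balance > 100" condition is evaluated.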
  7. Data is transferred from the reader to the writer via Job.run() method.
  8. Finally, the records collected by both MemoryWriters are read back with MemoryReader and written to a StreamWriter(System.out) to display them in the console.
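The order of operations in steps 5 and 6 can be sketched in plain Java. This is only an illustration of the routing logic, not how DataPipeline implements TransformingWriter: the class DiscardRoutingSketch, its rec() helper and the reason text are hypothetical, and each record is simplified to a map holding only Account, Balance and Rating.

```java
import java.text.DecimalFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not DataPipeline's implementation) of the routing done by
// the two nested TransformingWriters in the example.
final class DiscardRoutingSketch {

    static final String REASON_FIELD = "failed_stringToDouble_rule";

    // Stand-ins for undiscardWriter and discardWriter.
    final List<Map<String, Object>> undiscarded = new ArrayList<>();
    final List<Map<String, Object>> discarded = new ArrayList<>();

    void write(Map<String, Object> input) {
        Map<String, Object> record = new LinkedHashMap<>(input);

        // Outer writer runs first: Balance becomes a double.
        double balance = Double.parseDouble((String) record.get("Balance"));
        record.put("Balance", balance);

        // Inner writer: the Rating transform is attempted only when Balance > 100.
        if (balance > 100) {
            try {
                String rating = (String) record.get("Rating");
                record.put("Rating", new DecimalFormat().parse(rating).doubleValue());
            } catch (ParseException e) {
                // Transform failed: tag the record with a (hypothetical) reason and
                // divert it instead of aborting the whole job.
                record.put(REASON_FIELD, "transforming Rating failed: " + e.getMessage());
                discarded.add(record);
                return;
            }
        }
        // Records that skip the transform, or transform successfully, continue.
        undiscarded.add(record);
    }

    // Hypothetical helper: a record reduced to the fields this sketch uses.
    static Map<String, Object> rec(String account, String balance, String rating) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("Account", account);
        r.put("Balance", balance);
        r.put("Rating", rating);
        return r;
    }

    public static void main(String[] args) {
        DiscardRoutingSketch sketch = new DiscardRoutingSketch();
        sketch.write(rec("101", "9315.45", "A"));
        sketch.write(rec("312", "90.00", "B"));
        sketch.write(rec("868", "0", "B"));
        sketch.write(rec("761", "49654.87", "A"));
        sketch.write(rec("317", "789.65", "C"));
        System.out.println("undiscarded: " + sketch.undiscarded);
        System.out.println("discarded:   " + sketch.discarded);
    }
}
```

With the five input rows, accounts 312 and 868 (balances of 100 or less) pass through untouched, while 101, 761 and 317 fail to parse their letter ratings and end up in the discarded list with the reason field added, the same split shown in the output below.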

Output

---- The undiscarded records ----

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:DOUBLE=[90.0]:Double
    4:[CreditLimit]:STRING=[1000.00]:String
    5:[Rating]:STRING=[B]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:DOUBLE=[0.0]:Double
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[Rating]:STRING=[B]:String
}

-----------------------------------------------
2 records
18:48:24,024 DEBUG [main] datapipeline:661 - job::Success

---- The discarded records ----

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:DOUBLE=[9315.45]:Double
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[Rating]:STRING=[A]:String
    6:[failed_stringToDouble_rule]:STRING=[transformation [if condition(record satisfies expression: Balance > 100) then transform(transforming Rating)] failed on record 0...221]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:DOUBLE=[49654.87]:Double
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[Rating]:STRING=[A]:String
    6:[failed_stringToDouble_rule]:STRING=[transformation [if condition(record satisfies expression: Balance > 100) then transform(transforming Rating)] failed on record 3...221]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:DOUBLE=[789.65]:Double
    4:[CreditLimit]:STRING=[5000.00]:String
    5:[Rating]:STRING=[C]:String
    6:[failed_stringToDouble_rule]:STRING=[transformation [if condition(record satisfies expression: Balance > 100) then transform(transforming Rating)] failed on record 4...221]:String
}

-----------------------------------------------
3 records