Handle Transforming Writer Failures
Updated: Jun 18, 2023
In this example, you will learn how to use DataPipeline to handle failures that may occur when using the TransformingWriter class to modify data. Records whose transformation fails are sent to a separate discard writer together with the reason for the failure, while the remaining records continue to the main writer.
This approach can be integrated into automated data processing pipelines or systems to detect and recover from failures during data modification operations automatically. Processing continues uninterrupted, which limits the impact of failures on the overall data flow.
Input FW file
Account LastName        FirstName       Balance     CreditLimit   AccountCreated  Rating
101     Reeves          Keanu           9315.45     10000.00      1/17/1998       A
312     Butler          Gerard          90.00       1000.00       8/6/2003        B
868     Hewitt          Jennifer Love   0           17000.00      5/25/1985       B
761     Pinkett-Smith   Jada            49654.87    100000.00     12/5/2006       A
317     Murray          Bill            789.65      5000.00       2/5/2007        C
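To make the fixed-width layout concrete, here is a minimal plain-Java sketch of how one line of such a file could be cut into fields by width. It is not how FixedWidthReader is implemented; the class name FixedWidthSketch, the split() helper, and the convention that a negative width means "skip" are all hypothetical. The widths are the ones passed to addFields() and skipField() in the Java code listing, and the sketch assumes values are left-aligned and space-padded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of DataPipeline.
public class FixedWidthSketch {

    // Cuts a line into consecutive fields of the given widths and trims the padding.
    // A negative width means "skip this many characters" (a convention of this sketch).
    public static List<String> split(String line, int[] widths) {
        List<String> fields = new ArrayList<>();
        int start = 0;
        for (int width : widths) {
            int size = Math.abs(width);
            int end = Math.min(start + size, line.length());
            if (width > 0) {
                // Lines shorter than the declared layout yield empty trailing fields.
                fields.add(start < line.length() ? line.substring(start, end).trim() : "");
            }
            start += size;
        }
        return fields;
    }

    public static void main(String[] args) {
        // Account, LastName, FirstName, Balance, CreditLimit, (AccountCreated skipped), Rating
        int[] widths = {8, 16, 16, 12, 14, -16, 7};

        // Build a padded line so the column positions are explicit (assumed left-aligned).
        String line = String.format("%-8s%-16s%-16s%-12s%-14s%-16s%-7s",
                "868", "Hewitt", "Jennifer Love", "0", "17000.00", "5/25/1985", "B");

        System.out.println(split(line, widths));
        // prints [868, Hewitt, Jennifer Love, 0, 17000.00, B]
    }
}
```

Because fields are located by position rather than by a delimiter, a value such as "Jennifer Love" stays in one field even though it contains a space.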
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.text.DecimalFormat;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.filter.FilterExpression;
import com.northconcepts.datapipeline.fixedwidth.FixedWidthReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.memory.MemoryWriter;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.SetCalculatedField;
import com.northconcepts.datapipeline.transform.TransformingWriter;

public class HandleTransformingWriterFailures {

    public static void main(String[] args) {
        // save discarded/undiscarded records in memory, but can be any DataWriter
        MemoryWriter discardWriter = new MemoryWriter();
        MemoryWriter undiscardWriter = new MemoryWriter();

        // add a "failed_stringToDouble_rule" field to each record containing the reason it was discarded
        String discardReasonFieldName = "failed_stringToDouble_rule";

        // transform the "Rating" value to a Double, should generate an exception
        BasicFieldTransformer transformer = new BasicFieldTransformer("Rating");
        transformer.stringToDouble(new DecimalFormat());

        DataReader reader = new FixedWidthReader(new File("example/data/input/credit-balance-01.fw"))
                .addFields(8)
                .addFields(16)
                .addFields(16)
                .addFields(12)
                .addFields(14)
                .skipField(16) // ignore AccountCreated field
                .addFields(7)
                .setFieldNamesInFirstRow(true);

        // records with "Balance > 100" cause an exception and are sent to discardWriter
        DataWriter writer = new TransformingWriter(undiscardWriter, discardWriter, discardReasonFieldName)
                .setCondition(new FilterExpression("Balance > 100"))
                .add(transformer);

        // since the Balance field is a string, it needs to be converted to a double
        writer = new TransformingWriter(writer)
                .add(new SetCalculatedField("Balance", "parseDouble(Balance)"));

        Job.run(reader, writer);

        // write the undiscarded records to STDOUT
        System.out.println("\n---- The undiscarded records ----");
        Job.run(new MemoryReader(undiscardWriter.getRecordList()), new StreamWriter(System.out));

        // write the discarded records to STDOUT
        System.out.println("\n---- The discarded records ----");
        Job.run(new MemoryReader(discardWriter.getRecordList()), new StreamWriter(System.out));
    }

}
Code walkthrough
1. Two MemoryWriter instances are created to hold the discarded and undiscarded records. Any other DataWriter could be used instead.
2. The name of the discard reason field, "failed_stringToDouble_rule", is stored in a String. This field is added to each discarded record to hold the failure message.
3. A BasicFieldTransformer is created to parse the "Rating" field from String to Double. Because the ratings are letters (A, B, C), this transformation is expected to fail.
4. FixedWidthReader reads the data from the credit-balance-01.fw file. The numbers passed to the addFields() and skipField() methods are the field widths.
5. A TransformingWriter splits the records between the undiscarded and discarded writers. Its condition applies the transformer from step 3 only to records with a Balance greater than 100. Those records fail the transformation and go to the discard writer, while the rest go to the undiscarded writer.
6. A second TransformingWriter wraps the first and uses SetCalculatedField to parse the Balance field from String to Double, so that the condition in step 5 compares numbers.
7. Job.run() transfers the data from the reader to the writer.
8. The records held by the undiscarded and discarded writers are each written to a StreamWriter(System.out) instance so they appear in the console.
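The routing described in the walkthrough can be sketched in plain Java without the library. This is only an illustration of the pattern under stated assumptions, not TransformingWriter's implementation: the class DiscardRoutingSketch and its record() and route() helpers are hypothetical, records are modeled as maps, and the failure reason is simply the exception message rather than the message DataPipeline generates.

```java
import java.text.DecimalFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of DataPipeline.
public class DiscardRoutingSketch {

    public static final String REASON_FIELD = "failed_stringToDouble_rule";

    // Builds a small record with the three fields this sketch needs.
    public static Map<String, Object> record(String account, String balance, String rating) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("Account", account);
        r.put("Balance", balance);
        r.put("Rating", rating);
        return r;
    }

    // Sends each record to 'kept' or, if the conditional transformation fails, to 'discarded'.
    public static void route(List<Map<String, Object>> input,
                             List<Map<String, Object>> kept,
                             List<Map<String, Object>> discarded) {
        DecimalFormat format = new DecimalFormat();
        for (Map<String, Object> r : input) {
            // Convert Balance to a double first so the condition compares numbers.
            double balance = Double.parseDouble((String) r.get("Balance"));
            r.put("Balance", balance);
            try {
                if (balance > 100) {
                    // Parsing a letter rating as a number throws ParseException.
                    r.put("Rating", format.parse((String) r.get("Rating")).doubleValue());
                }
                kept.add(r);
            } catch (ParseException e) {
                // Record the reason and divert the record instead of aborting the run.
                r.put(REASON_FIELD, e.getMessage());
                discarded.add(r);
            }
        }
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = new ArrayList<>();
        input.add(record("101", "9315.45", "A"));
        input.add(record("312", "90.00", "B"));
        input.add(record("868", "0", "B"));
        input.add(record("761", "49654.87", "A"));
        input.add(record("317", "789.65", "C"));

        List<Map<String, Object>> kept = new ArrayList<>();
        List<Map<String, Object>> discarded = new ArrayList<>();
        route(input, kept, discarded);

        System.out.println("undiscarded: " + kept.size() + ", discarded: " + discarded.size());
        // prints undiscarded: 2, discarded: 3
    }
}
```

With the five input records from this example, accounts 312 and 868 are kept and accounts 101, 761, and 317 are diverted, which matches the split shown in the output of the DataPipeline listing.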
Output
---- The undiscarded records ----
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[312]:String
    1:[LastName]:STRING=[Butler]:String
    2:[FirstName]:STRING=[Gerard]:String
    3:[Balance]:DOUBLE=[90.0]:Double
    4:[CreditLimit]:STRING=[1000.00]:String
    5:[Rating]:STRING=[B]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[868]:String
    1:[LastName]:STRING=[Hewitt]:String
    2:[FirstName]:STRING=[Jennifer Love]:String
    3:[Balance]:DOUBLE=[0.0]:Double
    4:[CreditLimit]:STRING=[17000.00]:String
    5:[Rating]:STRING=[B]:String
}

-----------------------------------------------
2 records
18:48:24,024 DEBUG [main] datapipeline:661 - job::Success

---- The discarded records ----
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Account]:STRING=[101]:String
    1:[LastName]:STRING=[Reeves]:String
    2:[FirstName]:STRING=[Keanu]:String
    3:[Balance]:DOUBLE=[9315.45]:Double
    4:[CreditLimit]:STRING=[10000.00]:String
    5:[Rating]:STRING=[A]:String
    6:[failed_stringToDouble_rule]:STRING=[transformation [if condition(record satisfies expression: Balance > 100) then transform(transforming Rating)] failed on record 0...221]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Account]:STRING=[761]:String
    1:[LastName]:STRING=[Pinkett-Smith]:String
    2:[FirstName]:STRING=[Jada]:String
    3:[Balance]:DOUBLE=[49654.87]:Double
    4:[CreditLimit]:STRING=[100000.00]:String
    5:[Rating]:STRING=[A]:String
    6:[failed_stringToDouble_rule]:STRING=[transformation [if condition(record satisfies expression: Balance > 100) then transform(transforming Rating)] failed on record 3...221]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Account]:STRING=[317]:String
    1:[LastName]:STRING=[Murray]:String
    2:[FirstName]:STRING=[Bill]:String
    3:[Balance]:DOUBLE=[789.65]:Double
    4:[CreditLimit]:STRING=[5000.00]:String
    5:[Rating]:STRING=[C]:String
    6:[failed_stringToDouble_rule]:STRING=[transformation [if condition(record satisfies expression: Balance > 100) then transform(transforming Rating)] failed on record 4...221]:String
}

-----------------------------------------------
3 records