Use a Retrying Reader
This example uses a RetryingReader, which wraps another reader and attempts to continue reading when a read fails. This makes a pipeline more resilient to read failures and suits scenarios where reliable data retrieval matters, such as log processing, data ingestion pipelines, or distributed systems that need fault-tolerant file reading.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.avro.AvroReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.retry.RetryingReader;

public class UseRetryingReader {

    public static void main(String[] args) {
        // Read records from the Avro input file
        DataReader reader = new AvroReader(new File("example/data/input/twitter.avro"));

        // Wrap the reader so that reading continues after a failure
        reader = new RetryingReader(reader);

        // Write every record to the console
        Job.run(reader, new StreamWriter(System.out));
    }

    /* output
    -----------------------------------------------
    0 - Record (MODIFIED) {
        0:[username]:STRING=[miguno]:String
        1:[tweet]:STRING=[Rock: Nerf paper, scissors is fine.]:String
        2:[timestamp]:LONG=[1366150681]:Long
    }

    -----------------------------------------------
    1 - Record (MODIFIED) {
        0:[username]:STRING=[BlizzardCS]:String
        1:[tweet]:STRING=[Works as intended. Terran is IMBA.]:String
        2:[timestamp]:LONG=[1366154481]:Long
    }

    -----------------------------------------------
    2 records
    */
}
Code walkthrough
- First, an AvroReader is created to read records from the twitter.avro file.
- A RetryingReader is created using the AvroReader object.
- Data is transferred from the RetryingReader to the console via the Job.run() method.
See how to compile and run data pipeline jobs.
AvroReader
Reads records from an Avro file, a data file created by Apache Avro, an open-source project that provides data serialization and data exchange services for Apache Hadoop. The AvroReader class extends IntegrationReader and can be created from a File or an InputStream object.
RetryingReader
A proxy that attempts to continue reading on failure. It extends ProxyReader and can be created from a DataReader object. To find out why certain records failed to be read, use the RetryingReader.getDiscardReasonFieldName() method.
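The general idea of a retry proxy can be illustrated without the library. The sketch below is not RetryingReader's implementation and does not use the DataPipeline API; RecordSource, RetryingSource, FlakySource, and the maxAttempts parameter are hypothetical names introduced only for this illustration. It assumes a simple policy in which a failed read is retried up to a fixed number of times before the failure is passed on to the caller; the library's actual policy may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RetryProxySketch {

    /** Hypothetical stand-in for a record reader; returns null at end of data. */
    interface RecordSource {
        String read();
    }

    /** Hypothetical proxy: retries a failed read up to maxAttempts times. */
    static class RetryingSource implements RecordSource {
        private final RecordSource nested;
        private final int maxAttempts;

        RetryingSource(RecordSource nested, int maxAttempts) {
            if (maxAttempts < 1) {
                throw new IllegalArgumentException("maxAttempts must be at least 1");
            }
            this.nested = nested;
            this.maxAttempts = maxAttempts;
        }

        @Override
        public String read() {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return nested.read();   // success: hand the record to the caller
                } catch (RuntimeException e) {
                    last = e;               // remember the failure and try again
                }
            }
            throw last;                     // every attempt failed
        }
    }

    /** Simulated source whose first few read() calls fail before records are returned. */
    static class FlakySource implements RecordSource {
        private final Iterator<String> records;
        private int failuresLeft;

        FlakySource(List<String> records, int failures) {
            this.records = records.iterator();
            this.failuresLeft = failures;
        }

        @Override
        public String read() {
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new IllegalStateException("simulated transient failure");
            }
            return records.hasNext() ? records.next() : null;
        }
    }

    /** Drains a source into a list, stopping at the first null. */
    static List<String> readAll(RecordSource source) {
        List<String> out = new ArrayList<>();
        String record;
        while ((record = source.read()) != null) {
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two simulated failures, three attempts allowed: the third attempt succeeds.
        RecordSource flaky = new FlakySource(Arrays.asList("miguno", "BlizzardCS"), 2);
        RecordSource reader = new RetryingSource(flaky, 3);
        System.out.println(readAll(reader)); // prints [miguno, BlizzardCS]
    }
}
```

The proxy exposes the same interface as the source it wraps, so the calling code does not change when retrying is added. The example above relies on the same property when it reassigns the reader variable to the RetryingReader.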
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[username]:STRING=[miguno]:String
    1:[tweet]:STRING=[Rock: Nerf paper, scissors is fine.]:String
    2:[timestamp]:LONG=[1366150681]:Long
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[username]:STRING=[BlizzardCS]:String
    1:[tweet]:STRING=[Works as intended. Terran is IMBA.]:String
    2:[timestamp]:LONG=[1366154481]:Long
}

-----------------------------------------------
2 records