Use Data Lineage with CsvReader
Updated: May 30, 2022
This example shows how to use DataPipeline with data lineage, which is metadata added to records and fields indicating where they were loaded from. It can be useful for audits and reconciliation as well as troubleshooting.
Data lineage can also be used with other readers; see, for example, Data Lineage with ExcelReader and Data Lineage with FixedWidthReader.
Input CSV file
rating_code,rating_description
A,Class A
B,Class B
C,Class C
Java Code listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;

public class UseDataLineageWithCsvReader {

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/rating-table-01.csv"))
                .setFieldNamesInFirstRow(true)
                .setSaveLineage(true); // Enable lineage support. (By default, it is disabled.)
                                       // If lineage is not supported by reader, exception will be thrown.

        Job.run(reader, new LineageWriter());
    }

    public final static class LineageWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);

            RecordLineage recordLineage = new RecordLineage().setRecord(record);

            System.out.println("Record Lineage");
            System.out.println(" File: " + recordLineage.getFile());
            System.out.println(" File Line: " + recordLineage.getFileLineNumber());
            System.out.println(" File Column: " + recordLineage.getFileColumnNumber());
            System.out.println(" Record: " + recordLineage.getRecordNumber());
            System.out.println();

            FieldLineage fieldLineage = new FieldLineage();

            System.out.println("Field Lineage");
            for (int i = 0; i < record.getFieldCount(); i++) {
                Field field = record.getField(i);
                fieldLineage.setField(field);
                System.out.println(" " + field.getName());
                System.out.println(" File: " + fieldLineage.getFile());
                System.out.println(" File Line: " + fieldLineage.getFileLineNumber());
                System.out.println(" File Column: " + fieldLineage.getFileColumnNumber());
                System.out.println(" Record: " + fieldLineage.getRecordNumber());
                System.out.println(" Field Index: " + fieldLineage.getOriginalFieldIndex());
                System.out.println(" Field Name: " + fieldLineage.getOriginalFieldName());
            }
            System.out.println("---------------------------------------------------------");
            System.out.println();
        }
    }
}
Code walkthrough
- First, we create a CSVReader that reads from rating-table-01.csv, tell it to take the field names from the first row with setFieldNamesInFirstRow(true), and enable lineage with setSaveLineage(true), which is disabled by default.
- Job.run() is then called to transfer data from the reader to the LineageWriter.
- Inside LineageWriter, RecordLineage informs us of the starting location where the record was loaded from, while FieldLineage informs us of the starting location for each individual field.
Record Lineage Properties
recordLineage.getFile() - The java.io.File, if one was used to create the DataReader.
recordLineage.getFileLineNumber() - The line number in the input file, starting with 0.
recordLineage.getFileColumnNumber() - The column number in the input file, starting with 0.
recordLineage.getRecordNumber() - The sequential record number, starting with 0.
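To make the 0-based numbering concrete, here is a minimal plain-Java sketch that derives the same three numbers for the sample input. It is only an illustration: RecordLineageSketch and RecordOrigin are hypothetical names, not DataPipeline classes, and the sketch assumes that line 0 is the header and that every later line holds exactly one record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: these types are not part of DataPipeline.
class RecordLineageSketch {

    /** Where a record started in its source text. All numbers are 0-based. */
    static final class RecordOrigin {
        final int fileLineNumber;
        final int fileColumnNumber;
        final int recordNumber;

        RecordOrigin(int fileLineNumber, int fileColumnNumber, int recordNumber) {
            this.fileLineNumber = fileLineNumber;
            this.fileColumnNumber = fileColumnNumber;
            this.recordNumber = recordNumber;
        }

        @Override
        public String toString() {
            return "File Line: " + fileLineNumber
                    + ", File Column: " + fileColumnNumber
                    + ", Record: " + recordNumber;
        }
    }

    /** Assumes line 0 is the header and every later line holds exactly one record. */
    static List<RecordOrigin> trace(List<String> lines) {
        List<RecordOrigin> origins = new ArrayList<>();
        for (int lineNumber = 1; lineNumber < lines.size(); lineNumber++) {
            // A record begins at the first character of its line, so its column is 0.
            origins.add(new RecordOrigin(lineNumber, 0, lineNumber - 1));
        }
        return origins;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "rating_code,rating_description",
                "A,Class A",
                "B,Class B",
                "C,Class C");
        for (RecordOrigin origin : trace(lines)) {
            System.out.println(origin);
        }
    }
}
```

Because the header occupies line 0, the first data record has record number 0 but file line 1, which is why the two values differ by one throughout the example output.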
Field Lineage Properties
fieldLineage.getOriginalFieldIndex() - The index of a field set by the DataReader before any transformation or operation was performed.
fieldLineage.getOriginalFieldName() - The name of a field set by the DataReader before any transformation or operation was performed.
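The idea of an "original" index and name can be sketched in plain Java as follows. This is a hypothetical model, not how DataPipeline stores lineage: FieldLineageSketch and TracedField are invented names, and the parser assumes unquoted, comma-separated values. The column is computed as a 0-based character offset within the line, an interpretation that happens to be consistent with the sample output for this input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: these types are not part of DataPipeline.
class FieldLineageSketch {

    /** A field whose current name may change while its origin stays fixed. */
    static final class TracedField {
        String name;                     // current name, may be changed downstream
        final String value;
        final int originalFieldIndex;    // position assigned when the line was read
        final String originalFieldName;  // name assigned when the line was read
        final int fileColumnNumber;      // 0-based character offset within the line

        TracedField(String name, String value, int index, int column) {
            this.name = name;
            this.value = value;
            this.originalFieldIndex = index;
            this.originalFieldName = name;
            this.fileColumnNumber = column;
        }
    }

    /** Assumes simple comma-separated values with no quoting or escaping. */
    static List<TracedField> parse(String headerLine, String dataLine) {
        String[] names = headerLine.split(",", -1);
        String[] values = dataLine.split(",", -1);
        List<TracedField> fields = new ArrayList<>();
        int column = 0;
        for (int i = 0; i < values.length; i++) {
            String name = i < names.length ? names[i] : "field" + i;
            fields.add(new TracedField(name, values[i], i, column));
            column += values[i].length() + 1; // skip past the value and its comma
        }
        return fields;
    }

    public static void main(String[] args) {
        List<TracedField> fields = parse("rating_code,rating_description", "A,Class A");

        // Simulate a downstream transformation that renames the second field.
        fields.get(1).name = "description";

        for (TracedField field : fields) {
            System.out.println(field.name
                    + " (originally " + field.originalFieldName
                    + ", index " + field.originalFieldIndex
                    + ", column " + field.fileColumnNumber + ")");
        }
    }
}
```

Keeping the original name and index separate from the current ones is what lets a renamed or reordered field still be matched back to its source column.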
Output
Record {
    0:[rating_code]:STRING=[A]:String
    1:[rating_description]:STRING=[Class A]:String
}
Record Lineage
 File: example\data\input\rating-table-01.csv
 File Line: 1
 File Column: 0
 Record: 0

Field Lineage
 rating_code
 File: example\data\input\rating-table-01.csv
 File Line: 1
 File Column: 0
 Record: 0
 Field Index: 0
 Field Name: rating_code
 rating_description
 File: example\data\input\rating-table-01.csv
 File Line: 1
 File Column: 2
 Record: 0
 Field Index: 1
 Field Name: rating_description
---------------------------------------------------------

Record {
    0:[rating_code]:STRING=[B]:String
    1:[rating_description]:STRING=[Class B]:String
}
Record Lineage
 File: example\data\input\rating-table-01.csv
 File Line: 2
 File Column: 0
 Record: 1

Field Lineage
 rating_code
 File: example\data\input\rating-table-01.csv
 File Line: 2
 File Column: 0
 Record: 1
 Field Index: 0
 Field Name: rating_code
 rating_description
 File: example\data\input\rating-table-01.csv
 File Line: 2
 File Column: 2
 Record: 1
 Field Index: 1
 Field Name: rating_description
---------------------------------------------------------

Record {
    0:[rating_code]:STRING=[C]:String
    1:[rating_description]:STRING=[Class C]:String
}
Record Lineage
 File: example\data\input\rating-table-01.csv
 File Line: 3
 File Column: 0
 Record: 2

Field Lineage
 rating_code
 File: example\data\input\rating-table-01.csv
 File Line: 3
 File Column: 0
 Record: 2
 Field Index: 0
 Field Name: rating_code
 rating_description
 File: example\data\input\rating-table-01.csv
 File Line: 3
 File Column: 2
 Record: 2
 Field Index: 1
 Field Name: rating_description
---------------------------------------------------------
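For troubleshooting, values like these let an error message point straight back to the offending place in the input. The sketch below shows the idea in plain Java without DataPipeline. LineageReportSketch and findBadCodes are hypothetical names, the rule that a rating code must be a single uppercase letter is invented purely for illustration, and the bad row is made-up sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: not part of DataPipeline.
class LineageReportSketch {

    /**
     * Returns one message per data line whose first value is not a single
     * uppercase letter. Assumes line 0 is the header and values are unquoted.
     */
    static List<String> findBadCodes(String fileName, List<String> lines) {
        List<String> problems = new ArrayList<>();
        for (int lineNumber = 1; lineNumber < lines.size(); lineNumber++) {
            String code = lines.get(lineNumber).split(",", -1)[0];
            if (!code.matches("[A-Z]")) {
                // The first field always starts at column 0 of its line.
                problems.add(fileName + ", line " + lineNumber
                        + ", column 0: unexpected rating_code [" + code + "]");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "rating_code,rating_description",
                "A,Class A",
                "bb,Class B",   // made-up bad row for the demonstration
                "C,Class C");
        for (String problem : findBadCodes("rating-table-01.csv", lines)) {
            System.out.println(problem);
        }
    }
}
```

In a real pipeline the file, line, and column would come from the lineage getters shown earlier instead of being tracked by hand.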