Use Data Lineage with CsvReader

Updated: May 30, 2022

This example shows how to use DataPipeline with data lineage: metadata attached to records and fields that indicates where they were loaded from. Lineage is useful for audits and reconciliation, as well as for troubleshooting.

Data lineage can also be used with other readers, for example ExcelReader and FixedWidthReader.

Input CSV file

rating_code,rating_description
A,Class A
B,Class B
C,Class C

Java Code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;

public class UseDataLineageWithCsvReader {

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/rating-table-01.csv"))
                .setFieldNamesInFirstRow(true)
                .setSaveLineage(true); // Enable lineage support (disabled by default).
                                       // An exception is thrown if the reader does not support lineage.
        
        Job.run(reader, new LineageWriter());
    }
    
    public final static class LineageWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);
            
            RecordLineage recordLineage = new RecordLineage().setRecord(record);
            
            System.out.println("Record Lineage");
            System.out.println("    File: " + recordLineage.getFile());
            System.out.println("    File Line: " + recordLineage.getFileLineNumber());
            System.out.println("    File Column: " + recordLineage.getFileColumnNumber());
            System.out.println("    Record: " + recordLineage.getRecordNumber());
            
            System.out.println();
            
            FieldLineage fieldLineage = new FieldLineage();
            
            System.out.println("Field Lineage");
            for (int i=0; i < record.getFieldCount(); i++) {
                Field field = record.getField(i);
                fieldLineage.setField(field);
                System.out.println("    " + field.getName());
                System.out.println("        File: " + fieldLineage.getFile());
                System.out.println("        File Line: " + fieldLineage.getFileLineNumber());
                System.out.println("        File Column: " + fieldLineage.getFileColumnNumber());
                System.out.println("        Record: " + fieldLineage.getRecordNumber());
                System.out.println("        Field Index: " + fieldLineage.getOriginalFieldIndex());
                System.out.println("        Field Name: " + fieldLineage.getOriginalFieldName());
            }
            System.out.println("---------------------------------------------------------");
            System.out.println();
        }
        
    }
}

Code walkthrough

  1. First, we create a CSVReader that reads from rating-table-01.csv, treats the first row as field names, and enables lineage tracking with setSaveLineage(true).
  2. Job.run() is then called to transfer data from the reader to LineageWriter, a custom DataWriter that prints each record along with its lineage.
  3. RecordLineage reports the starting location from which each record was loaded, while FieldLineage reports the starting location of each individual field.
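To make the line and column numbers in the output easier to interpret, here is a minimal plain-Java sketch that computes the same starting positions for the sample file. It does not use DataPipeline: the class CsvLineageSketch and its FieldPosition type are hypothetical, and it assumes a header on line 0, unquoted values, no embedded commas or line breaks, and 0-based numbering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, NOT the DataPipeline implementation. Assumes a simple CSV:
// header on line 0, no quoted values, no embedded commas or line breaks,
// and 0-based line, column and record numbers.
class CsvLineageSketch {

    // Where one field value starts in the source text.
    static final class FieldPosition {
        final int line;         // 0-based line number in the file
        final int column;       // 0-based column where the value starts
        final int recordNumber; // 0-based data record number
        final int fieldIndex;   // position of the value within its row
        final String fieldName; // name taken from the header row

        FieldPosition(int line, int column, int recordNumber, int fieldIndex, String fieldName) {
            this.line = line;
            this.column = column;
            this.recordNumber = recordNumber;
            this.fieldIndex = fieldIndex;
            this.fieldName = fieldName;
        }
    }

    static List<FieldPosition> locate(List<String> lines) {
        String[] header = lines.get(0).split(",", -1);
        List<FieldPosition> positions = new ArrayList<>();
        for (int line = 1; line < lines.size(); line++) {
            String[] values = lines.get(line).split(",", -1);
            int recordNumber = line - 1; // the header occupies line 0
            int column = 0;
            for (int i = 0; i < values.length; i++) {
                positions.add(new FieldPosition(line, column, recordNumber, i, header[i]));
                column += values[i].length() + 1; // skip the value and the comma after it
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "rating_code,rating_description",
                "A,Class A",
                "B,Class B",
                "C,Class C");
        for (FieldPosition p : locate(lines)) {
            System.out.println(p.fieldName + ": line " + p.line + ", column " + p.column
                    + ", record " + p.recordNumber);
        }
    }
}
```

For the sample input this reports column 2 for rating_description on every data line, because "A," occupies columns 0 and 1. That matches the File Column values in the output below.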

Record Lineage Properties

  1. recordLineage.getFile() - The java.io.File, if one was used to create the DataReader.
  2. recordLineage.getFileLineNumber() - The line number in the input file, starting with 0.
  3. recordLineage.getFileColumnNumber() - The column number in the input file, starting with 0.
  4. recordLineage.getRecordNumber() - The sequential record number, starting with 0.
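Because these numbers are 0-based, a record reported at File Line 1 is on the second physical line of the file (the header is line 0). A small helper like the hypothetical LineageLocation.describe below, which is not part of DataPipeline, could turn the values returned by RecordLineage into a 1-based location for audit logs or error messages. It assumes the caller passes the numbers as long values.

```java
// Hypothetical helper, not part of DataPipeline: converts the 0-based
// line, column and record numbers reported by lineage into a 1-based
// "file:line:column" location that matches what most text editors display.
class LineageLocation {

    static String describe(String file, long line, long column, long recordNumber) {
        return String.format("%s:%d:%d (record #%d)", file, line + 1, column + 1, recordNumber + 1);
    }

    public static void main(String[] args) {
        // Values taken from the first record in the example output.
        System.out.println(describe("example/data/input/rating-table-01.csv", 1, 0, 0));
        // prints example/data/input/rating-table-01.csv:2:1 (record #1)
    }
}
```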

Field Lineage Properties

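  1. fieldLineage.getFile(), getFileLineNumber(), getFileColumnNumber() and getRecordNumber() - Same meaning as the Record Lineage properties, but for the position where the individual field starts.
  2. fieldLineage.getOriginalFieldIndex() - The index of the field as set by the DataReader, before any transformation or operation was performed.
  3. fieldLineage.getOriginalFieldName() - The name of the field as set by the DataReader, before any transformation or operation was performed.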
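The "original" properties matter because they survive later changes to the record. The plain-Java sketch below models this idea with a hypothetical TrackedField class (it is not the DataPipeline FieldLineage class): the current name can be changed and the fields reordered, while the index and name captured when the field was read stay fixed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model, NOT DataPipeline's FieldLineage: the current name may
// change during processing, but the original index and name do not.
class OriginalFieldSketch {

    static final class TrackedField {
        String name;                    // current name; transformations may rename it
        final int originalFieldIndex;   // index assigned when the field was read
        final String originalFieldName; // name assigned when the field was read

        TrackedField(String name, int index) {
            this.name = name;
            this.originalFieldIndex = index;
            this.originalFieldName = name;
        }
    }

    // Simulates a reader creating fields from a header row.
    static List<TrackedField> read(String... header) {
        List<TrackedField> fields = new ArrayList<>();
        for (int i = 0; i < header.length; i++) {
            fields.add(new TrackedField(header[i], i));
        }
        return fields;
    }

    public static void main(String[] args) {
        List<TrackedField> fields = read("rating_code", "rating_description");
        // Simulate downstream transformations: rename one field, then reorder.
        fields.get(1).name = "description";
        Collections.reverse(fields);
        for (int i = 0; i < fields.size(); i++) {
            TrackedField f = fields.get(i);
            System.out.println(i + ": " + f.name + " <- originally "
                    + f.originalFieldIndex + ":" + f.originalFieldName);
        }
    }
}
```

Running it prints "0: description <- originally 1:rating_description" followed by "1: rating_code <- originally 0:rating_code", so each field can still be traced back to its source column.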

Output

Record {
    0:[rating_code]:STRING=[A]:String
    1:[rating_description]:STRING=[Class A]:String
}

Record Lineage
    File: example\data\input\rating-table-01.csv
    File Line: 1
    File Column: 0
    Record: 0

Field Lineage
    rating_code
        File: example\data\input\rating-table-01.csv
        File Line: 1
        File Column: 0
        Record: 0
        Field Index: 0
        Field Name: rating_code
    rating_description
        File: example\data\input\rating-table-01.csv
        File Line: 1
        File Column: 2
        Record: 0
        Field Index: 1
        Field Name: rating_description
---------------------------------------------------------

Record {
    0:[rating_code]:STRING=[B]:String
    1:[rating_description]:STRING=[Class B]:String
}

Record Lineage
    File: example\data\input\rating-table-01.csv
    File Line: 2
    File Column: 0
    Record: 1

Field Lineage
    rating_code
        File: example\data\input\rating-table-01.csv
        File Line: 2
        File Column: 0
        Record: 1
        Field Index: 0
        Field Name: rating_code
    rating_description
        File: example\data\input\rating-table-01.csv
        File Line: 2
        File Column: 2
        Record: 1
        Field Index: 1
        Field Name: rating_description
---------------------------------------------------------

Record {
    0:[rating_code]:STRING=[C]:String
    1:[rating_description]:STRING=[Class C]:String
}

Record Lineage
    File: example\data\input\rating-table-01.csv
    File Line: 3
    File Column: 0
    Record: 2

Field Lineage
    rating_code
        File: example\data\input\rating-table-01.csv
        File Line: 3
        File Column: 0
        Record: 2
        Field Index: 0
        Field Name: rating_code
    rating_description
        File: example\data\input\rating-table-01.csv
        File Line: 3
        File Column: 2
        Record: 2
        Field Index: 1
        Field Name: rating_description
---------------------------------------------------------
