Use Data Lineage with JdbcReader

Updated: Aug 28, 2023

This example shows how to enable data lineage on a JdbcReader and inspect where each record and field read from a database came from. Data lineage is metadata added to records and fields indicating where they were loaded from. It is useful for audits, reconciliation, and troubleshooting.
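
Conceptually, lineage is a small bundle of source metadata that travels with each record and field. The library-free Java sketch below models that idea. The names LineageModel, RecordSource, FieldSource, LoadedRecord, and load are hypothetical and are not part of DataPipeline, whose actual lineage classes are RecordLineage and FieldLineage. The sketch assumes Java 16 or later for records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, library-free model of lineage metadata; NOT the DataPipeline API.
public class LineageModel {

    // Record-level lineage: which database and query produced the row, and its position.
    public record RecordSource(String databaseUrl, String query, long recordNumber) {}

    // Field-level lineage: the value plus the column name and zero-based index it was read from.
    public record FieldSource(String columnName, int originalIndex, Object value) {}

    // A loaded row: its record-level lineage and one FieldSource per column.
    public record LoadedRecord(RecordSource source, List<FieldSource> fields) {}

    // Tags each value with the column name and zero-based index it came from.
    public static LoadedRecord load(String url, String query, long recordNumber,
                                    List<String> columns, List<?> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException("columns and values must have the same size");
        }
        List<FieldSource> fields = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            fields.add(new FieldSource(columns.get(i), i, values.get(i)));
        }
        return new LoadedRecord(new RecordSource(url, query, recordNumber), fields);
    }

    public static void main(String[] args) {
        LoadedRecord row = load("jdbc:hsqldb:mem:aname", "SELECT * FROM CreditBalance", 0,
                List.of("ID", "NAME"), List.of(1, "Harry Potter"));
        FieldSource name = row.fields().get(1);
        // prints: record 0, column NAME (index 1) = Harry Potter
        System.out.println("record " + row.source().recordNumber() + ", column "
                + name.columnName() + " (index " + name.originalIndex() + ") = " + name.value());
    }
}
```

In the real library, JdbcReader attaches this kind of information for you once setSaveLineage(true) is called; the sketch only shows the shape of the data.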

Data lineage can also be used with other readers; see, for example, Data Lineage with CSVReader and Data Lineage with FixedWidthReader.

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;

public class UseDataLineageWithJdbcReader {

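    public static void main(String[] args) throws Throwable {

        // HSQLDB in-memory database connection parameters
        final String DATABASE_DRIVER = "org.hsqldb.jdbcDriver";
        final String DATABASE_URL = "jdbc:hsqldb:mem:aname";
        final String DATABASE_USERNAME = "sa";
        final String DATABASE_PASSWORD = "";
        final String DATABASE_TABLE = "CreditBalance";

        // Load the JDBC driver and open a connection to the in-memory database
        Class.forName(DATABASE_DRIVER);
        Connection connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);

        // Create and populate the CreditBalance table
        createTable(connection);

        // Read every row from the table; lineage is off by default, so enable it explicitly
        DataReader reader = new JdbcReader(connection, "SELECT * FROM " + DATABASE_TABLE)
                                .setSaveLineage(true);

        // Send each record to LineageWriter, which prints the record and its lineage
        Job.run(reader, new LineageWriter());
    }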
    
    public static void createTable(Connection connection) throws Throwable {
        // Drop any previous copy of the table so the example can be re-run
        String dropTableQuery = "DROP TABLE CreditBalance IF EXISTS;";

        String createTableQuery = "CREATE TABLE CreditBalance ("
                + "Id INTEGER, "
                + "Name VARCHAR(256), "
                + "Balance DOUBLE, "
                + "CreditLimit DOUBLE, "
                + "PRIMARY KEY (Id));";

        // Insert three sample rows in a single statement
        String insertDataQuery = "INSERT INTO CreditBalance values"
                + "(1, 'Harry Potter', 20000, 100000), "
                + "(2, 'Harmione', 1500, 1000), "
                + "(3, 'Dumbledore', 5000, 100000) ";

        // Execute the drop, create, and insert statements in order, closing each statement afterwards
        for (String query : new String[] {dropTableQuery, createTableQuery, insertDataQuery}) {
            try (PreparedStatement preparedStatement = connection.prepareStatement(query)) {
                preparedStatement.execute();
            }
        }
    }
    
    public final static class LineageWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            // Print the record itself (field names, types, and values)
            System.out.println(record);

            // Record-level lineage: where the whole record was loaded from
            RecordLineage recordLineage = new RecordLineage().setRecord(record);

            System.out.println("Record Lineage");
            System.out.println("    Database URL: " + recordLineage.getDatabaseUrl());
            System.out.println("    Database Query: " + recordLineage.getDatabaseQuery());
            System.out.println("    Record: " + recordLineage.getRecordNumber());

            System.out.println();

            // Field-level lineage: one FieldLineage instance is pointed at each field in turn
            FieldLineage fieldLineage = new FieldLineage();

            System.out.println("Field Lineage");
            for (int i = 0; i < record.getFieldCount(); i++) {
                Field field = record.getField(i);
                fieldLineage.setField(field);
                
                System.out.println("    " + field.getName());
                System.out.println("        Database URL: " + fieldLineage.getDatabaseUrl());
                System.out.println("        Database Query: " + fieldLineage.getDatabaseQuery());
                System.out.println("        Database Column Name: " + fieldLineage.getDatabaseColumnName());
                System.out.println("        Record: " + fieldLineage.getRecordNumber());
                System.out.println("        Field Index: " + fieldLineage.getOriginalFieldIndex());
                System.out.println("        Field Name: " + fieldLineage.getOriginalFieldName());
            }
            System.out.println("---------------------------------------------------------");
            System.out.println();
        }
        
    }
}

Code Walkthrough

  1. The HSQLDB connection parameters (driver class, in-memory database URL, username, password, and table name) are declared first.
  2. The driver is loaded with Class.forName(), and a Connection is opened by calling DriverManager.getConnection().
  3. The separate createTable() method executes the queries that drop, create, and populate the "CreditBalance" table, in that order.
  4. A JdbcReader is created to read the records returned by "SELECT * FROM CreditBalance".
  5. setSaveLineage(true) enables lineage tracking, which is turned off by default.
  6. Job.run() is then called to transfer the records from the reader to LineageWriter, a custom DataWriter that prints each record followed by its record and field lineage.

RecordLineage

  1. RecordLineage describes where a record was loaded from.
  2. recordLineage.getDatabaseUrl() - The JDBC URL of the database the record was read from.
  3. recordLineage.getDatabaseQuery() - The SQL query that produced the record.
  4. recordLineage.getRecordNumber() - The sequential record number, starting with 0.
  5. recordLineage.getFile(), getFileLineNumber(), and getFileColumnNumber() apply when records are loaded from a file: they return the java.io.File used to create the DataReader and the line and column numbers in that file, starting with 0.
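
Because getRecordNumber() is sequential and starts at 0, record numbers collected downstream can support reconciliation. The sketch below is a hypothetical helper, not part of DataPipeline. It assumes the record numbers have already been gathered into a list (for example, from recordLineage.getRecordNumber() inside a writer like LineageWriter) and that the expected row count is known from some other source, and it reports every record number that never arrived.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reconciliation helper; NOT part of DataPipeline.
public class RecordNumberReconciliation {

    // Returns every record number in [0, expectedCount) that was not seen, in ascending order.
    public static List<Long> findMissing(Iterable<Long> seenRecordNumbers, long expectedCount) {
        Set<Long> seen = new HashSet<>();
        for (Long n : seenRecordNumbers) {
            seen.add(n);
        }
        List<Long> missing = new ArrayList<>();
        for (long n = 0; n < expectedCount; n++) {
            if (!seen.contains(n)) {
                missing.add(n);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumed values: in the example these would come from recordLineage.getRecordNumber().
        List<Long> seen = List.of(0L, 2L);
        System.out.println(findMissing(seen, 3)); // prints [1]
    }
}
```

A gap only indicates a problem if nothing between the reader and the writer was meant to filter records out.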

FieldLineage

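  1. FieldLineage describes where each individual field was loaded from.
  2. fieldLineage.getDatabaseUrl() and fieldLineage.getDatabaseQuery() - The same database URL and query reported for the field's record.
  3. fieldLineage.getDatabaseColumnName() - The name of the database column the field was read from.
  4. fieldLineage.getRecordNumber() - The sequential number of the record containing the field, starting with 0.
  5. fieldLineage.getOriginalFieldIndex() - The index of the field as set by the DataReader, before any transformation or operation was performed.
  6. fieldLineage.getOriginalFieldName() - The name of the field as set by the DataReader, before any transformation or operation was performed.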
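
The distinction between a field's current name and its original name matters once records are transformed. The plain-Java sketch below illustrates the idea with a hypothetical LineagedField record (not a DataPipeline class; Java 16 or later): renaming a field changes its current name but carries the original name and index forward, which is the information getOriginalFieldName() and getOriginalFieldIndex() are described as preserving.

```java
// Hypothetical illustration of original-name tracking; NOT part of DataPipeline.
public class OriginalFieldNameSketch {

    // A field that remembers the name and index it was first read with.
    public record LineagedField(String name, Object value, String originalName, int originalIndex) {

        // Creates a field as a reader would: current and original names start out identical.
        public static LineagedField read(String name, Object value, int index) {
            return new LineagedField(name, value, name, index);
        }

        // Renaming returns a copy with a new current name but the same original lineage.
        public LineagedField rename(String newName) {
            return new LineagedField(newName, value, originalName, originalIndex);
        }
    }

    public static void main(String[] args) {
        LineagedField renamed = LineagedField.read("CREDITLIMIT", 100000.0, 3).rename("creditLimit");
        // prints: creditLimit (originally CREDITLIMIT at index 3)
        System.out.println(renamed.name() + " (originally " + renamed.originalName()
                + " at index " + renamed.originalIndex() + ")");
    }
}
```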

Console Output

Record (MODIFIED) {
    0:[ID]:INT=[1]:Integer
    1:[NAME]:STRING=[Harry Potter]:String
    2:[BALANCE]:DOUBLE=[20000.0]:Double
    3:[CREDITLIMIT]:DOUBLE=[100000.0]:Double
}

Record Lineage
    Database URL: jdbc:hsqldb:mem:aname
    Database Query: SELECT * FROM CreditBalance
    Record: 0

Field Lineage
    ID
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: ID
        Record: 0
        Field Index: 0
        Field Name: ID
    NAME
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: NAME
        Record: 0
        Field Index: 1
        Field Name: NAME
    BALANCE
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: BALANCE
        Record: 0
        Field Index: 2
        Field Name: BALANCE
    CREDITLIMIT
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: CREDITLIMIT
        Record: 0
        Field Index: 3
        Field Name: CREDITLIMIT
---------------------------------------------------------

Record (MODIFIED) {
    0:[ID]:INT=[2]:Integer
    1:[NAME]:STRING=[Harmione]:String
    2:[BALANCE]:DOUBLE=[1500.0]:Double
    3:[CREDITLIMIT]:DOUBLE=[1000.0]:Double
}

Record Lineage
    Database URL: jdbc:hsqldb:mem:aname
    Database Query: SELECT * FROM CreditBalance
    Record: 1

Field Lineage
    ID
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: ID
        Record: 1
        Field Index: 0
        Field Name: ID
    NAME
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: NAME
        Record: 1
        Field Index: 1
        Field Name: NAME
    BALANCE
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: BALANCE
        Record: 1
        Field Index: 2
        Field Name: BALANCE
    CREDITLIMIT
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: CREDITLIMIT
        Record: 1
        Field Index: 3
        Field Name: CREDITLIMIT
---------------------------------------------------------

Record (MODIFIED) {
    0:[ID]:INT=[3]:Integer
    1:[NAME]:STRING=[Dumbledore]:String
    2:[BALANCE]:DOUBLE=[5000.0]:Double
    3:[CREDITLIMIT]:DOUBLE=[100000.0]:Double
}

Record Lineage
    Database URL: jdbc:hsqldb:mem:aname
    Database Query: SELECT * FROM CreditBalance
    Record: 2

Field Lineage
    ID
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: ID
        Record: 2
        Field Index: 0
        Field Name: ID
    NAME
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: NAME
        Record: 2
        Field Index: 1
        Field Name: NAME
    BALANCE
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: BALANCE
        Record: 2
        Field Index: 2
        Field Name: BALANCE
    CREDITLIMIT
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: CREDITLIMIT
        Record: 2
        Field Index: 3
        Field Name: CREDITLIMIT
---------------------------------------------------------