Use Data Lineage with JdbcReader
Updated: Aug 28, 2023
This example shows how to use data lineage with JdbcReader to obtain records from the database. Data lineage is metadata added to records and fields indicating where they were loaded from. It is useful for audits and reconciliation as well as troubleshooting.
Data lineage can also be used with other readers, for example, Data Lineage with CSVReader and Data Lineage with FixedWidthReader.
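To make the idea of "metadata added to records and fields" concrete, here is a minimal plain-Java sketch. It does not use the DataPipeline API: LineageConceptSketch, TrackedValue and load() are hypothetical names invented for illustration, and the rows are hard-coded rather than read from a database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: these classes are NOT part of the DataPipeline API.
final class LineageConceptSketch {

    // A loaded value together with metadata describing where it came from.
    static final class TrackedValue {
        final String columnName;
        final Object value;
        final String sourceUrl;    // where the data lives, e.g. a JDBC URL
        final String sourceQuery;  // the query that produced the row
        final long recordNumber;   // zero-based position of the row in the result

        TrackedValue(String columnName, Object value, String sourceUrl,
                     String sourceQuery, long recordNumber) {
            this.columnName = columnName;
            this.value = value;
            this.sourceUrl = sourceUrl;
            this.sourceQuery = sourceQuery;
            this.recordNumber = recordNumber;
        }

        // Human-readable audit line for this value.
        String describe() {
            return columnName + "=" + value + " (record " + recordNumber
                    + " of [" + sourceQuery + "] at " + sourceUrl + ")";
        }
    }

    // Stamps every value of every row with its origin as the rows are "read".
    static List<TrackedValue> load(String sourceUrl, String sourceQuery,
                                   String[] columns, Object[][] rows) {
        List<TrackedValue> values = new ArrayList<TrackedValue>();
        for (int r = 0; r < rows.length; r++) {
            for (int c = 0; c < columns.length; c++) {
                values.add(new TrackedValue(columns[c], rows[r][c], sourceUrl, sourceQuery, r));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        String[] columns = {"ID", "NAME"};
        Object[][] rows = {{1, "Harry Potter"}, {2, "Harmione"}};
        for (TrackedValue v : load("jdbc:hsqldb:mem:aname", "SELECT * FROM CreditBalance", columns, rows)) {
            System.out.println(v.describe());
        }
    }
}
```

Because each value carries its own origin, an audit or troubleshooting step far downstream can still report which query and which row a suspicious value came from.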
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;

public class UseDataLineageWithJdbcReader {

    public static void main(String[] args) throws Throwable {
        final String DATABASE_DRIVER = "org.hsqldb.jdbcDriver";
        final String DATABASE_URL = "jdbc:hsqldb:mem:aname";
        final String DATABASE_USERNAME = "sa";
        final String DATABASE_PASSWORD = "";
        final String DATABASE_TABLE = "CreditBalance";

        Class.forName(DATABASE_DRIVER);
        Connection connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);

        createTable(connection);

        DataReader reader = new JdbcReader(connection, "SELECT * FROM " + DATABASE_TABLE)
                .setSaveLineage(true);

        Job.run(reader, new LineageWriter());
    }

    public static void createTable(Connection connection) throws Throwable {
        PreparedStatement preparedStatement;

        String dropTableQuery = "DROP TABLE CreditBalance IF EXISTS;";
        preparedStatement = connection.prepareStatement(dropTableQuery);
        preparedStatement.execute();

        String createTableQuery = "CREATE TABLE CreditBalance ("
                + "Id INTEGER, "
                + "Name VARCHAR(256), "
                + "Balance DOUBLE, "
                + "CreditLimit DOUBLE, "
                + "PRIMARY KEY (Id));";
        preparedStatement = connection.prepareStatement(createTableQuery);
        preparedStatement.execute();

        String insertDataQuery = "INSERT INTO CreditBalance values"
                + "(1, 'Harry Potter', 20000, 100000), "
                + "(2, 'Harmione', 1500, 1000), "
                + "(3, 'Dumbledore', 5000, 100000) ";
        preparedStatement = connection.prepareStatement(insertDataQuery);
        preparedStatement.execute();
    }

    public final static class LineageWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);

            RecordLineage recordLineage = new RecordLineage().setRecord(record);

            System.out.println("Record Lineage");
            System.out.println("    Database URL: " + recordLineage.getDatabaseUrl());
            System.out.println("    Database Query: " + recordLineage.getDatabaseQuery());
            System.out.println("    Record: " + recordLineage.getRecordNumber());
            System.out.println();

            FieldLineage fieldLineage = new FieldLineage();

            System.out.println("Field Lineage");
            for (int i = 0; i < record.getFieldCount(); i++) {
                Field field = record.getField(i);
                fieldLineage.setField(field);
                System.out.println("    " + field.getName());
                System.out.println("        Database URL: " + fieldLineage.getDatabaseUrl());
                System.out.println("        Database Query: " + fieldLineage.getDatabaseQuery());
                System.out.println("        Database Column Name: " + fieldLineage.getDatabaseColumnName());
                System.out.println("        Record: " + fieldLineage.getRecordNumber());
                System.out.println("        Field Index: " + fieldLineage.getOriginalFieldIndex());
                System.out.println("        Field Name: " + fieldLineage.getOriginalFieldName());
            }
            System.out.println("---------------------------------------------------------");
            System.out.println();
        }
    }
}
Code Walkthrough
- HSQL database connection parameters are declared first.
- A connection instance is created by calling the DriverManager.getConnection() method.
- In a separate createTable() method, the queries that drop, create, and populate the "CreditBalance" table are executed sequentially.
- JdbcReader is used to read the records from the database table "CreditBalance". setSaveLineage(true) enables lineage support, since it is turned off by default.
- The Job.run() method, which transfers data from the reader to the LineageWriter, is then called.
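The table setup step can be written so that its JDBC resources are always closed. The sketch below is one possible variant, not the listing's method: it uses only standard java.sql calls (Connection.createStatement() and Statement.execute(String)) inside try-with-resources, and the class name CreateTableSketch and its helper methods are hypothetical. Its main() only prints the statements; actually running them requires a live connection and a JDBC driver on the classpath, which this sketch does not assume.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

// Sketch only: a hypothetical variant of createTable() that closes its Statement.
final class CreateTableSketch {

    // The three setup statements from the listing, in execution order.
    static List<String> setupStatements() {
        return Arrays.asList(
                "DROP TABLE CreditBalance IF EXISTS",
                "CREATE TABLE CreditBalance ("
                        + "Id INTEGER, Name VARCHAR(256), Balance DOUBLE, "
                        + "CreditLimit DOUBLE, PRIMARY KEY (Id))",
                "INSERT INTO CreditBalance VALUES "
                        + "(1, 'Harry Potter', 20000, 100000), "
                        + "(2, 'Harmione', 1500, 1000), "
                        + "(3, 'Dumbledore', 5000, 100000)");
    }

    // Runs each statement in order; try-with-resources closes the Statement
    // even if one of the executions throws.
    static void runAll(Connection connection, List<String> statements) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            for (String sql : statements) {
                statement.execute(sql);
            }
        }
    }

    public static void main(String[] args) {
        // No database is opened here; this only prints what runAll() would execute.
        for (String sql : setupStatements()) {
            System.out.println(sql);
        }
    }
}
```

With a real connection, the call would be CreateTableSketch.runAll(connection, CreateTableSketch.setupStatements()) in place of createTable(connection).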
RecordLineage
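RecordLineage reports the starting location a record was loaded from. In the listing it is attached to a record with new RecordLineage().setRecord(record).
- recordLineage.getFile() - The java.io.File, if one was used to create the DataReader.
- recordLineage.getFileLineNumber() - The line number in the input file, starting with 0.
- recordLineage.getFileColumnNumber() - The column number in the input file, starting with 0.
- recordLineage.getRecordNumber() - The sequential record number, starting with 0.
- recordLineage.getDatabaseUrl() - The URL of the database the record was read from (printed as "Database URL" in this example).
- recordLineage.getDatabaseQuery() - The query that produced the record (printed as "Database Query" in this example).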
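Since record numbers are sequential and start at 0, they lend themselves to a simple reconciliation check: confirm that every expected record number actually arrived. The following is a minimal plain-Java sketch of that idea. RecordNumberReconciliation and missingRecordNumbers() are hypothetical names, and the numbers are hard-coded where a real job would collect them from getRecordNumber().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the DataPipeline API.
final class RecordNumberReconciliation {

    // Returns the zero-based record numbers in [0, expectedCount) that were never seen.
    static List<Long> missingRecordNumbers(List<Long> seen, long expectedCount) {
        Set<Long> seenSet = new HashSet<Long>(seen);
        List<Long> missing = new ArrayList<Long>();
        for (long n = 0; n < expectedCount; n++) {
            if (!seenSet.contains(n)) {
                missing.add(n);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Pretend records 0 and 2 reached the target but record 1 did not.
        List<Long> seen = Arrays.asList(0L, 2L);
        System.out.println(missingRecordNumbers(seen, 3)); // prints [1]
    }
}
```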
FieldLineage
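FieldLineage reports the starting location of each individual field. In the listing a single FieldLineage instance is reused and pointed at each field in turn with setField(field).
- fieldLineage.getOriginalFieldIndex() - The index of a field as set by the DataReader, before any transformation or operation was performed.
- fieldLineage.getOriginalFieldName() - The name of a field as set by the DataReader, before any transformation or operation was performed.
- fieldLineage.getDatabaseColumnName() - The database column the field was read from (printed as "Database Column Name" in this example).
The listing also calls getDatabaseUrl(), getDatabaseQuery(), and getRecordNumber() on FieldLineage, which return the same kind of information as their RecordLineage counterparts.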
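The value of an "original" index and name shows up once fields are renamed or reordered downstream. The plain-Java sketch below illustrates that behaviour under stated assumptions: FieldLineageSketch and TrackedField are hypothetical classes written for this illustration and do not reflect how DataPipeline stores lineage internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: TrackedField is NOT a DataPipeline class.
final class FieldLineageSketch {

    static final class TrackedField {
        private String name;                // current name, may change later
        private final String originalName;  // name assigned when first read
        private final int originalIndex;    // zero-based position when first read

        TrackedField(String name, int index) {
            this.name = name;
            this.originalName = name;
            this.originalIndex = index;
        }

        // Changes the current name only; the original name is untouched.
        TrackedField rename(String newName) {
            this.name = newName;
            return this;
        }

        String getName() { return name; }
        String getOriginalName() { return originalName; }
        int getOriginalIndex() { return originalIndex; }
    }

    public static void main(String[] args) {
        List<TrackedField> fields = new ArrayList<TrackedField>();
        String[] columns = {"ID", "NAME", "BALANCE"};
        for (int i = 0; i < columns.length; i++) {
            fields.add(new TrackedField(columns[i], i));
        }

        // Transform: rename BALANCE and move it to the front of the record.
        TrackedField balance = fields.remove(2).rename("CurrentBalance");
        fields.add(0, balance);

        // Print "currentIndex:currentName <- originalIndex:originalName".
        for (int i = 0; i < fields.size(); i++) {
            TrackedField f = fields.get(i);
            System.out.println(i + ":" + f.getName() + " <- "
                    + f.getOriginalIndex() + ":" + f.getOriginalName());
        }
    }
}
```

The first line printed is 0:CurrentBalance <- 2:BALANCE, showing that the field can still be traced to its source column after both its name and its position have changed.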
Console Output
Record (MODIFIED) {
    0:[ID]:INT=[1]:Integer
    1:[NAME]:STRING=[Harry Potter]:String
    2:[BALANCE]:DOUBLE=[20000.0]:Double
    3:[CREDITLIMIT]:DOUBLE=[100000.0]:Double
}
Record Lineage
    Database URL: jdbc:hsqldb:mem:aname
    Database Query: SELECT * FROM CreditBalance
    Record: 0

Field Lineage
    ID
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: ID
        Record: 0
        Field Index: 0
        Field Name: ID
    NAME
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: NAME
        Record: 0
        Field Index: 1
        Field Name: NAME
    BALANCE
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: BALANCE
        Record: 0
        Field Index: 2
        Field Name: BALANCE
    CREDITLIMIT
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: CREDITLIMIT
        Record: 0
        Field Index: 3
        Field Name: CREDITLIMIT
---------------------------------------------------------

Record (MODIFIED) {
    0:[ID]:INT=[2]:Integer
    1:[NAME]:STRING=[Harmione]:String
    2:[BALANCE]:DOUBLE=[1500.0]:Double
    3:[CREDITLIMIT]:DOUBLE=[1000.0]:Double
}
Record Lineage
    Database URL: jdbc:hsqldb:mem:aname
    Database Query: SELECT * FROM CreditBalance
    Record: 1

Field Lineage
    ID
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: ID
        Record: 1
        Field Index: 0
        Field Name: ID
    NAME
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: NAME
        Record: 1
        Field Index: 1
        Field Name: NAME
    BALANCE
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: BALANCE
        Record: 1
        Field Index: 2
        Field Name: BALANCE
    CREDITLIMIT
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: CREDITLIMIT
        Record: 1
        Field Index: 3
        Field Name: CREDITLIMIT
---------------------------------------------------------

Record (MODIFIED) {
    0:[ID]:INT=[3]:Integer
    1:[NAME]:STRING=[Dumbledore]:String
    2:[BALANCE]:DOUBLE=[5000.0]:Double
    3:[CREDITLIMIT]:DOUBLE=[100000.0]:Double
}
Record Lineage
    Database URL: jdbc:hsqldb:mem:aname
    Database Query: SELECT * FROM CreditBalance
    Record: 2

Field Lineage
    ID
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: ID
        Record: 2
        Field Index: 0
        Field Name: ID
    NAME
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: NAME
        Record: 2
        Field Index: 1
        Field Name: NAME
    BALANCE
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: BALANCE
        Record: 2
        Field Index: 2
        Field Name: BALANCE
    CREDITLIMIT
        Database URL: jdbc:hsqldb:mem:aname
        Database Query: SELECT * FROM CreditBalance
        Database Column Name: CREDITLIMIT
        Record: 2
        Field Index: 3
        Field Name: CREDITLIMIT
---------------------------------------------------------