Use Data Lineage with JdbcReader

Updated: Feb 21, 2022
/*
 * Copyright (c) 2006-2022 North Concepts Inc.  All rights reserved.
 * Proprietary and Confidential.  Use is subject to license terms.
 * 
 * https://northconcepts.com/data-pipeline/licensing/
 */
package com.northconcepts.datapipeline.examples.cookbook;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;

public class UseDataLineageWithJdbcReader {

    public static void main(String[] args) throws Throwable {
        
        final String DATABASE_DRIVER = "org.hsqldb.jdbcDriver";
        final String DATABASE_URL = "jdbc:hsqldb:mem:aname";
        final String DATABASE_USERNAME = "sa";
        final String DATABASE_PASSWORD = "";
        final String DATABASE_TABLE = "CreditBalance";

        Class.forName(DATABASE_DRIVER);
        Connection connection = DriverManager.getConnection(DATABASE_URL, DATABASE_USERNAME, DATABASE_PASSWORD);

        createTable(connection);
        
        DataReader reader = new JdbcReader(connection, "SELECT * FROM " + DATABASE_TABLE) 
                                .setSaveLineage(true);
        
        Job.run(reader, new LineageWriter());
    }
    
    public static void createTable(Connection connection) throws Throwable{
        PreparedStatement preparedStatement;
        String dropTableQuery = "DROP TABLE CreditBalance IF EXISTS;";

        preparedStatement = connection.prepareStatement(dropTableQuery);
        preparedStatement.execute();
        
        String createTableQuery = "CREATE TABLE CreditBalance ("
                + "Id INTEGER, "
                + "Name VARCHAR(256), "
                + "Balance DOUBLE, "
                + "CreditLimit DOUBLE, "
                + "PRIMARY KEY (Id));";

        preparedStatement = connection.prepareStatement(createTableQuery);
        preparedStatement.execute();
        
        String insertDataQuery = "INSERT INTO CreditBalance values"
                + "(1, 'Harry Potter', 20000, 100000), "
                + "(2, 'Harmione', 1500, 1000), "
                + "(3, 'Dumbledore', 5000, 100000) ";
        
        preparedStatement = connection.prepareStatement(insertDataQuery);
        preparedStatement.execute();
    }
    
    public final static class LineageWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);
            
            RecordLineage recordLineage = new RecordLineage().setRecord(record);
            
            System.out.println("Record Lineage");
            System.out.println("    Database URL: " + recordLineage.getDatabaseUrl());
            System.out.println("    Database Query: " + recordLineage.getDatabaseQuery());
            System.out.println("    Record: " + recordLineage.getRecordNumber());
            
            System.out.println();
            
            FieldLineage fieldLineage = new FieldLineage();
            
            System.out.println("Field Lineage");
            for (int i=0; i < record.getFieldCount(); i++) {
                Field field = record.getField(i);
                fieldLineage.setField(field);
                
                System.out.println("    " + field.getName());
                System.out.println("        Database URL: " + fieldLineage.getDatabaseUrl());
                System.out.println("        Database Query: " + fieldLineage.getDatabaseQuery());
                System.out.println("        Database Column Name: " + fieldLineage.getDatabaseColumnName());
                System.out.println("        Record: " + fieldLineage.getRecordNumber());
                System.out.println("        Field Index: " + fieldLineage.getOriginalFieldIndex());
                System.out.println("        Field Name: " + fieldLineage.getOriginalFieldName());
            }
            System.out.println("---------------------------------------------------------");
            System.out.println();
        }
        
    }
}

All Examples

Mobile Analytics