Measure Data Lineage Performance

This example measures the performance of DataPipeline's Data lineage feature.  This involves adding metadata to records and fields to track their origin within a data flow.

Data lineage can be useful for audits and reconciliation as well as troubleshooting. This feature is also crucial for establishing and maintaining data governance frameworks. The library allows users to capture and analyze data lineage information, ensuring transparency, accountability, and effective data governance practices.

 

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;

public class MeasureDataLineagePerformance {

    public static void main(String[] args) {
        warmupJvm();

        readCsv("1000_Sales_Records.csv", false);
        readCsv("1000_Sales_Records.csv", true);
        readCsv("100000_Sales_Records.csv", false);
        readCsv("100000_Sales_Records.csv", true);
    }

    private static void warmupJvm() {
        System.out.println("Warming up JVM");
        
        for (int i = 0; i < 3; i++) {
            DataReader reader = new CSVReader(new File("example/data/input/100000_Sales_Records.csv"))
                    .setFieldNamesInFirstRow(true)
                    .setSaveLineage(true);
            Job.run(reader, new NullWriter());
        }
    }

    private static void readCsv(String fileName, boolean saveLineage) {
        System.out.println("--------------------------------------------------------");
        DataReader reader = new CSVReader(new File("example/data/input/" + fileName))
                .setFieldNamesInFirstRow(true)
                .setSaveLineage(saveLineage);

        Job job = Job.run(reader, new NullWriter());

        System.out.println("Lineage enabled: " + saveLineage);
        System.out.println("Records: " + job.getRecordsTransferred());
        System.out.println("Time: " + job.getRunningTimeAsString());
    }
    
}

 

Code Walkthrough

  1. warmupJvm() as the name suggests is used to warm up the JVM before the actual measurement starts.
  2. Inside the for loop, CSVReader is created corresponding to the input file 100000_Sales_Records.csv.
  3. The CSVReader.setFieldNamesInFirstRow(true) method is invoked to specify that the names specified in the first row should be used as field names.
  4. setSaveLineage(true) is used when metadata should be added.
  5. Data is transferred from the reader to the new NullWriter() via Job.run() method.
  6. readCsv() methods receive two arguments i.e. the filename and a boolean that determines if metadata should be added. This is very useful as you will compare the performance in both cases i.e. when metadata is added and also when it is disabled.
  7. job.getRecordsTransferred() returns the number of records in that specific file.
  8. job.getRunningTimeAsString() returns the running time.

 

CSVReader

CSVReader is an input reader which can be used to read CSV files. It is a sub-class of TextReader and inherits the open and close among other methods. The CSVReader.setFieldNamesInFirstRow(true) method causes the CSVReader to use the names specified in the first row of the input data as field names. If this method is not invoked, the fields would be named as A1, A2, etc. similar to MS Excel. If those field names need to be changed, a rename transformation can be added on top of CSVReader or any other type (Refer Rename a field for example).

 

Console Output

> Task :MeasureDataLineagePerformance.main()
Warming up JVM
19:02:05,562 DEBUG [main] datapipeline:37 - DataPipeline v7.2.0-SNAPSHOT by North Concepts Inc.
19:02:05,815 DEBUG [main] datapipeline:615 - Job[1,job-1,Wed May 25 19:02:05 EAT 2022]::Start
19:02:07,937 DEBUG [main] datapipeline:661 - job::Success
19:02:07,938 DEBUG [main] datapipeline:615 - Job[2,job-2,Wed May 25 19:02:07 EAT 2022]::Start
19:02:09,645 DEBUG [main] datapipeline:661 - job::Success
19:02:09,646 DEBUG [main] datapipeline:615 - Job[3,job-3,Wed May 25 19:02:09 EAT 2022]::Start
19:02:11,384 DEBUG [main] datapipeline:661 - job::Success
--------------------------------------------------------
19:02:11,386 DEBUG [main] datapipeline:615 - Job[4,job-4,Wed May 25 19:02:11 EAT 2022]::Start
19:02:11,396 DEBUG [main] datapipeline:661 - job::Success
Lineage enabled: false
Records: 1000
Time: 10 Milliseconds
--------------------------------------------------------
19:02:11,398 DEBUG [main] datapipeline:615 - Job[5,job-5,Wed May 25 19:02:11 EAT 2022]::Start
19:02:11,423 DEBUG [main] datapipeline:661 - job::Success
Lineage enabled: true
Records: 1000
Time: 25 Milliseconds
--------------------------------------------------------
19:02:11,425 DEBUG [main] datapipeline:615 - Job[6,job-6,Wed May 25 19:02:11 EAT 2022]::Start
19:02:11,920 DEBUG [main] datapipeline:661 - job::Success
Lineage enabled: false
Records: 100000
Time: 495 Milliseconds
--------------------------------------------------------
19:02:11,922 DEBUG [main] datapipeline:615 - Job[7,job-7,Wed May 25 19:02:11 EAT 2022]::Start
19:02:13,581 DEBUG [main] datapipeline:661 - job::Success
Lineage enabled: true
Records: 100000
Time: 1 Second, 660 Milliseconds
Mobile Analytics