Measure Data Lineage Performance
This example measures the performance impact of DataPipeline's data lineage feature, which attaches metadata to records and fields to track their origin within a data flow.
Data lineage is useful for audits, reconciliation, and troubleshooting, and it is a key building block for data governance. The library lets users capture and analyze lineage information, supporting transparency, accountability, and effective data governance practices.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;

public class MeasureDataLineagePerformance {

    public static void main(String[] args) {
        warmupJvm();

        readCsv("1000_Sales_Records.csv", false);
        readCsv("1000_Sales_Records.csv", true);
        readCsv("100000_Sales_Records.csv", false);
        readCsv("100000_Sales_Records.csv", true);
    }

    private static void warmupJvm() {
        System.out.println("Warming up JVM");
        for (int i = 0; i < 3; i++) {
            DataReader reader = new CSVReader(new File("example/data/input/100000_Sales_Records.csv"))
                    .setFieldNamesInFirstRow(true)
                    .setSaveLineage(true);
            Job.run(reader, new NullWriter());
        }
    }

    private static void readCsv(String fileName, boolean saveLineage) {
        System.out.println("--------------------------------------------------------");
        DataReader reader = new CSVReader(new File("example/data/input/" + fileName))
                .setFieldNamesInFirstRow(true)
                .setSaveLineage(saveLineage);
        Job job = Job.run(reader, new NullWriter());
        System.out.println("Lineage enabled: " + saveLineage);
        System.out.println("Records: " + job.getRecordsTransferred());
        System.out.println("Time: " + job.getRunningTimeAsString());
    }

}
Code Walkthrough
warmupJvm(), as its name suggests, warms up the JVM before the actual measurement starts.

- Inside the for loop, a CSVReader is created for the input file 100000_Sales_Records.csv.
- CSVReader.setFieldNamesInFirstRow(true) specifies that the values in the first row should be used as field names.
- setSaveLineage(true) enables the addition of lineage metadata.
- Data is transferred from the reader to a new NullWriter() via the Job.run() method.
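Warming up matters because the JIT compiler optimizes hot code paths only after they have run several times, so the first pass through a pipeline is typically slower than later ones. The idea behind warmupJvm() can be sketched in plain Java (no DataPipeline dependency; the workload() method below is a stand-in for a full Job.run() pass):

```java
public class WarmupSketch {

    // Stand-in workload; in the real example this is an entire pipeline run.
    static long workload() {
        long sum = 0;
        for (int i = 0; i < 1_000_000; i++) {
            sum += i % 7;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Run the workload a few times untimed so the JIT can compile and optimize it.
        for (int i = 0; i < 3; i++) {
            workload();
        }

        // Only now take the measurement.
        long start = System.nanoTime();
        long result = workload();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("result=" + result + ", elapsedMs=" + elapsedMs);
    }
}
```

Without the untimed passes, the first timed run would include JIT compilation and class-loading cost, skewing the comparison between the lineage-on and lineage-off jobs.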
The readCsv() method receives two arguments: the file name and a boolean that determines whether lineage metadata should be added. This makes it easy to compare performance in both cases, with metadata enabled and disabled.

- job.getRecordsTransferred() returns the number of records read from the file.
- job.getRunningTimeAsString() returns the job's running time as a human-readable string.
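The two timings printed by readCsv() can be turned into an overhead estimate with a few lines of plain Java. The sketch below uses the 100,000-record timings from the sample run shown later (495 ms without lineage, 1660 ms with it); your numbers will vary by machine:

```java
public class LineageOverhead {

    public static void main(String[] args) {
        long records = 100_000;
        double withoutMs = 495;  // running time with lineage disabled (sample run)
        double withMs = 1660;    // running time with lineage enabled (sample run)

        // Overall slowdown factor of the lineage-enabled job.
        double factor = withMs / withoutMs;

        // Extra cost per record, converted from milliseconds to microseconds.
        double perRecordMicros = (withMs - withoutMs) * 1000 / records;

        System.out.printf("Slowdown: %.2fx, overhead per record: %.2f us%n",
                factor, perRecordMicros);
    }
}
```

For this sample run, lineage tracking slows the job down by roughly 3.4x, or about 12 microseconds of extra work per record.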
CSVReader
CSVReader is an input reader that reads CSV files. It is a subclass of TextReader and inherits its open and close methods, among others. The CSVReader.setFieldNamesInFirstRow(true) method causes the CSVReader to use the values in the first row of the input data as field names. If this method is not invoked, the fields are named A1, A2, etc., similar to columns in Microsoft Excel. If those field names need to be changed, a rename transformation can be applied on top of CSVReader or any other reader (see the Rename a field example).
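As a hypothetical illustration of that default naming scheme (DataPipeline assigns these names internally; the code below is not its implementation), the A1, A2, ... names and a rename mapping can be sketched in plain Java:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DefaultFieldNames {

    // Default names used when the first row does not supply field names.
    static List<String> defaultNames(int fieldCount) {
        List<String> names = new ArrayList<>();
        for (int i = 1; i <= fieldCount; i++) {
            names.add("A" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(defaultNames(3)); // [A1, A2, A3]

        // A rename step maps the defaults to meaningful names, analogous to
        // DataPipeline's rename transformation; "region" and "country" are
        // illustrative labels, not names taken from the library.
        Map<String, String> rename = new LinkedHashMap<>();
        rename.put("A1", "region");
        rename.put("A2", "country");
        System.out.println(rename);
    }
}
```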
Console Output
> Task :MeasureDataLineagePerformance.main()
Warming up JVM
19:02:05,562 DEBUG [main] datapipeline:37 - DataPipeline v7.2.0-SNAPSHOT by North Concepts Inc.
19:02:05,815 DEBUG [main] datapipeline:615 - Job[1,job-1,Wed May 25 19:02:05 EAT 2022]::Start
19:02:07,937 DEBUG [main] datapipeline:661 - job::Success
19:02:07,938 DEBUG [main] datapipeline:615 - Job[2,job-2,Wed May 25 19:02:07 EAT 2022]::Start
19:02:09,645 DEBUG [main] datapipeline:661 - job::Success
19:02:09,646 DEBUG [main] datapipeline:615 - Job[3,job-3,Wed May 25 19:02:09 EAT 2022]::Start
19:02:11,384 DEBUG [main] datapipeline:661 - job::Success
--------------------------------------------------------
19:02:11,386 DEBUG [main] datapipeline:615 - Job[4,job-4,Wed May 25 19:02:11 EAT 2022]::Start
19:02:11,396 DEBUG [main] datapipeline:661 - job::Success
Lineage enabled: false
Records: 1000
Time: 10 Milliseconds
--------------------------------------------------------
19:02:11,398 DEBUG [main] datapipeline:615 - Job[5,job-5,Wed May 25 19:02:11 EAT 2022]::Start
19:02:11,423 DEBUG [main] datapipeline:661 - job::Success
Lineage enabled: true
Records: 1000
Time: 25 Milliseconds
--------------------------------------------------------
19:02:11,425 DEBUG [main] datapipeline:615 - Job[6,job-6,Wed May 25 19:02:11 EAT 2022]::Start
19:02:11,920 DEBUG [main] datapipeline:661 - job::Success
Lineage enabled: false
Records: 100000
Time: 495 Milliseconds
--------------------------------------------------------
19:02:11,922 DEBUG [main] datapipeline:615 - Job[7,job-7,Wed May 25 19:02:11 EAT 2022]::Start
19:02:13,581 DEBUG [main] datapipeline:661 - job::Success
Lineage enabled: true
Records: 100000
Time: 1 Second, 660 Milliseconds