Use Data Lineage with ParquetReader
This example shows you how to use Data Lineage with ParquetDataReader in Data Pipeline.
Data Lineage is metadata added to records and fields indicating where they were loaded from. It can be useful for audits and reconciliation as well as troubleshooting.
Parquet is an open source, column-oriented data file format built for efficient data storage and retrieval.
Java Code listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class UseDataLineageWithParquetReader {

    public static void main(String[] args) {
        DataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSaveLineage(true); // Enable lineage support. (By default, it is disabled.)

        Job.run(reader, new LineageWriter());
    }

    public final static class LineageWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);

            RecordLineage recordLineage = new RecordLineage().setRecord(record);
            System.out.println("Record Lineage");
            System.out.println("    File: " + recordLineage.getFile());
            System.out.println("    Record: " + recordLineage.getRecordNumber());
            System.out.println("    Schema: " + recordLineage.getRecordSchema());
            System.out.println();

            FieldLineage fieldLineage = new FieldLineage();
            System.out.println("Field Lineage");
            for (int i = 0; i < record.getFieldCount(); i++) {
                Field field = record.getField(i);
                fieldLineage.setField(field);
                System.out.println("    " + field.getName());
                System.out.println("        File: " + fieldLineage.getFile());
                System.out.println("        Record: " + fieldLineage.getRecordNumber());
                System.out.println("        Field Index: " + fieldLineage.getOriginalFieldIndex());
                System.out.println("        Field Name: " + fieldLineage.getOriginalFieldName());
                System.out.println("        Field Schema: " + fieldLineage.getFieldSchema());
            }

            System.out.println("---------------------------------------------------------");
            System.out.println();
        }
    }

}
Code walkthrough
- First, a ParquetDataReader is created corresponding to the input file read_parquet_file.parquet.
- The setSaveLineage(true) method enables lineage support in ParquetDataReader (it is disabled by default).
- LineageWriter extends the DataWriter class and overrides the writeImpl() method to write each record transferred from ParquetDataReader via the Job.run() method.
- A RecordLineage object is created to report the starting location the record was loaded from:
  - recordLineage.getFile() returns the file used to create the DataReader, which is read_parquet_file.parquet in this example.
  - recordLineage.getRecordNumber() returns the sequential record number in the input file, starting with 0.
  - recordLineage.getRecordSchema() returns the schema of the input Parquet file.
- A FieldLineage object is created to report the starting location of each individual field:
  - fieldLineage.getRecordNumber() returns the sequential record number, starting with 0.
  - fieldLineage.getOriginalFieldIndex() returns the index the field had when the DataReader set it, before any transformation or operation was performed.
  - fieldLineage.getOriginalFieldName() returns the name the field had when the DataReader set it, before any transformation or operation was performed.
  - fieldLineage.getFieldSchema() returns the field's schema in the input Parquet file.
- Data is transferred from ParquetDataReader to LineageWriter via the Job.run() method. See how to compile and run data pipeline jobs.
RecordLineage
Convenience wrapper to read and write record-level data lineage properties. It extends the Object class and can be created using a Record object (optionally, you can also set the Record object using the setRecord() method). It is useful for getting the starting location where the record was loaded from.
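For reference, here is a minimal sketch of the wrapper in isolation. The helper name is hypothetical; it uses only the setRecord() and getter calls from the listing above:

import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.lineage.RecordLineage;

public class RecordLineageSketch {

    // Hypothetical helper: prints where a record produced by a
    // lineage-enabled reader originally came from.
    static void printRecordLineage(Record record) {
        RecordLineage lineage = new RecordLineage().setRecord(record);
        System.out.println("File:   " + lineage.getFile());          // source file of the record
        System.out.println("Record: " + lineage.getRecordNumber());  // zero-based record number in that file
        System.out.println("Schema: " + lineage.getRecordSchema());  // schema of the source file
    }

}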
ParquetDataReader
Reads records from Apache Parquet columnar files. Using this class, you can also obtain the configuration, schema, filter settings, and other metadata of a given Parquet file.
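If you want to pull records by hand instead of delegating to Job.run(), a minimal sketch might look like this. It assumes DataReader's usual open()/read()/close() life cycle, with read() returning null at the end of input:

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class ReadParquetManually {

    public static void main(String[] args) throws Throwable {
        // Lineage is off by default and must be enabled per reader.
        DataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSaveLineage(true);
        reader.open();
        try {
            Record record;
            while ((record = reader.read()) != null) { // read() is assumed to return null at end of input
                System.out.println(record);
            }
        } finally {
            reader.close();
        }
    }

}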
FieldLineage
Convenience wrapper to read and write field-level data lineage properties. It extends the Object class and can be created using a Field object (optionally, you can also set the Field object using the setField() method). It is useful for getting the starting location of each individual field.
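Here is a minimal sketch that reuses a single FieldLineage wrapper across all fields of a record. The helper name is hypothetical; the calls are the same ones used in the listing above:

import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.lineage.FieldLineage;

public class FieldLineageSketch {

    // Hypothetical helper: prints the original location of every field in a record.
    static void printFieldLineage(Record record) {
        FieldLineage lineage = new FieldLineage(); // a single wrapper can be re-pointed at each field
        for (int i = 0; i < record.getFieldCount(); i++) {
            Field field = record.getField(i);
            lineage.setField(field);
            System.out.println(field.getName()
                    + " <- index " + lineage.getOriginalFieldIndex()
                    + ", name " + lineage.getOriginalFieldName()
                    + ", record " + lineage.getRecordNumber()
                    + ", file " + lineage.getFile());
        }
    }

}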
Field
Field holds persistent key-value data as part of a record.
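As a quick illustration of that key-value pairing (the helper name is hypothetical, and getValue() is an assumption; the lineage example above relies only on getName()):

import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;

public class FieldSketch {

    // Hypothetical helper: dumps each field of a record as a key-value pair.
    // getValue() is an assumption based on Field's key-value description above.
    static void dumpFields(Record record) {
        for (int i = 0; i < record.getFieldCount(); i++) {
            Field field = record.getField(i);
            System.out.println(field.getName() + " = " + field.getValue());
        }
    }

}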
Console output
Record (MODIFIED) {
    0:[id]:INT=[4]:Integer
    1:[bool_col]:BOOLEAN=[true]:Boolean
    2:[tinyint_col]:INT=[0]:Integer
    3:[smallint_col]:INT=[0]:Integer
    4:[int_col]:INT=[0]:Integer
    5:[bigint_col]:LONG=[0]:Long
    6:[float_col]:FLOAT=[0.0]:Float
    7:[double_col]:DOUBLE=[0.0]:Double
    8:[date_string_col]:STRING=[03/01/09]:String
    9:[string_col]:STRING=[0]:String
    10:[timestamp_col]:DATETIME=[Sun Mar 01 12:00:00 EAT 2009]:Date
}
Record Lineage
    File: example/data/input/read_parquet_file.parquet
    Record: 0
    Schema: message schema {
  optional int32 id;
  optional boolean bool_col;
  optional int32 tinyint_col;
  optional int32 smallint_col;
  optional int32 int_col;
  optional int64 bigint_col;
  optional float float_col;
  optional double double_col;
  optional binary date_string_col;
  optional binary string_col;
  optional int96 timestamp_col;
}

Field Lineage
    id
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 0
        Field Name: id
        Field Schema: optional int32 id
    bool_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 1
        Field Name: bool_col
        Field Schema: optional boolean bool_col
    tinyint_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 2
        Field Name: tinyint_col
        Field Schema: optional int32 tinyint_col
    smallint_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 3
        Field Name: smallint_col
        Field Schema: optional int32 smallint_col
    int_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 4
        Field Name: int_col
        Field Schema: optional int32 int_col
    bigint_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 5
        Field Name: bigint_col
        Field Schema: optional int64 bigint_col
    float_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 6
        Field Name: float_col
        Field Schema: optional float float_col
    double_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 7
        Field Name: double_col
        Field Schema: optional double double_col
    date_string_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 8
        Field Name: date_string_col
        Field Schema: optional binary date_string_col
    string_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 9
        Field Name: string_col
        Field Schema: optional binary string_col
    timestamp_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 10
        Field Name: timestamp_col
        Field Schema: optional int96 timestamp_col
...
Only part of the output is shown for clarity.