Use Data Lineage with ParquetReader
This example shows you how to use Data Lineage with ParquetDataReader in Data Pipeline.
Data Lineage is metadata added to records and fields indicating where they were loaded from. It can be useful for audits and reconciliation as well as troubleshooting.
Parquet is an open source, column-oriented data file format built for efficient data storage and retrieval.
Java Code listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class UseDataLineageWithParquetReader {

    public static void main(String[] args) {
        DataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSaveLineage(true); // Enable lineage support. (By default, it is disabled.)

        Job.run(reader, new LineageWriter());
    }

    public final static class LineageWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);

            RecordLineage recordLineage = new RecordLineage().setRecord(record);
            System.out.println("Record Lineage");
            System.out.println("    File: " + recordLineage.getFile());
            System.out.println("    Record: " + recordLineage.getRecordNumber());
            System.out.println("    Schema: " + recordLineage.getRecordSchema());
            System.out.println();

            FieldLineage fieldLineage = new FieldLineage();
            System.out.println("Field Lineage");
            for (int i = 0; i < record.getFieldCount(); i++) {
                Field field = record.getField(i);
                fieldLineage.setField(field);
                System.out.println("    " + field.getName());
                System.out.println("        File: " + fieldLineage.getFile());
                System.out.println("        Record: " + fieldLineage.getRecordNumber());
                System.out.println("        Field Index: " + fieldLineage.getOriginalFieldIndex());
                System.out.println("        Field Name: " + fieldLineage.getOriginalFieldName());
                System.out.println("        Field Schema: " + fieldLineage.getFieldSchema());
            }
            System.out.println("---------------------------------------------------------");
            System.out.println();
        }
    }

}
Code walkthrough
- First, ParquetDataReader is created corresponding to the input file read_parquet_file.parquet.
- The setSaveLineage(true) method enables lineage support in ParquetDataReader (it is disabled by default).
- LineageWriter extends the DataWriter class and overrides the writeImpl() method to handle each record transferred from ParquetDataReader via the Job.run() method.
- RecordLineage is created to report the original location each record was loaded from:
    - recordLineage.getFile() returns the file used to create the DataReader, which is read_parquet_file.parquet in this example.
    - recordLineage.getRecordNumber() returns the sequential record number in the input file, starting at 0.
    - recordLineage.getRecordSchema() returns the schema of the input Parquet file.
- FieldLineage is created to report the original location of each individual field:
    - fieldLineage.getFile() returns the file used to create the DataReader.
    - fieldLineage.getRecordNumber() returns the sequential record number, starting at 0.
    - fieldLineage.getOriginalFieldIndex() returns the index of a field set by the DataReader before any transformation or operation was performed.
    - fieldLineage.getOriginalFieldName() returns the name of a field set by the DataReader before any transformation or operation was performed.
    - fieldLineage.getFieldSchema() returns the field's schema in the input Parquet file.
- Data are transferred from ParquetDataReader to LineageWriter via the Job.run() method (a manual-read alternative is sketched below). See how to compile and run data pipeline jobs.
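If you are not running the pipeline through Job.run(), the same lineage properties are still attached to each record. The snippet below is a minimal sketch that pulls records manually using DataReader's open()/read()/close() cycle; it reuses the example's input file and only the lineage methods shown in the listing above.

DataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
        .setSaveLineage(true); // lineage must be enabled for the properties below to exist

reader.open();
try {
    Record record;
    while ((record = reader.read()) != null) {
        // Lineage metadata travels with the record itself
        RecordLineage lineage = new RecordLineage().setRecord(record);
        System.out.println(lineage.getFile() + ", record " + lineage.getRecordNumber());
    }
} finally {
    reader.close();
}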
RecordLineage
Convenience wrapper to read and write record-level data lineage properties. It can be created with a Record object (or you can set the Record later using the setRecord() method) and is used to retrieve the original location a record was loaded from.
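For example, mirroring the listing above, the wrapper is pointed at a record and then queried:

RecordLineage recordLineage = new RecordLineage().setRecord(record);
System.out.println(recordLineage.getFile());         // source file
System.out.println(recordLineage.getRecordNumber()); // zero-based record number
System.out.println(recordLineage.getRecordSchema()); // schema of the input Parquet file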
ParquetDataReader
Read records from Apache Parquet columnar files. This class also exposes the configuration, schema, filter settings, and other metadata of a given Parquet file.
FieldLineage
Convenience wrapper to read and write field-level data lineage properties. It can be created with a Field object (or you can set the Field later using the setField() method) and is used to retrieve the original location of each individual field.
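As in the listing above, a single FieldLineage instance can be reused across fields by calling setField() for each one:

FieldLineage fieldLineage = new FieldLineage();
for (int i = 0; i < record.getFieldCount(); i++) {
    fieldLineage.setField(record.getField(i));
    // Original name and index, as set by the DataReader before any transformation
    System.out.println(fieldLineage.getOriginalFieldName()
            + " was field " + fieldLineage.getOriginalFieldIndex()
            + " in " + fieldLineage.getFile());
}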
Field
Field holds persistent key-value data as part of a record.
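A quick sketch of reading fields by position; Field.getValue() is assumed here for the raw value, while the other calls appear in the listing above:

for (int i = 0; i < record.getFieldCount(); i++) {
    Field field = record.getField(i);
    System.out.println(field.getName() + " = " + field.getValue()); // key-value pair held by the field
}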
Console output
Record (MODIFIED) {
0:[id]:INT=[4]:Integer
1:[bool_col]:BOOLEAN=[true]:Boolean
2:[tinyint_col]:INT=[0]:Integer
3:[smallint_col]:INT=[0]:Integer
4:[int_col]:INT=[0]:Integer
5:[bigint_col]:LONG=[0]:Long
6:[float_col]:FLOAT=[0.0]:Float
7:[double_col]:DOUBLE=[0.0]:Double
8:[date_string_col]:STRING=[03/01/09]:String
9:[string_col]:STRING=[0]:String
10:[timestamp_col]:DATETIME=[Sun Mar 01 12:00:00 EAT 2009]:Date
}
Record Lineage
File: example/data/input/read_parquet_file.parquet
Record: 0
Schema: message schema {
optional int32 id;
optional boolean bool_col;
optional int32 tinyint_col;
optional int32 smallint_col;
optional int32 int_col;
optional int64 bigint_col;
optional float float_col;
optional double double_col;
optional binary date_string_col;
optional binary string_col;
optional int96 timestamp_col;
}
Field Lineage
id
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 0
Field Name: id
Field Schema: optional int32 id
bool_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 1
Field Name: bool_col
Field Schema: optional boolean bool_col
tinyint_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 2
Field Name: tinyint_col
Field Schema: optional int32 tinyint_col
smallint_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 3
Field Name: smallint_col
Field Schema: optional int32 smallint_col
int_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 4
Field Name: int_col
Field Schema: optional int32 int_col
bigint_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 5
Field Name: bigint_col
Field Schema: optional int64 bigint_col
float_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 6
Field Name: float_col
Field Schema: optional float float_col
double_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 7
Field Name: double_col
Field Schema: optional double double_col
date_string_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 8
Field Name: date_string_col
Field Schema: optional binary date_string_col
string_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 9
Field Name: string_col
Field Schema: optional binary string_col
timestamp_col
File: example/data/input/read_parquet_file.parquet
Record: 0
Field Index: 10
Field Name: timestamp_col
Field Schema: optional int96 timestamp_col
...

Only part of the output is shown, for brevity.
