Use Data Lineage with ParquetReader

This example shows you how to use Data Lineage with ParquetDataReader in Data Pipeline.

Data Lineage is metadata added to records and fields indicating where they were loaded from. It can be useful for audits and reconciliation as well as troubleshooting.

Parquet is an open source, column-oriented data file format built for efficient data storage and retrieval.

Java Code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class UseDataLineageWithParquetReader {

    public static void main(String[] args) {

        DataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                                .setSaveLineage(true); // Enable lineage support. (By default, it is disabled.)
        Job.run(reader, new LineageWriter());
    }

    public final static class LineageWriter extends DataWriter {

        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);

            RecordLineage recordLineage = new RecordLineage().setRecord(record);

            System.out.println("Record Lineage");
            System.out.println("    File: " + recordLineage.getFile());
            System.out.println("    Record: " + recordLineage.getRecordNumber());
            System.out.println("    Schema: " + recordLineage.getRecordSchema());
            System.out.println();

            FieldLineage fieldLineage = new FieldLineage();

            System.out.println("Field Lineage");
            for (int i = 0; i < record.getFieldCount(); i++) {
                Field field = record.getField(i);
                fieldLineage.setField(field);
                System.out.println("    " + field.getName());
                System.out.println("        File: " + fieldLineage.getFile());
                System.out.println("        Record: " + fieldLineage.getRecordNumber());
                System.out.println("        Field Index: " + fieldLineage.getOriginalFieldIndex());
                System.out.println("        Field Name: " + fieldLineage.getOriginalFieldName());
                System.out.println("        Field Schema: " + fieldLineage.getFieldSchema());
            }
            System.out.println("---------------------------------------------------------");
            System.out.println();
        }

    }
}

Code walkthrough

  1. First, ParquetDataReader is created corresponding to the input file read_parquet_file.parquet.
  2. setSaveLineage(true) method enables ParquetDataReader to support lineage(disabled by default).
  3. LineageWriter extends DataWriter class and overrides writeImpl() method to write the specified record which is transferred from ParquetDataReader via Job.run() method.
  4. RecordLineage is created to inform you of the starting location where the record was loaded from.
  5. recordLineage.getFile() returns a file used to create the DataReader which is read_parquet_file.parquet in this example.
  6. recordLineage.getRecordNumber() returns the beginning column number in the input file starting with 0.
  7. recordLineage.getRecordSchema() returns the schema for the input Parquet file.
  8. FieldLineage is created to inform you of the starting location for each individual field.
  9. ieldLineage.getRecordNumber() returns the sequential record number starting with 0.
  10. fieldLineage.getOriginalFieldIndex() returns the index of a field set by the DataReader before any transformation or operation was performed.
  11. fieldLineage.getOriginalFieldName() returns the name of a field set by the DataReader before any transformation or operation was performed.
  12. fieldLineage.getFieldSchema() returns the field's schema for the input Parquet file.
  13. Data are transferred from ParquetDataReader to LineageWriter via Job.run() method. See how to compile and run data pipeline jobs.

RecordLineage

Convenience wrapper to read and write record-level data lineage properties.It extends Object class and can be created Record object(optionally you can also set the Record object using setRecord() method). It is useful to get the starting location where the record was loaded from.

ParquetDataReader

Read records from Apache Parquet columnar files. Using this class you can get the configuration, schema, filter setting and other metadata of a given parquet file.

FieldLineage

Convenience wrapper to read and write field-level data lineage properties. It extends Object class and can created using Field object(optionally you can also set the Record object using setRecord() method). It is useful to to get the starting location for each individual field.

Field

Field holds persistent key-value data as part of a record.

Console output

Record (MODIFIED) {
    0:[id]:INT=[4]:Integer
    1:[bool_col]:BOOLEAN=[true]:Boolean
    2:[tinyint_col]:INT=[0]:Integer
    3:[smallint_col]:INT=[0]:Integer
    4:[int_col]:INT=[0]:Integer
    5:[bigint_col]:LONG=[0]:Long
    6:[float_col]:FLOAT=[0.0]:Float
    7:[double_col]:DOUBLE=[0.0]:Double
    8:[date_string_col]:STRING=[03/01/09]:String
    9:[string_col]:STRING=[0]:String
    10:[timestamp_col]:DATETIME=[Sun Mar 01 12:00:00 EAT 2009]:Date
}

Record Lineage
    File: example/data/input/read_parquet_file.parquet
    Record: 0
    Schema: message schema {
      optional int32 id;
      optional boolean bool_col;
      optional int32 tinyint_col;
      optional int32 smallint_col;
      optional int32 int_col;
      optional int64 bigint_col;
      optional float float_col;
      optional double double_col;
      optional binary date_string_col;
      optional binary string_col;
      optional int96 timestamp_col;
}


Field Lineage
    id
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 0
        Field Name: id
        Field Schema: optional int32 id
    bool_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 1
        Field Name: bool_col
        Field Schema: optional boolean bool_col
    tinyint_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 2
        Field Name: tinyint_col
        Field Schema: optional int32 tinyint_col
    smallint_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 3
        Field Name: smallint_col
        Field Schema: optional int32 smallint_col
    int_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 4
        Field Name: int_col
        Field Schema: optional int32 int_col
    bigint_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 5
        Field Name: bigint_col
        Field Schema: optional int64 bigint_col
    float_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 6
        Field Name: float_col
        Field Schema: optional float float_col
    double_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 7
        Field Name: double_col
        Field Schema: optional double double_col
    date_string_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 8
        Field Name: date_string_col
        Field Schema: optional binary date_string_col
    string_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 9
        Field Name: string_col
        Field Schema: optional binary string_col
    timestamp_col
        File: example/data/input/read_parquet_file.parquet
        Record: 0
        Field Index: 10
        Field Name: timestamp_col
        Field Schema: optional int96 timestamp_col
.
.
.
.

Only some of the outputs are shown for clarity purposes.

Mobile Analytics