Use Data Lineage with OrcReader

Updated: Feb 21, 2022
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;
import com.northconcepts.datapipeline.orc.OrcDataReader;

/**
 * Cookbook example: reads records from an ORC file with lineage saving turned on
 * and prints, for every record, the lineage values exposed by {@link RecordLineage}
 * and {@link FieldLineage}.
 */
public class UseDataLineageWithOrcReader {

    /**
     * Opens the sample ORC file, enables lineage on the reader, and runs a job that
     * pipes every record into a {@link LineageWriter}.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        File orcFile = new File("example/data/input/input_orc_file.orc");
        OrcDataReader orcReader = new OrcDataReader(orcFile);

        // Enable lineage support. (By default, it is disabled.)
        DataReader reader = orcReader.setSaveLineage(true);

        LineageWriter lineageWriter = new LineageWriter();
        Job.run(reader, lineageWriter);
    }

    /**
     * A {@link DataWriter} that writes nothing to storage; it prints each incoming
     * record to standard output, followed by its record-level and field-level lineage.
     */
    public static final class LineageWriter extends DataWriter {

        /**
         * Prints the record, then a "Record Lineage" section, then a "Field Lineage"
         * section with one entry per field, and finally a separator line.
         *
         * @param record the record handed to this writer
         * @throws Throwable as declared by the overridden method
         */
        @Override
        protected void writeImpl(Record record) throws Throwable {
            System.out.println(record);

            // Record-level lineage: bind a lineage view to the current record.
            RecordLineage recLineage = new RecordLineage().setRecord(record);

            System.out.println("Record Lineage");
            printLabeled("    File: ", recLineage.getFile());
            printLabeled("    Record: ", recLineage.getRecordNumber());
            printLabeled("    Schema: ", recLineage.getRecordSchema());
            System.out.println();

            // Field-level lineage: one lineage object is re-pointed at each field in turn.
            FieldLineage fldLineage = new FieldLineage();

            System.out.println("Field Lineage");
            final int fieldCount = record.getFieldCount();
            int index = 0;
            while (index < fieldCount) {
                Field current = record.getField(index);
                fldLineage.setField(current);

                printLabeled("    ", current.getName());
                printLabeled("        File: ", fldLineage.getFile());
                printLabeled("        Record: ", fldLineage.getRecordNumber());
                printLabeled("        Field Index: ", fldLineage.getOriginalFieldIndex());
                printLabeled("        Field Name: ", fldLineage.getOriginalFieldName());
                printLabeled("        Field Schema: ", fldLineage.getFieldSchema());

                index++;
            }

            System.out.println("---------------------------------------------------------");
            System.out.println();
        }

        /**
         * Prints {@code prefix} immediately followed by the string form of {@code value}
         * on its own line.
         */
        private static void printLabeled(String prefix, Object value) {
            System.out.println(prefix + value);
        }

    }
}

Mobile Analytics