Use Data Lineage with Lookup
Updated: Feb 21, 2022
package com.northconcepts.datapipeline.examples.cookbook;
import java.io.File;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.FieldList;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.lineage.FieldLineage;
import com.northconcepts.datapipeline.lineage.RecordLineage;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.transform.lookup.DataReaderLookup;
import com.northconcepts.datapipeline.transform.lookup.Lookup;
import com.northconcepts.datapipeline.transform.lookup.LookupTransformer;
public class UseDataLineageWithLookup {
public static void main(String[] args) {
DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
.setFieldNamesInFirstRow(true)
.setSaveLineage(true);
/* This lookup matches [credit-balance-01.csv].[Rating] to [rating-table-01.csv].[rating_code]
* to return [rating-table-01.csv].[rating_description]
*/
Lookup lookup = new DataReaderLookup(
new CSVReader(new File("example/data/input/rating-table-01.csv"))
.setFieldNamesInFirstRow(true)
.setSaveLineage(true),
new FieldList("rating_code"),
new FieldList("rating_description")
);
reader = new TransformingReader(reader)
.add(new LookupTransformer(new FieldList("Rating"), lookup));
Job.run(reader, new LineageWriter());
}
public final static class LineageWriter extends DataWriter {
@Override
protected void writeImpl(Record record) throws Throwable {
System.out.println(record);
RecordLineage recordLineage = new RecordLineage().setRecord(record);
System.out.println("Record Lineage");
System.out.println(" File: " + recordLineage.getFile());
System.out.println(" File Line: " + recordLineage.getFileLineNumber());
System.out.println(" File Column: " + recordLineage.getFileColumnNumber());
System.out.println(" Record: " + recordLineage.getRecordNumber());
System.out.println();
FieldLineage fieldLineage = new FieldLineage();
System.out.println("Field Lineage");
for (int i=0; i < record.getFieldCount(); i++) {
Field field = record.getField(i);
fieldLineage.setField(field);
System.out.println(" " + field.getName());
System.out.println(" File: " + fieldLineage.getFile());
System.out.println(" File Line: " + fieldLineage.getFileLineNumber());
System.out.println(" File Column: " + fieldLineage.getFileColumnNumber());
System.out.println(" Record: " + fieldLineage.getRecordNumber());
System.out.println(" Field Index: " + fieldLineage.getOriginalFieldIndex());
System.out.println(" Field Name: " + fieldLineage.getOriginalFieldName());
}
System.out.println("---------------------------------------------------------");
System.out.println();
}
}
}
