Use Data Lineage with Lookup
Updated: Feb 21, 2022
package com.northconcepts.datapipeline.examples.cookbook; import java.io.File; import com.northconcepts.datapipeline.core.DataReader; import com.northconcepts.datapipeline.core.DataWriter; import com.northconcepts.datapipeline.core.Field; import com.northconcepts.datapipeline.core.FieldList; import com.northconcepts.datapipeline.core.Record; import com.northconcepts.datapipeline.csv.CSVReader; import com.northconcepts.datapipeline.job.Job; import com.northconcepts.datapipeline.lineage.FieldLineage; import com.northconcepts.datapipeline.lineage.RecordLineage; import com.northconcepts.datapipeline.transform.TransformingReader; import com.northconcepts.datapipeline.transform.lookup.DataReaderLookup; import com.northconcepts.datapipeline.transform.lookup.Lookup; import com.northconcepts.datapipeline.transform.lookup.LookupTransformer; public class UseDataLineageWithLookup { public static void main(String[] args) { DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv")) .setFieldNamesInFirstRow(true) .setSaveLineage(true); /* This lookup matches [credit-balance-01.csv].[Rating] to [rating-table-01.csv].[rating_code] * to return [rating-table-01.csv].[rating_description] */ Lookup lookup = new DataReaderLookup( new CSVReader(new File("example/data/input/rating-table-01.csv")) .setFieldNamesInFirstRow(true) .setSaveLineage(true), new FieldList("rating_code"), new FieldList("rating_description") ); reader = new TransformingReader(reader) .add(new LookupTransformer(new FieldList("Rating"), lookup)); Job.run(reader, new LineageWriter()); } public final static class LineageWriter extends DataWriter { @Override protected void writeImpl(Record record) throws Throwable { System.out.println(record); RecordLineage recordLineage = new RecordLineage().setRecord(record); System.out.println("Record Lineage"); System.out.println(" File: " + recordLineage.getFile()); System.out.println(" File Line: " + recordLineage.getFileLineNumber()); System.out.println(" File Column: " + recordLineage.getFileColumnNumber()); System.out.println(" Record: " + recordLineage.getRecordNumber()); System.out.println(); FieldLineage fieldLineage = new FieldLineage(); System.out.println("Field Lineage"); for (int i=0; i < record.getFieldCount(); i++) { Field field = record.getField(i); fieldLineage.setField(field); System.out.println(" " + field.getName()); System.out.println(" File: " + fieldLineage.getFile()); System.out.println(" File Line: " + fieldLineage.getFileLineNumber()); System.out.println(" File Column: " + fieldLineage.getFileColumnNumber()); System.out.println(" Record: " + fieldLineage.getRecordNumber()); System.out.println(" Field Index: " + fieldLineage.getOriginalFieldIndex()); System.out.println(" Field Name: " + fieldLineage.getOriginalFieldName()); } System.out.println("---------------------------------------------------------"); System.out.println(); } } }