Generate Parquet Schema By Analyzing All Records
Updated: Jun 1, 2023
In this example, you will learn how to create a Parquet schema by analyzing all records in the input.
Parquet is an open-source, column-oriented data file format built for efficient data storage and retrieval.
Check Parquet Examples for more similar examples.
Input CSV file
The input CSV file contains 1000 records.
Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
Middle East and North Africa,Libya,Cosmetics,Offline,M,10/18/2014,686800706,10/31/2014,8446,437.20,263.33,3692591.20,2224085.18,1468506.02
North America,Canada,Vegetables,Online,M,11/7/2011,185941302,12/8/2011,3018,154.06,90.93,464953.08,274426.74,190526.34
Middle East and North Africa,Libya,Baby Food,Offline,C,10/31/2016,246222341,12/9/2016,1517,255.28,159.42,387259.76,241840.14,145419.62
Asia,Japan,Cereal,Offline,C,4/10/2010,161442649,5/12/2010,3322,205.70,117.11,683335.40,389039.42,294295.98
. . .
Java code listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.ProxyReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class GenerateParquetSchemaByAnalyzingAllRecords {

    private static final Long RECORDS_TO_ANALYZE = null;
    private static final File PARQUET_FILE = new File("example/data/output/GenerateParquetSchemaForAllRecords.parquet");

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/1000_Sales_Records.csv"))
                .setFieldNamesInFirstRow(true);

        reader = fixFieldNames(reader);

        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE)
                .setMaxRecordsAnalyzed(RECORDS_TO_ANALYZE);

        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");
        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");
        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));
    }

    private static ProxyReader fixFieldNames(DataReader reader) {
        return new ProxyReader(reader) {
            @Override
            protected Record interceptRecord(Record record) throws Throwable {
                for (Field field : record) {
                    field.setName(field.getName().replace(" ", "_"));
                }
                return record;
            }
        };
    }

}
Code Walkthrough
- A CSVReader is created for the input file 1000_Sales_Records.csv. setFieldNamesInFirstRow(true) is invoked to specify that the values in the first row should be used as field names (disabled by default). If this method is not invoked, the fields are named A1, A2, A3, and so on by default.
- fixFieldNames(reader) replaces the spaces in the field names with underscores since Parquet does not support them.
- A ParquetDataWriter instance that corresponds to the output file GenerateParquetSchemaForAllRecords.parquet is created. It writes records to Apache Parquet columnar files.
- setMaxRecordsAnalyzed(RECORDS_TO_ANALYZE) specifies how many initial records are analyzed in order to create the schema. By default, 1000 records are analyzed; passing null means all records will be analyzed (see the sketch after this walkthrough).
- Data is transferred from the CSVReader to the ParquetDataWriter via the Job.run() method. See how to compile and run DataPipeline jobs.
- writer.getSchema() returns the schema generated from the analyzed records.
- ParquetDataReader is used to read the records back from the Parquet file.
- StreamWriter is used to write the records to a stream in a human-readable format. In this case, they are printed to the console.
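If you want the schema to be derived from only a subset of the input rather than every record, you can pass a concrete limit to setMaxRecordsAnalyzed instead of null. The following is a minimal sketch of that variation; the 500-record limit and the output file name sample.parquet are illustrative assumptions, not part of the original example.

// Sketch: build the Parquet schema from only the first 500 records.
// The limit (500L) and output path below are assumptions for illustration.
DataReader reader = new CSVReader(new File("example/data/input/1000_Sales_Records.csv"))
        .setFieldNamesInFirstRow(true);
reader = fixFieldNames(reader);  // field names still need their spaces replaced, as in the full listing above

ParquetDataWriter writer = new ParquetDataWriter(new File("example/data/output/sample.parquet"))
        .setMaxRecordsAnalyzed(500L);  // only the first 500 records are inspected to infer the schema

Job.run(reader, writer);
System.out.println(writer.getSchema());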
Parquet Schema Output
message schema {
  optional binary Region (STRING);
  optional binary Country (STRING);
  optional binary Item_Type (STRING);
  optional binary Sales_Channel (STRING);
  optional binary Order_Priority (STRING);
  optional binary Order_Date (STRING);
  optional binary Order_ID (STRING);
  optional binary Ship_Date (STRING);
  optional binary Units_Sold (STRING);
  optional binary Unit_Price (STRING);
  optional binary Unit_Cost (STRING);
  optional binary Total_Revenue (STRING);
  optional binary Total_Cost (STRING);
  optional binary Total_Profit (STRING);
}