Generate Parquet Schema By Analyzing Initial Records

Updated: Jun 4, 2023

In this example, you will learn how to create a Parquet schema by analyzing a specified number of initial records.

Parquet is an open-source, column-oriented data file format built for efficient data storage and retrieval.

Check Parquet Examples for more similar examples.

Input CSV file

The input CSV file contains 1000 records. Here is a preview of the records:

Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
Middle East and North Africa,Libya,Cosmetics,Offline,M,10/18/2014,686800706,10/31/2014,8446,437.20,263.33,3692591.20,2224085.18,1468506.02
North America,Canada,Vegetables,Online,M,11/7/2011,185941302,12/8/2011,3018,154.06,90.93,464953.08,274426.74,190526.34
Middle East and North Africa,Libya,Baby Food,Offline,C,10/31/2016,246222341,12/9/2016,1517,255.28,159.42,387259.76,241840.14,145419.62
Asia,Japan,Cereal,Offline,C,4/10/2010,161442649,5/12/2010,3322,205.70,117.11,683335.40,389039.42,294295.98
...

Java Code Listing

Using our DataPipeline library, you can generate the Parquet schema with the following code:

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.ProxyReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class GenerateParquetSchemaByAnalyzingInitialRecords {

    private static final Long INITIAL_RECORDS_TO_ANALYZE = 500L;
    
    private static final File PARQUET_FILE = new File("example/data/output/GenerateParquetSchemaFor500Records.parquet");

    public static void main(String[] args) {
        // Read the input CSV file, using the first row for field names
        DataReader reader = new CSVReader(new File("example/data/input/1000_Sales_Records.csv"))
                .setFieldNamesInFirstRow(true);
        
        // Make the field names Parquet-friendly (see fixFieldNames below)
        reader = fixFieldNames(reader);

        // Create the Parquet writer and analyze only the first 500 records to build the schema
        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);
        writer.setMaxRecordsAnalyzed(INITIAL_RECORDS_TO_ANALYZE);
        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");

        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");

        // Read the Parquet file back and print its records to the console
        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));
    }
    
    // Wraps the reader to rename every field, replacing spaces with underscores
    private static ProxyReader fixFieldNames(DataReader reader) {
        return new ProxyReader(reader){
            @Override
            protected Record interceptRecord(Record record) throws Throwable {
                for (Field field : record) {
                    field.setName(field.getName().replace(" ", "_"));
                }
                return record;
            }
        };
    }
    
}

Code Walkthrough

  1. A CSVReader is created corresponding to the input file 1000_Sales_Records.csv.
  2. setFieldNamesInFirstRow(true) is invoked so that the values in the first row are used as field names (this is disabled by default). If this method is not invoked, the fields are given default names such as A1, A2.
  3. fixFieldNames(reader) replaces the spaces in the field names with underscores, since Parquet does not support spaces in field names.
  4. A ParquetDataWriter instance is created for the output file GenerateParquetSchemaFor500Records.parquet. It writes records to Apache Parquet columnar files.
  5. writer.setMaxRecordsAnalyzed(INITIAL_RECORDS_TO_ANALYZE) sets the number of initial records to analyze when creating the schema (1000 by default). In this example it is set to 500.
  6. Data is transferred from the CSVReader to the ParquetDataWriter via the Job.run() method. See how to compile and run DataPipeline jobs.
  7. writer.getSchema() is used to get the generated schema (a sketch that saves this schema text to a file follows this list).
  8. ParquetDataReader is used to read the records from the Parquet file.
  9. StreamWriter is used to write the records to a stream in a human-readable format; in this case, they are printed to the console.
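As mentioned in step 7, you may want to keep the generated schema rather than only printing it. The class below is a minimal sketch of that idea; the class name and output file paths are hypothetical, and it assumes that the object returned by getSchema() renders as the Parquet message text shown in the output further below when converted to a string.

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.io.FileWriter;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.ProxyReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class SaveGeneratedParquetSchemaToFile {

    public static void main(String[] args) throws Exception {
        DataReader reader = new CSVReader(new File("example/data/input/1000_Sales_Records.csv"))
                .setFieldNamesInFirstRow(true);

        // Same field-name fix as in the listing above: replace spaces so the names are Parquet-friendly
        reader = new ProxyReader(reader) {
            @Override
            protected Record interceptRecord(Record record) throws Throwable {
                for (Field field : record) {
                    field.setName(field.getName().replace(" ", "_"));
                }
                return record;
            }
        };

        ParquetDataWriter writer = new ParquetDataWriter(new File("example/data/output/SalesRecords.parquet"));
        writer.setMaxRecordsAnalyzed(500L);
        Job.run(reader, writer);

        // Persist the schema text next to the Parquet file so it can be reviewed or reused later
        try (FileWriter schemaOut = new FileWriter("example/data/output/SalesRecords-schema.txt")) {
            schemaOut.write(String.valueOf(writer.getSchema()));
        }
    }
}

The saved text file can then be compared across runs or kept alongside the Parquet output for documentation; how you use it is up to you.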

Parquet Schema Output

message schema {
  optional binary Region (STRING);
  optional binary Country (STRING);
  optional binary Item_Type (STRING);
  optional binary Sales_Channel (STRING);
  optional binary Order_Priority (STRING);
  optional binary Order_Date (STRING);
  optional binary Order_ID (STRING);
  optional binary Ship_Date (STRING);
  optional binary Units_Sold (STRING);
  optional binary Unit_Price (STRING);
  optional binary Unit_Cost (STRING);
  optional binary Total_Revenue (STRING);
  optional binary Total_Cost (STRING);
  optional binary Total_Profit (STRING);
}
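Every column is inferred as STRING because CSVReader supplies all values as text. If the numeric columns are converted to Java numeric types before the writer analyzes them, the generated schema should use numeric Parquet types for those columns instead. The snippet below is a minimal sketch of that idea, meant to replace the reader setup inside main() in the listing above (reusing its fixFieldNames helper and constants); it assumes DataPipeline's TransformingReader and BasicFieldTransformer classes and their stringToInt()/stringToDouble() conversions, so verify those names against your version of the library.

// Assumed imports -- verify these classes exist in your DataPipeline version:
// import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
// import com.northconcepts.datapipeline.transform.TransformingReader;

DataReader reader = new CSVReader(new File("example/data/input/1000_Sales_Records.csv"))
        .setFieldNamesInFirstRow(true);

reader = fixFieldNames(reader);

// Convert numeric-looking columns from text to Java numeric types before the
// writer analyzes them (field names use the underscore form produced above)
reader = new TransformingReader(reader)
        .add(new BasicFieldTransformer("Units_Sold").stringToInt())
        .add(new BasicFieldTransformer("Unit_Price").stringToDouble())
        .add(new BasicFieldTransformer("Total_Revenue").stringToDouble());

ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);
writer.setMaxRecordsAnalyzed(INITIAL_RECORDS_TO_ANALYZE);
Job.run(reader, writer);

With conversions like these, the schema analysis works from actual Integer and Double values, so the converted columns would no longer be emitted as STRING.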