Generate Parquet Schema By Analyzing All Records

Updated: Jun 1, 2023

In this example, you will learn how to create a Parquet schema by analyzing all records in the input instead of only an initial subset.

Parquet is an open-source, column-oriented data file format built for efficient data storage and retrieval.

Check Parquet Examples for more similar examples.

Input CSV file

The input CSV file contains 1000 records.

Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
Middle East and North Africa,Libya,Cosmetics,Offline,M,10/18/2014,686800706,10/31/2014,8446,437.20,263.33,3692591.20,2224085.18,1468506.02
North America,Canada,Vegetables,Online,M,11/7/2011,185941302,12/8/2011,3018,154.06,90.93,464953.08,274426.74,190526.34
Middle East and North Africa,Libya,Baby Food,Offline,C,10/31/2016,246222341,12/9/2016,1517,255.28,159.42,387259.76,241840.14,145419.62
Asia,Japan,Cereal,Offline,C,4/10/2010,161442649,5/12/2010,3322,205.70,117.11,683335.40,389039.42,294295.98
.
.
.

Java code listing

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.ProxyReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class GenerateParquetSchemaByAnalyzingAllRecords {

    private static final Long RECORDS_TO_ANALYZE = null; // null = analyze every record when generating the schema
    
    private static final File PARQUET_FILE = new File("example/data/output/GenerateParquetSchemaForAllRecords.parquet");

    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("example/data/input/1000_Sales_Records.csv"))
                .setFieldNamesInFirstRow(true);
        
        reader = fixFieldNames(reader);

        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE)
                .setMaxRecordsAnalyzed(RECORDS_TO_ANALYZE);
        
        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");

        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");

        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));
    }
    
    private static ProxyReader fixFieldNames(DataReader reader) {
        return new ProxyReader(reader){
            @Override
            protected Record interceptRecord(Record record) throws Throwable {
                for (Field field : record) {
                    field.setName(field.getName().replace(" ", "_"));
                }
                return record;
            }
        };
    }
    
}

Code Walkthrough

  1. A CSVReader is created for the input file 1000_Sales_Records.csv.
  2. setFieldNamesInFirstRow(true) is invoked to specify that the values in the first row should be used as field names (disabled by default). If this method is not invoked, the fields are named A1, A2, A3, and so on.
  3. fixFieldNames(reader) replaces the spaces in the field names with underscores, since Parquet does not support spaces in field names.
  4. A ParquetDataWriter instance is created for the output file GenerateParquetSchemaForAllRecords.parquet. It writes records to Apache Parquet columnar files.
  5. setMaxRecordsAnalyzed(RECORDS_TO_ANALYZE) specifies how many initial records are analyzed in order to create the schema. By default, the first 1000 records are analyzed; passing null means all records are analyzed (see the sketch after this list).
  6. Data is transferred from the CSVReader to the ParquetDataWriter via the Job.run() method. See how to compile and run DataPipeline jobs.
  7. writer.getSchema() returns the generated schema.
  8. ParquetDataReader is used to read the records back from the Parquet file.
  9. StreamWriter is used to write the records to a stream in a human-readable format. In this case, they are printed to the console.
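
As noted in step 5, passing a Long instead of null limits how many initial records are analyzed. The lines below are an illustrative sketch, not part of the listing above; the 500-record limit and the output file name are hypothetical values chosen for demonstration.

        // Sketch: generate the schema from the first 500 records only (hypothetical limit).
        // All records are still written; only the first 500 shape the inferred schema.
        ParquetDataWriter limitedWriter = new ParquetDataWriter(new File("example/data/output/GenerateParquetSchemaFor500Records.parquet"))
                .setMaxRecordsAnalyzed(500L);
        Job.run(reader, limitedWriter);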

Parquet Schema Output

message schema {
  optional binary Region (STRING);
  optional binary Country (STRING);
  optional binary Item_Type (STRING);
  optional binary Sales_Channel (STRING);
  optional binary Order_Priority (STRING);
  optional binary Order_Date (STRING);
  optional binary Order_ID (STRING);
  optional binary Ship_Date (STRING);
  optional binary Units_Sold (STRING);
  optional binary Unit_Price (STRING);
  optional binary Unit_Cost (STRING);
  optional binary Total_Revenue (STRING);
  optional binary Total_Cost (STRING);
  optional binary Total_Profit (STRING);
}
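
Every column is declared as STRING because CSVReader produces all field values as strings, so string values are all the writer sees while analyzing records. As a minimal sketch, assuming DataPipeline's TransformingReader and BasicFieldTransformer (with converters such as stringToInt and stringToDouble, as used in other DataPipeline examples) are available, converting selected fields to typed values before the writer analyzes them would let the generated schema pick up numeric types instead:

import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.TransformingReader;

        // Sketch (assumed API): convert selected string fields to typed values
        // before schema analysis, so the inferred columns are numeric.
        TransformingReader typedReader = new TransformingReader(reader);
        typedReader.add(new BasicFieldTransformer("Units_Sold").stringToInt());
        typedReader.add(new BasicFieldTransformer("Unit_Price").stringToDouble());
        Job.run(typedReader, writer);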