Write a Parquet file using the schema derived from any dataset

Updated: Sep 19, 2023

This example shows how to create Parquet files whose schema is derived directly from the data being written, instead of being declared by hand. This simplifies defining the Parquet schema and lets you focus on your data engineering goals rather than the intricacies of the Parquet format. Use cases include dynamic or evolving data sources that require on-the-fly schema generation, such as streaming data, log analytics, and data ingestion.

 

Input CSV File

Id,AccountNo,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
1,101,Reeves,Keanu,12,1000.23,17-01-1998,A
2,102,Butler,Gerard,123456,1234567890.98,06-08-2003,B
3,103,Hewitt,Jennifer,4294967295,9876543210.776655,25-05-1985,B
4,104,Pinkett-Smith,Jada,184467440737095,-1.23457E+18,05-12-2006,A
5,105,Murray,Bill,9223372036854,-112233.99,05-02-2007,C
6,106,Murray,Bill,1,-778899.12345,05-02-2007,D
7,107,John,Doe,9223372036854,900000.98765,25-05-1985,B
8,108,Jane,Doe,789456123123456,8000000000.887765,17-01-1998,B
9,109,James,Bond,7894561230123,10000.99887,05-12-2006,A
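The Balance column explains why the example converts that field to a long rather than an int: several balances exceed Integer.MAX_VALUE (2147483647) and would not fit in a 32-bit int. The plain-Java sketch below checks the Balance values copied from the file above. It does not use DataPipeline, and BalanceRangeCheck is a hypothetical class name used only for illustration.

```java
import java.util.List;

// Hypothetical helper, not part of DataPipeline: checks which Balance values
// from bank_account.csv would overflow a 32-bit int.
public class BalanceRangeCheck {

    // Balance column copied from the input CSV.
    static final List<String> BALANCES = List.of(
            "12", "123456", "4294967295", "184467440737095", "9223372036854",
            "1", "9223372036854", "789456123123456", "7894561230123");

    // Counts values that parse as a long but fall outside the int range.
    static long countExceedingInt(List<String> values) {
        return values.stream()
                .mapToLong(Long::parseLong)
                .filter(v -> v > Integer.MAX_VALUE || v < Integer.MIN_VALUE)
                .count();
    }

    public static void main(String[] args) {
        System.out.println(countExceedingInt(BALANCES) + " of " + BALANCES.size()
                + " balances need a long (int64)");
    }
}
```

Running main prints "6 of 9 balances need a long (int64)", which is consistent with Balance appearing as int64 in the Parquet schema.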

 

Java Code Listing

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DebugReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class WriteAParquetFileUsingSchemaFromData {

    private static final File PARQUET_FILE = new File("example/data/output/WriteAParquetFileUsingSchemaFromData.parquet");

    public static void main(String[] args) {
        System.out.println("============================================================");
        System.out.println("Write records to a parquet file");
        System.out.println("============================================================");

        DataReader reader = new CSVReader(new File("example/data/input/bank_account.csv"))
                .setFieldNamesInFirstRow(true);

        reader = new TransformingReader(reader)
                .add(new BasicFieldTransformer("Id").stringToInt())
                .add(new BasicFieldTransformer("Balance").stringToLong())
                .add(new BasicFieldTransformer("CreditLimit").stringToDouble())
                // MM is the month; lowercase mm would be parsed as minutes
                .add(new BasicFieldTransformer("AccountCreated").stringToDateTime("dd-MM-yyyy"))
                .add(new BasicFieldTransformer("Rating").stringToChar())
                ;

        reader = new DebugReader(reader);

        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);
        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");

        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");

        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));

    }
}
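The AccountCreated pattern depends on Java date-pattern letters: MM is the month of the year and mm is minutes. The console output below is consistent with mm having been read as minutes (for example, 06-08-2003 appears as Mon Jan 06 00:08:00). The sketch below shows the difference using java.text.SimpleDateFormat directly instead of BasicFieldTransformer; it assumes stringToDateTime follows the same pattern letters, and DatePatternCheck is a hypothetical class name.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.TimeZone;

// Stdlib-only check of date-pattern letters: MM = month, mm = minutes.
// Uses SimpleDateFormat directly; it does not call BasicFieldTransformer.
public class DatePatternCheck {

    // Parses text with the given pattern in UTC and returns {month (1-12), minute}.
    static int[] monthAndMinute(String pattern, String text) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        format.setTimeZone(utc);
        try {
            Calendar cal = Calendar.getInstance(utc, Locale.ROOT);
            cal.setTime(format.parse(text));
            return new int[] {cal.get(Calendar.MONTH) + 1, cal.get(Calendar.MINUTE)};
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse " + text, e);
        }
    }

    public static void main(String[] args) {
        for (String pattern : new String[] {"dd-mm-yyyy", "dd-MM-yyyy"}) {
            int[] r = monthAndMinute(pattern, "06-08-2003");
            System.out.println(pattern + " -> month " + r[0] + ", minute " + r[1]);
        }
    }
}
```

Running main prints "dd-mm-yyyy -> month 1, minute 8" followed by "dd-MM-yyyy -> month 8, minute 0". With mm, the month field is never set and defaults to January.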

 

Code Walkthrough

  1. A CSVReader is created for the input file bank_account.csv.
  2. setFieldNamesInFirstRow(true) is invoked so that the values in the first row are used as field names.
  3. A TransformingReader is created to apply transformations to the records from the CSVReader.
  4. BasicFieldTransformer is used to convert the string values read from the CSV into typed values: Id to int, Balance to long, CreditLimit to double, AccountCreated to a datetime, and Rating to char. AccountNo, LastName, and FirstName remain strings.
  5. A DebugReader is wrapped around the TransformingReader, so each record is printed to System.out as it is read.
  6. A ParquetDataWriter is created to write records to the file WriteAParquetFileUsingSchemaFromData.parquet.
  7. Job.run() transfers the data from the reader to the writer.
  8. Next, the schema returned by writer.getSchema() is printed to the console. This is the Parquet schema the writer derived from the field types of the records.
  9. A ParquetDataReader is created to read the records back from WriteAParquetFileUsingSchemaFromData.parquet.
  10. Finally, Job.run() transfers the data from the ParquetDataReader to a StreamWriter wrapping System.out.
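The type conversions in the walkthrough amount to applying a per-field converter to each record. The sketch below illustrates that idea with plain Java collections; FieldConversionSketch and its map of converters are hypothetical and are not the TransformingReader or BasicFieldTransformer API. It also swaps in java.time.LocalDate for AccountCreated instead of the java.util.Date values that appear in the console output.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stdlib-only sketch of per-field type conversion, similar in
// spirit to TransformingReader + BasicFieldTransformer. Not the DataPipeline API.
public class FieldConversionSketch {

    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("dd-MM-yyyy");

    // Field name -> converter from the CSV string to a typed value.
    static final Map<String, Function<String, Object>> CONVERTERS = new LinkedHashMap<>();

    static {
        CONVERTERS.put("Id", Integer::valueOf);
        CONVERTERS.put("Balance", Long::valueOf);
        CONVERTERS.put("CreditLimit", Double::valueOf);
        CONVERTERS.put("AccountCreated", s -> LocalDate.parse(s, DATE)); // swapped-in java.time type
        CONVERTERS.put("Rating", s -> s.charAt(0));
    }

    // Copies the row, converting fields that have a converter; the rest stay strings.
    static Map<String, Object> convert(Map<String, String> row) {
        Map<String, Object> typed = new LinkedHashMap<>();
        row.forEach((name, value) ->
                typed.put(name, CONVERTERS.getOrDefault(name, s -> s).apply(value)));
        return typed;
    }

    public static void main(String[] args) {
        String[] header = "Id,AccountNo,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating".split(",");
        String[] values = "1,101,Reeves,Keanu,12,1000.23,17-01-1998,A".split(",");
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < header.length; i++) {
            row.put(header[i], values[i]);
        }
        // Print each converted value with its Java type.
        convert(row).forEach((name, value) ->
                System.out.println(name + " = " + value + " (" + value.getClass().getSimpleName() + ")"));
    }
}
```

Keeping the converters in a map keyed by field name means fields without an entry, such as AccountNo, pass through as strings, which matches how the example leaves those fields untouched.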

 

DebugReader

DebugReader can be wrapped around any DataReader for debugging purposes. It passes each record through unchanged and prints it to the specified OutputStream. It can be created with the following parameters:

  • DataReader - The reader being wrapped; its records are the ones printed.
  • String - Text prefixed to each printed record. Any text can be used here.
  • OutputStream - The stream the output is written to. System.out is used here for demonstration purposes; any OutputStream can be used instead.
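DebugReader follows a wrapper (decorator) pattern: it sits between a reader and whatever consumes it and prints each record as it goes by. The hypothetical DebugIterator class below sketches that pattern with java.util.Iterator instead of DataReader. It mirrors the prefix and output-stream parameters, prints each item with a prefix and index, and returns the item unchanged; it is not DataPipeline's implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Iterator;
import java.util.List;

// Hypothetical stdlib-only sketch of the DebugReader idea: a pass-through
// wrapper that prints each item with a prefix and index, then returns it unchanged.
public class DebugIterator<T> implements Iterator<T> {

    private final Iterator<T> source;
    private final String prefix;
    private final PrintStream out;
    private int index;

    public DebugIterator(Iterator<T> source, String prefix, PrintStream out) {
        this.source = source;
        this.prefix = prefix;
        this.out = out;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        T item = source.next();
        out.println(prefix + " - " + index++ + ": " + item); // print only, never modify
        return item;
    }

    // Runs items through a DebugIterator and returns what was printed.
    static String capture(List<String> items, String prefix) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(buffer, true);
        DebugIterator<String> it = new DebugIterator<>(items.iterator(), prefix, out);
        while (it.hasNext()) {
            it.next();
        }
        out.flush();
        return buffer.toString();
    }

    public static void main(String[] args) {
        DebugIterator<String> it = new DebugIterator<>(
                List.of("Reeves", "Butler").iterator(), "debug", System.out);
        while (it.hasNext()) {
            it.next();
        }
    }
}
```

Running main prints "debug - 0: Reeves" and "debug - 1: Butler". Because the wrapper only observes items, it can be added or removed without changing what downstream code receives.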

 

Console Output

============================================================
Write records to a parquet file
============================================================
21:19:15,382 DEBUG [main] datapipeline:37 - DataPipeline v8.2.0 by North Concepts Inc.
==========================================================
debug - 0
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[1]:Integer
    1:[AccountNo]:STRING=[101]:String
    2:[LastName]:STRING=[Reeves]:String
    3:[FirstName]:STRING=[Keanu]:String
    4:[Balance]:LONG=[12]:Long
    5:[CreditLimit]:DOUBLE=[1000.23]:Double
    6:[AccountCreated]:DATETIME=[Sat Jan 17 00:01:00 UZT 1998]:Date
    7:[Rating]:CHAR=[A]:Character
}

==========================================================
debug - 1
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[2]:Integer
    1:[AccountNo]:STRING=[102]:String
    2:[LastName]:STRING=[Butler]:String
    3:[FirstName]:STRING=[Gerard]:String
    4:[Balance]:LONG=[123456]:Long
    5:[CreditLimit]:DOUBLE=[1.23456789098E9]:Double
    6:[AccountCreated]:DATETIME=[Mon Jan 06 00:08:00 UZT 2003]:Date
    7:[Rating]:CHAR=[B]:Character
}

==========================================================
debug - 2
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[3]:Integer
    1:[AccountNo]:STRING=[103]:String
    2:[LastName]:STRING=[Hewitt]:String
    3:[FirstName]:STRING=[Jennifer]:String
    4:[Balance]:LONG=[4294967295]:Long
    5:[CreditLimit]:DOUBLE=[9.876543210776655E9]:Double
    6:[AccountCreated]:DATETIME=[Fri Jan 25 00:05:00 UZT 1985]:Date
    7:[Rating]:CHAR=[B]:Character
}

==========================================================
debug - 3
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[4]:Integer
    1:[AccountNo]:STRING=[104]:String
    2:[LastName]:STRING=[Pinkett-Smith]:String
    3:[FirstName]:STRING=[Jada]:String
    4:[Balance]:LONG=[184467440737095]:Long
    5:[CreditLimit]:DOUBLE=[-1.23457E18]:Double
    6:[AccountCreated]:DATETIME=[Thu Jan 05 00:12:00 UZT 2006]:Date
    7:[Rating]:CHAR=[A]:Character
}

==========================================================
debug - 4
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[5]:Integer
    1:[AccountNo]:STRING=[105]:String
    2:[LastName]:STRING=[Murray]:String
    3:[FirstName]:STRING=[Bill]:String
    4:[Balance]:LONG=[9223372036854]:Long
    5:[CreditLimit]:DOUBLE=[-112233.99]:Double
    6:[AccountCreated]:DATETIME=[Fri Jan 05 00:02:00 UZT 2007]:Date
    7:[Rating]:CHAR=[C]:Character
}

==========================================================
debug - 5
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[6]:Integer
    1:[AccountNo]:STRING=[106]:String
    2:[LastName]:STRING=[Murray]:String
    3:[FirstName]:STRING=[Bill]:String
    4:[Balance]:LONG=[1]:Long
    5:[CreditLimit]:DOUBLE=[-778899.12345]:Double
    6:[AccountCreated]:DATETIME=[Fri Jan 05 00:02:00 UZT 2007]:Date
    7:[Rating]:CHAR=[D]:Character
}

==========================================================
debug - 6
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[7]:Integer
    1:[AccountNo]:STRING=[107]:String
    2:[LastName]:STRING=[John]:String
    3:[FirstName]:STRING=[Doe]:String
    4:[Balance]:LONG=[9223372036854]:Long
    5:[CreditLimit]:DOUBLE=[900000.98765]:Double
    6:[AccountCreated]:DATETIME=[Fri Jan 25 00:05:00 UZT 1985]:Date
    7:[Rating]:CHAR=[B]:Character
}

==========================================================
debug - 7
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[8]:Integer
    1:[AccountNo]:STRING=[108]:String
    2:[LastName]:STRING=[Jane]:String
    3:[FirstName]:STRING=[Doe]:String
    4:[Balance]:LONG=[789456123123456]:Long
    5:[CreditLimit]:DOUBLE=[8.000000000887765E9]:Double
    6:[AccountCreated]:DATETIME=[Sat Jan 17 00:01:00 UZT 1998]:Date
    7:[Rating]:CHAR=[B]:Character
}

==========================================================
debug - 8
------------------------------------------
Record (MODIFIED) {
    0:[Id]:INT=[9]:Integer
    1:[AccountNo]:STRING=[109]:String
    2:[LastName]:STRING=[James]:String
    3:[FirstName]:STRING=[Bond]:String
    4:[Balance]:LONG=[7894561230123]:Long
    5:[CreditLimit]:DOUBLE=[10000.99887]:Double
    6:[AccountCreated]:DATETIME=[Thu Jan 05 00:12:00 UZT 2006]:Date
    7:[Rating]:CHAR=[A]:Character
}

============================================================
Prepared Schema
============================================================
message schema {
  optional int32 Id (INTEGER(32,true));
  optional binary AccountNo (STRING);
  optional binary LastName (STRING);
  optional binary FirstName (STRING);
  optional int64 Balance (INTEGER(64,true));
  optional double CreditLimit;
  optional int64 AccountCreated (TIMESTAMP(MILLIS,true));
  optional binary Rating (STRING);
}

============================================================
Read the parquet file
============================================================
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Id]:INT=[1]:Integer
    1:[AccountNo]:STRING=[101]:String
    2:[LastName]:STRING=[Reeves]:String
    3:[FirstName]:STRING=[Keanu]:String
    4:[Balance]:LONG=[12]:Long
    5:[CreditLimit]:DOUBLE=[1000.23]:Double
    6:[AccountCreated]:DATETIME=[Sat Jan 17 00:01:00 UZT 1998]:Date
    7:[Rating]:STRING=[A]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Id]:INT=[2]:Integer
    1:[AccountNo]:STRING=[102]:String
    2:[LastName]:STRING=[Butler]:String
    3:[FirstName]:STRING=[Gerard]:String
    4:[Balance]:LONG=[123456]:Long
    5:[CreditLimit]:DOUBLE=[1.23456789098E9]:Double
    6:[AccountCreated]:DATETIME=[Mon Jan 06 00:08:00 UZT 2003]:Date
    7:[Rating]:STRING=[B]:String
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[Id]:INT=[3]:Integer
    1:[AccountNo]:STRING=[103]:String
    2:[LastName]:STRING=[Hewitt]:String
    3:[FirstName]:STRING=[Jennifer]:String
    4:[Balance]:LONG=[4294967295]:Long
    5:[CreditLimit]:DOUBLE=[9.876543210776655E9]:Double
    6:[AccountCreated]:DATETIME=[Fri Jan 25 00:05:00 UZT 1985]:Date
    7:[Rating]:STRING=[B]:String
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[Id]:INT=[4]:Integer
    1:[AccountNo]:STRING=[104]:String
    2:[LastName]:STRING=[Pinkett-Smith]:String
    3:[FirstName]:STRING=[Jada]:String
    4:[Balance]:LONG=[184467440737095]:Long
    5:[CreditLimit]:DOUBLE=[-1.23457E18]:Double
    6:[AccountCreated]:DATETIME=[Thu Jan 05 00:12:00 UZT 2006]:Date
    7:[Rating]:STRING=[A]:String
}

-----------------------------------------------
4 - Record (MODIFIED) {
    0:[Id]:INT=[5]:Integer
    1:[AccountNo]:STRING=[105]:String
    2:[LastName]:STRING=[Murray]:String
    3:[FirstName]:STRING=[Bill]:String
    4:[Balance]:LONG=[9223372036854]:Long
    5:[CreditLimit]:DOUBLE=[-112233.99]:Double
    6:[AccountCreated]:DATETIME=[Fri Jan 05 00:02:00 UZT 2007]:Date
    7:[Rating]:STRING=[C]:String
}

-----------------------------------------------
5 - Record (MODIFIED) {
    0:[Id]:INT=[6]:Integer
    1:[AccountNo]:STRING=[106]:String
    2:[LastName]:STRING=[Murray]:String
    3:[FirstName]:STRING=[Bill]:String
    4:[Balance]:LONG=[1]:Long
    5:[CreditLimit]:DOUBLE=[-778899.12345]:Double
    6:[AccountCreated]:DATETIME=[Fri Jan 05 00:02:00 UZT 2007]:Date
    7:[Rating]:STRING=[D]:String
}

-----------------------------------------------
6 - Record (MODIFIED) {
    0:[Id]:INT=[7]:Integer
    1:[AccountNo]:STRING=[107]:String
    2:[LastName]:STRING=[John]:String
    3:[FirstName]:STRING=[Doe]:String
    4:[Balance]:LONG=[9223372036854]:Long
    5:[CreditLimit]:DOUBLE=[900000.98765]:Double
    6:[AccountCreated]:DATETIME=[Fri Jan 25 00:05:00 UZT 1985]:Date
    7:[Rating]:STRING=[B]:String
}

-----------------------------------------------
7 - Record (MODIFIED) {
    0:[Id]:INT=[8]:Integer
    1:[AccountNo]:STRING=[108]:String
    2:[LastName]:STRING=[Jane]:String
    3:[FirstName]:STRING=[Doe]:String
    4:[Balance]:LONG=[789456123123456]:Long
    5:[CreditLimit]:DOUBLE=[8.000000000887765E9]:Double
    6:[AccountCreated]:DATETIME=[Sat Jan 17 00:01:00 UZT 1998]:Date
    7:[Rating]:STRING=[B]:String
}

-----------------------------------------------
8 - Record (MODIFIED) {
    0:[Id]:INT=[9]:Integer
    1:[AccountNo]:STRING=[109]:String
    2:[LastName]:STRING=[James]:String
    3:[FirstName]:STRING=[Bond]:String
    4:[Balance]:LONG=[7894561230123]:Long
    5:[CreditLimit]:DOUBLE=[10000.99887]:Double
    6:[AccountCreated]:DATETIME=[Thu Jan 05 00:12:00 UZT 2006]:Date
    7:[Rating]:STRING=[A]:String
}

-----------------------------------------------
9 records
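The Prepared Schema output shows how each record field type was mapped to a Parquet type, and the read-back output shows that Rating, written from a CHAR field, comes back as a STRING. The hypothetical sketch below reproduces only the mappings visible in this example's output, as a lookup table that prints a schema in the same text form. It is not how ParquetDataWriter is implemented, and field types that do not appear in this example are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical lookup table reproducing the type mappings printed in this
// example's "Prepared Schema" output. Not the ParquetDataWriter implementation.
public class SchemaMappingSketch {

    enum FieldType { INT, LONG, DOUBLE, STRING, CHAR, DATETIME }

    // One Parquet column declaration per field, as printed by writer.getSchema().
    static String column(String name, FieldType type) {
        switch (type) {
            case INT:      return "optional int32 " + name + " (INTEGER(32,true));";
            case LONG:     return "optional int64 " + name + " (INTEGER(64,true));";
            case DOUBLE:   return "optional double " + name + ";";
            case DATETIME: return "optional int64 " + name + " (TIMESTAMP(MILLIS,true));";
            case STRING:
                // STRING and CHAR share one mapping, so CHAR reads back as STRING.
            case CHAR:     return "optional binary " + name + " (STRING);";
            default:       throw new IllegalArgumentException("Unmapped type: " + type);
        }
    }

    // Builds the schema text for fields in insertion order.
    static String schema(Map<String, FieldType> fields) {
        StringBuilder sb = new StringBuilder("message schema {\n");
        fields.forEach((name, type) -> sb.append("  ").append(column(name, type)).append('\n'));
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, FieldType> fields = new LinkedHashMap<>();
        fields.put("Id", FieldType.INT);
        fields.put("AccountNo", FieldType.STRING);
        fields.put("LastName", FieldType.STRING);
        fields.put("FirstName", FieldType.STRING);
        fields.put("Balance", FieldType.LONG);
        fields.put("CreditLimit", FieldType.DOUBLE);
        fields.put("AccountCreated", FieldType.DATETIME);
        fields.put("Rating", FieldType.CHAR);
        System.out.println(schema(fields));
    }
}
```

Running main prints the same nine-line schema shown under Prepared Schema. If downstream code needs Rating as a character again, it has to convert the string back after reading.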

 
