Write a Parquet File

Updated: Jul 7, 2023

This example shows how to write records to a Parquet file. Parquet is a columnar storage file format that offers efficient compression and encoding techniques, making it suitable for handling large datasets. DataPipeline enables users to serialize and store structured data in Parquet files, optimizing storage and facilitating high-performance data processing. 

 

Java Code Listing

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DebugReader;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.internal.lang.Moment;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFile {

    private static final File PARQUET_FILE = new File("example/data/output/WriteAParquetFile.parquet");

    public static void main(String[] args) {
        System.out.println("============================================================");
        System.out.println("Write records to a parquet file");
        System.out.println("============================================================");

        DataReader reader = new MemoryReader(createRecordList());
        reader = new DebugReader(reader);
        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);
        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");

        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");

        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));

    }

    public static RecordList createRecordList() {
        RecordList recordList = new RecordList();

        Record record1 = new Record();
        record1.setField("BLOB", new byte[] { 2, 4, 6, 8, 10, 12 });
        record1.setField("BOOLEAN", true);
        record1.setField("BYTE", (byte) 97);
        record1.setField("CHAR", 'A');
        record1.setField("DATE", Moment.parseMoment("2014-12-25").getDatePart());
        record1.setField("DATETIME", Moment.parseMoment("2014-12-25 13:41:57").getDate());
        record1.setField("DOUBLE", 2048.1024);
        record1.setField("FLOAT", 4096.32f);
        record1.setField("INT", 8192);
        record1.setField("LONG", 1152921504606846976L);
        record1.setField("SHORT", (short) 32);
        record1.setField("BIG_DECIMAL", new BigDecimal("123.456789"));
        record1.setField("BIG_INTEGER", BigInteger.valueOf(123456L));
        record1.setField("STRING", "A basic numeric constant is considered an integer.");
        record1.setField("TIME", Moment.parseMoment("13:41:57").getTimePart());
        record1.setField("Array-1", Arrays.asList("J", 234, new BigDecimal("456.789"), "A"));
        record1.setField("Array-2", new String[] { "J", "A", "V", "A" });
        record1.setField("Array-3", new Double[] { 123.123, 345.345, 456.456, 555.678 });


        // Record with null values.
        Record record2 = new Record();
        record2.setFieldNull("BLOB", FieldType.BLOB);
        record2.setFieldNull("BOOLEAN", FieldType.BOOLEAN);
        record2.setFieldNull("BYTE", FieldType.BYTE);
        record2.setFieldNull("CHAR", FieldType.CHAR);
        record2.setFieldNull("DATE", FieldType.DATE);
        record2.setFieldNull("DATETIME", FieldType.DATETIME);
        record2.setFieldNull("DOUBLE", FieldType.DOUBLE);
        record2.setFieldNull("FLOAT", FieldType.FLOAT);
        record2.setFieldNull("INT", FieldType.INT);
        record2.setFieldNull("LONG", FieldType.LONG);
        record2.setFieldNull("SHORT", FieldType.SHORT);
        record2.setFieldNull("BIG_DECIMAL", FieldType.BIG_DECIMAL);
        record2.setFieldNull("BIG_INTEGER", FieldType.BIG_INTEGER);
        record2.setFieldNull("STRING", FieldType.STRING);
        record2.setFieldNull("TIME", FieldType.TIME);
        record2.setFieldNull("Array-1", FieldType.UNDEFINED);
        record2.setFieldNull("Array-2", FieldType.STRING);
        record2.setFieldNull("Array-3", FieldType.DOUBLE);

        recordList.add(record1, record2);
        return recordList;
    }
}

 

Code Walkthrough

  1. createRecordList() creates records with and without values and adds them to a RecordList object.
  2. setField() creates a field with the field name specified in the first parameter and the value specified in the second parameter.
  3. setFieldNull() creates an empty field with the field name specified in the first parameter and the field type specified in the second parameter.
  4. MemoryReader is created to read records from the RecordList object returned by createRecordList().
  5. DebugReader is created with the MemoryReader object to print records passing through the stream.
  6. ParquetDataWriter is created to write records to the specified Parquet file (i.e. WriteAParquetFile.parquet).
  7. Data is transferred from the DebugReader to the ParquetDataWriter via the Job.run() method.
  8. writer.getSchema() returns the schema used to write the specified Parquet file. This schema contains a group of fields called a message. Each field in the message has three attributes: repetition, type, and name.
  9. ParquetDataReader is created and passed to the Job.run() method to stream records from the output Parquet file.
  10. Data is transferred from the ParquetDataReader to the console via the Job.run() method. See how to compile and run data pipeline jobs.

 

ParquetDataWriter

Writes records to Apache Parquet columnar files. It extends the IntegrationWriter class and can be created using a File object. Using this class, you can set the configuration, schema, compression codec name, and other metadata of the newly created Parquet file.
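
As a minimal sketch (reusing the imports and createRecordList() from the listing above), the writer can be configured before the job runs. The setCompressionCodecName(...) call is an assumption based on the description above; the exact setter name may differ in your DataPipeline version, and CompressionCodecName comes from the Apache Parquet library.

File file = new File("example/data/output/WriteAParquetFile.parquet");
ParquetDataWriter writer = new ParquetDataWriter(file);

// Assumed setter -- choose a compression codec for the output file.
writer.setCompressionCodecName(org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY);

Job.run(new MemoryReader(createRecordList()), writer);
System.out.println(writer.getSchema()); // schema prepared for the newly written file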

 

ParquetDataReader

Reads records from Apache Parquet columnar files. Using this class, you can get the configuration, schema, filter settings, and other metadata of a given Parquet file.
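
A short sketch of reading the file back, using the same imports as the listing above. The reader.getSchema() call is an assumption based on the description above; streaming to StreamWriter mirrors the main example.

ParquetDataReader reader = new ParquetDataReader(new File("example/data/output/WriteAParquetFile.parquet"));

// Assumed accessor -- prints the schema stored in the file's metadata.
System.out.println(reader.getSchema());

// Stream every record in the file to the console.
Job.run(reader, new StreamWriter(System.out));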

 

Record

The Record class holds persistent data in key-value fields as it flows through the pipeline. Its setField() method creates a new field as a key-value pair, taking a field name and a value as parameters. If you don't want to assign a value to a field upon creation, you can create an empty field with a field name and type using the setFieldNull() method.
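
Both methods appear in the listing above; here is a small sketch of each.

Record record = new Record();

// setField() adds a key-value pair; the field type is inferred from the value.
record.setField("STRING", "hello");
record.setField("INT", 8192);

// setFieldNull() adds a field with no value but an explicit type,
// so downstream writers still know how to declare the column.
record.setFieldNull("DOUBLE", FieldType.DOUBLE);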

 

FieldType

FieldType is an enumeration that lists all field data types used in records. UNDEFINED is used for any type not natively supported by Data Pipeline.
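
For instance, in the listing above the mixed-type Array-1 field is declared with FieldType.UNDEFINED when null, while homogeneous fields use their specific types.

Record record = new Record();
record.setFieldNull("Array-1", FieldType.UNDEFINED);       // mixed element types, no single native type
record.setFieldNull("Array-2", FieldType.STRING);          // homogeneous string array
record.setFieldNull("BIG_DECIMAL", FieldType.BIG_DECIMAL); // specific numeric type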

 

RecordList

As the name suggests, RecordList stores a list of Record objects in memory. It implements Iterable, so you can iterate over its records as you would any other Java collection.
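
Since RecordList is Iterable, a quick sketch of looping over it (reusing createRecordList() from the listing above):

RecordList records = createRecordList();

// Iterate like any other Java collection.
for (Record record : records) {
    System.out.println(record);
}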

 

MemoryReader

Obtains records from an in-memory RecordList. It extends the DataReader class and can be created with an optional RecordList object.
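
A minimal sketch of wiring a MemoryReader into a job, as in the listing above; here the records are simply streamed to the console.

RecordList records = createRecordList();
DataReader reader = new MemoryReader(records); // reads from the in-memory list

// Push the in-memory records to any writer -- here the console.
Job.run(reader, new StreamWriter(System.out));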

 

DebugReader

A proxy that prints records passing through it to a stream in a human-readable format. This helps you track and debug every record you write to a Parquet file.
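
A short sketch of inserting a DebugReader into the pipeline, exactly as done in the listing above (PARQUET_FILE is the constant defined there).

DataReader reader = new MemoryReader(createRecordList());

// Wrap the reader so each record is logged as it flows to the writer.
reader = new DebugReader(reader);

Job.run(reader, new ParquetDataWriter(PARQUET_FILE));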

 

Console Output

============================================================
Write records to a parquet file
============================================================
22:44:58,653 DEBUG [main] datapipeline:37 - DataPipeline v8.2.0-SNAPSHOT by North Concepts Inc.
==========================================================
debug - 0
------------------------------------------
Record (MODIFIED) {
    0:[BLOB]:BLOB=[[2...12]]:[B
    1:[BOOLEAN]:BOOLEAN=[true]:Boolean
    2:[BYTE]:BYTE=[97]:Byte
    3:[CHAR]:CHAR=[A]:Character
    4:[DATE]:DATE=[2014-12-25]:Date
    5:[DATETIME]:DATETIME=[Thu Dec 25 13:41:57 UZT 2014]:Date
    6:[DOUBLE]:DOUBLE=[2048.1024]:Double
    7:[FLOAT]:FLOAT=[4096.32]:Float
    8:[INT]:INT=[8192]:Integer
    9:[LONG]:LONG=[1152921504606846976]:Long
    10:[SHORT]:SHORT=[32]:Short
    11:[BIG_DECIMAL]:BIG_DECIMAL=[123.456789]:BigDecimal
    12:[BIG_INTEGER]:BIG_INTEGER=[123456]:BigInteger
    13:[STRING]:STRING=[A basic numeric constant is considered an integer.]:String
    14:[TIME]:TIME=[13:41:57]:Time
    15:[Array-1]:ARRAY of UNDEFINED=[[J, 234, 456.789, A]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[J, A, V, A]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[123.123, 345.345, 456.456, 555.678]]:ArrayValue
}

==========================================================
debug - 1
------------------------------------------
Record (MODIFIED) {
    0:[BLOB]:BLOB=[null]
    1:[BOOLEAN]:BOOLEAN=[null]
    2:[BYTE]:BYTE=[null]
    3:[CHAR]:CHAR=[null]
    4:[DATE]:DATE=[null]
    5:[DATETIME]:DATETIME=[null]
    6:[DOUBLE]:DOUBLE=[null]
    7:[FLOAT]:FLOAT=[null]
    8:[INT]:INT=[null]
    9:[LONG]:LONG=[null]
    10:[SHORT]:SHORT=[null]
    11:[BIG_DECIMAL]:BIG_DECIMAL=[null]
    12:[BIG_INTEGER]:BIG_INTEGER=[null]
    13:[STRING]:STRING=[null]
    14:[TIME]:TIME=[null]
    15:[Array-1]:UNDEFINED=[null]
    16:[Array-2]:STRING=[null]
    17:[Array-3]:DOUBLE=[null]
}

============================================================
Prepared Schema
============================================================
message schema {
  optional binary BLOB;
  optional boolean BOOLEAN;
  optional int32 BYTE (INTEGER(32,true));
  optional binary CHAR (STRING);
  optional int32 DATE (DATE);
  optional int64 DATETIME (TIMESTAMP(MILLIS,true));
  optional double DOUBLE;
  optional float FLOAT;
  optional int32 INT (INTEGER(32,true));
  optional int64 LONG (INTEGER(64,true));
  optional int32 SHORT (INTEGER(32,true));
  optional binary BIG_DECIMAL (DECIMAL(9,6));
  optional binary BIG_INTEGER (DECIMAL(6,0));
  optional binary STRING (STRING);
  optional int64 TIME (TIME(MICROS,true));
  optional group Array-1 (LIST) {
    repeated binary Array-1 (STRING);
  }
  optional group Array-2 (LIST) {
    repeated binary Array-2 (STRING);
  }
  optional group Array-3 (LIST) {
    repeated double Array-3;
  }
}

============================================================
Read the parquet file
============================================================
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[BLOB]:STRING=[
]:String
    1:[BOOLEAN]:BOOLEAN=[true]:Boolean
    2:[BYTE]:INT=[97]:Integer
    3:[CHAR]:STRING=[A]:String
    4:[DATE]:DATE=[2014-12-25]:Date
    5:[DATETIME]:DATETIME=[Thu Dec 25 13:41:57 UZT 2014]:Date
    6:[DOUBLE]:DOUBLE=[2048.1024]:Double
    7:[FLOAT]:FLOAT=[4096.32]:Float
    8:[INT]:INT=[8192]:Integer
    9:[LONG]:LONG=[1152921504606846976]:Long
    10:[SHORT]:INT=[32]:Integer
    11:[BIG_DECIMAL]:BIG_DECIMAL=[123.456789]:BigDecimal
    12:[BIG_INTEGER]:BIG_DECIMAL=[123456]:BigDecimal
    13:[STRING]:STRING=[A basic numeric constant is considered an integer.]:String
    14:[TIME]:TIME=[13:41:57]:Time
    15:[Array-1]:ARRAY of STRING=[[J, 234, 456.789, A]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[J, A, V, A]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[123.123, 345.345, 456.456, 555.678]]:ArrayValue
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[BLOB]:STRING=[null]
    1:[BOOLEAN]:BOOLEAN=[null]
    2:[BYTE]:INT=[null]
    3:[CHAR]:STRING=[null]
    4:[DATE]:DATE=[null]
    5:[DATETIME]:DATETIME=[null]
    6:[DOUBLE]:DOUBLE=[null]
    7:[FLOAT]:FLOAT=[null]
    8:[INT]:INT=[null]
    9:[LONG]:LONG=[null]
    10:[SHORT]:INT=[null]
    11:[BIG_DECIMAL]:BIG_DECIMAL=[null]
    12:[BIG_INTEGER]:BIG_DECIMAL=[null]
    13:[STRING]:STRING=[null]
    14:[TIME]:TIME=[null]
    15:[Array-1]:ARRAY of STRING=[[null]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[null]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[null]]:ArrayValue
}

-----------------------------------------------
2 records

 
