Write a Parquet File
This example shows how to write records to a Parquet file. Parquet is a columnar storage file format that offers efficient compression and encoding techniques, making it suitable for handling large datasets. DataPipeline enables users to serialize and store structured data in Parquet files, optimizing storage and facilitating high-performance data processing.
Java Code Listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DebugReader;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.internal.lang.Moment;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFile {

    private static final File PARQUET_FILE = new File("example/data/output/WriteAParquetFile.parquet");

    public static void main(String[] args) {
        System.out.println("============================================================");
        System.out.println("Write records to a parquet file");
        System.out.println("============================================================");

        DataReader reader = new MemoryReader(createRecordList());
        reader = new DebugReader(reader);

        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);

        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");
        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");

        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));
    }

    public static RecordList createRecordList() {
        RecordList recordList = new RecordList();

        Record record1 = new Record();
        record1.setField("BLOB", new byte[] { 2, 4, 6, 8, 10, 12 });
        record1.setField("BOOLEAN", true);
        record1.setField("BYTE", (byte) 97);
        record1.setField("CHAR", 'A');
        record1.setField("DATE", Moment.parseMoment("2014-12-25").getDatePart());
        record1.setField("DATETIME", Moment.parseMoment("2014-12-25 13:41:57").getDate());
        record1.setField("DOUBLE", 2048.1024);
        record1.setField("FLOAT", 4096.32f);
        record1.setField("INT", 8192);
        record1.setField("LONG", 1152921504606846976L);
        record1.setField("SHORT", (short) 32);
        record1.setField("BIG_DECIMAL", new BigDecimal("123.456789"));
        record1.setField("BIG_INTEGER", BigInteger.valueOf(123456L));
        record1.setField("STRING", "A basic numeric constant is considered an integer.");
        record1.setField("TIME", Moment.parseMoment("13:41:57").getTimePart());
        record1.setField("Array-1", Arrays.asList("J", 234, new BigDecimal("456.789"), "A"));
        record1.setField("Array-2", new String[] { "J", "A", "V", "A" });
        record1.setField("Array-3", new Double[] { 123.123, 345.345, 456.456, 555.678 });

        // Record with null values.
        Record record2 = new Record();
        record2.setFieldNull("BLOB", FieldType.BLOB);
        record2.setFieldNull("BOOLEAN", FieldType.BOOLEAN);
        record2.setFieldNull("BYTE", FieldType.BYTE);
        record2.setFieldNull("CHAR", FieldType.CHAR);
        record2.setFieldNull("DATE", FieldType.DATE);
        record2.setFieldNull("DATETIME", FieldType.DATETIME);
        record2.setFieldNull("DOUBLE", FieldType.DOUBLE);
        record2.setFieldNull("FLOAT", FieldType.FLOAT);
        record2.setFieldNull("INT", FieldType.INT);
        record2.setFieldNull("LONG", FieldType.LONG);
        record2.setFieldNull("SHORT", FieldType.SHORT);
        record2.setFieldNull("BIG_DECIMAL", FieldType.BIG_DECIMAL);
        record2.setFieldNull("BIG_INTEGER", FieldType.BIG_INTEGER);
        record2.setFieldNull("STRING", FieldType.STRING);
        record2.setFieldNull("TIME", FieldType.TIME);
        record2.setFieldNull("Array-1", FieldType.UNDEFINED);
        record2.setFieldNull("Array-2", FieldType.STRING);
        record2.setFieldNull("Array-3", FieldType.DOUBLE);

        recordList.add(record1, record2);

        return recordList;
    }

}
Code Walkthrough
- createRecordList() creates records with and without values and adds them to a RecordList object.
- setField() creates a field with the field name given in the first parameter and the value given in the second.
- setFieldNull() creates an empty field with the field name given in the first parameter and the field type given in the second.
- MemoryReader is created to read records from the RecordList object returned by createRecordList().
- DebugReader wraps the MemoryReader object to print records passing through the stream.
- ParquetDataWriter is created to write records to the specified Parquet file (i.e. WriteAParquetFile.parquet).
- Data is transferred from the DebugReader to the ParquetDataWriter via the Job.run() method.
- writer.getSchema() returns the schema used to write the Parquet file. This schema contains a group of fields called a message. Each field in the message has three attributes: repetition, type, and name.
- ParquetDataReader is created and passed to the Job.run() method to stream records from the output Parquet file.
- Data is transferred from the ParquetDataReader to the console via the Job.run() method.

See how to compile and run data pipeline jobs.
ParquetDataWriter
Writes records to Apache Parquet columnar files. It extends the IntegrationWriter class and can be created from a File object. Using this class, you can set the configuration, schema, compression codec, and other metadata of the newly created Parquet file.
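As a minimal sketch of that configuration step (the setter name and codec enum below follow the DataPipeline and parquet-mr APIs, but may vary by version, so treat them as assumptions):

```java
import java.io.File;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class ConfigureParquetWriter {

    public static void main(String[] args) {
        // Hypothetical output path for illustration only.
        ParquetDataWriter writer = new ParquetDataWriter(new File("example/data/output/compressed.parquet"));

        // Assumed setter: choose a compression codec (SNAPPY here)
        // instead of the library default before the job runs.
        writer.setCompressionCodecName(CompressionCodecName.SNAPPY);

        // The configured writer is then passed to Job.run(reader, writer)
        // exactly as in the main example above.
    }
}
```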
ParquetDataReader
Reads records from Apache Parquet columnar files. Using this class, you can get the configuration, schema, filter settings, and other metadata of a given Parquet file.
Record
The Record class holds persistent data in key-value fields as it flows through the pipeline. Its setField() method creates a new field as a key-value pair, taking a field name and a value as parameters. If you don't want to assign a value to a field upon creation, you can create an empty field with a field name and type using the setFieldNull() method.
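For instance, a minimal sketch of both methods (the field names here are arbitrary, not part of the example above):

```java
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.Record;

public class RecordFieldExample {

    public static void main(String[] args) {
        Record record = new Record();

        // Create a key-value field: name in the first parameter, value in the second.
        record.setField("city", "Toronto");

        // Create an empty (null) field by supplying a name and a FieldType.
        record.setFieldNull("population", FieldType.LONG);

        // Prints the record with its two fields, one holding a value and one null.
        System.out.println(record);
    }
}
```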
FieldType
The FieldType enumeration lists all field data types used in records. UNDEFINED is used for any type not natively supported by Data Pipeline.
RecordList
As the name suggests, it stores a list of Record objects in memory. It implements Iterable, so you can operate on it much like any other Java collection.
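Because RecordList implements Iterable, its records can be traversed with a for-each loop. A short sketch (the field name "id" and the getField()/getValue() accessors are assumptions based on the DataPipeline API):

```java
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;

public class IterateRecordList {

    public static void main(String[] args) {
        Record record1 = new Record();
        record1.setField("id", 1);

        Record record2 = new Record();
        record2.setField("id", 2);

        RecordList recordList = new RecordList();
        recordList.add(record1, record2);

        // RecordList implements Iterable, so a for-each loop works directly.
        for (Record record : recordList) {
            System.out.println(record.getField("id").getValue());
        }
    }
}
```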
MemoryReader
Obtains records from an in-memory RecordList. It extends the DataReader class and can optionally be created with a RecordList object.
DebugReader
A proxy that prints records passing through it to a stream in a human-readable format. This helps you track and debug every record you are writing to a Parquet file.
Console Output
============================================================
Write records to a parquet file
============================================================
22:44:58,653 DEBUG [main] datapipeline:37 - DataPipeline v8.2.0-SNAPSHOT by North Concepts Inc.
==========================================================
debug - 0
------------------------------------------
Record (MODIFIED) {
    0:[BLOB]:BLOB=[[2...12]]:[B
    1:[BOOLEAN]:BOOLEAN=[true]:Boolean
    2:[BYTE]:BYTE=[97]:Byte
    3:[CHAR]:CHAR=[A]:Character
    4:[DATE]:DATE=[2014-12-25]:Date
    5:[DATETIME]:DATETIME=[Thu Dec 25 13:41:57 UZT 2014]:Date
    6:[DOUBLE]:DOUBLE=[2048.1024]:Double
    7:[FLOAT]:FLOAT=[4096.32]:Float
    8:[INT]:INT=[8192]:Integer
    9:[LONG]:LONG=[1152921504606846976]:Long
    10:[SHORT]:SHORT=[32]:Short
    11:[BIG_DECIMAL]:BIG_DECIMAL=[123.456789]:BigDecimal
    12:[BIG_INTEGER]:BIG_INTEGER=[123456]:BigInteger
    13:[STRING]:STRING=[A basic numeric constant is considered an integer.]:String
    14:[TIME]:TIME=[13:41:57]:Time
    15:[Array-1]:ARRAY of UNDEFINED=[[J, 234, 456.789, A]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[J, A, V, A]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[123.123, 345.345, 456.456, 555.678]]:ArrayValue
}
==========================================================
debug - 1
------------------------------------------
Record (MODIFIED) {
    0:[BLOB]:BLOB=[null]
    1:[BOOLEAN]:BOOLEAN=[null]
    2:[BYTE]:BYTE=[null]
    3:[CHAR]:CHAR=[null]
    4:[DATE]:DATE=[null]
    5:[DATETIME]:DATETIME=[null]
    6:[DOUBLE]:DOUBLE=[null]
    7:[FLOAT]:FLOAT=[null]
    8:[INT]:INT=[null]
    9:[LONG]:LONG=[null]
    10:[SHORT]:SHORT=[null]
    11:[BIG_DECIMAL]:BIG_DECIMAL=[null]
    12:[BIG_INTEGER]:BIG_INTEGER=[null]
    13:[STRING]:STRING=[null]
    14:[TIME]:TIME=[null]
    15:[Array-1]:UNDEFINED=[null]
    16:[Array-2]:STRING=[null]
    17:[Array-3]:DOUBLE=[null]
}
============================================================
Prepared Schema
============================================================
message schema {
  optional binary BLOB;
  optional boolean BOOLEAN;
  optional int32 BYTE (INTEGER(32,true));
  optional binary CHAR (STRING);
  optional int32 DATE (DATE);
  optional int64 DATETIME (TIMESTAMP(MILLIS,true));
  optional double DOUBLE;
  optional float FLOAT;
  optional int32 INT (INTEGER(32,true));
  optional int64 LONG (INTEGER(64,true));
  optional int32 SHORT (INTEGER(32,true));
  optional binary BIG_DECIMAL (DECIMAL(9,6));
  optional binary BIG_INTEGER (DECIMAL(6,0));
  optional binary STRING (STRING);
  optional int64 TIME (TIME(MICROS,true));
  optional group Array-1 (LIST) {
    repeated binary Array-1 (STRING);
  }
  optional group Array-2 (LIST) {
    repeated binary Array-2 (STRING);
  }
  optional group Array-3 (LIST) {
    repeated double Array-3;
  }
}
============================================================
Read the parquet file
============================================================
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[BLOB]:STRING=[ ]:String
    1:[BOOLEAN]:BOOLEAN=[true]:Boolean
    2:[BYTE]:INT=[97]:Integer
    3:[CHAR]:STRING=[A]:String
    4:[DATE]:DATE=[2014-12-25]:Date
    5:[DATETIME]:DATETIME=[Thu Dec 25 13:41:57 UZT 2014]:Date
    6:[DOUBLE]:DOUBLE=[2048.1024]:Double
    7:[FLOAT]:FLOAT=[4096.32]:Float
    8:[INT]:INT=[8192]:Integer
    9:[LONG]:LONG=[1152921504606846976]:Long
    10:[SHORT]:INT=[32]:Integer
    11:[BIG_DECIMAL]:BIG_DECIMAL=[123.456789]:BigDecimal
    12:[BIG_INTEGER]:BIG_DECIMAL=[123456]:BigDecimal
    13:[STRING]:STRING=[A basic numeric constant is considered an integer.]:String
    14:[TIME]:TIME=[13:41:57]:Time
    15:[Array-1]:ARRAY of STRING=[[J, 234, 456.789, A]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[J, A, V, A]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[123.123, 345.345, 456.456, 555.678]]:ArrayValue
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[BLOB]:STRING=[null]
    1:[BOOLEAN]:BOOLEAN=[null]
    2:[BYTE]:INT=[null]
    3:[CHAR]:STRING=[null]
    4:[DATE]:DATE=[null]
    5:[DATETIME]:DATETIME=[null]
    6:[DOUBLE]:DOUBLE=[null]
    7:[FLOAT]:FLOAT=[null]
    8:[INT]:INT=[null]
    9:[LONG]:LONG=[null]
    10:[SHORT]:INT=[null]
    11:[BIG_DECIMAL]:BIG_DECIMAL=[null]
    12:[BIG_INTEGER]:BIG_DECIMAL=[null]
    13:[STRING]:STRING=[null]
    14:[TIME]:TIME=[null]
    15:[Array-1]:ARRAY of STRING=[[null]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[null]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[null]]:ArrayValue
}
-----------------------------------------------
2 records