Compress a Parquet File
Updated: Jul 7, 2023
This example shows how to write a compressed Parquet file, reducing its size on disk while keeping the benefits of the Parquet format, such as columnar storage and efficient data processing.
Compressing Parquet files reduces the amount of disk space required to store data. This is particularly useful when storage costs are a concern, such as in cloud environments or with large-scale datasets. Compression typically yields substantial space savings without sacrificing data integrity or query performance.
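To gauge the savings on your own data, one approach is to write the same records with several codecs and compare the resulting file sizes. The loop below is a minimal sketch using the same DataPipeline classes as the full listing that follows; the output file names are illustrative, and actual savings depend on the data.

// Sketch: write identical records with each codec and report the file size.
// Assumes the createRecordList() helper from the listing below.
for (CompressionCodecName codec : new CompressionCodecName[] {
        CompressionCodecName.UNCOMPRESSED,
        CompressionCodecName.SNAPPY,
        CompressionCodecName.GZIP }) {
    File file = new File("example/data/output/compare-" + codec + ".parquet");
    ParquetDataWriter writer = new ParquetDataWriter(file);
    writer.setCompressionCodecName(codec);
    Job.run(new MemoryReader(createRecordList()), writer);
    System.out.println(codec + ": " + file.length() + " bytes");
}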
Java Code Listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DebugReader;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.internal.lang.Moment;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class CompressAParquetFile {

    private static final File PARQUET_FILE = new File("example/data/output/WriteAParquetFile.parquet");

    public static void main(String[] args) {
        System.out.println("============================================================");
        System.out.println("Write records to a parquet file");
        System.out.println("============================================================");

        DataReader reader = new MemoryReader(createRecordList());
        reader = new DebugReader(reader);

        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);
        writer.setCompressionCodecName(CompressionCodecName.GZIP);

        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");
        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");

        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));
    }

    public static RecordList createRecordList() {
        RecordList recordList = new RecordList();

        Record record1 = new Record();
        record1.setField("BLOB", new byte[] { 2, 4, 6, 8, 10, 12 });
        record1.setField("BOOLEAN", true);
        record1.setField("BYTE", (byte) 97);
        record1.setField("CHAR", 'A');
        record1.setField("DATE", Moment.parseMoment("2014-12-25").getDatePart());
        record1.setField("DATETIME", Moment.parseMoment("2014-12-25 13:41:57").getDate());
        record1.setField("DOUBLE", 2048.1024);
        record1.setField("FLOAT", 4096.32f);
        record1.setField("INT", 8192);
        record1.setField("LONG", 1152921504606846976L);
        record1.setField("SHORT", (short) 32);
        record1.setField("BIG_DECIMAL", new BigDecimal("123.456789"));
        record1.setField("BIG_INTEGER", BigInteger.valueOf(123456L));
        record1.setField("STRING", "A basic numeric constant is considered an integer.");
        record1.setField("TIME", Moment.parseMoment("13:41:57").getTimePart());
        record1.setField("Array-1", Arrays.asList("J", 234, new BigDecimal("456.789"), "A"));
        record1.setField("Array-2", new String[] { "J", "A", "V", "A" });
        record1.setField("Array-3", new Double[] { 123.123, 345.345, 456.456, 555.678 });

        // Record with null values.
        Record record2 = new Record();
        record2.setFieldNull("BLOB", FieldType.BLOB);
        record2.setFieldNull("BOOLEAN", FieldType.BOOLEAN);
        record2.setFieldNull("BYTE", FieldType.BYTE);
        record2.setFieldNull("CHAR", FieldType.CHAR);
        record2.setFieldNull("DATE", FieldType.DATE);
        record2.setFieldNull("DATETIME", FieldType.DATETIME);
        record2.setFieldNull("DOUBLE", FieldType.DOUBLE);
        record2.setFieldNull("FLOAT", FieldType.FLOAT);
        record2.setFieldNull("INT", FieldType.INT);
        record2.setFieldNull("LONG", FieldType.LONG);
        record2.setFieldNull("SHORT", FieldType.SHORT);
        record2.setFieldNull("BIG_DECIMAL", FieldType.BIG_DECIMAL);
        record2.setFieldNull("BIG_INTEGER", FieldType.BIG_INTEGER);
        record2.setFieldNull("STRING", FieldType.STRING);
        record2.setFieldNull("TIME", FieldType.TIME);
        record2.setFieldNull("Array-1", FieldType.UNDEFINED);
        record2.setFieldNull("Array-2", FieldType.STRING);
        record2.setFieldNull("Array-3", FieldType.DOUBLE);

        recordList.add(record1, record2);
        return recordList;
    }
}
Code Walkthrough
- MemoryReader obtains records from an in-memory RecordList; here it accepts the list returned by createRecordList().
- DebugReader is a proxy that prints the records passing through it to a stream in a human-readable format.
- ParquetDataWriter accepts the target Parquet file and writes records to Apache Parquet columnar files.
- CompressionCodecName specifies the type of compression to apply; in this case, GZIP was chosen (a variant using a different codec is sketched after this list).
- Job.run(reader, writer) is then used to transfer data from the reader to the writer.
- A second Job.run() call reads the contents of the Parquet file back and writes them to a StreamWriter.
- The createRecordList() method builds the input records, i.e. the records to be written to the Parquet file. For this example, two records were created, one with data and the other with null values.
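Switching codecs is a one-line change to the writer setup. Below is a minimal variant that chooses SNAPPY instead of GZIP, assuming the rest of the listing stays the same; as a rule of thumb, SNAPPY favors speed while GZIP favors compression ratio, and codec availability can depend on the Parquet libraries on the classpath.

ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);
// Illustrative alternative: SNAPPY trades some compression ratio for speed.
writer.setCompressionCodecName(CompressionCodecName.SNAPPY);
Job.run(reader, writer);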
Console Output
============================================================
Write records to a parquet file
============================================================
23:07:49,443 DEBUG [main] datapipeline:37 - DataPipeline v8.2.0-SNAPSHOT by North Concepts Inc.
==========================================================
debug - 0
------------------------------------------
Record (MODIFIED) {
    0:[BLOB]:BLOB=[[2...12]]:[B
    1:[BOOLEAN]:BOOLEAN=[true]:Boolean
    2:[BYTE]:BYTE=[97]:Byte
    3:[CHAR]:CHAR=[A]:Character
    4:[DATE]:DATE=[2014-12-25]:Date
    5:[DATETIME]:DATETIME=[Thu Dec 25 13:41:57 UZT 2014]:Date
    6:[DOUBLE]:DOUBLE=[2048.1024]:Double
    7:[FLOAT]:FLOAT=[4096.32]:Float
    8:[INT]:INT=[8192]:Integer
    9:[LONG]:LONG=[1152921504606846976]:Long
    10:[SHORT]:SHORT=[32]:Short
    11:[BIG_DECIMAL]:BIG_DECIMAL=[123.456789]:BigDecimal
    12:[BIG_INTEGER]:BIG_INTEGER=[123456]:BigInteger
    13:[STRING]:STRING=[A basic numeric constant is considered an integer.]:String
    14:[TIME]:TIME=[13:41:57]:Time
    15:[Array-1]:ARRAY of UNDEFINED=[[J, 234, 456.789, A]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[J, A, V, A]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[123.123, 345.345, 456.456, 555.678]]:ArrayValue
}
==========================================================
debug - 1
------------------------------------------
Record (MODIFIED) {
    0:[BLOB]:BLOB=[null]
    1:[BOOLEAN]:BOOLEAN=[null]
    2:[BYTE]:BYTE=[null]
    3:[CHAR]:CHAR=[null]
    4:[DATE]:DATE=[null]
    5:[DATETIME]:DATETIME=[null]
    6:[DOUBLE]:DOUBLE=[null]
    7:[FLOAT]:FLOAT=[null]
    8:[INT]:INT=[null]
    9:[LONG]:LONG=[null]
    10:[SHORT]:SHORT=[null]
    11:[BIG_DECIMAL]:BIG_DECIMAL=[null]
    12:[BIG_INTEGER]:BIG_INTEGER=[null]
    13:[STRING]:STRING=[null]
    14:[TIME]:TIME=[null]
    15:[Array-1]:UNDEFINED=[null]
    16:[Array-2]:STRING=[null]
    17:[Array-3]:DOUBLE=[null]
}
============================================================
Prepared Schema
============================================================
message schema {
  optional binary BLOB;
  optional boolean BOOLEAN;
  optional int32 BYTE (INTEGER(32,true));
  optional binary CHAR (STRING);
  optional int32 DATE (DATE);
  optional int64 DATETIME (TIMESTAMP(MILLIS,true));
  optional double DOUBLE;
  optional float FLOAT;
  optional int32 INT (INTEGER(32,true));
  optional int64 LONG (INTEGER(64,true));
  optional int32 SHORT (INTEGER(32,true));
  optional binary BIG_DECIMAL (DECIMAL(9,6));
  optional binary BIG_INTEGER (DECIMAL(6,0));
  optional binary STRING (STRING);
  optional int64 TIME (TIME(MICROS,true));
  optional group Array-1 (LIST) {
    repeated binary Array-1 (STRING);
  }
  optional group Array-2 (LIST) {
    repeated binary Array-2 (STRING);
  }
  optional group Array-3 (LIST) {
    repeated double Array-3;
  }
}
============================================================
Read the parquet file
============================================================
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[BLOB]:STRING=[ ]:String
    1:[BOOLEAN]:BOOLEAN=[true]:Boolean
    2:[BYTE]:INT=[97]:Integer
    3:[CHAR]:STRING=[A]:String
    4:[DATE]:DATE=[2014-12-25]:Date
    5:[DATETIME]:DATETIME=[Thu Dec 25 13:41:57 UZT 2014]:Date
    6:[DOUBLE]:DOUBLE=[2048.1024]:Double
    7:[FLOAT]:FLOAT=[4096.32]:Float
    8:[INT]:INT=[8192]:Integer
    9:[LONG]:LONG=[1152921504606846976]:Long
    10:[SHORT]:INT=[32]:Integer
    11:[BIG_DECIMAL]:BIG_DECIMAL=[123.456789]:BigDecimal
    12:[BIG_INTEGER]:BIG_DECIMAL=[123456]:BigDecimal
    13:[STRING]:STRING=[A basic numeric constant is considered an integer.]:String
    14:[TIME]:TIME=[13:41:57]:Time
    15:[Array-1]:ARRAY of STRING=[[J, 234, 456.789, A]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[J, A, V, A]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[123.123, 345.345, 456.456, 555.678]]:ArrayValue
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[BLOB]:STRING=[null]
    1:[BOOLEAN]:BOOLEAN=[null]
    2:[BYTE]:INT=[null]
    3:[CHAR]:STRING=[null]
    4:[DATE]:DATE=[null]
    5:[DATETIME]:DATETIME=[null]
    6:[DOUBLE]:DOUBLE=[null]
    7:[FLOAT]:FLOAT=[null]
    8:[INT]:INT=[null]
    9:[LONG]:LONG=[null]
    10:[SHORT]:INT=[null]
    11:[BIG_DECIMAL]:BIG_DECIMAL=[null]
    12:[BIG_INTEGER]:BIG_DECIMAL=[null]
    13:[STRING]:STRING=[null]
    14:[TIME]:TIME=[null]
    15:[Array-1]:ARRAY of STRING=[[null]]:ArrayValue
    16:[Array-2]:ARRAY of STRING=[[null]]:ArrayValue
    17:[Array-3]:ARRAY of DOUBLE=[[null]]:ArrayValue
}
-----------------------------------------------
2 records