Read and Write Unsigned Numbers in Parquet Files
Updated: Jan 25, 2023
This example shows how to read and write unsigned and signed numbers in Parquet files.
Java Code listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.math.BigInteger;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DebugReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class ReadAndWriteUnsignedNumbersInParquet {

    private static final File PARQUET_FILE = new File("example/data/output/ReadAndWriteUnsignedNumbersInParquet.parquet");

    public static void main(String[] args) {
        System.out.println("============================================================");
        System.out.println("Write records to a parquet file");
        System.out.println("============================================================");

        String schema = "message schema {" +
                "  optional int32 unsigned_field_8 (INTEGER(8,false)); " +
                "  optional int32 signed_field_8 (INTEGER(8,true)); " +
                "  optional int32 unsigned_field_16 (INTEGER(16,false)); " +
                "  optional int32 signed_field_16 (INTEGER(16,true)); " +
                "  optional int32 unsigned_field_32 (INTEGER(32,false)); " +
                "  optional int32 signed_field_32 (INTEGER(32,true)); " +
                "  optional int64 unsigned_field_64 (INTEGER(64,false)); " +
                "  optional int64 signed_field_64 (INTEGER(64,true)); " +
                "}";

        DataReader reader = new MemoryReader(createRecordList());
        reader = new DebugReader(reader);

        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);
        writer.setSchema(schema);

        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");

        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");

        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));
    }

    public static RecordList createRecordList() {
        RecordList recordList = new RecordList();

        Record record1 = new Record();
        record1.setField("unsigned_field_8", 255);
        record1.setField("signed_field_8", 127);
        record1.setField("unsigned_field_16", 65535);
        record1.setField("signed_field_16", 32767);
        record1.setField("unsigned_field_32", 4294967295L);
        record1.setField("signed_field_32", 2147483647);
        record1.setField("unsigned_field_64", new BigInteger("18446744073709551615"));
        record1.setField("signed_field_64", 9223372036854775807L);

        Record record2 = new Record();
        record2.setField("unsigned_field_8", 127);
        record2.setField("signed_field_8", -127);
        record2.setField("unsigned_field_16", 65535);
        record2.setField("signed_field_16", -32767);
        record2.setField("unsigned_field_32", 4294967295L);
        record2.setField("signed_field_32", -2147483647);
        record2.setField("unsigned_field_64", new BigInteger("18446744073709551615"));
        record2.setField("signed_field_64", -9223372036854775807L);

        recordList.add(record1, record2);
        return recordList;
    }

}
Code walkthrough
- Create a Parquet schema with eight columns covering signed and unsigned integers at 8, 16, 32, and 64 bits.
- Create two records and supply them through a MemoryReader, wrapped in a DebugReader so each record is printed as it passes through.
- Create an instance of ParquetDataWriter to write the records to the Parquet file, and set the schema on it.
- Job.run(reader, writer) transfers the data from the reader to the writer. This writes the data to the Parquet file.
- Print the Parquet schema.
- Read the records back from the Parquet file and write them to the console.
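The bit widths and signedness in the schema annotations determine which values each column can hold. As a rough illustration, the range check below can be sketched with plain JDK classes. The class IntegerRangeSketch and the helper fits are hypothetical names introduced here; they are not part of DataPipeline, and nothing here describes how the library itself validates values.

```java
import java.math.BigInteger;

// Hypothetical helper (not part of DataPipeline): checks whether a value
// fits an integer of the given bit width and signedness.
public class IntegerRangeSketch {

    public static boolean fits(BigInteger value, int bits, boolean signed) {
        // Signed range:   -2^(bits-1) .. 2^(bits-1) - 1
        // Unsigned range:  0          .. 2^bits - 1
        BigInteger min = signed ? BigInteger.ONE.shiftLeft(bits - 1).negate() : BigInteger.ZERO;
        BigInteger max = BigInteger.ONE.shiftLeft(signed ? bits - 1 : bits).subtract(BigInteger.ONE);
        return value.compareTo(min) >= 0 && value.compareTo(max) <= 0;
    }

    public static void main(String[] args) {
        BigInteger maxUnsigned64 = new BigInteger("18446744073709551615");

        System.out.println(fits(BigInteger.valueOf(255), 8, false)); // true
        System.out.println(fits(BigInteger.valueOf(255), 8, true));  // false
        System.out.println(fits(maxUnsigned64, 64, false));          // true
        System.out.println(fits(maxUnsigned64, 64, true));           // false
    }
}
```

The values used in record1 of the example are the maximums of these ranges, which is why unsigned_field_32 needs a long literal and unsigned_field_64 needs a BigInteger on the Java side.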
Console Output
============================================================
Write records to a parquet file
============================================================
15:48:23,059 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
15:48:23,518 DEBUG [main] datapipeline:615 - Job[1,job-1,Wed Jan 25 15:48:23 IST 2023]::Start
==========================================================
debug - 0
------------------------------------------
Record (MODIFIED) {
    0:[unsigned_field_8]:INT=[255]:Integer
    1:[signed_field_8]:INT=[127]:Integer
    2:[unsigned_field_16]:INT=[65535]:Integer
    3:[signed_field_16]:INT=[32767]:Integer
    4:[unsigned_field_32]:LONG=[4294967295]:Long
    5:[signed_field_32]:INT=[2147483647]:Integer
    6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
    7:[signed_field_64]:LONG=[9223372036854775807]:Long
}
==========================================================
debug - 1
------------------------------------------
Record (MODIFIED) {
    0:[unsigned_field_8]:INT=[127]:Integer
    1:[signed_field_8]:INT=[-127]:Integer
    2:[unsigned_field_16]:INT=[65535]:Integer
    3:[signed_field_16]:INT=[-32767]:Integer
    4:[unsigned_field_32]:LONG=[4294967295]:Long
    5:[signed_field_32]:INT=[-2147483647]:Integer
    6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
    7:[signed_field_64]:LONG=[-9223372036854775807]:Long
}
15:48:25,712 DEBUG [main] datapipeline:661 - job::Success
============================================================
Prepared Schema
============================================================
message schema {
  optional int32 unsigned_field_8 (INTEGER(8,false));
  optional int32 signed_field_8 (INTEGER(8,true));
  optional int32 unsigned_field_16 (INTEGER(16,false));
  optional int32 signed_field_16 (INTEGER(16,true));
  optional int32 unsigned_field_32 (INTEGER(32,false));
  optional int32 signed_field_32 (INTEGER(32,true));
  optional int64 unsigned_field_64 (INTEGER(64,false));
  optional int64 signed_field_64 (INTEGER(64,true));
}

============================================================
Read the parquet file
============================================================
15:48:25,712 DEBUG [main] datapipeline:615 - Job[2,job-2,Wed Jan 25 15:48:25 IST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[unsigned_field_8]:LONG=[255]:Long
    1:[signed_field_8]:INT=[127]:Integer
    2:[unsigned_field_16]:LONG=[65535]:Long
    3:[signed_field_16]:INT=[32767]:Integer
    4:[unsigned_field_32]:LONG=[4294967295]:Long
    5:[signed_field_32]:INT=[2147483647]:Integer
    6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
    7:[signed_field_64]:LONG=[9223372036854775807]:Long
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[unsigned_field_8]:LONG=[127]:Long
    1:[signed_field_8]:INT=[-127]:Integer
    2:[unsigned_field_16]:LONG=[65535]:Long
    3:[signed_field_16]:INT=[-32767]:Integer
    4:[unsigned_field_32]:LONG=[4294967295]:Long
    5:[signed_field_32]:INT=[-2147483647]:Integer
    6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
    7:[signed_field_64]:LONG=[-9223372036854775807]:Long
}
-----------------------------------------------
2 records
15:48:25,852 DEBUG [main] datapipeline:661 - job::Success
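In the output above, the unsigned 8-, 16-, and 32-bit columns are read back as Long and the unsigned 64-bit column as BigInteger, while the schema stores them in the int32 and int64 physical types. A likely reason is that Java has no unsigned primitive types, so an unsigned value that uses the top bit needs a wider type to avoid being read as negative. The sketch below illustrates that bit reinterpretation using only JDK methods. The class UnsignedStorageSketch and its helpers are hypothetical names for illustration and do not show DataPipeline's internal implementation.

```java
import java.math.BigInteger;

// Hypothetical illustration (not DataPipeline code): how an unsigned value
// occupies the bits of a signed Java int or long, and how to read it back.
public class UnsignedStorageSketch {

    // Reinterpret the 32 bits of a signed int as an unsigned value (0 .. 4294967295).
    public static long fromUnsigned32(int stored) {
        return Integer.toUnsignedLong(stored);
    }

    // Reinterpret the 64 bits of a signed long as an unsigned value (0 .. 2^64 - 1).
    public static BigInteger fromUnsigned64(long stored) {
        return new BigInteger(Long.toUnsignedString(stored));
    }

    public static void main(String[] args) {
        // Narrowing keeps only the low 32 bits, which are all ones here.
        int stored32 = (int) 4294967295L;
        // longValue() keeps only the low 64 bits, which are all ones here.
        long stored64 = new BigInteger("18446744073709551615").longValue();

        System.out.println(stored32);                 // -1
        System.out.println(fromUnsigned32(stored32)); // 4294967295
        System.out.println(stored64);                 // -1
        System.out.println(fromUnsigned64(stored64)); // 18446744073709551615
    }
}
```

Read as signed values, both bit patterns would print as -1; widening to long and BigInteger recovers the unsigned values that were written.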