Read and Write Unsigned Numbers in Parquet Files
Updated: Jan 25, 2023
In this example, you will learn how to read and write unsigned and signed integers in Parquet files.
Java Code listing
package com.northconcepts.datapipeline.examples.parquet;
import java.io.File;
import java.math.BigInteger;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DebugReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;
/**
 * Writes two records containing signed and unsigned integer columns (8, 16, 32 and 64 bits)
 * to a Parquet file, prints the schema used by the writer, and then reads the file back,
 * printing every record to standard output.
 *
 * <p>Each physical {@code int32}/{@code int64} column in the schema carries an
 * {@code INTEGER(bitWidth, isSigned)} annotation.
 */
public class ReadAndWriteUnsignedNumbersInParquet {

    /** Location of the Parquet file that is written and then read back. */
    private static final File PARQUET_FILE =
            new File("example/data/output/ReadAndWriteUnsignedNumbersInParquet.parquet");

    /** Rule printed above and below every section title. */
    private static final String SEPARATOR =
            "============================================================";

    /** Parquet message schema: one unsigned and one signed column for each bit width. */
    private static final String SCHEMA = "message schema {" +
            " optional int32 unsigned_field_8 (INTEGER(8,false)); " +
            " optional int32 signed_field_8 (INTEGER(8,true)); " +
            " optional int32 unsigned_field_16 (INTEGER(16,false)); " +
            " optional int32 signed_field_16 (INTEGER(16,true)); " +
            " optional int32 unsigned_field_32 (INTEGER(32,false)); " +
            " optional int32 signed_field_32 (INTEGER(32,true)); " +
            " optional int64 unsigned_field_64 (INTEGER(64,false)); " +
            " optional int64 signed_field_64 (INTEGER(64,true)); " +
            "}";

    /**
     * Runs the example: write the in-memory records to {@link #PARQUET_FILE}, print the
     * writer's schema, then stream the file's contents to {@code System.out}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        printSection("Write records to a parquet file");

        // DebugReader wraps the in-memory source so each record is logged as it flows through.
        DataReader reader = new DebugReader(new MemoryReader(createRecordList()));
        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);
        writer.setSchema(SCHEMA);
        Job.run(reader, writer);

        printSection("Prepared Schema");
        System.out.println(writer.getSchema());

        printSection("Read the parquet file");
        ParquetDataReader parquetReader = new ParquetDataReader(PARQUET_FILE);
        Job.run(parquetReader, new StreamWriter(System.out));
    }

    /** Prints {@code title} framed by a separator line above and below. */
    private static void printSection(String title) {
        System.out.println(SEPARATOR);
        System.out.println(title);
        System.out.println(SEPARATOR);
    }

    /**
     * Builds the sample data set.
     *
     * @return a list with two records whose field names match the columns in the schema
     */
    public static RecordList createRecordList() {
        RecordList records = new RecordList();
        records.add(createUpperBoundRecord(), createMixedSignRecord());
        return records;
    }

    /** Builds a record holding the largest value representable by each column's type. */
    private static Record createUpperBoundRecord() {
        Record maxRecord = new Record();
        maxRecord.setField("unsigned_field_8", 255);
        maxRecord.setField("signed_field_8", 127);
        maxRecord.setField("unsigned_field_16", 65535);
        maxRecord.setField("signed_field_16", 32767);
        maxRecord.setField("unsigned_field_32", 4294967295L);
        maxRecord.setField("signed_field_32", 2147483647);
        maxRecord.setField("unsigned_field_64", new BigInteger("18446744073709551615"));
        maxRecord.setField("signed_field_64", 9223372036854775807L);
        return maxRecord;
    }

    /**
     * Builds a record where every signed column holds the negation of its type's maximum,
     * alongside non-negative values in the unsigned columns.
     */
    private static Record createMixedSignRecord() {
        Record mixedRecord = new Record();
        mixedRecord.setField("unsigned_field_8", 127);
        mixedRecord.setField("signed_field_8", -127);
        mixedRecord.setField("unsigned_field_16", 65535);
        mixedRecord.setField("signed_field_16", -32767);
        mixedRecord.setField("unsigned_field_32", 4294967295L);
        mixedRecord.setField("signed_field_32", -2147483647);
        mixedRecord.setField("unsigned_field_64", new BigInteger("18446744073709551615"));
        mixedRecord.setField("signed_field_64", -9223372036854775807L);
        return mixedRecord;
    }
}
Code walkthrough
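- Create a Parquet schema with 8 columns covering signed and unsigned integers of 8, 16, 32, and 64 bits.
- Create 2 records and wrap them in a MemoryReader, decorated with a DebugReader that logs each record as it is read.
- Create an instance of ParquetDataWriter to write records to the Parquet file, and set the schema on it.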
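- Job.run(reader, writer) is used to transfer the data from the reader to the writer. This writes the data to the Parquet file.
- Print the Parquet schema used by the writer.
- Read the records back from the Parquet file with ParquetDataReader and write them to the console with StreamWriter. Note that the unsigned 8-bit and 16-bit fields are read back as Long values, while the signed fields are read back as Integer.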
Console Output
============================================================
Write records to a parquet file
============================================================
15:48:23,059 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
15:48:23,518 DEBUG [main] datapipeline:615 - Job[1,job-1,Wed Jan 25 15:48:23 IST 2023]::Start
==========================================================
debug - 0
------------------------------------------
Record (MODIFIED) {
0:[unsigned_field_8]:INT=[255]:Integer
1:[signed_field_8]:INT=[127]:Integer
2:[unsigned_field_16]:INT=[65535]:Integer
3:[signed_field_16]:INT=[32767]:Integer
4:[unsigned_field_32]:LONG=[4294967295]:Long
5:[signed_field_32]:INT=[2147483647]:Integer
6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
7:[signed_field_64]:LONG=[9223372036854775807]:Long
}
==========================================================
debug - 1
------------------------------------------
Record (MODIFIED) {
0:[unsigned_field_8]:INT=[127]:Integer
1:[signed_field_8]:INT=[-127]:Integer
2:[unsigned_field_16]:INT=[65535]:Integer
3:[signed_field_16]:INT=[-32767]:Integer
4:[unsigned_field_32]:LONG=[4294967295]:Long
5:[signed_field_32]:INT=[-2147483647]:Integer
6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
7:[signed_field_64]:LONG=[-9223372036854775807]:Long
}
15:48:25,712 DEBUG [main] datapipeline:661 - job::Success
============================================================
Prepared Schema
============================================================
message schema {
optional int32 unsigned_field_8 (INTEGER(8,false));
optional int32 signed_field_8 (INTEGER(8,true));
optional int32 unsigned_field_16 (INTEGER(16,false));
optional int32 signed_field_16 (INTEGER(16,true));
optional int32 unsigned_field_32 (INTEGER(32,false));
optional int32 signed_field_32 (INTEGER(32,true));
optional int64 unsigned_field_64 (INTEGER(64,false));
optional int64 signed_field_64 (INTEGER(64,true));
}
============================================================
Read the parquet file
============================================================
15:48:25,712 DEBUG [main] datapipeline:615 - Job[2,job-2,Wed Jan 25 15:48:25 IST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
0:[unsigned_field_8]:LONG=[255]:Long
1:[signed_field_8]:INT=[127]:Integer
2:[unsigned_field_16]:LONG=[65535]:Long
3:[signed_field_16]:INT=[32767]:Integer
4:[unsigned_field_32]:LONG=[4294967295]:Long
5:[signed_field_32]:INT=[2147483647]:Integer
6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
7:[signed_field_64]:LONG=[9223372036854775807]:Long
}
-----------------------------------------------
1 - Record (MODIFIED) {
0:[unsigned_field_8]:LONG=[127]:Long
1:[signed_field_8]:INT=[-127]:Integer
2:[unsigned_field_16]:LONG=[65535]:Long
3:[signed_field_16]:INT=[-32767]:Integer
4:[unsigned_field_32]:LONG=[4294967295]:Long
5:[signed_field_32]:INT=[-2147483647]:Integer
6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
7:[signed_field_64]:LONG=[-9223372036854775807]:Long
}
-----------------------------------------------
2 records
15:48:25,852 DEBUG [main] datapipeline:661 - job::Success
