Read and Write Unsigned Numbers in Parquet Files

Updated: Jan 25, 2023

In this example, you will learn how to read and write unsigned and signed integers in Parquet files.

Java Code listing

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.math.BigInteger;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DebugReader;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

/**
 * Example that writes signed and unsigned integer columns of 8, 16, 32 and 64 bits to a Parquet
 * file using an explicit schema, prints that schema, and then reads the file back to the console.
 */
public class ReadAndWriteUnsignedNumbersInParquet {

    private static final File PARQUET_FILE = new File("example/data/output/ReadAndWriteUnsignedNumbersInParquet.parquet");

    /** Separator line printed above and below each section title. */
    private static final String BANNER = "============================================================";

    /**
     * Parquet message schema with one unsigned and one signed column per integer width.
     * The 8/16/32-bit columns use the int32 physical type; the 64-bit columns use int64.
     */
    private static final String SCHEMA = "message schema {"
            + "  optional int32 unsigned_field_8 (INTEGER(8,false)); "
            + "  optional int32 signed_field_8 (INTEGER(8,true)); "
            + "  optional int32 unsigned_field_16 (INTEGER(16,false)); "
            + "  optional int32 signed_field_16 (INTEGER(16,true)); "
            + "  optional int32 unsigned_field_32 (INTEGER(32,false)); "
            + "  optional int32 signed_field_32 (INTEGER(32,true)); "
            + "  optional int64 unsigned_field_64 (INTEGER(64,false)); "
            + "  optional int64 signed_field_64 (INTEGER(64,true)); "
            + "}";

    /**
     * Writes the sample records to {@link #PARQUET_FILE} using {@link #SCHEMA}, prints the schema
     * reported by the writer, then reads the file back and prints its records to standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        printSection("Write records to a parquet file");

        // DebugReader logs every record as it passes through to the writer.
        DataReader source = new DebugReader(new MemoryReader(createRecordList()));
        ParquetDataWriter parquetWriter = new ParquetDataWriter(PARQUET_FILE);
        parquetWriter.setSchema(SCHEMA);
        Job.run(source, parquetWriter);

        printSection("Prepared Schema");
        System.out.println(parquetWriter.getSchema());

        printSection("Read the parquet file");
        ParquetDataReader parquetReader = new ParquetDataReader(PARQUET_FILE);
        StreamWriter console = new StreamWriter(System.out);
        Job.run(parquetReader, console);
    }

    /**
     * Prints {@code title} framed by a banner line above and below.
     *
     * @param title the section heading to print
     */
    private static void printSection(String title) {
        System.out.println(BANNER);
        System.out.println(title);
        System.out.println(BANNER);
    }

    /**
     * Builds the two sample records written by {@link #main(String[])}.
     *
     * @return a list containing a record of per-column maximum values, followed by a record whose
     *     signed columns hold negative values
     */
    public static RecordList createRecordList() {
        // Every column holds the largest value its declared type allows.
        // The uint32 maximum does not fit in an int, so it is passed as a long.
        // The uint64 maximum does not fit in a long, so it is passed as a BigInteger.
        Record maxValues = new Record();
        maxValues.setField("unsigned_field_8", 255);
        maxValues.setField("signed_field_8", 127);
        maxValues.setField("unsigned_field_16", 65535);
        maxValues.setField("signed_field_16", 32767);
        maxValues.setField("unsigned_field_32", 4294967295L);
        maxValues.setField("signed_field_32", 2147483647);
        maxValues.setField("unsigned_field_64", new BigInteger("18446744073709551615"));
        maxValues.setField("signed_field_64", 9223372036854775807L);

        // Each signed column holds a negative value one above its type's minimum.
        // unsigned_field_8 holds 127; the other unsigned columns repeat their maximums.
        Record negativeValues = new Record();
        negativeValues.setField("unsigned_field_8", 127);
        negativeValues.setField("signed_field_8", -127);
        negativeValues.setField("unsigned_field_16", 65535);
        negativeValues.setField("signed_field_16", -32767);
        negativeValues.setField("unsigned_field_32", 4294967295L);
        negativeValues.setField("signed_field_32", -2147483647);
        negativeValues.setField("unsigned_field_64", new BigInteger("18446744073709551615"));
        negativeValues.setField("signed_field_64", -9223372036854775807L);

        RecordList records = new RecordList();
        records.add(maxValues, negativeValues);
        return records;
    }
}

Code walkthrough

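  1. Create a Parquet schema with 8 columns: one signed and one unsigned integer column for each of the 8, 16, 32, and 64-bit widths.
  2. Create 2 records and supply them to a MemoryReader, wrapped in a DebugReader that logs each record.
  3. Create an instance of ParquetDataWriter to write records to the Parquet file, and pass it the schema with setSchema().
  4. Job.run(reader, writer) transfers the data from the reader to the writer, which writes it to the Parquet file.
  5. Print the Parquet schema reported by the writer.
  6. Read the records back from the Parquet file and write them to the console. In the output below, the unsigned 8, 16, and 32-bit fields are read back as Long, and the unsigned 64-bit field is read back as BigInteger.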

Console Output

============================================================
Write records to a parquet file
============================================================
15:48:23,059 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
15:48:23,518 DEBUG [main] datapipeline:615 - Job[1,job-1,Wed Jan 25 15:48:23 IST 2023]::Start
==========================================================
debug - 0
------------------------------------------
Record (MODIFIED) {
    0:[unsigned_field_8]:INT=[255]:Integer
    1:[signed_field_8]:INT=[127]:Integer
    2:[unsigned_field_16]:INT=[65535]:Integer
    3:[signed_field_16]:INT=[32767]:Integer
    4:[unsigned_field_32]:LONG=[4294967295]:Long
    5:[signed_field_32]:INT=[2147483647]:Integer
    6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
    7:[signed_field_64]:LONG=[9223372036854775807]:Long
}

==========================================================
debug - 1
------------------------------------------
Record (MODIFIED) {
    0:[unsigned_field_8]:INT=[127]:Integer
    1:[signed_field_8]:INT=[-127]:Integer
    2:[unsigned_field_16]:INT=[65535]:Integer
    3:[signed_field_16]:INT=[-32767]:Integer
    4:[unsigned_field_32]:LONG=[4294967295]:Long
    5:[signed_field_32]:INT=[-2147483647]:Integer
    6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
    7:[signed_field_64]:LONG=[-9223372036854775807]:Long
}

15:48:25,712 DEBUG [main] datapipeline:661 - job::Success
============================================================
Prepared Schema
============================================================
message schema {
  optional int32 unsigned_field_8 (INTEGER(8,false));
  optional int32 signed_field_8 (INTEGER(8,true));
  optional int32 unsigned_field_16 (INTEGER(16,false));
  optional int32 signed_field_16 (INTEGER(16,true));
  optional int32 unsigned_field_32 (INTEGER(32,false));
  optional int32 signed_field_32 (INTEGER(32,true));
  optional int64 unsigned_field_64 (INTEGER(64,false));
  optional int64 signed_field_64 (INTEGER(64,true));
}

============================================================
Read the parquet file
============================================================
15:48:25,712 DEBUG [main] datapipeline:615 - Job[2,job-2,Wed Jan 25 15:48:25 IST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[unsigned_field_8]:LONG=[255]:Long
    1:[signed_field_8]:INT=[127]:Integer
    2:[unsigned_field_16]:LONG=[65535]:Long
    3:[signed_field_16]:INT=[32767]:Integer
    4:[unsigned_field_32]:LONG=[4294967295]:Long
    5:[signed_field_32]:INT=[2147483647]:Integer
    6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
    7:[signed_field_64]:LONG=[9223372036854775807]:Long
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[unsigned_field_8]:LONG=[127]:Long
    1:[signed_field_8]:INT=[-127]:Integer
    2:[unsigned_field_16]:LONG=[65535]:Long
    3:[signed_field_16]:INT=[-32767]:Integer
    4:[unsigned_field_32]:LONG=[4294967295]:Long
    5:[signed_field_32]:INT=[-2147483647]:Integer
    6:[unsigned_field_64]:BIG_INTEGER=[18446744073709551615]:BigInteger
    7:[signed_field_64]:LONG=[-9223372036854775807]:Long
}

-----------------------------------------------
2 records
15:48:25,852 DEBUG [main] datapipeline:661 - job::Success
Mobile Analytics