Read Selected Fields from a Parquet File

Updated: Jun 29, 2023

In this example, you will learn how to read selected fields from a Parquet file. Parquet is a columnar storage file format used for efficient data processing and analytics. Defining a read schema that lists only the fields of interest lets you skip the remaining columns, optimizing data retrieval and reducing unnecessary processing.


Input File

{"id":4,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":5,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":6,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":7,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":2,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":3,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":0,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":1,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}


Java Code Listing

package com.northconcepts.datapipeline.examples.parquet;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

import java.io.File;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class ReadSelectedFieldsFromAParquetFile {

    public static void main(String[] args) {
        // Prepare schema with fields to be read from file, ignoring other fields
        MessageType schema = new MessageType("input_schema",
                Types.optional(INT32).named("id"),
                Types.optional(INT32).named("int_col"),
                Types.optional(DOUBLE).named("double_col"),
                Types.optional(BINARY).as(stringType()).named("date_string_col"),
                Types.optional(BOOLEAN).named("bool_col")
                );
/* 
       Exclude the following fields from being read:
            optional int32 tinyint_col;
            optional int32 smallint_col;
            optional int64 bigint_col;
            optional float float_col;
            optional binary string_col;
            optional int96 timestamp_col;
*/
        
        ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSchema(schema)  // Remove this line to read all fields in their original order
                ;
        Job.run(reader, new StreamWriter(System.out));

        System.out.println(reader.getSchema());
    }
}


Code Walkthrough

  1. A MessageType instance (defined as schema here) is configured to include only the specified fields from the Parquet file (an alternative way to build the same schema is sketched after this list).
  2. A ParquetDataReader instance is then initialized with the input file read_parquet_file.parquet and the schema defined above.
  3. The Job.run() method is invoked with the reader and a StreamWriter that writes the output to the console.
  4. Lastly, the getSchema() method is invoked to display the field types included in the schema.
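
If it is more convenient to express the fields to keep as text, the same read schema can also be built by parsing a Parquet schema string. The snippet below is a minimal sketch that replaces the MessageType construction in the listing above; it assumes a parquet-mr version whose MessageTypeParser accepts the STRING annotation (older releases use UTF8 instead).

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Parse the read schema from a Parquet schema string (same five fields as above)
MessageType schema = MessageTypeParser.parseMessageType(
        "message input_schema {\n"
        + "  optional int32 id;\n"
        + "  optional int32 int_col;\n"
        + "  optional double double_col;\n"
        + "  optional binary date_string_col (STRING);\n"
        + "  optional boolean bool_col;\n"
        + "}");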


Console Output

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[id]:INT=[4]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[03/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[id]:INT=[5]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[03/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[id]:INT=[6]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[04/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[id]:INT=[7]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[04/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}

-----------------------------------------------
4 - Record (MODIFIED) {
    0:[id]:INT=[2]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[02/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}

-----------------------------------------------
5 - Record (MODIFIED) {
    0:[id]:INT=[3]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[02/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}

-----------------------------------------------
6 - Record (MODIFIED) {
    0:[id]:INT=[0]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[01/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}

-----------------------------------------------
7 - Record (MODIFIED) {
    0:[id]:INT=[1]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[01/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}

-----------------------------------------------
8 records
10:57:54,639 DEBUG [main] datapipeline:661 - job::Success
message input_schema {
  optional int32 id;
  optional int32 int_col;
  optional double double_col;
  optional binary date_string_col (STRING);
  optional boolean bool_col;
}
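
To save the selected fields instead of streaming them to the console, the StreamWriter can be swapped for another DataPipeline writer. The sketch below assumes a CSVWriter constructor that accepts a java.io.Writer and uses an illustrative output path; the enclosing method would also need to declare the checked IOException thrown by FileWriter.

import java.io.File;
import java.io.FileWriter;

import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

// schema is the MessageType defined in the listing above
ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
        .setSchema(schema);

// Write only the selected fields to a CSV file (output path is illustrative)
Job.run(reader, new CSVWriter(new FileWriter("example/data/output/selected_fields.csv")));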
