Read Selected Fields from a Parquet File
Updated: Jun 29, 2023
This example shows how to read selected fields from a Parquet file. Parquet is a columnar storage file format designed for efficient data processing and analytics. Because the data is stored by column, a reader can extract only the fields of interest, optimizing data retrieval and avoiding unnecessary processing.
Input File
{"id":4,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":5,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":6,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":7,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":2,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":3,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":0,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":1,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
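The date_string_col and string_col values above appear as raw byte arrays rather than text. As a quick sanity check, those bytes can be decoded as UTF-8 with plain Java (a minimal standalone sketch, not part of the example itself):

```java
import java.nio.charset.StandardCharsets;

public class DecodeBufferExample {
    public static void main(String[] args) {
        // Byte values copied from the first record's date_string_col buffer
        int[] data = {48, 51, 47, 48, 49, 47, 48, 57};
        byte[] bytes = new byte[data.length];
        for (int i = 0; i < data.length; i++) {
            bytes[i] = (byte) data[i];
        }
        // Decode the buffer as a UTF-8 string
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints 03/01/09
    }
}
```

The decoded value, 03/01/09, matches the date_string_col value shown in the console output below.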
Java Code Listing
package com.northconcepts.datapipeline.examples.parquet;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

import java.io.File;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class ReadSelectedFieldsFromAParquetFile {

    public static void main(String[] args) {
        // Prepare a schema containing only the fields to be read from the file, ignoring all other fields
        MessageType schema = new MessageType("input_schema",
                Types.optional(INT32).named("id"),
                Types.optional(INT32).named("int_col"),
                Types.optional(DOUBLE).named("double_col"),
                Types.optional(BINARY).as(stringType()).named("date_string_col"),
                Types.optional(BOOLEAN).named("bool_col"));

        /*
        Exclude the following fields from being read:
            optional int32 tinyint_col;
            optional int32 smallint_col;
            optional int64 bigint_col;
            optional float float_col;
            optional binary string_col;
            optional int96 timestamp_col;
        */

        ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSchema(schema); // Remove this line to see all fields (in their original arrangement)

        Job.run(reader, new StreamWriter(System.out));

        System.out.println(reader.getSchema());
    }

}
Code Walkthrough
- A MessageType instance (named schema here) is configured to include only the specified fields from the Parquet file.
- Then, a ParquetDataReader instance is initialized with the input file read_parquet_file.parquet and the schema defined above.
- The Job.run() method is invoked with the reader and a StreamWriter, which writes the output to the console.
- Lastly, the getSchema() method is invoked to display the field types included in the schema.
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[id]:INT=[4]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[03/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[id]:INT=[5]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[03/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[id]:INT=[6]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[04/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
3 - Record (MODIFIED) {
    0:[id]:INT=[7]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[04/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
4 - Record (MODIFIED) {
    0:[id]:INT=[2]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[02/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
5 - Record (MODIFIED) {
    0:[id]:INT=[3]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[02/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
6 - Record (MODIFIED) {
    0:[id]:INT=[0]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[01/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
7 - Record (MODIFIED) {
    0:[id]:INT=[1]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[01/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
8 records
10:57:54,639 DEBUG [main] datapipeline:661 - job::Success
message input_schema {
    optional int32 id;
    optional int32 int_col;
    optional double double_col;
    optional binary date_string_col (STRING);
    optional boolean bool_col;
}