Read Selected Fields from a Parquet File
Updated: Jun 29, 2023
In this example, you learn how to read selected fields from a Parquet file. Parquet is a columnar storage file format used for efficient data processing and analytics. Because values are stored column by column, a reader can extract only the fields of interest, reducing data retrieval and unnecessary processing.
Input File
{"id":4,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0} {"id":5,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000} {"id":6,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0} {"id":7,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000} {"id":2,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0} {"id":3,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000} {"id":0,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0} {"id":1,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
Java Code Listing
package com.northconcepts.datapipeline.examples.parquet;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

import java.io.File;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class ReadSelectedFieldsFromAParquetFile {

    public static void main(String[] args) {
        // Prepare schema with fields to be read from file, ignoring other fields
        MessageType schema = new MessageType("input_schema",
                Types.optional(INT32).named("id"),
                Types.optional(INT32).named("int_col"),
                Types.optional(DOUBLE).named("double_col"),
                Types.optional(BINARY).as(stringType()).named("date_string_col"),
                Types.optional(BOOLEAN).named("bool_col")
                );

        /* Exclude the following fields from being read:
            optional int32 tinyint_col;
            optional int32 smallint_col;
            optional int64 bigint_col;
            optional float float_col;
            optional binary string_col;
            optional int96 timestamp_col;
        */

        ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSchema(schema) // Remove this line to see all fields (and in original arrangement)
                ;

        Job.run(reader, new StreamWriter(System.out));

        System.out.println(reader.getSchema());
    }

}
Code Walkthrough
- A MessageType instance (defined as schema here) is configured to include only the selected fields from the Parquet file. Any subset of columns can be projected this way (see the sketch after this list).
- Then, a ParquetDataReader instance is initialized with the input file read_parquet_file.parquet and the schema defined above.
- The Job.run() method is invoked with the reader and a StreamWriter, which writes the output to the console.
- Lastly, the getSchema() method is invoked to display the field types included in the schema.
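The following is a minimal sketch of the same projection idea with a narrower schema, reusing only the classes already shown in the listing above. It reads just the id and bool_col columns from the same input file; the class and variable names here are illustrative and not part of the original example.

package com.northconcepts.datapipeline.examples.parquet;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

import java.io.File;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class ReadTwoFieldsFromAParquetFile { // illustrative class name, not in the original example

    public static void main(String[] args) {
        // Project only two of the file's columns; all other fields are skipped during the read
        MessageType schema = new MessageType("input_schema",
                Types.optional(INT32).named("id"),
                Types.optional(BOOLEAN).named("bool_col"));

        ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSchema(schema);

        Job.run(reader, new StreamWriter(System.out));
    }

}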
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[id]:INT=[4]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[03/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[id]:INT=[5]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[03/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[id]:INT=[6]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[04/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
3 - Record (MODIFIED) {
    0:[id]:INT=[7]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[04/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
4 - Record (MODIFIED) {
    0:[id]:INT=[2]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[02/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
5 - Record (MODIFIED) {
    0:[id]:INT=[3]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[02/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
6 - Record (MODIFIED) {
    0:[id]:INT=[0]:Integer
    1:[int_col]:INT=[0]:Integer
    2:[double_col]:DOUBLE=[0.0]:Double
    3:[date_string_col]:STRING=[01/01/09]:String
    4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
7 - Record (MODIFIED) {
    0:[id]:INT=[1]:Integer
    1:[int_col]:INT=[1]:Integer
    2:[double_col]:DOUBLE=[10.1]:Double
    3:[date_string_col]:STRING=[01/01/09]:String
    4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
8 records
10:57:54,639 DEBUG [main] datapipeline:661 - job::Success
message input_schema {
  optional int32 id;
  optional int32 int_col;
  optional double double_col;
  optional binary date_string_col (STRING);
  optional boolean bool_col;
}
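For comparison with the projected schema printed above, the complete schema of the input file can be reconstructed with the same builder API by combining the five projected fields with the six excluded fields listed in the code comment. This is a sketch assembled from that comment rather than output produced by the example; the field order and the plain int96/binary declarations are assumptions.

package com.northconcepts.datapipeline.examples.parquet;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

public class PrintFullInputSchema { // illustrative class name, not part of the original example

    public static void main(String[] args) {
        // Reconstructed full schema: the five projected fields plus the six fields
        // excluded in the example's code comment; names and types come from the article,
        // the ordering is an assumption
        MessageType fullSchema = new MessageType("input_schema",
                Types.optional(INT32).named("id"),
                Types.optional(BOOLEAN).named("bool_col"),
                Types.optional(INT32).named("tinyint_col"),
                Types.optional(INT32).named("smallint_col"),
                Types.optional(INT32).named("int_col"),
                Types.optional(INT64).named("bigint_col"),
                Types.optional(FLOAT).named("float_col"),
                Types.optional(DOUBLE).named("double_col"),
                Types.optional(BINARY).as(stringType()).named("date_string_col"),
                Types.optional(BINARY).named("string_col"),
                Types.optional(INT96).named("timestamp_col"));

        System.out.println(fullSchema);
    }

}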