Read Selected Fields from a Parquet File
Updated: Jun 29, 2023
In this example, you learn how to read selected fields from a Parquet file. Parquet is a columnar storage file format designed for efficient data processing and analytics. By supplying a read schema that lists only the fields of interest, you limit retrieval to those columns and avoid unnecessary processing.
Input File
{"id":4,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":5,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":6,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":7,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":2,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":3,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
{"id":0,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
{"id":1,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
Java Code Listing
package com.northconcepts.datapipeline.examples.parquet;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

import java.io.File;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class ReadSelectedFieldsFromAParquetFile {

    public static void main(String[] args) {
        // Prepare schema with fields to be read from file, ignoring other fields
        MessageType schema = new MessageType("input_schema",
                Types.optional(INT32).named("id"),
                Types.optional(INT32).named("int_col"),
                Types.optional(DOUBLE).named("double_col"),
                Types.optional(BINARY).as(stringType()).named("date_string_col"),
                Types.optional(BOOLEAN).named("bool_col")
        );

        /*
            Exclude the following fields from being read:
                optional int32 tinyint_col;
                optional int32 smallint_col;
                optional int64 bigint_col;
                optional float float_col;
                optional binary string_col;
                optional int96 timestamp_col;
        */

        ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSchema(schema) // Remove this line to see all fields (and in original arrangement)
                ;

        Job.run(reader, new StreamWriter(System.out));

        System.out.println(reader.getSchema());
    }

}
Code Walkthrough
- A MessageType instance (defined as schema here) is configured to include only the specified fields from the Parquet file. An alternative way to declare this projection is sketched after this walkthrough.
- Then, a ParquetDataReader instance is initialized with the input file read_parquet_file.parquet and the schema defined above.
- The Job.run() method is invoked with the reader and a StreamWriter which writes the output to the console.
- Lastly, the getSchema() method is invoked to display the field types included in the schema.
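The same projection can also be written as a Parquet message definition and parsed at runtime with MessageTypeParser from the Apache Parquet schema package. The following is a minimal sketch of that variant, assuming the same input file and library versions as the example above; it is not part of the original listing.

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class ReadSelectedFieldsWithParsedSchema {

    public static void main(String[] args) {
        // Same five-column projection as above, declared as a Parquet message definition
        MessageType schema = MessageTypeParser.parseMessageType(
                "message input_schema {\n" +
                "  optional int32 id;\n" +
                "  optional int32 int_col;\n" +
                "  optional double double_col;\n" +
                "  optional binary date_string_col (STRING);\n" +
                "  optional boolean bool_col;\n" +
                "}");

        ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
                .setSchema(schema);

        Job.run(reader, new StreamWriter(System.out));
    }

}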
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
0:[id]:INT=[4]:Integer
1:[int_col]:INT=[0]:Integer
2:[double_col]:DOUBLE=[0.0]:Double
3:[date_string_col]:STRING=[03/01/09]:String
4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
1 - Record (MODIFIED) {
0:[id]:INT=[5]:Integer
1:[int_col]:INT=[1]:Integer
2:[double_col]:DOUBLE=[10.1]:Double
3:[date_string_col]:STRING=[03/01/09]:String
4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
2 - Record (MODIFIED) {
0:[id]:INT=[6]:Integer
1:[int_col]:INT=[0]:Integer
2:[double_col]:DOUBLE=[0.0]:Double
3:[date_string_col]:STRING=[04/01/09]:String
4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
3 - Record (MODIFIED) {
0:[id]:INT=[7]:Integer
1:[int_col]:INT=[1]:Integer
2:[double_col]:DOUBLE=[10.1]:Double
3:[date_string_col]:STRING=[04/01/09]:String
4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
4 - Record (MODIFIED) {
0:[id]:INT=[2]:Integer
1:[int_col]:INT=[0]:Integer
2:[double_col]:DOUBLE=[0.0]:Double
3:[date_string_col]:STRING=[02/01/09]:String
4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
5 - Record (MODIFIED) {
0:[id]:INT=[3]:Integer
1:[int_col]:INT=[1]:Integer
2:[double_col]:DOUBLE=[10.1]:Double
3:[date_string_col]:STRING=[02/01/09]:String
4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
6 - Record (MODIFIED) {
0:[id]:INT=[0]:Integer
1:[int_col]:INT=[0]:Integer
2:[double_col]:DOUBLE=[0.0]:Double
3:[date_string_col]:STRING=[01/01/09]:String
4:[bool_col]:BOOLEAN=[true]:Boolean
}
-----------------------------------------------
7 - Record (MODIFIED) {
0:[id]:INT=[1]:Integer
1:[int_col]:INT=[1]:Integer
2:[double_col]:DOUBLE=[10.1]:Double
3:[date_string_col]:STRING=[01/01/09]:String
4:[bool_col]:BOOLEAN=[false]:Boolean
}
-----------------------------------------------
8 records
10:57:54,639 DEBUG [main] datapipeline:661 - job::Success
message input_schema {
optional int32 id;
optional int32 int_col;
optional double double_col;
optional binary date_string_col (STRING);
optional boolean bool_col;
}
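As the inline comment in the listing notes, dropping the setSchema(schema) call makes the reader return every field in its original arrangement. A minimal sketch of that variant, using the same reader and writer:

ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"));
// No read schema supplied, so all columns are returned in their original arrangement
Job.run(reader, new StreamWriter(System.out));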
