Read Selected Fields from a Parquet File

Updated: Jun 29, 2023

In this example, you learn how to read selected fields from Parquet files. Parquet is a columnar storage file format used for efficient data processing and analytics. The approach shown here provides a streamlined way to extract only the fields of interest from a Parquet file, optimizing data retrieval and reducing unnecessary processing.

 

Input File

  1. {"id":4,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
  2. {"id":5,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,51,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
  3. {"id":6,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
  4. {"id":7,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,52,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
  5. {"id":2,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
  6. {"id":3,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,50,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}
  7. {"id":0,"bool_col":true,"tinyint_col":0,"smallint_col":0,"int_col":0,"bigint_col":0,"float_col":0,"double_col":0,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[48]},"timestamp_col":0}
  8. {"id":1,"bool_col":false,"tinyint_col":1,"smallint_col":1,"int_col":1,"bigint_col":10,"float_col":1.100000023841858,"double_col":10.1,"date_string_col":{"type":"Buffer","data":[48,49,47,48,49,47,48,57]},"string_col":{"type":"Buffer","data":[49]},"timestamp_col":60000000000}

 

Java Code Listing

  1. package com.northconcepts.datapipeline.examples.parquet;
  2.  
  3. import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
  4. import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
  5. import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
  6. import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
  7. import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
  8.  
  9. import java.io.File;
  10.  
  11. import org.apache.parquet.schema.MessageType;
  12. import org.apache.parquet.schema.Types;
  13.  
  14. import com.northconcepts.datapipeline.core.StreamWriter;
  15. import com.northconcepts.datapipeline.job.Job;
  16. import com.northconcepts.datapipeline.parquet.ParquetDataReader;
  17.  
  18. public class ReadSelectedFieldsFromAParquetFile {
  19.  
  20. public static void main(String[] args) {
  21. // Prepare schema with fields to be read from file, ignoring other fields
  22. MessageType schema = new MessageType("input_schema",
  23. Types.optional(INT32).named("id"),
  24. Types.optional(INT32).named("int_col"),
  25. Types.optional(DOUBLE).named("double_col"),
  26. Types.optional(BINARY).as(stringType()).named("date_string_col"),
  27. Types.optional(BOOLEAN).named("bool_col")
  28. );
  29. /*
  30. Exclude the following fields from being read:
  31. optional int32 tinyint_col;
  32. optional int32 smallint_col;
  33. optional int64 bigint_col;
  34. optional float float_col;
  35. optional binary string_col;
  36. optional int96 timestamp_col;
  37. */
  38. ParquetDataReader reader = new ParquetDataReader(new File("example/data/input/read_parquet_file.parquet"))
  39. .setSchema(schema) // Remove this line to see all fields (and in original arrangement)
  40. ;
  41. Job.run(reader, new StreamWriter(System.out));
  42.  
  43. System.out.println(reader.getSchema());
  44. }
  45. }
  46.  

 

Code Walkthrough

  1. A MessageType instance (defined as schema here) is configured to only include the specified fields from the Parquet file.
  2. Then, a ParquetDataReader instance is initialized with the input file read_parquet_file.parquet and the schema defined above.
  3. The Job.run() method is invoked with the reader and a StreamWriter which writes the output to the console.
  4. Lastly, the getSchema() method is invoked to display the field types included in the schema.

 

Console Output

  1. -----------------------------------------------
  2. 0 - Record (MODIFIED) {
  3. 0:[id]:INT=[4]:Integer
  4. 1:[int_col]:INT=[0]:Integer
  5. 2:[double_col]:DOUBLE=[0.0]:Double
  6. 3:[date_string_col]:STRING=[03/01/09]:String
  7. 4:[bool_col]:BOOLEAN=[true]:Boolean
  8. }
  9.  
  10. -----------------------------------------------
  11. 1 - Record (MODIFIED) {
  12. 0:[id]:INT=[5]:Integer
  13. 1:[int_col]:INT=[1]:Integer
  14. 2:[double_col]:DOUBLE=[10.1]:Double
  15. 3:[date_string_col]:STRING=[03/01/09]:String
  16. 4:[bool_col]:BOOLEAN=[false]:Boolean
  17. }
  18.  
  19. -----------------------------------------------
  20. 2 - Record (MODIFIED) {
  21. 0:[id]:INT=[6]:Integer
  22. 1:[int_col]:INT=[0]:Integer
  23. 2:[double_col]:DOUBLE=[0.0]:Double
  24. 3:[date_string_col]:STRING=[04/01/09]:String
  25. 4:[bool_col]:BOOLEAN=[true]:Boolean
  26. }
  27.  
  28. -----------------------------------------------
  29. 3 - Record (MODIFIED) {
  30. 0:[id]:INT=[7]:Integer
  31. 1:[int_col]:INT=[1]:Integer
  32. 2:[double_col]:DOUBLE=[10.1]:Double
  33. 3:[date_string_col]:STRING=[04/01/09]:String
  34. 4:[bool_col]:BOOLEAN=[false]:Boolean
  35. }
  36.  
  37. -----------------------------------------------
  38. 4 - Record (MODIFIED) {
  39. 0:[id]:INT=[2]:Integer
  40. 1:[int_col]:INT=[0]:Integer
  41. 2:[double_col]:DOUBLE=[0.0]:Double
  42. 3:[date_string_col]:STRING=[02/01/09]:String
  43. 4:[bool_col]:BOOLEAN=[true]:Boolean
  44. }
  45.  
  46. -----------------------------------------------
  47. 5 - Record (MODIFIED) {
  48. 0:[id]:INT=[3]:Integer
  49. 1:[int_col]:INT=[1]:Integer
  50. 2:[double_col]:DOUBLE=[10.1]:Double
  51. 3:[date_string_col]:STRING=[02/01/09]:String
  52. 4:[bool_col]:BOOLEAN=[false]:Boolean
  53. }
  54.  
  55. -----------------------------------------------
  56. 6 - Record (MODIFIED) {
  57. 0:[id]:INT=[0]:Integer
  58. 1:[int_col]:INT=[0]:Integer
  59. 2:[double_col]:DOUBLE=[0.0]:Double
  60. 3:[date_string_col]:STRING=[01/01/09]:String
  61. 4:[bool_col]:BOOLEAN=[true]:Boolean
  62. }
  63.  
  64. -----------------------------------------------
  65. 7 - Record (MODIFIED) {
  66. 0:[id]:INT=[1]:Integer
  67. 1:[int_col]:INT=[1]:Integer
  68. 2:[double_col]:DOUBLE=[10.1]:Double
  69. 3:[date_string_col]:STRING=[01/01/09]:String
  70. 4:[bool_col]:BOOLEAN=[false]:Boolean
  71. }
  72.  
  73. -----------------------------------------------
  74. 8 records
  75. 10:57:54,639 DEBUG [main] datapipeline:661 - job::Success
  76. message input_schema {
  77. optional int32 id;
  78. optional int32 int_col;
  79. optional double double_col;
  80. optional binary date_string_col (STRING);
  81. optional boolean bool_col;
  82. }
  83.  
Mobile Analytics