Write A Parquet File Containing Spaces And Symbols In Column Names
Updated: Jan 25, 2023
Parquet does not support some symbols and whitespace characters in column names. In this example, you will learn how to remove such characters while writing Parquet files, keeping only lowercase letters, uppercase letters, numbers, hyphens (-), and underscores (_) in column names.
There is already an open JIRA ticket, PARQUET-677, to support column names within quoted identifiers.
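The cleanup itself is a single regular-expression replacement. Here is a minimal, standalone sketch using only the JDK's String.replaceAll and the same expression as the full listing below; the sample column names are taken from the example's debug output:

public class ColumnNameCleanerSketch {

    public static void main(String[] args) {
        String[] names = { "Account No", "(First Name)", "[Balance]", "{Credit Limit}", "Account, Created" };
        for (String name : names) {
            // Keep only letters, digits, underscores, and hyphens
            String cleaned = name.replaceAll("[^a-zA-Z0-9_\\-]+", "");
            System.out.println(name + " -> " + cleaned); // e.g. "Account No -> AccountNo"
        }
    }

}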
Java Code listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DebugReader;
import com.northconcepts.datapipeline.core.Field;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;
import com.northconcepts.datapipeline.transform.Transformer;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class WriteAParquetFileContainingSpacesAndSymbolsInColumnNames {

    private static final File PARQUET_FILE = new File("example/data/output/WriteAParquetFileWithSpacesInColumns.parquet");

    public static void main(String[] args) {
        System.out.println("============================================================");
        System.out.println("Write records to a parquet file");
        System.out.println("============================================================");

        DataReader reader = new CSVReader(new File("example/data/input/bank_account_spaces_in_column_names.csv"))
                .setFieldNamesInFirstRow(true);

        reader = new DebugReader(reader);

        reader = new TransformingReader(reader)
                .add(new Transformer() {
                    @Override
                    public boolean transform(Record record) throws Throwable {
                        for (Field field : record) {
                            // Remove all unsupported symbols using regular expression in field name
                            field.setName(field.getName().replaceAll("[^a-zA-Z0-9_\\-]+", ""));
                        }
                        return true;
                    }
                });

        ParquetDataWriter writer = new ParquetDataWriter(PARQUET_FILE);

        Job.run(reader, writer);

        System.out.println("============================================================");
        System.out.println("Prepared Schema");
        System.out.println("============================================================");
        System.out.println(writer.getSchema());

        System.out.println("============================================================");
        System.out.println("Read the parquet file");
        System.out.println("============================================================");

        Job.run(new ParquetDataReader(PARQUET_FILE), new StreamWriter(System.out));
    }

}
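The input file itself is not shown in the original example. Based on the debug output in the Console Output section below, bank_account_spaces_in_column_names.csv presumably starts like this (a reconstruction, not the actual file; note the quoted header for the column containing a comma):

Id2,Account No,Last Name,(First Name),[Balance],{Credit Limit},"Account, Created",Rating
1,101,Reeves,Keanu,12,1000.23,17-01-1998,A
2,102,Butler,Gerard,123456,1234567890.98,06-08-2003,B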
Code walkthrough
- Create an instance of CSVReader to read records from a CSV file.
- To print/debug records read by CSVReader, create an instance of DebugReader.
- Create an instance of TransformingReader and preserve only letters, numbers, hyphens, and underscores in field names (a collision-handling variant is sketched after this list).
- Create an instance of ParquetDataWriter to write records to a Parquet file.
- Job.run(reader, writer) transfers the data from the reader to the writer. This writes the records to the Parquet file.
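One caveat with stripping characters: two different source columns can clean to the same name (for example, "First Name" and "FirstName" both become FirstName). The sketch below is a variant of the TransformingReader block from the listing that appends a numeric suffix on collisions. It uses only the Field and Record calls already shown above; the collision handling itself is an assumption added here, not part of the original example:

        reader = new TransformingReader(reader)
                .add(new Transformer() {
                    @Override
                    public boolean transform(Record record) throws Throwable {
                        // Track cleaned names already used in this record
                        java.util.Set<String> seen = new java.util.HashSet<>();
                        for (Field field : record) {
                            String cleaned = field.getName().replaceAll("[^a-zA-Z0-9_\\-]+", "");
                            // If two source columns clean to the same name, append a
                            // numeric suffix (cleaned2, cleaned3, ...) to keep them distinct
                            String candidate = cleaned;
                            int suffix = 2;
                            while (!seen.add(candidate)) {
                                candidate = cleaned + suffix++;
                            }
                            field.setName(candidate);
                        }
                        return true;
                    }
                });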
Console Output
============================================================
Write records to a parquet file
============================================================
15:14:15,348 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
15:14:15,996 DEBUG [main] datapipeline:615 - Job[1,job-1,Wed Jan 25 15:14:15 IST 2023]::Start
15:14:22,617 DEBUG [job-2] datapipeline:615 - Job[2,job-2,Wed Jan 25 15:14:22 IST 2023]::Start
==========================================================
debug - 0
------------------------------------------
Record {
    0:[Id2]:STRING=[1]:String
    1:[Account No]:STRING=[101]:String
    2:[Last Name]:STRING=[Reeves]:String
    3:[(First Name)]:STRING=[Keanu]:String
    4:[[Balance]]:STRING=[12]:String
    5:[{Credit Limit}]:STRING=[1000.23]:String
    6:[Account, Created]:STRING=[17-01-1998]:String
    7:[Rating]:STRING=[A]:String
}

==========================================================
debug - 1
------------------------------------------
Record {
    0:[Id2]:STRING=[2]:String
    1:[Account No]:STRING=[102]:String
    2:[Last Name]:STRING=[Butler]:String
    3:[(First Name)]:STRING=[Gerard]:String
    4:[[Balance]]:STRING=[123456]:String
    5:[{Credit Limit}]:STRING=[1234567890.98]:String
    6:[Account, Created]:STRING=[06-08-2003]:String
    7:[Rating]:STRING=[B]:String
}

15:14:22,622 DEBUG [job-2] datapipeline:661 - job::Success
15:14:25,563 DEBUG [main] datapipeline:661 - job::Success
============================================================
Prepared Schema
============================================================
message schema {
  optional binary Id2 (STRING);
  optional binary AccountNo (STRING);
  optional binary LastName (STRING);
  optional binary FirstName (STRING);
  optional binary Balance (STRING);
  optional binary CreditLimit (STRING);
  optional binary AccountCreated (STRING);
  optional binary Rating (STRING);
}

============================================================
Read the parquet file
============================================================
15:14:26,023 DEBUG [main] datapipeline:615 - Job[3,job-3,Wed Jan 25 15:14:26 IST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[Id2]:STRING=[1]:String
    1:[AccountNo]:STRING=[101]:String
    2:[LastName]:STRING=[Reeves]:String
    3:[FirstName]:STRING=[Keanu]:String
    4:[Balance]:STRING=[12]:String
    5:[CreditLimit]:STRING=[1000.23]:String
    6:[AccountCreated]:STRING=[17-01-1998]:String
    7:[Rating]:STRING=[A]:String
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[Id2]:STRING=[2]:String
    1:[AccountNo]:STRING=[102]:String
    2:[LastName]:STRING=[Butler]:String
    3:[FirstName]:STRING=[Gerard]:String
    4:[Balance]:STRING=[123456]:String
    5:[CreditLimit]:STRING=[1234567890.98]:String
    6:[AccountCreated]:STRING=[06-08-2003]:String
    7:[Rating]:STRING=[B]:String
}

-----------------------------------------------
2 records
15:14:26,173 DEBUG [main] datapipeline:661 - job::Success