Write A Parquet File Using Database Table Schema
Updated: Jan 29, 2023
In this example, you will learn how to generate a Parquet schema from a database table and use it to write records to a Parquet file.
Java Code listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFileUsingDatabaseTableSchema {

    private static final String PARQUET_FILE = "example/data/output/WriteAParquetFileUsingSchemaFromDatabase.parquet";

    public static void main(String[] args) throws Throwable {
        DB db = new DB(); // creates HSQL DB
        Connection connection = db.getConnection();
        db.executeFile(new File("example/data/input/user_information.sql"));

        DataReader reader = new JdbcReader(connection, "SELECT * FROM user")
                .setAutoCloseConnection(false);

        ParquetDataWriter writer = new ParquetDataWriter(new File(PARQUET_FILE));

        // Set Parquet schema using a catalog, schema and table name with default JdbcValueReader.DEFAULT
        writer.setSchema(connection, null, "PUBLIC", "USER");

        // Set Parquet schema using a catalog, schema and table name with JdbcValueReader.STRICT
        //writer.setSchema(connection, JdbcValueReader.STRICT, null, "PUBLIC", "USER");

        System.out.println("===================Generated Parquet Schema========================");
        System.out.println(writer.getSchema());
        System.out.println("===================================================================");

        Job.run(reader, writer);

        System.out.println("=======================Reading Parquet File============================================");

        reader = new ParquetDataReader(new File(PARQUET_FILE));
        Job.run(reader, new StreamWriter(System.out));
    }

}
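Note that the JdbcReader is created with setAutoCloseConnection(false), so the connection stays open while setSchema reads the table's metadata, and nothing in the listing closes it afterwards. A minimal cleanup sketch, assuming you own the connection; the try/finally below is illustrative and not part of the example:

    Connection connection = db.getConnection();
    try {
        // ... configure the reader/writer and run the jobs as in the listing ...
    } finally {
        connection.close(); // release the HSQL connection once both jobs are done
    }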
Code walkthrough
- Create a DB instance, which creates the HSQL database.
- Execute the database script to create tables and insert records into the database.
- Create a JdbcReader to read records from the specified query.
- Create an instance of ParquetDataWriter to write records to the Parquet file.
- Generate the Parquet schema using the connection, catalog name, schema name & table name (a strict-mapping variant is sketched after this list).
- Job.run(reader, writer) is used to transfer the data from the reader to the writer. This writes the data to the Parquet file.
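The commented-out line in the listing shows a setSchema overload that also takes a JdbcValueReader, which by its name appears to control how strictly JDBC column types are mapped when building the schema. A minimal sketch of that strict variant, assuming the same connection and PUBLIC.USER table as above:

    // Use strict JDBC-to-Parquet type mapping instead of the default
    // (JdbcValueReader.DEFAULT is used when no value reader is passed)
    writer.setSchema(connection, JdbcValueReader.STRICT, null, "PUBLIC", "USER");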
Console Output
execute file E:\NorthConcept\git\DataPipeline-Examples\example\data\input\user_information.sql
execute: CREATE TABLE user_role (
    user_role_id INT NOT NULL,
    user_role VARCHAR(32) NOT NULL,
    PRIMARY KEY (user_role_id)
) - recordsAffected=0
execute: CREATE TABLE user (
    user_id INT NOT NULL,
    user_name VARCHAR(16) NOT NULL,
    first_name VARCHAR(255) NULL,
    last_name VARCHAR(255) NULL,
    email VARCHAR(255) NULL,
    password VARCHAR(255) NOT NULL,
    user_role_id INT NULL,
    PRIMARY KEY (user_id),
    FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id)
) - recordsAffected=0
execute: INSERT INTO user_role (user_role_id, user_role) VALUES (1, 'Admin'), (2, 'Viewer') - recordsAffected=2
execute: INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1), (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1), (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2) - recordsAffected=3
16:59:05,679 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
===================Generated Parquet Schema========================
message schema {
  required int32 USER_ID (INTEGER(32,true));
  required binary USER_NAME (STRING);
  optional binary FIRST_NAME (STRING);
  optional binary LAST_NAME (STRING);
  optional binary EMAIL (STRING);
  required binary PASSWORD (STRING);
  optional int32 USER_ROLE_ID (INTEGER(32,true));
}
===================================================================
16:59:08,164 DEBUG [main] datapipeline:615 - Job[1,job-1,Sun Jan 29 16:59:08 IST 2023]::Start
16:59:14,223 DEBUG [main] datapipeline:661 - job::Success
=======================Reading Parquet File============================================
16:59:14,649 DEBUG [main] datapipeline:615 - Job[2,job-2,Sun Jan 29 16:59:14 IST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[USER_ID]:INT=[1]:Integer
    1:[USER_NAME]:STRING=[john_doe]:String
    2:[FIRST_NAME]:STRING=[John]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[john.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[USER_ID]:INT=[2]:Integer
    1:[USER_NAME]:STRING=[jane_doe]:String
    2:[FIRST_NAME]:STRING=[jane]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[jane.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[USER_ID]:INT=[3]:Integer
    1:[USER_NAME]:STRING=[view_user]:String
    2:[FIRST_NAME]:STRING=[viewer]:String
    3:[LAST_NAME]:STRING=[]:String
    4:[EMAIL]:STRING=[view_user@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[2]:Integer
}
-----------------------------------------------
3 records
16:59:14,790 DEBUG [main] datapipeline:661 - job::Success
Shutting down...
shutdown complete