Write a Parquet file using the schema from a database query
Updated: Sep 19, 2023
This example shows how to create a Parquet file whose schema is derived from the results of a database query. This is useful for exporting and transmitting data in Parquet files that closely match the database structure, for example in financial, e-commerce, and healthcare applications.
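The core of the technique, sketched below, is a single call to ParquetDataWriter.setSchema(Connection, String): the writer runs the query and derives the Parquet schema from the result set's column metadata. This is a minimal sketch, assuming an already-open JDBC Connection and an existing user table; the full, runnable listing follows below.

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class SchemaFromQuerySketch {

    // Builds a writer whose Parquet schema mirrors the user table.
    // Assumes: the connection is open and the user table exists.
    public static ParquetDataWriter writerWithDatabaseSchema(Connection connection, File target) throws Throwable {
        ParquetDataWriter writer = new ParquetDataWriter(target);
        // WHERE 1<0 matches no rows, so the query is cheap: only its
        // column metadata is used to build the Parquet schema.
        writer.setSchema(connection, "SELECT * FROM user WHERE 1<0");
        return writer;
    }
}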
Input SQL File
CREATE TABLE user_role (
  user_role_id INT NOT NULL,
  user_role VARCHAR(32) NOT NULL,
  PRIMARY KEY (user_role_id)
);

CREATE TABLE user (
  user_id INT NOT NULL,
  user_name VARCHAR(16) NOT NULL,
  first_name VARCHAR(255) NULL,
  last_name VARCHAR(255) NULL,
  email VARCHAR(255) NULL,
  password VARCHAR(255) NOT NULL,
  user_role_id INT NULL,
  PRIMARY KEY (user_id),
  FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id)
);

INSERT INTO user_role (user_role_id, user_role) VALUES
  (1, 'Admin'),
  (2, 'Viewer');

INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES
  (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1),
  (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1),
  (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2);
Java Code Listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFileUsingSchemaFromDatabaseQuery {

    private static final String PARQUET_FILE = "example/data/output/WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet";

    public static void main(String[] args) throws Throwable {
        DB db = new DB(); // creates HSQL DB
        Connection connection = db.getConnection();
        db.executeFile(new File("example/data/input/user_information.sql"));

        DataReader reader = new JdbcReader(connection, "SELECT * FROM user").setAutoCloseConnection(true);
        ParquetDataWriter writer = new ParquetDataWriter(new File(PARQUET_FILE));

        // Set the Parquet schema using a query.
        // The query returns zero records because the 1<0 condition is always false.
        writer.setSchema(connection, "SELECT * FROM user where 1<0");

        // Set the Parquet schema using a query with parameters.
        // The optimized query returns only one record for the given user_id.
//        writer.setSchema(connection, "SELECT * FROM user WHERE user_id=?", 1);

        // Set the Parquet schema using a query with parameters and a JdbcValueReader (default is OPINIONATED).
        // The optimized query returns only a limited number of records.
//        writer.setSchema(connection, JdbcValueReader.STRICT, "SELECT * FROM user WHERE user_role_id=?", 1);

        System.out.println("===================Generated Parquet Schema========================");
        System.out.println(writer.getSchema());
        System.out.println("===================================================================");

        Job.run(reader, writer);

        System.out.println("=======================Reading Parquet File============================================");
        reader = new ParquetDataReader(new File(PARQUET_FILE));
        Job.run(reader, new StreamWriter(System.out));
    }

}
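The WHERE 1<0 trick works because a JDBC ResultSet exposes column metadata even when it contains no rows. The plain-JDBC sketch below is not part of the example above; it only illustrates the kind of information (names, types, nullability) that a schema can be derived from. How DataPipeline maps this metadata to Parquet types is internal to the library.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

public class ZeroRowMetadataDemo {

    // Prints the column names, type names, and nullability of a query's
    // result set, even for a query that matches no rows, such as
    // "SELECT * FROM user WHERE 1<0".
    public static void describe(Connection connection, String query) throws Exception {
        try (Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(query)) {
            ResultSetMetaData metaData = resultSet.getMetaData();
            for (int column = 1; column <= metaData.getColumnCount(); column++) {
                boolean nullable = metaData.isNullable(column) != ResultSetMetaData.columnNoNulls;
                System.out.println(metaData.getColumnName(column)
                        + " : " + metaData.getColumnTypeName(column)
                        + " (nullable=" + nullable + ")");
            }
        }
    }
}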
Code Walkthrough
- A DB instance is created, which starts an in-memory HSQL database.
- A Connection to the database is obtained from it and passed to JdbcReader.
- The input file user_information.sql is executed to create and populate the user_role and user tables.
- JdbcReader is created to obtain records from the user table via the query SELECT * FROM user.
- ParquetDataWriter is created to write records to the specified Parquet file WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet.
- The schema for ParquetDataWriter is set with setSchema(connection, "SELECT * FROM user where 1<0"). Since the 1<0 condition is always false, the query returns zero records and only its column metadata is used to derive the schema. The commented-out alternatives set the schema from a parameterized query, optionally with a JdbcValueReader (the default is OPINIONATED); see the sketch after this list.
- The generated Parquet schema is printed to the console.
- Job.run(reader, writer) is used to transfer the data from JdbcReader to ParquetDataWriter.
- ParquetDataReader is created to read the data back from WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet.
- The records are then printed to the console via Job.run() and the StreamWriter class, which takes System.out as a parameter to its constructor.

See how to compile and run data pipeline jobs.
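For reference, the commented-out setSchema variants from the listing are shown below in isolation. This is a minimal sketch based on the listing's own comments; it assumes JdbcValueReader lives in the com.northconcepts.datapipeline.jdbc package (alongside JdbcReader), which you should verify against your DataPipeline version.

// Schema from a parameterized query; binding 1 to the placeholder
// limits the fetch to at most one row for the given user_id.
writer.setSchema(connection, "SELECT * FROM user WHERE user_id=?", 1);

// Schema from a parameterized query with an explicit JdbcValueReader strategy.
// Assumes: import com.northconcepts.datapipeline.jdbc.JdbcValueReader;
// Per the listing's comments, the default strategy is OPINIONATED.
writer.setSchema(connection, JdbcValueReader.STRICT, "SELECT * FROM user WHERE user_role_id=?", 1);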
Console Output
execute file /DataPipeline-Examples/example/data/input/user_information.sql
execute: CREATE TABLE user_role ( user_role_id INT NOT NULL, user_role VARCHAR(32) NOT NULL, PRIMARY KEY (user_role_id) ) - recordsAffected=0
execute: CREATE TABLE user ( user_id INT NOT NULL, user_name VARCHAR(16) NOT NULL, first_name VARCHAR(255) NULL, last_name VARCHAR(255) NULL, email VARCHAR(255) NULL, password VARCHAR(255) NOT NULL, user_role_id INT NULL, PRIMARY KEY (user_id), FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id) ) - recordsAffected=0
execute: INSERT INTO user_role (user_role_id, user_role) VALUES (1, 'Admin'), (2, 'Viewer') - recordsAffected=2
execute: INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1), (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1), (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2) - recordsAffected=3
20:59:48,976 DEBUG [main] datapipeline:37 - DataPipeline v8.2.0 by North Concepts Inc.
===================Generated Parquet Schema========================
message schema {
  required int32 USER_ID (INTEGER(32,true));
  required binary USER_NAME (STRING);
  optional binary FIRST_NAME (STRING);
  optional binary LAST_NAME (STRING);
  optional binary EMAIL (STRING);
  required binary PASSWORD (STRING);
  optional int32 USER_ROLE_ID (INTEGER(32,true));
}
===================================================================
=======================Reading Parquet File============================================
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[USER_ID]:INT=[1]:Integer
    1:[USER_NAME]:STRING=[john_doe]:String
    2:[FIRST_NAME]:STRING=[John]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[john.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[USER_ID]:INT=[2]:Integer
    1:[USER_NAME]:STRING=[jane_doe]:String
    2:[FIRST_NAME]:STRING=[jane]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[jane.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[USER_ID]:INT=[3]:Integer
    1:[USER_NAME]:STRING=[view_user]:String
    2:[FIRST_NAME]:STRING=[viewer]:String
    3:[LAST_NAME]:STRING=[]:String
    4:[EMAIL]:STRING=[view_user@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[2]:Integer
}
-----------------------------------------------
3 records
Shutting down...
shutdown complete