Write a Parquet file using the schema from a database query
Updated: Sep 19, 2023
This example shows how to create a Parquet file whose schema is derived from the results of a database query. This is useful for exporting and transmitting data in Parquet files that closely match the database structure, for example in financial, e-commerce, and healthcare applications.
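The core of the technique is a single setSchema(Connection, String) call: run a query whose WHERE clause is always false, so the database returns column metadata without any rows, and let the writer build its Parquet schema from that metadata. A minimal sketch, assuming an open JDBC Connection named connection and a placeholder output file name (the complete program appears below):

// The 1<0 predicate is always false, so the query returns zero rows
// but still exposes the result set's column names and types.
ParquetDataWriter writer = new ParquetDataWriter(new File("out.parquet"));
writer.setSchema(connection, "SELECT * FROM user WHERE 1<0");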
Input SQL File
CREATE TABLE user_role (
    user_role_id INT NOT NULL,
    user_role VARCHAR(32) NOT NULL,
    PRIMARY KEY (user_role_id)
);

CREATE TABLE user (
    user_id INT NOT NULL,
    user_name VARCHAR(16) NOT NULL,
    first_name VARCHAR(255) NULL,
    last_name VARCHAR(255) NULL,
    email VARCHAR(255) NULL,
    password VARCHAR(255) NOT NULL,
    user_role_id INT NULL,
    PRIMARY KEY (user_id),
    FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id)
);

INSERT INTO user_role (user_role_id, user_role) VALUES
    (1, 'Admin'),
    (2, 'Viewer');

INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES
    (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1),
    (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1),
    (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2);
Java Code Listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFileUsingSchemaFromDatabaseQuery {

    private static final String PARQUET_FILE = "example/data/output/WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet";

    public static void main(String[] args) throws Throwable {
        DB db = new DB(); // creates HSQL DB
        Connection connection = db.getConnection();
        db.executeFile(new File("example/data/input/user_information.sql"));

        DataReader reader = new JdbcReader(connection, "SELECT * FROM user").setAutoCloseConnection(true);
        ParquetDataWriter writer = new ParquetDataWriter(new File(PARQUET_FILE));

        // Set the Parquet schema using a query.
        // The query returns zero records because the 1<0 condition is always false.
        writer.setSchema(connection, "SELECT * FROM user WHERE 1<0");

        // Set the Parquet schema using a query with parameters.
        // The optimized query returns only one record for a given user_id.
        // writer.setSchema(connection, "SELECT * FROM user WHERE user_id=?", 1);

        // Set the Parquet schema using a query with parameters and a JdbcValueReader (default is OPINIONATED).
        // The optimized query returns only a limited number of records.
        // writer.setSchema(connection, JdbcValueReader.STRICT, "SELECT * FROM user WHERE user_role_id=?", 1);

        System.out.println("===================Generated Parquet Schema========================");
        System.out.println(writer.getSchema());
        System.out.println("===================================================================");

        Job.run(reader, writer);

        System.out.println("=======================Reading Parquet File============================================");
        reader = new ParquetDataReader(new File(PARQUET_FILE));
        Job.run(reader, new StreamWriter(System.out));
    }

}
Code Walkthrough
- The DB instance is created, which starts an in-memory HSQL database.
- A Connection to the database is obtained and passed to JdbcReader.
- The input file user_information.sql is executed to create and populate the tables.
- JdbcReader is created to obtain records from the user table.
- ParquetDataWriter is created to write records to the specified Parquet file WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet.
- The schema for ParquetDataWriter is set using a query whose 1<0 condition is always false, so it returns column metadata without transferring any rows. The commented-out alternatives set the schema from a parameterized query, optionally with an explicit JdbcValueReader (the default is OPINIONATED); see the sketch after this list.
- The generated Parquet schema is printed to the console.
- Job.run(reader, writer) is used to transfer the data from JdbcReader to ParquetDataWriter.
- ParquetDataReader is created to read the data back from WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet.
- The records are then printed to the console via Job.run() and the StreamWriter class, which takes System.out as a parameter to its constructor.

See how to compile and run data pipeline jobs.
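For reference, the two commented-out setSchema overloads in the listing can be used as shown below. This is a minimal sketch, not a complete program: it reuses the connection and writer objects from the main listing, only one of the two calls would be made in practice, and the import path for JdbcValueReader is assumed to match the library's other jdbc classes.

import com.northconcepts.datapipeline.jdbc.JdbcValueReader;

// Alternative 1: derive the schema from a parameterized query that matches at most one row.
writer.setSchema(connection, "SELECT * FROM user WHERE user_id=?", 1);

// Alternative 2: derive the schema with a STRICT JdbcValueReader instead of the default OPINIONATED one.
writer.setSchema(connection, JdbcValueReader.STRICT, "SELECT * FROM user WHERE user_role_id=?", 1);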
Console Output
execute file /DataPipeline-Examples/example/data/input/user_information.sql
execute: CREATE TABLE user_role ( user_role_id INT NOT NULL, user_role VARCHAR(32) NOT NULL, PRIMARY KEY (user_role_id) )
- recordsAffected=0
execute: CREATE TABLE user ( user_id INT NOT NULL, user_name VARCHAR(16) NOT NULL, first_name VARCHAR(255) NULL, last_name VARCHAR(255) NULL, email VARCHAR(255) NULL, password VARCHAR(255) NOT NULL, user_role_id INT NULL, PRIMARY KEY (user_id), FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id) )
- recordsAffected=0
execute: INSERT INTO user_role (user_role_id, user_role) VALUES (1, 'Admin'), (2, 'Viewer')
- recordsAffected=2
execute: INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1), (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1), (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2)
- recordsAffected=3
20:59:48,976 DEBUG [main] datapipeline:37 - DataPipeline v8.2.0 by North Concepts Inc.
===================Generated Parquet Schema========================
message schema {
required int32 USER_ID (INTEGER(32,true));
required binary USER_NAME (STRING);
optional binary FIRST_NAME (STRING);
optional binary LAST_NAME (STRING);
optional binary EMAIL (STRING);
required binary PASSWORD (STRING);
optional int32 USER_ROLE_ID (INTEGER(32,true));
}
===================================================================
=======================Reading Parquet File============================================
-----------------------------------------------
0 - Record (MODIFIED) {
0:[USER_ID]:INT=[1]:Integer
1:[USER_NAME]:STRING=[john_doe]:String
2:[FIRST_NAME]:STRING=[John]:String
3:[LAST_NAME]:STRING=[Doe]:String
4:[EMAIL]:STRING=[john.doe@example.com]:String
5:[PASSWORD]:STRING=[**********]:String
6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
1 - Record (MODIFIED) {
0:[USER_ID]:INT=[2]:Integer
1:[USER_NAME]:STRING=[jane_doe]:String
2:[FIRST_NAME]:STRING=[jane]:String
3:[LAST_NAME]:STRING=[Doe]:String
4:[EMAIL]:STRING=[jane.doe@example.com]:String
5:[PASSWORD]:STRING=[**********]:String
6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
2 - Record (MODIFIED) {
0:[USER_ID]:INT=[3]:Integer
1:[USER_NAME]:STRING=[view_user]:String
2:[FIRST_NAME]:STRING=[viewer]:String
3:[LAST_NAME]:STRING=[]:String
4:[EMAIL]:STRING=[view_user@example.com]:String
5:[PASSWORD]:STRING=[**********]:String
6:[USER_ROLE_ID]:INT=[2]:Integer
}
-----------------------------------------------
3 records
Shutting down...
shutdown complete
