Write A Parquet File Using Database Table Schema
Updated: Jan 29, 2023
In this example, you will learn how to generate a Parquet schema from a database table's metadata and use that schema to write records to a Parquet file.
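At its core, the example relies on ParquetDataWriter.setSchema(connection, catalog, schema, table), which reads the table's metadata through the JDBC connection and derives a matching Parquet schema before any records flow. Below is a minimal sketch of just that schema-generation step, reusing the example's DB helper class; the output file name is illustrative, and the full runnable listing follows.

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class GenerateParquetSchemaSketch {

    public static void main(String[] args) throws Throwable {
        DB db = new DB(); // in-memory HSQL database used by the examples
        Connection connection = db.getConnection();
        db.executeFile(new File("example/data/input/user_information.sql"));

        ParquetDataWriter writer = new ParquetDataWriter(new File("example/data/output/schema-only.parquet"));

        // Derive the Parquet schema from the USER table's metadata;
        // the arguments are (connection, catalog, schema, table), with a null catalog here.
        writer.setSchema(connection, null, "PUBLIC", "USER");

        // Inspect the generated schema without writing any records.
        System.out.println(writer.getSchema());
    }

}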
Java Code listing
package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFileUsingDatabaseTableSchema {

    private static final String PARQUET_FILE = "example/data/output/WriteAParquetFileUsingSchemaFromDatabase.parquet";

    public static void main(String[] args) throws Throwable {
        DB db = new DB(); // creates HSQL DB
        Connection connection = db.getConnection();
        db.executeFile(new File("example/data/input/user_information.sql"));

        DataReader reader = new JdbcReader(connection, "SELECT * FROM user").setAutoCloseConnection(false);

        ParquetDataWriter writer = new ParquetDataWriter(new File(PARQUET_FILE));

        // Set Parquet schema using a catalog, schema and table name with default JdbcValueReader.DEFAULT
        writer.setSchema(connection, null, "PUBLIC", "USER");

        // Set Parquet schema using a catalog, schema and table name with JdbcValueReader.STRICT
        // writer.setSchema(connection, JdbcValueReader.STRICT, null, "PUBLIC", "USER");

        System.out.println("===================Generated Parquet Schema========================");
        System.out.println(writer.getSchema());
        System.out.println("===================================================================");

        Job.run(reader, writer);

        System.out.println("=======================Reading Parquet File============================================");

        reader = new ParquetDataReader(new File(PARQUET_FILE));
        Job.run(reader, new StreamWriter(System.out));
    }

}
Code walkthrough
- Create a DB instance, which creates the HSQL database.
- Execute the database script to create tables and insert records into the database.
- Create a JdbcReader to read records returned by the specified query.
- Create an instance of ParquetDataWriter to write records to the Parquet file.
- Generate the Parquet schema using the connection, catalog name, schema name, and table name (a variant that passes JdbcValueReader.STRICT is sketched after this list).
- Job.run(reader, writer) transfers the data from the reader to the writer. This writes the data to the Parquet file.
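The commented-out line in the listing shows an overload of setSchema that also takes a JdbcValueReader. Below is a minimal sketch of that strict variant, assuming JdbcValueReader lives in the com.northconcepts.datapipeline.jdbc package alongside JdbcReader; the exact mapping differences between STRICT and the default are not shown in this example's output.

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.jdbc.JdbcValueReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class StrictParquetSchemaSketch {

    public static void main(String[] args) throws Throwable {
        DB db = new DB(); // in-memory HSQL database used by the examples
        Connection connection = db.getConnection();
        db.executeFile(new File("example/data/input/user_information.sql"));

        ParquetDataWriter writer = new ParquetDataWriter(new File("example/data/output/strict-schema.parquet"));

        // Same (catalog, schema, table) arguments as the main listing, but with
        // JdbcValueReader.STRICT controlling how JDBC column types are read.
        writer.setSchema(connection, JdbcValueReader.STRICT, null, "PUBLIC", "USER");

        System.out.println(writer.getSchema());
    }

}

Everything after schema generation (running the Job and reading the Parquet file back) proceeds exactly as in the main listing.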
Console Output
execute file E:\NorthConcept\git\DataPipeline-Examples\example\data\input\user_information.sql
execute: CREATE TABLE user_role ( user_role_id INT NOT NULL, user_role VARCHAR(32) NOT NULL, PRIMARY KEY (user_role_id) )
- recordsAffected=0
execute: CREATE TABLE user ( user_id INT NOT NULL, user_name VARCHAR(16) NOT NULL, first_name VARCHAR(255) NULL, last_name VARCHAR(255) NULL, email VARCHAR(255) NULL, password VARCHAR(255) NOT NULL, user_role_id INT NULL, PRIMARY KEY (user_id), FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id) )
- recordsAffected=0
execute: INSERT INTO user_role (user_role_id, user_role) VALUES (1, 'Admin'), (2, 'Viewer')
- recordsAffected=2
execute: INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1), (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1), (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2)
- recordsAffected=3
16:59:05,679 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
===================Generated Parquet Schema========================
message schema {
required int32 USER_ID (INTEGER(32,true));
required binary USER_NAME (STRING);
optional binary FIRST_NAME (STRING);
optional binary LAST_NAME (STRING);
optional binary EMAIL (STRING);
required binary PASSWORD (STRING);
optional int32 USER_ROLE_ID (INTEGER(32,true));
}
===================================================================
16:59:08,164 DEBUG [main] datapipeline:615 - Job[1,job-1,Sun Jan 29 16:59:08 IST 2023]::Start
16:59:14,223 DEBUG [main] datapipeline:661 - job::Success
=======================Reading Parquet File============================================
16:59:14,649 DEBUG [main] datapipeline:615 - Job[2,job-2,Sun Jan 29 16:59:14 IST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
0:[USER_ID]:INT=[1]:Integer
1:[USER_NAME]:STRING=[john_doe]:String
2:[FIRST_NAME]:STRING=[John]:String
3:[LAST_NAME]:STRING=[Doe]:String
4:[EMAIL]:STRING=[john.doe@example.com]:String
5:[PASSWORD]:STRING=[**********]:String
6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
1 - Record (MODIFIED) {
0:[USER_ID]:INT=[2]:Integer
1:[USER_NAME]:STRING=[jane_doe]:String
2:[FIRST_NAME]:STRING=[jane]:String
3:[LAST_NAME]:STRING=[Doe]:String
4:[EMAIL]:STRING=[jane.doe@example.com]:String
5:[PASSWORD]:STRING=[**********]:String
6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
2 - Record (MODIFIED) {
0:[USER_ID]:INT=[3]:Integer
1:[USER_NAME]:STRING=[view_user]:String
2:[FIRST_NAME]:STRING=[viewer]:String
3:[LAST_NAME]:STRING=[]:String
4:[EMAIL]:STRING=[view_user@example.com]:String
5:[PASSWORD]:STRING=[**********]:String
6:[USER_ROLE_ID]:INT=[2]:Integer
}
-----------------------------------------------
3 records
16:59:14,790 DEBUG [main] datapipeline:661 - job::Success
Shutting down...
shutdown complete
