Write A Parquet File Using Database Table Schema

In this example, you will learn how to generate a Parquet schema from a database table and use it to write the table's records to a Parquet file.

Java Code listing

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFileUsingDatabaseTableSchema {

    private static final String PARQUET_FILE = "example/data/output/WriteAParquetFileUsingSchemaFromDatabase.parquet";

    public static void main(String[] args) throws Throwable {
        DB db = new DB(); // creates HSQL DB

        Connection connection = db.getConnection();

        db.executeFile(new File("example/data/input/user_information.sql"));

        DataReader reader = new JdbcReader(connection, "SELECT * FROM user").setAutoCloseConnection(false);

        ParquetDataWriter writer = new ParquetDataWriter(new File(PARQUET_FILE));

        // Set the Parquet schema using a catalog, schema, and table name (uses the default JdbcValueReader.DEFAULT)
        writer.setSchema(connection, null, "PUBLIC", "USER");

        // Alternative: set the Parquet schema using JdbcValueReader.STRICT
        //writer.setSchema(connection, JdbcValueReader.STRICT, null, "PUBLIC", "USER");

        System.out.println("===================Generated Parquet Schema========================");
        System.out.println(writer.getSchema());
        System.out.println("===================================================================");

        Job.run(reader, writer);

        System.out.println("=======================Reading Parquet File============================================");
        reader = new ParquetDataReader(new File(PARQUET_FILE));
        Job.run(reader, new StreamWriter(System.out));
    }

}
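
The DB class used above is a small helper from the DataPipeline examples project and is not shown here. As a reference point only, a minimal stand-in might look like the sketch below; it assumes an in-memory HSQLDB database and splits the script naively on semicolons, so treat it as illustrative rather than the actual helper.

package com.northconcepts.datapipeline.examples.database;

import java.io.File;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical stand-in for the DB helper shipped with the examples project.
public class DB {

    private final Connection connection;

    public DB() throws Exception {
        // In-memory HSQLDB database (assumption); requires the hsqldb driver on the classpath.
        connection = DriverManager.getConnection("jdbc:hsqldb:mem:example", "SA", "");
    }

    public Connection getConnection() {
        return connection;
    }

    // Run each semicolon-separated statement in the given SQL script.
    // (Naive split; enough for the simple script used in this example.)
    public void executeFile(File file) throws Exception {
        String sql = new String(Files.readAllBytes(file.toPath()));
        try (Statement statement = connection.createStatement()) {
            for (String command : sql.split(";")) {
                if (!command.trim().isEmpty()) {
                    statement.execute(command);
                }
            }
        }
    }
}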

Code walkthrough

  1. Create a DB instance, which creates the HSQL database (a sketch of such a helper appears after the code listing above).
  2. Execute the database script user_information.sql to create the tables and insert records into the database.
  3. Create a JdbcReader to read records from the specified query; setAutoCloseConnection(false) keeps the reader from closing the shared connection when it finishes.
  4. Create an instance of ParquetDataWriter to write records to the Parquet file.
  5. Generate the Parquet schema using the connection, catalog name, schema name, and table name (a sketch of the stricter variant follows this list).
  6. Job.run(reader, writer) transfers the data from the reader to the writer, writing the records to the Parquet file.
  7. Create a ParquetDataReader for the new file and run a second job with a StreamWriter to print its contents to the console.
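
The commented-out line in the listing shows a stricter way to generate the schema. A minimal sketch of that variant is below; the import path for JdbcValueReader is an assumption (it is expected to live alongside JdbcReader in com.northconcepts.datapipeline.jdbc), so verify it against your DataPipeline version.

// Assumed import path; JdbcValueReader is expected to sit alongside JdbcReader.
import com.northconcepts.datapipeline.jdbc.JdbcValueReader;

// Generate the Parquet schema with strict JDBC value reading instead of the
// default (taken from the commented-out line in the listing above).
writer.setSchema(connection, JdbcValueReader.STRICT, null, "PUBLIC", "USER");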

Console Output

execute file E:\NorthConcept\git\DataPipeline-Examples\example\data\input\user_information.sql
    execute:  CREATE TABLE user_role ( user_role_id INT NOT NULL, user_role VARCHAR(32) NOT NULL, PRIMARY KEY (user_role_id) )
        - recordsAffected=0
    execute:  CREATE TABLE user ( user_id INT NOT NULL, user_name VARCHAR(16) NOT NULL, first_name VARCHAR(255) NULL, last_name VARCHAR(255) NULL, email VARCHAR(255) NULL, password VARCHAR(255) NOT NULL, user_role_id INT NULL, PRIMARY KEY (user_id), FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id) )
        - recordsAffected=0
    execute:  INSERT INTO user_role (user_role_id, user_role) VALUES (1, 'Admin'), (2, 'Viewer')
        - recordsAffected=2
    execute:  INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1), (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1), (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2)
        - recordsAffected=3
16:59:05,679 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
===================Generated Parquet Schema========================
message schema {
  required int32 USER_ID (INTEGER(32,true));
  required binary USER_NAME (STRING);
  optional binary FIRST_NAME (STRING);
  optional binary LAST_NAME (STRING);
  optional binary EMAIL (STRING);
  required binary PASSWORD (STRING);
  optional int32 USER_ROLE_ID (INTEGER(32,true));
}

===================================================================
16:59:08,164 DEBUG [main] datapipeline:615 - Job[1,job-1,Sun Jan 29 16:59:08 IST 2023]::Start
16:59:14,223 DEBUG [main] datapipeline:661 - job::Success
=======================Reading Parquet File============================================
16:59:14,649 DEBUG [main] datapipeline:615 - Job[2,job-2,Sun Jan 29 16:59:14 IST 2023]::Start
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[USER_ID]:INT=[1]:Integer
    1:[USER_NAME]:STRING=[john_doe]:String
    2:[FIRST_NAME]:STRING=[John]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[john.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[USER_ID]:INT=[2]:Integer
    1:[USER_NAME]:STRING=[jane_doe]:String
    2:[FIRST_NAME]:STRING=[jane]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[jane.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[USER_ID]:INT=[3]:Integer
    1:[USER_NAME]:STRING=[view_user]:String
    2:[FIRST_NAME]:STRING=[viewer]:String
    3:[LAST_NAME]:STRING=[]:String
    4:[EMAIL]:STRING=[view_user@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[2]:Integer
}

-----------------------------------------------
3 records
16:59:14,790 DEBUG [main] datapipeline:661 - job::Success
Shutting down...
shutdown complete