Write a Parquet file using the schema from a database query

Updated: Sep 19, 2023

This example shows how to create a Parquet file whose schema is derived from the results of a database query. This is useful for exporting and transmitting data in Parquet files that closely match the database structure, for example financial, e-commerce, or healthcare data.
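
In outline, the writer's schema is built from a query's result-set metadata, after which the real rows are streamed through a Job. A quick sketch of the core calls, using the same classes as the full listing below (the output file name here is only illustrative):

ParquetDataWriter writer = new ParquetDataWriter(new File("users.parquet"));
// Build the Parquet schema from the query's metadata; the query itself returns no rows.
writer.setSchema(connection, "SELECT * FROM user WHERE 1<0");
// Stream the actual rows into the Parquet file.
Job.run(new JdbcReader(connection, "SELECT * FROM user"), writer);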


Input SQL File

CREATE TABLE user_role (
  user_role_id INT NOT NULL,
  user_role VARCHAR(32) NOT NULL,
  PRIMARY KEY (user_role_id)
);
  
CREATE TABLE user (
  user_id INT NOT NULL,
  user_name VARCHAR(16) NOT NULL,
  first_name VARCHAR(255) NULL,
  last_name VARCHAR(255) NULL,
  email VARCHAR(255) NULL,
  password VARCHAR(255) NOT NULL,
  user_role_id INT NULL,
  PRIMARY KEY (user_id),
  FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id)
);


INSERT INTO user_role
(user_role_id, user_role)
VALUES
(1, 'Admin'), (2, 'Viewer');

INSERT INTO user 
(user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES
(1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1),
(2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1),
(3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2);
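
Note that column nullability flows through to the generated Parquet schema shown later in this example: NOT NULL columns such as user_name become required fields, while nullable columns such as first_name become optional.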


Java Code Listing

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.sql.Connection;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.examples.database.DB;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFileUsingSchemaFromDatabaseQuery {

    private static final String PARQUET_FILE = "example/data/output/WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet";

    public static void main(String[] args) throws Throwable {
        DB db = new DB(); // creates HSQL DB

        Connection connection = db.getConnection();

        db.executeFile(new File("example/data/input/user_information.sql"));

        DataReader reader = new JdbcReader(connection, "SELECT * FROM user").setAutoCloseConnection(true);

        ParquetDataWriter writer = new ParquetDataWriter(new File(PARQUET_FILE));

        // Set the Parquet schema using a query.
        // The query returns zero records because the 1<0 condition is always false;
        // only the result set's metadata is used to build the schema.
        writer.setSchema(connection, "SELECT * FROM user where 1<0");

        // Set the Parquet schema using a query with parameters.
        // The optimized query returns at most one record for the given user_id.
        //writer.setSchema(connection, "SELECT * FROM user WHERE user_id=?", 1);

        // Set the Parquet schema using a query with parameters and a JdbcValueReader (default is OPINIONATED).
        // The optimized query returns only the records matching the bound parameter.
        // writer.setSchema(connection, JdbcValueReader.STRICT, "SELECT * FROM user WHERE user_role_id=?", 1);

        System.out.println("===================Generated Parquet Schema========================");
        System.out.println(writer.getSchema());
        System.out.println("===================================================================");

        Job.run(reader, writer);

        System.out.println("=======================Reading Parquet File============================================");
        reader = new ParquetDataReader(new File(PARQUET_FILE));
        Job.run(reader, new StreamWriter(System.out));
    }

}


Code Walkthrough

  1. A DB instance is created, which starts an embedded HSQLDB database.
  2. A Connection to the database is obtained; it is used by both the JdbcReader and the schema query.
  3. The input file user_information.sql is executed to create and populate the user_role and user tables.
  4. A JdbcReader is created to read records from the user table; setAutoCloseConnection(true) closes the connection once reading completes.
  5. A ParquetDataWriter is created to write records to the specified Parquet file WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet.
  6. The schema for the ParquetDataWriter is set with the query SELECT * FROM user where 1<0, which returns zero records (the condition is always false) but supplies the column metadata needed to build the schema. The commented-out lines show two alternatives: a parameterized query, and a parameterized query with an explicit JdbcValueReader (the default is OPINIONATED); see the sketch after this list.
  7. The generated Parquet schema is printed to the console.
  8. Job.run(reader, writer) transfers the data from the JdbcReader to the ParquetDataWriter.
  9. A ParquetDataReader is created to read back WriteAParquetFileUsingSchemaFromDatabaseQuery.parquet.
  10. The records are then printed to the console via Job.run() and the StreamWriter class, which takes System.out as a parameter to its constructor. See how to compile and run data pipeline jobs.
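
The two commented-out setSchema() calls from the listing are shown here in isolation as a sketch; the bound values are just the example parameters used there, and our reading is that STRICT maps JDBC types more literally than the default OPINIONATED strategy:

import com.northconcepts.datapipeline.jdbc.JdbcValueReader;

// Parameterized schema query: scans at most one row for the given user_id.
writer.setSchema(connection, "SELECT * FROM user WHERE user_id=?", 1);

// Parameterized schema query with an explicit JdbcValueReader
// (OPINIONATED is the default).
writer.setSchema(connection, JdbcValueReader.STRICT, "SELECT * FROM user WHERE user_role_id=?", 1);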


Console Output

execute file /DataPipeline-Examples/example/data/input/user_information.sql
    execute:  CREATE TABLE user_role ( user_role_id INT NOT NULL, user_role VARCHAR(32) NOT NULL, PRIMARY KEY (user_role_id) )
        - recordsAffected=0
    execute:  CREATE TABLE user ( user_id INT NOT NULL, user_name VARCHAR(16) NOT NULL, first_name VARCHAR(255) NULL, last_name VARCHAR(255) NULL, email VARCHAR(255) NULL, password VARCHAR(255) NOT NULL, user_role_id INT NULL, PRIMARY KEY (user_id), FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id) )
        - recordsAffected=0
    execute:  INSERT INTO user_role (user_role_id, user_role) VALUES (1, 'Admin'), (2, 'Viewer')
        - recordsAffected=2
    execute:  INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1), (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1), (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2)
        - recordsAffected=3
20:59:48,976 DEBUG [main] datapipeline:37 - DataPipeline v8.2.0 by North Concepts Inc.
===================Generated Parquet Schema========================
message schema {
  required int32 USER_ID (INTEGER(32,true));
  required binary USER_NAME (STRING);
  optional binary FIRST_NAME (STRING);
  optional binary LAST_NAME (STRING);
  optional binary EMAIL (STRING);
  required binary PASSWORD (STRING);
  optional int32 USER_ROLE_ID (INTEGER(32,true));
}

===================================================================
=======================Reading Parquet File============================================
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[USER_ID]:INT=[1]:Integer
    1:[USER_NAME]:STRING=[john_doe]:String
    2:[FIRST_NAME]:STRING=[John]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[john.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[USER_ID]:INT=[2]:Integer
    1:[USER_NAME]:STRING=[jane_doe]:String
    2:[FIRST_NAME]:STRING=[jane]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[jane.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[USER_ID]:INT=[3]:Integer
    1:[USER_NAME]:STRING=[view_user]:String
    2:[FIRST_NAME]:STRING=[viewer]:String
    3:[LAST_NAME]:STRING=[]:String
    4:[EMAIL]:STRING=[view_user@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[2]:Integer
}

-----------------------------------------------
3 records
Shutting down...
shutdown complete