Write a Parquet file using Schema

Updated: Jul 1, 2023

This example writes Parquet files based on an explicit schema definition. It reads data from a database connection and then streams it to the output Parquet file.

This example is useful when you need to extract data from a database and store it as Parquet files for compact storage and analysis. Typical use cases include ETL (Extract, Transform, Load) processes, data warehousing, and data migration. Parquet's columnar format generally reduces storage space and speeds up queries over large datasets, and defining the schema explicitly keeps field names and types predictable across the systems and tools that read the file.

 

Input File

CREATE TABLE user_role (
  user_role_id INT NOT NULL,
  user_role VARCHAR(32) NOT NULL,
  PRIMARY KEY (user_role_id)
);
  
CREATE TABLE user (
  user_id INT NOT NULL,
  user_name VARCHAR(16) NOT NULL,
  first_name VARCHAR(255) NULL,
  last_name VARCHAR(255) NULL,
  email VARCHAR(255) NULL,
  password VARCHAR(255) NOT NULL,
  user_role_id INT NULL,
  PRIMARY KEY (user_id),
  FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id)
);


INSERT INTO user_role
(user_role_id, user_role)
VALUES
(1, 'Admin'), (2, 'Viewer');

INSERT INTO user 
(user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES
(1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1),
(2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1),
(3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2);

 

Java Code

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.sql.SQLException;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.internal.jdbc.JdbcFacade;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFileUsingSchema {

    public static void main(String[] args) throws SQLException {
        String schema = "message schema {" +
                "  optional int32 USER_ID (INTEGER(32,true));" +
                "  optional binary USER_NAME (STRING);" +
                "  optional binary FIRST_NAME (STRING);" +
                "  optional binary LAST_NAME (STRING);" +
                "  optional binary EMAIL (STRING);" +
                "  optional binary PASSWORD (STRING);" +
                "  optional int32 USER_ROLE_ID (INTEGER(32,true));" +
                "}";

        ParquetDataWriter writer = new ParquetDataWriter(new File("example/data/output/WriteAParquetFileUsingSchemaFromDatabase.parquet"));
        // Set the schema from the Parquet schema string declared above
        writer.setSchema(schema);

        System.out.println("===================Generated Parquet Schema========================");
        System.out.println(writer.getSchema());
        System.out.println("===================================================================");

        JdbcConnectionFactory jdbcConnectionFactory = JdbcConnectionFactory.wrap("org.h2.Driver", "jdbc:h2:mem:jdbcTableSort;MODE=MySQL", "sa", "");

        JdbcFacade jdbcFacade = new JdbcFacade(jdbcConnectionFactory);
        jdbcFacade.executeFile(new File("example/data/input/user_information.sql"));

        // Reading records from table user and writing to parquet file.
        DataReader reader = new JdbcReader(jdbcConnectionFactory.createConnection(), "select * from user").setAutoCloseConnection(true);
        Job.run(reader, writer);

        System.out.println("=======================Reading Parquet File============================================");
        reader = new ParquetDataReader(new File("example/data/output/WriteAParquetFileUsingSchemaFromDatabase.parquet"));
        Job.run(reader, new StreamWriter(System.out));
    }
}

 

Code Walkthrough

  1. The schema definition is declared as a string variable.
  2. A ParquetDataWriter is created for the output Parquet file.
  3. The writer's schema is set using the string declared in step 1 and then printed to the console.
  4. The connection parameters (driver class, URL, username, password) for an in-memory H2 database running in MySQL compatibility mode are passed to the JdbcConnectionFactory object.
  5. The JdbcFacade instance accepts the JdbcConnectionFactory object and executes the commands in the input SQL file user_information.sql.
  6. The jdbcConnectionFactory.createConnection() method opens a connection to the database. JdbcReader accepts that Connection object and a query string to read all records from the "user" table.
  7. Data from the JdbcReader is transferred to the output file of the ParquetDataWriter via the Job.run() method. See how to compile and run data pipeline jobs.
  8. Records from the new file are read using ParquetDataReader and then transferred to StreamWriter(System.out) to be printed to the console.
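
The schema string in step 1 is written by hand. As a rough sketch of how such a string could be assembled from column metadata instead, the helper below maps java.sql.Types codes to Parquet field declarations. The ParquetSchemaSketch class, its Column holder, and the type mapping are assumptions made for illustration and are not part of DataPipeline; only the few SQL types listed are covered.

```java
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of DataPipeline: builds a Parquet schema
// string from column names, java.sql.Types codes, and nullability flags.
public class ParquetSchemaSketch {

    /** Hypothetical column descriptor used only by this sketch. */
    public static final class Column {
        final String name;
        final int sqlType;
        final boolean nullable;

        public Column(String name, int sqlType, boolean nullable) {
            this.name = name;
            this.sqlType = sqlType;
            this.nullable = nullable;
        }
    }

    /** Returns one Parquet field declaration, e.g. "optional binary EMAIL (STRING);". */
    public static String parquetField(Column c) {
        String repetition = c.nullable ? "optional" : "required";
        switch (c.sqlType) {
            case Types.INTEGER:
                return repetition + " int32 " + c.name + " (INTEGER(32,true));";
            case Types.BIGINT:
                return repetition + " int64 " + c.name + " (INTEGER(64,true));";
            case Types.CHAR:
            case Types.VARCHAR:
                return repetition + " binary " + c.name + " (STRING);";
            default:
                // Assumption: any other type is out of scope for this sketch.
                throw new IllegalArgumentException("Unmapped SQL type code: " + c.sqlType);
        }
    }

    /** Joins the field declarations into a "message <name> { ... }" block. */
    public static String toParquetSchema(String messageName, List<Column> columns) {
        StringBuilder sb = new StringBuilder("message ").append(messageName).append(" {\n");
        for (Column c : columns) {
            sb.append("  ").append(parquetField(c)).append('\n');
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        // Columns of the "user" table, all flagged nullable to mirror the example schema.
        List<Column> columns = new ArrayList<>();
        columns.add(new Column("USER_ID", Types.INTEGER, true));
        columns.add(new Column("USER_NAME", Types.VARCHAR, true));
        columns.add(new Column("FIRST_NAME", Types.VARCHAR, true));
        columns.add(new Column("LAST_NAME", Types.VARCHAR, true));
        columns.add(new Column("EMAIL", Types.VARCHAR, true));
        columns.add(new Column("PASSWORD", Types.VARCHAR, true));
        columns.add(new Column("USER_ROLE_ID", Types.INTEGER, true));
        System.out.println(toParquetSchema("schema", columns));
    }
}
```

With every column flagged nullable, the printed text has the same fields as the schema used in the example. Passing false emits Parquet's required repetition instead of optional; the original example declares all fields optional, so that variant is untested here.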

 

Console Output

===================Generated Parquet Schema========================
message schema {
  optional int32 USER_ID (INTEGER(32,true));
  optional binary USER_NAME (STRING);
  optional binary FIRST_NAME (STRING);
  optional binary LAST_NAME (STRING);
  optional binary EMAIL (STRING);
  optional binary PASSWORD (STRING);
  optional int32 USER_ROLE_ID (INTEGER(32,true));
}

===================================================================
=======================Reading Parquet File============================================
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[USER_ID]:INT=[1]:Integer
    1:[USER_NAME]:STRING=[john_doe]:String
    2:[FIRST_NAME]:STRING=[John]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[john.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[USER_ID]:INT=[2]:Integer
    1:[USER_NAME]:STRING=[jane_doe]:String
    2:[FIRST_NAME]:STRING=[jane]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[jane.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[USER_ID]:INT=[3]:Integer
    1:[USER_NAME]:STRING=[view_user]:String
    2:[FIRST_NAME]:STRING=[viewer]:String
    3:[LAST_NAME]:STRING=[]:String
    4:[EMAIL]:STRING=[view_user@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[2]:Integer
}
-----------------------------------------------
3 records
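
The field names in the records read back from the file are the same upper-case names declared in the schema. A quick way to check that correspondence, sketched here with plain Java and no DataPipeline classes, is to pull the field names out of the schema string and compare them with the expected list. The SchemaFieldCheck class and its regular expression are assumptions for illustration; the pattern only handles flat fields with simple primitive types like the ones in this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of DataPipeline: extracts field names from
// a flat Parquet schema string so they can be compared with record fields.
public class SchemaFieldCheck {

    // Matches "<repetition> <primitive> <NAME>" at the start of a field declaration.
    private static final Pattern FIELD =
            Pattern.compile("(optional|required|repeated)\\s+\\w+\\s+(\\w+)");

    /** Returns the field names in declaration order. */
    public static List<String> fieldNames(String schema) {
        List<String> names = new ArrayList<>();
        Matcher m = FIELD.matcher(schema);
        while (m.find()) {
            names.add(m.group(2));
        }
        return names;
    }

    public static void main(String[] args) {
        // Same schema string as in the example.
        String schema = "message schema {" +
                "  optional int32 USER_ID (INTEGER(32,true));" +
                "  optional binary USER_NAME (STRING);" +
                "  optional binary FIRST_NAME (STRING);" +
                "  optional binary LAST_NAME (STRING);" +
                "  optional binary EMAIL (STRING);" +
                "  optional binary PASSWORD (STRING);" +
                "  optional int32 USER_ROLE_ID (INTEGER(32,true));" +
                "}";

        // Field names as they appear in the console output.
        List<String> recordFields = Arrays.asList(
                "USER_ID", "USER_NAME", "FIRST_NAME", "LAST_NAME",
                "EMAIL", "PASSWORD", "USER_ROLE_ID");

        System.out.println("Schema fields match record fields: "
                + fieldNames(schema).equals(recordFields));
    }
}
```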

 
