Write a Parquet file using schema from EntityDef

Updated: Sep 3, 2023

This example shows how to write Parquet files based on a schema from the EntityDef instance. This is valuable for users who need to extract and transform data from a database into Parquet files for optimized storage and analysis. The example can be used in ETL (Extract, Transform, Load) processes, data warehousing, and data migration tasks.

More details on schema and how it works can be found in Schema Validation documentation.

Schema and entities are flexible components that can also be used for validating and mapping records in DataPipeline.

 

Input SQL File

CREATE TABLE user_role (
  user_role_id INT NOT NULL,
  user_role VARCHAR(32) NOT NULL,
  PRIMARY KEY (user_role_id)
 );
  
CREATE TABLE user (
  user_id INT NOT NULL,
  user_name VARCHAR(16) NOT NULL,
  first_name VARCHAR(255) NULL,
  last_name VARCHAR(255) NULL,
  email VARCHAR(255) NULL,
  password VARCHAR(255) NOT NULL,
  user_role_id INT NULL,
  PRIMARY KEY (user_id),
  FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id)
  );


INSERT INTO user_role
(user_role_id, user_role)
VALUES
(1, 'Admin'), (2, 'Viewer');

INSERT INTO user 
(user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES
(1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1),
(2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1),
(3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2);

 

Java Code Listing

package com.northconcepts.datapipeline.examples.parquet;

import java.io.File;
import java.sql.SQLException;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.FieldType;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.foundations.schema.EntityDef;
import com.northconcepts.datapipeline.foundations.schema.NumericFieldDef;
import com.northconcepts.datapipeline.foundations.schema.TextFieldDef;
import com.northconcepts.datapipeline.internal.jdbc.JdbcFacade;
import com.northconcepts.datapipeline.jdbc.JdbcConnectionFactory;
import com.northconcepts.datapipeline.jdbc.JdbcReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteAParquetFileUsingSchemaFromEntityDef {

    public static void main(String[] args) throws SQLException {
        EntityDef entity = new EntityDef("user_information")
                .addField(new NumericFieldDef("USER_ID", FieldType.INT))
                .addField(new TextFieldDef("USER_NAME", FieldType.STRING))
                .addField(new TextFieldDef("FIRST_NAME", FieldType.STRING))
                .addField(new TextFieldDef("LAST_NAME", FieldType.STRING))
                .addField(new TextFieldDef("EMAIL", FieldType.STRING))
                .addField(new TextFieldDef("PASSWORD", FieldType.STRING))
                .addField(new NumericFieldDef("USER_ROLE_ID", FieldType.INT))
                ;

        ParquetDataWriter writer = new ParquetDataWriter(new File("example/data/output/WriteAParquetFileUsingSchemaFromDatabase.parquet"));
        // Set schema using an entity
        writer.setSchema(entity);

        System.out.println("===================Generated Parquet Schema========================");
        System.out.println(writer.getSchema());
        System.out.println("===================================================================");

        JdbcConnectionFactory jdbcConnectionFactory = JdbcConnectionFactory.wrap("org.h2.Driver", "jdbc:h2:mem:jdbcTableSort;MODE=MySQL", "sa", "");

        JdbcFacade jdbcFacade = new JdbcFacade(jdbcConnectionFactory);
        jdbcFacade.executeFile(new File("example/data/input/user_information.sql"));

        // Reading records from table user and writing to parquet file.
        DataReader reader = new JdbcReader(jdbcConnectionFactory.createConnection(), "select * from user").setAutoCloseConnection(true);
        Job.run(reader, writer);

        System.out.println("=======================Reading Parquet File============================================");
        reader = new ParquetDataReader(new File("example/data/output/WriteAParquetFileUsingSchemaFromDatabase.parquet"));
        Job.run(reader, new StreamWriter(System.out));
    }
}

 

Code Walkthrough

  1. EntityDef user_information is created with field names and data types.
  2. ParquetDataWriter is created to write records to the specified Parquet file WriteAParquetFileUsingSchemaFromDatabase.parquet.
  3. The entity is set as the schema for ParquetDataWriter.
  4. The Parquet schema is printed on the console.
  5. The database connection is established using JdbcConnectionFactory and JdbcFacade.
  6. The input file user_information.sql is executed.
  7. JdbcReader is created to obtain records from the database table.
  8. Job.run(reader, writer) is used to transfer the data from JdbcReader to ParquetDataWriter.
  9. ParquetDataReader is created to read data from WriteAParquetFileFromJson.parquet.
  10. An array of objects is then printed on the console via Job.run() and StreamWriter class which takes System.out as a parameter to its constructor. See how to compile and run data pipeline jobs. 

 

Console Output

===================Generated Parquet Schema========================
message schema {
  optional int32 USER_ID (INTEGER(32,true));
  optional binary USER_NAME (STRING);
  optional binary FIRST_NAME (STRING);
  optional binary LAST_NAME (STRING);
  optional binary EMAIL (STRING);
  optional binary PASSWORD (STRING);
  optional int32 USER_ROLE_ID (INTEGER(32,true));
}

===================================================================
execute file /Users/akmaljons/IdeaProjects/DataPipeline-Examples/example/data/input/user_information.sql
    execute:  CREATE TABLE user_role ( user_role_id INT NOT NULL, user_role VARCHAR(32) NOT NULL, PRIMARY KEY (user_role_id) )
        - recordsAffected=0
    execute:  CREATE TABLE user ( user_id INT NOT NULL, user_name VARCHAR(16) NOT NULL, first_name VARCHAR(255) NULL, last_name VARCHAR(255) NULL, email VARCHAR(255) NULL, password VARCHAR(255) NOT NULL, user_role_id INT NULL, PRIMARY KEY (user_id), FOREIGN KEY (user_role_id) REFERENCES user_role (user_role_id) )
        - recordsAffected=0
    execute:  INSERT INTO user_role (user_role_id, user_role) VALUES (1, 'Admin'), (2, 'Viewer')
        - recordsAffected=2
    execute:  INSERT INTO user (user_id, user_name, first_name, last_name, email, password, user_role_id) VALUES (1, 'john_doe', 'John', 'Doe', 'john.doe@example.com', '**********', 1), (2, 'jane_doe', 'jane', 'Doe', 'jane.doe@example.com', '**********', 1), (3, 'view_user', 'viewer', '', 'view_user@example.com', '**********', 2)
        - recordsAffected=3
=======================Reading Parquet File============================================
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[USER_ID]:INT=[1]:Integer
    1:[USER_NAME]:STRING=[john_doe]:String
    2:[FIRST_NAME]:STRING=[John]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[john.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[USER_ID]:INT=[2]:Integer
    1:[USER_NAME]:STRING=[jane_doe]:String
    2:[FIRST_NAME]:STRING=[jane]:String
    3:[LAST_NAME]:STRING=[Doe]:String
    4:[EMAIL]:STRING=[jane.doe@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[1]:Integer
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[USER_ID]:INT=[3]:Integer
    1:[USER_NAME]:STRING=[view_user]:String
    2:[FIRST_NAME]:STRING=[viewer]:String
    3:[LAST_NAME]:STRING=[]:String
    4:[EMAIL]:STRING=[view_user@example.com]:String
    5:[PASSWORD]:STRING=[**********]:String
    6:[USER_ROLE_ID]:INT=[2]:Integer
}

-----------------------------------------------
3 records
Mobile Analytics