Write Parquet to Amazon S3

Updated: Jun 28, 2023

This example reads records from an input CSV file and writes them to a Parquet file in Amazon S3. Parquet's columnar format offers an efficient, scalable way to store data for data processing and analysis tasks. Real-life use cases include data ingestion pipelines, ETL (Extract, Transform, Load) processes, and data lake architectures, where data can be read, transformed, and stored in Parquet format on Amazon S3, enabling efficient querying and analysis at scale.

Input CSV

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000,17-01-1998,A
312,Butler,Gerard,90,1000,06-08-2003,B
101,Hewitt,Jennifer Love,0,17000,25-05-1985,B
312,Pinkett-Smith,Jada,49654.87,100000,05-12-2006,A
317,Murray,Bill,789.65,5000,05-02-2007,C
317,Murray,Bill,1,5000,05-02-2007,D

Java Code Listing

package com.northconcepts.datapipeline.examples.amazons3;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteParquetToAmazonS3 {

    private static final String ACCESS_KEY = "YOUR ACCESS KEY";
    private static final String SECRET_KEY = "YOUR SECRET KEY";

    public static void main(String[] args) throws Throwable {
        // Target Parquet file in the S3 bucket, addressed with the s3a:// scheme
        Path path = new Path("s3a://bucketName/output.parquet");

        // Hadoop configuration holding the S3A connection settings and credentials
        Configuration configuration = new Configuration();
        configuration.set("fs.s3a.access.key", ACCESS_KEY);
        configuration.set("fs.s3a.secret.key", SECRET_KEY);
        configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

        HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, configuration);

        // Read the input CSV, taking field names from its first row
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance.csv"))
                .setFieldNamesInFirstRow(true);

        // Write the records to the Parquet file on S3
        ParquetDataWriter writer = new ParquetDataWriter(outputFile);

        // Transfer all records from the reader to the writer
        Job.run(reader, writer);
    }
}

Code Walkthrough

  1. First, the AWS credentials are declared in the ACCESS_KEY and SECRET_KEY constants (see the sketch after this list for loading them from the environment instead of hardcoding them).
  2. Path specifies the target Parquet file (i.e. s3a://bucketName/output.parquet) in the Amazon S3 bucket.
  3. The Configuration instance holds all of the S3A configuration parameters.
  4. configuration.set() binds the property name specified in the first parameter to the value specified in the second parameter.
  5. HadoopOutputFile is created to write a Parquet file at the path specified in the first parameter, using the configuration specified in the second parameter.
  6. CSVReader is created for the input file example/data/input/credit-balance.csv. It is used to obtain records from the specified CSV file.
  7. setFieldNamesInFirstRow(true) is invoked to specify that the names in the first row should be used as field names (disabled by default). If this method is not invoked, the fields are named A1, A2, etc. by default.
  8. ParquetDataWriter is created to write records to the specified HadoopOutputFile.
  9. Data is transferred from the CSVReader to the ParquetDataWriter via the Job.run() method.
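
While hardcoded keys keep the example short, production code should avoid embedding credentials in source. Below is a minimal sketch of one alternative that reads the standard AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables; the variable names and null check are illustrative, not part of the original example. It drops into main() in place of the hardcoded configuration block above.

        // Assumption: credentials are exported as the standard AWS environment variables
        String accessKey = System.getenv("AWS_ACCESS_KEY_ID");
        String secretKey = System.getenv("AWS_SECRET_ACCESS_KEY");
        if (accessKey == null || secretKey == null) {
            throw new IllegalStateException("AWS credentials not found in the environment");
        }

        Configuration configuration = new Configuration();
        configuration.set("fs.s3a.access.key", accessKey);
        configuration.set("fs.s3a.secret.key", secretKey);
        configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");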

Output

The records read from the input CSV file are written to the output.parquet file in the specified Amazon S3 bucket.
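
To confirm the upload succeeded, the file can be read back and its records printed to the console. The sketch below assumes that ParquetDataReader accepts a Parquet InputFile, mirroring how ParquetDataWriter accepts an OutputFile; check the constructors available in your DataPipeline release. The class name ReadParquetFromAmazonS3 is illustrative.

package com.northconcepts.datapipeline.examples.amazons3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class ReadParquetFromAmazonS3 {

    public static void main(String[] args) throws Throwable {
        Path path = new Path("s3a://bucketName/output.parquet");

        // Same S3A settings as the writer example; credentials from the environment
        Configuration configuration = new Configuration();
        configuration.set("fs.s3a.access.key", System.getenv("AWS_ACCESS_KEY_ID"));
        configuration.set("fs.s3a.secret.key", System.getenv("AWS_SECRET_ACCESS_KEY"));
        configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

        HadoopInputFile inputFile = HadoopInputFile.fromPath(path, configuration);

        // Assumption: ParquetDataReader mirrors ParquetDataWriter and accepts an InputFile
        DataReader reader = new ParquetDataReader(inputFile);

        // Print each record to the console
        Job.run(reader, new StreamWriter(System.out));
    }
}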
