Write Parquet to Amazon S3

Updated: Jun 1, 2023

This example shows how to use DataPipeline's ParquetDataWriter to write a Parquet file to Amazon S3 via Hadoop's S3A file system connector.

Input CSV

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000,17-01-1998,A
312,Butler,Gerard,90,1000,06-08-2003,B
101,Hewitt,Jennifer Love,0,17000,25-05-1985,B
312,Pinkett-Smith,Jada,49654.87,100000,05-12-2006,A
317,Murray,Bill,789.65,5000,05-02-2007,C
317,Murray,Bill,1,5000,05-02-2007,D

Java code

/*
 * Copyright (c) 2006-2022 North Concepts Inc.  All rights reserved.
 * Proprietary and Confidential.  Use is subject to license terms.
 * 
 * https://northconcepts.com/data-pipeline/licensing/
 */
package com.northconcepts.datapipeline.examples.amazons3;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteParquetToAmazonS3 {

    private static final String ACCESS_KEY = "YOUR ACCESS KEY";
    private static final String SECRET_KEY = "YOUR SECRET KEY";

    public static void main(String[] args) throws Throwable {
        Path path = new Path("s3a://bucketName/output.parquet");

        Configuration configuration = new Configuration();
        configuration.set("fs.s3a.access.key", ACCESS_KEY);
        configuration.set("fs.s3a.secret.key", SECRET_KEY);
        configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

        HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, configuration);

        DataReader reader = new CSVReader(new File("example/data/input/credit-balance.csv"))
                    .setFieldNamesInFirstRow(true);

        ParquetDataWriter writer = new ParquetDataWriter(outputFile);

        Job.run(reader, writer);
    }
}

Code Walkthrough

  1. First, the AWS credentials are specified in the ACCESS_KEY and SECRET_KEY fields.
  2. Path specifies the target Parquet file (i.e. s3a://bucketName/output.parquet) in the Amazon S3 bucket.
  3. Configuration holds the Hadoop configuration parameters used by the S3A file system.
  4. configuration.set() binds the name specified in the first parameter to the value specified in the second parameter. Here it supplies the S3A access key, secret key, and file system implementation; an alternative setup that avoids hard-coded keys is sketched after this list.
  5. HadoopOutputFile is created to write to a Parquet file on the path specified in the first parameter using the configuration specified in the second parameter.
  6. CSVReader is created corresponding to an input file example/data/input/credit-balance.csv. It is used to obtain records from the specified CSV file.
  7. setFieldNamesInFirstRow(true) is invoked to specify that the names in the first row should be used as field names (disabled by default). If this method is not invoked, the fields are named A1, A2, etc. by default.
  8. ParquetDataWriter is created to write records to the specified HadoopOutputFile object.
  9. Data is transferred from the CSVReader to the ParquetDataWriter via the Job.run() method.
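
Hard-coding keys works for a quick demo, but the S3A connector can also resolve credentials from the environment. Below is a minimal sketch of that variation; it assumes a hadoop-aws version that bundles the AWS SDK's DefaultAWSCredentialsProviderChain, which looks up credentials from environment variables, Java system properties, or an instance profile. Only the Configuration block of the example above changes:

Configuration configuration = new Configuration();
// Resolve credentials from the environment (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY),
// Java system properties, or an EC2/ECS instance profile instead of hard-coded keys.
configuration.set("fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");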

Output

The records read from the input CSV file are written to the output.parquet file in the Amazon S3 bucket.
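
To spot-check the output, the file can be read back and printed to the console. The sketch below assumes a local copy of the file (for example, downloaded from the bucket) and uses DataPipeline's ParquetDataReader and StreamWriter; the class name and file path are assumptions for illustration.

package com.northconcepts.datapipeline.examples.amazons3;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataReader;

public class VerifyParquetOutput {

    public static void main(String[] args) throws Throwable {
        // Read records from a local copy of the generated Parquet file
        DataReader reader = new ParquetDataReader(new File("output.parquet"));

        // Write each record to standard output
        Job.run(reader, new StreamWriter(System.out));
    }
}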
