Write Parquet to Amazon S3 Using a Temporary File

Updated: Jun 29, 2023

This example writes Parquet files to Amazon S3 by using a temporary file stored locally. It reads records from an input file, saves the data in Parquet format to a temporary file on the local storage, and then uploads the temporary file to the Amazon S3 file system.

This example is particularly useful in scenarios where users need to process and transform large volumes of data before persisting it in Parquet format on Amazon S3. Leveraging a local temporary file allows for efficient and optimized data processing, enabling users to handle complex transformations and operations on the data before uploading it to the cloud storage. Real-life use cases for this library include data preprocessing tasks, data enrichment pipelines, and batch processing workflows, where data is read from different sources, processed locally, and then efficiently stored in Parquet format on Amazon S3 for further analysis and consumption.

 

Input File

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000,17-01-1998,A
312,Butler,Gerard,90,1000,06-08-2003,B
101,Hewitt,Jennifer Love,0,17000,25-05-1985,B
312,Pinkett-Smith,Jada,49654.87,100000,05-12-2006,A
317,Murray,Bill,789.65,5000,05-02-2007,C
317,Murray,Bill,1,5000,05-02-2007,D

 

Java Code

package com.northconcepts.datapipeline.examples.amazons3;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;

import com.northconcepts.datapipeline.amazons3.AmazonS3FileSystem;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteParquetToAmazonS3UsingATemporaryFile {

    private static final String ACCESS_KEY = "YOUR ACCESS KEY";
    private static final String SECRET_KEY = "YOUR SECRET KEY";

    public static void main(String[] args) throws Throwable {

        File parquetFile = File.createTempFile("credit-balance", ".parquet");
        parquetFile.deleteOnExit();

        try {
            DataReader reader = new CSVReader(new File("example/data/input/credit-balance.csv"))
                .setFieldNamesInFirstRow(true);
            ParquetDataWriter writer = new ParquetDataWriter(parquetFile);

            Job.run(reader, writer);

            uploadFileToS3(parquetFile);
        } finally {
            parquetFile.delete();
        }
    }

    private static void uploadFileToS3(File parquetFile) throws Throwable {
        AmazonS3FileSystem s3 = new AmazonS3FileSystem();
        try {
            s3.setBasicAWSCredentials(ACCESS_KEY, SECRET_KEY);
            s3.open();

            OutputStream out = s3.writeMultipartFile("datapipeline-test-01", "output/credit-balance.parquet");
            InputStream in = new BufferedInputStream(new FileInputStream(parquetFile));

            byte[] buffer = new byte[1024];
            int lengthRead;
            while ((lengthRead = in.read(buffer)) > 0) {
                out.write(buffer, 0, lengthRead);
            }
        } finally {
            s3.close();
        }
    }
}

 

Code Walkthrough

  1. A temporary Parquet file credit-balance.parquet is created and set to delete itself on exit.
  2. CSVReader instance is created corresponding to the local input file credit-balance.csv.
  3. A ParquetDataWriter is then created pointing to the temporary file for writing.
  4. Data is transferred from CSVReader to ParquetDataWriter via Job.run() method.
  5. The file is uploaded to S3. The implementation of this operation is written in a separate method uploadFileToS3().
  6. First, AmazonS3FileSystem object is instantiated.
  7. Inside the try block, access credentials initially declared at a class level are assigned to the AmazonS3FileSystem object and connection to the system is established.
  8. OutputStream is created to write file data to the specified file path within a declared bucket.
  9. BufferedInputStream is created to read data from the input data using a buffer.
  10. Inside a while loop, data coming from BufferedInputStream is transferred to OutputStream. 

 

Output

Obtained records from the input CSV file will be written to credit-balance.parquet file in the Amazon s3 file system.

Mobile Analytics