Write Parquet to Amazon S3 Using a Temporary File
This example writes Parquet files to Amazon S3 by using a temporary file stored locally. It reads records from an input file, saves the data in Parquet format to a temporary file on local storage, and then uploads that temporary file to Amazon S3.
This approach is useful when large volumes of data need to be processed and transformed before being persisted in Parquet format on Amazon S3. Writing to a local temporary file first lets complex transformations and operations run efficiently before the result is uploaded to cloud storage. Typical use cases include data preprocessing tasks, data enrichment pipelines, and batch processing workflows, where data is read from different sources, processed locally, and then stored in Parquet format on Amazon S3 for further analysis and consumption.
Input File
Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000,17-01-1998,A
312,Butler,Gerard,90,1000,06-08-2003,B
101,Hewitt,Jennifer Love,0,17000,25-05-1985,B
312,Pinkett-Smith,Jada,49654.87,100000,05-12-2006,A
317,Murray,Bill,789.65,5000,05-02-2007,C
317,Murray,Bill,1,5000,05-02-2007,D
Java Code
package com.northconcepts.datapipeline.examples.amazons3;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;

import com.northconcepts.datapipeline.amazons3.AmazonS3FileSystem;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.parquet.ParquetDataWriter;

public class WriteParquetToAmazonS3UsingATemporaryFile {

    private static final String ACCESS_KEY = "YOUR ACCESS KEY";
    private static final String SECRET_KEY = "YOUR SECRET KEY";

    public static void main(String[] args) throws Throwable {
        File parquetFile = File.createTempFile("credit-balance", ".parquet");
        parquetFile.deleteOnExit();

        try {
            // Read the CSV input, using the first row as field names
            DataReader reader = new CSVReader(new File("example/data/input/credit-balance.csv"))
                    .setFieldNamesInFirstRow(true);

            // Write the records to the local temporary file in Parquet format
            ParquetDataWriter writer = new ParquetDataWriter(parquetFile);

            Job.run(reader, writer);

            uploadFileToS3(parquetFile);
        } finally {
            parquetFile.delete();
        }
    }

    private static void uploadFileToS3(File parquetFile) throws Throwable {
        AmazonS3FileSystem s3 = new AmazonS3FileSystem();
        try {
            s3.setBasicAWSCredentials(ACCESS_KEY, SECRET_KEY);
            s3.open();

            OutputStream out = s3.writeMultipartFile("datapipeline-test-01", "output/credit-balance.parquet");
            InputStream in = new BufferedInputStream(new FileInputStream(parquetFile));

            // Copy the local Parquet file to S3 in 1 KB chunks
            byte[] buffer = new byte[1024];
            int lengthRead;
            while ((lengthRead = in.read(buffer)) > 0) {
                out.write(buffer, 0, lengthRead);
            }

            // Close both streams so the multipart upload is completed
            in.close();
            out.close();
        } finally {
            s3.close();
        }
    }

}
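As a variation, the manual buffer loop in uploadFileToS3() can be replaced with Java's built-in Files.copy() and a try-with-resources block, which guarantees the S3 output stream is closed (and the multipart upload finalized) even if the copy fails part-way. The following is a minimal sketch rather than the library's prescribed approach; it reuses the ACCESS_KEY, SECRET_KEY, bucket name, and key from the example above and additionally requires an import of java.nio.file.Files.

private static void uploadFileToS3(File parquetFile) throws Throwable {
    AmazonS3FileSystem s3 = new AmazonS3FileSystem();
    try {
        s3.setBasicAWSCredentials(ACCESS_KEY, SECRET_KEY);
        s3.open();

        // Closing the output stream at the end of the try-with-resources block
        // finalizes the multipart upload on S3.
        try (OutputStream out = s3.writeMultipartFile("datapipeline-test-01", "output/credit-balance.parquet")) {
            // Stream the local temporary Parquet file into the S3 output stream.
            java.nio.file.Files.copy(parquetFile.toPath(), out);
        }
    } finally {
        s3.close();
    }
}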
Code Walkthrough
- A temporary Parquet file with the prefix credit-balance is created and set to delete itself on exit.
- A CSVReader instance is created for the local input file credit-balance.csv, with setFieldNamesInFirstRow(true) so the first row supplies the field names.
- A ParquetDataWriter is then created, pointing to the temporary file for writing.
- Data is transferred from the CSVReader to the ParquetDataWriter via the Job.run() method.
- The file is uploaded to S3. This operation is implemented in a separate method, uploadFileToS3().
- First, an AmazonS3FileSystem object is instantiated.
- Inside the try block, the access credentials declared at the class level are assigned to the AmazonS3FileSystem object and a connection is opened (a sketch after this list shows how the credentials could be loaded from environment variables instead of being hard-coded).
- An OutputStream is created to write file data to the specified file path within the declared bucket.
- A BufferedInputStream is created to read the temporary Parquet file through a buffer.
- Inside a while loop, data read from the BufferedInputStream is copied to the OutputStream, after which both streams are closed so the multipart upload completes.
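The ACCESS_KEY and SECRET_KEY constants in the example are placeholders. As a hypothetical variation (not part of the original example), the credentials could be read from environment variables with standard Java before being passed to setBasicAWSCredentials(); the variable names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY below are assumptions about how the environment is configured.

// Fails fast if a required environment variable is not set.
private static String requireEnv(String name) {
    String value = System.getenv(name);
    if (value == null || value.isEmpty()) {
        throw new IllegalStateException("Missing environment variable: " + name);
    }
    return value;
}

// Used in uploadFileToS3() in place of the hard-coded constants:
// s3.setBasicAWSCredentials(requireEnv("AWS_ACCESS_KEY_ID"), requireEnv("AWS_SECRET_ACCESS_KEY"));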
Output
The records read from the input CSV file are written in Parquet format to output/credit-balance.parquet in the datapipeline-test-01 bucket on Amazon S3.
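To sanity-check the Parquet output, the temporary file can be read back before it is uploaded. The snippet below is a hedged sketch: it assumes DataPipeline's ParquetDataReader and StreamWriter classes can be used as in the vendor's other examples (imported from com.northconcepts.datapipeline.parquet and com.northconcepts.datapipeline.core), and it would be placed in main() between Job.run(reader, writer) and uploadFileToS3(parquetFile).

// Read the freshly written Parquet file and print each record to the console.
Job.run(new ParquetDataReader(parquetFile), new StreamWriter(System.out));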