Load Snapshot of Dataset
Updated: Jun 24, 2023
This example loads a snapshot of a dataset into memory, giving users a fixed version of the data to work with at a specific point in time. The library reads the dataset from a source, such as a file or a database, and loads it into memory or another data processing environment.
Real-life use cases for this library include historical data analysis, data auditing, data versioning, and creating data backups. For example, financial institutions may use it to analyze historical market data for trends or for compliance purposes.
Input CSV file
stock,time,price,shares
JHX,09:30:00.00,57,95
JNJ,09:30:00.00,91.14,548
OPK,09:30:00.00,8.3,300
OPK,09:30:00.00,8.3,63
OMC,09:30:00.00,74.53,100
OMC,09:30:00.00,74.53,24
TWTR,09:30:00.00,64.89,100
TWTR,09:30:00.00,64.89,25
TWTR,09:30:00.00,64.89,245
TWTR,09:30:00.00,64.89,55
USB,09:30:00.00,39.71,400
USB,09:30:00.00,39.71,359
...
Java Code
package com.northconcepts.datapipeline.foundations.examples.pipeline;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.dataset.Dataset;
import com.northconcepts.datapipeline.foundations.pipeline.dataset.MemoryDataset;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;

public class LoadSnapshotOfDataset {

    public static void main(String[] args) {
        CsvPipelineInput pipelineInput = new CsvPipelineInput()
            .setFileSource(new LocalFileSource().setPath("example/data/input/trades.csv"))
            .setFieldNamesInFirstRow(true);

        Pipeline pipeline = new Pipeline();
        pipeline.setInput(pipelineInput);

        Dataset dataset = new MemoryDataset(pipeline);
        dataset.load().waitForRecordsToLoad(10000, TimeUnit.SECONDS.toMillis(4));

        System.out.println("Total Records:- " + dataset.getRecordCount());
    }

}
Code Walkthrough
- A CsvPipelineInput instance is created to specify the input file trades.csv, with field names taken from the first row.
- A new Pipeline object is created and pipelineInput is set as its input.
- new MemoryDataset(pipeline) creates the dataset in memory; it accepts pipeline as a parameter, which is the source of the dataset's data.
- dataset.load() starts the asynchronous loading of records from the pipeline into this dataset. This method returns immediately and does not wait for loading to complete.
- waitForRecordsToLoad() blocks until records have been loaded before proceeding; here it is called with a record count of 10,000 and a four-second timeout.
- The record count is printed to the console (a standalone cross-check of this count is sketched below).
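The count reported by dataset.getRecordCount() can be cross-checked independently of DataPipeline. The sketch below counts the data rows of the same CSV file with plain Java; the file path and the single-header-row layout are assumptions carried over from this example, the class name CountTradesRows is purely illustrative, and the check only holds if no field contains embedded newlines.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class CountTradesRows {

    public static void main(String[] args) throws IOException {
        // Assumes the same input file as the example above and one header row.
        try (Stream<String> lines = Files.lines(Paths.get("example/data/input/trades.csv"))) {
            long dataRows = lines
                .skip(1)                                   // skip the header row
                .filter(line -> !line.trim().isEmpty())    // ignore blank lines
                .count();
            System.out.println("CSV data rows: " + dataRows);
        }
    }

}

For a file with one record per line, the printed count should match the Total Records value produced by the pipeline once loading has finished.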
Console Output
14:42:27,002 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
14:42:27,130 DEBUG [job-1] datapipeline:615 - Job[1,job-1,Mon Jun 19 14:42:27 EEST 2023]::Start
14:42:27,538 DEBUG [job-1] datapipeline:661 - job::Success
Total Records:- 246013