Load Snapshot of Dataset

This example loads a snapshot of a dataset into memory, giving users a fixed version of the data as it existed at a specific point in time. It reads the dataset from a source, such as a file or a database, and loads it into memory or a data processing environment.

Real-life use cases include historical data analysis, data auditing, data versioning, and creating data backups. For example, a financial institution might load historical market data this way for trend analysis or compliance purposes.

 

Input CSV file

stock,time,price,shares
JHX,09:30:00.00,57,95
JNJ,09:30:00.00,91.14,548
OPK,09:30:00.00,8.3,300
OPK,09:30:00.00,8.3,63
OMC,09:30:00.00,74.53,100
OMC,09:30:00.00,74.53,24
TWTR,09:30:00.00,64.89,100
TWTR,09:30:00.00,64.89,25
TWTR,09:30:00.00,64.89,245
TWTR,09:30:00.00,64.89,55
USB,09:30:00.00,39.71,400
USB,09:30:00.00,39.71,359
...
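
To make the file layout concrete, here is a minimal plain-Java sketch of reading rows like these into memory, with the first row treated as field names. It does not use the DataPipeline API. The class name TradeCsvSketch and its parse method are hypothetical, and the parser assumes simple comma-separated values with no quoting, as in the sample above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of DataPipeline.
public class TradeCsvSketch {

    // Parses CSV lines whose first line holds the field names.
    // Assumes no quoted fields or embedded commas.
    public static List<Map<String, String>> parse(List<String> lines) {
        List<Map<String, String>> records = new ArrayList<>();
        if (lines.isEmpty()) {
            return records;
        }
        String[] fieldNames = lines.get(0).split(",");
        for (String line : lines.subList(1, lines.size())) {
            String[] values = line.split(",", -1);
            // LinkedHashMap keeps the fields in file order.
            Map<String, String> record = new LinkedHashMap<>();
            for (int i = 0; i < fieldNames.length && i < values.length; i++) {
                record.put(fieldNames[i], values[i]);
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "stock,time,price,shares",
                "JHX,09:30:00.00,57,95",
                "JNJ,09:30:00.00,91.14,548");
        List<Map<String, String>> records = parse(lines);
        System.out.println("Total Records:- " + records.size());
    }
}
```

Each record keeps its values as strings keyed by field name. A real loader would also handle quoting and type conversion, which this sketch leaves out.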

 

Java Code

package com.northconcepts.datapipeline.foundations.examples.pipeline;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.foundations.file.LocalFileSource;
import com.northconcepts.datapipeline.foundations.pipeline.Pipeline;
import com.northconcepts.datapipeline.foundations.pipeline.dataset.Dataset;
import com.northconcepts.datapipeline.foundations.pipeline.dataset.MemoryDataset;
import com.northconcepts.datapipeline.foundations.pipeline.input.CsvPipelineInput;

public class LoadSnapshotOfDataset {

    public static void main(String[] args) {
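        // Describe the CSV source; field names come from the first row
        CsvPipelineInput pipelineInput = new CsvPipelineInput()
                .setFileSource(new LocalFileSource().setPath("example/data/input/trades.csv"))
                .setFieldNamesInFirstRow(true);

        // Create the pipeline and attach the CSV input to it
        Pipeline pipeline = new Pipeline();

        pipeline.setInput(pipelineInput);

        // Back the dataset with memory, using the pipeline as its data source
        Dataset dataset = new MemoryDataset(pipeline);
        // Start loading asynchronously, then block (arguments: 10000 records, 4 seconds in milliseconds)
        dataset.load().waitForRecordsToLoad(10000, TimeUnit.SECONDS.toMillis(4));

        // Print the number of records held by the dataset
        System.out.println("Total Records:- " + dataset.getRecordCount());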
    }
}

 

Code Walkthrough

  1. A CsvPipelineInput instance is created to specify the input file trades.csv, with field names taken from the first row.
  2. A new Pipeline object is created and pipelineInput is set as its input.
  3. new MemoryDataset(pipeline) creates the dataset in memory. It accepts the pipeline as a parameter, which is the source of the dataset's data.
  4. dataset.load() starts the asynchronous loading of records from the pipeline into this dataset. This method returns immediately and does not wait for loading to complete.
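  5. .waitForRecordsToLoad(10000, TimeUnit.SECONDS.toMillis(4)) blocks the calling thread so that records are loaded before the program proceeds. Here it is given a record count of 10000 and a maximum wait of 4 seconds, expressed in milliseconds.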
  6. The record count is printed on the console.
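
The load-then-wait pattern in steps 4 and 5 can be sketched with standard java.util.concurrent classes. This is an illustration of the general idea only, not DataPipeline's implementation. The class AsyncLoadSketch and its methods are hypothetical, and the loop that increments a counter merely stands in for reading records from a pipeline.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not part of DataPipeline.
public class AsyncLoadSketch {

    private final AtomicLong recordCount = new AtomicLong();
    private final CountDownLatch finished = new CountDownLatch(1);

    // Starts loading on a background thread and returns immediately.
    public AsyncLoadSketch load(long totalRecords) {
        CompletableFuture.runAsync(() -> {
            try {
                for (long i = 0; i < totalRecords; i++) {
                    recordCount.incrementAndGet(); // stands in for reading one record
                }
            } finally {
                finished.countDown(); // signal completion even if loading fails
            }
        });
        return this;
    }

    // Blocks until loading finishes or maxWaitMillis elapses.
    // Returns true if loading finished within the time limit.
    public boolean waitForLoad(long maxWaitMillis) {
        try {
            return finished.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public long getRecordCount() {
        return recordCount.get();
    }

    public static void main(String[] args) {
        AsyncLoadSketch dataset = new AsyncLoadSketch().load(10000);
        boolean done = dataset.waitForLoad(TimeUnit.SECONDS.toMillis(4));
        System.out.println("Finished: " + done);
        System.out.println("Total Records:- " + dataset.getRecordCount());
    }
}
```

Bounding the wait with a timeout keeps the caller from blocking indefinitely if the source is slow. The cost is that the caller should check whether loading actually finished before relying on the count.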

 

Console Output

14:42:27,002 DEBUG [main] datapipeline:37 - DataPipeline v8.1.0-SNAPSHOT by North Concepts Inc.
14:42:27,130 DEBUG [job-1] datapipeline:615 - Job[1,job-1,Mon Jun 19 14:42:27 EEST 2023]::Start
14:42:27,538 DEBUG [job-1] datapipeline:661 - job::Success
Total Records:- 246013