Buffer Records by Time Period or Count

Updated: May 26, 2022

This example shows how to use DataPipeline to buffer records by time period or count.

It uses BufferedReader, a proxy that groups incoming records by the values in a subset of their fields and releases each group downstream together.

Java Code listing

/*
 * Copyright (c) 2006-2022 North Concepts Inc.  All rights reserved.
 * Proprietary and Confidential.  Use is subject to license terms.
 * 
 * https://northconcepts.com/data-pipeline/licensing/
 */
package com.northconcepts.datapipeline.examples.cookbook;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.buffer.BufferStrategy;
import com.northconcepts.datapipeline.buffer.BufferedReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;


public class BufferRecordsByTimePeriodOrCount {

    private static final int MAX_TRUCKS = 10;
    private static final long MAX_PACKAGES = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        
        DataReader reader = new FakePackageReader(MAX_TRUCKS, MAX_PACKAGES, RECORD_DELAY_MILLISECONDS);
        
        // group records by "truck_id" and release all records for each "truck_id" downstream every 10 seconds or 
        // when 10 records for that "truck_id" have been collected
        // and limit the internal buffer size to 100 records
        
        reader = new BufferedReader(reader, 100, "truck_id")
                .setBufferStrategy(BufferStrategy.limitedTimeFromOpenOrLimitedRecords(TimeUnit.SECONDS.toMillis(10), 10))
//                .setBufferStrategy(BufferStrategy.limitedTimeFromLastUpdateOrLimitedRecords(10000, 10))
//                .setDebug(true)
                ;
        
        
        DataWriter writer = StreamWriter.newSystemOutWriter();
        
        Job job = Job.run(reader, writer);
        
        System.out.println(job.getRecordsTransferred() + "  -  " + job.getRunningTimeAsString());
    }

    
    //==================================================
    
    public static class FakePackageReader extends DataReader {
        
        private final int maxTrucks;
        private final long maxPackages;
        private long nextPackageId;
        private final long recordDelay;
        
        public FakePackageReader(int maxTrucks, long maxPackages, long recordDelay) {
            this.maxTrucks = maxTrucks;
            this.maxPackages = maxPackages;
            this.recordDelay = recordDelay;
        }
        
        @Override
        protected Record readImpl() throws Throwable {
            if (nextPackageId >= maxPackages) {
                return null;
            }
            
            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }
            
            Record record = new Record();
            record.setField("package_id", nextPackageId++);
            record.setField("truck_id", "truck" + nextPackageId % maxTrucks);
            record.setField("amount", nextPackageId + 0.01);
            return record;
        }
        
    }

}

Code walkthrough

  1. BufferedReader accepts three arguments: reader, which supplies the input data; 100, the maximum number of records the buffer will hold; and truck_id, the field used for grouping.
  2. setBufferStrategy() defines when buffered records are released. This example uses a strategy that releases all records for a "truck_id" downstream every 10 seconds or as soon as 10 records for that "truck_id" have been collected.
  3. StreamWriter writes records to a stream in a human-readable format; here it writes to System.out.
  4. Job.run() is then called to transfer the data from the reader to the writer.
  5. FakePackageReader generates fake records to use as input data and pauses for RECORD_DELAY_MILLISECONDS (250 ms in this example) before producing each record.
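
To make the release rule in step 2 concrete, here is a minimal sketch in plain Java of a keyed buffer that releases a group either when it reaches a record count or when a time limit has passed since the group was opened. This is not DataPipeline's implementation: the class KeyedBufferSketch and its methods are hypothetical names invented for illustration, records are reduced to strings, the overall 100-record cap is not modeled, and timestamps are passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the library's BufferedReader or BufferStrategy.
class KeyedBufferSketch {

    // One open group: the time it was opened and the items collected so far.
    private static final class Group {
        final long openedAtMillis;
        final List<String> items = new ArrayList<>();

        Group(long openedAtMillis) {
            this.openedAtMillis = openedAtMillis;
        }
    }

    private final long maxAgeMillis;
    private final int maxRecords;
    private final Map<String, Group> groups = new LinkedHashMap<>();

    KeyedBufferSketch(long maxAgeMillis, int maxRecords) {
        this.maxAgeMillis = maxAgeMillis;
        this.maxRecords = maxRecords;
    }

    // Adds an item under its key. Returns the whole group if the count limit
    // was reached by this item, otherwise an empty list.
    List<String> add(String key, String item, long nowMillis) {
        Group group = groups.computeIfAbsent(key, k -> new Group(nowMillis));
        group.items.add(item);
        if (group.items.size() >= maxRecords) {
            groups.remove(key);
            return group.items;
        }
        return new ArrayList<>();
    }

    // Releases every group whose age, measured from when it was opened,
    // has reached the time limit.
    Map<String, List<String>> releaseExpired(long nowMillis) {
        Map<String, List<String>> released = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Group>> it = groups.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Group> entry = it.next();
            if (nowMillis - entry.getValue().openedAtMillis >= maxAgeMillis) {
                released.put(entry.getKey(), entry.getValue().items);
                it.remove();
            }
        }
        return released;
    }

    public static void main(String[] args) {
        // 10-second time limit, 3 records per group (smaller than the example's 10).
        KeyedBufferSketch buffer = new KeyedBufferSketch(10_000, 3);
        System.out.println(buffer.add("truck1", "p0", 0));      // []
        System.out.println(buffer.add("truck2", "p1", 250));    // []
        System.out.println(buffer.add("truck1", "p2", 500));    // []
        System.out.println(buffer.add("truck1", "p3", 750));    // [p0, p2, p3]
        System.out.println(buffer.releaseExpired(10_250));      // {truck2=[p1]}
    }
}
```

In this sketch the count limit is checked on every add, while the time limit is checked only when releaseExpired() is called; how and when the real BufferedReader performs these checks is not covered by this example.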

Output

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[package_id]:LONG=[0]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[1.01]:Double
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[package_id]:LONG=[10]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[11.01]:Double
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[package_id]:LONG=[20]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[21.01]:Double
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[package_id]:LONG=[30]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[31.01]:Double
}

-----------------------------------------------
4 - Record (MODIFIED) {
    0:[package_id]:LONG=[1]:Long
    1:[truck_id]:STRING=[truck2]:String
    2:[amount]:DOUBLE=[2.01]:Double
}

-----------------------------------------------
5 - Record (MODIFIED) {
    0:[package_id]:LONG=[11]:Long
    1:[truck_id]:STRING=[truck2]:String
    2:[amount]:DOUBLE=[12.01]:Double
}

-----------------------------------------------
.
.
.
-----------------------------------------------
198 - Record (MODIFIED) {
    0:[package_id]:LONG=[189]:Long
    1:[truck_id]:STRING=[truck0]:String
    2:[amount]:DOUBLE=[190.01]:Double
}

-----------------------------------------------
199 - Record (MODIFIED) {
    0:[package_id]:LONG=[199]:Long
    1:[truck_id]:STRING=[truck0]:String
    2:[amount]:DOUBLE=[200.01]:Double
}

-----------------------------------------------
200 records
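
The output pairs package_id 0 with truck1 and amount 1.01 because FakePackageReader.readImpl() post-increments nextPackageId while setting package_id, so the truck_id and amount fields are computed from the already-incremented counter. The following standalone sketch reproduces just that arithmetic; TruckAssignmentSketch and describe() are hypothetical names used only for this illustration.

```java
// Hypothetical illustration of the field arithmetic in FakePackageReader.readImpl().
class TruckAssignmentSketch {

    // Computes the three field values for one record, given the counter value
    // before the record is created.
    static String describe(long counterBefore, int maxTrucks) {
        long nextPackageId = counterBefore;
        long packageId = nextPackageId++;                      // uses the old value, then increments
        String truckId = "truck" + nextPackageId % maxTrucks;  // sees the incremented value
        double amount = nextPackageId + 0.01;                  // sees the incremented value
        return "package_id=" + packageId + " truck_id=" + truckId + " amount=" + amount;
    }

    public static void main(String[] args) {
        System.out.println(describe(0, 10));    // package_id=0 truck_id=truck1 amount=1.01
        System.out.println(describe(199, 10));  // package_id=199 truck_id=truck0 amount=200.01
    }
}
```

This is also why the last record in the output, package_id 199, belongs to truck0: the counter is 200 by the time the modulo is taken.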
