Buffer Records by Time Period or Count

Updated: Jun 29, 2023

This example shows how to buffer records based on either a time period or a record count. Buffering lets you collect and process data in batches, which supports operations that aggregate or analyze records within a specific time frame or batch size. Typical use cases include streaming data processing, log analysis, real-time analytics, and data integration pipelines, where releasing records in groups can reduce per-record overhead downstream.

We will use BufferedReader, a proxy reader that groups incoming records of the same type (using the values in a subset of fields) and releases each group downstream together.

 

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.buffer.BufferStrategy;
import com.northconcepts.datapipeline.buffer.BufferedReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;


public class BufferRecordsByTimePeriodOrCount {

    private static final int MAX_TRUCKS = 10;
    private static final long MAX_PACKAGES = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        
        DataReader reader = new FakePackageReader(MAX_TRUCKS, MAX_PACKAGES, RECORD_DELAY_MILLISECONDS);
        
        // group records by "truck_id" and release all records for each "truck_id" downstream every 10 seconds or 
        // when 10 records for that "truck_id" have been collected
        // and limit the internal buffer size to 100 records
        
        reader = new BufferedReader(reader, 100, "truck_id")
                .setBufferStrategy(BufferStrategy.limitedTimeFromOpenOrLimitedRecords(TimeUnit.SECONDS.toMillis(10), 10))
//                .setBufferStrategy(BufferStrategy.limitedTimeFromLastUpdateOrLimitedRecords(10000, 10))
//                .setDebug(true)
                ;
        
        
        DataWriter writer = StreamWriter.newSystemOutWriter();
        
        Job job = Job.run(reader, writer);
        
        System.out.println(job.getRecordsTransferred() + "  -  " + job.getRunningTimeAsString());
    }

    
    //==================================================
    
    public static class FakePackageReader extends DataReader {
        
        private final int maxTrucks;
        private final long maxPackages;
        private long nextPackageId;
        private final long recordDelay;
        
        public FakePackageReader(int maxTrucks, long maxPackages, long recordDelay) {
            this.maxTrucks = maxTrucks;
            this.maxPackages = maxPackages;
            this.recordDelay = recordDelay;
        }
        
        @Override
        protected Record readImpl() throws Throwable {
            if (nextPackageId >= maxPackages) {
                return null;
            }
            
            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }
            
            Record record = new Record();
            record.setField("package_id", nextPackageId++);
            record.setField("truck_id", "truck" + nextPackageId % maxTrucks);
            record.setField("amount", nextPackageId + 0.01);
            return record;
        }
        
    }

}

 

Code Walkthrough

FakePackageReader is a custom DataReader that generates the input data for this example. It has three final fields (maxTrucks, maxPackages, recordDelay) and one mutable field (nextPackageId).

The overridden readImpl() method returns null once maxPackages records have been produced. Otherwise, it pauses the current thread for recordDelay milliseconds and then returns a Record with three fields: package_id, truck_id and amount. Because nextPackageId is incremented when package_id is set, truck_id and amount are computed from the incremented value, which is why package 0 appears in the output with truck1 and an amount of 1.01.
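The field arithmetic in readImpl() can be checked in isolation. The following is a minimal sketch, not part of the original listing: the class PackageFieldsDemo and the helpers truckIdFor and amountFor are hypothetical names that only mirror the expressions used in FakePackageReader.

```java
public class PackageFieldsDemo {

    // Hypothetical helper: mirrors "truck" + nextPackageId % maxTrucks,
    // evaluated after nextPackageId has already been incremented.
    static String truckIdFor(long packageId, int maxTrucks) {
        long next = packageId + 1;
        return "truck" + next % maxTrucks;
    }

    // Hypothetical helper: mirrors nextPackageId + 0.01 after the increment.
    static double amountFor(long packageId) {
        return (packageId + 1) + 0.01;
    }

    public static void main(String[] args) {
        int maxTrucks = 10;
        // Print the derived fields for a few sample package ids.
        for (long id : new long[] {0, 9, 10, 199}) {
            System.out.println(id + " -> " + truckIdFor(id, maxTrucks) + ", " + amountFor(id));
        }
    }
}
```

With maxTrucks set to 10, packages 0, 10, 20 and so on all map to truck1, which matches the first records shown in the console output.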

  1. A FakePackageReader instance is created with the constants declared at the class level.
  2. BufferedReader accepts three arguments:
    1. reader, which supplies the input data,
    2. 100, which is the maximum number of records the internal buffer may hold,
    3. truck_id, which is the field used for grouping.
  3. setBufferStrategy() defines the strategy that will be used. For this example, the chosen strategy releases all records for each "truck_id" downstream every 10 seconds (TimeUnit.SECONDS.toMillis(10), i.e. 10,000 milliseconds) or when 10 records for that "truck_id" have been collected.
  4. StreamWriter.newSystemOutWriter() is used to write records to the standard output stream in a human-readable format.
  5. Job.run() method transfers data from the reader to the writer.
  6. The number of transferred records and execution time are also printed in the console.
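To make the "time or count" release rule concrete, here is a library-free sketch of the idea. It is not how BufferedReader is implemented; TimeOrCountBuffer and its add method are hypothetical names, the clock is passed in explicitly so the behavior is deterministic, and the time limit is assumed to be measured from the moment a group's first record arrives. Unlike a real buffer, this sketch only checks for expiry when a new record for the same key arrives.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TimeOrCountBuffer {

    // One open batch per key, remembering when its first record arrived.
    private static final class Batch {
        final long openedAtMillis;
        final List<String> records = new ArrayList<>();

        Batch(long openedAtMillis) {
            this.openedAtMillis = openedAtMillis;
        }
    }

    private final long maxAgeMillis;
    private final int maxRecords;
    private final Map<String, Batch> batches = new LinkedHashMap<>();

    public TimeOrCountBuffer(long maxAgeMillis, int maxRecords) {
        this.maxAgeMillis = maxAgeMillis;
        this.maxRecords = maxRecords;
    }

    // Adds a record to its key's batch. Returns the whole batch if it is now
    // full or too old, otherwise an empty list (the record stays buffered).
    public List<String> add(String key, String record, long nowMillis) {
        Batch batch = batches.computeIfAbsent(key, k -> new Batch(nowMillis));
        batch.records.add(record);

        boolean full = batch.records.size() >= maxRecords;
        boolean expired = nowMillis - batch.openedAtMillis >= maxAgeMillis;
        if (full || expired) {
            batches.remove(key);
            return batch.records;
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Release after 10 seconds or 3 records, whichever comes first.
        TimeOrCountBuffer buffer = new TimeOrCountBuffer(10_000, 3);
        System.out.println(buffer.add("truck1", "p0", 0));       // []
        System.out.println(buffer.add("truck1", "p10", 2_500));  // []
        System.out.println(buffer.add("truck1", "p20", 5_000));  // [p0, p10, p20] (count limit)
        System.out.println(buffer.add("truck2", "p1", 6_000));   // []
        System.out.println(buffer.add("truck2", "p11", 16_000)); // [p1, p11] (time limit)
    }
}
```

In the example above, each truck receives a record roughly every 2.5 seconds (10 trucks at 250 milliseconds per record), so a 10-second window and a 10-record limit mean the time limit will usually be reached first.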

 

Console Output

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[package_id]:LONG=[0]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[1.01]:Double
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[package_id]:LONG=[10]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[11.01]:Double
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[package_id]:LONG=[20]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[21.01]:Double
}

...

-----------------------------------------------
199 - Record (MODIFIED) {
    0:[package_id]:LONG=[199]:Long
    1:[truck_id]:STRING=[truck0]:String
    2:[amount]:DOUBLE=[200.01]:Double
}

-----------------------------------------------
200 records
200 - 51 Seconds, 375 Milliseconds