Buffer Records by Time Period or Count
Updated: Jun 1, 2023
In this example, you will learn how to use DataPipeline to buffer records by time period or count.
You will use BufferedReader, a proxy reader that collects incoming records of the same type (identified by the values in a subset of their fields) and releases each group downstream together.
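To make the grouping idea concrete, here is a minimal plain-Java sketch of count-based grouping only. It is not DataPipeline code: GroupingBuffer, its add() and flushAll() methods, and the GroupingBufferSketch wrapper are hypothetical names chosen for illustration. The sketch also ignores the time limit and the overall buffer-size cap that BufferedReader applies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of grouping by key; NOT the DataPipeline BufferedReader implementation.
public class GroupingBufferSketch {

    /** Collects records per key and hands back a whole group once it reaches maxPerGroup records. */
    static final class GroupingBuffer<T> {
        private final int maxPerGroup;
        // LinkedHashMap keeps groups in the order they were opened
        private final Map<String, List<T>> groups = new LinkedHashMap<>();

        GroupingBuffer(int maxPerGroup) {
            this.maxPerGroup = maxPerGroup;
        }

        /** Adds a record; returns the full group if this record filled it, otherwise an empty list. */
        List<T> add(String key, T record) {
            List<T> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
            group.add(record);
            if (group.size() >= maxPerGroup) {
                groups.remove(key);
                return group;
            }
            return List.of();
        }

        /** Releases every partially filled group, in the order the groups were opened. */
        List<List<T>> flushAll() {
            List<List<T>> remaining = new ArrayList<>(groups.values());
            groups.clear();
            return remaining;
        }
    }

    /** Assigns package ids to trucks round-robin and returns the batches in release order. */
    static List<List<Integer>> run(int packages, int trucks, int maxPerGroup) {
        GroupingBuffer<Integer> buffer = new GroupingBuffer<>(maxPerGroup);
        List<List<Integer>> released = new ArrayList<>();
        for (int id = 0; id < packages; id++) {
            List<Integer> batch = buffer.add("truck" + (id % trucks), id);
            if (!batch.isEmpty()) {
                released.add(batch);
            }
        }
        released.addAll(buffer.flushAll()); // end of input: release leftovers
        return released;
    }

    public static void main(String[] args) {
        // 7 packages, 2 trucks, release a truck's group once it holds 3 packages
        for (List<Integer> batch : run(7, 2, 3)) {
            System.out.println(batch);
        }
    }
}
```

Running main prints [0, 2, 4], then [1, 3, 5], then the leftover [6]: each truck's packages reach downstream together, and the incomplete group is released when the input ends.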
Java Code listing
```java
/*
 * Copyright (c) 2006-2022 North Concepts Inc. All rights reserved.
 * Proprietary and Confidential. Use is subject to license terms.
 *
 * https://northconcepts.com/data-pipeline/licensing/
 */
package com.northconcepts.datapipeline.examples.cookbook;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.buffer.BufferStrategy;
import com.northconcepts.datapipeline.buffer.BufferedReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;

public class BufferRecordsByTimePeriodOrCount {

    private static final int MAX_TRUCKS = 10;
    private static final long MAX_PACKAGES = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        DataReader reader = new FakePackageReader(MAX_TRUCKS, MAX_PACKAGES, RECORD_DELAY_MILLISECONDS);

        // group records by "truck_id" and release all records for each "truck_id" downstream every 10 seconds or
        // when 10 records for that "truck_id" have been collected
        // and limit the internal buffer size to 100 records
        reader = new BufferedReader(reader, 100, "truck_id")
                .setBufferStrategy(BufferStrategy.limitedTimeFromOpenOrLimitedRecords(TimeUnit.SECONDS.toMillis(10), 10))
                // .setBufferStrategy(BufferStrategy.limitedTimeFromLastUpdateOrLimitedRecords(10000, 10))
                // .setDebug(true)
                ;

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job job = Job.run(reader, writer);

        System.out.println(job.getRecordsTransferred() + " - " + job.getRunningTimeAsString());
    }

    //==================================================

    public static class FakePackageReader extends DataReader {

        private final int maxTrucks;
        private final long maxPackages;
        private long nextPackageId;
        private final long recordDelay;

        public FakePackageReader(int maxTrucks, long maxPackages, long recordDelay) {
            this.maxTrucks = maxTrucks;
            this.maxPackages = maxPackages;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            if (nextPackageId >= maxPackages) {
                return null;
            }

            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }

            Record record = new Record();
            record.setField("package_id", nextPackageId++);
            record.setField("truck_id", "truck" + nextPackageId % maxTrucks);
            record.setField("amount", nextPackageId + 0.01);
            return record;
        }
    }
}
```
Code walkthrough
- BufferedReader accepts three arguments: reader, which supplies the input data; 100, the maximum number of records the internal buffer may hold; and "truck_id", the field used to group records.
- setBufferStrategy() defines when buffered records are released. The strategy chosen here, BufferStrategy.limitedTimeFromOpenOrLimitedRecords(TimeUnit.SECONDS.toMillis(10), 10), releases all records for each "truck_id" downstream every 10 seconds or as soon as 10 records for that "truck_id" have been collected, whichever comes first.
- StreamWriter writes records to a stream in a human-readable format; StreamWriter.newSystemOutWriter() sends that output to the console.
- Job.run() is then called to transfer data from the reader to the writer. When the job finishes, the program prints the number of records transferred and the running time.
- FakePackageReader generates fake package records to use as input data and pauses for recordDelay milliseconds before producing each record (RECORD_DELAY_MILLISECONDS = 250 in this example).
Output
```
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[package_id]:LONG=[0]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[1.01]:Double
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[package_id]:LONG=[10]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[11.01]:Double
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[package_id]:LONG=[20]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[21.01]:Double
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[package_id]:LONG=[30]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[31.01]:Double
}

-----------------------------------------------
4 - Record (MODIFIED) {
    0:[package_id]:LONG=[1]:Long
    1:[truck_id]:STRING=[truck2]:String
    2:[amount]:DOUBLE=[2.01]:Double
}

-----------------------------------------------
5 - Record (MODIFIED) {
    0:[package_id]:LONG=[11]:Long
    1:[truck_id]:STRING=[truck2]:String
    2:[amount]:DOUBLE=[12.01]:Double
}

. . .

-----------------------------------------------
198 - Record (MODIFIED) {
    0:[package_id]:LONG=[189]:Long
    1:[truck_id]:STRING=[truck0]:String
    2:[amount]:DOUBLE=[190.01]:Double
}

-----------------------------------------------
199 - Record (MODIFIED) {
    0:[package_id]:LONG=[199]:Long
    1:[truck_id]:STRING=[truck0]:String
    2:[amount]:DOUBLE=[200.01]:Double
}

-----------------------------------------------
200 records
```
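Why does truck1's first batch hold only four packages (0, 10, 20, 30) even though the count limit is 10? With 10 trucks assigned round-robin and one record every 250 ms, each truck receives a record every 10 × 250 ms = 2.5 s, so only about 4 records arrive for a truck within a 10-second window, and the time limit fires well before the count limit. The plain-Java simulation below replays the example on a virtual clock to illustrate this. It is a sketch under stated assumptions, not how BufferedReader is implemented: it assumes each group's timer starts when the group's first record arrives, that expired groups are released just before the next record is read, and that groups still open at end of stream are released in the order they were opened. TimeOrCountBufferSimulation and releaseOrder() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, single-threaded model of a "time since group opened OR record count" release rule.
// ASSUMPTIONS (not from the DataPipeline docs): a group's timer starts with its first record,
// expired groups are released just before the next record is read, and groups still open at
// end of stream are released in the order they were opened.
public class TimeOrCountBufferSimulation {

    /** One open group: when it was opened (virtual ms) and the package ids it holds. */
    private static final class Group {
        final long openedAt;
        final List<Long> packageIds = new ArrayList<>();

        Group(long openedAt) {
            this.openedAt = openedAt;
        }
    }

    /** Returns package ids in the order this model releases them downstream. */
    static List<Long> releaseOrder(long maxPackages, int maxTrucks, long delayMillis,
                                   long windowMillis, int maxRecordsPerGroup) {
        Map<String, Group> open = new LinkedHashMap<>();
        List<Long> released = new ArrayList<>();

        for (long packageId = 0; packageId < maxPackages; packageId++) {
            // Virtual clock: FakePackageReader sleeps before producing each record
            long now = (packageId + 1) * delayMillis;

            // Time rule: release every group whose window has elapsed
            Iterator<Group> it = open.values().iterator();
            while (it.hasNext()) {
                Group group = it.next();
                if (now - group.openedAt >= windowMillis) {
                    released.addAll(group.packageIds);
                    it.remove();
                }
            }

            // Same truck assignment as FakePackageReader (computed after the id is incremented)
            String truckId = "truck" + ((packageId + 1) % maxTrucks);
            Group group = open.computeIfAbsent(truckId, k -> new Group(now));
            group.packageIds.add(packageId);

            // Count rule: release the group as soon as it is full
            if (group.packageIds.size() >= maxRecordsPerGroup) {
                released.addAll(group.packageIds);
                open.remove(truckId);
            }
        }

        // End of stream: release whatever is still buffered
        for (Group group : open.values()) {
            released.addAll(group.packageIds);
        }
        return released;
    }

    public static void main(String[] args) {
        // Example parameters: 200 packages, 10 trucks, 250 ms delay, 10 s window, 10 records per group
        List<Long> order = releaseOrder(200, 10, 250, 10_000, 10);
        for (int i = 0; i < 6; i++) {
            System.out.println(i + " -> package_id " + order.get(i));
        }
        System.out.println(order.size() + " records");
    }
}
```

Under these assumptions, the first six released packages are 0, 10, 20, 30, 1, 11 and the last two are 189 and 199, matching records 0 to 5 and 198 to 199 in the output above. Making windowMillis longer than 22.5 seconds (the span between a truck's 1st and 10th record) would let the count rule fire first, so each truck's first batch would hold 10 records.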