Buffer Records by Time Period or Count
This example provides a buffering mechanism for records based on either a time period or a count. It allows users to collect and process data in batches, improving efficiency and enabling operations that require aggregating or analyzing data within a specific time frame or batch size. Real-life use cases for this library include streaming data processing, log analysis, real-time analytics, and data integration pipelines where buffering records can optimize data handling and improve overall system performance.
We will use BufferedReader, a proxy that organizes incoming data by collecting records of the same type (using the values in a subset of fields) and releasing them downstream together.
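Before looking at the library code, the core idea can be sketched in plain Java. The class below is an illustrative, hypothetical implementation (not the DataPipeline API): it buffers values per key and releases a key's batch either when the batch reaches a record limit or when a time window has elapsed since the batch was opened.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of count-or-time buffering, not the DataPipeline API.
public class CountOrTimeBuffer<K, V> {

    private final int maxRecords;
    private final long maxAgeMillis;
    private final Map<K, List<V>> batches = new HashMap<>();
    private final Map<K, Long> openedAt = new HashMap<>();

    public CountOrTimeBuffer(int maxRecords, long maxAgeMillis) {
        this.maxRecords = maxRecords;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Adds a value under the given key; returns the released batch when a
    // threshold (count or age) is hit, otherwise null.
    public List<V> add(K key, V value, long nowMillis) {
        batches.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        openedAt.putIfAbsent(key, nowMillis);

        List<V> batch = batches.get(key);
        boolean full = batch.size() >= maxRecords;
        boolean expired = nowMillis - openedAt.get(key) >= maxAgeMillis;
        if (full || expired) {
            batches.remove(key);
            openedAt.remove(key);
            return batch;
        }
        return null;
    }
}
```

The BufferedReader used in the listing below performs this kind of grouping for you, with configurable strategies for when a group is released.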
Java Code Listing
```java
package com.northconcepts.datapipeline.examples.cookbook;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.buffer.BufferStrategy;
import com.northconcepts.datapipeline.buffer.BufferedReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;

public class BufferRecordsByTimePeriodOrCount {

    private static final int MAX_TRUCKS = 10;
    private static final long MAX_PACKAGES = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        DataReader reader = new FakePackageReader(MAX_TRUCKS, MAX_PACKAGES, RECORD_DELAY_MILLISECONDS);

        // group records by "truck_id" and release all records for each "truck_id" downstream
        // every 10 seconds or when 10 records for that "truck_id" have been collected,
        // and limit the internal buffer size to 100 records
        reader = new BufferedReader(reader, 100, "truck_id")
                .setBufferStrategy(BufferStrategy.limitedTimeFromOpenOrLimitedRecords(TimeUnit.SECONDS.toMillis(10), 10))
//                .setBufferStrategy(BufferStrategy.limitedTimeFromLastUpdateOrLimitedRecords(10000, 10))
//                .setDebug(true)
                ;

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job job = Job.run(reader, writer);

        System.out.println(job.getRecordsTransferred() + " - " + job.getRunningTimeAsString());
    }

    //==================================================

    public static class FakePackageReader extends DataReader {

        private final int maxTrucks;
        private final long maxPackages;
        private long nextPackageId;
        private final long recordDelay;

        public FakePackageReader(int maxTrucks, long maxPackages, long recordDelay) {
            this.maxTrucks = maxTrucks;
            this.maxPackages = maxPackages;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            if (nextPackageId >= maxPackages) {
                return null;
            }
            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }
            Record record = new Record();
            record.setField("package_id", nextPackageId++);
            record.setField("truck_id", "truck" + nextPackageId % maxTrucks);
            record.setField("amount", nextPackageId + 0.01);
            return record;
        }
    }
}
```
Code Walkthrough
- FakePackageReader is a custom DataReader class that generates the input data for this example. It has three final fields (maxTrucks, maxPackages, recordDelay) and one mutable field (nextPackageId).
- The overridden readImpl() method returns null once maxPackages records have been produced, pauses the execution thread for recordDelay milliseconds to simulate a slow source, and then returns a Record instance with three fields: package_id, truck_id, and amount.
- A FakePackageReader instance is created with the constants declared at the class level.
- BufferedReader accepts three arguments: reader, which supplies the input data; 100, the maximum number of records the internal buffer is limited to storing; and truck_id, the field used for grouping.
- setBufferStrategy() defines the strategy to use. The strategy chosen here releases all records for each "truck_id" downstream every 10 seconds or when 10 records for that "truck_id" have been collected.
- StreamWriter.newSystemOutWriter() is used to write records to the standard output stream in a human-readable format.
- The Job.run() method transfers data from the reader to the writer.
- The number of transferred records and the execution time are then printed to the console.
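The listing shows two time-based strategies: the active one, limitedTimeFromOpenOrLimitedRecords, and a commented-out alternative, limitedTimeFromLastUpdateOrLimitedRecords. The difference is the reference point for the time window. The following plain-Java sketch (illustrative only, not the DataPipeline API) contrasts the two flush conditions:

```java
// Illustrative sketch contrasting the two time-based triggers:
//  - "from open": flush when the window has elapsed since the group's FIRST record
//  - "from last update": flush when the window has elapsed since the group's LAST record
public class FlushTriggers {

    // Mirrors limitedTimeFromOpen...: the clock starts when the group is opened.
    public static boolean timeFromOpen(long firstRecordMillis, long nowMillis, long windowMillis) {
        return nowMillis - firstRecordMillis >= windowMillis;
    }

    // Mirrors limitedTimeFromLastUpdate...: the clock resets on every new record.
    public static boolean timeFromLastUpdate(long lastRecordMillis, long nowMillis, long windowMillis) {
        return nowMillis - lastRecordMillis >= windowMillis;
    }
}
```

In other words, the "from open" variant caps how long any record can wait in the buffer, while the "from last update" variant flushes a group only after it has gone quiet for the full window, so a steadily updated group can be held longer.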
Console Output
```
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[package_id]:LONG=[0]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[1.01]:Double
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[package_id]:LONG=[10]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[11.01]:Double
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[package_id]:LONG=[20]:Long
    1:[truck_id]:STRING=[truck1]:String
    2:[amount]:DOUBLE=[21.01]:Double
}
...
-----------------------------------------------
199 - Record (MODIFIED) {
    0:[package_id]:LONG=[199]:Long
    1:[truck_id]:STRING=[truck0]:String
    2:[amount]:DOUBLE=[200.01]:Double
}
-----------------------------------------------
200 records
200 - 51 Seconds, 375 Milliseconds
```