Group Records by Time Period or Count
Updated: May 30, 2022
In this example, we will use Data Pipeline to group records by time period or count.
We will explore Data Pipeline readers that perform SQL-like "group by" operations on any dataset.
Aggregate operations can be applied to data coming from flat files (CSV, XML, JSON), streaming data, Java Beans, APIs, or any format Data Pipeline supports, including your own custom readers.
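To make the "group by" idea concrete before the full listing, here is a minimal plain-Java sketch of counting and summing per key. It does not use the Data Pipeline API; the GroupBySketch, Transaction, and Summary names are hypothetical and exist only for illustration.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupBySketch {

    /** Hypothetical stand-in for a record: a grouping key plus an amount. */
    public static final class Transaction {
        final String orderId;
        final BigDecimal amount;

        public Transaction(String orderId, String amount) {
            this.orderId = orderId;
            this.amount = new BigDecimal(amount);
        }
    }

    /** Running summary for one group: record count and sum of amounts. */
    public static final class Summary {
        long count;
        BigDecimal sum = BigDecimal.ZERO;

        public long getCount() { return count; }
        public BigDecimal getSum() { return sum; }
    }

    /** Roughly: SELECT order_id, COUNT(*), SUM(amount) ... GROUP BY order_id. */
    public static Map<String, Summary> groupByOrderId(Iterable<Transaction> input) {
        Map<String, Summary> groups = new LinkedHashMap<>();
        for (Transaction t : input) {
            // Create the group's summary the first time its key is seen.
            Summary s = groups.computeIfAbsent(t.orderId, key -> new Summary());
            s.count++;
            s.sum = s.sum.add(t.amount);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Transaction> input = Arrays.asList(
                new Transaction("order1", "1.01"),
                new Transaction("order2", "2.01"),
                new Transaction("order1", "11.01"));
        groupByOrderId(input).forEach((orderId, s) ->
                System.out.println(orderId + " count=" + s.getCount() + " sum=" + s.getSum()));
    }
}
```

The sketch reads its whole input before returning any result. A streaming reader also has to decide when a group is finished, which is what the close-window strategy in the listing is for.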
Java Code listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.group.CloseWindowStrategy;
import com.northconcepts.datapipeline.group.GroupByReader;
import com.northconcepts.datapipeline.job.Job;

public class GroupRecordsByTimePeriodOrCount {

    private static final int MAX_ORDERS = 10;
    private static final long MAX_TRANSACTIONS = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        DataReader reader = new FakeMessageQueueReader(MAX_ORDERS, MAX_TRANSACTIONS, RECORD_DELAY_MILLISECONDS);

        // group records by "order_id" and release them every 10 seconds even if no new records are received
        reader = new GroupByReader(reader, "order_id")
                .count("count_of_transactions")
                .sum("amount")
                .setCloseWindowStrategy(CloseWindowStrategy.limitedTime(TimeUnit.SECONDS.toMillis(10)))
                ;

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job job = Job.run(reader, writer);

        System.out.println(job.getRecordsTransferred() + " - " + job.getRunningTimeAsString());
    }

    //==================================================

    public static class FakeMessageQueueReader extends DataReader {

        private final int maxOrders;
        private final long maxTransactions;
        private long nextTransactionId;
        private final long recordDelay;

        public FakeMessageQueueReader(int maxOrders, long maxTransactions, long recordDelay) {
            this.maxOrders = maxOrders;
            this.maxTransactions = maxTransactions;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            if (nextTransactionId >= maxTransactions) {
                return null;
            }

            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }

            Record record = new Record();
            record.setField("transaction_id", nextTransactionId++);
            record.setField("order_id", "order" + nextTransactionId % maxOrders);
            record.setField("amount", nextTransactionId + 0.01);
            return record;
        }

    }

}
Code walkthrough
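- The GroupByReader class performs aggregate operations on the fly, without a database, while the data is flowing.
- StreamWriter writes records to a stream in a human-readable format.
- The Job.run() method transfers data from the reader to the writer.
- FakeMessageQueueReader is a custom reader that simulates a message queue. Each call to readImpl() waits recordDelay milliseconds, then returns a new record with transaction_id, order_id, and amount fields. It returns null once maxTransactions records have been produced.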
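The values that FakeMessageQueueReader puts into each record can be traced by hand. The sketch below reproduces only the arithmetic from readImpl() in plain Java, without the Data Pipeline classes. The FakeRecordValues class and its method names are hypothetical, and BigDecimal replaces the double used in the listing so that the sums print exactly.

```java
import java.math.BigDecimal;

public class FakeRecordValues {

    // Mirrors MAX_ORDERS in the listing.
    public static final int MAX_ORDERS = 10;

    /**
     * In the listing, transaction_id is set with a post-increment, so order_id
     * is derived from the counter value after the increment (transactionId + 1).
     */
    public static String orderIdFor(long transactionId) {
        long next = transactionId + 1;
        return "order" + next % MAX_ORDERS;
    }

    /** The amount is likewise based on the incremented counter, plus 0.01. */
    public static BigDecimal amountFor(long transactionId) {
        return BigDecimal.valueOf(transactionId + 1).add(new BigDecimal("0.01"));
    }

    /** Sums the amounts of one order over transaction ids 0 (inclusive) to limit (exclusive). */
    public static BigDecimal sumFor(String orderId, long limit) {
        BigDecimal sum = BigDecimal.ZERO;
        for (long id = 0; id < limit; id++) {
            if (orderIdFor(id).equals(orderId)) {
                sum = sum.add(amountFor(id));
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // The first three simulated records.
        for (long id = 0; id < 3; id++) {
            System.out.println(id + " " + orderIdFor(id) + " " + amountFor(id));
        }
        // order1 receives 1.01, 11.01, 21.01 and 31.01 among the first 40 records.
        System.out.println(sumFor("order1", 40)); // prints 64.04
    }
}
```

The value 64.04 with a count of 4 is what the first group in the output reports for order1.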
GroupByReader
- It is a proxy that divides records into groups and applies summary operations to each group; similar to "group by" in SQL, but applied to streaming data.
- .count("count_of_transactions") returns the number of records in each group.
- .sum("amount") returns the sum of the amount field in each group.
- .setCloseWindowStrategy() is used to create adjacent windows that stay open for 10 seconds, i.e. TimeUnit.SECONDS.toMillis(10) milliseconds.
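The idea of adjacent windows that close after a fixed time, or after a fixed number of records, can be sketched in plain Java as follows. This is an illustration under stated assumptions, not how GroupByReader is implemented. The TumblingWindowSketch class and its methods are hypothetical, timestamps are passed in explicitly instead of read from a clock, and only a per-key count is kept.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TumblingWindowSketch {

    private final long windowMillis;
    private final long maxRecords; // 0 or less means "no count limit"
    private final Map<String, Long> openCounts = new LinkedHashMap<>();
    private final List<Map<String, Long>> closedWindows = new ArrayList<>();
    private long windowStart;
    private long recordsInWindow;

    public TumblingWindowSketch(long windowMillis, long maxRecords, long startMillis) {
        this.windowMillis = windowMillis;
        this.maxRecords = maxRecords;
        this.windowStart = startMillis;
    }

    /** Closes every window that has expired by nowMillis; a timer could call this when no records arrive. */
    public void advanceTo(long nowMillis) {
        while (nowMillis - windowStart >= windowMillis) {
            closeWindow(windowStart + windowMillis);
        }
    }

    /** Adds one record for the given key, observed at nowMillis. */
    public void add(String key, long nowMillis) {
        advanceTo(nowMillis);
        openCounts.merge(key, 1L, Long::sum);
        recordsInWindow++;
        // Count-based closing: release the window once it holds maxRecords records.
        if (maxRecords > 0 && recordsInWindow >= maxRecords) {
            closeWindow(nowMillis);
        }
    }

    /** Releases whatever is left in the open window, for example at end of input. */
    public void flush() {
        closeWindow(windowStart);
    }

    private void closeWindow(long nextStart) {
        if (!openCounts.isEmpty()) {
            closedWindows.add(new LinkedHashMap<>(openCounts));
            openCounts.clear();
        }
        recordsInWindow = 0;
        windowStart = nextStart;
    }

    public List<Map<String, Long>> getClosedWindows() {
        return closedWindows;
    }

    public static void main(String[] args) {
        // Idealized replay of the example: 200 records, one every 250 ms, 10-second windows.
        TumblingWindowSketch windows = new TumblingWindowSketch(10_000, 0, 0);
        for (int i = 0; i < 200; i++) {
            long arrival = (i + 1) * 250L;
            windows.add("order" + (i + 1) % 10, arrival);
        }
        windows.flush();
        for (Map<String, Long> window : windows.getClosedWindows()) {
            long total = window.values().stream().mapToLong(Long::longValue).sum();
            System.out.println(total + " records in " + window.size() + " groups");
        }
    }
}
```

With these idealized timings the first window holds 39 records, because the 40th record arrives exactly when the window expires. That is consistent with order0 showing a count of 3 in the first batch of the output below. In a real run the timings drift slightly, so later window boundaries need not match this replay.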
Output
-----------------------------------------------
0 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[64.04]:BigDecimal
}
-----------------------------------------------
1 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[68.04]:BigDecimal
}
-----------------------------------------------
2 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[72.04]:BigDecimal
}
-----------------------------------------------
3 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[76.04]:BigDecimal
}
-----------------------------------------------
4 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[80.04]:BigDecimal
}
-----------------------------------------------
5 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[84.04]:BigDecimal
}
-----------------------------------------------
6 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[88.04]:BigDecimal
}
-----------------------------------------------
7 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[92.04]:BigDecimal
}
-----------------------------------------------
8 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[96.04]:BigDecimal
}
-----------------------------------------------
9 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[3]:Long
    2:[amount]:BIG_DECIMAL=[60.03]:BigDecimal
}
-----------------------------------------------
10 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[220.04]:BigDecimal
}
-----------------------------------------------
11 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[224.04]:BigDecimal
}
-----------------------------------------------
12 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[228.04]:BigDecimal
}
-----------------------------------------------
13 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[232.04]:BigDecimal
}
-----------------------------------------------
14 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[236.04]:BigDecimal
}
-----------------------------------------------
15 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[240.04]:BigDecimal
}
-----------------------------------------------
16 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[244.04]:BigDecimal
}
-----------------------------------------------
17 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[248.04]:BigDecimal
}
-----------------------------------------------
18 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[252.04]:BigDecimal
}
-----------------------------------------------
19 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[256.04]:BigDecimal
}
-----------------------------------------------
20 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[380.04]:BigDecimal
}
-----------------------------------------------
21 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[384.04]:BigDecimal
}
-----------------------------------------------
22 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[388.04]:BigDecimal
}
-----------------------------------------------
23 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[392.04]:BigDecimal
}
-----------------------------------------------
24 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[396.04]:BigDecimal
}
-----------------------------------------------
25 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[400.04]:BigDecimal
}
-----------------------------------------------
26 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[404.04]:BigDecimal
}
-----------------------------------------------
27 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[408.04]:BigDecimal
}
-----------------------------------------------
28 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[412.04]:BigDecimal
}
-----------------------------------------------
29 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[3]:Long
    2:[amount]:BIG_DECIMAL=[297.03]:BigDecimal
}
-----------------------------------------------
30 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[536.04]:BigDecimal
}
-----------------------------------------------
31 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[540.04]:BigDecimal
}
-----------------------------------------------
32 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[544.04]:BigDecimal
}
-----------------------------------------------
33 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[548.04]:BigDecimal
}
-----------------------------------------------
34 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[552.04]:BigDecimal
}
-----------------------------------------------
35 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[556.04]:BigDecimal
}
-----------------------------------------------
36 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[560.04]:BigDecimal
}
-----------------------------------------------
37 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[564.04]:BigDecimal
}
-----------------------------------------------
38 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[568.04]:BigDecimal
}
-----------------------------------------------
39 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[3]:Long
    2:[amount]:BIG_DECIMAL=[414.03]:BigDecimal
}
-----------------------------------------------
40 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[692.04]:BigDecimal
}
-----------------------------------------------
41 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[696.04]:BigDecimal
}
-----------------------------------------------
42 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[700.04]:BigDecimal
}
-----------------------------------------------
43 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[704.04]:BigDecimal
}
-----------------------------------------------
44 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[708.04]:BigDecimal
}
-----------------------------------------------
45 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[712.04]:BigDecimal
}
-----------------------------------------------
46 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[716.04]:BigDecimal
}
-----------------------------------------------
47 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[720.04]:BigDecimal
}
-----------------------------------------------
48 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[724.04]:BigDecimal
}
-----------------------------------------------
49 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[3]:Long
    2:[amount]:BIG_DECIMAL=[531.03]:BigDecimal
}
-----------------------------------------------
50 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[1]:Long
    2:[amount]:BIG_DECIMAL=[197.01]:BigDecimal
}
-----------------------------------------------
51 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[1]:Long
    2:[amount]:BIG_DECIMAL=[198.01]:BigDecimal
}
-----------------------------------------------
52 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[1]:Long
    2:[amount]:BIG_DECIMAL=[199.01]:BigDecimal
}
-----------------------------------------------
53 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[1]:Long
    2:[amount]:BIG_DECIMAL=[200.01]:BigDecimal
}
-----------------------------------------------
54 records
14:53:42,230 DEBUG [main] datapipeline:661 - job::Success
54 - 51 Seconds, 784 Milliseconds