Group Records by Time Period or Count

Updated: May 30, 2022

In this example, we will use Data Pipeline to group records by time period or count.

We are going to explore how Data Pipeline readers let us perform SQL-like "group by" operations on any dataset.

Aggregate operations can be applied to data coming from flat files (CSV, XML, JSON), streaming data, Java Beans, APIs, or any format Data Pipeline supports, including your own custom readers.
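The SQL analogy can be made concrete. The statement SELECT order_id, COUNT(*), SUM(amount) FROM txns GROUP BY order_id corresponds to the plain-Java loop over an in-memory list shown below. This is a minimal sketch for illustration only, not how Data Pipeline implements GroupByReader; the SqlLikeGroupBy, Txn and Totals names are hypothetical. Because it reads the whole list before producing anything, a batch approach like this cannot handle an endless stream. The window strategy in the listing addresses exactly that limitation.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SqlLikeGroupBy {

    // Hypothetical stand-in for a record with order_id and amount fields
    static final class Txn {
        final String orderId;
        final BigDecimal amount;

        Txn(String orderId, BigDecimal amount) {
            this.orderId = orderId;
            this.amount = amount;
        }
    }

    // Running aggregates for one group: COUNT(*) and SUM(amount)
    static final class Totals {
        long count;
        BigDecimal sum = BigDecimal.ZERO;
    }

    // Batch equivalent of: SELECT order_id, COUNT(*), SUM(amount) FROM txns GROUP BY order_id
    static Map<String, Totals> groupBy(List<Txn> txns) {
        Map<String, Totals> groups = new TreeMap<>(); // keys kept sorted by order_id
        for (Txn t : txns) {
            Totals totals = groups.computeIfAbsent(t.orderId, id -> new Totals());
            totals.count++;
            totals.sum = totals.sum.add(t.amount);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Txn> txns = List.of(
                new Txn("order1", new BigDecimal("1.01")),
                new Txn("order2", new BigDecimal("2.01")),
                new Txn("order1", new BigDecimal("11.01")));
        groupBy(txns).forEach((orderId, totals) ->
                System.out.println(orderId + " count=" + totals.count + " sum=" + totals.sum));
    }
}
```

Running main prints "order1 count=2 sum=12.02" followed by "order2 count=1 sum=2.01". BigDecimal is used for the sum so that cents add up exactly, which also matches the BIG_DECIMAL type of the amount field in the output below.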



Java Code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.group.CloseWindowStrategy;
import com.northconcepts.datapipeline.group.GroupByReader;
import com.northconcepts.datapipeline.job.Job;


public class GroupRecordsByTimePeriodOrCount {

    private static final int MAX_ORDERS = 10;
    private static final long MAX_TRANSACTIONS = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        
        DataReader reader = new FakeMessageQueueReader(MAX_ORDERS, MAX_TRANSACTIONS, RECORD_DELAY_MILLISECONDS);
        
        // group records by "order_id" and release them every 10 seconds even if no new records are received
        reader = new GroupByReader(reader, "order_id")
                .count("count_of_transactions")
                .sum("amount")
                .setCloseWindowStrategy(CloseWindowStrategy.limitedTime(TimeUnit.SECONDS.toMillis(10)))
                ;
        
        
        DataWriter writer = StreamWriter.newSystemOutWriter();
        
        Job job = Job.run(reader, writer);
        
        System.out.println(job.getRecordsTransferred() + "  -  " + job.getRunningTimeAsString());
    }

    
    //==================================================
    
    public static class FakeMessageQueueReader extends DataReader {
        
        private final int maxOrders;
        private final long maxTransactions;
        private long nextTransactionId;
        private final long recordDelay;
        
        public FakeMessageQueueReader(int maxOrders, long maxTransactions, long recordDelay) {
            this.maxOrders = maxOrders;
            this.maxTransactions = maxTransactions;
            this.recordDelay = recordDelay;
        }
        
        @Override
        protected Record readImpl() throws Throwable {
            if (nextTransactionId >= maxTransactions) {
                return null;
            }
            
            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }
            
            Record record = new Record();
            record.setField("transaction_id", nextTransactionId++);
            record.setField("order_id", "order" + nextTransactionId % maxOrders);
            record.setField("amount", nextTransactionId + 0.01);
            return record;
        }
        
    }

}

Code walkthrough

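  1. FakeMessageQueueReader simulates a message queue. It emits 200 transaction records (MAX_TRANSACTIONS), one every 250 milliseconds (RECORD_DELAY_MILLISECONDS), spread across 10 orders (MAX_ORDERS). Each record has transaction_id, order_id and amount fields.
  2. GroupByReader wraps that reader and performs aggregate operations on the fly, while the data is flowing, without a database.
  3. StreamWriter.newSystemOutWriter() creates a writer that prints records to the console in a human-readable format.
  4. The Job.run() method transfers data from the reader to the writer. The last line of main() then prints the number of records transferred and the running time.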
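One detail in readImpl() explains why the output starts with order1 rather than order0. The expression nextTransactionId++ hands its old value to transaction_id and only then increments, so order_id and amount are computed from the incremented counter. Transaction 0 therefore belongs to order1 with an amount of 1.01. The first group printed, order1, sums transactions 0, 10, 20 and 30: 1.01 + 11.01 + 21.01 + 31.01 = 64.04. The sketch below isolates just that arithmetic in plain Java. FakeQueuePattern, orderIdFor and amountFor are hypothetical helpers that are not part of the example or of Data Pipeline.

```java
public class FakeQueuePattern {

    static final int MAX_ORDERS = 10; // same value as the listing's MAX_ORDERS

    // transaction_id is stored before nextTransactionId++ takes effect,
    // so order_id and amount are both derived from transactionId + 1
    static String orderIdFor(long transactionId) {
        return "order" + ((transactionId + 1) % MAX_ORDERS);
    }

    // Mirrors record.setField("amount", nextTransactionId + 0.01) after the increment
    static double amountFor(long transactionId) {
        return (transactionId + 1) + 0.01;
    }

    public static void main(String[] args) {
        for (long id = 0; id <= 10; id++) {
            System.out.println("transaction " + id + " -> " + orderIdFor(id) + ", amount " + amountFor(id));
        }
    }
}
```

Transaction 9 is the first to land in order0, and transaction 10 wraps back around to order1.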

GroupByReader

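  1. GroupByReader is a proxy that divides records into groups and applies summary operations to each group, similar to "group by" in SQL but applied to streaming data. Here the records are grouped by "order_id".
  2. .count("count_of_transactions") adds a field named count_of_transactions containing the number of records in each group.
  3. .sum("amount") adds the total of the amount field for each group, in an output field that is also named amount.
  4. .setCloseWindowStrategy(CloseWindowStrategy.limitedTime(TimeUnit.SECONDS.toMillis(10))) splits the stream into consecutive, non-overlapping windows that each stay open for 10 seconds (10,000 milliseconds). When a window closes, the groups collected in it are released, even if no new records have arrived, and a new window begins.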
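The window behaviour described above can be simulated without any real waiting. The sketch below is plain Java, not GroupByReader's implementation, and it rests on a few assumptions. It uses an ideal clock where record k arrives at exactly k × 250 ms. The first window opens when the job starts, and each new window opens the moment the previous one closes. It also adds a hypothetical maxRecords limit, so a window can close on time or on count, whichever comes first. The listing itself only sets a time limit, and this sketch makes no claim about which other CloseWindowStrategy options Data Pipeline provides. The simulate method, GroupRow class and maxRecords parameter are all illustrative names.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TumblingWindowSketch {

    private static final BigDecimal CENTS = new BigDecimal("0.01");

    // One released group, shaped like an output record (order_id, count, summed amount)
    static final class GroupRow {
        final int window;
        final String orderId;
        long count;
        BigDecimal sum = BigDecimal.ZERO;

        GroupRow(int window, String orderId) {
            this.window = window;
            this.orderId = orderId;
        }

        @Override
        public String toString() {
            return "window " + window + ": " + orderId + " count=" + count + " sum=" + sum;
        }
    }

    // Adds without overflowing, so Long.MAX_VALUE can mean "no time limit"
    private static long saturatedAdd(long a, long b) {
        return (b > Long.MAX_VALUE - a) ? Long.MAX_VALUE : a + b;
    }

    // Releases every group in the open window, in the order the keys were first seen
    private static void release(Map<String, GroupRow> open, List<GroupRow> out) {
        out.addAll(open.values());
        open.clear();
    }

    // Record k (k = 1..n) arrives at k * delayMillis with key "order" + (k % maxOrders)
    // and amount k + 0.01, like FakeMessageQueueReader. A window closes when windowMillis
    // have elapsed OR it holds maxRecords records, whichever happens first.
    static List<GroupRow> simulate(int n, int maxOrders, long delayMillis,
                                   long windowMillis, long maxRecords) {
        List<GroupRow> out = new ArrayList<>();
        Map<String, GroupRow> open = new LinkedHashMap<>();
        int window = 1;
        long recordsInWindow = 0;
        long windowEnd = saturatedAdd(0, windowMillis); // first window opens at job start

        for (long k = 1; k <= n; k++) {
            long now = k * delayMillis;

            // Time limit: the window's timer fired before this record arrived
            while (now >= windowEnd) {
                release(open, out);
                window++;
                recordsInWindow = 0;
                windowEnd = saturatedAdd(windowEnd, windowMillis);
            }

            String orderId = "order" + (k % maxOrders);
            GroupRow row = open.get(orderId);
            if (row == null) {
                row = new GroupRow(window, orderId);
                open.put(orderId, row);
            }
            row.count++;
            row.sum = row.sum.add(BigDecimal.valueOf(k).add(CENTS));
            recordsInWindow++;

            // Count limit: close as soon as the window is full; the next one opens now
            if (recordsInWindow >= maxRecords) {
                release(open, out);
                window++;
                recordsInWindow = 0;
                windowEnd = saturatedAdd(now, windowMillis);
            }
        }
        release(open, out); // end of stream releases whatever is still open
        return out;
    }

    public static void main(String[] args) {
        // Time only, like the listing: 10-second windows, no record limit
        simulate(200, 10, 250, 10_000, Long.MAX_VALUE).stream().limit(12).forEach(System.out::println);
    }
}
```

With the listing's numbers, the first two simulated windows match the real output exactly. The first holds 39 transactions (order1 to order9 with 4 each, order0 with 3 totalling 60.03), and the second holds 40. From the third window on, each real window holds one transaction fewer than the simulated one, presumably because records arrive slightly later than the ideal 250 ms cadence. As a result, the real run ends with a four-record window where the simulation has a single record. Calling simulate(200, 10, 250, Long.MAX_VALUE, 20) instead switches off the time limit and closes a window every 20 records, giving 10 windows with 2 transactions per order.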

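Output

Each window releases one record per order seen during that window, so an order_id appears again once a new window starts (for example, order0 in records 9 and 10).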

-----------------------------------------------
0 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[64.04]:BigDecimal
}

-----------------------------------------------
1 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[68.04]:BigDecimal
}

-----------------------------------------------
2 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[72.04]:BigDecimal
}

-----------------------------------------------
3 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[76.04]:BigDecimal
}

-----------------------------------------------
4 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[80.04]:BigDecimal
}

-----------------------------------------------
5 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[84.04]:BigDecimal
}

-----------------------------------------------
6 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[88.04]:BigDecimal
}

-----------------------------------------------
7 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[92.04]:BigDecimal
}

-----------------------------------------------
8 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[96.04]:BigDecimal
}

-----------------------------------------------
9 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[3]:Long
    2:[amount]:BIG_DECIMAL=[60.03]:BigDecimal
}

-----------------------------------------------
10 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[220.04]:BigDecimal
}

-----------------------------------------------
11 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[224.04]:BigDecimal
}

-----------------------------------------------
12 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[228.04]:BigDecimal
}

-----------------------------------------------
13 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[232.04]:BigDecimal
}

-----------------------------------------------
14 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[236.04]:BigDecimal
}

-----------------------------------------------
15 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[240.04]:BigDecimal
}

-----------------------------------------------
16 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[244.04]:BigDecimal
}

-----------------------------------------------
17 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[248.04]:BigDecimal
}

-----------------------------------------------
18 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[252.04]:BigDecimal
}

-----------------------------------------------
19 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[256.04]:BigDecimal
}

-----------------------------------------------
20 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[380.04]:BigDecimal
}

-----------------------------------------------
21 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[384.04]:BigDecimal
}

-----------------------------------------------
22 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[388.04]:BigDecimal
}

-----------------------------------------------
23 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[392.04]:BigDecimal
}

-----------------------------------------------
24 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[396.04]:BigDecimal
}

-----------------------------------------------
25 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[400.04]:BigDecimal
}

-----------------------------------------------
26 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[404.04]:BigDecimal
}

-----------------------------------------------
27 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[408.04]:BigDecimal
}

-----------------------------------------------
28 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[412.04]:BigDecimal
}

-----------------------------------------------
29 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[3]:Long
    2:[amount]:BIG_DECIMAL=[297.03]:BigDecimal
}

-----------------------------------------------
30 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[536.04]:BigDecimal
}

-----------------------------------------------
31 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[540.04]:BigDecimal
}

-----------------------------------------------
32 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[544.04]:BigDecimal
}

-----------------------------------------------
33 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[548.04]:BigDecimal
}

-----------------------------------------------
34 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[552.04]:BigDecimal
}

-----------------------------------------------
35 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[556.04]:BigDecimal
}

-----------------------------------------------
36 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[560.04]:BigDecimal
}

-----------------------------------------------
37 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[564.04]:BigDecimal
}

-----------------------------------------------
38 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[568.04]:BigDecimal
}

-----------------------------------------------
39 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[3]:Long
    2:[amount]:BIG_DECIMAL=[414.03]:BigDecimal
}

-----------------------------------------------
40 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[692.04]:BigDecimal
}

-----------------------------------------------
41 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[696.04]:BigDecimal
}

-----------------------------------------------
42 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[700.04]:BigDecimal
}

-----------------------------------------------
43 - Record (MODIFIED) {
    0:[order_id]:STRING=[order1]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[704.04]:BigDecimal
}

-----------------------------------------------
44 - Record (MODIFIED) {
    0:[order_id]:STRING=[order2]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[708.04]:BigDecimal
}

-----------------------------------------------
45 - Record (MODIFIED) {
    0:[order_id]:STRING=[order3]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[712.04]:BigDecimal
}

-----------------------------------------------
46 - Record (MODIFIED) {
    0:[order_id]:STRING=[order4]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[716.04]:BigDecimal
}

-----------------------------------------------
47 - Record (MODIFIED) {
    0:[order_id]:STRING=[order5]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[720.04]:BigDecimal
}

-----------------------------------------------
48 - Record (MODIFIED) {
    0:[order_id]:STRING=[order6]:String
    1:[count_of_transactions]:LONG=[4]:Long
    2:[amount]:BIG_DECIMAL=[724.04]:BigDecimal
}

-----------------------------------------------
49 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[3]:Long
    2:[amount]:BIG_DECIMAL=[531.03]:BigDecimal
}

-----------------------------------------------
50 - Record (MODIFIED) {
    0:[order_id]:STRING=[order7]:String
    1:[count_of_transactions]:LONG=[1]:Long
    2:[amount]:BIG_DECIMAL=[197.01]:BigDecimal
}

-----------------------------------------------
51 - Record (MODIFIED) {
    0:[order_id]:STRING=[order8]:String
    1:[count_of_transactions]:LONG=[1]:Long
    2:[amount]:BIG_DECIMAL=[198.01]:BigDecimal
}

-----------------------------------------------
52 - Record (MODIFIED) {
    0:[order_id]:STRING=[order9]:String
    1:[count_of_transactions]:LONG=[1]:Long
    2:[amount]:BIG_DECIMAL=[199.01]:BigDecimal
}

-----------------------------------------------
53 - Record (MODIFIED) {
    0:[order_id]:STRING=[order0]:String
    1:[count_of_transactions]:LONG=[1]:Long
    2:[amount]:BIG_DECIMAL=[200.01]:BigDecimal
}

-----------------------------------------------
54 records
14:53:42,230 DEBUG [main] datapipeline:661 - job::Success
54  -  51 Seconds, 784 Milliseconds
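The totals at the bottom of the output can be checked by hand. Every window lists each order at most once, so output records 0 to 9, 10 to 19, 20 to 29, 30 to 39, 40 to 49 and 50 to 53 form six windows. Adding up count_of_transactions per window gives 39, 40, 39, 39, 39 and 4 transactions, which sums back to the 200 generated by FakeMessageQueueReader (MAX_TRANSACTIONS). Likewise, 5 × 10 + 4 groups give the 54 records reported. The 200 sleeps of 250 ms also account for 50 of the 51.8 seconds of running time. The short Java tally below repeats that arithmetic on counts copied from the output; OutputTally is a hypothetical name.

```java
public class OutputTally {

    // count_of_transactions values copied from the output, one row per window
    static final int[][] COUNTS_BY_WINDOW = {
        {4, 4, 4, 4, 4, 4, 4, 4, 4, 3}, // output records 0 to 9
        {4, 4, 4, 4, 4, 4, 4, 4, 4, 4}, // output records 10 to 19
        {4, 4, 4, 4, 4, 4, 4, 4, 4, 3}, // output records 20 to 29
        {4, 4, 4, 4, 4, 4, 4, 4, 4, 3}, // output records 30 to 39
        {4, 4, 4, 4, 4, 4, 4, 4, 4, 3}, // output records 40 to 49
        {1, 1, 1, 1},                   // output records 50 to 53
    };

    // Number of transactions that fell into one window
    static int windowSize(int window) {
        int total = 0;
        for (int count : COUNTS_BY_WINDOW[window]) {
            total += count;
        }
        return total;
    }

    // Should equal MAX_TRANSACTIONS: no record is lost or counted twice across windows
    static int totalTransactions() {
        int total = 0;
        for (int w = 0; w < COUNTS_BY_WINDOW.length; w++) {
            total += windowSize(w);
        }
        return total;
    }

    // One output record per group per window
    static int totalGroups() {
        int total = 0;
        for (int[] window : COUNTS_BY_WINDOW) {
            total += window.length;
        }
        return total;
    }

    public static void main(String[] args) {
        for (int w = 0; w < COUNTS_BY_WINDOW.length; w++) {
            System.out.println("window " + (w + 1) + ": " + windowSize(w) + " transactions");
        }
        System.out.println(totalTransactions() + " transactions in " + totalGroups() + " output records");
    }
}
```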