Add Values To An Array Using GroupByReader
Updated: Nov 22, 2025
Data Pipeline's GroupByReader provides aggregation similar to SQL's GROUP BY clause. Beyond standard functions like sum, average, and count, it can also collect values from multiple records into a new array field within a single summary record.
This example demonstrates how to group sales data by category. For each category, it calculates the total sales by summing the prices and also collects all the product names into a new list (array) field named Products.
Java Code
package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.RecordList;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.group.GroupByReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.memory.MemoryReader;

public class AddValuesToAnArrayUsingGroupByReader {

    public static void main(String[] args) {
        RecordList recordList = new RecordList();
        recordList.add(createSale("Electronics", "Laptop", 1200));
        recordList.add(createSale("Books", "Novel", 15));
        recordList.add(createSale("Electronics", "Mouse", 25));
        recordList.add(createSale("Clothing", "Shirt", 30));
        recordList.add(createSale("Books", "Textbook", 90));

        GroupByReader groupByReader = new GroupByReader(new MemoryReader(recordList), "Category");
        groupByReader
            // Sum the "Price" field and store the result in a new "TotalSales" field.
            .sum("Price", "TotalSales")
            // Collect values from the "Product" field into a new array field named "Products".
            .collect("Product", "Products", false, false, true);

        Job.run(groupByReader, new StreamWriter(System.out));
    }

    private static Record createSale(String category, String product, double price) {
        Record record = new Record();
        record.setField("Category", category);
        record.setField("Product", product);
        record.setField("Price", price);
        return record;
    }
}
Code Walkthrough
- An in-memory data source is created using a RecordList and a MemoryReader. It's populated with five sales records, each containing a "Category", "Product", and "Price".
- A GroupByReader is initialized, wrapping the MemoryReader. It is configured to group the incoming records based on the values in the "Category" field.
- Two aggregation operations are chained to the GroupByReader:
  - .sum("Price", "TotalSales") calculates the sum of the "Price" field for all records within each group and places the result in a new field named "TotalSales".
  - .collect("Product", "Products", false, false, true) iterates through all records in a group, takes the value from the "Product" field, and adds it to a new array. This resulting array is then assigned to a new field named "Products" in the output record.
- A StreamWriter is set up to print the output records to the console.
- Job.run() executes the pipeline. The GroupByReader processes all records from the MemoryReader, creates summary records for each unique category, and passes them to the StreamWriter.
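To make the grouping semantics in the walkthrough concrete, the following is a minimal plain-Java sketch of the same idea: one summary per category holding a running total and a list of collected product names. It deliberately does not use the Data Pipeline library. The class GroupAndCollectSketch and its Sale and Summary types are hypothetical names invented for illustration, and the sketch makes no claim about how GroupByReader is implemented internally.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of the Data Pipeline API.
public class GroupAndCollectSketch {

    // Stand-in for one input record with Category, Product, and Price.
    static final class Sale {
        final String category;
        final String product;
        final BigDecimal price;

        Sale(String category, String product, long price) {
            this.category = category;
            this.product = product;
            this.price = BigDecimal.valueOf(price);
        }
    }

    // Stand-in for one summary record: a total plus the collected values.
    static final class Summary {
        BigDecimal totalSales = BigDecimal.ZERO;
        final List<String> products = new ArrayList<>();
    }

    // The same five sales used in the example above.
    static List<Sale> sampleSales() {
        return Arrays.asList(
            new Sale("Electronics", "Laptop", 1200),
            new Sale("Books", "Novel", 15),
            new Sale("Electronics", "Mouse", 25),
            new Sale("Clothing", "Shirt", 30),
            new Sale("Books", "Textbook", 90));
    }

    // Groups by category, summing prices and collecting product names.
    // LinkedHashMap keeps categories in the order they are first seen.
    static Map<String, Summary> summarize(List<Sale> sales) {
        Map<String, Summary> groups = new LinkedHashMap<>();
        for (Sale sale : sales) {
            Summary summary = groups.computeIfAbsent(sale.category, key -> new Summary());
            summary.totalSales = summary.totalSales.add(sale.price);
            summary.products.add(sale.product);
        }
        return groups;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Summary> entry : summarize(sampleSales()).entrySet()) {
            Summary summary = entry.getValue();
            System.out.println(entry.getKey()
                + " -> TotalSales=" + summary.totalSales
                + ", Products=" + summary.products);
        }
    }
}
```

Running the sketch prints one line per category, with the same totals and product lists that appear in the summary records produced by the GroupByReader example.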
Console Output
-----------------------------------------------
0 - Record (MODIFIED) {
0:[Category]:STRING=[Electronics]:String
1:[TotalSales]:BIG_DECIMAL=[1225]:BigDecimal
2:[Products]:ARRAY of STRING=[[Laptop, Mouse]]:ArrayValue
}
-----------------------------------------------
1 - Record (MODIFIED) {
0:[Category]:STRING=[Books]:String
1:[TotalSales]:BIG_DECIMAL=[105]:BigDecimal
2:[Products]:ARRAY of STRING=[[Novel, Textbook]]:ArrayValue
}
-----------------------------------------------
2 - Record (MODIFIED) {
0:[Category]:STRING=[Clothing]:String
1:[TotalSales]:BIG_DECIMAL=[30]:BigDecimal
2:[Products]:ARRAY of STRING=[[Shirt]]:ArrayValue
}
-----------------------------------------------
3 records
