How to Convert Tabular Data to Trees Using Aggregation

We recently received an email from a Java developer asking how to convert records in a table (like you get in a relational database, CSV, or Excel file) to a composite tree structure.  Normally, we’d point to one of Data Pipeline’s XML or JSON data writers, but for good reasons those options didn’t apply here.  The developer emailing us needed the hierarchical structures in object form for use in his API calls.

Since we didn’t have a general-purpose table-to-tree mapper, we built one.  We looked at several options, but ultimately decided to add a new operator to the GroupByReader.  This not only answered the immediate mapping question, but also lets him use the new operator with sliding window aggregation if the need ever arises.

The rest of this post walks you through the implementation in case you ever need to add your own custom aggregate operator to Data Pipeline.

First let’s look at what this operator does.

Input CSV File

Let’s say you have the following CSV file.  It’s pretty straightforward: it has a transaction date, the account it applies to, the type of transaction, and the amount applied.

Input CSV File

Output Composite Structure

The goal is to take the input file and create a separate record for each account (123, 456, 789, and 147).  Each of the new account records will contain nested records for the different transaction types, dates, and amounts related to that account.  If the first record were converted to JSON, it would look structurally like the following.

Output JSON Tree Structure

Since we’re not creating JSON, the actual Data Pipeline records look very similar to JSON but can contain rich types like BigDecimal and Date that JSON hasn’t traditionally supported.  Here’s the output of Record.toString().

Data Pipeline Output Record Structure

The Data Pipeline Job

Reading the CSV file, then filtering, transforming, and aggregating it, takes seven steps.

1. Read the CSV file.

2. Filter credit and debit transactions.

3. Select and arrange the output fields.

4. Convert the fields from strings to their actual types.

5. Sort the data to ensure child branches are grouped together.

6. Use the new GroupTree aggregate operator (which we’ll implement below) to build the new records, grouped by account, with the account as the root of each branch.

7. Write to the desired target.

This example writes to the console (System.out), but the developer can implement his custom API calls as a new DataWriter subclass.
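To make the seven steps concrete without depending on Data Pipeline itself, here’s a rough plain-Java analogy.  It is only a sketch: the class and method names (SevenStepSketch, Txn, run), the column order, and the sample rows are all made up for illustration, and step 2 is assumed to mean “keep only credit and debit rows”.  The real job uses Data Pipeline’s readers and the GroupTree operator instead.

```java
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy of the seven steps; none of this is Data Pipeline's API.
class SevenStepSketch {

    // Steps 3 and 4: the selected fields, converted from strings to real types.
    static final class Txn {
        final String account;
        final String type;
        final LocalDate date;
        final BigDecimal amount;

        // Assumed column order: date, account, type, amount.
        Txn(String[] cols) {
            this.account = cols[1];
            this.type = cols[2];
            this.date = LocalDate.parse(cols[0]);
            this.amount = new BigDecimal(cols[3]);
        }

        @Override
        public String toString() {
            return date + " " + amount;
        }
    }

    static Map<String, Map<String, List<Txn>>> run(String csv) {
        // 1. Read the CSV text, skipping the header line.
        List<String[]> rows = new ArrayList<>();
        String[] lines = csv.split("\\R");
        for (int i = 1; i < lines.length; i++) {
            rows.add(lines[i].split(","));
        }

        // 2. Filter: keep only credit and debit transactions (assumed meaning).
        rows.removeIf(c -> !(c[2].equals("credit") || c[2].equals("debit")));

        // 3 and 4. Select the fields and convert them to typed values.
        List<Txn> txns = new ArrayList<>();
        for (String[] cols : rows) {
            txns.add(new Txn(cols));
        }

        // 5. Sort so rows for the same account and type sit together.
        txns.sort(Comparator.comparing((Txn t) -> t.account)
                .thenComparing(t -> t.type)
                .thenComparing(t -> t.date));

        // 6. Group into a tree: account -> type -> transactions.
        Map<String, Map<String, List<Txn>>> tree = new LinkedHashMap<>();
        for (Txn t : txns) {
            tree.computeIfAbsent(t.account, k -> new LinkedHashMap<>())
                .computeIfAbsent(t.type, k -> new ArrayList<>())
                .add(t);
        }
        return tree;
    }

    public static void main(String[] args) {
        // Made-up sample data, not the CSV file from this post.
        String csv = String.join("\n",
                "date,account,type,amount",
                "2014-01-07,456,debit,20.00",
                "2014-01-06,123,fee,1.00",
                "2014-01-05,123,credit,100.00");

        // 7. Write to the target (here, the console).
        System.out.println(run(csv));
    }
}
```

Sorting before grouping is not strictly needed when the whole tree is held in a map like this, but it mirrors step 5 and keeps each branch’s children in a predictable order.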

Data Pipeline Aggregation API

Before we get into the GroupTree implementation, you need to understand the classes involved in aggregation and sliding windows.

Data Pipeline Aggregation API


GroupByReader is the controlling class and is what we used in the example above.  GroupByReader holds a list of open windows and closed windows (while they still have data to emit).  It also holds the GroupOperations that will be applied in each window along with the window strategies.

Window Strategies

The CreateWindowStrategy class is responsible for determining when new windows should be opened, while CloseWindowStrategy determines when open windows should be closed.  There are several built-in implementations based on time and data.  You can implement your own strategies or combine strategies together using Boolean logic.
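As a rough illustration of combining strategies with Boolean logic, here’s a small self-contained sketch.  The Strategy interface and the everyNRecords and atOrAfter helpers are hypothetical stand-ins invented for this example; Data Pipeline’s real CreateWindowStrategy and CloseWindowStrategy classes have their own signatures and are evaluated per window.

```java
// Hypothetical stand-ins; Data Pipeline's real strategy classes differ.
class WindowStrategySketch {

    // Answers "should a window open (or close) now?" for one incoming record.
    interface Strategy {
        boolean test(int recordIndex, long timestampMillis);

        // True only when both strategies agree.
        default Strategy and(Strategy other) {
            return (i, t) -> test(i, t) && other.test(i, t);
        }

        // True when either strategy fires.
        default Strategy or(Strategy other) {
            return (i, t) -> test(i, t) || other.test(i, t);
        }
    }

    // Data-based: fires on record index 0, n, 2n, ...
    static Strategy everyNRecords(int n) {
        return (i, t) -> i % n == 0;
    }

    // Time-based: fires once the timestamp reaches the deadline.
    static Strategy atOrAfter(long deadlineMillis) {
        return (i, t) -> t >= deadlineMillis;
    }

    public static void main(String[] args) {
        Strategy open = everyNRecords(3);
        Strategy close = everyNRecords(5).or(atOrAfter(1_000));

        System.out.println(open.test(3, 0));      // true: index 3 is a multiple of 3
        System.out.println(close.test(4, 500));   // false: neither condition holds
        System.out.println(close.test(4, 1_000)); // true: the deadline was reached
    }
}
```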


Windows receive incoming data while they’re open and emit their results after they’re closed.  Open windows take the input records and apply GroupOperations (sum, count, etc.) to them via a GroupField instance.  Once a window is closed, the values in its GroupFields are collected into records and returned by GroupByReader.


You can think of GroupOperation as a factory that creates GroupOperationFields.  Whenever an open window determines that a new aggregate field needs to be created for a record in the output group, it calls GroupOperation.createField().  The window then calls GroupOperationField.apply(Record, Field) for each input record it sees.  Most of the operator’s logic is actually done inside each implementation of the field’s apply() method.
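The factory-and-field relationship can be sketched in a few lines of plain Java.  Everything here is a simplified, hypothetical model (Operation, OperationField, Count, and Window are invented names, and apply() takes a plain string rather than a Record and Field); it only mirrors the roles described above, not Data Pipeline’s actual classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins that mirror the roles described above;
// these are not Data Pipeline's real classes.
class AggregateOperatorSketch {

    // Per-group accumulator: sees each input value, then reports a result.
    interface OperationField {
        void apply(String value);
        Object getValue();
    }

    // Factory: creates a fresh accumulator for each new group.
    interface Operation {
        OperationField createField();
    }

    // The simplest operator: count the records in a group.
    static final class Count implements Operation {
        @Override
        public OperationField createField() {
            return new OperationField() {
                private long count;

                @Override
                public void apply(String value) {
                    count++;
                }

                @Override
                public Object getValue() {
                    return count;
                }
            };
        }
    }

    // A toy window: routes each record to its group's field while open,
    // then collects the field values when closed.
    static final class Window {
        private final Operation operation;
        private final Map<String, OperationField> fields = new LinkedHashMap<>();

        Window(Operation operation) {
            this.operation = operation;
        }

        void add(String groupKey, String value) {
            // createField() runs only the first time a group key is seen.
            fields.computeIfAbsent(groupKey, k -> operation.createField()).apply(value);
        }

        Map<String, Object> close() {
            Map<String, Object> results = new LinkedHashMap<>();
            fields.forEach((key, field) -> results.put(key, field.getValue()));
            return results;
        }
    }

    public static void main(String[] args) {
        Window window = new Window(new Count());
        window.add("123", "credit");
        window.add("456", "debit");
        window.add("123", "debit");
        System.out.println(window.close()); // prints {123=2, 456=1}
    }
}
```

Keeping the state in the field rather than in the operation is what lets one operation serve many groups and many windows at once.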

GroupTree Implementation

First the code.


The new operator starts by extending the GroupOperation class and overriding its createField() factory method.  In our example, createField() will be called once for every new account and the resulting field’s apply() will be called once for every record related to that account.  The apply() method will be responsible for adding each tabular record to the appropriate place in the nested tree record that it will eventually return.  As an example, the simplest apply() method we have is in the GroupCount class and it just increments a counter.

Get Value

The created field’s getValue() method is called by closed windows whenever they are asked by GroupByReader for records.  In this case, it will return the first field in the root record as an array.
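To show how apply() and getValue() might divide the work in a tree-building field, here’s a hedged plain-Java sketch.  It is not the GroupTree code from this post: the class name TreeFieldSketch, the map-based rows, the "transactions" key, and the nesting order (type, then date and amount) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a tree-building aggregate field for one account.
// Not the post's GroupTree code; the nesting order here is assumed.
class TreeFieldSketch {

    // type -> list of {date, amount} leaves, in first-seen order.
    private final Map<String, List<Map<String, String>>> branches = new LinkedHashMap<>();

    // Called once per input row belonging to this account:
    // files the row's date and amount under its transaction type.
    void apply(Map<String, String> row) {
        Map<String, String> leaf = new LinkedHashMap<>();
        leaf.put("date", row.get("date"));
        leaf.put("amount", row.get("amount"));
        branches.computeIfAbsent(row.get("type"), k -> new ArrayList<>()).add(leaf);
    }

    // Called after the window closes: returns one node per transaction type.
    List<Map<String, Object>> getValue() {
        List<Map<String, Object>> result = new ArrayList<>();
        branches.forEach((type, leaves) -> {
            Map<String, Object> node = new LinkedHashMap<>();
            node.put("type", type);
            node.put("transactions", leaves);
            result.add(node);
        });
        return result;
    }

    public static void main(String[] args) {
        // Made-up rows for a single account.
        TreeFieldSketch field = new TreeFieldSketch();
        field.apply(Map.of("date", "2014-01-05", "type", "credit", "amount", "100.00"));
        field.apply(Map.of("date", "2014-01-06", "type", "debit", "amount", "20.00"));
        field.apply(Map.of("date", "2014-01-07", "type", "credit", "amount", "5.50"));
        System.out.println(field.getValue());
    }
}
```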

Next Release

The GroupTree operator will be available in the next release.  If you can’t wait, just use the code from this blog and get started.

Happy Coding!

About Dele Taylor

We make Data Pipeline — a lightweight ETL framework for Java. Use it to filter, transform, and aggregate data on-the-fly in your web, mobile, and desktop apps. Learn more about it at
