Data Pipeline User Guide
Introduction
Getting Started
This section helps you get your DataPipeline application up and running quickly. You'll need to complete the following steps:
- Add your Maven or Gradle dependencies.
- Download a license (it only takes a few seconds).
- Add the license file to your app.
- Start coding, using the examples at https://github.com/NorthConcepts/DataPipeline-Examples.
1. Add Maven or Gradle Dependencies
DataPipeline comes in several editions; choose the one that matches your license and add it to your build. The dependencies below use the Express edition's artifact ID (northconcepts-datapipeline-express). For the other editions, replace it with northconcepts-datapipeline-team, northconcepts-datapipeline-small-business, or northconcepts-datapipeline-enterprise.
Maven
Add the code below to your pom.xml file.

```xml
<repositories>
  <repository>
    <id>datapipeline</id>
    <url>https://maven.northconcepts.com/public/repositories/datapipeline</url>
    <releases/>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>com.northconcepts</groupId>
    <artifactId>northconcepts-datapipeline-express</artifactId>
    <version>6.0.0</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
```

Gradle
Add the code below to your build.gradle file.

```groovy
repositories {
    jcenter()
    maven {
        url = 'https://maven.northconcepts.com/public/repositories/datapipeline'
    }
}

dependencies {
    compile 'com.northconcepts:northconcepts-datapipeline-express:6.0.0'
}
```
2. Download a license
Download your DataPipeline license; it only takes a few seconds.

Edition | Link |
---|---|
Express License | Download License |
Team License | Download License |
Small Business License | Download License |
Enterprise License | Get an Enterprise Quote |
3. Install License
Your license email will contain an attached NorthConcepts-DataPipeline.license file.
Option 1 - Add License to Classpath
The easiest way to install your license is to add it to your classpath. Place it in your app's src/main/resources folder to be picked up automatically.
Option 2 - Load License from a File Path
In addition to classpath-based license loading, you can also point to a file outside of your application using a JVM parameter.
-Ddatapipeline.license.file=/license-folder/NorthConcepts-DataPipeline.license
Option 3 - Load License from an InputStream
You can also load your license from an InputStream in case it's stored in a database, remote server, or anywhere else that can produce a stream. Create a class that implements InputStreamFactory and point to it via the datapipeline.license.factory JVM parameter.
-Ddatapipeline.license.factory=com.example.LicenseLoader
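As a rough illustration, a license-loading factory might look like the sketch below. The InputStreamFactory package and method name shown here are assumptions — check the Javadocs for the exact interface signature.

```java
package com.example;

import java.io.InputStream;

import com.northconcepts.datapipeline.core.InputStreamFactory;  // assumed package; verify in the Javadocs

public class LicenseLoader implements InputStreamFactory {

    // Assumed method name; the real interface may differ -- see the InputStreamFactory Javadocs.
    public InputStream createInputStream() {
        // Load the license from wherever it lives -- a database, a remote service, or
        // (as here) a resource bundled elsewhere on the classpath.
        return LicenseLoader.class.getResourceAsStream("/licenses/NorthConcepts-DataPipeline.license");
    }
}
```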
4. Start Coding
Start running the examples or demos on GitHub: https://github.com/NorthConcepts/DataPipeline-Examples and https://github.com/NorthConcepts/DataPipeline-Demos.
Add-ons
DataPipeline comes with several add-ons for you to integrate with a variety of systems.
All add-ons share the groupId com.northconcepts and are published at the same version as the core library (6.0.0 in this guide).

Name | Description | Artifact ID |
---|---|---|
Amazon S3 | Amazon S3 file system. | northconcepts-datapipeline-filesystems-amazons3 |
Avro | Apache Avro files. | northconcepts-datapipeline-integrations-avro |
Email | Email reader. | northconcepts-datapipeline-integrations-email |
Google | Google Analytics, Calendar, Contacts, and Gmail. | northconcepts-datapipeline-integrations-google |
Instagram | Instagram readers. | northconcepts-datapipeline-integrations-instagram |
JMS | Java Message Service reader/writer. | northconcepts-datapipeline-integrations-jms |
Kafka | Kafka. | northconcepts-datapipeline-integrations-kafka |
Mailchimp | Mailchimp. | northconcepts-datapipeline-integrations-mailchimp |
MongoDB | MongoDB. | northconcepts-datapipeline-integrations-mongodb |
PDF | PDF. | northconcepts-datapipeline-integrations-pdf |
RTF | MS Word (RTF). | northconcepts-datapipeline-integrations-rtf |
Template | Write text for websites, email, and other text formats using FreeMarker templates. | northconcepts-datapipeline-integrations-template |
Trello | Trello. | northconcepts-datapipeline-integrations-trello |
Twitter | Twitter. | northconcepts-datapipeline-integrations-twitter |

To use an add-on, declare its artifact as a dependency alongside the core library. For example, the Amazon S3 add-on:
Maven

```xml
<dependencies>
  <dependency>
    <groupId>com.northconcepts</groupId>
    <artifactId>northconcepts-datapipeline-filesystems-amazons3</artifactId>
    <version>6.0.0</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
```

Gradle

```groovy
compile 'com.northconcepts:northconcepts-datapipeline-filesystems-amazons3:6.0.0'
```
Additional Resources
In addition to this user guide, there are several places you can look for more information.
- The examples section contains code you can copy and paste into your projects.
- The Javadocs contain a detailed listing of the API.
- You can contact us with any questions you may have.
Running Diagnostics
DataPipeline includes a manual Diagnostic tool that helps troubleshoot setup and environment issues. It gathers relevant info from your system and prints it to the console. You can run the tool and inspect the output, or email us the output along with a description of your issue.
How to run the Diagnostic tool:
- On the command line:
java -jar NorthConcepts-DataPipeline-5.1.0.jar com.northconcepts.datapipeline.diagnostic.Diagnostic
- From your app:
new com.northconcepts.datapipeline.diagnostic.Diagnostic().log();
- From your app to collect the output:
String output = new com.northconcepts.datapipeline.diagnostic.Diagnostic().toString();
The following information is collected by the tool:
- Local Date & Time with TimeZone
- Location of license file (if present) and its contents
- Location of DataReader class file
- Number of records transferred in a single job (out of 100 million records)
- All JVM properties
We do not collect any personal information, IP addresses, or geographic data.
How Data Pipeline Works
Data Pipeline is easy to understand and use if you already have experience with the java.io classes.
While Java's I/O classes work with bytes and characters, Data Pipeline works with records.
Records
Each record is a mutable data structure containing zero or more fields. Each field has a name, type, and value.
Field Names
Fields have default names that are assigned automatically when not explicitly set. The first field in a record is named A, the second is B, the 26th is Z, the 27th is AA, then AB, AC, and so on. If a field with the default name already exists, then a number is added to the name (starting with 2) until a unique name is found.
Field Values
Field values can contain:
- Single values (like a string, integer, boolean, or date)
- Byte arrays
- Any Java object
- Other records
- Arrays containing any combination of the above (including other arrays)
Records, fields, and values all extend a base Node class allowing you to work with tabular data (Excel, CSV, JDBC) and hierarchical data (JSON, XML) using the same API.
Since each record contains its own set of fields, it's possible for each record in a stream to contain a completely (or somewhat) different set of fields.
Field Types
Field types can be any one of the values defined in the FieldType enum.
If Data Pipeline cannot match a field's value to one of the enum values — for example your Customer class — it will set the type as FieldType.UNDEFINED.
Data Readers and Writers
Data Pipeline is built on two very simple concepts.
- DataReaders — to stream records into the pipeline from your source formats and locations.
- DataWriters — to stream records out of the pipeline to your target formats and locations.
Readers and writers are available for many formats which you can read about in detail (see Data Pipeline formats).
Stream Operators
Once data is flowing into the pipeline, Stream Operators provide the ability to filter, enrich, transform, and summarize data in-flight.
Stream Operators are not a true third concept. Instead, they're a special kind of reader or writer that operates on other readers and writers. They use the decorator pattern to wrap other readers and writers to intercept (and possibly alter) records going into them or coming out of them.
This allows stream operators to transform and process data in-flight, in a common way, regardless of the data's original source or ultimate target. This also allows simple operators to be chained together to form complex processing sequences.
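For instance, a filter operator is applied by wrapping an existing reader. The sketch below follows the patterns in the public examples; the file name and filter expression are illustrative, and the class/package names should be confirmed against the Javadocs.

```java
import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.filter.FilterExpression;
import com.northconcepts.datapipeline.filter.FilteringReader;
import com.northconcepts.datapipeline.job.Job;

public class FilterOperatorExample {
    public static void main(String[] args) {
        // read records from a CSV file (file name is illustrative)
        DataReader reader = new CSVReader(new File("credit-balance.csv"))
                .setFieldNamesInFirstRow(true);

        // decorate the reader with a filtering operator
        FilteringReader filteringReader = new FilteringReader(reader);
        filteringReader.add(new FilterExpression("Balance > 0"));

        // send the surviving records to the console
        DataWriter writer = new StreamWriter(System.out);
        new Job(filteringReader, writer).run();
    }
}
```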
Running Jobs
Data Pipeline transfers records from readers to writers using jobs. The chain of readers and writers in a job is called a pipeline. You might find the terms job and pipeline used interchangeably.

As with the Java I/O classes, you can program the transfer yourself (open streams, read-write records in a loop, close streams); however, the Job class offers many benefits:
- It can be monitored and managed using JMX (Java Management Extensions)
- It can be run synchronously or asynchronously
- It can be cancelled, paused, and resumed
- It's thread-safe
- Its state can safely be logged at any time or on completion
- It publishes lifecycle events to the system event bus
- It has built-in error handling
- It supports callbacks
- It tracks stats about itself
Synchronous Jobs
The easiest way to run a job is synchronously. You instantiate a Job instance, passing in the DataReader and DataWriter, and call run().
The run method will execute in the current thread, causing it to block until the job is complete.
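As a minimal sketch (file names are illustrative; class and package names follow the public examples — check the Javadocs if anything differs):

```java
import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;

public class RunJobSynchronously {
    public static void main(String[] args) {
        // read records from a CSV file (file name is illustrative)
        DataReader reader = new CSVReader(new File("input.csv"))
                .setFieldNamesInFirstRow(true);

        // write records to the console
        DataWriter writer = new StreamWriter(System.out);

        // run the transfer in the current thread; run() blocks until the job finishes
        Job job = new Job(reader, writer);
        job.run();
    }
}
```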
The old way of running jobs with the JobTemplate — JobTemplate.DEFAULT.transfer(reader, writer); — still works. Behind the scenes, JobTemplate now uses the new Job class, so it receives all the benefits mentioned above.
Asynchronous Jobs
If you don't want the current thread to block or you need to run several jobs in parallel, Data Pipeline gives you several options.
You can always create a new thread and call job.run() in the thread's run method, but here are a few nicer ways.
RunAsync
The easiest option is to call runAsync() instead of run().
Thread
The second option is to use your own thread. Since Job implements Runnable, it can be passed to a Thread's constructor to be executed concurrently.
ExecutorService
The third option is to use an ExecutorService. This approach gives you the most control over how (and how many) jobs are executed in parallel.
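A rough sketch of the three options (each jobN below is a Job built as in the synchronous example; pool size is illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.northconcepts.datapipeline.job.Job;

public class RunJobsAsynchronously {
    // Each option below starts a different job without blocking the calling thread.
    public static void run(Job job1, Job job2, Job job3) {
        // Option 1: let the job manage its own background thread
        job1.runAsync();

        // Option 2: Job implements Runnable, so it can be passed to your own Thread
        new Thread(job2).start();

        // Option 3: submit to an ExecutorService for pooled, parallel execution
        ExecutorService executor = Executors.newFixedThreadPool(4);
        executor.submit(job3);
        executor.shutdown();  // stop accepting new work; the submitted job keeps running
    }
}
```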
Waiting for Jobs to Start and Finish
Regardless of how you choose to execute your job, you can always block until the job has started or finished. This can come in handy when coordinating several jobs.
Both the start and finish wait methods allow you to specify a maximum time to wait.
Managing Running Jobs
Once a job has started, Data Pipeline gives you the ability to either cancel it outright or pause it temporarily and resume it later.
Cancel
Running jobs can be cancelled by calling the cancel() method. It's important to remember that cancelling a job does not guarantee that it will stop right away. Depending on what the job is doing, it may take a noticeable time before the job stops.
Pause & Resume
Running jobs can be temporarily paused by calling the pause() method and resumed later. As with cancelling, pausing a job does not guarantee that it will pause right away; depending on what the job is doing, it may take a noticeable time before the job pauses.
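A sketch of the management calls, building on a job started with runAsync(); pause() and cancel() come from the text above, while resume() is assumed to be the counterpart to pause() — check the Javadocs for the exact name.

```java
Job job = new Job(reader, writer);
job.runAsync();

job.pause();    // request a temporary pause; may take a moment to take effect
// ... inspect state, wait, coordinate with other jobs ...
job.resume();   // assumed counterpart to pause(); verify in the Javadocs

job.cancel();   // request cancellation; the job may not stop immediately
```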
Monitoring Jobs
Data Pipeline provides you with several ways to monitor your jobs. These range from simply calling methods on the Job class all the way up to sophisticated, enterprise monitoring tools.
Directly Access Job Properties
As you've already seen, jobs allow you to retrieve information on their state at any time — even when they're not running.
Here's the list of data each job directly exposes via its getter methods.
Method | Description |
---|---|
getId() | The unique (sequential) ID of this job (within this JVM/classloader). |
getUuid() | The universally unique identifier (UUID) for this job. |
getName() | The name assigned to this job or its default name if one was not assigned. |
getCreatedOn() | The time this job instance was created. |
getStartedOn() | The time this job instance started running |
getCancelledOn() | The time this job instance was cancelled. |
isCancelled() | Indicates if this job was cancelled |
getPausedOn() | The time this job was paused or null if this job is not currently paused |
isPaused() | Indicates if this job is currently paused |
getFinishedOn() | The time this job finished running |
isFinished() | Indicates if this job has finished running |
getException() | The last exception thrown while running |
isFailed() | Indicates if this job has completed unsuccessfully. |
getRunThread() | The thread currently executing this job or null if the job is not running. |
isRunning() | Indicates if this job is currently running |
getRunningTime() | Time in milliseconds this job was (or has been) running for or zero (0) if it hasn't yet started running. |
getRunningTimeAsString() | The time this job was (or has been) running for as a human readable string or null if it hasn't yet started running. |
getRecordsTransferred() | The number of records sent from the supplied reader to the supplied writer. |
getLastRecord() | The most recent record seen by this job while it is running or null . |
getReader() | The reader supplied to this job. |
getWriter() | The writer supplied to this job. |
getReaders() | The read pipeline — the chain of readers records pass through, starting with the root instance and ending with the reader supplied to this job. |
getWriters() | The write pipeline — the chain of writers records pass through, starting with the writer supplied to this job and ending with the root instance. |
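For example, you might poll a running job from another thread; the getter names below are taken from the table above.

```java
// building on a job started with runAsync()
System.out.println("running: " + job.isRunning());
System.out.println("records transferred: " + job.getRecordsTransferred());
System.out.println("running time: " + job.getRunningTimeAsString());
if (job.isFailed()) {
    // the last exception thrown while running
    job.getException().printStackTrace();
}
```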
Callback Hooks
Each job allows you to specify one object to receive notifications of progress and completion. Since the methods of the callback object are used directly on the execution path, the more you do in them the slower your job will run.
Job Lifecycle Listener
In addition to callback hooks, Data Pipeline publishes job lifecycle events on the system event bus. These events let you know when jobs start running, finish running, are paused, or are resumed.
The Job class has addJobLifecycleListener and removeJobLifecycleListener convenience methods for you to register JobLifecycleListener instances.
Logging
A fourth way of monitoring your jobs is to provide a DataWriter to the job so it can record its stats once the job finishes. You can also request that the job log its current stats at any time by calling the log() method. Calling log() too often will cause your job to run slowly, since the job momentarily stops running to take a snapshot of its entire state.
Here's an example of the data sent to the log writer once the job finishes.
Fields | Description |
---|---|
0 | The timestamp for this log entry. |
1-15 | Field names beginning with "job_" are job-level stats and will be the same for all records logged at the same time. |
16-24 | Field names beginning with "endpoint_" are reader/writer-level stats. One record will be emitted for each reader/writer in this job's pipeline. The first record corresponds to the first reader in the pipeline chain and the last record corresponds to the last writer in the chain. Field 16 (endpoint_index) identifies the reader or writer's position in the pipeline. Fields 17 and 18 (endpoint_simple_type and endpoint_qualified_type) indicate the reader/writer class for that step of the pipeline. |
JMX Monitoring
The final way to monitor your jobs is using JMX (Java Management Extensions).
Data Pipeline's JMX integration is disabled by default, but you can enable it by adding a call to DataEndpoint.enableJmx(); somewhere early in your code (like in a static initializer).
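For example (DataEndpoint.enableJmx() as mentioned above; the import path is an assumption, and a static initializer is just one convenient spot):

```java
import com.northconcepts.datapipeline.core.DataEndpoint;  // assumed package; verify in the Javadocs

public class Application {
    static {
        // register Data Pipeline's MBeans before any jobs are created
        DataEndpoint.enableJmx();
    }

    public static void main(String[] args) {
        // ... build and run jobs as usual ...
    }
}
```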
Once JMX has been enabled, Data Pipeline will automatically register your running jobs with the platform MBean server.
Using a JMX client like VisualVM, you'll be able to see and manage your running jobs, as well as each reader and writer that makes up their pipelines.
Data Formats
Read about data formats built into Data Pipeline here: Data Formats.
Filtering Data
See filter examples here: Filter Examples.
Data Validation
See data validation examples here: Data Validation Examples.
Transforming Data
See data transformation examples here: Data Transformation Examples.
Conditional Transformations
See a conditional transformation example here: Conditional Transformation Example.
Lookup & Joins
See lookup examples here: Lookup Examples.
Data Conversions
See data conversions examples here: Data Conversions.
Data Aggregation
Data Pipeline provides a reader that lets you perform SQL-like "group by" operations on any dataset.
The GroupByReader class performs these aggregate operations on-the-fly — without a database — while the data is flowing.
Aggregate operations can be applied to data coming from flat files (CSV, XML, JSON), streaming data, Java Beans, APIs, or any format Data Pipeline supports, including your own custom readers.
Simple Aggregation
The simplest example of aggregation is to summarize an entire stream. This summary acts like a SQL group-by.
The following example accepts the input records from a CSV file and outputs one record for each unique combination of year and month, having calculated the count, sum, avg, min, and max for each combination.
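A sketch of that pipeline, assuming the CSV file has year, month, and price columns (file and field names are illustrative; the aggregate methods are listed in the operator table below):

```java
import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.group.GroupByReader;
import com.northconcepts.datapipeline.job.Job;

public class SummarizeByYearAndMonth {
    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("sales.csv"))
                .setFieldNamesInFirstRow(true);

        // group on year + month and summarize the price field in each group
        GroupByReader groupBy = new GroupByReader(reader, "year", "month")
                .count("count")
                .sum("sum_price", "price")
                .avg("avg_price", "price")
                .min("min_price", "price")
                .max("max_price", "price");

        // one summary record is emitted per year-month group
        new Job(groupBy, new StreamWriter(System.out)).run();
    }
}
```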
Aggregate Operators
Operator | Class | Convenience Method | Description |
---|---|---|---|
Count | GroupCount | GroupByReader.count(String) | The number of records in each group. |
Sum | GroupSum | GroupByReader.sum(String, String) | The sum of the argument in each group. |
Avg | GroupAverage | GroupByReader.avg(String, String) | The average of the argument in each group. |
Max | GroupMaximum | GroupByReader.max(String, String) | The maximum value in each group. |
Min | GroupMinimum | GroupByReader.min(String, String) | The minimum value in each group. |
First | GroupFirst | GroupByReader.first(String, String) | The first value seen in each group. |
Last | GroupLast | GroupByReader.last(String, String) | The last value seen in each group. |
* all operators are in the com.northconcepts.datapipeline.group.operation package.
Sliding Window Aggregation
Data Pipeline uses a concept called sliding window aggregation to summarize streaming data inside GroupByReader. Sliding windows collect input data while open and emit summary data once closed.
Windows are opened (created) and closed based on configurable strategies that rely on record count, time, record content, or a combination of strategies.
You've already seen the default strategy that uses a single window for the entire dataset. However, you'll have to change strategies if you're dealing with continuous, streaming data or just want to summarize data in batches.
Batch/Adjacent Windows
The first configuration described here allows you to obtain regular summaries from fixed numbers of records. This effectively simulates the output you receive in batch-only environments.

When GroupByReader starts, it performs the following actions:
- Opens a single window until it has accepted a set number of records
- Closes the window and emits the summary data to the next step in the pipeline
- Opens a new window and starts collecting again
The following example uses a create strategy that opens a new window only if one isn't already open. The close strategy caps each window at 50 records. The example also turns on the debug flag to log when windows are opened and closed.
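A sketch of that configuration, building on the groupBy reader from the simple-aggregation example above. The strategy factory methods come from the built-in strategy tables below, but the setter names (including the debug flag) are assumptions — check the GroupByReader Javadocs.

```java
// open a window only when none is currently open, and close it at 50 records
groupBy.setCreateWindowStrategy(CreateWindowStrategy.limitOpened(1));
groupBy.setCloseWindowStrategy(CloseWindowStrategy.limitedRecords(50));
groupBy.setDebug(true);  // log when windows are opened and closed
```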
While it's possible for a group operator (like sum or max) to hold onto the actual records while its window is open, none of the built-in operators work that way. This allows windows to be fairly memory-cheap, which is important as you'll see in the next section.
Overlapping Windows
Another way to configure your windows is to have them overlap each other. You may want overlapping windows in order to track things like moving averages or to help recognize related events that can span adjacent windows.
Depending on your configuration, the number of overlapping windows can grow fairly large, which is why it's important that group operators attempt to minimize the amount of memory they consume.

The following example uses a create strategy that opens a new window every 10 records and a close strategy that caps them at 40 records.
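Expressed with the same assumed setters as above:

```java
// open a new window every 10 records; each window closes after 40 records,
// so up to four windows overlap at any time
groupBy.setCreateWindowStrategy(CreateWindowStrategy.recordPeriod(10));
groupBy.setCloseWindowStrategy(CloseWindowStrategy.limitedRecords(40));
```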
Sparse/Sample Windows
Sliding windows don't have to overlap or even be adjacent to one another. If you only need a sample of data at set periods, you can create a configuration where the close strategy is shorter than the open strategy.

The following example uses a create strategy that opens a new window every 40 records but closes it after just 10 records are collected.
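Using the same assumed setters:

```java
// open a new window every 40 records but close it after only 10,
// sampling a quarter of the stream
groupBy.setCreateWindowStrategy(CreateWindowStrategy.recordPeriod(40));
groupBy.setCloseWindowStrategy(CloseWindowStrategy.limitedRecords(10));
```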
Time-Based Windows
The previous examples all used record count as the basis for their window strategies. However, Data Pipeline also provides several built-in strategies for working with time-based windows.
You can create adjacent windows that stay open for 10 minutes.
You can also mix strategies and create windows that open every hour until they've collected 100,000 records. Depending on how long it takes to collect that many records, you may end up with some overlapping windows and some sparse windows.
If you want to limit the hourly windows to 100,000 records or 15 minutes — whichever comes first — you can combine a record strategy with a time strategy using the or() operator function.
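Sketches of those three configurations, again using the strategy factory methods from the tables below and the assumed setters from the earlier examples:

```java
// adjacent windows that stay open for 10 minutes
groupBy.setCloseWindowStrategy(CloseWindowStrategy.limitedTime(10 * 60 * 1000));

// windows that open every hour and close once they've collected 100,000 records
groupBy.setCreateWindowStrategy(CreateWindowStrategy.startInterval(60 * 60 * 1000));
groupBy.setCloseWindowStrategy(CloseWindowStrategy.limitedRecords(100_000));

// close each hourly window at 100,000 records or 15 minutes, whichever comes first
groupBy.setCloseWindowStrategy(CloseWindowStrategy.or(
        CloseWindowStrategy.limitedRecords(100_000),
        CloseWindowStrategy.limitedTime(15 * 60 * 1000)));
```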
Built-in Window Strategies
The following built-in strategies are used by GroupByReader to determine when it should open a new window.
Create Window Strategy | Description |
---|---|
CreateWindowStrategy.limitOpened(int) | Opens a new window if there are fewer windows currently open than the specified number. |
CreateWindowStrategy.recordPeriod(int) | Opens a new window at set record count intervals or on the first record. |
CreateWindowStrategy.startInterval(long) | Opens a new window at set time intervals. |
CreateWindowStrategy.or(CreateWindowStrategy...) | Opens a new window if any of the supplied strategies request it. |
CreateWindowStrategy.and(CreateWindowStrategy...) | Opens a new window if all of the supplied strategies request it. |
The following built-in strategies are used by GroupByReader to determine when it should close open windows.
Close Window Strategy | Description |
---|---|
CloseWindowStrategy.never() | Never closes open windows. |
CloseWindowStrategy.limitedTime(long) | Closes windows after they've been open for the specified time. |
CloseWindowStrategy.limitedTime(String, long) | Closes windows after the specified datetime field has increased by a set amount of time since it was first seen. This is useful for aggregating hourly logs by their timestamp field. |
CloseWindowStrategy.limitedRecords(long) | Closes the window after the specified number of records has been collected. |
CloseWindowStrategy.scheduled(Scheduler) | Closes windows on a set clock schedule. For example, every 30 minutes on the hour and half hour. |
CloseWindowStrategy.scheduled(String, Scheduler) | Closes windows on a set clock schedule for the specified datetime field. For example, every 30 minutes at 15 and 45 past the hour. This is useful for aggregating hourly logs at set times by their timestamp field. |
CloseWindowStrategy.hourlyScheduleInMinutes(int...) | Closes windows at fixed minutes past each hour (for example: 0, 15, 30, and 45 minutes past each hour). |
CloseWindowStrategy.hourlyScheduleInMinutes(String, int...) | Closes windows at fixed minutes past each hour for the specified datetime field (for example: 0, 15, 30, and 45 minutes past each hour). |
CloseWindowStrategy.or(CloseWindowStrategy...) | Closes windows if any of the supplied strategies request it. |
CloseWindowStrategy.and(CloseWindowStrategy...) | Closes windows if all of the supplied strategies request it. |
Custom Window Strategies
In addition to the built-in strategies, you can create your own by implementing com.northconcepts.datapipeline.group.CreateWindowStrategy and com.northconcepts.datapipeline.group.CloseWindowStrategy.
Below you can see the implementation for the limitOpened() method used before. It simply compares the number of opened windows to the specified value.
The never() method has a similarly simple implementation — it just always returns false.
Custom Aggregate Operators
Creating custom aggregate operators is a bit more complicated than custom window strategies, but not by much.
Group operators are actually two classes working together: com.northconcepts.datapipeline.group.GroupOperation and com.northconcepts.datapipeline.group.GroupOperationField. Think of GroupOperation (sum, min, max, avg) as the factory for creating instances of GroupOperationField for each field in each window.
For example, when you code groupByReader.sum("price").sum("qty");, each open window uses the sum GroupOperation (called GroupSum) to create two instances of GroupOperationField — one for each field the window needs to sum.
From there on, each GroupOperationField receives every record passing through its window while the window is open (GroupOperationField.apply(Record, Field)).
When the window closes, each GroupOperationField emits its summary data when GroupOperationField.getValue() is called.
With that explanation, here's how the sum operator is implemented.
Detail and Summarize in a Single Pipeline
This section describes a scenario where you have a single input file (or stream) and wish to perform multiple streams of work with it in parallel.
The following example takes an input CSV stream and converts the detail records to XML in one job, while using the GroupByReader to summarize it in another job.
Both jobs run asynchronously, but the splitting happens in the main thread. If you need the splitting to run in a separate thread, either call its runAsync() method or pass it as a Runnable to a new Thread or ExecutorService.
You can read more about the DeMux splitter class in the multi-threading section Reading One-to-Many Asynchronously.
Expression Language
Read about the dynamic expression language built into Data Pipeline here: Expression Language.
Multi-threaded Processing
Data Pipeline runs each job in a single thread by default. The Job class uses its thread to pull data from readers and push it to writers one record at a time.
Each record flowing through the pipeline goes from step to step until it's either written out or discarded. Only after a record finishes traveling through the pipeline does the next record get read in to start the journey again. This idea, called one-piece flow (or single-piece flow) processing, is a principle borrowed from lean manufacturing.
You've already seen how individual jobs can be made to run synchronously (blocking the main thread) or asynchronously (in their own thread). This section will show you how to increase your record flow by running parts of a job in parallel, in their own threads — regardless of how the job itself is executed.
Asynchronous Reading
One approach to increase record throughput is to decouple the reading of data from the rest of the pipeline.
If you're working with a particularly slow data source, it might not make sense to stop reading from it while the rest of the pipeline executes.
For this reason, Data Pipeline provides an AsyncReader that eagerly reads data from other readers into a buffer using a separate thread. Downstream requests for data can be satisfied directly from the buffer without blocking (waiting). Blocking only occurs if the buffer is empty when a downstream request for data is made.
This example eagerly buffers up to 1 MB of data from a CSV file in parallel to the pipeline.
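A minimal sketch of wrapping a reader; the buffer-size configuration mentioned above is omitted here because the exact setter isn't shown in this guide — see the AsyncReader Javadocs. Package and file names are assumptions.

```java
import java.io.File;

import com.northconcepts.datapipeline.core.AsyncReader;   // assumed package; verify in the Javadocs
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;

public class AsyncReadExample {
    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("big-input.csv"))
                .setFieldNamesInFirstRow(true);

        // AsyncReader reads ahead into its buffer on a separate thread
        reader = new AsyncReader(reader);

        new Job(reader, new StreamWriter(System.out)).run();
    }
}
```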
Since AsyncReader can read from any other DataReader, you can place it anywhere in a pipeline you feel will benefit from parallel processing.
This example adds a second thread to ensure the filter step happens in parallel with the CSV reading and the rest of the pipeline.
Asynchronous Writing
If instead of a slow source you're working with a slow target, Data Pipeline provides an AsyncWriter that uses a separate thread to continuously write data from its buffer to any downstream DataWriter.
Data written to the AsyncWriter is placed in the buffer, allowing the pipeline to continue with the next record without waiting for the previous record to be written. The only time the pipeline must wait is when the buffer is full.
The following example uses an AsyncWriter with a 1000-record buffer to write to a StreamWriter in a separate thread. The StreamWriter here can be replaced with a JdbcWriter or any other writer.
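A sketch of that pipeline; whether the buffer size is passed to the AsyncWriter constructor as shown is an assumption — check the AsyncWriter Javadocs. Package and file names are also assumptions.

```java
import java.io.File;

import com.northconcepts.datapipeline.core.AsyncWriter;   // assumed package; verify in the Javadocs
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;

public class AsyncWriteExample {
    public static void main(String[] args) {
        DataReader reader = new CSVReader(new File("input.csv"))
                .setFieldNamesInFirstRow(true);

        // buffer up to 1000 records and write them to the target on a separate thread
        DataWriter writer = new AsyncWriter(new StreamWriter(System.out), 1000);

        new Job(reader, writer).run();
    }
}
```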
Reading Many-to-One Asynchronously
Perhaps instead of a single slow datasource, you have several slow datasources that produce complementary data.
In this scenario, you can use the AsyncMultiReader class to read from all sources concurrently. This reader uses a separate thread to load each upstream source into its buffer. Like the AsyncReader, downstream requests won't have to wait when pulling from the buffer unless it's empty.
This example reads from two CSV files in parallel and merges their data into a union to be written out downstream. Each CSVReader can be replaced with other readers, including filters, transformers, and aggregators.
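A sketch, assuming AsyncMultiReader accepts its source readers in the constructor (package and file names are illustrative):

```java
import java.io.File;

import com.northconcepts.datapipeline.core.AsyncMultiReader;  // assumed package; verify in the Javadocs
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;

public class ManyToOneExample {
    public static void main(String[] args) {
        DataReader source1 = new CSVReader(new File("file1.csv")).setFieldNamesInFirstRow(true);
        DataReader source2 = new CSVReader(new File("file2.csv")).setFieldNamesInFirstRow(true);

        // each source is read on its own thread and merged into one stream
        DataReader union = new AsyncMultiReader(source1, source2);

        new Job(union, new StreamWriter(System.out)).run();
    }
}
```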
Reading One-to-Many Asynchronously
Turning an input stream coming from a single DataReader into several is accomplished using the DeMux class. DeMux is a special case because it's neither a reader nor a writer; it acts almost like a job, taking data from a reader and sending it to the buffers of downstream readers.
The constructor for DeMux accepts a strategy that's responsible for what the downstream readers receive. The built-in implementations are:

Strategy | Description |
---|---|
DeMux.Strategy.BROADCAST | Sends a copy of each record to every downstream reader. |
DeMux.Strategy.ROUND_ROBIN | Sends each record to a single downstream reader by cycling through the downstream readers one at a time. |
You can see a code example of DeMux in action in the data aggregation section on how to detail and summarize in a single pipeline.
Writing One-to-Many Asynchronously
Data Pipeline has a few ways to do one-to-many writer pipelining.
The first approach is not asynchronous at all. It uses MultiWriter to write to two or more DataWriters sequentially, in the same thread.
You can make this pipeline asynchronous by wrapping the MultiWriter in an AsyncWriter.
Another way to make this pipeline asynchronous is to wrap the writers passed to MultiWriter with AsyncWriter.
Event Bus
Data Pipeline's EventBus class is a delivery service for applications wishing to publish and subscribe to events. This service handles event registration, queuing, and delivery, and also takes care of failures, monitoring, and graceful shutdown.
The following sections first describe the event bus as a general application service and then show how to use it as part of your pipelines.
How the Event Bus Works
The event bus is general enough to be used in any application, not just those employing Data Pipeline's readers and writers. For example, you can use the event bus in your Swing, SWT, or Android apps. You can also use it in your web apps to send real-time updates to web users.
EventListener Interface
One big difference between this event bus and other implementations of the observer pattern is that it uses the same interface for both publishing and subscribing to events.
If an event publisher calls userAdded(User), that same method will be called sometime later (using a different thread) on all the event subscribers. There's no need to create a separate UserEvent class or call a notifyUserAdded(UserEvent) method. Just call a method on your interface and have it propagate automatically.
The only requirement is that your listener must be a Java interface for the event bus to use it.
Starting and Stopping a Bus
Starting and stopping an event bus is simple. Just create one and it's started. To stop a bus, call its shutdown() method and it terminates gracefully, allowing queued events up to 30 seconds to be delivered.
You can change the shutdown timeout by calling EventBus.setShutdownTimeout(long). Once the timeout expires, the bus is forcibly terminated and no more events will be delivered.
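A sketch of the lifecycle calls described above (the package name is an assumption, and setShutdownTimeout is assumed to be an instance method):

```java
import com.northconcepts.datapipeline.eventbus.EventBus;  // assumed package; verify in the Javadocs

public class EventBusLifecycle {
    public static void main(String[] args) {
        EventBus bus = new EventBus();          // creating a bus also starts it

        bus.setShutdownTimeout(60 * 1000);      // allow up to 60 seconds for queued events on shutdown

        // ... publish and subscribe to events ...

        bus.shutdown();                         // graceful shutdown; queued events are delivered until the timeout
    }
}
```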
Using The System Event Bus
Data Pipeline has a system event bus it uses to deliver lifecycle events for jobs and event buses. You can use the system event bus in your apps instead of creating new ones. Just remember not to shut it down or lifecycle events will stop being delivered and JMX monitoring will stop working.
Publish and Subscribe to Events
The first step in publishing events is to create your own listener interface.
The next step is to add subscribers to the bus using your new listener interface. Finally, you create a publisher you can call when you need to send a notification.
Accessing the Current Event
Behind the scenes, the event bus wraps up your method call in an event object. This object is what gets queued and delivered to be executed against your subscribers. You can retrieve this object at any time during event delivery using its thread-local accessor EventBus.getCurrentEvent().
Connecting Pipelines to an Event Bus
While the event bus is general enough to be used in any application, it comes with built-in connectors for reading and writing records to and from a bus.
EventBusWriter
The EventBusWriter allows you to send records from any pipeline to an event bus. When constructing an EventBusWriter, you need to supply it with the event bus you are targeting, plus a topic. The topic can be a string, enum, or other object and is used by the bus to distinguish one stream of records from another.
EventBusReader
The EventBusReader pulls records from an event bus into your pipeline. Its constructor takes the source event bus, plus zero or more topics to read from. If the topic is null or missing, the reader will read all records regardless of their topic.
One advantage of using an event bus is that you can connect any number of readers to a single topic. You can also connect any number of writers to the same or different topics. In many cases, an event bus can be a more flexible option than DeMux-based multi-threading.
Event Bus Pipelines
The event bus treats data as a flowing stream. Unlike its bigger, Enterprise Service Bus (ESB) cousin, the event bus does not persist records. If your reader isn't connected to the bus when the event is published, it will not be able to see it later. That's why it's important to connect your readers to the bus prior to sending records from a writer.
When records are published to the bus, the writer sends each one immediately to the bus, which queues it in its buffer or blocks the writer if the buffer is full. On the delivery side, the bus puts records into each reader's queue, blocking if the reader's queue is full.
This example uses a single writer to send records to the bus using the purchases topic. Two separate readers then read from the purchases topic and write to their individual targets.
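A sketch of that arrangement; the constructor shapes follow the descriptions above, while the package names, file name, and topic are assumptions.

```java
import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.eventbus.EventBus;         // assumed package; verify in the Javadocs
import com.northconcepts.datapipeline.eventbus.EventBusReader;   // assumed package; verify in the Javadocs
import com.northconcepts.datapipeline.eventbus.EventBusWriter;   // assumed package; verify in the Javadocs
import com.northconcepts.datapipeline.job.Job;

public class EventBusPipeline {
    public static void main(String[] args) {
        EventBus bus = new EventBus();

        // connect the readers to the bus before any records are published;
        // these jobs keep running until the bus shuts down
        DataReader fromBus1 = new EventBusReader(bus, "purchases");
        DataReader fromBus2 = new EventBusReader(bus, "purchases");
        new Job(fromBus1, new StreamWriter(System.out)).runAsync();
        new Job(fromBus2, new StreamWriter(System.err)).runAsync();

        // a single writer publishes records to the bus on the purchases topic
        DataReader source = new CSVReader(new File("purchases.csv")).setFieldNamesInFirstRow(true);
        new Job(source, new EventBusWriter(bus, "purchases")).run();
    }
}
```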
Monitoring Event Buses
Like job monitoring, event buses can also be monitored in several ways.
Directly Access Event Bus Properties
Event buses can be monitored by directly accessing information on their state at any time.
Here's the list of data each event bus exposes via its getter methods.
Method | Description |
---|---|
getId() | The unique (sequential) ID of this bus (within this JVM/classloader). |
getUuid() | The universally unique identifier (UUID) for this bus. |
getName() | The name assigned to this bus or its default name if one was not assigned. |
getState() | The current state of this bus (ALIVE, SHUTTING_DOWN, TERMINATED). |
isAlive() | Indicates if this bus has not been shutdown. |
getCreatedOn() | The time this bus instance was created. |
getStartedOn() | The time this bus instance started running. |
isDebug() | Indicates if the bus should emit debug log messages when events are published or delivered and during the shutdown process. |
getShutdownTimeout() | The time in milliseconds this bus will wait for events to complete before forcibly shutting down (defaults to 30 seconds). |
getShutdownTimeoutAsString() | The time as human readable string this bus will wait for events to complete before forcibly shutting down (defaults to 30 seconds). |
getPublishersCreated() | The number of event publishers created by this bus. |
getEventsPublished() | The number of events sent to this event bus for delivery to listeners. |
getEventsDelivered() | The combined number of events each listener on this event bus has received. |
getEventsActive() | The approximate number of threads that are actively delivering events using this bus' executorService or -1 if unknown. |
getEventsQueued() | The number of events waiting to be executed by this bus' executorService or -1 if unknown. |
getTaskCount() | The approximate total number of tasks that have ever been scheduled for execution by this bus' executorService or -1 if unknown. |
getCompletedTaskCount() | The approximate total number of tasks that have completed execution by this bus' executorService or -1 if unknown. |
getCorePoolSize() | The core number of threads in this bus' executorService or -1 if unknown. |
getLargestPoolSize() | The largest number of threads that have ever simultaneously been in the pool in this bus' executorService or -1 if unknown. |
getMaximumPoolSize() | The maximum allowed number of threads in this bus' executorService or -1 if unknown. |
getPoolSize() | The current number of threads in this bus' executorService pool or -1 if unknown. |
getErrorCount() | The number of exceptions this bus has experienced. |
getTypedListenerCount() | The number of type-specific listeners subscribed to this bus. |
getUntypedListenerCount() | The number of non-type-specific listeners subscribed to this bus. |
getListenerCount() | The total number of listeners subscribed to this bus. |
Event Bus Lifecycle Listener
Similar to the lifecycle events provided by each job, event buses also publish their lifecycle events on the system bus. These events let you know when new event buses are created and when existing buses start shutting down. No event gets sent when a bus finishes shutting down; your code will have to treat the shutting-down event as a successful termination.
The EventBus class has addEventBusLifecycleListener and removeEventBusLifecycleListener convenience methods for you to register EventBusLifecycleListener instances.
Watching All Records on a Bus
A third way to monitor the bus is to monitor the records passing through the bus.
There are a couple of ways to do this. The first is to create a reader with no topic.
The second way is to register a listener on the bus. This option is arguably better since it bypasses the EventBusReader's buffer.
Here is the complete example.
Watching All Events on a Bus
A fourth way of monitoring a bus is to watch all events using the UntypedEventListener. This option captures all events, including non-record events and lifecycle events for jobs and event buses (when monitoring the system bus).
Here's the complete example using this approach.
Here's some of the output you'd see running this example. You can see it captures both the lifecycle events and the different record events for all topics.
Monitoring Event Buses with JMX
The final approach to monitoring is to use JMX (Java Management Extensions). With this option, you can plug in your enterprise monitoring tools (or a JMX client like VisualVM) to track event buses and jobs alike.
To enable monitoring, just add a call to DataEndpoint.enableJmx(); somewhere early in your code (like in a static initializer).
Handling Exceptions
Exceptions occurring on the event bus are treated like any other event.
You simply register an instance of ExceptionListener using EventBus.addExceptionListener(EventFilter, ExceptionListener) and wait for exceptions to occur.
Here's what the output from above would look like.
Meter and Throttle
See metering and throttling examples here: Meter Examples and Throttle Examples.
Debugging
See debugging examples here: Debugging Examples.
Error Handling
See error handling examples here: Error Handling Examples.
Creating Custom Components
See custom components examples here: Custom Components Examples.