Data Pipeline User Guide

Introduction

Getting Started

This section helps you get your Data Pipeline application up and running quickly. You should already have a working knowledge of your development and execution environment, since this guide won't cover how to configure your build system or IDE.

Download Data Pipeline

Data Pipeline comes in several editions, ranging from the free Express edition to several premium editions.

  • Data Pipeline: Download Data Pipeline Express or sign up for a free trial of the other editions.
  • Java SDK: Download the Java Standard Edition 6 (or higher) SDK if you don't already have one.
  • JDBC Drivers: Find a suitable JDBC driver if you plan on using a database.

Unpackage the Distribution

Extract NorthConcepts-DataPipeline-XXX.zip to a new folder. The jar tool works fine for this:

jar xf NorthConcepts-DataPipeline-4.4.0.zip

Eclipse Users

The zip contains an Eclipse project (.project and .classpath files) that you can import to get started right away.

Non-Eclipse Users

Add NorthConcepts-DataPipeline-XXX.jar along with the other jars under the distribution's lib folder to your classpath.

Run Examples

The examples on this site can be found in the examples/src folder in Eclipse or in the distribution.

Additional Resources

In addition to this user guide, there are several places you can look for more information.

  • The examples section contains code you can copy and paste into your projects.
  • The Javadocs contain a detailed listing of the API.
  • You can contact us with any questions you may have.

How Data Pipeline Works

Data Pipeline is easy to understand and use if you already have experience with the java.io classes.

While Java's I/O classes work with bytes and characters, Data Pipeline works with records.

Records

Each record is a mutable data structure containing zero or more fields. Each field has a name, type, and value.

Field Names

Fields are assigned a default name automatically when one is not explicitly set. The first field in a record is named A, the second is B, the 26th is Z, the 27th is AA, then AB, AC, and so on. If a field with the default name already exists, then a number is added to the name (starting with 2) until a unique name is found.

Field Values

Field values can contain:

  • Single values (like a string, integer, boolean, or date)
  • Byte arrays
  • Any Java object
  • Other records
  • Arrays containing any combination of the above (including other arrays)

Records, fields, and values all extend a base Node class allowing you to work with tabular data (Excel, CSV, JDBC) and hierarchical data (JSON, XML) using the same API.

Since each record contains its own set of fields, it's possible for each record in a stream to contain a completely (or somewhat) different set of fields.

Field Types

Field types can be any one of the values defined in the FieldType enum. If Data Pipeline cannot match a field's value to one of the enum values — for example your Customer class — it will set the type as FieldType.UNDEFINED.

Data Readers and Writers

Data Pipeline is built on two very simple concepts.

  • DataReaders — to stream records into the pipeline from your source formats and locations.
  • DataWriters — to stream records out of the pipeline to your target formats and locations.

Readers and writers are available for many formats which you can read about in detail (see Data Pipeline formats).

Stream Operators

Once data is flowing into the pipeline, Stream Operators provide the ability to filter, enrich, transform, and summarize data in-flight.

Stream Operators are not a true third concept. Instead, they're a special kind of reader or writer that operates on other readers and writers. They use the decorator pattern to wrap other readers and writers to intercept (and possibly alter) records going into them or coming out from them.

This allows stream operators to transform and process data in-flight, in a common way, regardless of the data's original source or ultimate target. This also allows simple operators to be chained together to form complex processing sequences.

Running Jobs

Data Pipeline transfers records from readers to writers using jobs. The chain of readers and writers in a job is called a pipeline. You might find the terms job and pipeline used interchangeably.

As with the Java I/O classes, you can program the transfer yourself (open streams, read-write records in a loop, close streams), however the Job class offers many benefits:

  1. It can be monitored and managed using JMX (Java Management Extensions)
  2. It can be run synchronously or asynchronously
  3. It can be cancelled, paused, and resumed
  4. It's thread-safe
  5. Its state can safely be logged at any time or on completion
  6. It publishes lifecycle events to the system event bus
  7. It has built-in error handling
  8. It supports callbacks
  9. It tracks stats about itself

Synchronous Jobs

The easiest way to run a job is synchronously. You instantiate a Job instance, passing in the DataReader and DataWriter, and call run().

The run method will execute in the current thread, causing it to block until the job is complete.
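Here's a minimal sketch of a synchronous run, assuming a CSV source and console target (the file path and field names are illustrative):

    import java.io.File;

    import com.northconcepts.datapipeline.core.DataReader;
    import com.northconcepts.datapipeline.core.DataWriter;
    import com.northconcepts.datapipeline.core.StreamWriter;
    import com.northconcepts.datapipeline.csv.CSVReader;
    import com.northconcepts.datapipeline.job.Job;

    public class RunJobSynchronously {
        public static void main(String[] args) {
            // read records from a CSV file, using the first row as field names
            DataReader reader = new CSVReader(new File("data/input.csv"))
                    .setFieldNamesInFirstRow(true);

            // write records to the console
            DataWriter writer = new StreamWriter(System.out);

            // runs in the current thread and blocks until the transfer is complete
            Job job = new Job(reader, writer);
            job.run();
        }
    }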

The old way of running jobs with the JobTemplate — JobTemplate.DEFAULT.transfer(reader, writer); — still works. Behind the scenes, JobTemplate now uses the new Job class to receive all the benefits mentioned above.

Asynchronous Jobs

If you don't want the current thread to block or you need to run several jobs in parallel, Data Pipeline gives you several options. You can always create a new thread and call job.run() in the thread's run method, but here are a few nicer ways.

RunAsync

The easiest option is just to call runAsync() instead of run().
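For example, reusing the reader and writer from the synchronous sketch above:

    Job job = new Job(reader, writer);
    job.runAsync();   // returns immediately; the transfer continues in a separate thread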

Thread

The second option is to use your own thread. Since Job implements Runnable, it can be passed to a Thread's constructor to be executed concurrently.
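Since Job implements Runnable, a sketch might look like this:

    Job job = new Job(reader, writer);
    Thread thread = new Thread(job);
    thread.start();   // the job runs in this new thread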

ExecutorService

The third option is to use an ExecutorService. This approach gives you the most control over how (and how many) jobs are executed in parallel.
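A sketch using a fixed-size thread pool from java.util.concurrent; the readers and writers here are placeholders for whatever sources and targets you're using:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    ExecutorService executor = Executors.newFixedThreadPool(4);  // at most 4 jobs run at once

    executor.submit(new Job(reader1, writer1));   // Job implements Runnable
    executor.submit(new Job(reader2, writer2));

    executor.shutdown();   // stop accepting new jobs; submitted jobs keep running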

Waiting for Jobs to Start and Finish

Regardless of how you choose to execute your job, you can always choose to block until the job has started or finished. This can come in handy when coordinating several jobs.

Both the start and finish wait methods allow you to specify a maximum time to wait.
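A sketch of coordinating on a running job; the wait method names below are assumptions, so check the Job Javadocs for the exact signatures:

    Job job = new Job(reader, writer);
    job.runAsync();

    // hypothetical method names -- verify against the Job class
    job.waitUntilRunning(5 * 1000);     // block up to 5 seconds for the job to start
    job.waitUntilFinished(60 * 1000);   // block up to 60 seconds for the job to finish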

Managing Running Jobs

Once a job has started, Data Pipeline gives you the ability to either cancel it outright or pause it temporarily and resume it later.

Cancel

Running jobs can be cancelled by calling the cancel() method. It's important to remember that cancelling a job does not guarantee that it will stop right away. Depending on what the job is doing, it may take a noticeable time before the job stops.

Pause & Resume

Running jobs can be temporarily paused by calling the pause() method and resumed later. As with cancelling, pausing does not guarantee that the job will pause right away. Depending on what the job is doing, it may take a noticeable time before it does.
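A sketch showing both operations on an asynchronously running job, assuming the resume call is named resume():

    Job job = new Job(reader, writer);
    job.runAsync();

    job.pause();    // temporarily stop transferring records
    // ...
    job.resume();   // continue from where the job left off (assumed method name)

    job.cancel();   // stop the job for good; may not take effect immediately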

Monitoring Jobs

Data Pipeline provides you with several ways to monitor your jobs. These range from simply calling methods on the Job class all the way up to sophisticated, enterprise monitoring tools.

Directly Access Job Properties

As you've already seen, jobs allow you to retrieve information on their state at any time — even when they're not running.

Here's the list of data each job directly exposes via its getter methods.

Method Description
getId() The unique (sequential) ID of this job (within this JVM/classloader).
getUuid() The universally unique identifier (UUID) for this job.
getName() The name assigned to this job or its default name if one was not assigned.
getCreatedOn() The time this job instance was created.
getStartedOn() The time this job instance started running
getCancelledOn() The time this job instance was cancelled.
isCancelled() Indicates if this job was cancelled
getPausedOn() The time this job was paused or null if this job is not currently paused
isPaused() Indicates if this job is currently paused
getFinishedOn() The time this job finished running
isFinished() Indicates if this job has finished running
getException() The last exception thrown while running
isFailed() Indicates if this job has completed unsuccessfully.
getRunThread() The thread currently executing this job or null if the job is not running.
isRunning() Indicates if this job is currently running
getRunningTime() Time in milliseconds this job was (or has been) running for or zero (0) if it hasn't yet started running.
getRunningTimeAsString() The time this job was (or has been) running for as a human readable string or null if it hasn't yet started running.
getRecordsTransferred() The number of records sent from the supplied reader to the supplied writer.
getLastRecord() The most recent record seen by this job while it is running or null.
getReader() The reader supplied to this job.
getWriter() The writer supplied to this job.
getReaders() The read pipeline — the chain of readers records pass through, starting with the root instance and ending with the reader supplied to this job.
getWriters() The write pipeline — the chain of writers records pass through, starting with the writer supplied to this job and ending with the root instance.

Callback Hooks

Each job allows you to specify one object to receive notifications of progress and completion. Since the methods of the callback object are invoked directly on the execution path, the more you do in them, the slower your job will run.

Job Lifecycle Listener

In addition to callback hooks, Data Pipeline publishes job lifecycle events on the system event bus. These events let you know when jobs start running, finish running, are paused, or are resumed.

The Job class has addJobLifecycleListener and removeJobLifecycleListener convenience methods for you to register JobLifecycleListener instances.

Logging

A fourth way of monitoring your jobs is to provide a DataWriter to the job where it can record its stats once the job finishes. However, you can request that the job log its current stats at any time by calling the log() method. Calling log too often will cause your job to run slowly, since the job momentarily stops running to take a snapshot of its entire state.

Here's an example of the data sent to the log writer once the job finishes.

Fields Description
0 The timestamp for this log entry.
1-15 Field names beginning with "job_" are job-level stats and will be the same for all records logged at the same time.
16-24 Field names beginning with "endpoint_" are reader/writer-level stats. One record will be emitted for each reader/writer in this job's pipeline.
The first record corresponds to the first reader in the pipeline chain and the last record corresponds to the last writer in the chain.
Field 16 (endpoint_index) identifies the reader or writer's position in the pipeline.
Fields 17 and 18 (endpoint_simple_type and endpoint_qualified_type) indicate the reader/writer class for that step of the pipeline.

JMX Monitoring

The final way to monitor your jobs is using JMX (Java Management Extensions).

Data Pipeline's JMX integration is disabled by default, but you can enable it by adding a call to DataEndpoint.enableJmx(); somewhere early in your code (like in a static initializer).
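For example, in a static initializer (the import is shown assuming DataEndpoint lives in the distribution's core package):

    import com.northconcepts.datapipeline.core.DataEndpoint;

    public class MyApp {

        static {
            DataEndpoint.enableJmx();   // register running jobs with the platform MBean server
        }

        // ...
    }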

Once JMX has been enabled, Data Pipeline will automatically register your running jobs with the platform MBean server.

Using a JMX client like VisualVM, you'll be able to see and manage your running jobs, as well as each reader and writer that makes up their pipelines.

Data Formats

Read about data formats built into Data Pipeline here: Data Formats.

Filtering Data

See filter examples here: Filter Examples.

Data Validation

See data validation examples here: Data Validation Examples.

Transforming Data

See data transformation examples here: Data Transformation Examples.

Conditional Transformations

See conditional transformation example here: Conditional Transformation Example.

Lookup & Joins

See lookup examples here: Lookup Examples.

Data Conversions

See data conversions examples here: Data Conversions.

Data Aggregation

Data Pipeline provides a reader that lets you perform SQL-like "group by" operations on any dataset.

The GroupByReader class performs these aggregate operations on-the-fly — without a database — while the data is flowing.

Aggregate operations can be applied to data coming from flat files (CSV, XML, JSON), streaming data, Java Beans, APIs, or any format Data Pipeline supports, including your own custom readers.

Simple Aggregation

The simplest example of aggregation is to summarize an entire stream. This summary acts like a SQL group-by.

The following example accepts the input records from the CSV file and outputs one record for each unique combination of year and month, having calculated the count, sum, avg, min, and max for each combination.
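A sketch of what that might look like; the order of the target and source field-name arguments is an assumption, so confirm it against the GroupByReader Javadocs:

    import java.io.File;

    import com.northconcepts.datapipeline.core.DataReader;
    import com.northconcepts.datapipeline.core.StreamWriter;
    import com.northconcepts.datapipeline.csv.CSVReader;
    import com.northconcepts.datapipeline.group.GroupByReader;
    import com.northconcepts.datapipeline.job.Job;

    public class SummarizeByYearAndMonth {
        public static void main(String[] args) {
            DataReader reader = new CSVReader(new File("data/orders.csv"))   // file and field names are illustrative
                    .setFieldNamesInFirstRow(true);

            // emit one summary record per unique year-month combination
            reader = new GroupByReader(reader, "year", "month")
                    .count("count")
                    .sum("sum_of_amount", "amount")
                    .avg("avg_amount", "amount")
                    .min("min_amount", "amount")
                    .max("max_amount", "amount");

            new Job(reader, new StreamWriter(System.out)).run();
        }
    }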

The output looks something like this.

Aggregate Operators

Operator Class Convenience Method Description
Count GroupCount GroupByReader.count(String) The number of records in each group.
Sum GroupSum GroupByReader.sum(String, String) The sum of the argument in each group.
Avg GroupAverage GroupByReader.avg(String, String) The average of the argument in each group.
Max GroupMaximum GroupByReader.max(String, String) The maximum value in each group.
Min GroupMinimum GroupByReader.min(String, String) The minimum value in each group.
First GroupFirst GroupByReader.first(String, String) The first value seen in each group.
Last GroupLast GroupByReader.last(String, String) The last value seen in each group.

* all operators are in the com.northconcepts.datapipeline.group.operation package.

Sliding Window Aggregation

Data Pipeline uses a concept called sliding window aggregation to summarize streaming data inside GroupByReader. Sliding windows collect input data while open and emit summary data once closed.

Windows are opened (created) and closed based on configurable strategies that rely on record count, time, record content, or a combination of strategies.

You've already seen the default strategy that uses a single window for the entire dataset. However, you'll have to change strategies if you're dealing with continuous, streaming data or just want to summarize data in batches.

Batch/Adjacent Windows

The first configuration described here allows you to obtain regular summaries from fixed numbers of records. This effectively simulates the output you receive in batch-only environments.

When GroupByReader starts, it performs the following actions:

  1. Opens a single window until it has accepted a set number of records
  2. Closes the window and emits the summary data to the next step in the pipeline
  3. Opens a new window and starts collecting again

The following example uses a create strategy that opens a new window only if one isn't already open. The close strategy caps each window at 50 records.

The example also turns on the debug flag to log when windows are opened and closed.
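A sketch of that configuration; the setter names (setCreateWindowStrategy, setCloseWindowStrategy, setDebug) are assumptions to confirm against the GroupByReader Javadocs:

    GroupByReader groupByReader = new GroupByReader(reader, "year", "month")
            .count("count")
            .sum("sum_of_amount", "amount");

    groupByReader.setCreateWindowStrategy(CreateWindowStrategy.limitOpened(1));    // open a window only if none is open
    groupByReader.setCloseWindowStrategy(CloseWindowStrategy.limitedRecords(50));  // close after 50 records
    groupByReader.setDebug(true);                                                  // log when windows open and close

    new Job(groupByReader, new StreamWriter(System.out)).run();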

While it's possible for a group operator (like sum or max) to hold onto the actual records while its window is open, none of the built-in operators work that way. This allows windows to be fairly memory-cheap, which is important as you'll see in the next section.

Overlapping Windows

Another way to configure your windows is to have them overlap each other. You may want overlapping windows in order to track things like moving averages or to help recognize related events that can span adjacent windows.

Depending on your configuration, the number of overlapping windows can grow fairly large, which is why it's important that group operators attempt to minimize the amount of memory they consume.

The following example uses a create strategy that opens a new window every 10 records and a close strategy to cap them to 40 records.
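A sketch of the overlapping configuration, using the same assumed setters as in the previous sketch:

    groupByReader.setCreateWindowStrategy(CreateWindowStrategy.recordPeriod(10));  // open a new window every 10 records
    groupByReader.setCloseWindowStrategy(CloseWindowStrategy.limitedRecords(40));  // each window holds at most 40 records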

Sparse/Sample Windows

Sliding windows don't have to overlap or even be adjacent to one another. If you only need a sample of data at set periods, you can create a configuration where the close strategy closes windows sooner than the create strategy opens new ones.

The following example uses a create strategy that opens every 40 records, but closes after just 10 records are collected.
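A sketch of the sparse configuration, again with the assumed setters:

    groupByReader.setCreateWindowStrategy(CreateWindowStrategy.recordPeriod(40));  // open a new window every 40 records
    groupByReader.setCloseWindowStrategy(CloseWindowStrategy.limitedRecords(10));  // but keep only the first 10 in each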

Time-Based Windows

The previous examples all used record count as the basis for their window strategies. However, Data Pipeline also provides several built-in strategies for working with time-based windows.

You can create adjacent windows that stay open for 10 minutes.

You can also mix strategies and create windows that open every hour until they've collected 100,000 records. Depending on how long it takes to collect that many records, you may end up with some overlapping windows and some sparse windows.

If you want to limit the hourly windows to 100,000 records or 15 minutes — whichever comes first — you can combine a record strategy with a time strategy using the or() operator function.
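A sketch of that combination; the time arguments are assumed to be in milliseconds, and the setters are the same assumed names as before:

    groupByReader.setCreateWindowStrategy(CreateWindowStrategy.startInterval(60 * 60 * 1000));  // open a new window every hour
    groupByReader.setCloseWindowStrategy(CloseWindowStrategy.or(
            CloseWindowStrategy.limitedRecords(100000),       // close after 100,000 records
            CloseWindowStrategy.limitedTime(15 * 60 * 1000)   // or after 15 minutes, whichever comes first
    ));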

Built-in Window Strategies

The following built-in strategies are used by GroupByReader to determine when it should open a new window.

Create Window Strategy Description
CreateWindowStrategy.limitOpened(int) Opens a new window if there are fewer currently open than the specified number.
CreateWindowStrategy.recordPeriod(int) Opens a new window at set record count intervals or on the first record.
CreateWindowStrategy.startInterval(long) Opens a new window at set time intervals.
CreateWindowStrategy.or(CreateWindowStrategy...) Opens a new window if any of the supplied strategies request it.
CreateWindowStrategy.and(CreateWindowStrategy...) Opens a new window if all of the supplied strategies request it.

The following built-in strategies are used by GroupByReader to determine when it should close open windows.

Close Window Strategy Description
CloseWindowStrategy.never() Never closes open windows.
CloseWindowStrategy.limitedTime(long) Closes windows after they've been open for the specified time.
CloseWindowStrategy.limitedTime(String, long) Closes windows after the specified datetime field has increased by a set amount of time since it was first seen. This is useful for aggregating hourly logs by their timestamp field.
CloseWindowStrategy.limitedRecords(long) Closes the window after the specified number of records has been collected.
CloseWindowStrategy.scheduled(Scheduler) Closes windows on a set clock schedule. For example, every 30 minutes on the hour and half hour.
CloseWindowStrategy.scheduled(String, Scheduler) Closes windows on a set clock schedule for the specified datetime field. For example, every 30 minutes at 15 and 45 past the hour. This is useful for aggregating hourly logs at set times by their timestamp field.
CloseWindowStrategy.hourlyScheduleInMinutes(int...) Closes windows at fixed minutes past each hour (for example: 0, 15, 30, and 45 minutes past each hour).
CloseWindowStrategy.hourlyScheduleInMinutes(String, int...) Closes windows at fixed minutes past each hour for the specified datetime field (for example: 0, 15, 30, and 45 minutes past each hour).
CloseWindowStrategy.or(CloseWindowStrategy...) Closes windows if any of the supplied strategies request it.
CloseWindowStrategy.and(CloseWindowStrategy...) Closes windows if all of the supplied strategies request it.

Custom Window Strategies

In addition to the built-in strategies, you can create your own by implementing com.northconcepts.datapipeline.group.CreateWindowStrategy and com.northconcepts.datapipeline.group.CloseWindowStrategy.

Below you can see the implementation for the limitOpened() method used before. It simply compares the number of opened windows to the specified value.

The never() method has a similarly simple implementation — it just always returns false.

Custom Aggregate Operators

Creating custom aggregate operators is a bit more complicated than custom window strategies, but not by much.

Group operators are actually two classes working together: com.northconcepts.datapipeline.group.GroupOperation and com.northconcepts.datapipeline.group.GroupOperationField. Think of GroupOperation (sum, min, max, avg) as the factory for creating instances of GroupOperationField for each field in each window.

For example, when you code groupByReader.sum("price").sum("qty");, each open window uses the sum GroupOperation (called GroupSum) to create two instances of GroupOperationField, one for each field the window needs to sum.

From there on, each GroupOperationField receives every record passing through its window while the window is open (GroupOperationField.apply(Record, Field)).

When the window closes, each GroupOperationField emits its summary data when GroupOperationField.getValue() is called.

With that explanation, here's how the sum operator is implemented.

Detail and Summarize in a Single Pipeline

This section describes a scenario where you have a single input file (or stream) and wish to perform multiple streams of work with it in parallel.

The following example takes an input CSV stream and converts the detail records to XML in one job, while using the GroupByReader to summarize it in another job.

Both jobs run asynchronously, but the splitting happens in the main thread. If you need the splitting to run in a separate thread, either call its runAsync() method or pass it as a Runnable to a new Thread or ExecutorService.

You can read more about the DeMux splitter class in the multi-threading section Reading One-to-Many Asynchronously.

Expression Language

Read about the dynamic expression language built into Data Pipeline here: Expression Language.

Multi-threaded Processing

Data Pipeline runs each job in a single thread by default. The Job class uses its thread to pull data from readers and push it to writers one record at a time.

Each record flowing through the pipeline goes from step to step until it's either written out or discarded. Only after a record has finished traveling through the pipeline does the next record get read in to start the journey again. This idea, called one-piece flow (or single-piece flow) processing, is a principle of lean manufacturing.

You've already seen how individual jobs can be made to run synchronously (blocking the main thread) or asynchronously (in their own thread). This section will show you how to increase your record flow by running parts of a job in parallel, in their own threads — regardless of how the job itself is executed.

Asynchronous Reading

One approach to increase record throughput is to decouple the reading of data from the rest of the pipeline.

If you're working with a particularly slow data source, it might not make sense to stop reading from it while the rest of the pipeline executes.

For this reason Data Pipeline provides an AsyncReader that eagerly reads data from other readers into a buffer using a separate thread.

Downstream requests for data can be satisfied directly from the buffer without blocking (waiting). Blocking only occurs if the buffer is empty when a downstream request for data is made.

This example eagerly buffers up to 1 MB of data from a CSV file in parallel to the pipeline.
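A sketch of the idea; the buffer-size configuration isn't shown here, so see the AsyncReader Javadocs for the exact option:

    DataReader reader = new CSVReader(new File("data/input.csv"))
            .setFieldNamesInFirstRow(true);

    // reading now happens in its own thread, filling AsyncReader's buffer
    reader = new AsyncReader(reader);

    new Job(reader, new StreamWriter(System.out)).run();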

Since AsyncReader can read from any other DataReader, you can place it anywhere in a pipeline you feel will benefit from parallel processing.

This example adds a second thread to ensure the filter step happens in parallel with the CSV reading and the rest of the pipeline.

Asynchronous Writing

If instead of a slow source you're working with a slow target, Data Pipeline provides an AsyncWriter that uses a separate thread to continuously write data from its buffer to any downstream DataWriter.

Data written to the AsyncWriter is placed in the buffer, allowing the pipeline to continue with the next record without waiting for the previous record to be written. The only time the pipeline must wait is when the buffer is full.

The following example uses an AsyncWriter with a 1000 record buffer to write to a StreamWriter in a separate thread. The StreamWriter here can be replaced with a JdbcWriter or any other writer.
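A sketch of the idea; whether the buffer size is passed to the constructor or a setter is an assumption to verify against the AsyncWriter Javadocs:

    DataWriter writer = new StreamWriter(System.out);   // replaceable with a JdbcWriter or any other writer

    // writing now happens in its own thread, draining a 1000-record buffer
    writer = new AsyncWriter(writer, 1000);

    new Job(reader, writer).run();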

Reading Many-to-One Asynchronously

Perhaps instead of a single slow datasource, you have several slow datasources that produce complementary data.

In this scenario, you can use the AsyncMultiReader class to read from all sources concurrently. This reader uses a separate thread to load each upstream source into its buffer. Like the AsyncReader, downstream requests won't have to wait when pulling from the buffer unless it's empty.

This example reads from two CSV files in parallel and merges their data into a union to be written out downstream. Each CSVReader can be replaced with other readers, including filters, transformers, and aggregators.
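A sketch of the idea, assuming AsyncMultiReader accepts its source readers in the constructor (file names are illustrative):

    DataReader reader = new AsyncMultiReader(
            new CSVReader(new File("data/part-1.csv")).setFieldNamesInFirstRow(true),
            new CSVReader(new File("data/part-2.csv")).setFieldNamesInFirstRow(true));

    new Job(reader, new StreamWriter(System.out)).run();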

Reading One-to-Many Asynchronously

Turning an input stream coming from a single DataReader into several is accomplished using the DeMux class. DeMux is a special case because it's neither a reader nor a writer; it acts almost like a job, taking data from a reader and sending it to the buffers of downstream readers.

The constructor for DeMux accepts a strategy that's responsible for what the downstream readers receive. There are two built-in implementations.

Strategy Description
DeMux.Strategy.BROADCAST Sends a copy of each record to every downstream reader.
DeMux.Strategy.ROUND_ROBIN Sends each record to a single downstream reader by cycling through the downstream readers one at a time.

You can see a code example of DeMux in action in the data aggregation section on how to detail and summarize in a single pipeline.

Writing One-to-Many Asynchronously

Data Pipeline has a few ways to do one-to-many writer pipelining.

The first approach is not asynchronous at all. It uses MultiWriter to write to two or more DataWriters sequentially, in the same thread.

You can make this pipeline asynchronous by wrapping the MultiWriter in an AsyncWriter.

Another way to make this pipeline asynchronous is to wrap the writers passed to MultiWriter with AsyncWriter.
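A sketch of the three arrangements; csvWriter and jdbcWriter are placeholders for whatever targets you're using, and the AsyncWriter constructors are shown without buffer-size arguments for brevity:

    // 1. sequential: both targets are written to in the same thread
    DataWriter writer = new MultiWriter(csvWriter, jdbcWriter);

    // 2. asynchronous as a whole: the MultiWriter pair is fed from its own thread
    writer = new AsyncWriter(new MultiWriter(csvWriter, jdbcWriter));

    // 3. asynchronous individually: each target gets its own writing thread
    writer = new MultiWriter(new AsyncWriter(csvWriter), new AsyncWriter(jdbcWriter));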

Event Bus

Data Pipeline's EventBus class is a delivery service for applications wishing to publish and subscribe to events. This service handles event registration, queuing, and delivery, and also takes care of failures, monitoring, and graceful shutdown.

The following sections first describe the event bus as a general application service and then show how to use it as part of your pipelines.

How the Event Bus Works

The event bus is general enough to be used in any application, not just those employing Data Pipeline's readers and writers. For example, you can use the event bus in your Swing, SWT, or Android apps. You can also use it in your web apps to send real-time updates to web users.

EventListener Interface

One big difference between this event bus and other implementations of the observer pattern is that it uses the same interface for both publishing and subscribing to events.

If an event publisher calls userAdded(User), that same method will be called sometime later (using a different thread) on all the event subscribers. There's no need to create a separate UserEvent class or call a notifyUserAdded(UserEvent) method. Just call a method on your interface and have it propagate automatically.

The only requirement is that your listener must be a Java interface for the event bus to use it.
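Based on the userAdded(User) example above, a listener interface could be as simple as this (User is an assumed application class):

    public interface UserListener {
        void userAdded(User user);   // declare as many event methods as you need
    }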

Starting and Stopping a Bus

Starting and stopping an event bus is simple. Just create one and it's started. To stop a bus, call its shutdown() method and it terminates gracefully, allowing up to 30 seconds for queued events to be delivered.

You can change the shutdown timeout by calling EventBus.setShutdownTimeout(long). Once the timeout expires, the bus is forcibly terminated and no more events will be delivered.
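A sketch of the lifecycle; the shutdown-timeout setter is shown as an instance method, which is an assumption to confirm against the EventBus Javadocs:

    EventBus bus = new EventBus();       // the bus is running as soon as it's created

    bus.setShutdownTimeout(10 * 1000);   // allow up to 10 seconds for queued events on shutdown

    // ... publish and subscribe ...

    bus.shutdown();                      // stop accepting events and terminate gracefully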

Using The System Event Bus

Data Pipeline has a system event bus it uses to deliver lifecycle events for jobs and event buses. You can use the system event bus in your apps instead of creating new ones. Just remember not to shut it down or lifecycle events will stop being delivered and JMX monitoring will stop working.

Publish and Subscribe to Events

The first step in publishing events is to create your own listener interface.

The next step is to add subscribers to the bus using your new listener interface. Finally, you create a publisher you can call whenever you need to send a notification.
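A sketch of the pattern; the addListener and getPublisher method names are assumptions, so check the EventBus Javadocs for the exact subscribe and publish calls:

    EventBus bus = new EventBus();

    // subscribe: register an implementation of your listener interface (assumed method name)
    bus.addListener(UserListener.class, new UserListener() {
        public void userAdded(User user) {
            System.out.println("user added: " + user);
        }
    });

    // publish: obtain a proxy that implements your interface and call it like any other object (assumed method name)
    UserListener publisher = bus.getPublisher(UserListener.class);
    publisher.userAdded(new User("jane"));   // delivered asynchronously to every subscriber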

Accessing the Current Event

Behind the scenes, the event bus wraps up your method call in an event object. This object is what gets queued and delivered to be executed against your subscribers. You can retrieve this object at any time during event delivery using its thread-local accessor EventBus.getCurrentEvent().

Connecting Pipelines to an Event Bus

While the event bus is general enough to be used in any application, it comes with built-in connectors for reading and writing records to a bus.

EventBusWriter

The EventBusWriter allows you to send records from any pipeline to an event bus. When constructing an EventBusWriter, you need to supply it with the event bus you are targeting, plus a topic. The topic can be a string, enum or other object and is used by the bus to distinguish one stream of records from another.

EventBusReader

The EventBusReader pulls records from an event bus into your pipeline. Its constructor takes the source event bus, plus zero or more topics to read from. If the topic is null or missing, the reader will read all records regardless of their topic.

One advantage of using an event bus is that you can connect any number of readers to a single topic. You can also connect any number of writers to the same or different topics. In many cases, an event bus can be a more flexible option than DeMux-based multi-threading.

Event Bus Pipelines

The event bus treats data as a flowing stream. Unlike its bigger, Enterprise Service Bus (ESB) cousin, the event bus does not persist records. If your reader isn't connected to the bus when the event is published, it will not be able to see it later. That's why it's important to connect your readers to the bus prior to sending records from a writer.

When records are published to the bus, the writer sends each one immediately to the bus, which queues it in its buffer or blocks the writer if the buffer is full. On the delivery side, the bus puts records into the reader's queue, unless the reader's queue is full, in which case the bus will block.

This example uses a single writer to send records to the bus using the purchases topic. Two separate readers then read from the purchases topic and write to their individual targets.
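A sketch of that arrangement; the second target writer is a placeholder for whatever output you need, and the file path is illustrative:

    EventBus bus = new EventBus();

    // connect the readers first, before any records are published
    DataWriter someOtherWriter = new StreamWriter(System.out);   // placeholder for your second target
    new Job(new EventBusReader(bus, "purchases"), new StreamWriter(System.out)).runAsync();
    new Job(new EventBusReader(bus, "purchases"), someOtherWriter).runAsync();

    // then send records to the bus under the "purchases" topic
    DataReader source = new CSVReader(new File("data/purchases.csv")).setFieldNamesInFirstRow(true);
    new Job(source, new EventBusWriter(bus, "purchases")).run();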

Monitoring Event Buses

Like job monitoring, event buses can also be monitored in several ways.

Directly Access Event Bus Properties

Event buses can be monitored by directly accessing information on their state at any time.

Here's the list of data each event bus exposes via its getter methods.

Method Description
getId() The unique (sequential) ID of this bus (within this JVM/classloader).
getUuid() The universally unique identifier (UUID) for this bus.
getName() The name assigned to this bus or its default name if one was not assigned.
getState() The current state of this bus (ALIVE, SHUTTING_DOWN, TERMINATED).
isAlive() Indicates if this bus has not been shutdown.
getCreatedOn() The time this bus instance was created.
getStartedOn() The time this bus instance started running
isDebug() Indicates if the bus should emit debug log messages when events are published or delivered and during the shutdown process.
getShutdownTimeout() The time in milliseconds this bus will wait for events to complete before forcibly shutting down (defaults to 30 seconds).
getShutdownTimeoutAsString() The time as human readable string this bus will wait for events to complete before forcibly shutting down (defaults to 30 seconds).
getPublishersCreated() The number of event publishers created by this bus.
getEventsPublished() The number of events sent to this event bus for delivery to listeners.
getEventsDelivered() The combined number of events each listener on this event bus has received.
getEventsActive() The approximate number of threads that are actively delivering events using this bus' executorService or -1 if unknown.
getEventsQueued() The number of events waiting to be executed by this bus' executorService or -1 if unknown.
getTaskCount() The approximate total number of tasks that have ever been scheduled for execution by this bus' executorService or -1 if unknown.
getCompletedTaskCount() The approximate total number of tasks that have completed execution by this bus' executorService or -1 if unknown.
getCorePoolSize() The core number of threads in this bus' executorService or -1 if unknown.
getLargestPoolSize() The largest number of threads that have ever simultaneously been in the pool in this bus' executorService or -1 if unknown.
getMaximumPoolSize() The maximum allowed number of threads in this bus' executorService or -1 if unknown.
getPoolSize() The current number of threads in this bus' executorService pool or -1 if unknown.
getErrorCount() The number of exceptions this bus has experienced.
getTypedListenerCount() The number of type-specific listeners subscribed to this bus.
getUntypedListenerCount() The number of non-type-specific listeners subscribed to this bus.
getListenerCount() The total number of listeners subscribed to this bus.

Event Bus Lifecycle Listener

Similar to the lifecycle events provided by each job, event buses also publish their lifecycle events on the system bus. These events let you know when new event buses are created and when existing buses start shutting down. No event gets sent when a bus finishes shutting down; your code will have to treat the shutting-down event as a successful termination.

The EventBus class has addEventBusLifecycleListener and removeEventBusLifecycleListener convenience methods for you to register EventBusLifecycleListener instances.

Watching All Records on a Bus

A third way to monitor the bus is to monitor the records passing through the bus.

There are a couple ways to do this. The first way is to create a reader with no topic.

The second way is to register a listener on the bus. This option is arguably better since it bypasses the EventBusReader's buffer.

Here is the complete example.

Watching All Events on a Bus

A fourth way of monitoring a bus is to watch all events using the UntypedEventListener. This option captures all events, including non-record events and lifecycle events for jobs and event buses (when monitoring the system bus).

Here's the complete example using this approach.

Here's some of the output you'd see running this example. You can see it captures both the lifecycle events and the different record events for all topics.

Monitoring Event Buses with JMX

The final approach to monitoring is to use JMX (Java Management Extensions). With this option, you can plug in your enterprise monitoring tools (or a JMX client like VisualVM) to track event buses and jobs alike.

To enable monitoring, just add a call to DataEndpoint.enableJmx(); somewhere early in your code (like in a static initializer).

Handling Exceptions

Exceptions occurring on the event bus are treated like any other event.

You simply register an instance of ExceptionListener using EventBus.addExceptionListener(EventFilter, ExceptionListener) and wait for exceptions to occur.

Here's what the output from above would look like.

Meter and Throttle

See metering and throttling examples here: Meter Examples and Throttle Examples.

Debugging

See debugging examples here: Debugging Examples.

Error Handling

See error handling examples here: Error Handling Examples.

Creating Custom Components

See custom components examples here: Custom Components Examples.

Mobile Analytics