We’re excited to introduce Data Pipeline version 4.1, the second update on our 2016 roadmap.
This release features MongoDB integration, expression language additions, and improved transformations and joins. We’ve also thrown in a ton of examples for all the new 4.1 and 4.0 features. Enjoy.
The changelog has the complete list of updates. Here are the highlights.
Version 4.1 – Features and Changes
1. MongoDB
Data Pipeline now includes endpoints for reading and writing to MongoDB. You can read/write to a local instance with a single line of code or pass in a MongoClient
(similar to a JDBC connection) when working with clusters.
DataReader reader = new MongoReader("database", "collection");
DataWriter writer = new CSVWriter(new File("purchases.csv"));
Job.run(reader, writer);
2. Joins/Lookup
The CachedLookup class adds maxEntries to cap the number of elements cached and resetSchedule to indicate when the cache should be completely cleared.
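The idea behind a size-capped lookup cache can be sketched in plain Java using LinkedHashMap's eviction hook. This is an illustrative sketch only, not CachedLookup's actual implementation; all names here are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: a size-capped cache in plain Java, showing the idea
// behind a maxEntries-style limit (not the library's code).
public class BoundedCacheDemo {

    // Returns a map that evicts its oldest entry once it grows past maxEntries.
    public static <K, V> Map<K, V> boundedCache(final int maxEntries) {
        return new LinkedHashMap<K, V>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = boundedCache(2);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.put("c", "3"); // evicts "a", the oldest entry
        System.out.println(cache.keySet()); // prints [b, c]
    }
}
```

A scheduled clear (the resetSchedule side) would simply call `Map.clear()` from a timer.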
3. More Examples
The examples page includes more code and categories, covering new and existing features, to help you get started faster. Examples are now linked directly from Data Formats and Operators, so there's no more hunting around.
4. Transformations
The new TransformingWriter class now allows transformations to be added during the writing phase of any transfer. This is especially useful when used with a TeeReader to tap a flowing pipeline for a secondary workflow without incurring the overhead and memory involved with a new thread and data buffer.
5. Transformation Failure
Both TransformingReader and TransformingWriter contain a discardWriter and discardReasonFieldName to receive failing records without stopping the pipeline (similar to filters and validators).
6. Expression Language
The dynamic expression language now fully supports methods with variable arguments. Things can get a bit tricky since it determines which overloaded function to call at runtime using each argument's actual type (multimethods), unlike Java, which does so at compile time using their declared types. But don't worry, you won't even notice since it just works.
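To see the difference, here's plain Java's compile-time behavior in a small, self-contained sketch (the names are made up for the example). A runtime multimethod dispatch would pick the other overload.

```java
// Demonstrates Java's compile-time overload resolution by declared type,
// the behavior the expression language's runtime multimethods differ from.
public class OverloadDemo {

    static String describe(Object o) { return "Object"; }
    static String describe(String s) { return "String"; }

    public static void main(String[] args) {
        Object value = "hello"; // actual type String, declared type Object

        // Java picks the overload from the declared type at compile time:
        System.out.println(describe(value)); // prints Object

        // Runtime dispatch on the actual type would pick the String version:
        System.out.println(describe((String) value)); // prints String
    }
}
```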
The built-in, dynamic expression language also adds a slew of new functions, including aliases for everything in the java.lang.Math package. This lets you reference any of the aliased functions in your expressions without their package name.
reader = new TransformingReader(reader)
    .add(new SetCalculatedField("UserFollowingFollowerRatio",
        "coalesce(toDouble(UserFollowingCount) / nullif(UserFollowersCount, 0), 0.0)"))
    .add(new SetCalculatedField("UserTweetFavoriteRatio",
        "coalesce(toDouble(UserTweets) / nullif(UserFavoritesCount, 0), 0.0)"));
Of course, you can continue to reference any static method using its fully-qualified name.
https://northconcepts.com/docs/expression-language/#BuiltInFunctions
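For reference, the coalesce/nullif expression above computes a division that falls back to 0.0 when the denominator is zero. Here's the same logic in plain Java (an illustrative equivalent, not the library's evaluator):

```java
// Plain-Java equivalent of coalesce(toDouble(n) / nullif(d, 0), 0.0):
// a ratio that safely yields 0.0 instead of dividing by zero.
public class RatioDemo {

    public static double safeRatio(int numerator, int denominator) {
        if (denominator == 0) {
            return 0.0; // nullif(d, 0) is null, so coalesce falls back to 0.0
        }
        return (double) numerator / denominator;
    }

    public static void main(String[] args) {
        System.out.println(safeRatio(150, 300)); // prints 0.5
        System.out.println(safeRatio(150, 0));   // prints 0.0
    }
}
```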
7. Collections
The new CollectionWriter class allows you to copy all values passing through the pipeline for a field into any Java collection.
LinkedHashSet<String> keywords = new LinkedHashSet<String>();

DataWriter writer = new CollectionWriter().collect("keyword", keywords);
writer = new MultiWriter(writer, new StreamWriter(System.out));
Job.run(reader, writer);
8. Binary Records
Much like the record class's to/fromJson and to/fromXml methods, the new to/fromBinary methods allow records to be directly converted to byte arrays or streams for transmission or persistence.
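The general byte-array round-trip pattern this enables looks like the following, sketched here with plain JDK streams rather than the record class itself (all names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative sketch of a to/fromBinary-style round trip: serialize a few
// fields to a byte array, then read them back.
public class BinaryRoundTripDemo {

    public static byte[] toBinary(String name, int quantity) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(name);
            out.writeInt(quantity);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static String fromBinary(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return in.readUTF() + ":" + in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = toBinary("widget", 7);
        System.out.println(fromBinary(data)); // prints widget:7
    }
}
```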
9. JDBC Upserts
The JdbcUpsertWriter includes a new nonUpdateFields property to identify fields that are written once on creation and never updated (like create time or ID fields).
10. Jobs
The Jobs class adds new static convenience methods to execute synchronous and asynchronous transfers in a single call. They include overloaded versions that accept callbacks, and both return their Job instance.
Job example (reading from Gmail)
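The synchronous-versus-asynchronous call pattern these methods offer can be sketched with a plain CompletableFuture. This is only an analogy in stock Java, not the Jobs API; the transfer method here is a made-up stand-in.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: run a "transfer" synchronously, then asynchronously
// with a completion callback, mirroring the sync/async job pattern.
public class SyncAsyncDemo {

    static String transfer(String input) {
        return input.toUpperCase(); // stand-in for a pipeline transfer
    }

    public static void main(String[] args) {
        // Synchronous: block until the transfer finishes.
        String syncResult = transfer("purchases");
        System.out.println(syncResult); // prints PURCHASES

        // Asynchronous: run in the background and attach a callback.
        CompletableFuture<String> asyncResult =
                CompletableFuture.supplyAsync(() -> transfer("purchases"))
                        .whenComplete((result, error) ->
                                System.out.println("done: " + result));
        System.out.println(asyncResult.join()); // prints PURCHASES
    }
}
```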
11. Simple JSON and XML
Two new readers, SimpleJsonReader and SimpleXmlReader, have been added to mirror the existing SimpleJsonWriter and SimpleXmlWriter classes.
- Write a simple JSON file example
- Read a simple JSON file example
- Write a simple XML file example
- Read a simple XML file example
12. Pretty XML
The programmatic XML writer, XmlWriter, now includes a pretty property to force formatting (indentation and line breaks) of the resulting XML stream.
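To show what that kind of flag does to an XML stream, here's the same effect achieved with the JDK's own Transformer. This is an illustration of pretty-printing in general, not XmlWriter's internals.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Illustrative sketch: re-serialize XML with indentation and line breaks,
// the effect a "pretty" flag produces.
public class PrettyXmlDemo {

    public static String prettyPrint(String xml) {
        try {
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new StreamSource(new StringReader(xml)),
                    new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Nested elements come back on their own indented lines.
        System.out.println(prettyPrint("<purchases><item>apples</item></purchases>"));
    }
}
```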
13. Binary Logs
The FileWriter class can now append to previously written files, making it ideal for writing compact journal and audit logs.
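Append-mode journal writing follows the same pattern as java.io's FileWriter with its append flag; here's that pattern in stock Java (an illustration of the behavior, not Data Pipeline's FileWriter class):

```java
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Illustrative sketch: append log lines to the same file across separate
// writes, the behavior that makes append mode suit journal and audit logs.
public class AppendLogDemo {

    public static void appendLine(Path log, String line) {
        // The second constructor argument (true) opens the file in append mode.
        try (PrintWriter out = new PrintWriter(new FileWriter(log.toFile(), true))) {
            out.println(line);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static Path tempLog() {
        try {
            return Files.createTempFile("audit", ".log");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static List<String> readAll(Path log) {
        try {
            return Files.readAllLines(log);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path log = tempLog();
        appendLine(log, "job started");  // first write creates the entry
        appendLine(log, "job finished"); // later writes add to the same file
        System.out.println(readAll(log)); // prints [job started, job finished]
    }
}
```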
Version 4.0 – Features and Changes
1. Email/Gmail
Data Pipeline’s new EmailReader lets you ingest email content and attachments directly from IMAP and POP3 servers (including Gmail).
2. Jobs
Pipeline execution and management are now handled by a new Job class. The existing JobTemplate now delegates to this new implementation.
The new Job class provides you with the ability to:
- Run pipelines synchronously or asynchronously
- Obtain stats during and after job execution
- Pause, resume, and cancel jobs
- Log job execution state in a consistent manner to any DataWriter
- Hook a callback into the main execution path
- Listen for lifecycle events
- Monitor and manage execution using JMX
3. Event Bus
Pipelines can now be decoupled using a new one-to-many, in-memory event bus.
final String PURCHASES_TOPIC = "purchases";

EventBus bus = new EventBus().setName("App1 Bus");

// read bus, write purchases to console
reader = new EventBusReader(bus, PURCHASES_TOPIC);
writer = new StreamWriter(System.out);
new Job(reader, writer).runAsync();

// read bus, write purchases to file
reader = new EventBusReader(bus, PURCHASES_TOPIC);
writer = new SimpleXmlWriter(new File("example/data/output/purchases.xml"))
    .setPretty(true);
new Job(reader, writer).runAsync();

// write purchases to bus
reader = new CSVReader(new File("example/data/input/purchases.csv"))
    .setFieldNamesInFirstRow(true);
writer = new EventBusWriter(bus, PURCHASES_TOPIC);
new Job(reader, writer).run();

bus.shutdown();
The event bus is general enough to be used outside of pipelines as a replacement for the observer pattern. See the event bus section of the user guide for details.
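The observer-pattern role it can play boils down to one-to-many topic delivery, which can be sketched in a few lines of plain Java. This is a conceptual illustration only; MiniBus and its methods are made up and are not the library's EventBus API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch: a minimal one-to-many, in-memory topic bus showing
// the observer-pattern role an event bus can play.
public class MiniBus {

    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    public void subscribe(String topic, Consumer<String> listener) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }

    public void publish(String topic, String event) {
        // Every subscriber on the topic is notified of each event.
        for (Consumer<String> listener :
                subscribers.getOrDefault(topic, Collections.emptyList())) {
            listener.accept(event);
        }
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();
        List<String> consoleLog = new ArrayList<>();
        List<String> fileLog = new ArrayList<>();

        bus.subscribe("purchases", consoleLog::add);
        bus.subscribe("purchases", fileLog::add);
        bus.publish("purchases", "order-1001");

        System.out.println(consoleLog); // prints [order-1001]
        System.out.println(fileLog);    // prints [order-1001]
    }
}
```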
4. JMX Monitoring & Management
All jobs and event buses automatically register as JMX beans once monitoring is enabled via DataEndpoint. See JMX monitoring in the user guide for more details.
5. User Guide
The new user guide provides an in-depth alternative to the quick code examples and Javadocs.
6. Filter and Validation
The FilteringReader and ValidatingReader now include a discardDataWriter and discardReasonFieldName to accept records that are either filtered out or don’t pass validation.
7. Pretty XML
XML and JSON streams written using SimpleJsonWriter and SimpleXmlWriter can now be formatted using the setPretty(boolean) flag.
8. Conditional Transformations
The TransformingReader.filter property has been deprecated and replaced with the new TransformingReader.condition property to clarify its intent: it makes the transformations conditionally applicable.
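The meaning of a transformation condition, as opposed to a filter, can be sketched with a plain predicate gating a transform: records that fail the condition pass through unchanged rather than being dropped. This is a conceptual illustration only, not the TransformingReader API.

```java
import java.util.function.Function;
import java.util.function.Predicate;

// Illustrative sketch: a condition gates whether the transform is applied,
// but non-matching values still flow through untouched.
public class ConditionalTransformDemo {

    public static String apply(String value,
                               Predicate<String> condition,
                               Function<String, String> transform) {
        return condition.test(value) ? transform.apply(value) : value;
    }

    public static void main(String[] args) {
        Predicate<String> condition = s -> s.startsWith("user-");
        Function<String, String> transform = String::toUpperCase;

        System.out.println(apply("user-42", condition, transform)); // prints USER-42
        System.out.println(apply("guest", condition, transform));   // prints guest
    }
}
```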