Updated: July 2021
Most examples of creating a Spring Batch ETL Job require an enormous amount of code for such a routine task. In this blog, I will show you how to accomplish the same task of summarizing a million stock trades to find the open, close, high, and low prices for each symbol using our Data Pipeline framework.
1. Read trades from a CSV file
The first step is to load the trading data from a CSV file.
DataReader reader = new CSVReader(new File("example/data/input/trades.csv"))
    .setFieldNamesInFirstRow(true);
Despite what you may think, we aren't loading all million records at the same time. Data Pipeline is a streaming framework, like java.io.InputStream. Each call to DataReader.read() fetches the next record from the file and sends it to the next step. This one-piece-flow approach means that you can read huge data streams with less memory overhead, as well as handle batch and streaming data using a single API.
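If you ever want to pull records yourself instead of letting a Job drive the pipeline, the read loop looks roughly like this. This is a minimal sketch: the open()/close() lifecycle calls around read() are assumptions based on the usual reader pattern, not taken from the example above.

// Hedged sketch: manually draining a DataReader one record at a time.
// Assumes the reader exposes open()/read()/close(), and that read()
// returns null once the stream is exhausted.
reader.open();
try {
    Record record;
    while ((record = reader.read()) != null) {
        System.out.println(record);  // each record is handled as it arrives
    }
} finally {
    reader.close();
}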
CSV Reader
The first thing you’ll notice that’s different from the Spring Batch example is that CSVReader knows how to read column names from the stream. Unlike Spring, if your input file ever adds new fields (like a date) or the column positions change, this code does not need to change.
Flexible Records
You may also notice that there is no custom Java bean required. The Record class built into Data Pipeline can handle hierarchical data (like XML and JSON), tabular data (like that found in CSV or Excel files), or any combination of the two. This saves you from having to create new classes to hold the data at each step in your pipeline. It also saves you from having to change those classes when your data stream changes.
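As a rough illustration of what working with a Record looks like, the snippet below builds one by hand. The accessor names (setField, getField, getValueAs...) and the sample values are assumptions based on this pattern, so check the API reference for the exact signatures.

// Hedged sketch: a Record holds named fields without a custom bean class.
// The setField/getField accessor names below are assumed, and the
// "AAPL"/125.50 values are made-up sample data for illustration only.
Record record = new Record();
record.setField("stock", "AAPL");
record.setField("price", 125.50);

String stock = record.getField("stock").getValueAsString();
double price = record.getField("price").getValueAsDouble();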
2. Convert text fields
The second step is to convert the incoming price field to a double.
reader = new TransformingReader(reader)
    .add(new BasicFieldTransformer("price").stringToDouble());
This step might not be needed for other data sources (like Excel or JDBC); however, values coming from CSV files always enter the pipeline as text.
3. Summarize price using aggregate operators
The third step applies the Data Pipeline version of a SQL group-by operator to group and summarize the records by stock symbol.
reader = new GroupByReader(reader, "stock")
    .first("price", "open")
    .last("price", "close")
    .min("price", "low")
    .max("price", "high");
Like SQL, the resulting data stream will contain a unique row for each distinct value combination of the group-by fields passed to the constructor. The remaining fields will contain the result of the specified group operation (like Sum or Count).
For your convenience, Data Pipeline provides methods like first(), last(), min(), and max() to apply common group operations. However, you can also add your own custom group operators and apply advanced features, like sliding windows, to the stream.
You can read more about data aggregation in the user guide.
4. Write results to a new CSV file
And finally, the last step in the pipeline is to write the results to a new CSV file.
DataWriter writer = new CSVWriter(new File("example/data/output/prices.csv"));
Unlike the Spring Batch example, you don’t need to listen for job completion or write out the target format yourself, unless that’s your preference, in which case, go ahead.
5. Run the pipeline job
Once your pipeline is configured by chaining together readers and writers, you can simultaneously create and run your job using a convenience method on the Job class.
Job.run(reader, writer);
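Putting the steps together, the whole example reads roughly as follows. This is simply the snippets above assembled in order (file paths and field names are the ones used throughout this post), with the usual imports omitted for brevity.

// The complete pipeline: read the CSV, convert price to a double,
// group by stock symbol, and write the summary to a new CSV file.
DataReader reader = new CSVReader(new File("example/data/input/trades.csv"))
    .setFieldNamesInFirstRow(true);

reader = new TransformingReader(reader)
    .add(new BasicFieldTransformer("price").stringToDouble());

reader = new GroupByReader(reader, "stock")
    .first("price", "open")
    .last("price", "close")
    .min("price", "low")
    .max("price", "high");

DataWriter writer = new CSVWriter(new File("example/data/output/prices.csv"));

Job.run(reader, writer);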
The Job class has many hidden features, like:
- multi-threading
- callbacks and listeners
- management functions: cancel, pause, resume
- JMX monitoring
- logging and progress monitoring
See the running jobs section of the user guide for more details.
Wrap-up
The Spring Framework is a great tool, but it’s not always the right tool for the job. In some cases, like with Spring Batch, you may have to write a lot of code to make up for features the framework lacks. You may also run into software that breaks when small changes are introduced, like when a new field is added to the input file or the input fields are rearranged.
I hope this example shows you another perspective on programming batch ETL jobs in Java. If you have simplicity in mind, you can do batch jobs with much less code.
Happy Coding!