Today we’re pleased to announce the release of Data Pipeline version 4.4. This update includes integration with Amazon S3, new features to better handle real-time data and aggregation, and new XML and JSON readers to speed up your development.
Amazon S3 Streaming Uploads & Downloads
We’ve introduced a new AmazonS3FileSystem class to help you interact with remote files stored on S3. The class lets you read and write files with minimal memory overhead, without first saving them to disk. It also lets you browse your buckets, folders, and files.
Read a File from Amazon S3
import java.io.InputStream;
import java.io.InputStreamReader;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.filesystem.amazons3.AmazonS3FileSystem;
import com.northconcepts.datapipeline.job.Job;

public class ReadFromAmazonS3 {

    private static final String ACCESS_KEY = "YOUR ACCESS KEY";
    private static final String SECRET_KEY = "YOUR SECRET KEY";

    public static void main(String[] args) throws Throwable {
        AmazonS3FileSystem s3 = new AmazonS3FileSystem();
        s3.setBasicAWSCredentials(ACCESS_KEY, SECRET_KEY);
        s3.open();
        try {
            InputStream inputStream = s3.readFile("datapipeline-bucket", "output/trades.csv");

            DataReader reader = new CSVReader(new InputStreamReader(inputStream));

//            DataWriter writer = StreamWriter.newSystemOutWriter();
            DataWriter writer = new NullWriter();

            Job.run(reader, writer);

            System.out.println("Records read: " + writer.getRecordCount());
        } finally {
            s3.close();
        }
    }

}
Write a File to Amazon S3
import java.io.File;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.filesystem.amazons3.AmazonS3FileSystem;
import com.northconcepts.datapipeline.job.Job;

public class WriteToAmazonS3UsingMultipartStreaming {

    private static final String ACCESS_KEY = "YOUR ACCESS KEY";
    private static final String SECRET_KEY = "YOUR SECRET KEY";

    public static void main(String[] args) throws Throwable {
        AmazonS3FileSystem s3 = new AmazonS3FileSystem();
        s3.setBasicAWSCredentials(ACCESS_KEY, SECRET_KEY);
//        s3.setDebug(true);
        s3.open();
        try {
            // Create AWS S3 streaming, multi-part OutputStream
            OutputStream outputStream = s3.writeMultipartFile("datapipeline-bucket", "output/trades.csv");

            DataReader reader = new CSVReader(new File("example/data/input/trades.csv"))
                    .setFieldNamesInFirstRow(true);

            DataWriter writer = new CSVWriter(new OutputStreamWriter(outputStream, "utf-8"))
                    .setFieldNamesInFirstRow(true);

            Job.run(reader, writer);

            System.out.println("Done.");
        } finally {
            s3.close();
        }
    }

}
JSON Record Reader
Our existing JSON reader requires that you identify ahead of time the fields to add to your records. But what if you want all the fields or don’t know what they are ahead of time? The new JsonRecordReader is here to help. You no longer need to specify any fields; just set one or more record breaks and Data Pipeline will collect everything it finds into hierarchical records.
import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.json.JsonRecordReader;

public class ReadJsonRecordsFromFile {

    public static void main(String[] args) {
        DataReader reader = new JsonRecordReader(new File("example/data/output/simple-json-to-file.json"))
                .addRecordBreak("/array/object");

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job.run(reader, writer);
    }

}
XML Record Reader
XML gets a similar addition to the new JSON record reader. If you need to collect all sub-branches of an XML tree into records without specifying each field, the new XmlRecordReader is exactly what you need.
import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.xml.XmlRecordReader;

public class ReadXmlRecordsFromFile {

    public static void main(String[] args) {
        DataReader reader = new XmlRecordReader(new File("example/data/output/simple-xml-to-file.xml"))
                .addRecordBreak("/records/record");

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job.run(reader, writer);
    }

}
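Because XmlRecordReader also accepts a java.io.Reader (as the n-gram example later in this post shows), it can be combined with the new Amazon S3 streaming support above. The following is a minimal sketch, with a hypothetical bucket, key, and record-break path, that reads XML records straight from S3 without staging the file locally.

import java.io.InputStream;
import java.io.InputStreamReader;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.filesystem.amazons3.AmazonS3FileSystem;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.xml.XmlRecordReader;

public class ReadXmlRecordsFromAmazonS3 {

    public static void main(String[] args) throws Throwable {
        AmazonS3FileSystem s3 = new AmazonS3FileSystem();
        s3.setBasicAWSCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY");
        s3.open();
        try {
            // hypothetical bucket and key -- stream the remote file without saving it to disk first
            InputStream inputStream = s3.readFile("datapipeline-bucket", "output/orders.xml");

            // hypothetical record break -- collect everything under each matching branch into a record
            DataReader reader = new XmlRecordReader(new InputStreamReader(inputStream, "UTF-8"))
                    .addRecordBreak("/orders/order");

            DataWriter writer = StreamWriter.newSystemOutWriter();

            Job.run(reader, writer);
        } finally {
            s3.close();
        }
    }

}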
Buffered Reader
When dealing with real-time data, you sometimes need to collect out-of-order records together. In a shipping system, for example, you may need to process all packages arriving on the same truck as a group, even when packages from multiple trucks arrive interleaved. The new BufferedReader class uses a configurable strategy to determine when to release the incoming records in each of its buffers downstream.
import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.buffer.BufferStrategy;
import com.northconcepts.datapipeline.buffer.BufferedReader;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.job.Job;

public class BufferRecordsByTimePeriodOrCount {

    private static final int MAX_TRUCKS = 10;
    private static final long MAX_PACKAGES = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        DataReader reader = new FakePackageReader(MAX_TRUCKS, MAX_PACKAGES, RECORD_DELAY_MILLISECONDS);

        // group records by "truck_id" and release all records for each "truck_id" downstream every 10 seconds or
        // when 10 records for that "truck_id" have been collected
        // and limit the internal buffer size to 100 records
        reader = new BufferedReader(reader, 100, "truck_id")
                .setBufferStrategy(BufferStrategy.limitedTimeFromOpenOrLimitedRecords(TimeUnit.SECONDS.toMillis(10), 10))
                ;

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job job = Job.run(reader, writer);

        System.out.println(job.getRecordsTransferred() + " - " + job.getRunningTimeAsString());
    }

    //==================================================

    public static class FakePackageReader extends DataReader {

        private final int maxTrucks;
        private final long maxPackages;
        private long nextPackageId;
        private final long recordDelay;

        public FakePackageReader(int maxTrucks, long maxPackages, long recordDelay) {
            this.maxTrucks = maxTrucks;
            this.maxPackages = maxPackages;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            if (nextPackageId >= maxPackages) {
                return null;
            }

            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }

            Record record = new Record();
            record.setField("package_id", nextPackageId++);
            record.setField("truck_id", "truck" + nextPackageId % maxTrucks);
            record.setField("amount", nextPackageId + 0.01);
            return record;
        }

    }

}
Group By Reader
If you need to summarize streaming data, GroupByReader can now produce output at a different rate than the incoming data. The CreateWindowStrategy and CloseWindowStrategy classes can now be used together to produce sliding windows that release summarized data even when no new records are available. They can also be used to create complex windows based on a combination of factors, including time and record counts.
import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.group.CloseWindowStrategy;
import com.northconcepts.datapipeline.group.GroupByReader;
import com.northconcepts.datapipeline.job.Job;

public class GroupRecordsByTimePeriodOrCount {

    private static final int MAX_ORDERS = 10;
    private static final long MAX_TRANSACTIONS = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        DataReader reader = new FakeMessageQueueReader(MAX_ORDERS, MAX_TRANSACTIONS, RECORD_DELAY_MILLISECONDS);

        // group records by "order_id" and release them every 10 seconds even if no new records are received
        reader = new GroupByReader(reader, "order_id")
                .count("count_of_transactions")
                .sum("amount")
                .setCloseWindowStrategy(CloseWindowStrategy.limitedTime(TimeUnit.SECONDS.toMillis(10)))
                ;

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job job = Job.run(reader, writer);

        System.out.println(job.getRecordsTransferred() + " - " + job.getRunningTimeAsString());
    }

    //==================================================

    public static class FakeMessageQueueReader extends DataReader {

        private final int maxOrders;
        private final long maxTransactions;
        private long nextTransactionId;
        private final long recordDelay;

        public FakeMessageQueueReader(int maxOrders, long maxTransactions, long recordDelay) {
            this.maxOrders = maxOrders;
            this.maxTransactions = maxTransactions;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            if (nextTransactionId >= maxTransactions) {
                return null;
            }

            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }

            Record record = new Record();
            record.setField("transaction_id", nextTransactionId++);
            record.setField("order_id", "order" + nextTransactionId % maxOrders);
            record.setField("amount", nextTransactionId + 0.01);
            return record;
        }

    }

}
Bigrams, Trigrams, Ngrams
Data Pipeline has a new transformer for extracting every n-word sequence in a body of text. The new Ngrams class allows you to extract bigrams, trigrams, and other n-grams from any field in a data stream. The following example finds the top 25 three-word sequences in the headlines of several Canadian news websites.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URL;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.LimitReader;
import com.northconcepts.datapipeline.core.SequenceReader;
import com.northconcepts.datapipeline.core.SortingReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.group.GroupByReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.Ngrams;
import com.northconcepts.datapipeline.transform.TransformingReader;
import com.northconcepts.datapipeline.xml.XmlRecordReader;

public class ExtractBigramsTrigramsAndNgrams {

    private static final int NGRAMS = 3; // bigram: 2; trigrams: 3; quadrigrams: 4;
    private static final int TOP_PHRASES = 25;

    private static final String[] URLS = {
            "https://rss.cbc.ca/lineup/topstories.xml",
            "https://rss.cbc.ca/lineup/world.xml",
            "https://rss.cbc.ca/lineup/canada.xml",
            "https://rss.cbc.ca/lineup/politics.xml",
            "https://rss.cbc.ca/lineup/business.xml",
            "https://rss.cbc.ca/lineup/health.xml",
            "https://rss.cbc.ca/lineup/arts.xml",
            "https://rss.cbc.ca/lineup/technology.xml",
            "https://rss.cbc.ca/lineup/offbeat.xml",
            "https://www.cbc.ca/cmlink/rss-cbcaboriginal",
            "https://globalnews.ca/feed/",
            "https://globalnews.ca/canada/feed/",
            "https://globalnews.ca/world/feed/",
            "https://globalnews.ca/politics/feed/",
            "https://globalnews.ca/money/feed/",
            "https://globalnews.ca/health/feed/",
            "https://globalnews.ca/entertainment/feed/",
            "https://globalnews.ca/environment/feed/",
            "https://globalnews.ca/tech/feed/",
            "https://globalnews.ca/sports/feed/",
            "https://www.ctvnews.ca/rss/ctvnews-ca-top-stories-public-rss-1.822009",
            "https://www.ctvnews.ca/rss/ctvnews-ca-canada-public-rss-1.822284",
            "https://www.ctvnews.ca/rss/ctvnews-ca-world-public-rss-1.822289",
            "https://www.ctvnews.ca/rss/ctvnews-ca-entertainment-public-rss-1.822292",
            "https://www.ctvnews.ca/rss/ctvnews-ca-politics-public-rss-1.822302",
            "https://www.ctvnews.ca/rss/lifestyle/ctv-news-lifestyle-1.3407722",
            "https://www.ctvnews.ca/rss/business/ctv-news-business-headlines-1.867648",
            "https://www.ctvnews.ca/rss/ctvnews-ca-sci-tech-public-rss-1.822295",
            "https://www.ctvnews.ca/rss/sports/ctv-news-sports-1.3407726",
            "https://www.ctvnews.ca/rss/ctvnews-ca-health-public-rss-1.822299",
            "https://www.ctvnews.ca/rss/autos/ctv-news-autos-1.867636",
    };

    public static void main(String[] args) throws Throwable {
        SequenceReader sequenceReader = new SequenceReader();
        for (String url : URLS) {
            BufferedReader input = new BufferedReader(new InputStreamReader(new URL(url).openStream(), "UTF-8"));
            sequenceReader.add(new XmlRecordReader(input).addRecordBreak("/rss/channel/item"));
        }

        DataReader reader = sequenceReader;

        reader = new TransformingReader(reader)
                .add(new BasicFieldTransformer("title").lowerCase())
                .add(new Ngrams("title", "phrase", NGRAMS));

        reader = new GroupByReader(reader, "phrase")
                .setExcludeNulls(true)
                .count("count", true);

        reader = new SortingReader(reader).desc("count").asc("phrase");

        reader = new LimitReader(reader, TOP_PHRASES);

        DataWriter writer = new CSVWriter(new OutputStreamWriter(System.out))
                .setFieldNamesInFirstRow(true);

        Job.run(reader, writer);
    }

}
Multiple Field Support
The BasicFieldTransformer, FieldTransformer, and FieldFilter classes can now accept more than one field name. You no longer have to duplicate lines of code to perform the same operations or filters on multiple fields; just pass them all into the constructors.
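Here’s a minimal sketch of the multi-field form. The input file and field names are hypothetical; the point is that one BasicFieldTransformer handles several fields instead of one transformer per field.

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.StreamWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.transform.BasicFieldTransformer;
import com.northconcepts.datapipeline.transform.TransformingReader;

public class TransformMultipleFields {

    public static void main(String[] args) throws Throwable {
        // hypothetical input file with first_name, last_name, and email columns
        DataReader reader = new CSVReader(new File("example/data/input/contacts.csv"))
                .setFieldNamesInFirstRow(true);

        // pass several field names to a single transformer instead of repeating the same call per field
        reader = new TransformingReader(reader)
                .add(new BasicFieldTransformer("first_name", "last_name", "email").lowerCase());

        DataWriter writer = StreamWriter.newSystemOutWriter();

        Job.run(reader, writer);
    }

}

FieldTransformer and FieldFilter accept multiple field names in the same way.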
And More
Version 4.4 also includes improvements in the areas of:
- Multi-threading
- Debugging & diagnostics
- The expression language
- Job management and cancellation
- BigInteger and BigDecimal support
- Twitter data readers
- XPath expression handling
See the changelog for the complete list of updates.