Write a Sequence of Files by Elapsed Time
This example shows how to split a dataset into a sequence of manageable files based on a specified elapsed-time interval. The technique applies to log management, data export/import, and batch processing scenarios.
See the Write a Sequence of Files by Record Count example to roll files by record count instead of elapsed time.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.SequenceWriter;
import com.northconcepts.datapipeline.core.SequenceWriter.DataWriterFactory;
import com.northconcepts.datapipeline.core.SequenceWriter.ElapsedTimeSequenceStrategy;
import com.northconcepts.datapipeline.core.SequenceWriter.Sequence;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;

public class WriteASequenceOfFilesByElapsedTime {

    private static final int MAX_ORDERS = 10;
    private static final long MAX_TRANSACTIONS = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        DataReader reader = new FakeMessageQueueReader(MAX_ORDERS, MAX_TRANSACTIONS, RECORD_DELAY_MILLISECONDS);
        DataWriter writer = new SequenceWriter(new ElapsedTimeSequenceStrategy(5000), new CsvDataWriterFactory());
        Job.run(reader, writer);
    }

    //==================================================

    public static class CsvDataWriterFactory implements DataWriterFactory {
        @Override
        public DataWriter createDataWriter(Sequence sequence, Record record) {
            return new CSVWriter(new File("example/data/output", "sequence-" + sequence.getIndex() + ".csv"));
        }
    }

    //==================================================

    public static class FakeMessageQueueReader extends DataReader {

        private final int maxOrders;
        private final long maxTransactions;
        private long nextTransactionId;
        private final long recordDelay;

        public FakeMessageQueueReader(int maxOrders, long maxTransactions, long recordDelay) {
            this.maxOrders = maxOrders;
            this.maxTransactions = maxTransactions;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            if (nextTransactionId >= maxTransactions) {
                return null;
            }
            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }
            Record record = new Record();
            record.setField("transaction_id", nextTransactionId++);
            record.setField("order_id", "order" + nextTransactionId % maxOrders);
            record.setField("amount", nextTransactionId + 0.01);
            return record;
        }
    }

}
Code Walkthrough
- First, a FakeMessageQueueReader instance is created with three arguments: MAX_ORDERS, MAX_TRANSACTIONS, and RECORD_DELAY_MILLISECONDS.
- Next, a SequenceWriter is created to write to a sequence of data writers produced in turn by CsvDataWriterFactory and rolled based on an ISequenceStrategy policy (here, ElapsedTimeSequenceStrategy(5000)). Each writer is closed when the specified elapsed time (5 seconds) passes, and the factory creates the next one; a minimal sketch of changing the roll interval follows this list.
- Finally, the Job.run() method transfers the data from reader to writer. See how to compile and run data pipeline jobs.
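The roll interval is simply the millisecond value passed to ElapsedTimeSequenceStrategy. The sketch below reuses the reader and factory classes from the listing above but rolls every 60 seconds instead of every 5; the class name and the one-minute interval are illustrative choices, not part of the original example.

package com.northconcepts.datapipeline.examples.cookbook;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.SequenceWriter;
import com.northconcepts.datapipeline.core.SequenceWriter.ElapsedTimeSequenceStrategy;
import com.northconcepts.datapipeline.job.Job;

public class WriteASequenceOfFilesEveryMinute {

    public static void main(String[] args) {
        // Same fake reader and CSV factory as the listing above; only the roll interval differs.
        DataReader reader = new WriteASequenceOfFilesByElapsedTime.FakeMessageQueueReader(10, 200, 250);

        // Roll to a new file every 60 seconds (the interval is in milliseconds).
        DataWriter writer = new SequenceWriter(
                new ElapsedTimeSequenceStrategy(60 * 1000),
                new WriteASequenceOfFilesByElapsedTime.CsvDataWriterFactory());

        Job.run(reader, writer);
    }

}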
CsvDataWriterFactory
CsvDataWriterFactory implements the DataWriterFactory interface. Its createDataWriter() method returns a new CSVWriter for each sequence, so each time the ISequenceStrategy triggers a roll, the remaining data continues in a new CSV file. A hypothetical variation on the factory is sketched below.
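The factory controls where each rolled file is written and how it is named. As a hypothetical variation (the class name, timestamp format, and file-naming scheme below are assumptions for illustration, not part of the original example), a factory could name each file by the wall-clock time it was opened instead of by the sequence index.

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.SequenceWriter.DataWriterFactory;
import com.northconcepts.datapipeline.core.SequenceWriter.Sequence;
import com.northconcepts.datapipeline.csv.CSVWriter;

// Hypothetical variant: name each rolled file by the time it was opened.
public class TimestampedCsvWriterFactory implements DataWriterFactory {

    private final SimpleDateFormat timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss");

    @Override
    public DataWriter createDataWriter(Sequence sequence, Record record) {
        String fileName = "transactions-" + timestamp.format(new Date()) + ".csv";
        return new CSVWriter(new File("example/data/output", fileName));
    }

}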
FakeMessageQueueReader
The FakeMessageQueueReader class extends DataReader. Its overridden readImpl() method returns Record instances with "transaction_id", "order_id", and "amount" fields, pausing RECORD_DELAY_MILLISECONDS before each record to simulate a slow message queue and returning null once MAX_TRANSACTIONS records have been produced. A sketch of an alternative reader follows.
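Any DataReader subclass can feed the SequenceWriter in the same way. The sketch below is a hypothetical stand-in (its class name, fields, and in-memory message list are illustrative assumptions) that follows the same contract: return one Record per readImpl() call and return null when the source is exhausted.

package com.northconcepts.datapipeline.examples.cookbook;

import java.util.Arrays;
import java.util.Iterator;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.Record;

// Hypothetical reader that follows the same readImpl() contract as FakeMessageQueueReader.
public class InMemoryMessageReader extends DataReader {

    private final Iterator<String> messages = Arrays.asList("alpha", "beta", "gamma").iterator();
    private long nextId;

    @Override
    protected Record readImpl() throws Throwable {
        if (!messages.hasNext()) {
            return null;  // signals end of stream
        }
        Record record = new Record();
        record.setField("message_id", nextId++);
        record.setField("body", messages.next());
        return record;
    }

}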
Output
10 CSV files (sequence-0.csv, sequence-1.csv, sequence-2.csv, etc.) with an average of 20 records each are created: at 250 ms per record, roughly 20 records (5,000 ms / 250 ms) are written before each 5-second roll, so the 200 total records span about 10 files.
The first file, sequence-0.csv, starts with the following data:
transaction_id,order_id,amount
0,order1,1.01
1,order2,2.01
2,order3,3.01
3,order4,4.01
4,order5,5.01
5,order6,6.01
6,order7,7.01
7,order8,8.01
8,order9,9.01
...