Write a Sequence of Files by Record Count
This example divides an outgoing dataset into several files containing specified record counts. This can be used to break files into manageable chunks and ensure they can be loaded by downstream tools without running out of memory.
To roll files by elapsed time instead of record count, see the Write a Sequence of Files by Elapsed Time example.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.SequenceWriter;
import com.northconcepts.datapipeline.core.SequenceWriter.DataWriterFactory;
import com.northconcepts.datapipeline.core.SequenceWriter.RecordCountSequenceStrategy;
import com.northconcepts.datapipeline.core.SequenceWriter.Sequence;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;

public class WriteASequenceOfFilesByRecordCount {

    private static final int MAX_ORDERS = 10;
    private static final long MAX_TRANSACTIONS = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 0;

    public static void main(String[] args) {
        DataReader reader = new FakeMessageQueueReader(MAX_ORDERS, MAX_TRANSACTIONS, RECORD_DELAY_MILLISECONDS);
        DataWriter writer = new SequenceWriter(new RecordCountSequenceStrategy(50), new CsvDataWriterFactory());

        Job.run(reader, writer);
    }

    //==================================================

    public static class CsvDataWriterFactory implements DataWriterFactory {

        @Override
        public DataWriter createDataWriter(Sequence sequence, Record record) {
            return new CSVWriter(new File("example/data/output", "sequence-" + sequence.getIndex() + ".csv"));
        }

    }

    //==================================================

    public static class FakeMessageQueueReader extends DataReader {

        private final int maxOrders;
        private final long maxTransactions;
        private long nextTransactionId;
        private final long recordDelay;

        public FakeMessageQueueReader(int maxOrders, long maxTransactions, long recordDelay) {
            this.maxOrders = maxOrders;
            this.maxTransactions = maxTransactions;
            this.recordDelay = recordDelay;
        }

        @Override
        protected Record readImpl() throws Throwable {
            if (nextTransactionId >= maxTransactions) {
                return null;
            }

            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }

            Record record = new Record();
            record.setField("transaction_id", nextTransactionId++);
            record.setField("order_id", "order" + nextTransactionId % maxOrders);
            record.setField("amount", nextTransactionId + 0.01);
            return record;
        }

    }

}
Code Walkthrough
- First, a FakeMessageQueueReader instance is created with three arguments: MAX_ORDERS, MAX_TRANSACTIONS, and RECORD_DELAY_MILLISECONDS.
- A SequenceWriter is created to write to a sequence of data writers, each created in turn by CsvDataWriterFactory and rolled according to an ISequenceStrategy policy (here, RecordCountSequenceStrategy(50)). Each writer is closed once 50 records have been written, and the factory then creates a new one.
- The Job.run() method transfers the data from reader to writer. See how to compile and run data pipeline jobs.
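The rolling behavior described above can be illustrated without the DataPipeline library. The following is a minimal plain-Java sketch, not SequenceWriter's actual implementation; the class RecordCountRollingSketch and its methods are hypothetical names. It assumes a zero-based sequence index, as the sequence-0.csv file name suggests, and uses the example's values of 200 records and 50 records per file.

```java
// Hypothetical illustration only; this is not DataPipeline's SequenceWriter.
final class RecordCountRollingSketch {

    // Zero-based index of the file that receives the record at the given zero-based position.
    static long fileIndexFor(long recordPosition, int recordsPerFile) {
        return recordPosition / recordsPerFile;
    }

    // Number of files needed to hold totalRecords (ceiling division).
    static long fileCount(long totalRecords, int recordsPerFile) {
        return (totalRecords + recordsPerFile - 1) / recordsPerFile;
    }

    public static void main(String[] args) {
        long totalRecords = 200;  // MAX_TRANSACTIONS in the example
        int recordsPerFile = 50;  // argument passed to RecordCountSequenceStrategy

        System.out.println("files needed: " + fileCount(totalRecords, recordsPerFile));

        // Records at the boundaries show where one file ends and the next begins.
        for (long position : new long[] {0, 49, 50, 199}) {
            System.out.println("record " + position + " -> sequence-"
                    + fileIndexFor(position, recordsPerFile) + ".csv");
        }
    }
}
```

Under these assumptions, record 49 is the last one in the first file and record 50 is the first one in the second.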
CsvDataWriterFactory
CsvDataWriterFactory implements the DataWriterFactory interface. Its createDataWriter() method returns a new CSVWriter for each sequence, so when the limit set by the ISequenceStrategy is reached (50 records in this example), writing continues in a new CSV file.
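To see which files the factory targets, the File construction from createDataWriter() can be run on its own. This is a minimal sketch using only java.io.File; SequenceFileNames and fileFor are hypothetical names, the index parameter stands in for the value returned by sequence.getIndex(), and the range 0 through 3 is an assumption based on 200 records at 50 per file.

```java
import java.io.File;

// Hypothetical helper; it mirrors only the File construction in CsvDataWriterFactory.
final class SequenceFileNames {

    // The index parameter stands in for the value returned by sequence.getIndex().
    static File fileFor(long index) {
        return new File("example/data/output", "sequence-" + index + ".csv");
    }

    public static void main(String[] args) {
        // Assumed index range: four files for 200 records at 50 records per file.
        for (long index = 0; index < 4; index++) {
            System.out.println(fileFor(index).getPath());
        }
    }
}
```

Because the file name depends only on the sequence index, rerunning the job would target the same file names again.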
FakeMessageQueueReader
FakeMessageQueueReader extends DataReader. Its overridden readImpl() method returns Record instances with "transaction_id", "order_id" and "amount" fields until MAX_TRANSACTIONS records have been produced, after which it returns null to signal the end of the data.
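One detail in readImpl() is easy to miss: transaction_id is set with a post-increment, so order_id and amount are computed from the already incremented counter. The sketch below reproduces only that arithmetic in plain Java; FakeRecordValues and its methods are hypothetical names and no DataPipeline classes are involved.

```java
// Hypothetical stand-alone reproduction of the arithmetic in readImpl().
final class FakeRecordValues {

    // order_id is derived from the counter after nextTransactionId++ has run.
    static String orderIdFor(long transactionId, int maxOrders) {
        long next = transactionId + 1;
        return "order" + next % maxOrders; // % binds tighter than +, so the remainder is appended
    }

    // amount is likewise derived from the incremented counter.
    static double amountFor(long transactionId) {
        long next = transactionId + 1;
        return next + 0.01;
    }

    public static void main(String[] args) {
        int maxOrders = 10; // MAX_ORDERS in the example
        for (long id : new long[] {0, 1, 8, 9}) {
            System.out.println(id + "," + orderIdFor(id, maxOrders) + "," + amountFor(id));
        }
    }
}
```

This explains why the row for transaction 0 carries order1 rather than order0, and why the order suffix wraps to 0 at transaction 9 when MAX_ORDERS is 10.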
Output
With MAX_TRANSACTIONS set to 200 and 50 records per file, 4 CSV files (sequence-0.csv through sequence-3.csv) with 50 records each are created.
The first file, sequence-0.csv, starts with the following data:
transaction_id,order_id,amount
0,order1,1.01
1,order2,2.01
2,order3,3.01
3,order4,4.01
4,order5,5.01
5,order6,6.01
6,order7,7.01
7,order8,8.01
8,order9,9.01
...