Write a Sequence of Files by Elapsed Time

Updated: Sep 19, 2023

This example splits a dataset into manageable files based on a specified elapsed time interval. This can be applied to log management, data export/import, and batch processing scenarios.

To roll files by record count instead of elapsed time, see the Write a Sequence of Files by Record Count example.

 

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.core.SequenceWriter;
import com.northconcepts.datapipeline.core.SequenceWriter.DataWriterFactory;
import com.northconcepts.datapipeline.core.SequenceWriter.ElapsedTimeSequenceStrategy;
import com.northconcepts.datapipeline.core.SequenceWriter.Sequence;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;

public class WriteASequenceOfFilesByElapsedTime {

    private static final int MAX_ORDERS = 10;
    private static final long MAX_TRANSACTIONS = 200;
    private static final int RECORD_DELAY_MILLISECONDS = 250;

    public static void main(String[] args) {
        
        DataReader reader = new FakeMessageQueueReader(MAX_ORDERS, MAX_TRANSACTIONS, RECORD_DELAY_MILLISECONDS);
        
        // Roll over to a new CSV file every 5000 milliseconds (5 seconds)
        DataWriter writer = new SequenceWriter(new ElapsedTimeSequenceStrategy(5000), new CsvDataWriterFactory());
        
        Job.run(reader, writer);
    }

    //==================================================
    
    public static class CsvDataWriterFactory implements DataWriterFactory {
        
        @Override
        public DataWriter createDataWriter(Sequence sequence, Record record) {
            return new CSVWriter(new File("example/data/output", "sequence-" + sequence.getIndex() + ".csv"));
        }
    }

    //==================================================
    
    public static class FakeMessageQueueReader extends DataReader {
        
        private final int maxOrders;
        private final long maxTransactions;
        private long nextTransactionId;
        private final long recordDelay;
        
        public FakeMessageQueueReader(int maxOrders, long maxTransactions, long recordDelay) {
            this.maxOrders = maxOrders;
            this.maxTransactions = maxTransactions;
            this.recordDelay = recordDelay;
        }
        
        @Override
        protected Record readImpl() throws Throwable {
            if (nextTransactionId >= maxTransactions) {
                return null;
            }
            
            if (recordDelay > 0) {
                Thread.sleep(recordDelay);
            }
            
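            Record record = new Record();
            // transaction_id takes the current value; the post-increment means
            // order_id and amount below are computed from the incremented value
            record.setField("transaction_id", nextTransactionId++);
            record.setField("order_id", "order" + nextTransactionId % maxOrders);
            record.setField("amount", nextTransactionId + 0.01);
            return record;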
        }
        
    }

}

 

Code Walkthrough

  1. First, a FakeMessageQueueReader instance is created with three arguments: MAX_ORDERS, MAX_TRANSACTIONS, and RECORD_DELAY_MILLISECONDS.
  2. A SequenceWriter is created to write to a series of data writers, each produced in turn by CsvDataWriterFactory. The writers are rolled according to an ISequenceStrategy policy, here ElapsedTimeSequenceStrategy(5000): once the specified elapsed time (5 seconds) passes, the current writer is closed and the factory creates a new one.
  3. The Job.run() method transfers the data from the reader to the writer. See how to compile and run data pipeline jobs.
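
The rollover behavior described in step 2 can be pictured with a small standalone sketch. This is not the Data Pipeline implementation: the class ElapsedTimeRolloverSketch, its methods, and the "at least the full interval" comparison are all assumptions made for illustration, and timestamps are passed in explicitly so the simulation runs without sleeping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Data Pipeline library.
final class ElapsedTimeRolloverSketch {

    private final long intervalMillis;
    private final List<List<String>> sequences = new ArrayList<>();
    private long sequenceStartMillis;

    ElapsedTimeRolloverSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Adds a record at the given timestamp. A new sequence is opened when none
    // exists yet or the current one has been open for at least the interval
    // (the exact comparison used by the real strategy is an assumption here).
    void write(String record, long nowMillis) {
        if (sequences.isEmpty() || nowMillis - sequenceStartMillis >= intervalMillis) {
            sequences.add(new ArrayList<>());
            sequenceStartMillis = nowMillis;
        }
        sequences.get(sequences.size() - 1).add(record);
    }

    List<List<String>> getSequences() {
        return sequences;
    }

    public static void main(String[] args) {
        ElapsedTimeRolloverSketch sketch = new ElapsedTimeRolloverSketch(5000);
        // Simulate 200 records arriving 250 ms apart, as in the example above.
        for (int i = 0; i < 200; i++) {
            sketch.write("record-" + i, i * 250L);
        }
        System.out.println("sequences: " + sketch.getSequences().size());
        System.out.println("records in first sequence: " + sketch.getSequences().get(0).size());
    }
}
```

With these idealized timestamps the sketch produces 10 sequences of 20 records each. A real run also spends time on I/O, so the per-file counts can drift slightly from that figure.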

 

CsvDataWriterFactory

CsvDataWriterFactory implements the DataWriterFactory interface. Its createDataWriter() method returns a new CSVWriter for each sequence, so whenever the ISequenceStrategy triggers a rollover, writing continues in a new CSV file.
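
To see which files the factory would target, the naming expression from the listing can be tried on its own using only java.io.File. The helper class SequenceFileNames and its fileFor() method are hypothetical, and the long parameter type is an assumption, since the listing does not show what sequence.getIndex() returns.

```java
import java.io.File;

// Hypothetical helper; mirrors the file naming expression in CsvDataWriterFactory.
final class SequenceFileNames {

    // Builds the output file for a given sequence index (type assumed to be long).
    static File fileFor(long sequenceIndex) {
        return new File("example/data/output", "sequence-" + sequenceIndex + ".csv");
    }

    public static void main(String[] args) {
        // Prints sequence-0.csv, sequence-1.csv, sequence-2.csv
        for (long i = 0; i < 3; i++) {
            System.out.println(fileFor(i).getName());
        }
    }
}
```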

 

FakeMessageQueueReader

The FakeMessageQueueReader class extends DataReader. Its overridden readImpl() method waits for the configured record delay, then returns a Record with "transaction_id", "order_id" and "amount" fields. Once maxTransactions records have been produced, it returns null.
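
The field values follow from the order of operations in readImpl(): transaction_id is set with a post-increment, so order_id and amount are derived from the next ID. The sketch below reproduces that arithmetic in plain Java without the library; the class FakeRecordValues and its methods are hypothetical names introduced for illustration.

```java
// Hypothetical helper; reproduces the arithmetic from readImpl() without the library.
final class FakeRecordValues {

    // order_id is computed after the increment, so it uses transactionId + 1.
    static String orderId(long transactionId, int maxOrders) {
        return "order" + (transactionId + 1) % maxOrders;
    }

    // amount is likewise based on the incremented ID.
    static double amount(long transactionId) {
        return (transactionId + 1) + 0.01;
    }

    public static void main(String[] args) {
        int maxOrders = 10;
        for (long id = 0; id < 3; id++) {
            System.out.println(id + "," + orderId(id, maxOrders) + "," + amount(id));
        }
    }
}
```

This is why transaction 0 is paired with order1 rather than order0, and why the order IDs wrap back to order0 at transaction 9 when maxOrders is 10.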

 

Output

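10 CSV files (sequence-0.csv, sequence-1.csv, sequence-2.csv, etc.) are created, with an average of 20 records in each: 200 records arriving 250 milliseconds apart take about 50 seconds, and a new file is started every 5 seconds.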

The first file sequence-0.csv starts with the following data:

transaction_id,order_id,amount
0,order1,1.01
1,order2,2.01
2,order3,3.01
3,order4,4.01
4,order5,5.01
5,order6,6.01
6,order7,7.01
7,order8,8.01
8,order9,9.01
...