Measure Data being Read and Written

Updated: Feb 21, 2022

This example demonstrates how to measure the rate at which data is read and written using the MeteredReader and MeteredWriter classes.

The demo code reads data from one CSV file, writes it to a different CSV file, and logs the read and write rates separately. The same approach works with other endpoints: replace the CSVReader, the CSVWriter, or both with different endpoint classes.

Other examples show how to obtain additional statistics, such as record counts.

Input CSV file

Account,LastName,FirstName,Balance,CreditLimit,AccountCreated,Rating
101,Reeves,Keanu,9315.45,10000.00,1/17/1998,A
312,Butler,Gerard,90.00,1000.00,8/6/2003,B
868,Hewitt,Jennifer Love,0,17000.00,5/25/1985,B
761,Pinkett-Smith,Jada,49654.87,100000.00,12/5/2006,A
317,Murray,Bill,789.65,5000.00,2/5/2007,C

Java code listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.io.File;

import org.apache.log4j.Logger;

import com.northconcepts.datapipeline.core.DataEndpoint;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.csv.CSVReader;
import com.northconcepts.datapipeline.csv.CSVWriter;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.meter.MeteredReader;
import com.northconcepts.datapipeline.meter.MeteredWriter;

public class MeasureDataBeingReadAndWritten {
    
    public static final Logger log = DataEndpoint.log; 

    public static void main(String[] args) throws Throwable {
        // Source endpoint: the first row of the input file holds the field names
        DataReader reader = new CSVReader(new File("example/data/input/credit-balance-01.csv"))
            .setFieldNamesInFirstRow(true);
        
        // Target endpoint: the field names are written as the first row of the output file
        DataWriter writer = new CSVWriter(new File("example/data/output/credit-balance-04.csv"))
            .setFieldNamesInFirstRow(true);

        // Wrap both endpoints so that each one records its own throughput
        MeteredReader meteredReader = new MeteredReader(reader);
        MeteredWriter meteredWriter = new MeteredWriter(writer);

        // Transfer the records from the metered reader to the metered writer
        Job job = Job.run(meteredReader, meteredWriter);
        
        // Log the measured rates, then the job statistics
        log.info("read rate: " + meteredReader.getMeter().getUnitsPerSecondAsString());
        log.info("write rate: " + meteredWriter.getMeter().getUnitsPerSecondAsString());
        log.info("records transferred: " + job.getRecordsTransferred());
        log.info("running time: " + job.getRunningTimeAsString());
    }

}

Code walkthrough

  1. A CSVReader is constructed to read from the credit-balance-01.csv file.
  2. A CSVWriter is constructed to write to the credit-balance-04.csv file.
  3. The reader and writer are then wrapped by MeteredReader and MeteredWriter respectively.
  4. The transfer is then run using Job.run on the wrapped, metered endpoints.
  5. The read and write rates are logged, followed by the number of records transferred and the job's running time.

Metered Interface

The MeteredReader and MeteredWriter classes implement the Metered interface. This interface defines a method called getMeter which returns a Meter object. Meter is a generic counter that has several methods for recording various statistics related to data transfer.
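The wrapping idea behind a metered endpoint can be illustrated with a small, self-contained sketch. Every type here (SketchReader, SketchCounter, SketchMetered, ListReader, MeteredSketchReader) is a hypothetical stand-in invented for illustration, not a Data Pipeline class, and the real library may be structured quite differently. The sketch only shows a decorator that counts records as they pass through and exposes the counter through a getMeter method.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical reader abstraction: returns null when there are no more records.
interface SketchReader {
    String read();
}

// Hypothetical counter standing in for a meter.
class SketchCounter {
    private long count;

    void add(long n) { count += n; }

    long getCount() { return count; }
}

// Hypothetical equivalent of a "metered" contract: anything exposing its counter.
interface SketchMetered {
    SketchCounter getMeter();
}

// Simple in-memory reader used as the wrapped endpoint.
class ListReader implements SketchReader {
    private final Iterator<String> it;

    ListReader(List<String> records) { this.it = records.iterator(); }

    @Override
    public String read() { return it.hasNext() ? it.next() : null; }
}

// Decorator: delegates to the nested reader and counts each record it returns.
class MeteredSketchReader implements SketchReader, SketchMetered {
    private final SketchReader nested;
    private final SketchCounter meter = new SketchCounter();

    MeteredSketchReader(SketchReader nested) { this.nested = nested; }

    @Override
    public String read() {
        String record = nested.read();
        if (record != null) {
            meter.add(1);
        }
        return record;
    }

    @Override
    public SketchCounter getMeter() { return meter; }
}

class MeteredDecoratorSketch {
    public static void main(String[] args) {
        MeteredSketchReader reader = new MeteredSketchReader(
            new ListReader(Arrays.asList("101,Reeves", "312,Butler", "868,Hewitt")));
        while (reader.read() != null) {
            // drain the reader; the decorator counts as a side effect
        }
        System.out.println("records read: " + reader.getMeter().getCount());
    }
}
```

Because the decorator implements the same reader contract as the endpoint it wraps, the calling code does not need to change when metering is added or removed.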

RecordMeter

Both MeteredReader and MeteredWriter override the getMeter method to return an instance of RecordMeter, which is a subclass of Meter. RecordMeter builds on Meter by defining a nested class called MeterUnit, which defines two units of measurement: BYTES and RECORDS.

The default unit of measurement is BYTES but MeteredReader and MeteredWriter can override this by calling RecordMeter.setMeasure prior to the transfer. The actual record counting (or measurement) happens inside RecordMeter.add(Record) based on the units selected.
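To make the effect of the selected unit concrete, here is a minimal sketch of a counter whose add method measures either bytes or records. UnitMeterSketch, its Unit enum, and setUnit are hypothetical names invented for this illustration, not the library's RecordMeter API. The sketch also assumes a record is a line of text whose size is its UTF-8 length, which may not match how the library sizes a record.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical meter that counts in one of two units, chosen before measuring starts.
class UnitMeterSketch {
    enum Unit { BYTES, RECORDS }

    private Unit unit = Unit.BYTES; // bytes by default, mirroring the text above
    private long total;

    UnitMeterSketch setUnit(Unit unit) {
        this.unit = unit;
        return this;
    }

    // Adds one record to the running total, measured in the selected unit.
    void add(String record) {
        if (unit == Unit.RECORDS) {
            total += 1;
        } else {
            // Assumption: the size of a record is the UTF-8 length of its text
            total += record.getBytes(StandardCharsets.UTF_8).length;
        }
    }

    long getTotal() { return total; }

    public static void main(String[] args) {
        UnitMeterSketch byBytes = new UnitMeterSketch();
        UnitMeterSketch byRecords = new UnitMeterSketch().setUnit(Unit.RECORDS);
        for (String line : new String[] {"101,Reeves", "312,Butler"}) {
            byBytes.add(line);
            byRecords.add(line);
        }
        System.out.println("bytes: " + byBytes.getTotal());     // prints "bytes: 20"
        System.out.println("records: " + byRecords.getTotal()); // prints "records: 2"
    }
}
```

The unit is set before any record is added because changing it midway would mix bytes and records in the same total.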

Displaying transfer speeds

Meter has a method called getUnitsPerSecondAsString which is overridden in RecordMeter. This method returns the actual data transfer rate using the specified units (bytes or records) and an appropriate multiplier prefix ("kilo-" in this case).
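The rate string can be thought of as a units-per-second value scaled down until it fits a readable prefix. The sketch below is one plausible way to produce such a string and is not the library's implementation. RateFormatSketch and its format method are hypothetical, and the sketch assumes decimal scaling (1 kilo = 1000) and one decimal place, neither of which is confirmed by the source.

```java
import java.util.Locale;

// Hypothetical formatter for a transfer rate with a multiplier prefix.
class RateFormatSketch {
    private static final String[] PREFIXES = {"", "kilo-", "mega-", "giga-"};

    // units: total bytes or records counted; elapsedMillis: time taken to count them.
    static String format(long units, long elapsedMillis, String unitName) {
        double perSecond = elapsedMillis <= 0 ? 0.0 : units * 1000.0 / elapsedMillis;
        int i = 0;
        // Assumption: each prefix step is a factor of 1000, not 1024
        while (perSecond >= 1000.0 && i < PREFIXES.length - 1) {
            perSecond /= 1000.0;
            i++;
        }
        return String.format(Locale.ROOT, "%.1f %s%s/s", perSecond, PREFIXES[i], unitName);
    }

    public static void main(String[] args) {
        System.out.println(format(10200, 1000, "bytes")); // prints "10.2 kilo-bytes/s"
        System.out.println(format(5, 1000, "records"));   // prints "5.0 records/s"
    }
}
```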

Console output

16:41:24,090 DEBUG [main] datapipeline:60 - job::Start
16:41:24,141 DEBUG [main] datapipeline:72 - job::Success
16:41:24,152  INFO [main] datapipeline:39 - read rate: 10.2 kilo-bytes/s
16:41:24,155  INFO [main] datapipeline:40 - write rate: 11.0 kilo-bytes/s