Log Job Progress

Updated: Aug 10, 2023

This example tracks the progress and status of a data processing job in a pipeline. A JobCallback logs the number of records processed at a fixed interval and reports whether the job finished successfully or failed. The same approach can be extended to capture other details, such as errors encountered or the time taken for each step, to help monitor and optimize the data processing workflow.

This can be used in data integration tasks, data migration processes, and batch data processing jobs. It gives insight into the performance and efficiency of the pipeline, helping you identify bottlenecks, troubleshoot issues, and improve throughput. It also provides an audit trail for data processing, which supports data governance and compliance in data-driven projects.
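
The example itself only prints a record count, but figures like errors encountered and time taken are easy to accumulate alongside it. The following is a minimal plain-Java sketch with no DataPipeline dependency; JobStats and its methods are hypothetical names for illustration, and timestamps are passed in explicitly so the arithmetic can be checked without waiting.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of DataPipeline) that accumulates the figures a
// progress log might report. Times are passed in explicitly so results are testable.
final class JobStats {
    private final long startMillis;
    private long records;
    private long errors;
    private long endMillis = -1; // -1 means the job is still running

    JobStats(long startMillis) {
        this.startMillis = startMillis;
    }

    void recordProcessed() { records++; }

    void errorEncountered() { errors++; }

    void finish(long nowMillis) { endMillis = nowMillis; }

    long records() { return records; }

    long errors() { return errors; }

    // Time from start to nowMillis, or to the finish time once finish() has been called.
    long elapsedMillis(long nowMillis) {
        return (endMillis >= 0 ? endMillis : nowMillis) - startMillis;
    }

    // Records per second; 0 if no time has elapsed yet (avoids dividing by zero).
    double recordsPerSecond(long nowMillis) {
        long elapsed = elapsedMillis(nowMillis);
        return elapsed <= 0 ? 0.0 : records * 1000.0 / elapsed;
    }

    String summary(long nowMillis) {
        return String.format(Locale.ROOT, "records=%d errors=%d elapsed=%ds rate=%.1f/s",
                records, errors,
                TimeUnit.MILLISECONDS.toSeconds(elapsedMillis(nowMillis)),
                recordsPerSecond(nowMillis));
    }

    // Small usage demo with made-up timestamps.
    public static void main(String[] args) {
        JobStats stats = new JobStats(0L);
        for (int i = 0; i < 93; i++) {
            stats.recordProcessed();
        }
        stats.errorEncountered();
        stats.finish(5_000L);
        System.out.println(stats.summary(10_000L)); // records=93 errors=1 elapsed=5s rate=18.6/s
    }
}
```

In a real job, the equivalent calls could be made from onProgress(), onSuccess(), and onFailure(); how errors are counted depends on your own error handling.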

Java Code Listing

package com.northconcepts.datapipeline.examples.cookbook;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.core.DataException;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.internal.lang.Util;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.job.JobCallback;

public class LogJobProgress {

    // How long SlowReader keeps producing records (ms)
    private static final long RUNNING_TIME = TimeUnit.SECONDS.toMillis(30);
    // Minimum gap between progress messages (ms)
    private static final long LOG_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    public static void main(String[] args) {
        
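        // Produces records for RUNNING_TIME, pausing 50 ms before each one, then ends normally (fail = false)
        DataReader reader = new SlowReader("orange", RUNNING_TIME, 50L, false);
        // Discards every record; only the progress messages matter here
        DataWriter writer = new NullWriter();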
        
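        // Receives notifications from Job.run() while records move from reader to writer
        JobCallback jobCallback = new JobCallback(){

            // Earliest time the next progress message may be printed; starts at 0, so the first record is always logged
            private long nextLogTimeMillis;

            @Override
            public void onProgress(DataReader reader, DataWriter writer, Record currentRecord) throws Throwable {
                // Throttle output: print the running record count at most once per LOG_INTERVAL
                if (System.currentTimeMillis() > nextLogTimeMillis) {
                    nextLogTimeMillis = System.currentTimeMillis() + LOG_INTERVAL;
                    System.out.println("Records: " + reader.getRecordCount());
                }
            }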

            @Override
            public void onSuccess(DataReader reader, DataWriter writer) throws Throwable {
                System.out.println("DONE");
            }

            @Override
            public void onFailure(DataReader reader, DataWriter writer, Record currentRecord, Throwable exception) throws Throwable {
                System.out.println("FAILED: " + exception);
            }
            
        };
        
        // Move all records from reader to writer, notifying jobCallback along the way
        Job.run(reader, writer, jobCallback);
        
    }

    public static class SlowReader extends DataReader {
        
        private final String name;
        private final long elapsedTime;
        private final long recordDelay;
        private final boolean fail;
        
        public SlowReader(String name, long elapsedTime, long recordDelay, boolean fail) {
            this.name = name;
            this.elapsedTime = elapsedTime;
            this.recordDelay = recordDelay;
            this.fail = fail;
        }
        
        @Override
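        protected Record readImpl() throws Throwable {
            // Once the reader has been open for elapsedTime ms, end the data (return null) or fail on request
            if (getOpenElapsedTime() >= elapsedTime) {
                if (fail) {
                    throw exception("forced to fail");
                }
                return null;
            }
            
            // Simulate a slow source by pausing before each record
            Thread.sleep(recordDelay);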
            
            Record record = new Record();
            record.setField("id", getRecordCount() + 1);
            record.setField("time", new Date());
            record.setField("name", name + "-" + getRecordCount());
            record.setField("overflow", Util.repeat("-", 8 * 1024));  // overflow I/O buffers, hopefully
            
            return record;
        }
        
        // Attach this reader's settings to its exceptions to help with troubleshooting
        @Override
        public DataException addExceptionProperties(DataException exception) {
            exception.set("SlowReader.name", name);
            exception.set("SlowReader.elapsedTime", elapsedTime);
            exception.set("SlowReader.recordDelay", recordDelay);
            exception.set("SlowReader.fail", fail);
            return super.addExceptionProperties(exception);
        }
        
    }
    
}

Code Walkthrough

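  1. First, the RUNNING_TIME (30 seconds) and LOG_INTERVAL (5 seconds) constants are defined, both in milliseconds.
  2. Then, a custom SlowReader is created. It produces records named "orange-N" for RUNNING_TIME, pausing 50 ms before each one, and the final false argument means it ends normally instead of failing.
  3. A NullWriter is created to discard the records, since this example is only interested in monitoring progress.
  4. Next, the JobCallback interface is implemented as an anonymous class to log job progress. Its onProgress(), onSuccess(), and onFailure() methods are overridden to print messages to the console. Because onProgress() can be called very frequently, it only prints the reader's record count when more than LOG_INTERVAL has passed since the previous message (the very first record is always printed, since nextLogTimeMillis starts at 0).
  5. Job.run() transfers the data from the reader to the writer and notifies jobCallback of the job's progress and of its success or failure. See how to compile and run data pipeline jobs.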
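
To make the callback flow in steps 4 and 5 concrete, here is a minimal plain-Java sketch of the pattern: a loop that moves items from a source to a sink, calls onProgress() after each one, and finishes with exactly one onSuccess() or onFailure() call. MiniJob, ProgressCallback, and ThrottledLogger are hypothetical names, and this is an assumption about the general shape of the pattern, not DataPipeline's actual Job.run() implementation. The clock is injected so the throttling can be checked without waiting 30 seconds.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.IntStream;

// Hypothetical sketch of the Job.run()/JobCallback pattern in plain Java.
// It is NOT DataPipeline's implementation; names and signatures are illustrative.
final class MiniJob {

    // Moves every item from source to sink, reporting progress after each item,
    // then exactly one onSuccess() or onFailure() call.
    static <T> void run(Iterator<T> source, Consumer<T> sink, ProgressCallback<T> callback) {
        long count = 0;
        T current = null;
        try {
            while (source.hasNext()) {
                current = source.next();
                sink.accept(current);
                count++;
                callback.onProgress(count, current);
            }
        } catch (RuntimeException e) {
            callback.onFailure(count, current, e);
            return;
        }
        callback.onSuccess(count);
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L}; // simulated clock: each item "takes" 50 ms
        ThrottledLogger<Integer> logger = new ThrottledLogger<>(5_000L, () -> fakeNow[0]);
        MiniJob.run(IntStream.rangeClosed(1, 600).iterator(), item -> fakeNow[0] += 50, logger);
    }
}

// Stand-in for JobCallback, with the reader/writer parameters dropped for brevity.
interface ProgressCallback<T> {
    void onProgress(long count, T current);
    void onSuccess(long count);
    void onFailure(long count, T current, Exception e);
}

// Same throttling idea as onProgress() in LogJobProgress: log at most once per interval.
final class ThrottledLogger<T> implements ProgressCallback<T> {
    private final long intervalMillis;
    private final LongSupplier clock;
    private final List<String> lines = new ArrayList<>();
    private long nextLogTimeMillis = Long.MIN_VALUE; // first progress call always logs

    ThrottledLogger(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
    }

    @Override
    public void onProgress(long count, T current) {
        long now = clock.getAsLong();
        if (now > nextLogTimeMillis) {
            nextLogTimeMillis = now + intervalMillis;
            log("Records: " + count);
        }
    }

    @Override
    public void onSuccess(long count) { log("DONE"); }

    @Override
    public void onFailure(long count, T current, Exception e) { log("FAILED: " + e); }

    List<String> lines() { return lines; }

    private void log(String line) {
        lines.add(line);
        System.out.println(line);
    }
}
```

ThrottledLogger starts nextLogTimeMillis at Long.MIN_VALUE rather than 0 so the first call logs even with a fake clock that starts near zero; with System.currentTimeMillis() both choices behave the same. With exactly 50 ms per item and no overhead, the demo prints Records: 1, 102, 203, 304, 405, and 506, followed by DONE.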

SlowReader

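SlowReader is a custom DataReader written specifically for this example. It stores its settings in final fields set by the constructor and overrides two methods from the parent class. readImpl() pauses for recordDelay milliseconds before returning each new Record; once the reader has been open for elapsedTime milliseconds, it returns null to signal the end of the data, or throws an exception instead if fail is true. addExceptionProperties() attaches the reader's settings to its exceptions, which makes failures easier to diagnose.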
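
The same end-of-data logic can be tried outside DataPipeline. The following plain-Java sketch is a hypothetical SlowSource, not a DataPipeline class: it uses a Map in place of Record, omits the time and overflow fields, and takes an injectable clock so tests need not sleep. Like readImpl(), read() returns null once the source has been open for elapsedTime milliseconds, or throws if fail is true.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical plain-Java analogue of SlowReader (not DataPipeline code).
// A Map stands in for Record, and the clock is injectable for testing.
final class SlowSource {
    private final String name;
    private final long elapsedTime;   // how long to keep producing records, in ms
    private final long recordDelay;   // pause before each record, in ms
    private final boolean fail;       // throw instead of ending normally
    private final LongSupplier clock; // e.g. System::currentTimeMillis
    private final long openedAt;
    private long recordCount;

    SlowSource(String name, long elapsedTime, long recordDelay, boolean fail, LongSupplier clock) {
        this.name = name;
        this.elapsedTime = elapsedTime;
        this.recordDelay = recordDelay;
        this.fail = fail;
        this.clock = clock;
        this.openedAt = clock.getAsLong(); // stands in for the reader's open time
    }

    // Returns the next record, or null (end of data) once elapsedTime has passed.
    Map<String, Object> read() {
        if (clock.getAsLong() - openedAt >= elapsedTime) {
            if (fail) {
                throw new IllegalStateException("forced to fail: " + name);
            }
            return null;
        }
        try {
            Thread.sleep(recordDelay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while reading " + name, e);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", recordCount + 1);          // same numbering as SlowReader
        row.put("name", name + "-" + recordCount);
        recordCount++;
        return row;
    }

    public static void main(String[] args) {
        SlowSource source = new SlowSource("orange", 300L, 50L, false, System::currentTimeMillis);
        Map<String, Object> row;
        while ((row = source.read()) != null) {
            System.out.println(row);
        }
    }
}
```

Returning null for end of data mirrors readImpl(); a more idiomatic standalone design might implement Iterator instead, but keeping the null convention makes the comparison with SlowReader direct.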

Console Output

Records: 1
Records: 94
Records: 187
Records: 279
Records: 372
Records: 466
DONE
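
The counts above can be used to estimate the real cost of each record. With a 50 ms delay and no overhead, SlowReader would produce 100 records per 5-second interval, but the log shows about 93. The short calculation below is a rough estimate only, because each message is printed at the first record after the interval has elapsed, so the actual gaps are slightly longer than 5 seconds. ProgressRate and its methods are illustrative names, not part of DataPipeline.

```java
import java.util.Arrays;
import java.util.Locale;

final class ProgressRate {

    // Records logged in each interval: the difference between consecutive counts.
    static long[] differences(long[] counts) {
        long[] diffs = new long[counts.length - 1];
        for (int i = 1; i < counts.length; i++) {
            diffs[i - 1] = counts[i] - counts[i - 1];
        }
        return diffs;
    }

    // Average records per interval across the whole log.
    static double averagePerInterval(long[] counts) {
        return (double) (counts[counts.length - 1] - counts[0]) / (counts.length - 1);
    }

    public static void main(String[] args) {
        // "Records: N" values copied from the console output above
        long[] counts = {1, 94, 187, 279, 372, 466};
        long logIntervalMillis = 5_000L; // LOG_INTERVAL
        long recordDelayMillis = 50L;    // SlowReader's recordDelay

        double perInterval = averagePerInterval(counts);
        double msPerRecord = logIntervalMillis / perInterval;

        System.out.println("Per interval: " + Arrays.toString(differences(counts)));
        System.out.printf(Locale.ROOT, "Average %.1f records per interval (ideal %d)%n",
                perInterval, logIntervalMillis / recordDelayMillis);
        System.out.printf(Locale.ROOT, "About %.1f ms per record, of which %d ms is the sleep%n",
                msPerRecord, recordDelayMillis);
    }
}
```

The result is roughly 53.8 ms per record, so a few milliseconds beyond the 50 ms sleep are presumably spent on overhead such as timer granularity and building each record, including its 8 KB overflow string.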