Log Job Progress
This example shows how to track the progress and status of a data processing job in a pipeline. A JobCallback logs the number of records read at a fixed interval while the job runs, then reports whether the job succeeded or failed, to help monitor and optimize the data processing workflow.
This can be used in data integration tasks, data migration processes, and batch data processing jobs. It provides valuable insights into the performance and efficiency of the pipeline, enabling you to identify bottlenecks, troubleshoot issues, and improve data processing productivity. Additionally, it offers an audit trail for data processing, enhancing data governance and compliance in data-driven projects.
Java Code Listing
package com.northconcepts.datapipeline.examples.cookbook;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import com.northconcepts.datapipeline.core.DataException;
import com.northconcepts.datapipeline.core.DataReader;
import com.northconcepts.datapipeline.core.DataWriter;
import com.northconcepts.datapipeline.core.NullWriter;
import com.northconcepts.datapipeline.core.Record;
import com.northconcepts.datapipeline.internal.lang.Util;
import com.northconcepts.datapipeline.job.Job;
import com.northconcepts.datapipeline.job.JobCallback;

public class LogJobProgress {

    private static final long RUNNING_TIME = TimeUnit.SECONDS.toMillis(30);
    private static final long LOG_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    public static void main(String[] args) {
        DataReader reader = new SlowReader("orange", RUNNING_TIME, 50L, false);
        DataWriter writer = new NullWriter();

        JobCallback<DataReader, DataWriter> jobCallback = new JobCallback<DataReader, DataWriter>() {

            private long nextLogTimeMillis;

            @Override
            public void onProgress(DataReader reader, DataWriter writer, Record currentRecord) throws Throwable {
                // Log at most once per LOG_INTERVAL
                if (System.currentTimeMillis() > nextLogTimeMillis) {
                    nextLogTimeMillis = System.currentTimeMillis() + LOG_INTERVAL;
                    System.out.println("Records: " + reader.getRecordCount());
                }
            }

            @Override
            public void onSuccess(DataReader reader, DataWriter writer) throws Throwable {
                System.out.println("DONE");
            }

            @Override
            public void onFailure(DataReader reader, DataWriter writer, Record currentRecord, Throwable exception) throws Throwable {
                System.out.println("FAILED: " + exception);
            }
        };

        Job.run(reader, writer, jobCallback);
    }

    public static class SlowReader extends DataReader {

        private final String name;
        private final long elapsedTime;
        private final long recordDelay;
        private final boolean fail;

        public SlowReader(String name, long elapsedTime, long recordDelay, boolean fail) {
            this.name = name;
            this.elapsedTime = elapsedTime;
            this.recordDelay = recordDelay;
            this.fail = fail;
        }

        @Override
        protected Record readImpl() throws Throwable {
            // Stop (or fail) once the reader has been open for elapsedTime milliseconds
            if (getOpenElapsedTime() >= elapsedTime) {
                if (fail) {
                    throw exception("forced to fail");
                }
                return null;
            }

            Thread.sleep(recordDelay);

            Record record = new Record();
            record.setField("id", getRecordCount() + 1);
            record.setField("time", new Date());
            record.setField("name", name + "-" + getRecordCount());
            record.setField("overflow", Util.repeat("-", 8 * 1024)); // overflow I/O buffers, hopefully
            return record;
        }

        @Override
        public DataException addExceptionProperties(DataException exception) {
            exception.set("SlowReader.name", name);
            exception.set("SlowReader.elapsedTime", elapsedTime);
            exception.set("SlowReader.recordDelay", recordDelay);
            exception.set("SlowReader.fail", fail);
            return super.addExceptionProperties(exception);
        }
    }
}
Code Walkthrough
- First, RUNNING_TIME (30 seconds) and LOG_INTERVAL (5 seconds) are defined as class constants, both expressed in milliseconds.
- Then, a custom SlowReader instance is created to generate and read records, with a 50-millisecond delay per record.
- A NullWriter object is created to discard the records.
- Next, the JobCallback interface is implemented to log job progress. The onProgress(), onSuccess(), and onFailure() methods are overridden, and each method body prints a custom log message to the console. onProgress() prints the record count only when LOG_INTERVAL has passed since the previous message.
- Job.run() is used to transfer the data from reader to writer while using jobCallback.
See how to compile and run data pipeline jobs.
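The interval check in onProgress() can be tried out on its own. The following is a minimal sketch in plain Java that does not use the DataPipeline library: ThrottledProgress, ThrottledProgressDemo, and the injected fake clock are hypothetical names introduced for illustration, and the simulated 50 ms step ignores the per-record overhead a real job has.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of DataPipeline: decides when a progress line is due.
class ThrottledProgress {
    private final long intervalMillis;
    private final LongSupplier clock;   // injected so the logic can run without real waiting
    private long nextLogTimeMillis;     // starts at 0, so the first call always logs

    ThrottledProgress(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
    }

    // Returns true when the caller should log, and schedules the next log time.
    boolean shouldLog() {
        long now = clock.getAsLong();
        if (now > nextLogTimeMillis) {
            nextLogTimeMillis = now + intervalMillis;
            return true;
        }
        return false;
    }
}

class ThrottledProgressDemo {
    // Simulates one record every stepMillis and returns the record counts that would be logged.
    static List<Long> simulate(long totalMillis, long stepMillis, long intervalMillis) {
        long[] now = {1_000L};  // fake clock; starts above 0, as a wall clock would
        ThrottledProgress progress = new ThrottledProgress(intervalMillis, () -> now[0]);
        List<Long> logged = new ArrayList<>();
        long records = 0;
        for (long t = 0; t < totalMillis; t += stepMillis) {
            records++;
            if (progress.shouldLog()) {
                logged.add(records);
            }
            now[0] += stepMillis;  // advance the fake clock by one record delay
        }
        return logged;
    }

    public static void main(String[] args) {
        for (long count : simulate(30_000, 50, 5_000)) {
            System.out.println("Records: " + count);
        }
    }
}
```

Because nextLogTimeMillis starts at zero, the first record always produces a log line, and later lines appear only after the interval has passed.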
SlowReader
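SlowReader is a custom DataReader subclass written specifically for this example. It has four final fields (name, elapsedTime, recordDelay, and fail), a constructor, and two methods overridden from the parent class: readImpl() and addExceptionProperties(). Each call to readImpl() sleeps for recordDelay milliseconds and then returns a new Record. Once the reader has been open for at least elapsedTime milliseconds, readImpl() returns null to signal the end of the data, or throws an exception if fail is true. addExceptionProperties() attaches the reader's settings to the DataException it is given.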
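The control flow of readImpl() can be illustrated without the library. This is a rough sketch under stated assumptions: TimedSource and TimedSourceDemo are hypothetical names, a Map stands in for Record, and the class tracks its own open time and record count, which the real SlowReader gets from DataReader.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for SlowReader; it does not use or extend any DataPipeline class.
class TimedSource {
    private final String name;
    private final long runMillis;    // how long to keep producing records
    private final long delayMillis;  // pause before each record
    private final boolean fail;      // throw instead of ending normally
    private final long openedAt = System.currentTimeMillis();
    private long recordCount;

    TimedSource(String name, long runMillis, long delayMillis, boolean fail) {
        this.name = name;
        this.runMillis = runMillis;
        this.delayMillis = delayMillis;
        this.fail = fail;
    }

    // Returns the next record, or null once runMillis has elapsed.
    Map<String, Object> read() {
        if (System.currentTimeMillis() - openedAt >= runMillis) {
            if (fail) {
                throw new IllegalStateException("forced to fail");
            }
            return null;
        }
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", recordCount + 1);
        record.put("name", name + "-" + recordCount);
        recordCount++;
        return record;
    }

    long getRecordCount() {
        return recordCount;
    }
}

class TimedSourceDemo {
    public static void main(String[] args) {
        TimedSource source = new TimedSource("orange", 500, 50, false);
        Map<String, Object> record;
        while ((record = source.read()) != null) {
            System.out.println(record);
        }
        System.out.println("Records: " + source.getRecordCount());
    }
}
```

Passing true as the last constructor argument makes the source throw once the time limit is reached, which is the path that would trigger onFailure() in the original example.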
Console Output
Records: 1
Records: 94
Records: 187
Records: 279
Records: 372
Records: 466
DONE
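The counts in this output give a rough throughput figure. The short calculation below is only an illustration: ThroughputFromLog is a hypothetical class, the counts are copied from the output above, and treating each gap between log lines as exactly 5 seconds is an approximation, since a line is printed on the first record after the interval has passed.

```java
import java.util.Locale;

// Hypothetical helper for illustration; the counts come from the console output above.
class ThroughputFromLog {
    // Average number of records between consecutive log lines.
    static double averageDelta(long[] counts) {
        return (double) (counts[counts.length - 1] - counts[0]) / (counts.length - 1);
    }

    public static void main(String[] args) {
        long[] counts = {1, 94, 187, 279, 372, 466};
        double perInterval = averageDelta(counts);      // records per log interval
        double millisPerRecord = 5_000 / perInterval;   // assumes ~5 s between log lines
        System.out.println(String.format(Locale.ROOT,
                "%.1f records per interval, about %.1f ms per record",
                perInterval, millisPerRecord));
    }
}
```

This works out to about 93 records per interval, or close to 54 ms per record, slightly more than the configured 50 ms delay. One plausible reading is that the difference is per-record overhead such as building the record and sleep timing granularity.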