Skip to content

Commit

Permalink
Fix for issue #711
Browse files Browse the repository at this point in the history
Fix for issue #711 caused by start time resetting in LoadRateCalculator.start() invocation when processing each row in a batch.
  • Loading branch information
ashitsalesforce committed Jul 3, 2023
1 parent e5afa7e commit e2742f6
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ protected boolean visit() throws DataAccessObjectException, ParameterLoadExcepti
if (daoRowList == null || daoRowList.size() == 0) return false;
int daoRowCount = 0;

getVisitor().initLoadRateCalculator();
for (final Row daoRow : daoRowList) {
if (!DAORowUtil.isValidRow(daoRow)) {
getVisitor().setRowConversionStatus(daoRowNumBase + daoRowCount++,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ protected int getWriteBatchSize() {
}

protected void startWriteExtraction(int size) {
getRateCalculator().start();
getRateCalculator().setNumRecords(size);
getRateCalculator().start(size);
// start the Progress Monitor
getProgressMonitor().beginTask(Messages.getMessage(getClass(), "extracting"), size); //$NON-NLS-1$
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ protected boolean isRowConversionSuccessful(int dataSourceRow) {
@Override
public boolean visit(Row row) throws OperationException, DataAccessObjectException,
ConnectionException {
initLoadRateCalculator();
if (controller.getConfig().getBoolean(Config.PROCESS_BULK_CACHE_DATA_FROM_DAO)
|| !controller.getConfig().getBoolean(Config.BULK_API_ENABLED)) {
// either batch mode or cache bulk data uploaded from DAO
Expand Down Expand Up @@ -224,9 +223,8 @@ protected boolean writeStatus() {
return true;
}

private void initLoadRateCalculator() throws DataAccessObjectException {
getRateCalculator().setNumRecords(((DataReader)getController().getDao()).getTotalRows());
getRateCalculator().start();
public void initLoadRateCalculator() throws DataAccessObjectException {
getRateCalculator().start(((DataReader)getController().getDao()).getTotalRows());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,18 @@ public class LoadRateCalculator {
// TODO: we can probably move all references to this code into a base ProgressMonitor class
private Date startTime = null;
private long totalRecordsInJob = 0;
private boolean started = false;

public LoadRateCalculator() {}

public void start() {
this.startTime = new Date();
public LoadRateCalculator() {
// do nothing
}

public void setNumRecords(int numRecords) {
this.totalRecordsInJob = numRecords;
public synchronized void start(int numRecords) {
if (!started) {
started = true;
this.startTime = new Date();
this.totalRecordsInJob = numRecords;
}
}

public String calculateSubTask(long processedRecordsInJob, long numErrorsInJob) {
Expand All @@ -67,9 +70,10 @@ public String calculateSubTask(long processedRecordsInJob, long numErrorsInJob)
}

long remainingTimeInSec = 0;
long estimatedTotalTimeInSec = 0;
if (this.totalRecordsInJob > 0 && processedRecordsInJob > 0) {
// can estimate remaining time only if a few records are processed already.
long estimatedTotalTimeInSec = (long) (totalElapsedTimeInSec * this.totalRecordsInJob / processedRecordsInJob);
estimatedTotalTimeInSec = (long) (totalElapsedTimeInSec * this.totalRecordsInJob / processedRecordsInJob);
remainingTimeInSec = estimatedTotalTimeInSec - totalElapsedTimeInSec;
}

Expand All @@ -85,7 +89,7 @@ public String calculateSubTask(long processedRecordsInJob, long numErrorsInJob)
numSuccessInJob, // {2}
numErrorsInJob); // {3}
}
// LoadRateCalculator.processed=Processed {0} of {1} total records.
// LoadRateCalculator.processed=Processed {0} of {1} total records in {7} seconds.
// There are {5} successes and {6} errors. \nRate: {2} records per hour.
// Estimated time to complete: {3} minutes and {4} seconds.
return Messages.getMessage(getClass(), "processed",
Expand All @@ -95,6 +99,8 @@ public String calculateSubTask(long processedRecordsInJob, long numErrorsInJob)
remainingTimeInMinutes, // {3}
remainingSeconds, // {4}
numSuccessInJob, // {5}
numErrorsInJob); // {6}
numErrorsInJob, // {6}
totalElapsedTimeInSec // {7}
);
}
}
2 changes: 1 addition & 1 deletion src/main/resources/messages.properties
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ AbstractExtractAction.errorMissingFrom=Error in query: missing [ from ]: {0}
AbstractExtractAction.errorInvalidFieldName=Error in query: invalid field name [{0}] in query string: {1}
AbstractExtractAction.errorMalformedQuery=Error in query: Malformed query string: {0}
AbstractExtractAction.errorEmptyQuery=Error in query: query is empty
LoadRateCalculator.processed=Processed {0} of {1} total records. There are {5} successes and {6} errors. \nRate: {2} records per hour. Estimated time to complete: {3} minutes and {4} seconds.
LoadRateCalculator.processed=Processed {0} of {1} total records in {7} seconds. There are {5} successes and {6} errors. \nRate: {2} records per hour. Estimated time to complete: {3} minutes and {4} seconds.
LoadRateCalculator.processedTimeUnknown=Processed {0} of {1} total records. There are {2} successes and {3} errors.
Visitor.emptyRowIgnored=Item #: {0} will not be loaded due to the empty input data
Visitor.conversionException=Conversion Exception, writing to errors.csv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;


public class LoadRateCalculatorTest {
@Test
public void testRateCalculatorZeroCompletion() {
LoadRateCalculator rateCalculator = new LoadRateCalculator();
rateCalculator.setNumRecords(3600);
rateCalculator.start();
rateCalculator.start(3600);
String message = rateCalculator.calculateSubTask(0, 0);
assertEquals("incorrect rate calculation: ",
"Processed 0 of 3,600 total records. There are 0 successes and 0 errors.",
Expand All @@ -46,8 +46,7 @@ public void testRateCalculatorZeroCompletion() {
@Test
public void testRateCalculatorFirstBatchCompletion() {
LoadRateCalculator rateCalculator = new LoadRateCalculator();
rateCalculator.setNumRecords(3600);
rateCalculator.start();
rateCalculator.start(3600);
String message = rateCalculator.calculateSubTask(0, 0);
try {
Thread.sleep(1000);
Expand All @@ -56,18 +55,14 @@ public void testRateCalculatorFirstBatchCompletion() {
e.printStackTrace();
}
message = rateCalculator.calculateSubTask(1, 0);
assertEquals("incorrect rate calculation: ",
"Processed 1 of 3,600 total records. "
+ "There are 1 successes and 0 errors. \nRate: 3,600 records per hour. "
+ "Estimated time to complete: 59 minutes and 59 seconds. ",
message);
boolean success = message.contains("There are 1 successes and 0 errors. \nRate: 3,600 records per hour.");
assertTrue("incorrect rate calculation: ", success);
}

@Test
public void testRateCalculatorSecondBatchCompletion() {
public void testRateCalculatorMultiBatchCompletion() {
LoadRateCalculator rateCalculator = new LoadRateCalculator();
rateCalculator.setNumRecords(3600);
rateCalculator.start();
rateCalculator.start(3600);
String message = rateCalculator.calculateSubTask(0, 0);
try {
Thread.sleep(1000);
Expand All @@ -76,25 +71,55 @@ public void testRateCalculatorSecondBatchCompletion() {
e.printStackTrace();
}
message = rateCalculator.calculateSubTask(1, 0);
rateCalculator.start(3600);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
message = rateCalculator.calculateSubTask(2, 1);
assertEquals("incorrect rate calculation: ",
"Processed 2 of 3,600 total records. "
+ "There are 1 successes and 1 errors. \nRate: 3,600 records per hour. "
+ "Estimated time to complete: 59 minutes and 58 seconds. ",
message);
boolean success = message.contains("Processed 2 of 3,600 total records in 2 seconds. There are 1 successes and 1 errors. \nRate: 3,600 records per hour.");
assertTrue("incorrect rate calculation: ", success);

rateCalculator.start(3600);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
message = rateCalculator.calculateSubTask(3, 1);
success = message.contains("Processed 3 of 3,600 total records in 3 seconds. There are 2 successes and 1 errors. \nRate: 3,600 records per hour.");
assertTrue("incorrect rate calculation: ", success);

rateCalculator.start(3600);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
message = rateCalculator.calculateSubTask(4, 1);
success = message.contains("Processed 4 of 3,600 total records in 4 seconds. There are 3 successes and 1 errors. \nRate: 3,600 records per hour.");
assertTrue("incorrect rate calculation: ", success);

rateCalculator.start(3600);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
message = rateCalculator.calculateSubTask(7, 2);
success = message.contains("Processed 7 of 3,600 total records in 7 seconds. There are 5 successes and 2 errors. \nRate: 3,600 records per hour.");
assertTrue("incorrect rate calculation: ", success);
}

@Test
public void testRateCalculatorLongTimeForCompletion() {
LoadRateCalculator rateCalculator = new LoadRateCalculator();
rateCalculator.setNumRecords(999999999);
rateCalculator.start();
rateCalculator.start(999999999);
String message = rateCalculator.calculateSubTask(0, 0);
try {
Thread.sleep(1000);
Expand Down

0 comments on commit e2742f6

Please sign in to comment.