TSK-1504: Next scheduled Job now based on due date of job before

This commit is contained in:
Tim Gerversmann 2021-01-14 17:07:16 +01:00 committed by Mustapha Zorgati
parent 751930226b
commit 9906c0c7f9
9 changed files with 72 additions and 92 deletions

View File

@ -47,12 +47,6 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
private static final String TASKANA_JOB_HISTORY_BATCH_SIZE = "taskana.jobs.history.batchSize"; private static final String TASKANA_JOB_HISTORY_BATCH_SIZE = "taskana.jobs.history.batchSize";
private static final String TASKANA_JOB_HISTORY_CLEANUP_RUN_EVERY =
"taskana.jobs.history.cleanup.runEvery";
private static final String TASKANA_JOB_HISTORY_CLEANUP_FIRST_RUN =
"taskana.jobs.history.cleanup.firstRunAt";
private static final String TASKANA_JOB_HISTORY_CLEANUP_MINIMUM_AGE = private static final String TASKANA_JOB_HISTORY_CLEANUP_MINIMUM_AGE =
"taskana.jobs.history.cleanup.minimumAge"; "taskana.jobs.history.cleanup.minimumAge";
@ -61,8 +55,6 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
TaskanaHistoryEngineImpl taskanaHistoryEngine = TaskanaHistoryEngineImpl taskanaHistoryEngine =
TaskanaHistoryEngineImpl.createTaskanaEngine(taskanaEngineImpl); TaskanaHistoryEngineImpl.createTaskanaEngine(taskanaEngineImpl);
private Instant firstRun = Instant.parse("2018-01-01T00:00:00Z");
private Duration runEvery = Duration.parse("P1D");
private Duration minimumAge = Duration.parse("P14D"); private Duration minimumAge = Duration.parse("P14D");
private int batchSize = 100; private int batchSize = 100;
@ -268,20 +260,11 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
LOGGER.debug("Entry to scheduleNextCleanupJob."); LOGGER.debug("Entry to scheduleNextCleanupJob.");
ScheduledJob job = new ScheduledJob(); ScheduledJob job = new ScheduledJob();
job.setType(Type.HISTORYCLEANUPJOB); job.setType(Type.HISTORYCLEANUPJOB);
job.setDue(getNextDueForHistoryCleanupJob()); job.setDue(getNextDueForCleanupJob());
taskanaEngineImpl.getJobService().createJob(job); taskanaEngineImpl.getJobService().createJob(job);
LOGGER.debug("Exit from scheduleNextCleanupJob."); LOGGER.debug("Exit from scheduleNextCleanupJob.");
} }
private Instant getNextDueForHistoryCleanupJob() {
Instant nextRunAt = firstRun;
while (nextRunAt.isBefore(Instant.now())) {
nextRunAt = nextRunAt.plus(runEvery);
}
LOGGER.info("Scheduling next run of the HistoryCleanupJob for {}", nextRunAt);
return nextRunAt;
}
private void initJobParameters(Properties props) { private void initJobParameters(Properties props) {
String jobBatchSizeProperty = props.getProperty(TASKANA_JOB_HISTORY_BATCH_SIZE); String jobBatchSizeProperty = props.getProperty(TASKANA_JOB_HISTORY_BATCH_SIZE);
@ -296,33 +279,6 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
} }
} }
String historyCleanupJobFirstRunProperty =
props.getProperty(TASKANA_JOB_HISTORY_CLEANUP_FIRST_RUN);
if (historyCleanupJobFirstRunProperty != null && !historyCleanupJobFirstRunProperty.isEmpty()) {
try {
firstRun = Instant.parse(historyCleanupJobFirstRunProperty);
} catch (Exception e) {
LOGGER.warn(
"Could not parse historyCleanupJobFirstRunProperty ({}). Using default."
+ " Exception: {} ",
historyCleanupJobFirstRunProperty,
e.getMessage());
}
}
String historyCleanupJobRunEveryProperty =
props.getProperty(TASKANA_JOB_HISTORY_CLEANUP_RUN_EVERY);
if (historyCleanupJobRunEveryProperty != null && !historyCleanupJobRunEveryProperty.isEmpty()) {
try {
runEvery = Duration.parse(historyCleanupJobRunEveryProperty);
} catch (Exception e) {
LOGGER.warn(
"Could not parse historyCleanupJobRunEveryProperty ({}). Using default. Exception: {} ",
historyCleanupJobRunEveryProperty,
e.getMessage());
}
}
String historyEventCleanupJobMinimumAgeProperty = String historyEventCleanupJobMinimumAgeProperty =
props.getProperty(TASKANA_JOB_HISTORY_CLEANUP_MINIMUM_AGE); props.getProperty(TASKANA_JOB_HISTORY_CLEANUP_MINIMUM_AGE);
if (historyEventCleanupJobMinimumAgeProperty != null if (historyEventCleanupJobMinimumAgeProperty != null
@ -339,7 +295,6 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
} }
LOGGER.debug("Configured number of history events per transaction: {}", batchSize); LOGGER.debug("Configured number of history events per transaction: {}", batchSize);
LOGGER.debug("HistoryCleanupJob configuration: first run at {}", firstRun);
LOGGER.debug("HistoryCleanupJob configuration: runs every {}", runEvery); LOGGER.debug("HistoryCleanupJob configuration: runs every {}", runEvery);
LOGGER.debug( LOGGER.debug(
"HistoryCleanupJob configuration: minimum age of history events to be cleanup up is {}", "HistoryCleanupJob configuration: minimum age of history events to be cleanup up is {}",

View File

@ -386,7 +386,7 @@ class HistoryCleanupJobAccTest extends AbstractAccTest {
taskanaEngine.getJobService().createJob(job); taskanaEngine.getJobService().createJob(job);
} }
List<ScheduledJob> jobsToRun = getJobMapper().findJobsToRun(); List<ScheduledJob> jobsToRun = getJobMapper().findJobsToRun(Instant.now());
assertThat(jobsToRun).hasSize(30); assertThat(jobsToRun).hasSize(30);
@ -397,7 +397,7 @@ class HistoryCleanupJobAccTest extends AbstractAccTest {
HistoryCleanupJob.initializeSchedule(taskanaEngine); HistoryCleanupJob.initializeSchedule(taskanaEngine);
jobsToRun = getJobMapper().findJobsToRun(); jobsToRun = getJobMapper().findJobsToRun(Instant.now());
assertThat(jobsToRun).doesNotContainAnyElementsOf(historyCleanupJobs); assertThat(jobsToRun).doesNotContainAnyElementsOf(historyCleanupJobs);
} }

View File

@ -1,5 +1,6 @@
package pro.taskana.common.internal; package pro.taskana.common.internal;
import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Delete;
@ -38,7 +39,7 @@ public interface JobMapper {
@Select( @Select(
"<script> SELECT JOB_ID, PRIORITY, CREATED, DUE, STATE, LOCKED_BY, LOCK_EXPIRES, TYPE, RETRY_COUNT, ARGUMENTS " "<script> SELECT JOB_ID, PRIORITY, CREATED, DUE, STATE, LOCKED_BY, LOCK_EXPIRES, TYPE, RETRY_COUNT, ARGUMENTS "
+ "FROM SCHEDULED_JOB " + "FROM SCHEDULED_JOB "
+ "WHERE STATE IN ( 'READY') AND (DUE is null OR DUE &lt; CURRENT_TIMESTAMP) AND (LOCK_EXPIRES is null OR LOCK_EXPIRES &lt; CURRENT_TIMESTAMP) AND RETRY_COUNT > 0 " + "WHERE STATE IN ( 'READY') AND (DUE is null OR DUE &lt; #{now}) AND (LOCK_EXPIRES is null OR LOCK_EXPIRES &lt; CURRENT_TIMESTAMP) AND RETRY_COUNT > 0 "
+ "ORDER BY PRIORITY DESC " + "ORDER BY PRIORITY DESC "
+ "<if test=\"_databaseId == 'db2'\">with UR </if> " + "<if test=\"_databaseId == 'db2'\">with UR </if> "
+ "</script>") + "</script>")
@ -59,7 +60,7 @@ public interface JobMapper {
javaType = Map.class, javaType = Map.class,
typeHandler = MapTypeHandler.class) typeHandler = MapTypeHandler.class)
}) })
List<ScheduledJob> findJobsToRun(); List<ScheduledJob> findJobsToRun(Instant now);
@Update( @Update(
value = value =

View File

@ -73,7 +73,7 @@ public class JobServiceImpl implements JobService {
List<ScheduledJob> availableJobs; List<ScheduledJob> availableJobs;
try { try {
taskanaEngineImpl.openConnection(); taskanaEngineImpl.openConnection();
availableJobs = jobMapper.findJobsToRun(); availableJobs = jobMapper.findJobsToRun(Instant.now());
LOGGER.debug("Found available jobs: {}", availableJobs); LOGGER.debug("Found available jobs: {}", availableJobs);
} finally { } finally {
taskanaEngineImpl.returnConnection(); taskanaEngineImpl.returnConnection();

View File

@ -1,6 +1,8 @@
package pro.taskana.common.internal.jobs; package pro.taskana.common.internal.jobs;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -13,6 +15,7 @@ import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
/** Abstract base for all background jobs of TASKANA. */ /** Abstract base for all background jobs of TASKANA. */
public abstract class AbstractTaskanaJob implements TaskanaJob { public abstract class AbstractTaskanaJob implements TaskanaJob {
protected final Duration runEvery;
protected TaskanaEngineImpl taskanaEngineImpl; protected TaskanaEngineImpl taskanaEngineImpl;
protected TaskanaTransactionProvider<Object> txProvider; protected TaskanaTransactionProvider<Object> txProvider;
protected ScheduledJob scheduledJob; protected ScheduledJob scheduledJob;
@ -24,6 +27,7 @@ public abstract class AbstractTaskanaJob implements TaskanaJob {
this.taskanaEngineImpl = (TaskanaEngineImpl) taskanaEngine; this.taskanaEngineImpl = (TaskanaEngineImpl) taskanaEngine;
this.txProvider = txProvider; this.txProvider = txProvider;
this.scheduledJob = job; this.scheduledJob = job;
this.runEvery = taskanaEngineImpl.getConfiguration().getCleanupJobRunEvery();
} }
public static TaskanaJob createFromScheduledJob( public static TaskanaJob createFromScheduledJob(
@ -54,4 +58,17 @@ public abstract class AbstractTaskanaJob implements TaskanaJob {
} }
return result; return result;
} }
protected Instant getNextDueForCleanupJob() {
Instant nextRun = Instant.now();
if (scheduledJob != null && scheduledJob.getDue() != null) {
nextRun = scheduledJob.getDue();
}
while (nextRun.isBefore(Instant.now())) {
nextRun = nextRun.plus(runEvery);
}
return nextRun;
}
} }

View File

@ -33,8 +33,6 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
private static final SortDirection ASCENDING = SortDirection.ASCENDING; private static final SortDirection ASCENDING = SortDirection.ASCENDING;
// Parameter // Parameter
private final Instant firstRun;
private final Duration runEvery;
private final Duration minimumAge; private final Duration minimumAge;
private final int batchSize; private final int batchSize;
private final boolean allCompletedSameParentBusiness; private final boolean allCompletedSameParentBusiness;
@ -44,8 +42,6 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
TaskanaTransactionProvider<Object> txProvider, TaskanaTransactionProvider<Object> txProvider,
ScheduledJob scheduledJob) { ScheduledJob scheduledJob) {
super(taskanaEngine, txProvider, scheduledJob); super(taskanaEngine, txProvider, scheduledJob);
firstRun = taskanaEngine.getConfiguration().getCleanupJobFirstRun();
runEvery = taskanaEngine.getConfiguration().getCleanupJobRunEvery();
minimumAge = taskanaEngine.getConfiguration().getCleanupJobMinimumAge(); minimumAge = taskanaEngine.getConfiguration().getCleanupJobMinimumAge();
batchSize = taskanaEngine.getConfiguration().getMaxNumberOfUpdatesPerTransaction(); batchSize = taskanaEngine.getConfiguration().getMaxNumberOfUpdatesPerTransaction();
allCompletedSameParentBusiness = allCompletedSameParentBusiness =
@ -198,17 +194,8 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
LOGGER.debug("Entry to scheduleNextCleanupJob."); LOGGER.debug("Entry to scheduleNextCleanupJob.");
ScheduledJob job = new ScheduledJob(); ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.TASKCLEANUPJOB); job.setType(ScheduledJob.Type.TASKCLEANUPJOB);
job.setDue(getNextDueForTaskCleanupJob()); job.setDue(getNextDueForCleanupJob());
taskanaEngineImpl.getJobService().createJob(job); taskanaEngineImpl.getJobService().createJob(job);
LOGGER.debug("Exit from scheduleNextCleanupJob."); LOGGER.debug("Exit from scheduleNextCleanupJob.");
} }
private Instant getNextDueForTaskCleanupJob() {
Instant nextRunAt = firstRun;
while (nextRunAt.isBefore(Instant.now())) {
nextRunAt = nextRunAt.plus(runEvery);
}
LOGGER.info("Scheduling next run of the TaskCleanupJob for {}", nextRunAt);
return nextRunAt;
}
} }

View File

@ -1,7 +1,5 @@
package pro.taskana.workbasket.internal.jobs; package pro.taskana.workbasket.internal.jobs;
import java.time.Duration;
import java.time.Instant;
import java.util.List; import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -29,8 +27,6 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkbasketCleanupJob.class); private static final Logger LOGGER = LoggerFactory.getLogger(WorkbasketCleanupJob.class);
// Parameter // Parameter
private final Instant firstRun;
private final Duration runEvery;
private final int batchSize; private final int batchSize;
public WorkbasketCleanupJob( public WorkbasketCleanupJob(
@ -38,8 +34,6 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
TaskanaTransactionProvider<Object> txProvider, TaskanaTransactionProvider<Object> txProvider,
ScheduledJob job) { ScheduledJob job) {
super(taskanaEngine, txProvider, job); super(taskanaEngine, txProvider, job);
firstRun = taskanaEngine.getConfiguration().getCleanupJobFirstRun();
runEvery = taskanaEngine.getConfiguration().getCleanupJobRunEvery();
batchSize = taskanaEngine.getConfiguration().getMaxNumberOfUpdatesPerTransaction(); batchSize = taskanaEngine.getConfiguration().getMaxNumberOfUpdatesPerTransaction();
} }
@ -128,17 +122,8 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
LOGGER.debug("Entry to scheduleNextCleanupJob."); LOGGER.debug("Entry to scheduleNextCleanupJob.");
ScheduledJob job = new ScheduledJob(); ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.WORKBASKETCLEANUPJOB); job.setType(ScheduledJob.Type.WORKBASKETCLEANUPJOB);
job.setDue(getNextDueForWorkbasketCleanupJob()); job.setDue(getNextDueForCleanupJob());
taskanaEngineImpl.getJobService().createJob(job); taskanaEngineImpl.getJobService().createJob(job);
LOGGER.debug("Exit from scheduleNextCleanupJob."); LOGGER.debug("Exit from scheduleNextCleanupJob.");
} }
private Instant getNextDueForWorkbasketCleanupJob() {
Instant nextRunAt = firstRun;
while (nextRunAt.isBefore(Instant.now())) {
nextRunAt = nextRunAt.plus(runEvery);
}
LOGGER.info("Scheduling next run of the WorkbasketCleanupJob for {}", nextRunAt);
return nextRunAt;
}
} }

View File

@ -4,6 +4,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import acceptance.AbstractAccTest; import acceptance.AbstractAccTest;
import java.time.Duration; import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
@ -19,6 +21,9 @@ import org.junit.jupiter.api.function.ThrowingConsumer;
import pro.taskana.common.api.ScheduledJob; import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.ScheduledJob.Type; import pro.taskana.common.api.ScheduledJob.Type;
import pro.taskana.common.internal.JobMapper;
import pro.taskana.common.internal.JobServiceImpl;
import pro.taskana.common.internal.jobs.JobRunner;
import pro.taskana.common.test.security.JaasExtension; import pro.taskana.common.test.security.JaasExtension;
import pro.taskana.common.test.security.WithAccessId; import pro.taskana.common.test.security.WithAccessId;
import pro.taskana.task.api.TaskService; import pro.taskana.task.api.TaskService;
@ -43,9 +48,9 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
@WithAccessId(user = "admin") @WithAccessId(user = "admin")
@Test @Test
void should_CleanCompletedTasksUntilDate() throws Exception { void should_CleanCompletedTasksUntilDate() throws Exception {
String id = createAndInsertTask(null); String taskId = createAndInsertTask(null);
taskService.claim(id); taskService.claim(taskId);
taskService.completeTask(id); taskService.completeTask(taskId);
long totalTasksCount = taskService.createTaskQuery().count(); long totalTasksCount = taskService.createTaskQuery().count();
assertThat(totalTasksCount).isEqualTo(88); assertThat(totalTasksCount).isEqualTo(88);
@ -88,14 +93,14 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
@WithAccessId(user = "admin") @WithAccessId(user = "admin")
@Test @Test
void shouldNotCleanCompleteTasksAfterDefinedDay() throws Exception { void shouldNotCleanCompleteTasksAfterDefinedDay() throws Exception {
String id = createAndInsertTask(null); String taskId = createAndInsertTask(null);
taskService.claim(id); taskService.claim(taskId);
taskService.completeTask(id); taskService.completeTask(taskId);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null, null); TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null, null);
job.run(); job.run();
Task completedCreatedTask = taskService.getTask(id); Task completedCreatedTask = taskService.getTask(taskId);
assertThat(completedCreatedTask).isNotNull(); assertThat(completedCreatedTask).isNotNull();
} }
@ -113,7 +118,7 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
taskanaEngine.getJobService().createJob(job); taskanaEngine.getJobService().createJob(job);
} }
List<ScheduledJob> jobsToRun = getJobMapper().findJobsToRun(); List<ScheduledJob> jobsToRun = getJobMapper().findJobsToRun(Instant.now());
assertThat(jobsToRun).hasSize(30); assertThat(jobsToRun).hasSize(30);
@ -124,7 +129,7 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
TaskCleanupJob.initializeSchedule(taskanaEngine); TaskCleanupJob.initializeSchedule(taskanaEngine);
jobsToRun = getJobMapper().findJobsToRun(); jobsToRun = getJobMapper().findJobsToRun(Instant.now());
assertThat(jobsToRun).doesNotContainAnyElementsOf(taskCleanupJobs); assertThat(jobsToRun).doesNotContainAnyElementsOf(taskCleanupJobs);
} }
@ -157,6 +162,35 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
return DynamicTest.stream(iterator, c -> "for parentBusinessProcessId = '" + c + "'", test); return DynamicTest.stream(iterator, c -> "for parentBusinessProcessId = '" + c + "'", test);
} }
@WithAccessId(user = "admin")
@Test
void should_SetNextScheduledJobBasedOnDueDateOfPredecessor_When_RunningTaskCleanupJob()
throws Exception {
JobMapper jobMapper = getJobMapper();
List<ScheduledJob> jobsToRun = jobMapper.findJobsToRun(Instant.now());
assertThat(jobsToRun).isEmpty();
Instant firstDue = Instant.now().truncatedTo(ChronoUnit.MILLIS);
ScheduledJob scheduledJob = new ScheduledJob();
scheduledJob.setType(ScheduledJob.Type.TASKCLEANUPJOB);
scheduledJob.setDue(firstDue);
JobServiceImpl jobService = (JobServiceImpl) taskanaEngine.getJobService();
jobService.createJob(scheduledJob);
jobsToRun = jobMapper.findJobsToRun(Instant.now());
assertThat(jobsToRun).hasSize(1);
assertThat(jobsToRun.get(0).getDue()).isEqualTo(firstDue);
JobRunner runner = new JobRunner(taskanaEngine);
runner.runJobs();
Duration runEvery = taskanaEngineConfiguration.getCleanupJobRunEvery();
jobsToRun = jobMapper.findJobsToRun(Instant.now().plus(runEvery));
assertThat(jobsToRun).hasSize(1);
assertThat(jobsToRun).extracting(ScheduledJob::getDue).containsExactly(firstDue.plus(runEvery));
}
private String createAndInsertTask(String parentBusinessProcessId) throws Exception { private String createAndInsertTask(String parentBusinessProcessId) throws Exception {
Task newTask = taskService.newTask("user-1-1", "DOMAIN_A"); Task newTask = taskService.newTask("user-1-1", "DOMAIN_A");
newTask.setClassificationKey("T2100"); newTask.setClassificationKey("T2100");

View File

@ -3,6 +3,7 @@ package acceptance.jobs;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import acceptance.AbstractAccTest; import acceptance.AbstractAccTest;
import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -102,7 +103,7 @@ class WorkbasketCleanupJobAccTest extends AbstractAccTest {
taskanaEngine.getJobService().createJob(job); taskanaEngine.getJobService().createJob(job);
} }
List<ScheduledJob> jobsToRun = getJobMapper().findJobsToRun(); List<ScheduledJob> jobsToRun = getJobMapper().findJobsToRun(Instant.now());
assertThat(jobsToRun).hasSize(30); assertThat(jobsToRun).hasSize(30);
@ -113,7 +114,7 @@ class WorkbasketCleanupJobAccTest extends AbstractAccTest {
WorkbasketCleanupJob.initializeSchedule(taskanaEngine); WorkbasketCleanupJob.initializeSchedule(taskanaEngine);
jobsToRun = getJobMapper().findJobsToRun(); jobsToRun = getJobMapper().findJobsToRun(Instant.now());
assertThat(jobsToRun).doesNotContainAnyElementsOf(workbasketCleanupJobs); assertThat(jobsToRun).doesNotContainAnyElementsOf(workbasketCleanupJobs);
} }