TSK-576: added transaction slicing to cleanup job.

This commit is contained in:
Holger Hagen 2018-08-08 23:01:18 +02:00 committed by Martin Rojas Miguel Angel
parent afdf6cbaff
commit d6a29e7bdd
6 changed files with 63 additions and 36 deletions

View File

@ -7,6 +7,7 @@ import java.util.List;
import pro.taskana.TaskanaEngine;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.impl.TaskanaEngineImpl;
import pro.taskana.transaction.TaskanaTransactionProvider;
/**
* Abstract base for all background jobs of TASKANA.
@ -14,21 +15,25 @@ import pro.taskana.impl.TaskanaEngineImpl;
public abstract class AbstractTaskanaJob implements TaskanaJob {
protected TaskanaEngineImpl taskanaEngineImpl;
protected TaskanaTransactionProvider<Object> txProvider;
protected ScheduledJob scheduledJob;
public AbstractTaskanaJob(TaskanaEngine taskanaEngine, ScheduledJob job) {
public AbstractTaskanaJob(TaskanaEngine taskanaEngine, TaskanaTransactionProvider<Object> txProvider,
ScheduledJob job) {
this.taskanaEngineImpl = (TaskanaEngineImpl) taskanaEngine;
this.txProvider = txProvider;
this.scheduledJob = job;
}
public static TaskanaJob createFromScheduledJob(TaskanaEngine engine, ScheduledJob job) throws TaskanaException {
public static TaskanaJob createFromScheduledJob(TaskanaEngine engine, TaskanaTransactionProvider<Object> txProvider,
ScheduledJob job) throws TaskanaException {
switch (job.getType()) {
case CLASSIFICATIONCHANGEDJOB:
return new ClassificationChangedJob(engine, job);
return new ClassificationChangedJob(engine, txProvider, job);
case UPDATETASKSJOB:
return new TaskRefreshJob(engine, job);
return new TaskRefreshJob(engine, txProvider, job);
case TASKCLEANUPJOB:
return new TaskCleanupJob(engine, job);
return new TaskCleanupJob(engine, txProvider, job);
default:
throw new TaskanaException(
"No matching job found for " + job.getType() + " of ScheduledJob " + job.getJobId() + ".");

View File

@ -14,6 +14,7 @@ import pro.taskana.TaskSummary;
import pro.taskana.TaskanaEngine;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.impl.util.LoggerUtils;
import pro.taskana.transaction.TaskanaTransactionProvider;
/**
* This class executes a job of type CLASSIFICATIONCHANGEDJOB.
@ -34,8 +35,9 @@ public class ClassificationChangedJob extends AbstractTaskanaJob {
private boolean priorityChanged;
private boolean serviceLevelChanged;
public ClassificationChangedJob(TaskanaEngine engine, ScheduledJob job) {
super(engine, job);
public ClassificationChangedJob(TaskanaEngine engine, TaskanaTransactionProvider<Object> txProvider,
ScheduledJob job) {
super(engine, txProvider, job);
Map<String, String> args = job.getArguments();
classificationId = args.get(CLASSIFICATION_ID);
priorityChanged = Boolean.parseBoolean(args.get(PRIORITY_CHANGED));

View File

@ -126,7 +126,7 @@ public class JobRunner {
private void runScheduledJob(ScheduledJob scheduledJob) {
LOGGER.debug("entry to runScheduledJob(job = {})", scheduledJob);
try {
TaskanaJob job = AbstractTaskanaJob.createFromScheduledJob(taskanaEngine, scheduledJob);
TaskanaJob job = AbstractTaskanaJob.createFromScheduledJob(taskanaEngine, txProvider, scheduledJob);
job.run();
} catch (Exception e) {
LOGGER.error("Error running job: {} ", scheduledJob.getType(), e);

View File

@ -14,6 +14,7 @@ import pro.taskana.TaskanaEngine;
import pro.taskana.TimeInterval;
import pro.taskana.exceptions.InvalidArgumentException;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.transaction.TaskanaTransactionProvider;
/**
* Job to cleanup completed tasks after a period of time.
@ -26,12 +27,15 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
private Instant firstRun;
private Duration runEvery;
private Duration minimumAge;
private int batchSize;
public TaskCleanupJob(TaskanaEngine taskanaEngine, ScheduledJob scheduledJob) {
super(taskanaEngine, scheduledJob);
public TaskCleanupJob(TaskanaEngine taskanaEngine, TaskanaTransactionProvider<Object> txProvider,
ScheduledJob scheduledJob) {
super(taskanaEngine, txProvider, scheduledJob);
firstRun = taskanaEngine.getConfiguration().getTaskCleanupJobFirstRun();
runEvery = taskanaEngine.getConfiguration().getTaskCleanupJobRunEvery();
minimumAge = taskanaEngine.getConfiguration().getTaskCleanupJobMinimumAge();
batchSize = taskanaEngine.getConfiguration().getMaxNumberOfTaskUpdatesPerTransaction();
}
@Override
@ -40,10 +44,18 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
LOGGER.info("Running job to delete all tasks completed before ({})", completedBefore.toString());
try {
List<TaskSummary> tasksCompletedBefore = getTasksCompletedBefore(completedBefore);
deleteTasks(tasksCompletedBefore);
int totalNumberOfTasksCompleted = 0;
while (tasksCompletedBefore.size() > 0) {
int upperLimit = batchSize;
if (upperLimit > tasksCompletedBefore.size()) {
upperLimit = tasksCompletedBefore.size();
}
totalNumberOfTasksCompleted += deleteTasksTransactionally(tasksCompletedBefore.subList(0, upperLimit));
tasksCompletedBefore.subList(0, upperLimit).clear();
}
scheduleNextCleanupJob();
LOGGER.info("Job ended successfully.");
} catch (InvalidArgumentException e) {
LOGGER.info("Job ended successfully. {} tasks deleted.", totalNumberOfTasksCompleted);
} catch (Exception e) {
throw new TaskanaException("Error while processing TaskCleanupJob.", e);
}
}
@ -55,16 +67,39 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
.list();
}
private void deleteTasks(List<TaskSummary> tasksToBeDeleted) throws InvalidArgumentException {
private int deleteTasksTransactionally(List<TaskSummary> tasksToBeDeleted) {
int deletedTaskCount = 0;
if (txProvider != null) {
Integer count = (Integer) txProvider.executeInTransaction(() -> {
try {
return new Integer(deleteTasks(tasksToBeDeleted));
} catch (Exception e) {
LOGGER.warn("Could not delete tasks.", e);
return new Integer(0);
}
});
return count.intValue();
} else {
try {
deletedTaskCount = deleteTasks(tasksToBeDeleted);
} catch (Exception e) {
LOGGER.warn("Could not delete tasks.", e);
}
}
return deletedTaskCount;
}
private int deleteTasks(List<TaskSummary> tasksToBeDeleted) throws InvalidArgumentException {
List<String> tasksIdsToBeDeleted = tasksToBeDeleted.stream()
.map(task -> task.getTaskId())
.collect(Collectors.toList());
BulkOperationResults<String, TaskanaException> results = taskanaEngineImpl.getTaskService()
.deleteTasks(tasksIdsToBeDeleted);
LOGGER.info("{} tasks deleted.", tasksIdsToBeDeleted.size() - results.getFailedIds().size());
LOGGER.debug("{} tasks deleted.", tasksIdsToBeDeleted.size() - results.getFailedIds().size());
for (String failedId : results.getFailedIds()) {
LOGGER.warn("Task with id {} could not be deleted. Reason: {}", failedId, results.getErrorForId(failedId));
}
return tasksIdsToBeDeleted.size() - results.getFailedIds().size();
}
public void scheduleNextCleanupJob() {
@ -92,7 +127,7 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
* @param taskanaEngine
*/
public static void initializeSchedule(TaskanaEngine taskanaEngine) {
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null, null);
job.scheduleNextCleanupJob();
}

View File

@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory;
import pro.taskana.TaskanaEngine;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.impl.TaskServiceImpl;
import pro.taskana.transaction.TaskanaTransactionProvider;
/**
* This class executes a job of type CLASSIFICATIONCHANGEDJOB.
@ -25,8 +26,8 @@ public class TaskRefreshJob extends AbstractTaskanaJob {
private List<String> affectedTaskIds;
public TaskRefreshJob(TaskanaEngine engine, ScheduledJob job) {
super(engine, job);
public TaskRefreshJob(TaskanaEngine engine, TaskanaTransactionProvider<Object> txProvider, ScheduledJob job) {
super(engine, txProvider, job);
Map<String, String> args = job.getArguments();
String taskIdsString = args.get(ARG_TASK_IDS);
affectedTaskIds = Arrays.asList(taskIdsString.split(","));

View File

@ -3,12 +3,6 @@ package acceptance.jobs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -48,12 +42,7 @@ public class TaskCleanupJobAccTest extends AbstractAccTest {
long totalTasksCount = taskService.createTaskQuery().count();
assertEquals(72, totalTasksCount);
Instant completedBefore = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
.atZone(ZoneId.systemDefault())
.minusDays(14)
.toInstant();
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null, null);
job.run();
totalTasksCount = taskService.createTaskQuery().count();
@ -68,12 +57,7 @@ public class TaskCleanupJobAccTest extends AbstractAccTest {
Task createdTask = createAndCompleteTask();
Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
.atZone(ZoneId.systemDefault())
.minusDays(14)
.toInstant();
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null, null);
job.run();
Task completedCreatedTask = taskService.getTask(createdTask.getId());