TSK-1332: Add HistoryCleanupJob
This commit is contained in:
parent
a5351f1158
commit
851b3536db
|
@ -0,0 +1,337 @@
|
|||
package pro.taskana.simplehistory.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import pro.taskana.TaskanaEngineConfiguration;
|
||||
import pro.taskana.common.api.ScheduledJob;
|
||||
import pro.taskana.common.api.ScheduledJob.Type;
|
||||
import pro.taskana.common.api.TaskanaEngine;
|
||||
import pro.taskana.common.api.TimeInterval;
|
||||
import pro.taskana.common.api.exceptions.InvalidArgumentException;
|
||||
import pro.taskana.common.api.exceptions.NotAuthorizedException;
|
||||
import pro.taskana.common.api.exceptions.SystemException;
|
||||
import pro.taskana.common.api.exceptions.TaskanaException;
|
||||
import pro.taskana.common.internal.jobs.AbstractTaskanaJob;
|
||||
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
|
||||
|
||||
public class HistoryCleanupJob extends AbstractTaskanaJob {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryCleanupJob.class);
|
||||
|
||||
private static final String TASKANA_PROPERTIES = "/taskana.properties";
|
||||
|
||||
private static final String TASKANA_JOB_HISTORY_BATCH_SIZE = "taskana.jobs.history.batchSize";
|
||||
private static final String TASKANA_JOB_HISTORY_CLEANUP_RUN_EVERY =
|
||||
"taskana.jobs.history.cleanup.runEvery";
|
||||
private static final String TASKANA_JOB_HISTORY_CLEANUP_FIRST_RUN =
|
||||
"taskana.jobs.history.cleanup.firstRunAt";
|
||||
private static final String TASKANA_JOB_HISTORY_CLEANUP_MINIMUM_AGE =
|
||||
"taskana.jobs.history.cleanup.minimumAge";
|
||||
|
||||
TaskanaHistoryEngineImpl taskanaHistoryEngine =
|
||||
TaskanaHistoryEngineImpl.createTaskanaEngine(taskanaEngineImpl.getConfiguration());
|
||||
|
||||
private Instant firstRun;
|
||||
private Duration runEvery;
|
||||
private Duration minimumAge;
|
||||
private int batchSize;
|
||||
private boolean allCompletedSameParentBusiness;
|
||||
|
||||
public HistoryCleanupJob(
|
||||
TaskanaEngine taskanaEngine,
|
||||
TaskanaTransactionProvider<Object> txProvider,
|
||||
ScheduledJob scheduledJob) {
|
||||
super(taskanaEngine, txProvider, scheduledJob);
|
||||
allCompletedSameParentBusiness =
|
||||
taskanaEngine.getConfiguration().isTaskCleanupJobAllCompletedSameParentBusiness();
|
||||
Properties props = readPropertiesFromFile(TASKANA_PROPERTIES);
|
||||
initJobParameters(props);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws TaskanaException {
|
||||
|
||||
Instant createdBefore = Instant.now().minus(minimumAge);
|
||||
|
||||
LOGGER.info("Running job to delete all history events created before ({})", createdBefore);
|
||||
|
||||
try {
|
||||
|
||||
SimpleHistoryServiceImpl simpleHistoryService =
|
||||
(SimpleHistoryServiceImpl) taskanaHistoryEngine.getTaskanaHistoryService();
|
||||
|
||||
List<HistoryEventImpl> historyEventsToClean =
|
||||
simpleHistoryService
|
||||
.createHistoryQuery()
|
||||
.createdWithin(new TimeInterval(null, createdBefore))
|
||||
.eventTypeIn("TASK_COMPLETED")
|
||||
.list();
|
||||
|
||||
if (allCompletedSameParentBusiness) {
|
||||
historyEventsToClean =
|
||||
filterSameParentBusinessHistoryEventsQualifiedToClean(
|
||||
simpleHistoryService, historyEventsToClean);
|
||||
}
|
||||
|
||||
int totalNumberOfHistoryEventsDeleted = 0;
|
||||
while (!historyEventsToClean.isEmpty()) {
|
||||
int upperLimit = batchSize;
|
||||
if (upperLimit > historyEventsToClean.size()) {
|
||||
upperLimit = historyEventsToClean.size();
|
||||
}
|
||||
totalNumberOfHistoryEventsDeleted +=
|
||||
deleteHistoryEventsTransactionally(historyEventsToClean.subList(0, upperLimit));
|
||||
historyEventsToClean.subList(0, upperLimit).clear();
|
||||
}
|
||||
LOGGER.info(
|
||||
"Job ended successfully. {} history events deleted.", totalNumberOfHistoryEventsDeleted);
|
||||
} catch (Exception e) {
|
||||
throw new TaskanaException("Error while processing HistoryCleanupJob.", e);
|
||||
} finally {
|
||||
scheduleNextCleanupJob();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the HistoryCleanupJob schedule. <br>
|
||||
* All scheduled cleanup jobs are cancelled/deleted and a new one is scheduled.
|
||||
*
|
||||
* @param taskanaEngine the TASKANA engine.
|
||||
*/
|
||||
public static void initializeSchedule(TaskanaEngine taskanaEngine) {
|
||||
HistoryCleanupJob job = new HistoryCleanupJob(taskanaEngine, null, null);
|
||||
job.scheduleNextCleanupJob();
|
||||
}
|
||||
|
||||
private List<HistoryEventImpl> filterSameParentBusinessHistoryEventsQualifiedToClean(
|
||||
SimpleHistoryServiceImpl simpleHistoryService, List<HistoryEventImpl> historyEventsToClean) {
|
||||
|
||||
Map<String, Long> eventsToCleanForParentBusinessCount = new HashMap<>();
|
||||
|
||||
historyEventsToClean.forEach(
|
||||
event ->
|
||||
eventsToCleanForParentBusinessCount.merge(
|
||||
event.getParentBusinessProcessId(), 1L, Long::sum));
|
||||
|
||||
Predicate<HistoryEventImpl> noCompletedEventsUnderMinimumAgeExistInSameParentBusiness =
|
||||
event ->
|
||||
simpleHistoryService
|
||||
.createHistoryQuery()
|
||||
.parentBusinessProcessIdIn(event.getParentBusinessProcessId())
|
||||
.eventTypeIn("TASK_COMPLETED")
|
||||
.count()
|
||||
== eventsToCleanForParentBusinessCount.get(event.getParentBusinessProcessId());
|
||||
|
||||
Predicate<HistoryEventImpl> allTasksCleanedSameParentBusiness =
|
||||
e ->
|
||||
taskanaEngineImpl
|
||||
.getTaskService()
|
||||
.createTaskQuery()
|
||||
.parentBusinessProcessIdIn(e.getParentBusinessProcessId())
|
||||
.count()
|
||||
== 0;
|
||||
|
||||
return historyEventsToClean.stream()
|
||||
.filter(
|
||||
noCompletedEventsUnderMinimumAgeExistInSameParentBusiness.and(
|
||||
allTasksCleanedSameParentBusiness))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private int deleteHistoryEventsTransactionally(List<HistoryEventImpl> historyEventsToBeDeleted) {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(
|
||||
"entry to deleteHistoryEventsTransactionally(historyEventsToBeDeleted = {})",
|
||||
historyEventsToBeDeleted);
|
||||
}
|
||||
|
||||
int deletedEventsCount = 0;
|
||||
if (txProvider != null) {
|
||||
int count =
|
||||
(Integer)
|
||||
txProvider.executeInTransaction(
|
||||
() -> {
|
||||
try {
|
||||
return deleteEvents(historyEventsToBeDeleted);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Could not delete history events.", e);
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
LOGGER.debug("exit from deleteHistoryEventsTransactionally(), returning {}", count);
|
||||
return count;
|
||||
} else {
|
||||
try {
|
||||
deletedEventsCount = deleteEvents(historyEventsToBeDeleted);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Could not delete history events.", e);
|
||||
}
|
||||
}
|
||||
LOGGER.debug(
|
||||
"exit from deleteHistoryEventsTransactionally()(), returning {}", deletedEventsCount);
|
||||
return deletedEventsCount;
|
||||
}
|
||||
|
||||
private int deleteEvents(List<HistoryEventImpl> historyEventsToBeDeleted)
|
||||
throws InvalidArgumentException, NotAuthorizedException {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(
|
||||
"entry to deleteEvents(historyEventsToBeDeleted = {})", historyEventsToBeDeleted);
|
||||
}
|
||||
|
||||
List<String> taskIdsOfEventsToBeDeleted =
|
||||
historyEventsToBeDeleted.stream()
|
||||
.map(HistoryEventImpl::getTaskId)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
SimpleHistoryServiceImpl simpleHistoryService =
|
||||
(SimpleHistoryServiceImpl) taskanaHistoryEngine.getTaskanaHistoryService();
|
||||
|
||||
String[] taskIdsArray = new String[taskIdsOfEventsToBeDeleted.size()];
|
||||
int deletedTasksCount =
|
||||
(int)
|
||||
simpleHistoryService
|
||||
.createHistoryQuery()
|
||||
.taskIdIn(taskIdsOfEventsToBeDeleted.toArray(taskIdsArray))
|
||||
.count();
|
||||
|
||||
simpleHistoryService.deleteHistoryEventsByTaskIds(taskIdsOfEventsToBeDeleted);
|
||||
|
||||
LOGGER.debug("{} events deleted.", deletedTasksCount);
|
||||
|
||||
LOGGER.debug("exit from deleteEvents(), returning {}", taskIdsOfEventsToBeDeleted.size());
|
||||
return deletedTasksCount;
|
||||
}
|
||||
|
||||
private void scheduleNextCleanupJob() {
|
||||
LOGGER.debug("Entry to scheduleNextCleanupJob.");
|
||||
ScheduledJob job = new ScheduledJob();
|
||||
job.setType(Type.HISTORYCLEANUPJOB);
|
||||
job.setDue(getNextDueForHistoryCleanupJob());
|
||||
taskanaEngineImpl.getJobService().createJob(job);
|
||||
LOGGER.debug("Exit from scheduleNextCleanupJob.");
|
||||
}
|
||||
|
||||
private Instant getNextDueForHistoryCleanupJob() {
|
||||
Instant nextRunAt = firstRun;
|
||||
while (nextRunAt.isBefore(Instant.now())) {
|
||||
nextRunAt = nextRunAt.plus(runEvery);
|
||||
}
|
||||
LOGGER.info("Scheduling next run of the HistoryCleanupJob for {}", nextRunAt);
|
||||
return nextRunAt;
|
||||
}
|
||||
|
||||
private void initJobParameters(Properties props) {
|
||||
|
||||
String jobBatchSizeProperty = props.getProperty(TASKANA_JOB_HISTORY_BATCH_SIZE);
|
||||
if (jobBatchSizeProperty != null && !jobBatchSizeProperty.isEmpty()) {
|
||||
try {
|
||||
batchSize = Integer.parseInt(jobBatchSizeProperty);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn(
|
||||
"Could not parse jobBatchSizeProperty ({}). Using default. Exception: {} ",
|
||||
jobBatchSizeProperty,
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
String historyCleanupJobFirstRunProperty =
|
||||
props.getProperty(TASKANA_JOB_HISTORY_CLEANUP_FIRST_RUN);
|
||||
if (historyCleanupJobFirstRunProperty != null && !historyCleanupJobFirstRunProperty.isEmpty()) {
|
||||
try {
|
||||
firstRun = Instant.parse(historyCleanupJobFirstRunProperty);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn(
|
||||
"Could not parse historyCleanupJobFirstRunProperty ({}). Using default."
|
||||
+ " Exception: {} ",
|
||||
historyCleanupJobFirstRunProperty,
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
String historyCleanupJobRunEveryProperty =
|
||||
props.getProperty(TASKANA_JOB_HISTORY_CLEANUP_RUN_EVERY);
|
||||
if (historyCleanupJobRunEveryProperty != null && !historyCleanupJobRunEveryProperty.isEmpty()) {
|
||||
try {
|
||||
runEvery = Duration.parse(historyCleanupJobRunEveryProperty);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn(
|
||||
"Could not parse historyCleanupJobRunEveryProperty ({}). Using default. Exception: {} ",
|
||||
historyCleanupJobRunEveryProperty,
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
String historyEventCleanupJobMinimumAgeProperty =
|
||||
props.getProperty(TASKANA_JOB_HISTORY_CLEANUP_MINIMUM_AGE);
|
||||
if (historyEventCleanupJobMinimumAgeProperty != null
|
||||
&& !historyEventCleanupJobMinimumAgeProperty.isEmpty()) {
|
||||
try {
|
||||
minimumAge = Duration.parse(historyEventCleanupJobMinimumAgeProperty);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn(
|
||||
"Could not parse historyEventCleanupJobMinimumAgeProperty ({}). Using default."
|
||||
+ " Exception: {} ",
|
||||
historyEventCleanupJobMinimumAgeProperty,
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.debug("Configured number of history events per transaction: {}", batchSize);
|
||||
LOGGER.debug("HistoryCleanupJob configuration: first run at {}", firstRun);
|
||||
LOGGER.debug("HistoryCleanupJob configuration: runs every {}", runEvery);
|
||||
LOGGER.debug(
|
||||
"HistoryCleanupJob configuration: minimum age of history events to be cleanup up is {}",
|
||||
minimumAge);
|
||||
}
|
||||
|
||||
private Properties readPropertiesFromFile(String propertiesFile) {
|
||||
Properties props = new Properties();
|
||||
boolean loadFromClasspath = loadFromClasspath(propertiesFile);
|
||||
try {
|
||||
if (loadFromClasspath) {
|
||||
InputStream inputStream =
|
||||
TaskanaEngineConfiguration.class.getResourceAsStream(propertiesFile);
|
||||
if (inputStream == null) {
|
||||
LOGGER.error("taskana properties file {} was not found on classpath.", propertiesFile);
|
||||
} else {
|
||||
props.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
|
||||
LOGGER.debug(
|
||||
"taskana properties were loaded from file {} from classpath.", propertiesFile);
|
||||
}
|
||||
} else {
|
||||
props.load(new FileInputStream(propertiesFile));
|
||||
LOGGER.debug("taskana properties were loaded from file {}.", propertiesFile);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("caught IOException when processing properties file {}.", propertiesFile);
|
||||
throw new SystemException(
|
||||
"internal System error when processing properties file " + propertiesFile, e.getCause());
|
||||
}
|
||||
return props;
|
||||
}
|
||||
|
||||
private boolean loadFromClasspath(String propertiesFile) {
|
||||
boolean loadFromClasspath = true;
|
||||
File f = new File(propertiesFile);
|
||||
if (f.exists() && !f.isDirectory()) {
|
||||
loadFromClasspath = false;
|
||||
}
|
||||
return loadFromClasspath;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
package acceptance.jobs;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import acceptance.AbstractAccTest;
|
||||
import acceptance.security.JaasExtension;
|
||||
import acceptance.security.WithAccessId;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
import pro.taskana.simplehistory.impl.HistoryCleanupJob;
|
||||
import pro.taskana.simplehistory.impl.HistoryEventImpl;
|
||||
|
||||
@ExtendWith(JaasExtension.class)
|
||||
class HistoryCleanupJobAccTest extends AbstractAccTest {
|
||||
|
||||
@BeforeEach
|
||||
void before() throws Exception {
|
||||
resetDb(getSchemaName());
|
||||
}
|
||||
|
||||
@Test
|
||||
@WithAccessId(user = "admin")
|
||||
void should_CleanHistoryEventsUntilDate_When_SameParentBusinessTrueAndEventsQualified()
|
||||
throws Exception {
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(13);
|
||||
|
||||
HistoryEventImpl eventToBeCleaned =
|
||||
createHistoryEvent(
|
||||
"wbKey1", "taskId1", "TASK_COMPLETED", "wbKey2", "someUserId", "someDetails");
|
||||
eventToBeCleaned.setCreated(Instant.now().minus(20, ChronoUnit.DAYS));
|
||||
eventToBeCleaned.setParentBusinessProcessId("sameParentId");
|
||||
|
||||
HistoryEventImpl eventToBeCleaned2 =
|
||||
createHistoryEvent(
|
||||
"wbKey1", "taskId2", "TASK_COMPLETED", "wbKey2", "someUserId", "someDetails");
|
||||
eventToBeCleaned2.setCreated(Instant.now().minus(20, ChronoUnit.DAYS));
|
||||
eventToBeCleaned2.setParentBusinessProcessId("sameParentId");
|
||||
|
||||
getHistoryService().create(eventToBeCleaned);
|
||||
getHistoryService().create(eventToBeCleaned2);
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(15);
|
||||
|
||||
taskanaEngine.getConfiguration().setTaskCleanupJobAllCompletedSameParentBusiness(true);
|
||||
|
||||
HistoryCleanupJob job = new HistoryCleanupJob(taskanaEngine, null, null);
|
||||
job.run();
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(13);
|
||||
}
|
||||
|
||||
@Test
|
||||
@WithAccessId(user = "admin")
|
||||
void should_NotCleanHistoryEventsUntilDate_When_SameParentBusinessTrueAndEventsNotQualified()
|
||||
throws Exception {
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(13);
|
||||
|
||||
HistoryEventImpl eventToBeCleaned =
|
||||
createHistoryEvent(
|
||||
"wbKey1", "taskId1", "TASK_COMPLETED", "wbKey2", "someUserId", "someDetails");
|
||||
eventToBeCleaned.setCreated(Instant.now().minus(20, ChronoUnit.DAYS));
|
||||
eventToBeCleaned.setParentBusinessProcessId("sameParentId");
|
||||
|
||||
HistoryEventImpl eventToBeCleaned2 =
|
||||
createHistoryEvent(
|
||||
"wbKey1", "taskId2", "TASK_COMPLETED", "wbKey2", "someUserId", "someDetails");
|
||||
eventToBeCleaned2.setCreated(Instant.now().minus(1, ChronoUnit.DAYS));
|
||||
eventToBeCleaned2.setParentBusinessProcessId("sameParentId");
|
||||
|
||||
HistoryEventImpl eventToBeCleaned3 =
|
||||
createHistoryEvent(
|
||||
"wbKey1",
|
||||
"TKI:000000000000000000000000000000000001",
|
||||
"TASK_COMPLETED",
|
||||
"wbKey2",
|
||||
"someUserId",
|
||||
"someDetails");
|
||||
eventToBeCleaned3.setCreated(Instant.now().minus(20, ChronoUnit.DAYS));
|
||||
eventToBeCleaned3.setParentBusinessProcessId("PBPI21");
|
||||
|
||||
getHistoryService().create(eventToBeCleaned);
|
||||
getHistoryService().create(eventToBeCleaned2);
|
||||
getHistoryService().create(eventToBeCleaned3);
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(16);
|
||||
|
||||
taskanaEngine.getConfiguration().setTaskCleanupJobAllCompletedSameParentBusiness(true);
|
||||
|
||||
HistoryCleanupJob job = new HistoryCleanupJob(taskanaEngine, null, null);
|
||||
job.run();
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(16);
|
||||
}
|
||||
|
||||
@Test
|
||||
@WithAccessId(user = "admin")
|
||||
void should_CleanHistoryEventsUntilDate_When_SameParentBusinessFalse() throws Exception {
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(13);
|
||||
|
||||
HistoryEventImpl eventToBeCleaned =
|
||||
createHistoryEvent(
|
||||
"wbKey1", "taskId1", "TASK_COMPLETED", "wbKey2", "someUserId", "someDetails");
|
||||
eventToBeCleaned.setCreated(Instant.now().minus(20, ChronoUnit.DAYS));
|
||||
eventToBeCleaned.setParentBusinessProcessId("sameParentId");
|
||||
|
||||
HistoryEventImpl eventToBeCleaned2 =
|
||||
createHistoryEvent(
|
||||
"wbKey1", "taskId2", "TASK_COMPLETED", "wbKey2", "someUserId", "someDetails");
|
||||
eventToBeCleaned2.setCreated(Instant.now().minus(1, ChronoUnit.DAYS));
|
||||
eventToBeCleaned2.setParentBusinessProcessId("sameParentId");
|
||||
|
||||
getHistoryService().create(eventToBeCleaned);
|
||||
getHistoryService().create(eventToBeCleaned2);
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(15);
|
||||
|
||||
taskanaEngine.getConfiguration().setTaskCleanupJobAllCompletedSameParentBusiness(false);
|
||||
|
||||
HistoryCleanupJob job = new HistoryCleanupJob(taskanaEngine, null, null);
|
||||
job.run();
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(14);
|
||||
assertThat(getHistoryService().getHistoryEvent(eventToBeCleaned2.getId())).isNotNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
@WithAccessId(user = "admin")
|
||||
void should_NotCleanHistoryEvents_When_MinimumAgeNotReached() throws Exception {
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(13);
|
||||
|
||||
HistoryEventImpl eventToBeCleaned =
|
||||
createHistoryEvent(
|
||||
"wbKey1", "taskId1", "TASK_COMPLETED", "wbKey2", "someUserId", "someDetails");
|
||||
eventToBeCleaned.setCreated(Instant.now().minus(1, ChronoUnit.DAYS));
|
||||
eventToBeCleaned.setParentBusinessProcessId("sameParentId");
|
||||
|
||||
getHistoryService().create(eventToBeCleaned);
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(14);
|
||||
|
||||
HistoryCleanupJob job = new HistoryCleanupJob(taskanaEngine, null, null);
|
||||
job.run();
|
||||
|
||||
assertThat(getHistoryService().createHistoryQuery().count()).isEqualTo(14);
|
||||
}
|
||||
}
|
|
@ -12,6 +12,10 @@ taskana.jobs.batchSize=50
|
|||
taskana.jobs.cleanup.runEvery=P1D
|
||||
taskana.jobs.cleanup.firstRunAt=2018-07-25T08:00:00Z
|
||||
taskana.jobs.cleanup.minimumAge=P14D
|
||||
taskana.jobs.history.batchSize=50
|
||||
taskana.jobs.history.cleanup.runEvery=P1D
|
||||
taskana.jobs.history.cleanup.firstRunAt=2018-07-25T08:00:00Z
|
||||
taskana.jobs.history.cleanup.minimumAge=P15D
|
||||
taskana.german.holidays.enabled=true
|
||||
taskana.german.holidays.corpus-christi.enabled=false
|
||||
taskana.history.deletion.on.task.deletion.enabled=true
|
||||
|
|
|
@ -4,6 +4,11 @@ import java.time.Instant;
|
|||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import pro.taskana.classification.internal.jobs.ClassificationChangedJob;
|
||||
import pro.taskana.task.internal.jobs.TaskCleanupJob;
|
||||
import pro.taskana.task.internal.jobs.TaskRefreshJob;
|
||||
import pro.taskana.workbasket.internal.jobs.WorkbasketCleanupJob;
|
||||
|
||||
/**
|
||||
* This class holds all data that go into the Job table.
|
||||
*
|
||||
|
@ -172,9 +177,20 @@ public class ScheduledJob {
|
|||
|
||||
/** This enum controls the type of a job. */
|
||||
public enum Type {
|
||||
CLASSIFICATIONCHANGEDJOB,
|
||||
UPDATETASKSJOB,
|
||||
TASKCLEANUPJOB,
|
||||
WORKBASKETCLEANUPJOB;
|
||||
CLASSIFICATIONCHANGEDJOB(ClassificationChangedJob.class.getName()),
|
||||
UPDATETASKSJOB(TaskRefreshJob.class.getName()),
|
||||
TASKCLEANUPJOB(TaskCleanupJob.class.getName()),
|
||||
WORKBASKETCLEANUPJOB(WorkbasketCleanupJob.class.getName()),
|
||||
HISTORYCLEANUPJOB("pro.taskana.simplehistory.impl.HistoryCleanupJob");
|
||||
|
||||
private String clazz;
|
||||
|
||||
Type(String clazz) {
|
||||
this.clazz = clazz;
|
||||
}
|
||||
|
||||
public String getClazz() {
|
||||
return clazz;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package pro.taskana.common.internal.jobs;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -32,7 +33,9 @@ public abstract class AbstractTaskanaJob implements TaskanaJob {
|
|||
|
||||
public static TaskanaJob createFromScheduledJob(
|
||||
TaskanaEngine engine, TaskanaTransactionProvider<Object> txProvider, ScheduledJob job)
|
||||
throws TaskanaException {
|
||||
throws TaskanaException, ClassNotFoundException, IllegalAccessException,
|
||||
InstantiationException, InvocationTargetException {
|
||||
|
||||
switch (job.getType()) {
|
||||
case CLASSIFICATIONCHANGEDJOB:
|
||||
return new ClassificationChangedJob(engine, txProvider, job);
|
||||
|
@ -42,6 +45,14 @@ public abstract class AbstractTaskanaJob implements TaskanaJob {
|
|||
return new TaskCleanupJob(engine, txProvider, job);
|
||||
case WORKBASKETCLEANUPJOB:
|
||||
return new WorkbasketCleanupJob(engine, txProvider, job);
|
||||
case HISTORYCLEANUPJOB:
|
||||
return (TaskanaJob)
|
||||
Thread.currentThread()
|
||||
.getContextClassLoader()
|
||||
.loadClass(job.getType().getClazz())
|
||||
.getConstructors()[0]
|
||||
.newInstance(engine, txProvider, job);
|
||||
|
||||
default:
|
||||
throw new TaskanaException(
|
||||
"No matching job found for "
|
||||
|
|
|
@ -2,7 +2,6 @@ package pro.taskana.task.internal.jobs;
|
|||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -86,8 +85,10 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
|
|||
}
|
||||
|
||||
private List<TaskSummary> getTasksCompletedBefore(Instant untilDate) {
|
||||
|
||||
LOGGER.debug("entry to getTasksCompletedBefore(untilDate = {})", untilDate);
|
||||
List<TaskSummary> taskList =
|
||||
|
||||
List<TaskSummary> tasksToDelete =
|
||||
taskanaEngineImpl
|
||||
.getTaskService()
|
||||
.createTaskQuery()
|
||||
|
@ -98,45 +99,41 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
|
|||
if (allCompletedSameParentBusiness) {
|
||||
Map<String, Long> numberParentTasksShouldHave = new HashMap<>();
|
||||
Map<String, Long> countParentTask = new HashMap<>();
|
||||
for (TaskSummary task : taskList) {
|
||||
numberParentTasksShouldHave.put(
|
||||
task.getParentBusinessProcessId(),
|
||||
taskanaEngineImpl
|
||||
.getTaskService()
|
||||
.createTaskQuery()
|
||||
.parentBusinessProcessIdIn(task.getParentBusinessProcessId())
|
||||
.count());
|
||||
countParentTask.merge(task.getParentBusinessProcessId(), 1L, Long::sum);
|
||||
}
|
||||
|
||||
List<String> idsList = new ArrayList<>();
|
||||
numberParentTasksShouldHave.forEach(
|
||||
(k, v) -> {
|
||||
if (v.compareTo(countParentTask.get(k)) == 0) {
|
||||
idsList.add(k);
|
||||
tasksToDelete.forEach(
|
||||
task -> {
|
||||
if (!numberParentTasksShouldHave.containsKey(task.getParentBusinessProcessId())) {
|
||||
numberParentTasksShouldHave.put(
|
||||
task.getParentBusinessProcessId(),
|
||||
taskanaEngineImpl
|
||||
.getTaskService()
|
||||
.createTaskQuery()
|
||||
.parentBusinessProcessIdIn(task.getParentBusinessProcessId())
|
||||
.count());
|
||||
}
|
||||
countParentTask.merge(task.getParentBusinessProcessId(), 1L, Long::sum);
|
||||
});
|
||||
|
||||
if (idsList.isEmpty()) {
|
||||
LOGGER.debug("exit from getTasksCompletedBefore(), returning {}", new ArrayList<>());
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<String> taskIdsNotAllCompletedSameParentBusiness =
|
||||
numberParentTasksShouldHave.entrySet().stream()
|
||||
.filter(entry -> !entry.getValue().equals(countParentTask.get(entry.getKey())))
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
String[] ids = new String[idsList.size()];
|
||||
ids = idsList.toArray(ids);
|
||||
taskList =
|
||||
taskanaEngineImpl
|
||||
.getTaskService()
|
||||
.createTaskQuery()
|
||||
.parentBusinessProcessIdIn(ids)
|
||||
.list();
|
||||
tasksToDelete =
|
||||
tasksToDelete.stream()
|
||||
.filter(
|
||||
taskSummary ->
|
||||
!taskIdsNotAllCompletedSameParentBusiness.contains(
|
||||
taskSummary.getParentBusinessProcessId()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("exit from getTasksCompletedBefore(), returning {}", taskList);
|
||||
LOGGER.debug("exit from getTasksCompletedBefore(), returning {}", tasksToDelete);
|
||||
}
|
||||
|
||||
return taskList;
|
||||
return tasksToDelete;
|
||||
}
|
||||
|
||||
private int deleteTasksTransactionally(List<TaskSummary> tasksToBeDeleted) {
|
||||
|
|
|
@ -11,8 +11,8 @@ INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000005', 'TKI
|
|||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000006', 'TKI:000000000000000000000000000000000002', 'some text in textfield', 'user-1-1', '2018-01-29 15:55:00', '2018-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000007', 'TKI:000000000000000000000000000000000002', 'some other text in textfield', 'user-1-1', '2018-01-29 15:55:00', '2018-01-30 15:55:00');
|
||||
-- TaskComments for CreateTaskCommentAccTest
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000008', 'TKI:000000000000000000000000000000000026', 'some text in textfield', 'user-1-1', '2018-01-29 15:55:00', '2018-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000009', 'TKI:000000000000000000000000000000000026', 'some other text in textfield', 'user-1-1', '2018-01-29 15:55:00', '2018-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000010', 'TKI:000000000000000000000000000000000027', 'some text in textfield', 'user-1-1', '2018-01-29 15:55:00', '2018-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000011', 'TKI:000000000000000000000000000000000027', 'some other text in textfield', 'user-1-1', '2018-01-29 15:55:00', '2018-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000012', 'TKI:000000000000000000000000000000000004', 'some text in textfield', 'user-1-1', '2018-01-29 15:55:00', '2018-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000008', 'TKI:000000000000000000000000000000000026', 'some text in textfield', 'user_1_1', '2016-01-29 15:55:00', '2030-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000009', 'TKI:000000000000000000000000000000000026', 'some other text in textfield', 'user_1_1', '2015-01-29 15:55:00', '2000-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000010', 'TKI:000000000000000000000000000000000027', 'some text in textfield', 'user_1_1', '2020-01-29 15:55:00', '2024-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000011', 'TKI:000000000000000000000000000000000027', 'some other text in textfield', 'user_1_1', '2018-01-29 15:55:00', '1988-01-30 15:55:00');
|
||||
INSERT INTO TASK_COMMENT VALUES('TCI:000000000000000000000000000000000012', 'TKI:000000000000000000000000000000000004', 'some text in textfield', 'user_1_1', '2017-01-29 15:55:00', '2018-01-30 15:55:00');
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.security.Principal;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
@ -13,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import pro.taskana.common.api.ScheduledJob.Type;
|
||||
import pro.taskana.common.api.TaskanaEngine;
|
||||
import pro.taskana.common.api.TaskanaRole;
|
||||
import pro.taskana.common.internal.jobs.JobRunner;
|
||||
|
@ -30,10 +32,20 @@ public class JobScheduler {
|
|||
@Autowired private TaskanaEngine taskanaEngine;
|
||||
|
||||
@PostConstruct
|
||||
public void scheduleCleanupJob() {
|
||||
public void scheduleCleanupJob()
|
||||
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException,
|
||||
ClassNotFoundException {
|
||||
LOGGER.debug("Entry to scheduleCleanupJob.");
|
||||
TaskCleanupJob.initializeSchedule(taskanaEngine);
|
||||
WorkbasketCleanupJob.initializeSchedule(taskanaEngine);
|
||||
|
||||
if (taskanaEngine.isHistoryEnabled()) {
|
||||
Thread.currentThread()
|
||||
.getContextClassLoader()
|
||||
.loadClass(Type.HISTORYCLEANUPJOB.getClazz())
|
||||
.getDeclaredMethod("initializeSchedule", TaskanaEngine.class)
|
||||
.invoke(null, taskanaEngine);
|
||||
}
|
||||
LOGGER.debug("Exit from scheduleCleanupJob.");
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue