TSK-1499: Fixed bug of also cleaning completed tasks with parentProcessId null/empty (#1394)

* TSK-1499: Fixed bug of also cleaning completed tasks with parentProcessId null/empty

* TSK-1499: Now also fixed for the HistoryCleanupJob

* TSK-1499: Improvements after review
This commit is contained in:
tge20 2021-02-03 09:34:22 +01:00 committed by GitHub
parent 368b3c7d54
commit aa6d304b49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 173 additions and 77 deletions

View File

@ -5,23 +5,24 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class ListUtil {
public class CollectionUtil {
private ListUtil() {
private CollectionUtil() {
throw new IllegalStateException("Utility class");
}
/**
* Splits a list with objects of type T into chunks of a certain size.
* Splits a collection with objects of type T into chunks of a certain size.
*
* @param <T> type of elements inside list
* @param inputList list to be divided
* @param <T> type of elements inside collection
* @param inputCollection collection to be divided
* @param size maximal number of elements inside chunk
* @return list containing the chunks
*/
public static <T> Collection<List<T>> partitionBasedOnSize(Collection<T> inputList, int size) {
public static <T> Collection<List<T>> partitionBasedOnSize(
Collection<T> inputCollection, int size) {
final AtomicInteger counter = new AtomicInteger(0);
return inputList.stream()
return inputCollection.stream()
.collect(Collectors.groupingBy(s -> counter.getAndIncrement() / size))
.values();
}

View File

@ -8,17 +8,17 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import pro.taskana.common.internal.util.ListUtil;
import pro.taskana.common.internal.util.CollectionUtil;
/** Test for the ListUtil class. */
class ListUtilTest {
class CollectionUtilTest {
@Test
void should_SplitListIntoChunks_When_CallingPartitionBasedOnSize() {
List<Integer> listWith1000Entries =
IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
assertThat(listWith1000Entries).hasSize(1000);
Collection<List<Integer>> partitions = ListUtil.partitionBasedOnSize(listWith1000Entries, 100);
Collection<List<Integer>> partitions = CollectionUtil
.partitionBasedOnSize(listWith1000Entries, 100);
assertThat(partitions).hasSize(10);
}
}

View File

@ -16,6 +16,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,6 +33,7 @@ import pro.taskana.common.api.exceptions.TaskanaException;
import pro.taskana.common.internal.JobServiceImpl;
import pro.taskana.common.internal.jobs.AbstractTaskanaJob;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
import pro.taskana.common.internal.util.CollectionUtil;
import pro.taskana.simplehistory.impl.SimpleHistoryServiceImpl;
import pro.taskana.simplehistory.impl.TaskanaHistoryEngineImpl;
import pro.taskana.spi.history.api.events.task.TaskHistoryEvent;
@ -82,7 +85,6 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
LOGGER.info("Running job to delete all history events created before ({})", createdBefore);
try {
SimpleHistoryServiceImpl simpleHistoryService =
(SimpleHistoryServiceImpl) taskanaHistoryEngine.getTaskanaHistoryService();
@ -96,43 +98,48 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
TaskHistoryEventType.TERMINATED.getName())
.list();
List<String> taskIdsToDeleteHistoryEventsFor;
Set<String> taskIdsToDeleteHistoryEventsFor;
if (allCompletedSameParentBusiness) {
String[] parentBusinessProcessIds =
historyEventCandidatesToClean.stream()
.map(TaskHistoryEvent::getParentBusinessProcessId)
.distinct()
.toArray(String[]::new);
historyEventCandidatesToClean.addAll(
simpleHistoryService
.createTaskHistoryQuery()
.parentBusinessProcessIdIn(parentBusinessProcessIds)
.eventTypeIn(TaskHistoryEventType.CREATED.getName())
.list());
taskIdsToDeleteHistoryEventsFor =
filterSameParentBusinessHistoryEventsQualifiedToClean(historyEventCandidatesToClean);
historyEventCandidatesToClean.stream()
.filter(
event ->
event.getParentBusinessProcessId() == null
|| event.getParentBusinessProcessId().isEmpty())
.map(TaskHistoryEvent::getTaskId)
.collect(Collectors.toSet());
historyEventCandidatesToClean.removeIf(
event -> taskIdsToDeleteHistoryEventsFor.contains(event.getTaskId()));
if (!historyEventCandidatesToClean.isEmpty()) {
String[] parentBusinessProcessIds =
historyEventCandidatesToClean.stream()
.map(TaskHistoryEvent::getParentBusinessProcessId)
.distinct()
.toArray(String[]::new);
historyEventCandidatesToClean.addAll(
simpleHistoryService
.createTaskHistoryQuery()
.parentBusinessProcessIdIn(parentBusinessProcessIds)
.eventTypeIn(TaskHistoryEventType.CREATED.getName())
.list());
taskIdsToDeleteHistoryEventsFor.addAll(
filterSameParentBusinessHistoryEventsQualifiedToClean(historyEventCandidatesToClean));
}
} else {
taskIdsToDeleteHistoryEventsFor =
historyEventCandidatesToClean.stream()
.map(TaskHistoryEvent::getTaskId)
.collect(toList());
.collect(Collectors.toSet());
}
int totalNumberOfHistoryEventsDeleted = 0;
while (!taskIdsToDeleteHistoryEventsFor.isEmpty()) {
int upperLimit = batchSize;
if (upperLimit > taskIdsToDeleteHistoryEventsFor.size()) {
upperLimit = taskIdsToDeleteHistoryEventsFor.size();
}
totalNumberOfHistoryEventsDeleted +=
deleteHistoryEventsTransactionally(
taskIdsToDeleteHistoryEventsFor.subList(0, upperLimit));
taskIdsToDeleteHistoryEventsFor.subList(0, upperLimit).clear();
}
int totalNumberOfHistoryEventsDeleted =
CollectionUtil.partitionBasedOnSize(taskIdsToDeleteHistoryEventsFor, batchSize).stream()
.mapToInt(this::deleteHistoryEventsTransactionally)
.sum();
LOGGER.info(
"Job ended successfully. {} history events deleted.", totalNumberOfHistoryEventsDeleted);
} catch (Exception e) {

View File

@ -5,14 +5,21 @@ import static org.assertj.core.api.Assertions.assertThat;
import acceptance.AbstractAccTest;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.ThrowingConsumer;
import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.ScheduledJob.Type;
import pro.taskana.common.internal.util.Pair;
import pro.taskana.common.test.security.JaasExtension;
import pro.taskana.common.test.security.WithAccessId;
import pro.taskana.simplehistory.impl.jobs.HistoryCleanupJob;
@ -394,4 +401,50 @@ class HistoryCleanupJobAccTest extends AbstractAccTest {
assertThat(jobsToRun).doesNotContainAnyElementsOf(historyCleanupJobs);
}
@WithAccessId(user = "admin")
@TestFactory
Stream<DynamicTest>
should_CleanTaskHistoryEventsWithParentProcessIdEmptyOrNull_When_TaskCompleted() {
Iterator<String> iterator = Arrays.asList("", null).iterator();
String taskId1 = "taskId1";
String taskId2 = "taskId2";
List<TaskHistoryEvent> events =
List.of(
Pair.of(taskId1, TaskHistoryEventType.CREATED),
Pair.of(taskId1, TaskHistoryEventType.COMPLETED),
Pair.of(taskId2, TaskHistoryEventType.CREATED))
.stream()
.map(
pair ->
createTaskHistoryEvent(
"wbKey1",
pair.getLeft(),
pair.getRight().getName(),
"wbKey2",
"someUserId",
"someDetails"))
.collect(Collectors.toList());
taskanaEngine.getConfiguration().setTaskCleanupJobAllCompletedSameParentBusiness(true);
HistoryCleanupJob job = new HistoryCleanupJob(taskanaEngine, null, null);
ThrowingConsumer<String> test =
parentBusinessId -> {
getHistoryService().deleteHistoryEventsByTaskIds(List.of(taskId1, taskId2));
events.forEach(x -> x.setCreated(Instant.now().minus(20, ChronoUnit.DAYS)));
events.forEach(x -> x.setParentBusinessProcessId(parentBusinessId));
events.forEach(x -> getHistoryService().create(x));
job.run();
List<TaskHistoryEvent> eventsAfterCleanup =
getHistoryService().createTaskHistoryQuery().taskIdIn(taskId1, taskId2).list();
assertThat(eventsAfterCleanup)
.extracting(TaskHistoryEvent::getTaskId)
.containsExactly(taskId2);
};
return DynamicTest.stream(iterator, c -> "for parentBusinessProcessId = '" + c + "'", test);
}
}

View File

@ -33,8 +33,8 @@ import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.api.exceptions.TaskanaException;
import pro.taskana.common.internal.InternalTaskanaEngine;
import pro.taskana.common.internal.util.CheckedConsumer;
import pro.taskana.common.internal.util.CollectionUtil;
import pro.taskana.common.internal.util.IdGenerator;
import pro.taskana.common.internal.util.ListUtil;
import pro.taskana.common.internal.util.ObjectAttributeChangeDetector;
import pro.taskana.common.internal.util.Pair;
import pro.taskana.spi.history.api.events.task.TaskCancelledEvent;
@ -999,7 +999,7 @@ public class TaskServiceImpl implements TaskService {
// splitting Augmentation into steps of maximal 32000 tasks
// reason: DB2 has a maximum for parameters in a query
List<TaskSummary> result =
ListUtil.partitionBasedOnSize(taskSummaries, 32000).stream()
CollectionUtil.partitionBasedOnSize(taskSummaries, 32000).stream()
.map(this::augmentTaskSummariesByContainedSummariesWithoutPartitioning)
.flatMap(Collection::stream)
.collect(Collectors.toList());

View File

@ -21,6 +21,7 @@ import pro.taskana.common.api.exceptions.TaskanaException;
import pro.taskana.common.internal.JobServiceImpl;
import pro.taskana.common.internal.jobs.AbstractTaskanaJob;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
import pro.taskana.common.internal.util.CollectionUtil;
import pro.taskana.common.internal.util.LogSanitizer;
import pro.taskana.task.api.models.TaskSummary;
@ -57,17 +58,13 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
LOGGER.info("Running job to delete all tasks completed before ({})", completedBefore);
try {
List<TaskSummary> tasksCompletedBefore = getTasksCompletedBefore(completedBefore);
int totalNumberOfTasksCompleted = 0;
while (!tasksCompletedBefore.isEmpty()) {
int upperLimit = batchSize;
if (upperLimit > tasksCompletedBefore.size()) {
upperLimit = tasksCompletedBefore.size();
}
totalNumberOfTasksCompleted +=
deleteTasksTransactionally(tasksCompletedBefore.subList(0, upperLimit));
tasksCompletedBefore.subList(0, upperLimit).clear();
}
LOGGER.info("Job ended successfully. {} tasks deleted.", totalNumberOfTasksCompleted);
int totalNumberOfTasksDeleted =
CollectionUtil.partitionBasedOnSize(tasksCompletedBefore, batchSize).stream()
.mapToInt(this::deleteTasksTransactionally)
.sum();
LOGGER.info("Job ended successfully. {} tasks deleted.", totalNumberOfTasksDeleted);
} catch (Exception e) {
throw new TaskanaException("Error while processing TaskCleanupJob.", e);
} finally {
@ -119,6 +116,8 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
List<String> taskIdsNotAllCompletedSameParentBusiness =
numberParentTasksShouldHave.entrySet().stream()
.filter(entry -> entry.getKey() != null)
.filter(entry -> !entry.getKey().isEmpty())
.filter(entry -> !entry.getValue().equals(countParentTask.get(entry.getKey())))
.map(Map.Entry::getKey)
.collect(Collectors.toList());

View File

@ -17,6 +17,7 @@ import pro.taskana.common.api.exceptions.TaskanaException;
import pro.taskana.common.internal.JobServiceImpl;
import pro.taskana.common.internal.jobs.AbstractTaskanaJob;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
import pro.taskana.common.internal.util.CollectionUtil;
import pro.taskana.workbasket.api.WorkbasketQueryColumnName;
/**
@ -47,16 +48,12 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
LOGGER.info("Running job to delete all workbaskets marked for deletion");
try {
List<String> workbasketsMarkedForDeletion = getWorkbasketsMarkedForDeletion();
int totalNumberOfWorkbasketDeleted = 0;
while (workbasketsMarkedForDeletion.size() > 0) {
int upperLimit = batchSize;
if (upperLimit > workbasketsMarkedForDeletion.size()) {
upperLimit = workbasketsMarkedForDeletion.size();
}
totalNumberOfWorkbasketDeleted +=
deleteWorkbasketsTransactionally(workbasketsMarkedForDeletion.subList(0, upperLimit));
workbasketsMarkedForDeletion.subList(0, upperLimit).clear();
}
int totalNumberOfWorkbasketDeleted =
CollectionUtil.partitionBasedOnSize(workbasketsMarkedForDeletion, batchSize).stream()
.mapToInt(this::deleteWorkbasketsTransactionally)
.sum();
LOGGER.info(
"Job ended successfully. {} workbaskets deleted.", totalNumberOfWorkbasketDeleted);
} catch (Exception e) {

View File

@ -3,12 +3,19 @@ package acceptance.jobs;
import static org.assertj.core.api.Assertions.assertThat;
import acceptance.AbstractAccTest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.ThrowingConsumer;
import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.ScheduledJob.Type;
@ -35,9 +42,11 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
@WithAccessId(user = "admin")
@Test
void shouldCleanCompletedTasksUntilDate() throws Exception {
void should_CleanCompletedTasksUntilDate() throws Exception {
String id = createAndInsertTask(null);
taskService.claim(id);
taskService.completeTask(id);
createAndCompleteTask();
long totalTasksCount = taskService.createTaskQuery().count();
assertThat(totalTasksCount).isEqualTo(88);
@ -79,12 +88,14 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
@WithAccessId(user = "admin")
@Test
void shouldNotCleanCompleteTasksAfterDefinedDay() throws Exception {
Task createdTask = createAndCompleteTask();
String id = createAndInsertTask(null);
taskService.claim(id);
taskService.completeTask(id);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null, null);
job.run();
Task completedCreatedTask = taskService.getTask(createdTask.getId());
Task completedCreatedTask = taskService.getTask(id);
assertThat(completedCreatedTask).isNotNull();
}
@ -118,14 +129,40 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
assertThat(jobsToRun).doesNotContainAnyElementsOf(taskCleanupJobs);
}
private Task createAndCompleteTask() throws Exception {
@WithAccessId(user = "admin")
@TestFactory
Stream<DynamicTest>
should_DeleteCompletedTaskWithParentBusinessEmptyOrNull_When_RunningCleanupJob() {
Iterator<String> iterator = Arrays.asList("", null).iterator();
taskanaEngine.getConfiguration().setTaskCleanupJobAllCompletedSameParentBusiness(true);
taskanaEngine.getConfiguration().setCleanupJobMinimumAge(Duration.ZERO);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null, null);
ThrowingConsumer<String> test =
parentBusinessId -> {
String taskId1 = createAndInsertTask(parentBusinessId);
taskService.claim(taskId1);
taskService.completeTask(taskId1);
String taskId2 = createAndInsertTask(parentBusinessId);
taskService.claim(taskId2);
job.run();
List<TaskSummary> tasksAfterCleaning =
taskService.createTaskQuery().idIn(taskId1, taskId2).list();
assertThat(tasksAfterCleaning).extracting(TaskSummary::getId).containsExactly(taskId2);
};
return DynamicTest.stream(iterator, c -> "for parentBusinessProcessId = '" + c + "'", test);
}
private String createAndInsertTask(String parentBusinessProcessId) throws Exception {
Task newTask = taskService.newTask("user-1-1", "DOMAIN_A");
newTask.setClassificationKey("T2100");
newTask.setPrimaryObjRef(
createObjectReference("COMPANY_A", "SYSTEM_A", "INSTANCE_A", "VNR", "1234567"));
Task createdTask = taskService.createTask(newTask);
taskService.claim(createdTask.getId());
taskService.completeTask(createdTask.getId());
return createdTask;
newTask.setParentBusinessProcessId(parentBusinessProcessId);
return taskService.createTask(newTask).getId();
}
}

View File

@ -40,7 +40,7 @@ import pro.taskana.common.api.BaseQuery.SortDirection;
import pro.taskana.common.api.TimeInterval;
import pro.taskana.common.api.exceptions.InvalidArgumentException;
import pro.taskana.common.internal.TaskanaEngineProxy;
import pro.taskana.common.internal.util.ListUtil;
import pro.taskana.common.internal.util.CollectionUtil;
import pro.taskana.common.internal.util.Triplet;
import pro.taskana.common.test.security.JaasExtension;
import pro.taskana.common.test.security.WithAccessId;
@ -136,12 +136,14 @@ class QueryTasksAccTest extends AbstractAccTest {
@WithAccessId(user = "admin")
@Test
void should_SplitTaskListIntoChunksOf32000_When_AugmentingTasksAfterTaskQuery() {
MockedStatic<ListUtil> listUtilMock = Mockito.mockStatic(ListUtil.class);
listUtilMock.when(() -> ListUtil.partitionBasedOnSize(any(), anyInt())).thenCallRealMethod();
MockedStatic<CollectionUtil> listUtilMock = Mockito.mockStatic(CollectionUtil.class);
listUtilMock
.when(() -> CollectionUtil.partitionBasedOnSize(any(), anyInt()))
.thenCallRealMethod();
TASK_SERVICE.createTaskQuery().list();
listUtilMock.verify(() -> ListUtil.partitionBasedOnSize(any(), eq(32000)));
listUtilMock.verify(() -> CollectionUtil.partitionBasedOnSize(any(), eq(32000)));
}
@WithAccessId(user = "admin")