From 9280ff626a8b352c53ed8c6eb6656f9bf062ef6f Mon Sep 17 00:00:00 2001 From: ryzheboka <25465835+ryzheboka@users.noreply.github.com> Date: Tue, 25 Jun 2024 14:00:36 +0200 Subject: [PATCH] Closes #2599 - add locking to Query to enable adapter clustering --- lib/taskana-core-test/pom.xml | 6 + .../task/query/TaskQueryImplAccTest.java | 111 ++++++++++++++++++ .../java/pro/taskana/task/api/TaskQuery.java | 11 ++ .../pro/taskana/task/api/TaskService.java | 2 + .../taskana/task/internal/TaskQueryImpl.java | 18 ++- .../task/internal/TaskQuerySqlProvider.java | 18 ++- .../task/internal/TaskServiceImpl.java | 2 + 7 files changed, 160 insertions(+), 8 deletions(-) diff --git a/lib/taskana-core-test/pom.xml b/lib/taskana-core-test/pom.xml index 5206793ec..de1e849fc 100644 --- a/lib/taskana-core-test/pom.xml +++ b/lib/taskana-core-test/pom.xml @@ -72,6 +72,12 @@ ${version.equalsverifier} test + + pro.taskana + taskana-common-test + ${project.version} + test + diff --git a/lib/taskana-core-test/src/test/java/acceptance/task/query/TaskQueryImplAccTest.java b/lib/taskana-core-test/src/test/java/acceptance/task/query/TaskQueryImplAccTest.java index 9d7608b61..970502b53 100644 --- a/lib/taskana-core-test/src/test/java/acceptance/task/query/TaskQueryImplAccTest.java +++ b/lib/taskana-core-test/src/test/java/acceptance/task/query/TaskQueryImplAccTest.java @@ -2,16 +2,23 @@ package acceptance.task.query; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.catchThrowableOfType; import static pro.taskana.task.api.CallbackState.CALLBACK_PROCESSING_REQUIRED; import static pro.taskana.testapi.DefaultTestEntities.defaultTestClassification; import static pro.taskana.testapi.DefaultTestEntities.defaultTestObjectReference; import static pro.taskana.testapi.DefaultTestEntities.defaultTestWorkbasket; +import java.security.PrivilegedAction; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.security.auth.Subject; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DynamicTest; @@ -26,8 +33,13 @@ import pro.taskana.classification.api.models.ClassificationSummary; import pro.taskana.common.api.IntInterval; import pro.taskana.common.api.KeyDomain; import pro.taskana.common.api.TimeInterval; +import pro.taskana.common.api.exceptions.SystemException; import pro.taskana.common.api.security.CurrentUserContext; +import pro.taskana.common.api.security.UserPrincipal; +import pro.taskana.common.internal.InternalTaskanaEngine; +import pro.taskana.common.internal.util.CheckedConsumer; import pro.taskana.common.internal.util.Pair; +import pro.taskana.common.test.util.ParallelThreadHelper; import pro.taskana.task.api.CallbackState; import pro.taskana.task.api.TaskCustomField; import pro.taskana.task.api.TaskCustomIntField; @@ -54,6 +66,7 @@ import pro.taskana.workbasket.api.models.WorkbasketSummary; class TaskQueryImplAccTest { @TaskanaInject TaskService taskService; + @TaskanaInject InternalTaskanaEngine internalTaskanaEngine; @TaskanaInject WorkbasketService workbasketService; @TaskanaInject CurrentUserContext currentUserContext; @TaskanaInject ClassificationService classificationService; @@ -98,6 +111,104 @@ class TaskQueryImplAccTest { .buildAndStore(workbasketService, "businessadmin"); } + @Nested + @TestInstance(Lifecycle.PER_CLASS) + class LockResultsEqualsTest { + private static final Integer LOCK_RESULTS_EQUALS = 2; + WorkbasketSummary wb1; + TaskSummary taskSummary1; + TaskSummary taskSummary2; + TaskSummary taskSummary3; + TaskSummary taskSummary4; + + @WithAccessId(user = "user-1-1") + @BeforeAll + void setup() throws Exception { + wb1 = createWorkbasketWithPermission(); + + taskSummary1 = taskInWorkbasket(wb1).state(TaskState.READY) + .buildAndStoreAsSummary(taskService); + taskSummary2 = taskInWorkbasket(wb1).state(TaskState.READY) + .buildAndStoreAsSummary(taskService); + taskSummary3 = + taskInWorkbasket(wb1).state(TaskState.READY) + .buildAndStoreAsSummary(taskService); + taskSummary4 = taskInWorkbasket(wb1).state(TaskState.READY) + .buildAndStoreAsSummary(taskService); + + } + + @Test + void should_ReturnDifferentTasks_For_LockResultsEqualsTwo() throws Exception { + if (System.getenv("DB") != null && (System.getenv("DB").equals("POSTGRES") + || System.getenv("DB").equals("DB2"))) { + + List returnedTasks = Collections.synchronizedList(new ArrayList<>()); + List accessIds = + Collections.synchronizedList( + Stream.of("admin", "admin") + .collect(Collectors.toList())); + + ParallelThreadHelper.runInThread( + getRunnableTest(returnedTasks, accessIds), accessIds.size()); + + assertThat(returnedTasks) + .extracting(TaskSummary::getId) + .containsExactlyInAnyOrder( + taskSummary1.getId(), taskSummary2.getId(), taskSummary3.getId(), + taskSummary4.getId()); + } + } + + @Test + void should_ThrowException_When_UsingLockResultsWithSelectAndClaim() { + ThrowingCallable call = () -> taskService.selectAndClaim(taskService + .createTaskQuery() + .workbasketIdIn(wb1.getId()) + .stateIn(TaskState.READY) + .lockResultsEquals(LOCK_RESULTS_EQUALS)); + IllegalArgumentException e = catchThrowableOfType(IllegalArgumentException.class, call); + assertThat(e).isNotNull(); + assertThat(e.getMessage()).isEqualTo("The params \"lockResultsEquals\" and " + + "\"selectAndClaim\" cannot be used together!"); + } + + private Runnable getRunnableTest(List listedTasks, List accessIds) { + return () -> { + Subject subject = new Subject(); + subject.getPrincipals().add(new UserPrincipal(accessIds.remove(0))); + + Consumer consumer = + CheckedConsumer.wrap( + taskService -> { + internalTaskanaEngine.executeInDatabaseConnection(() -> { + List results = taskService + .createTaskQuery() + .workbasketIdIn(wb1.getId()) + .stateIn(TaskState.READY) + .lockResultsEquals(LOCK_RESULTS_EQUALS).list(); + listedTasks.addAll(results); + for (TaskSummary task : results) { + try { + taskService.claim(task.getId()); + } catch (Exception e) { + throw new SystemException(e.getMessage()); + } + } + }); + }); + + + PrivilegedAction action = + () -> { + consumer.accept(taskService); + return null; + }; + Subject.doAs(subject, action); + }; + } + } + @Nested @TestInstance(Lifecycle.PER_CLASS) class PermissionsTest { diff --git a/lib/taskana-core/src/main/java/pro/taskana/task/api/TaskQuery.java b/lib/taskana-core/src/main/java/pro/taskana/task/api/TaskQuery.java index fb72c306b..886623560 100644 --- a/lib/taskana-core/src/main/java/pro/taskana/task/api/TaskQuery.java +++ b/lib/taskana-core/src/main/java/pro/taskana/task/api/TaskQuery.java @@ -1905,4 +1905,15 @@ public interface TaskQuery extends BaseQuery { * @return the query */ TaskQuery orderByWorkbasketName(SortDirection sortDirection); + + /** + * This method locks the returned rows until the end of the transaction using the FOR UPDATE lock. + * It cannot be used together with selectAndClaim. + * + * @param lockResults determines the number of returned and locked results; + * if zero, no results are locked, but the number of returned results is not + * limited + * @return the query + */ + TaskQuery lockResultsEquals(Integer lockResults); } diff --git a/lib/taskana-core/src/main/java/pro/taskana/task/api/TaskService.java b/lib/taskana-core/src/main/java/pro/taskana/task/api/TaskService.java index d83214238..da3047440 100644 --- a/lib/taskana-core/src/main/java/pro/taskana/task/api/TaskService.java +++ b/lib/taskana-core/src/main/java/pro/taskana/task/api/TaskService.java @@ -191,6 +191,8 @@ public interface TaskService { /** * Selects and claims the first {@linkplain Task} which is returned by the {@linkplain TaskQuery}. + * It cannot be used together with the {@linkplain TaskQuery#lockResultsEquals(Integer)} + * parameter of the query. * * @param taskQuery the {@linkplain TaskQuery} * @return the {@linkplain Task} that got selected and claimed diff --git a/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskQueryImpl.java b/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskQueryImpl.java index 1f2704368..f1d00072c 100644 --- a/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskQueryImpl.java +++ b/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskQueryImpl.java @@ -337,6 +337,7 @@ public class TaskQueryImpl implements TaskQuery { private CallbackState[] callbackStateNotIn; private WildcardSearchField[] wildcardSearchFieldIn; private String wildcardSearchValueLike; + private Integer lockResults; TaskQueryImpl(InternalTaskanaEngine taskanaEngine) { this.taskanaEngine = taskanaEngine; @@ -345,6 +346,7 @@ public class TaskQueryImpl implements TaskQuery { this.orderByInner = new ArrayList<>(); this.filterByAccessIdIn = true; this.withoutAttachment = false; + this.lockResults = 0; this.joinWithUserInfo = taskanaEngine.getEngine().getConfiguration().isAddAdditionalUserInfo(); } @@ -2076,6 +2078,7 @@ public class TaskQueryImpl implements TaskQuery { TaskSummary result; try { taskanaEngine.openConnection(); + checkForIllegalParamCombinations(); checkOpenReadAndReadTasksPermissionForSpecifiedWorkbaskets(); setupAccessIds(); setupJoinAndOrderParameters(); @@ -2116,11 +2119,17 @@ public class TaskQueryImpl implements TaskQuery { return this; } + public TaskQuery lockResultsEquals(Integer lockResults) { + this.lockResults = lockResults; + return this; + } + // optimized query for db2 can't be used for now in case of selectAndClaim because of temporary // tables and the "for update" clause clashing in db2 private String getLinkToMapperScript() { if (DB.DB2 == getDB() && !selectAndClaim + && lockResults == 0 && taskanaEngine.getEngine().getConfiguration().isUseSpecificDb2Taskquery()) { return LINK_TO_MAPPER_DB2; } else if (selectAndClaim && DB.ORACLE == getDB()) { @@ -2159,6 +2168,11 @@ public class TaskQueryImpl implements TaskQuery { "The params \"wildcardSearchFieldIn\" and \"wildcardSearchValueLike\"" + " must be used together!"); } + if (selectAndClaim && lockResults != null && lockResults != 0) { + throw new IllegalArgumentException( + "The params \"lockResultsEquals\" and \"selectAndClaim\"" + + " cannot be used together!"); + } if (withoutAttachment && (attachmentChannelIn != null || attachmentChannelLike != null @@ -2810,8 +2824,8 @@ public class TaskQueryImpl implements TaskQuery { + Arrays.toString(wildcardSearchFieldIn) + ", wildcardSearchValueLike=" + wildcardSearchValueLike - + ", joinWithUserInfo=" - + joinWithUserInfo + + ", lockResults=" + + lockResults + "]"; } } diff --git a/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskQuerySqlProvider.java b/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskQuerySqlProvider.java index 867994f95..8e72ca316 100644 --- a/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskQuerySqlProvider.java +++ b/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskQuerySqlProvider.java @@ -72,9 +72,18 @@ public class TaskQuerySqlProvider { + " " + "FETCH FIRST ROW ONLY FOR UPDATE " + "" - + "WITH RS USE " + + " " + + "FETCH FIRST ${lockResults} ROWS ONLY FOR UPDATE " + + "" + + "SKIP LOCKED " + + "" + + "" + + "SKIP LOCKED DATA " + + "" + + "" + + "WITH RS USE " + "AND KEEP UPDATE LOCKS " - + "WITH UR " + + "WITH UR " + CLOSING_SCRIPT_TAG; } @@ -143,10 +152,7 @@ public class TaskQuerySqlProvider { + "" + "ORDER BY ${item}" + " " - + "" - + "FETCH FIRST ROW ONLY FOR UPDATE WITH RS USE AND KEEP UPDATE LOCKS" - + "" - + " with UR" + + "with UR " + CLOSING_SCRIPT_TAG; } diff --git a/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskServiceImpl.java b/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskServiceImpl.java index 29f4c7c56..fc0c643ae 100644 --- a/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskServiceImpl.java +++ b/lib/taskana-core/src/main/java/pro/taskana/task/internal/TaskServiceImpl.java @@ -662,6 +662,8 @@ public class TaskServiceImpl implements TaskService { Optional.ofNullable(taskQuery.single()) .map(TaskSummary::getId) .map(wrap(this::claim))); + } catch (IllegalArgumentException e) { + throw e; } catch (Exception e) { return Optional.empty(); }