Closes #2599 - add locking to Query to enable adapter clustering

This commit is contained in:
ryzheboka 2024-06-25 14:00:36 +02:00 committed by Jörg Heffner
parent 944c73f9c1
commit 9280ff626a
7 changed files with 160 additions and 8 deletions

View File

@ -72,6 +72,12 @@
<version>${version.equalsverifier}</version> <version>${version.equalsverifier}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>pro.taskana</groupId>
<artifactId>taskana-common-test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -2,16 +2,23 @@ package acceptance.task.query;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.catchThrowableOfType;
import static pro.taskana.task.api.CallbackState.CALLBACK_PROCESSING_REQUIRED; import static pro.taskana.task.api.CallbackState.CALLBACK_PROCESSING_REQUIRED;
import static pro.taskana.testapi.DefaultTestEntities.defaultTestClassification; import static pro.taskana.testapi.DefaultTestEntities.defaultTestClassification;
import static pro.taskana.testapi.DefaultTestEntities.defaultTestObjectReference; import static pro.taskana.testapi.DefaultTestEntities.defaultTestObjectReference;
import static pro.taskana.testapi.DefaultTestEntities.defaultTestWorkbasket; import static pro.taskana.testapi.DefaultTestEntities.defaultTestWorkbasket;
import java.security.PrivilegedAction;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.security.auth.Subject;
import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.DynamicTest;
@ -26,8 +33,13 @@ import pro.taskana.classification.api.models.ClassificationSummary;
import pro.taskana.common.api.IntInterval; import pro.taskana.common.api.IntInterval;
import pro.taskana.common.api.KeyDomain; import pro.taskana.common.api.KeyDomain;
import pro.taskana.common.api.TimeInterval; import pro.taskana.common.api.TimeInterval;
import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.api.security.CurrentUserContext; import pro.taskana.common.api.security.CurrentUserContext;
import pro.taskana.common.api.security.UserPrincipal;
import pro.taskana.common.internal.InternalTaskanaEngine;
import pro.taskana.common.internal.util.CheckedConsumer;
import pro.taskana.common.internal.util.Pair; import pro.taskana.common.internal.util.Pair;
import pro.taskana.common.test.util.ParallelThreadHelper;
import pro.taskana.task.api.CallbackState; import pro.taskana.task.api.CallbackState;
import pro.taskana.task.api.TaskCustomField; import pro.taskana.task.api.TaskCustomField;
import pro.taskana.task.api.TaskCustomIntField; import pro.taskana.task.api.TaskCustomIntField;
@ -54,6 +66,7 @@ import pro.taskana.workbasket.api.models.WorkbasketSummary;
class TaskQueryImplAccTest { class TaskQueryImplAccTest {
@TaskanaInject TaskService taskService; @TaskanaInject TaskService taskService;
@TaskanaInject InternalTaskanaEngine internalTaskanaEngine;
@TaskanaInject WorkbasketService workbasketService; @TaskanaInject WorkbasketService workbasketService;
@TaskanaInject CurrentUserContext currentUserContext; @TaskanaInject CurrentUserContext currentUserContext;
@TaskanaInject ClassificationService classificationService; @TaskanaInject ClassificationService classificationService;
@ -98,6 +111,104 @@ class TaskQueryImplAccTest {
.buildAndStore(workbasketService, "businessadmin"); .buildAndStore(workbasketService, "businessadmin");
} }
@Nested
@TestInstance(Lifecycle.PER_CLASS)
class LockResultsEqualsTest {
private static final Integer LOCK_RESULTS_EQUALS = 2;
WorkbasketSummary wb1;
TaskSummary taskSummary1;
TaskSummary taskSummary2;
TaskSummary taskSummary3;
TaskSummary taskSummary4;
@WithAccessId(user = "user-1-1")
@BeforeAll
void setup() throws Exception {
wb1 = createWorkbasketWithPermission();
taskSummary1 = taskInWorkbasket(wb1).state(TaskState.READY)
.buildAndStoreAsSummary(taskService);
taskSummary2 = taskInWorkbasket(wb1).state(TaskState.READY)
.buildAndStoreAsSummary(taskService);
taskSummary3 =
taskInWorkbasket(wb1).state(TaskState.READY)
.buildAndStoreAsSummary(taskService);
taskSummary4 = taskInWorkbasket(wb1).state(TaskState.READY)
.buildAndStoreAsSummary(taskService);
}
@Test
void should_ReturnDifferentTasks_For_LockResultsEqualsTwo() throws Exception {
if (System.getenv("DB") != null && (System.getenv("DB").equals("POSTGRES")
|| System.getenv("DB").equals("DB2"))) {
List<TaskSummary> returnedTasks = Collections.synchronizedList(new ArrayList<>());
List<String> accessIds =
Collections.synchronizedList(
Stream.of("admin", "admin")
.collect(Collectors.toList()));
ParallelThreadHelper.runInThread(
getRunnableTest(returnedTasks, accessIds), accessIds.size());
assertThat(returnedTasks)
.extracting(TaskSummary::getId)
.containsExactlyInAnyOrder(
taskSummary1.getId(), taskSummary2.getId(), taskSummary3.getId(),
taskSummary4.getId());
}
}
@Test
void should_ThrowException_When_UsingLockResultsWithSelectAndClaim() {
ThrowingCallable call = () -> taskService.selectAndClaim(taskService
.createTaskQuery()
.workbasketIdIn(wb1.getId())
.stateIn(TaskState.READY)
.lockResultsEquals(LOCK_RESULTS_EQUALS));
IllegalArgumentException e = catchThrowableOfType(IllegalArgumentException.class, call);
assertThat(e).isNotNull();
assertThat(e.getMessage()).isEqualTo("The params \"lockResultsEquals\" and "
+ "\"selectAndClaim\" cannot be used together!");
}
private Runnable getRunnableTest(List<TaskSummary> listedTasks, List<String> accessIds) {
return () -> {
Subject subject = new Subject();
subject.getPrincipals().add(new UserPrincipal(accessIds.remove(0)));
Consumer<TaskService> consumer =
CheckedConsumer.wrap(
taskService -> {
internalTaskanaEngine.executeInDatabaseConnection(() -> {
List<TaskSummary> results = taskService
.createTaskQuery()
.workbasketIdIn(wb1.getId())
.stateIn(TaskState.READY)
.lockResultsEquals(LOCK_RESULTS_EQUALS).list();
listedTasks.addAll(results);
for (TaskSummary task : results) {
try {
taskService.claim(task.getId());
} catch (Exception e) {
throw new SystemException(e.getMessage());
}
}
});
});
PrivilegedAction<Void> action =
() -> {
consumer.accept(taskService);
return null;
};
Subject.doAs(subject, action);
};
}
}
@Nested @Nested
@TestInstance(Lifecycle.PER_CLASS) @TestInstance(Lifecycle.PER_CLASS)
class PermissionsTest { class PermissionsTest {

View File

@ -1905,4 +1905,15 @@ public interface TaskQuery extends BaseQuery<TaskSummary, TaskQueryColumnName> {
* @return the query * @return the query
*/ */
TaskQuery orderByWorkbasketName(SortDirection sortDirection); TaskQuery orderByWorkbasketName(SortDirection sortDirection);
/**
* This method locks the returned rows until the end of the transaction using the FOR UPDATE lock.
* It cannot be used together with selectAndClaim.
*
* @param lockResults determines the number of returned and locked results;
* if zero, no results are locked, but the number of returned results is not
* limited
* @return the query
*/
TaskQuery lockResultsEquals(Integer lockResults);
} }

View File

@ -191,6 +191,8 @@ public interface TaskService {
/** /**
* Selects and claims the first {@linkplain Task} which is returned by the {@linkplain TaskQuery}. * Selects and claims the first {@linkplain Task} which is returned by the {@linkplain TaskQuery}.
* It cannot be used together with the {@linkplain TaskQuery#lockResultsEquals(Integer)}
* parameter of the query.
* *
* @param taskQuery the {@linkplain TaskQuery} * @param taskQuery the {@linkplain TaskQuery}
* @return the {@linkplain Task} that got selected and claimed * @return the {@linkplain Task} that got selected and claimed

View File

@ -337,6 +337,7 @@ public class TaskQueryImpl implements TaskQuery {
private CallbackState[] callbackStateNotIn; private CallbackState[] callbackStateNotIn;
private WildcardSearchField[] wildcardSearchFieldIn; private WildcardSearchField[] wildcardSearchFieldIn;
private String wildcardSearchValueLike; private String wildcardSearchValueLike;
private Integer lockResults;
TaskQueryImpl(InternalTaskanaEngine taskanaEngine) { TaskQueryImpl(InternalTaskanaEngine taskanaEngine) {
this.taskanaEngine = taskanaEngine; this.taskanaEngine = taskanaEngine;
@ -345,6 +346,7 @@ public class TaskQueryImpl implements TaskQuery {
this.orderByInner = new ArrayList<>(); this.orderByInner = new ArrayList<>();
this.filterByAccessIdIn = true; this.filterByAccessIdIn = true;
this.withoutAttachment = false; this.withoutAttachment = false;
this.lockResults = 0;
this.joinWithUserInfo = taskanaEngine.getEngine().getConfiguration().isAddAdditionalUserInfo(); this.joinWithUserInfo = taskanaEngine.getEngine().getConfiguration().isAddAdditionalUserInfo();
} }
@ -2076,6 +2078,7 @@ public class TaskQueryImpl implements TaskQuery {
TaskSummary result; TaskSummary result;
try { try {
taskanaEngine.openConnection(); taskanaEngine.openConnection();
checkForIllegalParamCombinations();
checkOpenReadAndReadTasksPermissionForSpecifiedWorkbaskets(); checkOpenReadAndReadTasksPermissionForSpecifiedWorkbaskets();
setupAccessIds(); setupAccessIds();
setupJoinAndOrderParameters(); setupJoinAndOrderParameters();
@ -2116,11 +2119,17 @@ public class TaskQueryImpl implements TaskQuery {
return this; return this;
} }
public TaskQuery lockResultsEquals(Integer lockResults) {
this.lockResults = lockResults;
return this;
}
// optimized query for db2 can't be used for now in case of selectAndClaim because of temporary // optimized query for db2 can't be used for now in case of selectAndClaim because of temporary
// tables and the "for update" clause clashing in db2 // tables and the "for update" clause clashing in db2
private String getLinkToMapperScript() { private String getLinkToMapperScript() {
if (DB.DB2 == getDB() if (DB.DB2 == getDB()
&& !selectAndClaim && !selectAndClaim
&& lockResults == 0
&& taskanaEngine.getEngine().getConfiguration().isUseSpecificDb2Taskquery()) { && taskanaEngine.getEngine().getConfiguration().isUseSpecificDb2Taskquery()) {
return LINK_TO_MAPPER_DB2; return LINK_TO_MAPPER_DB2;
} else if (selectAndClaim && DB.ORACLE == getDB()) { } else if (selectAndClaim && DB.ORACLE == getDB()) {
@ -2159,6 +2168,11 @@ public class TaskQueryImpl implements TaskQuery {
"The params \"wildcardSearchFieldIn\" and \"wildcardSearchValueLike\"" "The params \"wildcardSearchFieldIn\" and \"wildcardSearchValueLike\""
+ " must be used together!"); + " must be used together!");
} }
if (selectAndClaim && lockResults != null && lockResults != 0) {
throw new IllegalArgumentException(
"The params \"lockResultsEquals\" and \"selectAndClaim\""
+ " cannot be used together!");
}
if (withoutAttachment if (withoutAttachment
&& (attachmentChannelIn != null && (attachmentChannelIn != null
|| attachmentChannelLike != null || attachmentChannelLike != null
@ -2810,8 +2824,8 @@ public class TaskQueryImpl implements TaskQuery {
+ Arrays.toString(wildcardSearchFieldIn) + Arrays.toString(wildcardSearchFieldIn)
+ ", wildcardSearchValueLike=" + ", wildcardSearchValueLike="
+ wildcardSearchValueLike + wildcardSearchValueLike
+ ", joinWithUserInfo=" + ", lockResults="
+ joinWithUserInfo + lockResults
+ "]"; + "]";
} }
} }

View File

@ -72,9 +72,18 @@ public class TaskQuerySqlProvider {
+ "<if test='selectAndClaim == true'> " + "<if test='selectAndClaim == true'> "
+ "FETCH FIRST ROW ONLY FOR UPDATE " + "FETCH FIRST ROW ONLY FOR UPDATE "
+ "</if>" + "</if>"
+ "<if test=\"_databaseId == 'db2' and selectAndClaim \">WITH RS USE " + "<if test='lockResults and lockResults != 0'> "
+ "FETCH FIRST ${lockResults} ROWS ONLY FOR UPDATE "
+ "<if test=\"_databaseId == 'postgres'\">"
+ "SKIP LOCKED "
+ "</if>"
+ "<if test=\"_databaseId == 'db2'\">"
+ "SKIP LOCKED DATA "
+ "</if>"
+ "</if>"
+ "<if test=\"_databaseId == 'db2' and (selectAndClaim or lockResults != 0) \">WITH RS USE "
+ "AND KEEP UPDATE LOCKS </if>" + "AND KEEP UPDATE LOCKS </if>"
+ "<if test=\"_databaseId == 'db2' and !selectAndClaim \">WITH UR </if>" + "<if test=\"_databaseId == 'db2' and !selectAndClaim and lockResults==0 \">WITH UR </if>"
+ CLOSING_SCRIPT_TAG; + CLOSING_SCRIPT_TAG;
} }
@ -143,10 +152,7 @@ public class TaskQuerySqlProvider {
+ "<if test='!orderByOuter.isEmpty()'>" + "<if test='!orderByOuter.isEmpty()'>"
+ "ORDER BY <foreach item='item' collection='orderByOuter' separator=',' >${item}</foreach>" + "ORDER BY <foreach item='item' collection='orderByOuter' separator=',' >${item}</foreach>"
+ "</if> " + "</if> "
+ "<if test='selectAndClaim == true'>" + "with UR "
+ "FETCH FIRST ROW ONLY FOR UPDATE WITH RS USE AND KEEP UPDATE LOCKS"
+ "</if>"
+ "<if test='selectAndClaim == false'> with UR</if>"
+ CLOSING_SCRIPT_TAG; + CLOSING_SCRIPT_TAG;
} }

View File

@ -662,6 +662,8 @@ public class TaskServiceImpl implements TaskService {
Optional.ofNullable(taskQuery.single()) Optional.ofNullable(taskQuery.single())
.map(TaskSummary::getId) .map(TaskSummary::getId)
.map(wrap(this::claim))); .map(wrap(this::claim)));
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) { } catch (Exception e) {
return Optional.empty(); return Optional.empty();
} }