TSK-1649: implemented database lock for resolution of jobs to run

Mustapha Zorgati 2021-08-02 04:16:58 +02:00
parent eaed4e9613
commit 55d21a9e8b
37 changed files with 556 additions and 498 deletions

View File

@ -1,12 +0,0 @@
package pro.taskana.common.internal.transaction;
/**
* represents a callable Object.
*
* @param <T> the type of the returned objects.
*/
@FunctionalInterface
public interface TaskanaCallable<T> {
T call();
}

View File

@ -1,12 +1,27 @@
package pro.taskana.common.internal.transaction;
/**
* This class provides support for transactions.
*
* @param <T> the type of the returned objects.
*/
@FunctionalInterface
public interface TaskanaTransactionProvider<T> {
import java.util.function.Supplier;
T executeInTransaction(TaskanaCallable<T> action);
/** This functional interface provides support for transactions. */
@FunctionalInterface
public interface TaskanaTransactionProvider {
<T> T executeInTransaction(Supplier<T> supplier);
static <T> T executeInTransactionIfPossible(
TaskanaTransactionProvider transactionProvider, Supplier<T> supplier) {
return transactionProvider != null
? transactionProvider.executeInTransaction(supplier)
: supplier.get();
}
static void executeInTransactionIfPossible(
TaskanaTransactionProvider transactionProvider, Runnable runnable) {
executeInTransactionIfPossible(
transactionProvider,
() -> {
runnable.run();
return null;
});
}
}
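
The static helpers above are the new call pattern used throughout this commit: run a supplier inside a transaction when a provider is configured, otherwise execute it directly. A minimal sketch (txProvider and LOGGER are assumed to be in scope; the supplier body is a placeholder for the real cleanup work):

// Supplier variant: the return value is passed through.
int deletedCount =
    TaskanaTransactionProvider.executeInTransactionIfPossible(
        txProvider,
        () -> {
          // real jobs delete history events, tasks or workbaskets here
          return 0;
        });

// Runnable variant for side-effect-only work; wrapped into a Supplier<Void> internally.
TaskanaTransactionProvider.executeInTransactionIfPossible(
    txProvider, () -> LOGGER.debug("executed transactionally if a provider is present"));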

View File

@ -65,9 +65,9 @@ public class TaskanaHistoryEngineImpl implements TaskanaHistoryEngine {
if (taskanaHistoryService == null) {
SimpleHistoryServiceImpl historyService = new SimpleHistoryServiceImpl();
historyService.initialize(taskanaEngine);
this.taskanaHistoryService = historyService;
taskanaHistoryService = historyService;
}
return this.taskanaHistoryService;
return taskanaHistoryService;
}
public boolean isUserInRole(TaskanaRole... roles) {

View File

@ -4,12 +4,6 @@ import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@ -21,7 +15,6 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.TaskanaEngineConfiguration;
import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.ScheduledJob.Type;
import pro.taskana.common.api.TaskanaEngine;
@ -29,7 +22,6 @@ import pro.taskana.common.api.TimeInterval;
import pro.taskana.common.api.exceptions.InvalidArgumentException;
import pro.taskana.common.api.exceptions.NotAuthorizedException;
import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.api.exceptions.TaskanaException;
import pro.taskana.common.internal.JobServiceImpl;
import pro.taskana.common.internal.jobs.AbstractTaskanaJob;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
@ -43,34 +35,33 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryCleanupJob.class);
private static final String TASKANA_PROPERTIES = "/taskana.properties";
private static final String TASKANA_JOB_HISTORY_BATCH_SIZE = "taskana.jobs.history.batchSize";
private static final String TASKANA_JOB_HISTORY_CLEANUP_MINIMUM_AGE =
"taskana.jobs.history.cleanup.minimumAge";
private final boolean allCompletedSameParentBusiness;
TaskanaHistoryEngineImpl taskanaHistoryEngine =
private final TaskanaHistoryEngineImpl taskanaHistoryEngine =
TaskanaHistoryEngineImpl.createTaskanaEngine(taskanaEngineImpl);
private Duration minimumAge = Duration.parse("P14D");
private int batchSize = 100;
private final boolean allCompletedSameParentBusiness;
private Duration minimumAge = taskanaEngineImpl.getConfiguration().getCleanupJobMinimumAge();
private int batchSize =
taskanaEngineImpl.getConfiguration().getMaxNumberOfUpdatesPerTransaction();
public HistoryCleanupJob(
TaskanaEngine taskanaEngine,
TaskanaTransactionProvider<Object> txProvider,
TaskanaTransactionProvider txProvider,
ScheduledJob scheduledJob) {
super(taskanaEngine, txProvider, scheduledJob);
super(taskanaEngine, txProvider, scheduledJob, true);
allCompletedSameParentBusiness =
taskanaEngine.getConfiguration().isTaskCleanupJobAllCompletedSameParentBusiness();
Properties props = readPropertiesFromFile(TASKANA_PROPERTIES);
Properties props = taskanaEngine.getConfiguration().readPropertiesFromFile();
initJobParameters(props);
}
@Override
public void run() throws TaskanaException {
public void execute() {
Instant createdBefore = Instant.now().minus(minimumAge);
@ -136,8 +127,6 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
"Job ended successfully. {} history events deleted.", totalNumberOfHistoryEventsDeleted);
} catch (Exception e) {
throw new SystemException("Error while processing HistoryCleanupJob.", e);
} finally {
scheduleNextCleanupJob();
}
}
@ -149,15 +138,20 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
*/
public static void initializeSchedule(TaskanaEngine taskanaEngine) {
JobServiceImpl jobService = (JobServiceImpl) taskanaEngine.getJobService();
jobService.deleteJobs(Type.HISTORYCLEANUPJOB);
HistoryCleanupJob job = new HistoryCleanupJob(taskanaEngine, null, null);
job.scheduleNextCleanupJob();
jobService.deleteJobs(job.getType());
job.scheduleNextJob();
}
@Override
protected Type getType() {
return Type.HISTORY_CLEANUP_JOB;
}
private List<String> filterSameParentBusinessHistoryEventsQualifiedToClean(
List<TaskHistoryEvent> historyEventCandidatesToClean) {
Map<String, Map<String, List<String>>> historyEventsGroupedByParentBusinessProcessIdAndType =
Map<String, Map<String, List<String>>> taskHistoryIdsByEventTypeByParentBusinessProcessId =
historyEventCandidatesToClean.stream()
.collect(
groupingBy(
@ -168,54 +162,32 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
List<String> taskIdsToDeleteHistoryEventsFor = new ArrayList<>();
historyEventsGroupedByParentBusinessProcessIdAndType
.entrySet()
.forEach(
idsOfTasksInSameParentBusinessProcessGroupedByType -> {
if (idsOfTasksInSameParentBusinessProcessGroupedByType
.getValue()
.get(TaskHistoryEventType.CREATED.getName())
.size()
== idsOfTasksInSameParentBusinessProcessGroupedByType
.getValue()
.entrySet()
.stream()
.filter(
entry -> !entry.getKey().equals(TaskHistoryEventType.CREATED.getName()))
.mapToInt(stringListEntry -> stringListEntry.getValue().size())
.sum()) {
taskIdsToDeleteHistoryEventsFor.addAll(
idsOfTasksInSameParentBusinessProcessGroupedByType
.getValue()
.get(TaskHistoryEventType.CREATED.getName()));
}
});
taskHistoryIdsByEventTypeByParentBusinessProcessId.forEach(
(parentBusinessProcessId, taskHistoryIdsByEventType) -> {
if (taskHistoryIdsByEventType.get(TaskHistoryEventType.CREATED.getName()).size()
== taskHistoryIdsByEventType.entrySet().stream()
.filter(entry -> !entry.getKey().equals(TaskHistoryEventType.CREATED.getName()))
.mapToInt(stringListEntry -> stringListEntry.getValue().size())
.sum()) {
taskIdsToDeleteHistoryEventsFor.addAll(
taskHistoryIdsByEventType.get(TaskHistoryEventType.CREATED.getName()));
}
});
return taskIdsToDeleteHistoryEventsFor;
}
private int deleteHistoryEventsTransactionally(List<String> taskIdsToDeleteHistoryEventsFor) {
int deletedEventsCount = 0;
if (txProvider != null) {
return (int)
txProvider.executeInTransaction(
() -> {
try {
return deleteEvents(taskIdsToDeleteHistoryEventsFor);
} catch (Exception e) {
LOGGER.warn("Could not delete history events.", e);
return 0;
}
});
} else {
try {
deletedEventsCount = deleteEvents(taskIdsToDeleteHistoryEventsFor);
} catch (Exception e) {
LOGGER.warn("Could not delete history events.", e);
}
}
return deletedEventsCount;
return TaskanaTransactionProvider.executeInTransactionIfPossible(
txProvider,
() -> {
try {
return deleteEvents(taskIdsToDeleteHistoryEventsFor);
} catch (Exception e) {
LOGGER.warn("Could not delete history events.", e);
return 0;
}
});
}
private int deleteEvents(List<String> taskIdsToDeleteHistoryEventsFor)
@ -223,12 +195,11 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
SimpleHistoryServiceImpl simpleHistoryService =
(SimpleHistoryServiceImpl) taskanaHistoryEngine.getTaskanaHistoryService();
String[] taskIdsArray = new String[taskIdsToDeleteHistoryEventsFor.size()];
int deletedTasksCount =
(int)
simpleHistoryService
.createTaskHistoryQuery()
.taskIdIn(taskIdsToDeleteHistoryEventsFor.toArray(taskIdsArray))
.taskIdIn(taskIdsToDeleteHistoryEventsFor.toArray(new String[0]))
.count();
simpleHistoryService.deleteHistoryEventsByTaskIds(taskIdsToDeleteHistoryEventsFor);
@ -240,13 +211,6 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
return deletedTasksCount;
}
private void scheduleNextCleanupJob() {
ScheduledJob job = new ScheduledJob();
job.setType(Type.HISTORYCLEANUPJOB);
job.setDue(getNextDueForCleanupJob());
taskanaEngineImpl.getJobService().createJob(job);
}
private void initJobParameters(Properties props) {
String jobBatchSizeProperty = props.getProperty(TASKANA_JOB_HISTORY_BATCH_SIZE);
@ -284,45 +248,4 @@ public class HistoryCleanupJob extends AbstractTaskanaJob {
minimumAge);
}
}
private Properties readPropertiesFromFile(String propertiesFile) {
Properties props = new Properties();
boolean loadFromClasspath = loadFromClasspath(propertiesFile);
try {
if (loadFromClasspath) {
InputStream inputStream =
TaskanaEngineConfiguration.class.getResourceAsStream(propertiesFile);
if (inputStream == null) {
LOGGER.error("taskana properties file {} was not found on classpath.", propertiesFile);
} else {
props.load(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"taskana properties were loaded from file {} from classpath.", propertiesFile);
}
}
} else {
try (FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
props.load(fileInputStream);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("taskana properties were loaded from file {}.", propertiesFile);
}
}
}
} catch (IOException e) {
LOGGER.error("caught IOException when processing properties file {}.", propertiesFile);
throw new SystemException(
"internal System error when processing properties file " + propertiesFile, e.getCause());
}
return props;
}
private boolean loadFromClasspath(String propertiesFile) {
boolean loadFromClasspath = true;
File f = new File(propertiesFile);
if (f.exists() && !f.isDirectory()) {
loadFromClasspath = false;
}
return loadFromClasspath;
}
}

View File

@ -378,11 +378,11 @@ class HistoryCleanupJobAccTest extends AbstractAccTest {
for (int i = 0; i < 10; i++) {
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.HISTORYCLEANUPJOB);
job.setType(ScheduledJob.Type.HISTORY_CLEANUP_JOB);
taskanaEngine.getJobService().createJob(job);
job.setType(Type.UPDATETASKSJOB);
job.setType(Type.TASK_REFRESH_JOB);
taskanaEngine.getJobService().createJob(job);
job.setType(Type.CLASSIFICATIONCHANGEDJOB);
job.setType(Type.CLASSIFICATION_CHANGED_JOB);
taskanaEngine.getJobService().createJob(job);
}
@ -392,7 +392,7 @@ class HistoryCleanupJobAccTest extends AbstractAccTest {
List<ScheduledJob> historyCleanupJobs =
jobsToRun.stream()
.filter(scheduledJob -> scheduledJob.getType().equals(Type.HISTORYCLEANUPJOB))
.filter(scheduledJob -> scheduledJob.getType().equals(Type.HISTORY_CLEANUP_JOB))
.collect(Collectors.toList());
HistoryCleanupJob.initializeSchedule(taskanaEngine);

View File

@ -315,7 +315,7 @@ public class ClassificationQueryImpl implements ClassificationQuery {
@Override
public List<ClassificationSummary> list() {
return taskanaEngine.openAndReturnConnection(
return taskanaEngine.executeInDatabaseConnection(
() -> taskanaEngine.getSqlSession().selectList(LINK_TO_SUMMARYMAPPER, this));
}

View File

@ -585,7 +585,7 @@ public class ClassificationServiceImpl implements ClassificationService {
args.put(ClassificationChangedJob.SERVICE_LEVEL_CHANGED, String.valueOf(serviceLevelChanged));
ScheduledJob job = new ScheduledJob();
job.setArguments(args);
job.setType(ScheduledJob.Type.CLASSIFICATIONCHANGEDJOB);
job.setType(ScheduledJob.Type.CLASSIFICATION_CHANGED_JOB);
taskanaEngine.getEngine().getJobService().createJob(job);
}
}

View File

@ -1,5 +1,6 @@
package pro.taskana.classification.internal.jobs;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -7,28 +8,33 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.ScheduledJob.Type;
import pro.taskana.common.api.TaskanaEngine;
import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.api.exceptions.TaskanaException;
import pro.taskana.common.internal.jobs.AbstractTaskanaJob;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
import pro.taskana.common.internal.util.CollectionUtil;
import pro.taskana.task.internal.TaskServiceImpl;
/** This class executes a job of type CLASSIFICATIONCHANGEDJOB. */
/**
* This class executes a job of type {@linkplain
* pro.taskana.common.api.ScheduledJob.Type#CLASSIFICATION_CHANGED_JOB}.
*/
public class ClassificationChangedJob extends AbstractTaskanaJob {
public static final String TASK_IDS = "taskIds";
public static final String CLASSIFICATION_ID = "classificationId";
public static final String PRIORITY_CHANGED = "priorityChanged";
public static final String SERVICE_LEVEL_CHANGED = "serviceLevelChanged";
private static final Logger LOGGER = LoggerFactory.getLogger(ClassificationChangedJob.class);
private static final String TASK_IDS = "taskIds";
private final String classificationId;
private final boolean priorityChanged;
private final boolean serviceLevelChanged;
public ClassificationChangedJob(
TaskanaEngine engine, TaskanaTransactionProvider<Object> txProvider, ScheduledJob job) {
super(engine, txProvider, job);
TaskanaEngine engine, TaskanaTransactionProvider txProvider, ScheduledJob job) {
super(engine, txProvider, job, false);
Map<String, String> args = job.getArguments();
classificationId = args.get(CLASSIFICATION_ID);
priorityChanged = Boolean.parseBoolean(args.get(PRIORITY_CHANGED));
@ -36,7 +42,7 @@ public class ClassificationChangedJob extends AbstractTaskanaJob {
}
@Override
public void run() throws TaskanaException {
public void execute() throws TaskanaException {
LOGGER.info("Running ClassificationChangedJob for classification ({})", classificationId);
try {
TaskServiceImpl taskService = (TaskServiceImpl) taskanaEngineImpl.getTaskService();
@ -51,9 +57,15 @@ public class ClassificationChangedJob extends AbstractTaskanaJob {
}
}
@Override
protected Type getType() {
return Type.CLASSIFICATION_CHANGED_JOB;
}
private void scheduleTaskRefreshJobs(List<String> affectedTaskIds) {
int batchSize = taskanaEngineImpl.getConfiguration().getMaxNumberOfUpdatesPerTransaction();
List<List<String>> affectedTaskBatches = partition(affectedTaskIds, batchSize);
Collection<List<String>> affectedTaskBatches =
CollectionUtil.partitionBasedOnSize(affectedTaskIds, batchSize);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Creating {} TaskRefreshJobs out of {} affected tasks "
@ -70,10 +82,31 @@ public class ClassificationChangedJob extends AbstractTaskanaJob {
args.put(PRIORITY_CHANGED, Boolean.toString(priorityChanged));
args.put(SERVICE_LEVEL_CHANGED, Boolean.toString(serviceLevelChanged));
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.UPDATETASKSJOB);
job.setType(ScheduledJob.Type.TASK_REFRESH_JOB);
job.setArguments(args);
taskanaEngineImpl.getJobService().createJob(job);
}
}
}
@Override
public String toString() {
return "ClassificationChangedJob [classificationId="
+ classificationId
+ ", priorityChanged="
+ priorityChanged
+ ", serviceLevelChanged="
+ serviceLevelChanged
+ ", firstRun="
+ firstRun
+ ", runEvery="
+ runEvery
+ ", taskanaEngineImpl="
+ taskanaEngineImpl
+ ", txProvider="
+ txProvider
+ ", scheduledJob="
+ scheduledJob
+ "]";
}
}

View File

@ -168,15 +168,15 @@ public class ScheduledJob {
FAILED
}
/** This enum controls the type of a job. */
/** This enum defines the available job types. */
public enum Type {
CLASSIFICATIONCHANGEDJOB(ClassificationChangedJob.class.getName()),
UPDATETASKSJOB(TaskRefreshJob.class.getName()),
TASKCLEANUPJOB(TaskCleanupJob.class.getName()),
WORKBASKETCLEANUPJOB(WorkbasketCleanupJob.class.getName()),
HISTORYCLEANUPJOB("pro.taskana.simplehistory.impl.jobs.HistoryCleanupJob");
CLASSIFICATION_CHANGED_JOB(ClassificationChangedJob.class.getName()),
TASK_REFRESH_JOB(TaskRefreshJob.class.getName()),
TASK_CLEANUP_JOB(TaskCleanupJob.class.getName()),
WORKBASKET_CLEANUP_JOB(WorkbasketCleanupJob.class.getName()),
HISTORY_CLEANUP_JOB("pro.taskana.simplehistory.impl.jobs.HistoryCleanupJob");
private String clazz;
private final String clazz;
Type(String clazz) {
this.clazz = clazz;

View File

@ -70,6 +70,13 @@ public interface TaskanaEngine {
*/
boolean isHistoryEnabled();
/**
* gets the current connection management mode.
*
* @return the current connection management mode.
*/
ConnectionManagementMode getConnectionManagementMode();
/**
* sets the connection management mode.
*
@ -117,8 +124,8 @@ public interface TaskanaEngine {
void checkRoleMembership(TaskanaRole... roles) throws NotAuthorizedException;
/**
* This method is supposed to skip further permission checks if we are already in a secured
* environment. With great power comes great responsibility.
* Executes a given supplier with admin privileges and thus skips further permission checks. With
* great power comes great responsibility.
*
* @param supplier will be executed with admin privileges
* @param <T> defined with the supplier return value
@ -126,6 +133,21 @@ public interface TaskanaEngine {
*/
<T> T runAsAdmin(Supplier<T> supplier);
/**
* Executes a given runnable with admin privileges and thus skips further permission checks. With
* great power comes great responsibility.
*
* @see #runAsAdmin(Supplier)
*/
@SuppressWarnings("checkstyle:JavadocMethod")
default void runAsAdmin(Runnable runnable) {
runAsAdmin(
() -> {
runnable.run();
return null;
});
}
/**
* Returns the CurrentUserContext class.
*

View File

@ -30,13 +30,27 @@ public interface InternalTaskanaEngine {
void returnConnection();
/**
* Executes the supplier after openConnection is called and then returns the connection.
* Executes the given supplier after openConnection is called and then returns the connection.
*
* @param supplier a function that returns something of type T
* @param <T> any type
* @return the result of the supplier
*/
<T> T openAndReturnConnection(Supplier<T> supplier);
<T> T executeInDatabaseConnection(Supplier<T> supplier);
/**
* Executes the given runnable after openConnection is called and then returns the connection.
*
* @see #executeInDatabaseConnection(Supplier)
*/
@SuppressWarnings("checkstyle:JavadocMethod")
default void executeInDatabaseConnection(Runnable runnable) {
executeInDatabaseConnection(
() -> {
runnable.run();
return null;
});
}
/** Initializes the SqlSessionManager. */
void initSqlSession();
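
Throughout this commit, query and service implementations replace the manual try/finally openConnection()/returnConnection() blocks with this helper. A representative use, mirroring JobServiceImpl further down (jobMapper, job and taskanaEngineImpl assumed in scope):

// Supplier variant: open the connection, run the query, return the connection.
List<ScheduledJob> availableJobs =
    taskanaEngineImpl.executeInDatabaseConnection(() -> jobMapper.findJobsToRun(Instant.now()));

// Runnable variant for statements without a result.
taskanaEngineImpl.executeInDatabaseConnection(() -> jobMapper.delete(job));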

View File

@ -36,11 +36,12 @@ public interface JobMapper {
Integer insertJob(@Param("job") ScheduledJob job);
@Select(
"<script> SELECT JOB_ID, PRIORITY, CREATED, DUE, STATE, LOCKED_BY, LOCK_EXPIRES, TYPE, RETRY_COUNT, ARGUMENTS "
"<script> SELECT JOB_ID, PRIORITY, CREATED, DUE, STATE, LOCKED_BY, LOCK_EXPIRES, TYPE, RETRY_COUNT, ARGUMENTS "
+ "FROM SCHEDULED_JOB "
+ "WHERE STATE IN ( 'READY') AND (DUE is null OR DUE &lt; #{now}) AND (LOCK_EXPIRES is null OR LOCK_EXPIRES &lt; #{now}) AND RETRY_COUNT > 0 "
+ "ORDER BY PRIORITY DESC "
+ "<if test=\"_databaseId == 'db2'\">with UR </if> "
+ "FOR UPDATE "
+ "<if test=\"_databaseId == 'db2'\">WITH RS USE AND KEEP UPDATE LOCKS </if> "
+ "</script>")
@Result(property = "jobId", column = "JOB_ID")
@Result(property = "priority", column = "PRIORITY")

View File

@ -1,5 +1,6 @@
package pro.taskana.common.internal;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.slf4j.Logger;
@ -12,12 +13,12 @@ import pro.taskana.common.api.ScheduledJob.Type;
/** Controls all job activities. */
public class JobServiceImpl implements JobService {
public static final Integer JOB_DEFAULT_PRIORITY = 50;
public static final long DEFAULT_LOCK_EXPIRATION_PERIOD = 60000;
public static final int JOB_DEFAULT_PRIORITY = 50;
private static final Duration JOB_DEFAULT_LOCK_EXPIRATION_PERIOD = Duration.ofSeconds(60);
private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class);
private JobMapper jobMapper;
private InternalTaskanaEngine taskanaEngineImpl;
private final JobMapper jobMapper;
private final InternalTaskanaEngine taskanaEngineImpl;
public JobServiceImpl(InternalTaskanaEngine taskanaEngine, JobMapper jobMapper) {
this.taskanaEngineImpl = taskanaEngine;
@ -26,85 +27,60 @@ public class JobServiceImpl implements JobService {
@Override
public ScheduledJob createJob(ScheduledJob job) {
try {
taskanaEngineImpl.openConnection();
job = initializeJobDefault(job);
Integer jobId = jobMapper.insertJob(job);
job.setJobId(jobId);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Created job {}", job);
}
} finally {
taskanaEngineImpl.returnConnection();
initializeDefaultJobProperties(job);
Integer id = taskanaEngineImpl.executeInDatabaseConnection(() -> jobMapper.insertJob(job));
job.setJobId(id);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Created job {}", job);
}
return job;
}
public void deleteJobs(Type jobType) {
try {
taskanaEngineImpl.openConnection();
jobMapper.deleteMultiple(jobType);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deleted jobs of type: {}", jobType);
}
} finally {
taskanaEngineImpl.returnConnection();
taskanaEngineImpl.executeInDatabaseConnection(() -> jobMapper.deleteMultiple(jobType));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deleted jobs of type: {}", jobType);
}
}
public ScheduledJob lockJob(ScheduledJob job, String owner) {
try {
taskanaEngineImpl.openConnection();
job.setLockedBy(owner);
job.setLockExpires(Instant.now().plusMillis(DEFAULT_LOCK_EXPIRATION_PERIOD));
job.setRetryCount(job.getRetryCount() - 1);
jobMapper.update(job);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Job {} locked. Remaining retries: {}", job.getJobId(), job.getRetryCount());
}
} finally {
taskanaEngineImpl.returnConnection();
job.setLockedBy(owner);
job.setLockExpires(Instant.now().plus(JOB_DEFAULT_LOCK_EXPIRATION_PERIOD));
job.setRetryCount(job.getRetryCount() - 1);
taskanaEngineImpl.executeInDatabaseConnection(() -> jobMapper.update(job));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Job {} locked. Remaining retries: {}", job.getJobId(), job.getRetryCount());
}
return job;
}
public List<ScheduledJob> findJobsToRun() {
List<ScheduledJob> availableJobs;
try {
taskanaEngineImpl.openConnection();
availableJobs = jobMapper.findJobsToRun(Instant.now());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Found available jobs: {}", availableJobs);
}
} finally {
taskanaEngineImpl.returnConnection();
List<ScheduledJob> availableJobs =
taskanaEngineImpl.executeInDatabaseConnection(() -> jobMapper.findJobsToRun(Instant.now()));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Found available jobs: {}", availableJobs);
}
return availableJobs;
}
public void deleteJob(ScheduledJob job) {
try {
taskanaEngineImpl.openConnection();
jobMapper.delete(job);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deleted job: {}", job);
}
} finally {
taskanaEngineImpl.returnConnection();
taskanaEngineImpl.executeInDatabaseConnection(() -> jobMapper.delete(job));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deleted job: {}", job);
}
}
private ScheduledJob initializeJobDefault(ScheduledJob job) {
job.setCreated(Instant.now());
private void initializeDefaultJobProperties(ScheduledJob job) {
Instant now = Instant.now();
job.setCreated(now);
job.setState(ScheduledJob.State.READY);
job.setPriority(JOB_DEFAULT_PRIORITY);
if (job.getDue() == null) {
job.setDue(Instant.now());
job.setDue(now);
}
job.setRetryCount(taskanaEngineImpl.getEngine().getConfiguration().getMaxNumberOfJobRetries());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Job after initialization: {}", job);
}
return job;
}
}
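
For reference, the full job lifecycle as the refactored service exposes it (sketch; taskanaEngine assumed in scope, the owner string follows the host-plus-thread convention JobRunner uses):

JobServiceImpl jobService = (JobServiceImpl) taskanaEngine.getJobService();

ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.TASK_CLEANUP_JOB);
job.setDue(Instant.now());
job = jobService.createJob(job); // sets created, state, priority, due and retry count

List<ScheduledJob> dueJobs = jobService.findJobsToRun(); // the locking SELECT above
ScheduledJob lockedJob = jobService.lockJob(job, "127.0.0.1 - main"); // LOCKED_BY, LOCK_EXPIRES
jobService.deleteJob(lockedJob); // after successful execution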

View File

@ -115,44 +115,39 @@ public class TaskanaEngineImpl implements TaskanaEngine {
@Override
public TaskService getTaskService() {
SqlSession session = this.sessionManager;
return new TaskServiceImpl(
internalTaskanaEngineImpl,
session.getMapper(TaskMapper.class),
session.getMapper(TaskCommentMapper.class),
session.getMapper(AttachmentMapper.class));
sessionManager.getMapper(TaskMapper.class),
sessionManager.getMapper(TaskCommentMapper.class),
sessionManager.getMapper(AttachmentMapper.class));
}
@Override
public MonitorService getMonitorService() {
SqlSession session = this.sessionManager;
return new MonitorServiceImpl(
internalTaskanaEngineImpl, session.getMapper(MonitorMapper.class));
internalTaskanaEngineImpl, sessionManager.getMapper(MonitorMapper.class));
}
@Override
public WorkbasketService getWorkbasketService() {
SqlSession session = this.sessionManager;
return new WorkbasketServiceImpl(
internalTaskanaEngineImpl,
session.getMapper(WorkbasketMapper.class),
session.getMapper(DistributionTargetMapper.class),
session.getMapper(WorkbasketAccessMapper.class));
sessionManager.getMapper(WorkbasketMapper.class),
sessionManager.getMapper(DistributionTargetMapper.class),
sessionManager.getMapper(WorkbasketAccessMapper.class));
}
@Override
public ClassificationService getClassificationService() {
SqlSession session = this.sessionManager;
return new ClassificationServiceImpl(
internalTaskanaEngineImpl,
session.getMapper(ClassificationMapper.class),
session.getMapper(TaskMapper.class));
sessionManager.getMapper(ClassificationMapper.class),
sessionManager.getMapper(TaskMapper.class));
}
@Override
public JobService getJobService() {
SqlSession session = this.sessionManager;
return new JobServiceImpl(internalTaskanaEngineImpl, session.getMapper(JobMapper.class));
return new JobServiceImpl(internalTaskanaEngineImpl, sessionManager.getMapper(JobMapper.class));
}
@Override
@ -170,6 +165,11 @@ public class TaskanaEngineImpl implements TaskanaEngine {
return HistoryEventManager.isHistoryEnabled();
}
@Override
public ConnectionManagementMode getConnectionManagementMode() {
return mode;
}
@Override
public void setConnectionManagementMode(ConnectionManagementMode mode) {
if (this.mode == ConnectionManagementMode.EXPLICIT
@ -244,6 +244,9 @@ public class TaskanaEngineImpl implements TaskanaEngine {
}
public <T> T runAsAdmin(Supplier<T> supplier) {
if (isUserInRole(TaskanaRole.ADMIN)) {
return supplier.get();
}
String adminName =
this.getConfiguration().getRoleMap().get(TaskanaRole.ADMIN).stream()
@ -420,7 +423,7 @@ public class TaskanaEngineImpl implements TaskanaEngine {
}
@Override
public <T> T openAndReturnConnection(Supplier<T> supplier) {
public <T> T executeInDatabaseConnection(Supplier<T> supplier) {
try {
openConnection();
return supplier.get();

View File

@ -3,12 +3,10 @@ package pro.taskana.common.internal.jobs;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.TaskanaEngine;
import pro.taskana.common.api.exceptions.TaskanaException;
import pro.taskana.common.internal.TaskanaEngineImpl;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
@ -17,23 +15,26 @@ public abstract class AbstractTaskanaJob implements TaskanaJob {
protected final Instant firstRun;
protected final Duration runEvery;
protected TaskanaEngineImpl taskanaEngineImpl;
protected TaskanaTransactionProvider<Object> txProvider;
protected ScheduledJob scheduledJob;
protected final TaskanaEngineImpl taskanaEngineImpl;
protected final TaskanaTransactionProvider txProvider;
protected final ScheduledJob scheduledJob;
private final boolean async;
public AbstractTaskanaJob(
TaskanaEngine taskanaEngine,
TaskanaTransactionProvider<Object> txProvider,
ScheduledJob job) {
TaskanaTransactionProvider txProvider,
ScheduledJob job,
boolean async) {
this.taskanaEngineImpl = (TaskanaEngineImpl) taskanaEngine;
this.txProvider = txProvider;
this.scheduledJob = job;
firstRun = taskanaEngine.getConfiguration().getCleanupJobFirstRun();
this.runEvery = taskanaEngineImpl.getConfiguration().getCleanupJobRunEvery();
this.async = async;
firstRun = taskanaEngineImpl.getConfiguration().getCleanupJobFirstRun();
runEvery = taskanaEngineImpl.getConfiguration().getCleanupJobRunEvery();
}
public static TaskanaJob createFromScheduledJob(
TaskanaEngine engine, TaskanaTransactionProvider<Object> txProvider, ScheduledJob job)
TaskanaEngine engine, TaskanaTransactionProvider txProvider, ScheduledJob job)
throws ClassNotFoundException, IllegalAccessException, InstantiationException,
InvocationTargetException {
@ -45,23 +46,19 @@ public abstract class AbstractTaskanaJob implements TaskanaJob {
.newInstance(engine, txProvider, job);
}
protected <T> List<List<T>> partition(Collection<T> members, int maxSize) {
List<List<T>> result = new ArrayList<>();
List<T> internal = new ArrayList<>();
for (T member : members) {
internal.add(member);
if (internal.size() == maxSize) {
result.add(internal);
internal = new ArrayList<>();
}
@Override
public final void run() throws TaskanaException {
execute();
if (async) {
scheduleNextJob();
}
if (!internal.isEmpty()) {
result.add(internal);
}
return result;
}
protected Instant getNextDueForCleanupJob() {
protected abstract ScheduledJob.Type getType();
protected abstract void execute() throws TaskanaException;
protected Instant getNextDueForJob() {
Instant nextRun = firstRun;
if (scheduledJob != null && scheduledJob.getDue() != null) {
nextRun = scheduledJob.getDue();
@ -73,4 +70,11 @@ public abstract class AbstractTaskanaJob implements TaskanaJob {
return nextRun;
}
protected void scheduleNextJob() {
ScheduledJob job = new ScheduledJob();
job.setType(getType());
job.setDue(getNextDueForJob());
taskanaEngineImpl.getJobService().createJob(job);
}
}
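
With the template method in place, concrete jobs only implement execute() and getType(); run() is final and schedules the follow-up job when the constructor was called with async = true. A simplified sketch of the pattern, modeled on TaskCleanupJob from this commit (pro.taskana imports assumed, cleanup logic omitted):

public class TaskCleanupJob extends AbstractTaskanaJob {

  public TaskCleanupJob(
      TaskanaEngine taskanaEngine, TaskanaTransactionProvider txProvider, ScheduledJob job) {
    super(taskanaEngine, txProvider, job, true); // true: reschedule after execute()
  }

  @Override
  protected ScheduledJob.Type getType() {
    return ScheduledJob.Type.TASK_CLEANUP_JOB;
  }

  @Override
  protected void execute() {
    // delete completed tasks; rescheduling is handled by AbstractTaskanaJob.run()
  }
}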

View File

@ -2,20 +2,14 @@ package pro.taskana.common.internal.jobs;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import javax.security.auth.Subject;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.TaskanaEngine;
import pro.taskana.common.api.TaskanaRole;
import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.api.security.UserPrincipal;
import pro.taskana.common.internal.JobServiceImpl;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
@ -25,138 +19,60 @@ public class JobRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(JobRunner.class);
private final TaskanaEngine taskanaEngine;
private final JobServiceImpl jobService;
private TaskanaTransactionProvider<Object> txProvider;
private TaskanaTransactionProvider txProvider;
public JobRunner(TaskanaEngine taskanaEngine) {
this.taskanaEngine = taskanaEngine;
jobService = (JobServiceImpl) taskanaEngine.getJobService();
}
public void registerTransactionProvider(TaskanaTransactionProvider<Object> txProvider) {
public void registerTransactionProvider(TaskanaTransactionProvider txProvider) {
this.txProvider = txProvider;
}
public void runJobs() {
try {
List<ScheduledJob> jobsToRun = findAndLockJobsToRun();
for (ScheduledJob scheduledJob : jobsToRun) {
runJobTransactionally(scheduledJob);
}
} catch (Exception e) {
LOGGER.error("Error occurred while running jobs: ", e);
}
findAndLockJobsToRun().forEach(this::runJobTransactionally);
}
private List<ScheduledJob> findAndLockJobsToRun() {
List<ScheduledJob> availableJobs = jobService.findJobsToRun();
List<ScheduledJob> lockedJobs = new ArrayList<>();
for (ScheduledJob job : availableJobs) {
lockedJobs.add(lockJobTransactionally(job));
}
return lockedJobs;
return TaskanaTransactionProvider.executeInTransactionIfPossible(
txProvider,
() -> jobService.findJobsToRun().stream().map(this::lockJob).collect(Collectors.toList()));
}
private ScheduledJob lockJobTransactionally(ScheduledJob job) {
ScheduledJob lockedJob;
if (txProvider != null) {
lockedJob = (ScheduledJob) txProvider.executeInTransaction(() -> lockJob(job));
} else {
lockedJob = lockJob(job);
private void runJobTransactionally(ScheduledJob scheduledJob) {
TaskanaTransactionProvider.executeInTransactionIfPossible(
txProvider, () -> taskanaEngine.runAsAdmin(() -> runScheduledJob(scheduledJob)));
jobService.deleteJob(scheduledJob);
}
private void runScheduledJob(ScheduledJob scheduledJob) {
try {
AbstractTaskanaJob.createFromScheduledJob(taskanaEngine, txProvider, scheduledJob).run();
} catch (Exception e) {
LOGGER.error("Error running job: {} ", scheduledJob.getType(), e);
throw new SystemException(String.format("Error running job '%s'", scheduledJob.getType()), e);
}
}
private ScheduledJob lockJob(ScheduledJob job) {
String hostAddress = getHostAddress();
String owner = hostAddress + " - " + Thread.currentThread().getName();
job.setLockedBy(owner);
ScheduledJob lockedJob = jobService.lockJob(job, owner);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Locked job: {}", lockedJob);
}
return lockedJob;
}
private ScheduledJob lockJob(ScheduledJob job) {
String hostAddress = "UNKNOWN_ADDRESS";
private String getHostAddress() {
String hostAddress;
try {
hostAddress = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
// ignore
hostAddress = "UNKNOWN_ADDRESS";
}
job.setLockedBy(hostAddress + " - " + Thread.currentThread().getName());
String owner = hostAddress + " - " + Thread.currentThread().getName();
return jobService.lockJob(job, owner);
}
private void runJobTransactionally(ScheduledJob scheduledJob) {
try {
if (txProvider != null) {
txProvider.executeInTransaction(
() -> {
runScheduledJob(scheduledJob);
return null;
});
} else {
runScheduledJob(scheduledJob);
}
jobService.deleteJob(scheduledJob);
} catch (Exception e) {
LOGGER.error(
"Processing of job {} failed. Trying to split it up into two pieces...",
scheduledJob.getJobId(),
e);
}
}
private void runScheduledJob(ScheduledJob scheduledJob) {
if (taskanaEngine.isUserInRole(TaskanaRole.ADMIN)) {
// we run already as admin
runScheduledJobImpl(scheduledJob);
} else {
// we must establish admin context
try {
PrivilegedExceptionAction<Void> action =
() -> {
try {
runScheduledJobImpl(scheduledJob);
} catch (Exception e) {
throw new SystemException(String.format("could not run Job %s.", scheduledJob), e);
}
return null;
};
Subject.doAs(getAdminSubject(), action);
} catch (PrivilegedActionException e) {
LOGGER.warn("Attempt to run job {} failed.", scheduledJob, e);
}
}
}
private void runScheduledJobImpl(ScheduledJob scheduledJob) {
try {
TaskanaJob job =
AbstractTaskanaJob.createFromScheduledJob(taskanaEngine, txProvider, scheduledJob);
job.run();
} catch (Exception e) {
LOGGER.error("Error running job: {} ", scheduledJob.getType(), e);
throw new SystemException(
"When attempting to load class "
+ scheduledJob.getType()
+ " caught Exception "
+ e.getMessage(),
e);
}
}
private Subject getAdminSubject() {
Subject subject = new Subject();
List<Principal> principalList = new ArrayList<>();
try {
principalList.add(
new UserPrincipal(
taskanaEngine
.getConfiguration()
.getRoleMap()
.get(TaskanaRole.ADMIN)
.iterator()
.next()));
} catch (Exception t) {
LOGGER.warn("Could not determine a configured admin user.", t);
}
subject.getPrincipals().addAll(principalList);
return subject;
return hostAddress;
}
}

View File

@ -0,0 +1,39 @@
package pro.taskana.common.internal.jobs;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.function.Supplier;
import javax.sql.DataSource;
import pro.taskana.common.api.TaskanaEngine;
import pro.taskana.common.api.TaskanaEngine.ConnectionManagementMode;
import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
public class PlainJavaTransactionProvider implements TaskanaTransactionProvider {
private final TaskanaEngine taskanaEngine;
private final DataSource dataSource;
private final ConnectionManagementMode defaultConnectionManagementMode;
public PlainJavaTransactionProvider(TaskanaEngine taskanaEngine, DataSource dataSource) {
this.taskanaEngine = taskanaEngine;
this.dataSource = dataSource;
defaultConnectionManagementMode = taskanaEngine.getConnectionManagementMode();
}
@Override
public <T> T executeInTransaction(Supplier<T> supplier) {
try (Connection connection = dataSource.getConnection()) {
taskanaEngine.setConnection(connection);
final T t = supplier.get();
connection.commit();
return t;
} catch (SQLException ex) {
throw new SystemException("caught exception", ex);
} finally {
taskanaEngine.closeConnection();
taskanaEngine.setConnectionManagementMode(defaultConnectionManagementMode);
}
}
}
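
Wiring the new provider into a job runner mirrors what the acceptance test below does (sketch; taskanaEngine and dataSource assumed to be configured):

PlainJavaTransactionProvider transactionProvider =
    new PlainJavaTransactionProvider(taskanaEngine, dataSource);
JobRunner runner = new JobRunner(taskanaEngine);
runner.registerTransactionProvider(transactionProvider);
runner.runJobs(); // finds and locks due jobs, then runs each one in its own transaction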

View File

@ -6,9 +6,9 @@ import pro.taskana.common.api.exceptions.TaskanaException;
public interface TaskanaJob {
/**
* Runs the TaskanaJob.
* Executes the TaskanaJob.
*
* @throws TaskanaException if an exception occurred during the run.
* @throws TaskanaException if any exception occurs during the execution.
*/
void run() throws TaskanaException;
}

View File

@ -67,7 +67,7 @@ public class ObjectReferenceQueryImpl implements ObjectReferenceQuery {
@Override
public List<ObjectReference> list() {
return taskanaEngine.openAndReturnConnection(
return taskanaEngine.executeInDatabaseConnection(
() -> taskanaEngine.getSqlSession().selectList(LINK_TO_MAPPER, this));
}

View File

@ -977,7 +977,7 @@ public class TaskQueryImpl implements TaskQuery {
@Override
public List<TaskSummary> list() {
return taskanaEngine.openAndReturnConnection(
return taskanaEngine.executeInDatabaseConnection(
() -> {
checkForIllegalParamCombinations();
checkOpenAndReadPermissionForSpecifiedWorkbaskets();

View File

@ -31,18 +31,15 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskCleanupJob.class);
private static final SortDirection ASCENDING = SortDirection.ASCENDING;
// Parameter
private final Duration minimumAge;
private final int batchSize;
private final boolean allCompletedSameParentBusiness;
public TaskCleanupJob(
TaskanaEngine taskanaEngine,
TaskanaTransactionProvider<Object> txProvider,
TaskanaTransactionProvider txProvider,
ScheduledJob scheduledJob) {
super(taskanaEngine, txProvider, scheduledJob);
super(taskanaEngine, txProvider, scheduledJob, true);
minimumAge = taskanaEngine.getConfiguration().getCleanupJobMinimumAge();
batchSize = taskanaEngine.getConfiguration().getMaxNumberOfUpdatesPerTransaction();
allCompletedSameParentBusiness =
@ -50,7 +47,7 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
}
@Override
public void run() throws TaskanaException {
public void execute() {
Instant completedBefore = Instant.now().minus(minimumAge);
LOGGER.info("Running job to delete all tasks completed before ({})", completedBefore);
try {
@ -64,8 +61,6 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
LOGGER.info("Job ended successfully. {} tasks deleted.", totalNumberOfTasksDeleted);
} catch (Exception e) {
throw new SystemException("Error while processing TaskCleanupJob.", e);
} finally {
scheduleNextCleanupJob();
}
}
@ -77,9 +72,14 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
*/
public static void initializeSchedule(TaskanaEngine taskanaEngine) {
JobServiceImpl jobService = (JobServiceImpl) taskanaEngine.getJobService();
jobService.deleteJobs(Type.TASKCLEANUPJOB);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null, null);
job.scheduleNextCleanupJob();
jobService.deleteJobs(Type.TASK_CLEANUP_JOB);
job.scheduleNextJob();
}
@Override
protected Type getType() {
return Type.TASK_CLEANUP_JOB;
}
private List<TaskSummary> getTasksCompletedBefore(Instant untilDate) {
@ -89,7 +89,7 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
.getTaskService()
.createTaskQuery()
.completedWithin(new TimeInterval(null, untilDate))
.orderByBusinessProcessId(ASCENDING)
.orderByBusinessProcessId(SortDirection.ASCENDING)
.list();
if (allCompletedSameParentBusiness) {
@ -131,27 +131,16 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
}
private int deleteTasksTransactionally(List<TaskSummary> tasksToBeDeleted) {
int deletedTaskCount = 0;
if (txProvider != null) {
return (int)
txProvider.executeInTransaction(
() -> {
try {
return deleteTasks(tasksToBeDeleted);
} catch (Exception e) {
LOGGER.warn("Could not delete tasks.", e);
return 0;
}
});
} else {
try {
deletedTaskCount = deleteTasks(tasksToBeDeleted);
} catch (Exception e) {
LOGGER.warn("Could not delete tasks.", e);
}
}
return deletedTaskCount;
return TaskanaTransactionProvider.executeInTransactionIfPossible(
txProvider,
() -> {
try {
return deleteTasks(tasksToBeDeleted);
} catch (Exception ex) {
LOGGER.warn("Could not delete tasks.", ex);
return 0;
}
});
}
private int deleteTasks(List<TaskSummary> tasksToBeDeleted)
@ -175,10 +164,24 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
return tasksIdsToBeDeleted.size() - results.getFailedIds().size();
}
private void scheduleNextCleanupJob() {
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.TASKCLEANUPJOB);
job.setDue(getNextDueForCleanupJob());
taskanaEngineImpl.getJobService().createJob(job);
@Override
public String toString() {
return "TaskCleanupJob [firstRun="
+ firstRun
+ ", runEvery="
+ runEvery
+ ", taskanaEngineImpl="
+ taskanaEngineImpl
+ ", txProvider="
+ txProvider
+ ", scheduledJob="
+ scheduledJob
+ ", minimumAge="
+ minimumAge
+ ", batchSize="
+ batchSize
+ ", allCompletedSameParentBusiness="
+ allCompletedSameParentBusiness
+ "]";
}
}

View File

@ -7,6 +7,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.ScheduledJob.Type;
import pro.taskana.common.api.TaskanaEngine;
import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.api.exceptions.TaskanaException;
@ -14,7 +15,7 @@ import pro.taskana.common.internal.jobs.AbstractTaskanaJob;
import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
import pro.taskana.task.internal.TaskServiceImpl;
/** This class executes a job of type CLASSIFICATIONCHANGEDJOB. */
/** This class executes a job of type {@linkplain ScheduledJob.Type#TASK_REFRESH_JOB}. */
public class TaskRefreshJob extends AbstractTaskanaJob {
public static final String TASK_IDS = "taskIds";
@ -26,8 +27,8 @@ public class TaskRefreshJob extends AbstractTaskanaJob {
private final boolean serviceLevelChanged;
public TaskRefreshJob(
TaskanaEngine engine, TaskanaTransactionProvider<Object> txProvider, ScheduledJob job) {
super(engine, txProvider, job);
TaskanaEngine engine, TaskanaTransactionProvider txProvider, ScheduledJob job) {
super(engine, txProvider, job, false);
Map<String, String> args = job.getArguments();
String taskIdsString = args.get(TASK_IDS);
affectedTaskIds = Arrays.asList(taskIdsString.split(","));
@ -36,7 +37,7 @@ public class TaskRefreshJob extends AbstractTaskanaJob {
}
@Override
public void run() throws TaskanaException {
public void execute() throws TaskanaException {
LOGGER.info("Running TaskRefreshJob for {} tasks", affectedTaskIds.size());
try {
TaskServiceImpl taskService = (TaskServiceImpl) taskanaEngineImpl.getTaskService();
@ -48,8 +49,29 @@ public class TaskRefreshJob extends AbstractTaskanaJob {
}
}
@Override
protected Type getType() {
return Type.TASK_REFRESH_JOB;
}
@Override
public String toString() {
return "TaskRefreshJob [affectedTaskIds= " + affectedTaskIds + "]";
return "TaskRefreshJob [firstRun="
+ firstRun
+ ", runEvery="
+ runEvery
+ ", taskanaEngineImpl="
+ taskanaEngineImpl
+ ", txProvider="
+ txProvider
+ ", scheduledJob="
+ scheduledJob
+ ", affectedTaskIds="
+ affectedTaskIds
+ ", priorityChanged="
+ priorityChanged
+ ", serviceLevelChanged="
+ serviceLevelChanged
+ "]";
}
}

View File

@ -19,22 +19,21 @@ import pro.taskana.workbasket.api.models.WorkbasketAccessItem;
* @param <Q> the actual WorkbasketAccessItemQuery behind this abstract class
* @param <T> the workbasket access item
*/
// TODO: this class is never used. Remove?
abstract class AbstractWorkbasketAccessItemQueryImpl<
Q extends AbstractWorkbasketAccessItemQuery<Q, T>, T extends WorkbasketAccessItem>
implements AbstractWorkbasketAccessItemQuery<Q, T> {
private static final String LINK_TO_COUNTER =
"pro.taskana.workbasket.internal.WorkbasketQueryMapper.countQueryWorkbasketAccessItems";
private final InternalTaskanaEngine taskanaEngine;
private final List<String> orderBy;
private final List<String> orderColumns;
private AccessItemQueryColumnName columnName;
private String[] accessIdIn;
private String[] workbasketIdIn;
private String[] idIn;
private InternalTaskanaEngine taskanaEngine;
private List<String> orderBy;
private List<String> orderColumns;
AbstractWorkbasketAccessItemQueryImpl(InternalTaskanaEngine taskanaEngine) {
this.taskanaEngine = taskanaEngine;
orderBy = new ArrayList<>();
@ -77,7 +76,7 @@ abstract class AbstractWorkbasketAccessItemQueryImpl<
@Override
public List<T> list() {
return taskanaEngine.openAndReturnConnection(
return taskanaEngine.executeInDatabaseConnection(
() -> taskanaEngine.getSqlSession().selectList(getLinkToMapper(), _this()));
}

View File

@ -31,9 +31,9 @@ public class WorkbasketAccessItemQueryImpl implements WorkbasketAccessItemQuery
private String[] workbasketKeyLike;
private String[] idIn;
private InternalTaskanaEngine taskanaEngine;
private List<String> orderBy;
private List<String> orderColumns;
private final InternalTaskanaEngine taskanaEngine;
private final List<String> orderBy;
private final List<String> orderColumns;
WorkbasketAccessItemQueryImpl(InternalTaskanaEngine taskanaEngine) {
this.taskanaEngine = taskanaEngine;
@ -100,7 +100,7 @@ public class WorkbasketAccessItemQueryImpl implements WorkbasketAccessItemQuery
@Override
public List<WorkbasketAccessItem> list() {
return taskanaEngine.openAndReturnConnection(
return taskanaEngine.executeInDatabaseConnection(
() -> taskanaEngine.getSqlSession().selectList(LINK_TO_MAPPER, this));
}

View File

@ -352,7 +352,7 @@ public class WorkbasketQueryImpl implements WorkbasketQuery {
@Override
public List<WorkbasketSummary> list() {
handleCallerRolesAndAccessIds();
return taskanaEngine.openAndReturnConnection(
return taskanaEngine.executeInDatabaseConnection(
() -> taskanaEngine.getSqlSession().selectList(LINK_TO_MAPPER, this));
}

View File

@ -110,7 +110,7 @@ public class WorkbasketServiceImpl implements WorkbasketService {
}
Workbasket workbasket =
taskanaEngine.openAndReturnConnection(
taskanaEngine.executeInDatabaseConnection(
() -> workbasketMapper.findByKeyAndDomain(workbasketKey, domain));
if (workbasket == null) {
throw new WorkbasketNotFoundException(workbasketKey, domain);

View File

@ -27,19 +27,16 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkbasketCleanupJob.class);
// Parameter
private final int batchSize;
public WorkbasketCleanupJob(
TaskanaEngine taskanaEngine,
TaskanaTransactionProvider<Object> txProvider,
ScheduledJob job) {
super(taskanaEngine, txProvider, job);
TaskanaEngine taskanaEngine, TaskanaTransactionProvider txProvider, ScheduledJob job) {
super(taskanaEngine, txProvider, job, true);
batchSize = taskanaEngine.getConfiguration().getMaxNumberOfUpdatesPerTransaction();
}
@Override
public void run() throws TaskanaException {
public void execute() throws TaskanaException {
LOGGER.info("Running job to delete all workbaskets marked for deletion");
try {
List<String> workbasketsMarkedForDeletion = getWorkbasketsMarkedForDeletion();
@ -51,8 +48,6 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
"Job ended successfully. {} workbaskets deleted.", totalNumberOfWorkbasketDeleted);
} catch (Exception e) {
throw new SystemException("Error while processing WorkbasketCleanupJob.", e);
} finally {
scheduleNextCleanupJob();
}
}
@ -64,9 +59,14 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
*/
public static void initializeSchedule(TaskanaEngine taskanaEngine) {
JobServiceImpl jobService = (JobServiceImpl) taskanaEngine.getJobService();
jobService.deleteJobs(Type.WORKBASKETCLEANUPJOB);
WorkbasketCleanupJob job = new WorkbasketCleanupJob(taskanaEngine, null, null);
job.scheduleNextCleanupJob();
jobService.deleteJobs(job.getType());
job.scheduleNextJob();
}
@Override
protected Type getType() {
return Type.WORKBASKET_CLEANUP_JOB;
}
private List<String> getWorkbasketsMarkedForDeletion() {
@ -79,26 +79,16 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
}
private int deleteWorkbasketsTransactionally(List<String> workbasketsToBeDeleted) {
int deletedWorkbasketsCount = 0;
if (txProvider != null) {
return (Integer)
txProvider.executeInTransaction(
() -> {
try {
return deleteWorkbaskets(workbasketsToBeDeleted);
} catch (Exception e) {
LOGGER.warn("Could not delete workbaskets.", e);
return 0;
}
});
} else {
try {
deletedWorkbasketsCount = deleteWorkbaskets(workbasketsToBeDeleted);
} catch (Exception e) {
LOGGER.warn("Could not delete workbaskets.", e);
}
}
return deletedWorkbasketsCount;
return TaskanaTransactionProvider.executeInTransactionIfPossible(
txProvider,
() -> {
try {
return deleteWorkbaskets(workbasketsToBeDeleted);
} catch (Exception e) {
LOGGER.warn("Could not delete workbaskets.", e);
return 0;
}
});
}
private int deleteWorkbaskets(List<String> workbasketsToBeDeleted)
@ -118,11 +108,4 @@ public class WorkbasketCleanupJob extends AbstractTaskanaJob {
}
return workbasketsToBeDeleted.size() - results.getFailedIds().size();
}
private void scheduleNextCleanupJob() {
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.WORKBASKETCLEANUPJOB);
job.setDue(getNextDueForCleanupJob());
taskanaEngineImpl.getJobService().createJob(job);
}
}

View File

@ -0,0 +1,114 @@
package acceptance.jobs;
import static org.assertj.core.api.Assertions.assertThat;
import acceptance.AbstractAccTest;
import acceptance.TaskanaEngineTestConfiguration;
import java.sql.Connection;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import javax.sql.DataSource;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import org.mockito.invocation.InvocationOnMock;
import pro.taskana.common.api.ScheduledJob;
import pro.taskana.common.api.TaskanaEngine;
import pro.taskana.common.api.TaskanaEngine.ConnectionManagementMode;
import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.internal.JobServiceImpl;
import pro.taskana.common.internal.jobs.JobRunner;
import pro.taskana.common.internal.jobs.PlainJavaTransactionProvider;
class JobRunnerAccTest extends AbstractAccTest {
private final JobServiceImpl jobService = (JobServiceImpl) taskanaEngine.getJobService();
@Test
void should_onlyExecuteJobOnce_When_MultipleThreadsTryToRunJobsAtTheSameTime() throws Exception {
resetDb(true); // for some reason, clearing the job table is not enough
assertThat(jobService.findJobsToRun()).isEmpty();
ScheduledJob job = createJob(Instant.now().minus(5, ChronoUnit.MINUTES));
assertThat(jobService.findJobsToRun()).containsExactly(job);
runInThread(
() -> {
try {
TaskanaEngine taskanaEngine = taskanaEngineConfiguration.buildTaskanaEngine();
taskanaEngine.setConnectionManagementMode(ConnectionManagementMode.AUTOCOMMIT);
DataSource dataSource = TaskanaEngineTestConfiguration.getDataSource();
// We have to slow down the transaction.
// This is necessary to guarantee the execution of
// both test threads and therefore test the database lock.
// Without the slowdown, the test threads would execute too fast and
// would not request executable jobs from the database at the same time.
dataSource = slowDownDatabaseTransaction(dataSource);
PlainJavaTransactionProvider transactionProvider =
new PlainJavaTransactionProvider(taskanaEngine, dataSource);
JobRunner runner = new JobRunner(taskanaEngine);
runner.registerTransactionProvider(transactionProvider);
runner.runJobs();
} catch (Exception e) {
throw new SystemException("Caught Exception", e);
}
},
2);
// runEvery is set to P1D. Therefore we need to check which jobs run tomorrow.
// Just to be sure the jobs are found we will look for any job scheduled in the next 2 days.
List<ScheduledJob> jobsToRun =
getJobMapper().findJobsToRun(Instant.now().plus(2, ChronoUnit.DAYS));
assertThat(jobsToRun).hasSize(1).doesNotContain(job);
}
private void runInThread(Runnable runnable, int threadCount) throws Exception {
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(runnable);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
}
private ScheduledJob createJob(Instant firstDue) {
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.TASK_CLEANUP_JOB);
job.setDue(firstDue);
jobService.createJob(job);
return job;
}
private DataSource slowDownDatabaseTransaction(DataSource dataSource) throws Exception {
dataSource = Mockito.spy(dataSource);
Mockito.doAnswer(
invocationOnMock -> {
Connection connection = (Connection) invocationOnMock.callRealMethod();
connection = Mockito.spy(connection);
Mockito.doAnswer(new CallsRealMethodsWithDelay(100)).when(connection).commit();
return connection;
})
.when(dataSource)
.getConnection();
return dataSource;
}
private static class CallsRealMethodsWithDelay extends CallsRealMethods {
private final int delay;
private CallsRealMethodsWithDelay(int delay) {
this.delay = delay;
}
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(delay);
return super.answer(invocation);
}
}
}

View File

@ -110,11 +110,11 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
for (int i = 0; i < 10; i++) {
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.TASKCLEANUPJOB);
job.setType(ScheduledJob.Type.TASK_CLEANUP_JOB);
taskanaEngine.getJobService().createJob(job);
job.setType(Type.UPDATETASKSJOB);
job.setType(Type.TASK_REFRESH_JOB);
taskanaEngine.getJobService().createJob(job);
job.setType(Type.CLASSIFICATIONCHANGEDJOB);
job.setType(Type.CLASSIFICATION_CHANGED_JOB);
taskanaEngine.getJobService().createJob(job);
}
@ -124,7 +124,7 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
List<ScheduledJob> taskCleanupJobs =
jobsToRun.stream()
.filter(scheduledJob -> scheduledJob.getType().equals(Type.TASKCLEANUPJOB))
.filter(scheduledJob -> scheduledJob.getType().equals(Type.TASK_CLEANUP_JOB))
.collect(Collectors.toList());
TaskCleanupJob.initializeSchedule(taskanaEngine);
@ -172,7 +172,7 @@ class TaskCleanupJobAccTest extends AbstractAccTest {
Instant firstDue = Instant.now().truncatedTo(ChronoUnit.MILLIS);
ScheduledJob scheduledJob = new ScheduledJob();
scheduledJob.setType(ScheduledJob.Type.TASKCLEANUPJOB);
scheduledJob.setType(ScheduledJob.Type.TASK_CLEANUP_JOB);
scheduledJob.setDue(firstDue);
JobServiceImpl jobService = (JobServiceImpl) taskanaEngine.getJobService();

View File

@ -95,11 +95,11 @@ class WorkbasketCleanupJobAccTest extends AbstractAccTest {
for (int i = 0; i < 10; i++) {
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.WORKBASKETCLEANUPJOB);
job.setType(ScheduledJob.Type.WORKBASKET_CLEANUP_JOB);
taskanaEngine.getJobService().createJob(job);
job.setType(Type.UPDATETASKSJOB);
job.setType(Type.TASK_REFRESH_JOB);
taskanaEngine.getJobService().createJob(job);
job.setType(Type.CLASSIFICATIONCHANGEDJOB);
job.setType(Type.CLASSIFICATION_CHANGED_JOB);
taskanaEngine.getJobService().createJob(job);
}
@ -109,7 +109,7 @@ class WorkbasketCleanupJobAccTest extends AbstractAccTest {
List<ScheduledJob> workbasketCleanupJobs =
jobsToRun.stream()
.filter(scheduledJob -> scheduledJob.getType().equals(Type.WORKBASKETCLEANUPJOB))
.filter(scheduledJob -> scheduledJob.getType().equals(Type.WORKBASKET_CLEANUP_JOB))
.collect(Collectors.toList());
WorkbasketCleanupJob.initializeSchedule(taskanaEngine);

View File

@ -181,7 +181,7 @@ class UpdateObjectsUseUtcTimeStampsAccTest extends AbstractAccTest {
resetDb(true);
ScheduledJob job = new ScheduledJob();
job.setArguments(Map.of("keyBla", "valueBla"));
job.setType(ScheduledJob.Type.TASKCLEANUPJOB);
job.setType(ScheduledJob.Type.TASK_CLEANUP_JOB);
job.setDue(Instant.now().minus(Duration.ofHours(5)));
job.setLockExpires(Instant.now().minus(Duration.ofHours(5)));
JobService jobService = taskanaEngine.getJobService();

View File

@ -39,16 +39,7 @@ class SelectAndClaimTaskAccTest extends AbstractAccTest {
Stream.of("admin", "teamlead-1", "teamlead-2", "taskadmin")
.collect(Collectors.toList()));
Runnable test = getRunnableTest(selectedAndClaimedTasks, accessIds);
Thread[] threads = new Thread[accessIds.size()];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(test);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
runInThread(getRunnableTest(selectedAndClaimedTasks, accessIds), accessIds.size());
assertThat(selectedAndClaimedTasks)
.extracting(Task::getId)
@ -76,6 +67,17 @@ class SelectAndClaimTaskAccTest extends AbstractAccTest {
+ "task query returned nothing!");
}
private void runInThread(Runnable runnable, int threadCount) throws InterruptedException {
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(runnable);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
}
private Runnable getRunnableTest(List<Task> selectedAndClaimedTasks, List<String> accessIds) {
return () -> {
Subject subject = new Subject();

View File

@ -50,7 +50,7 @@ class TaskanaTransactionIntTest {
private static final String INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error";
private static final String INTERNAL_SERVER_ERROR_STATUS = "500";
@Autowired TaskanaTransactionProvider<Object> springTransactionProvider;
@Autowired TaskanaTransactionProvider springTransactionProvider;
@Autowired private TestRestTemplate restTemplate;
@Autowired private DataSource dataSource;
@Autowired private JdbcTemplate jdbcTemplate;

View File

@ -11,7 +11,7 @@ import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
public class TransactionalJobsConfiguration {
@Bean
public TaskanaTransactionProvider<Object> springTransactionProvider() {
public TaskanaTransactionProvider springTransactionProvider() {
return new SpringTransactionProvider();
}
}

View File

@ -1,15 +1,16 @@
package pro.taskana.common.internal.transaction;
import java.util.function.Supplier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/** TaskanaTransactionProvider that delegates transaction handling to Spring's @Transactional. */
@Component
public class SpringTransactionProvider implements TaskanaTransactionProvider<Object> {
public class SpringTransactionProvider implements TaskanaTransactionProvider {
@Override
@Transactional(rollbackFor = Exception.class)
public Object executeInTransaction(TaskanaCallable<Object> action) {
return action.call();
public <T> T executeInTransaction(Supplier<T> supplier) {
return supplier.get();
}
}

View File

@ -20,12 +20,12 @@ import pro.taskana.workbasket.internal.jobs.WorkbasketCleanupJob;
public class JobScheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(JobScheduler.class);
private final TaskanaTransactionProvider<Object> springTransactionProvider;
private final TaskanaTransactionProvider springTransactionProvider;
private final TaskanaEngine taskanaEngine;
@Autowired
public JobScheduler(
TaskanaTransactionProvider<Object> springTransactionProvider, TaskanaEngine taskanaEngine) {
TaskanaTransactionProvider springTransactionProvider, TaskanaEngine taskanaEngine) {
this.springTransactionProvider = springTransactionProvider;
this.taskanaEngine = taskanaEngine;
}
@ -40,7 +40,7 @@ public class JobScheduler {
if (taskanaEngine.isHistoryEnabled()) {
Thread.currentThread()
.getContextClassLoader()
.loadClass(Type.HISTORYCLEANUPJOB.getClazz())
.loadClass(Type.HISTORY_CLEANUP_JOB.getClazz())
.getDeclaredMethod("initializeSchedule", TaskanaEngine.class)
.invoke(null, taskanaEngine);
}

View File

@ -11,7 +11,7 @@ import pro.taskana.common.internal.transaction.TaskanaTransactionProvider;
public class TransactionalJobsConfiguration {
@Bean
public TaskanaTransactionProvider<Object> springTransactionProvider() {
public TaskanaTransactionProvider springTransactionProvider() {
return new SpringTransactionProvider();
}
}