TSK-629: first refactoring of job handling.
This commit is contained in:
parent
b1d0f4acdd
commit
b7be70d09c
|
@ -44,8 +44,8 @@ public class TaskanaEngineConfiguration {
|
|||
private static final String H2_DRIVER = "org.h2.Driver";
|
||||
private static final String TASKANA_PROPERTIES = "/taskana.properties";
|
||||
private static final String TASKANA_ROLES_SEPARATOR = "|";
|
||||
private static final String TASKANA_JOB_TASK_UPDATES_PER_TRANSACTION = "taskana.jobs.taskupdate.batchSize";
|
||||
private static final String TASKANA_JOB_RETRIES_FOR_FAILED_TASK_UPDATES = "taskana.jobs.taskupdate.maxRetries";
|
||||
private static final String TASKANA_JOB_TASK_UPDATES_PER_TRANSACTION = "taskana.jobs.batchSize";
|
||||
private static final String TASKANA_JOB_RETRIES_FOR_FAILED_TASK_UPDATES = "taskana.jobs.maxRetries";
|
||||
|
||||
private static final String TASKANA_DOMAINS_PROPERTY = "taskana.domains";
|
||||
private static final String TASKANA_CLASSIFICATION_TYPES_PROPERTY = "taskana.classification.types";
|
||||
|
@ -75,7 +75,7 @@ public class TaskanaEngineConfiguration {
|
|||
|
||||
// Properties for task-update Job execution on classification change
|
||||
private int maxNumberOfTaskUpdatesPerTransaction;
|
||||
private int maxNumberOfRetriesOfFailedTaskUpdates;
|
||||
private int maxNumberOfJobRetries;
|
||||
|
||||
// List of configured domain names
|
||||
protected List<String> domains = new ArrayList<String>();
|
||||
|
@ -147,14 +147,14 @@ public class TaskanaEngineConfiguration {
|
|||
|
||||
String retries = props.getProperty(TASKANA_JOB_RETRIES_FOR_FAILED_TASK_UPDATES);
|
||||
if (retries == null || retries.isEmpty()) {
|
||||
maxNumberOfRetriesOfFailedTaskUpdates = 3;
|
||||
maxNumberOfJobRetries = 3;
|
||||
} else {
|
||||
maxNumberOfRetriesOfFailedTaskUpdates = Integer.parseInt(retries);
|
||||
maxNumberOfJobRetries = Integer.parseInt(retries);
|
||||
}
|
||||
|
||||
LOGGER.debug(
|
||||
"Configured number of task updates per transaction: {}, number of retries of failed task updates: {}",
|
||||
maxNumberOfTaskUpdatesPerTransaction, maxNumberOfRetriesOfFailedTaskUpdates);
|
||||
maxNumberOfTaskUpdatesPerTransaction, maxNumberOfJobRetries);
|
||||
}
|
||||
|
||||
private void initDomains(Properties props) {
|
||||
|
@ -312,8 +312,8 @@ public class TaskanaEngineConfiguration {
|
|||
return maxNumberOfTaskUpdatesPerTransaction;
|
||||
}
|
||||
|
||||
public int getMaxNumberOfRetriesOfFailedTaskUpdates() {
|
||||
return maxNumberOfRetriesOfFailedTaskUpdates;
|
||||
public int getMaxNumberOfJobRetries() {
|
||||
return maxNumberOfJobRetries;
|
||||
}
|
||||
|
||||
public void setPropertiesFileName(String propertiesFileName) {
|
||||
|
|
|
@ -12,10 +12,10 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaEngine;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
import pro.taskana.exceptions.SystemException;
|
||||
import pro.taskana.impl.util.LoggerUtils;
|
||||
import pro.taskana.mappings.JobMapper;
|
||||
import pro.taskana.transaction.TaskanaTransactionProvider;
|
||||
|
||||
/**
|
||||
* This is the runner for all jobs scheduled in the Job table.
|
||||
|
@ -28,17 +28,17 @@ public class JobRunner {
|
|||
private TaskanaEngineImpl taskanaEngine;
|
||||
private JobMapper jobMapper;
|
||||
private int maxRetryCount;
|
||||
private TaskanaTransactionProvider<BulkOperationResults<String, Exception>> txProvider;
|
||||
private TaskanaTransactionProvider<Object> txProvider;
|
||||
|
||||
public JobRunner(TaskanaEngine taskanaEngine) {
|
||||
this.taskanaEngine = (TaskanaEngineImpl) taskanaEngine;
|
||||
jobMapper = this.taskanaEngine.getSqlSession().getMapper(JobMapper.class);
|
||||
maxRetryCount = taskanaEngine.getConfiguration().getMaxNumberOfRetriesOfFailedTaskUpdates();
|
||||
maxRetryCount = taskanaEngine.getConfiguration().getMaxNumberOfJobRetries();
|
||||
txProvider = null;
|
||||
}
|
||||
|
||||
public void registerTransactionProvider(
|
||||
TaskanaTransactionProvider<BulkOperationResults<String, Exception>> txProvider) {
|
||||
TaskanaTransactionProvider<Object> txProvider) {
|
||||
this.txProvider = txProvider;
|
||||
}
|
||||
|
||||
|
@ -101,7 +101,9 @@ public class JobRunner {
|
|||
BulkOperationResults<String, Exception> log;
|
||||
try {
|
||||
if (txProvider != null) {
|
||||
log = txProvider.executeInTransaction(() -> { // each job in its own transaction
|
||||
log = (BulkOperationResults<String, Exception>) txProvider.executeInTransaction(() -> { // each job in
|
||||
// its own
|
||||
// transaction
|
||||
try {
|
||||
taskanaEngine.openConnection();
|
||||
return runSingleJob(job);
|
||||
|
|
|
@ -1,99 +0,0 @@
|
|||
package pro.taskana.impl;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskSummary;
|
||||
import pro.taskana.TaskanaEngine;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
import pro.taskana.TimeInterval;
|
||||
import pro.taskana.exceptions.InvalidArgumentException;
|
||||
|
||||
/**
|
||||
* This is the runner for Tasks jobs.
|
||||
*
|
||||
* @author mmr
|
||||
*/
|
||||
public class JobTaskRunner {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceImpl.class);
|
||||
private TaskServiceImpl taskanaService;
|
||||
private TaskanaTransactionProvider<BulkOperationResults<String, Exception>> txProvider;
|
||||
private int maxRetryOperations = 3;
|
||||
private int batchSize = 50;
|
||||
|
||||
public JobTaskRunner(TaskanaEngine taskanaEngine) {
|
||||
this.taskanaService = (TaskServiceImpl) taskanaEngine.getTaskService();
|
||||
}
|
||||
|
||||
public BulkOperationResults<String, Exception> runCleanCompletedTasks(Instant untilDate) {
|
||||
return cleanCompletedTasks(untilDate);
|
||||
}
|
||||
|
||||
public void registerTransactionProvider(
|
||||
TaskanaTransactionProvider<BulkOperationResults<String, Exception>> txProvider) {
|
||||
this.txProvider = txProvider;
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> cleanCompletedTasks(Instant untilDate) {
|
||||
LOGGER.info("entry to RunCompletedTasks({})", untilDate.toString());
|
||||
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
|
||||
int attempt = 0;
|
||||
|
||||
List<TaskSummary> tasksCompleted = getTasksCompleted(untilDate);
|
||||
List<String> tasksIds = new ArrayList<>();
|
||||
while (tasksCompleted.size() != 0 && attempt < maxRetryOperations) {
|
||||
tasksCompleted.stream().forEach(task -> {
|
||||
tasksIds.add(task.getTaskId());
|
||||
LOGGER.info("task id to be deleted: {}", task.getTaskId());
|
||||
});
|
||||
bulkLog = executeTransactionalDeleting(tasksIds);
|
||||
attempt = getAttempt(bulkLog, attempt);
|
||||
tasksCompleted = getTasksCompleted(untilDate);
|
||||
}
|
||||
|
||||
LOGGER.info("exit from RunCompletedTasks({}). Returning result: " + bulkLog, untilDate.toString());
|
||||
return bulkLog;
|
||||
|
||||
}
|
||||
|
||||
private List<TaskSummary> getTasksCompleted(Instant untilDate) {
|
||||
return taskanaService.createTaskQuery()
|
||||
.completedWithin(new TimeInterval(null, untilDate))
|
||||
.list(0, batchSize);
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> executeTransactionalDeleting(List<String> tasksIds) {
|
||||
if (txProvider == null) {
|
||||
return doDeleteTasks(tasksIds);
|
||||
}
|
||||
return txProvider.executeInTransaction(() -> doDeleteTasks(tasksIds));
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> doDeleteTasks(List<String> tasksIds) {
|
||||
if (tasksIds.isEmpty()) {
|
||||
return new BulkOperationResults<>();
|
||||
}
|
||||
|
||||
try {
|
||||
return taskanaService.deleteTasks(tasksIds).mapBulkOperationResults();
|
||||
} catch (InvalidArgumentException e) {
|
||||
LOGGER.error("could not delete next tasksIds batch: {}, error:" + e.getMessage(),
|
||||
String.join(",", tasksIds));
|
||||
return new BulkOperationResults<>();
|
||||
}
|
||||
}
|
||||
|
||||
private int getAttempt(BulkOperationResults<String, Exception> bulkLog, int attempt) {
|
||||
if (!bulkLog.getErrorMap().isEmpty()) {
|
||||
return ++attempt;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -990,10 +990,15 @@ public class TaskServiceImpl implements TaskService {
|
|||
try {
|
||||
taskanaEngine.openConnection();
|
||||
if (taskIds == null) {
|
||||
throw new InvalidArgumentException("TaskIds can´t be NULL as parameter for deleteTasks().");
|
||||
throw new InvalidArgumentException("List of TaskIds must not be null.");
|
||||
}
|
||||
|
||||
BulkOperationResults<String, TaskanaException> bulkLog = new BulkOperationResults<>();
|
||||
|
||||
if (taskIds.isEmpty()) {
|
||||
return bulkLog;
|
||||
}
|
||||
|
||||
List<MinimalTaskSummary> taskSummaries = taskMapper.findExistingTasks(taskIds);
|
||||
|
||||
Iterator<String> taskIdIterator = taskIds.iterator();
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import pro.taskana.TaskanaEngine;
|
||||
|
||||
/**
|
||||
* Abstract base for all background jobs of TASKANA.
|
||||
*/
|
||||
public abstract class AbstractTaskanaJob implements TaskanaJob {
|
||||
|
||||
protected TaskanaEngine taskanaEngine;
|
||||
|
||||
public AbstractTaskanaJob(TaskanaEngine taskanaEngine) {
|
||||
this.taskanaEngine = taskanaEngine;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import pro.taskana.TaskanaEngine;
|
||||
import pro.taskana.impl.TaskServiceImpl;
|
||||
import pro.taskana.transaction.TaskanaTransactionProvider;
|
||||
|
||||
/**
|
||||
* This is the runner for Tasks jobs.
|
||||
*/
|
||||
public class JobRunner {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceImpl.class);
|
||||
private TaskanaEngine taskanaEngine;
|
||||
private TaskanaTransactionProvider<Object> txProvider;
|
||||
private int batchSize = 50;
|
||||
private int maxRetryCount;
|
||||
private int attempt = 0;
|
||||
|
||||
public JobRunner(TaskanaEngine taskanaEngine) {
|
||||
this.taskanaEngine = taskanaEngine;
|
||||
maxRetryCount = taskanaEngine.getConfiguration().getMaxNumberOfJobRetries();
|
||||
batchSize = taskanaEngine.getConfiguration().getMaxNumberOfTaskUpdatesPerTransaction();
|
||||
}
|
||||
|
||||
public void registerTransactionProvider(
|
||||
TaskanaTransactionProvider<Object> txProvider) {
|
||||
this.txProvider = txProvider;
|
||||
}
|
||||
|
||||
public void runJob(TaskanaJob job) throws Exception {
|
||||
if (txProvider != null) {
|
||||
txProvider.executeInTransaction(() -> {
|
||||
try {
|
||||
job.run();
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Exception caught while processing job transactionally. ", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
job.run();
|
||||
}
|
||||
}
|
||||
|
||||
public void runJobWithRetries(TaskanaJob job) throws Exception {
|
||||
try {
|
||||
runJob(job);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Job failed due to an Exception.", e);
|
||||
if (attempt < maxRetryCount) {
|
||||
attempt++;
|
||||
LOGGER.info("Retrying for the {} time.", attempt);
|
||||
runJobWithRetries(job);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskSummary;
|
||||
import pro.taskana.TaskanaEngine;
|
||||
import pro.taskana.TimeInterval;
|
||||
import pro.taskana.exceptions.InvalidArgumentException;
|
||||
import pro.taskana.exceptions.TaskanaException;
|
||||
|
||||
/**
|
||||
* Job to cleanup completed tasks after a period of time.
|
||||
*/
|
||||
public class TaskCleanupJob extends AbstractTaskanaJob {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TaskCleanupJob.class);
|
||||
|
||||
// Parameter
|
||||
private Instant completedBefore;
|
||||
|
||||
// Results
|
||||
private BulkOperationResults<String, TaskanaException> results;
|
||||
|
||||
public TaskCleanupJob(TaskanaEngine taskanaEngine, Instant completedBefore) {
|
||||
super(taskanaEngine);
|
||||
this.completedBefore = completedBefore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws TaskanaException {
|
||||
LOGGER.info("Running job to delete all tasks completed before ({})", completedBefore.toString());
|
||||
try {
|
||||
List<TaskSummary> tasksCompletedBefore = getTasksCompletedBefore(completedBefore);
|
||||
deleteTasks(tasksCompletedBefore);
|
||||
LOGGER.info("Job ended successfully.");
|
||||
} catch (InvalidArgumentException e) {
|
||||
throw new TaskanaException("Error while processing TaskCleanupJob.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<TaskSummary> getTasksCompletedBefore(Instant untilDate) {
|
||||
return taskanaEngine.getTaskService()
|
||||
.createTaskQuery()
|
||||
.completedWithin(new TimeInterval(null, untilDate))
|
||||
.list();
|
||||
}
|
||||
|
||||
private void deleteTasks(List<TaskSummary> tasksToBeDeleted) throws InvalidArgumentException {
|
||||
List<String> tasksIdsToBeDeleted = tasksToBeDeleted.stream()
|
||||
.map(task -> task.getTaskId())
|
||||
.collect(Collectors.toList());
|
||||
results = taskanaEngine.getTaskService().deleteTasks(tasksIdsToBeDeleted);
|
||||
LOGGER.info("{} tasks deleted.", tasksIdsToBeDeleted.size() - results.getFailedIds().size());
|
||||
for (String failedId : results.getFailedIds()) {
|
||||
LOGGER.warn("Task with id {} could not be deleted. Reason: {}", failedId, results.getErrorForId(failedId));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import pro.taskana.exceptions.TaskanaException;
|
||||
|
||||
/**
|
||||
* Interface for all background TASKANA jobs.
|
||||
*/
|
||||
public interface TaskanaJob {
|
||||
|
||||
/**
|
||||
* Runs the TaskanaJob.
|
||||
*/
|
||||
void run() throws TaskanaException;
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package pro.taskana;
|
||||
package pro.taskana.transaction;
|
||||
|
||||
/**
|
||||
* represents a callable Object.
|
|
@ -1,9 +1,8 @@
|
|||
package pro.taskana;
|
||||
package pro.taskana.transaction;
|
||||
|
||||
/**
|
||||
* This class provides support for transactions.
|
||||
*
|
||||
* @author bbr
|
||||
* @param <T>
|
||||
* the type of the returned objects.
|
||||
*/
|
||||
|
@ -11,4 +10,5 @@ package pro.taskana;
|
|||
public interface TaskanaTransactionProvider<T> {
|
||||
|
||||
T executeInTransaction(TaskanaCallable<T> action);
|
||||
|
||||
}
|
|
@ -1,8 +1,7 @@
|
|||
package acceptance.task;
|
||||
package acceptance.jobs;
|
||||
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
|
@ -16,20 +15,12 @@ import org.junit.Test;
|
|||
import org.junit.runner.RunWith;
|
||||
|
||||
import acceptance.AbstractAccTest;
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.Task;
|
||||
import pro.taskana.TaskService;
|
||||
import pro.taskana.TaskSummary;
|
||||
import pro.taskana.TimeInterval;
|
||||
import pro.taskana.exceptions.ClassificationNotFoundException;
|
||||
import pro.taskana.exceptions.InvalidArgumentException;
|
||||
import pro.taskana.exceptions.InvalidOwnerException;
|
||||
import pro.taskana.exceptions.InvalidStateException;
|
||||
import pro.taskana.exceptions.NotAuthorizedException;
|
||||
import pro.taskana.exceptions.TaskAlreadyExistException;
|
||||
import pro.taskana.exceptions.TaskNotFoundException;
|
||||
import pro.taskana.exceptions.WorkbasketNotFoundException;
|
||||
import pro.taskana.impl.JobTaskRunner;
|
||||
import pro.taskana.jobs.JobRunner;
|
||||
import pro.taskana.jobs.TaskCleanupJob;
|
||||
import pro.taskana.security.JAASRunner;
|
||||
import pro.taskana.security.WithAccessId;
|
||||
|
||||
|
@ -37,7 +28,7 @@ import pro.taskana.security.WithAccessId;
|
|||
* Acceptance test for all "jobs tasks runner" scenarios.
|
||||
*/
|
||||
@RunWith(JAASRunner.class)
|
||||
public class JobTaskRunnerAccTest extends AbstractAccTest {
|
||||
public class JobRunnerAccTest extends AbstractAccTest {
|
||||
|
||||
TaskService taskService;
|
||||
|
||||
|
@ -50,31 +41,30 @@ public class JobTaskRunnerAccTest extends AbstractAccTest {
|
|||
userName = "teamlead_1",
|
||||
groupNames = {"group_1", "group_2"})
|
||||
@Test
|
||||
public void shouldCleanCompletedTasksUntilDate() {
|
||||
public void shouldCleanCompletedTasksUntilDate() throws Exception {
|
||||
|
||||
JobTaskRunner runner = new JobTaskRunner(taskanaEngine);
|
||||
JobRunner runner = new JobRunner(taskanaEngine);
|
||||
Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.minusDays(14)
|
||||
.toInstant();
|
||||
|
||||
List<TaskSummary> tasksCompletedUntilDateBefore = getTaskCompletedUntilDate(completeUntilDate);
|
||||
BulkOperationResults<String, Exception> results = runner.runCleanCompletedTasks(completeUntilDate);
|
||||
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, completeUntilDate);
|
||||
runner.runJob(job);
|
||||
List<TaskSummary> tasksCompletedUntilDateAfter = getTaskCompletedUntilDate(completeUntilDate);
|
||||
|
||||
assertFalse(results.containsErrors());
|
||||
assertTrue(tasksCompletedUntilDateBefore.size() > 0);
|
||||
assertTrue(tasksCompletedUntilDateAfter.size() == 0);
|
||||
|
||||
}
|
||||
|
||||
@WithAccessId(
|
||||
userName = "teamlead_1",
|
||||
groupNames = {"group_1", "group_2"})
|
||||
@Test
|
||||
public void shouldNotCleanCompleteTasksAfterDefinedDay()
|
||||
throws TaskNotFoundException, NotAuthorizedException, InvalidStateException, InvalidOwnerException,
|
||||
TaskAlreadyExistException, InvalidArgumentException, WorkbasketNotFoundException,
|
||||
ClassificationNotFoundException {
|
||||
throws Exception {
|
||||
|
||||
Task newTask = taskService.newTask("USER_1_1", "DOMAIN_A");
|
||||
newTask.setClassificationKey("T2100");
|
||||
|
@ -83,12 +73,16 @@ public class JobTaskRunnerAccTest extends AbstractAccTest {
|
|||
taskService.claim(createdTask.getId());
|
||||
taskService.completeTask(createdTask.getId());
|
||||
|
||||
JobTaskRunner runner = new JobTaskRunner(taskanaEngine);
|
||||
JobRunner runner = new JobRunner(taskanaEngine);
|
||||
Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.minusDays(14)
|
||||
.toInstant();
|
||||
runner.runCleanCompletedTasks(completeUntilDate);
|
||||
|
||||
List<TaskSummary> tasksCompletedUntilDateBefore = getTaskCompletedUntilDate(completeUntilDate);
|
||||
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, completeUntilDate);
|
||||
runner.runJob(job);
|
||||
|
||||
Task completedCreatedTask = taskService.getTask(createdTask.getId());
|
||||
assertNotNull(completedCreatedTask);
|
||||
|
|
@ -159,6 +159,12 @@
|
|||
<version>${spring.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-context</artifactId>
|
||||
<version>${spring.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
|
@ -166,12 +172,6 @@
|
|||
<version>${spring-boot.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-context</artifactId>
|
||||
<version>${spring.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-jdbc</artifactId>
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package pro.taskana.transaction;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@Component
|
||||
public class SpringTransactionProvider implements TaskanaTransactionProvider<Object> {
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Object executeInTransaction(
|
||||
TaskanaCallable<Object> action) {
|
||||
return action.call();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import java.security.Principal;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaEngine;
|
||||
import pro.taskana.TaskanaRole;
|
||||
import pro.taskana.impl.util.LoggerUtils;
|
||||
import pro.taskana.security.UserPrincipal;
|
||||
import pro.taskana.transaction.TaskanaTransactionProvider;
|
||||
|
||||
/**
|
||||
* This class invokes the JobRunner periodically to schedule long running jobs.
|
||||
*/
|
||||
@Component
|
||||
public class JobScheduler {
|
||||
|
||||
private final long untilDays = 14;
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(JobScheduler.class);
|
||||
private static AtomicBoolean jobRunning = new AtomicBoolean(false);
|
||||
|
||||
@Autowired
|
||||
private TaskanaEngine taskanaEngine;
|
||||
|
||||
@Autowired
|
||||
TaskanaTransactionProvider<Object> springTransactionProvider;
|
||||
|
||||
@Scheduled(cron = "${taskana.jobscheduler.async.cron}")
|
||||
public void triggerJobs() {
|
||||
LOGGER.info("AsyncJobs started.");
|
||||
boolean otherJobActive = jobRunning.getAndSet(true);
|
||||
if (!otherJobActive) { // only one job should be active at any time
|
||||
try {
|
||||
pro.taskana.impl.JobRunner runner = new pro.taskana.impl.JobRunner(taskanaEngine);
|
||||
runner.registerTransactionProvider(springTransactionProvider);
|
||||
LOGGER.info("Running Jobs");
|
||||
BulkOperationResults<String, Exception> result = runner.runJobs();
|
||||
Map<String, Exception> errors = result.getErrorMap();
|
||||
LOGGER.info("AsyncJobs completed. Result = {} ", LoggerUtils.mapToString(errors));
|
||||
} finally {
|
||||
jobRunning.set(false);
|
||||
}
|
||||
} else {
|
||||
LOGGER.info("AsyncJobs: Don't run Jobs because already another JobRunner is running");
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(cron = "${taskana.jobscheduler.cleanup.cron}")
|
||||
public void triggerTaskCleanupJob() {
|
||||
LOGGER.info("CleanupJob started.");
|
||||
try {
|
||||
runTaskCleanupJobAsAdmin();
|
||||
LOGGER.info("CleanupJob completed.");
|
||||
} catch (PrivilegedActionException e) {
|
||||
LOGGER.error("CleanupJob failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Creates an admin subject and runs the job using the subject.
|
||||
*/
|
||||
private void runTaskCleanupJobAsAdmin() throws PrivilegedActionException {
|
||||
Subject.doAs(getAdminSubject(),
|
||||
new PrivilegedExceptionAction<Object>() {
|
||||
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
|
||||
try {
|
||||
JobRunner runner = new JobRunner(taskanaEngine);
|
||||
runner.registerTransactionProvider(springTransactionProvider);
|
||||
Instant completedBefore = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.minusDays(untilDays)
|
||||
.toInstant();
|
||||
|
||||
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, completedBefore);
|
||||
runner.runJobWithRetries(job);
|
||||
|
||||
return "Successful";
|
||||
} catch (Throwable e) {
|
||||
throw new Exception(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private Subject getAdminSubject() {
|
||||
Subject subject = new Subject();
|
||||
List<Principal> principalList = new ArrayList<>();
|
||||
try {
|
||||
principalList
|
||||
.add(new UserPrincipal(
|
||||
taskanaEngine.getConfiguration().getRoleMap().get(TaskanaRole.ADMIN).iterator().next()));
|
||||
} catch (Throwable t) {
|
||||
LOGGER.warn("Could not determine a configured admin user.", t);
|
||||
}
|
||||
subject.getPrincipals().addAll(principalList);
|
||||
return subject;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import pro.taskana.transaction.TaskanaTransactionProvider;
|
||||
|
||||
/**
|
||||
* Configuration class for Spring sample application.
|
||||
*/
|
||||
@Configuration
|
||||
public class TransactionalJobsConfiguration {
|
||||
|
||||
@Bean
|
||||
public TaskanaTransactionProvider<Object> springTransactionProvider() {
|
||||
return new pro.taskana.transaction.SpringTransactionProvider();
|
||||
}
|
||||
|
||||
}
|
|
@ -11,6 +11,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
|
@ -18,6 +19,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
|||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
import pro.taskana.jobs.TransactionalJobsConfiguration;
|
||||
import pro.taskana.ldap.LdapCacheTestImpl;
|
||||
import pro.taskana.ldap.LdapClient;
|
||||
import pro.taskana.ldap.LdapConfiguration;
|
||||
|
@ -28,7 +30,8 @@ import pro.taskana.sampledata.SampleDataGenerator;
|
|||
*/
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
@Import({SampleConfiguration.class, LdapConfiguration.class, RestConfiguration.class})
|
||||
@ComponentScan(basePackages = "pro.taskana")
|
||||
@Import({TransactionalJobsConfiguration.class, LdapConfiguration.class, RestConfiguration.class})
|
||||
public class ExampleRestApplication {
|
||||
|
||||
@Autowired
|
||||
|
|
|
@ -1,77 +0,0 @@
|
|||
package pro.taskana.rest;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaEngine;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
import pro.taskana.impl.JobRunner;
|
||||
import pro.taskana.impl.JobTaskRunner;
|
||||
import pro.taskana.impl.util.LoggerUtils;
|
||||
|
||||
/**
|
||||
* This class invokes the JobRunner periodically to schedule long running jobs.
|
||||
*
|
||||
* @author bbr
|
||||
*/
|
||||
@Component
|
||||
public class JobScheduler {
|
||||
|
||||
private final long untilDays = 14;
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(JobScheduler.class);
|
||||
private static AtomicBoolean jobRunning = new AtomicBoolean(false);
|
||||
|
||||
@Autowired
|
||||
private TaskanaEngine taskanaEngine;
|
||||
|
||||
@Autowired
|
||||
TaskanaTransactionProvider<BulkOperationResults<String, Exception>> springTransactionProvider;
|
||||
|
||||
@Scheduled(cron = "${taskana.jobscheduler.cron}")
|
||||
public void triggerJobs() {
|
||||
boolean otherJobActive = jobRunning.getAndSet(true);
|
||||
if (!otherJobActive) { // only one job should be active at any time
|
||||
try {
|
||||
JobRunner runner = new JobRunner(taskanaEngine);
|
||||
runner.registerTransactionProvider(springTransactionProvider);
|
||||
LOGGER.info("Running Jobs");
|
||||
BulkOperationResults<String, Exception> result = runner.runJobs();
|
||||
Map<String, Exception> errors = result.getErrorMap();
|
||||
LOGGER.info("Job run completed. Result = {} ", LoggerUtils.mapToString(errors));
|
||||
} finally {
|
||||
jobRunning.set(false);
|
||||
}
|
||||
} else {
|
||||
LOGGER.info("Don't run Jobs because already another JobRunner is running");
|
||||
}
|
||||
}
|
||||
|
||||
// Run everyDay at mid night
|
||||
@Scheduled(cron = "0 0 0 * * *")
|
||||
public void triggerTaskCompletedCleanUpJob() {
|
||||
LOGGER.info("triggerTaskCompletedCleanUpJob");
|
||||
JobTaskRunner runner = new JobTaskRunner(taskanaEngine);
|
||||
runner.registerTransactionProvider(springTransactionProvider);
|
||||
Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.minusDays(untilDays)
|
||||
.toInstant();
|
||||
|
||||
BulkOperationResults<String, Exception> result = runner.runCleanCompletedTasks(completeUntilDate);
|
||||
Map<String, Exception> errors = result.getErrorMap();
|
||||
|
||||
LOGGER.info("triggerTaskCompletedCleanUpJob Completed Result = {} ", LoggerUtils.mapToString(errors));
|
||||
}
|
||||
}
|
|
@ -1,20 +0,0 @@
|
|||
package pro.taskana.rest;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
|
||||
/**
|
||||
* Configuration class for Spring sample application.
|
||||
*/
|
||||
@Configuration
|
||||
public class SampleConfiguration {
|
||||
|
||||
@Bean
|
||||
public TaskanaTransactionProvider<BulkOperationResults<String, Exception>> springTransactionProvider() {
|
||||
return new SpringTransactionProvider();
|
||||
}
|
||||
|
||||
}
|
|
@ -1,20 +0,0 @@
|
|||
package pro.taskana.rest;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaCallable;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
|
||||
@Component
|
||||
public class SpringTransactionProvider implements TaskanaTransactionProvider<BulkOperationResults<String, Exception>> {
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public BulkOperationResults<String, Exception> executeInTransaction(
|
||||
TaskanaCallable<BulkOperationResults<String, Exception>> action) {
|
||||
return action.call();
|
||||
}
|
||||
|
||||
}
|
|
@ -28,7 +28,8 @@ taskana.ldap.groupNameAttribute=cn
|
|||
taskana.ldap.minSearchForLength=3
|
||||
taskana.ldap.maxNumberOfReturnedAccessIds=50
|
||||
####### JobScheduler cron expression that specifies when the JobSchedler runs
|
||||
taskana.jobscheduler.cron=0 * * * * *
|
||||
taskana.jobscheduler.async.cron=0 * * * * *
|
||||
taskana.jobscheduler.cleanup.cron=58 * * * * *
|
||||
####### cache static resources properties
|
||||
spring.resources.cache.cachecontrol.cache-private=true
|
||||
####### tomcat is not detecting the x-forward headers from bluemix as a trustworthy proxy
|
||||
|
|
|
@ -6,5 +6,5 @@ taskana.domains=DOMAIN_A,DOMAIN_B,DOMAIN_C
|
|||
taskana.classification.types=TASK,DOCUMENT
|
||||
taskana.classification.categories= EXTERNAL , manual, autoMAtic ,Process
|
||||
|
||||
taskana.jobs.taskupdate.maxRetries=3
|
||||
taskana.jobs.taskupdate.batchSize=50
|
||||
taskana.jobs.maxRetries=3
|
||||
taskana.jobs.batchSize=50
|
||||
|
|
|
@ -0,0 +1,231 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import org.json.JSONObject;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.hateoas.Link;
|
||||
import org.springframework.hateoas.hal.Jackson2HalModule;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.converter.HttpMessageConverter;
|
||||
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
|
||||
import org.springframework.scheduling.TriggerContext;
|
||||
import org.springframework.scheduling.support.CronTrigger;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import pro.taskana.Classification;
|
||||
import pro.taskana.Task;
|
||||
import pro.taskana.exceptions.InvalidArgumentException;
|
||||
import pro.taskana.rest.RestConfiguration;
|
||||
import pro.taskana.rest.resource.ClassificationResource;
|
||||
import pro.taskana.rest.resource.TaskResource;
|
||||
import pro.taskana.rest.resource.assembler.ClassificationResourceAssembler;
|
||||
import pro.taskana.rest.resource.assembler.TaskResourceAssembler;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = RestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT,
|
||||
properties = {"devMode=true"})
|
||||
public class AsyncUpdateJobIntTest {
|
||||
|
||||
@Autowired
|
||||
private ClassificationResourceAssembler classificationResourceAssembler;
|
||||
|
||||
@Autowired
|
||||
private TaskResourceAssembler taskResourceAssembler;
|
||||
|
||||
@Autowired
|
||||
Environment env;
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUpdateJobIntTest.class);
|
||||
String server = "http://127.0.0.1:";
|
||||
RestTemplate template;
|
||||
HttpEntity<String> request;
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
@LocalServerPort
|
||||
int port;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
template = getRestTemplate();
|
||||
headers.add("Authorization", "Basic dGVhbWxlYWRfMTp0ZWFtbGVhZF8x");
|
||||
request = new HttpEntity<String>(headers);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateClassificationPrioServiceLevel()
|
||||
throws IOException, InterruptedException, InvalidArgumentException {
|
||||
|
||||
// 1st step: get old classification :
|
||||
Instant before = Instant.now();
|
||||
|
||||
ResponseEntity<ClassificationResource> response = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/classifications/CLI:100000000000000000000000000000000003",
|
||||
HttpMethod.GET,
|
||||
request,
|
||||
new ParameterizedTypeReference<ClassificationResource>() {
|
||||
|
||||
});
|
||||
|
||||
assertNotNull(response.getBody().getLink(Link.REL_SELF));
|
||||
ClassificationResource classification = response.getBody();
|
||||
|
||||
// 2nd step: modify classification and trigger update
|
||||
classification.removeLinks();
|
||||
classification.setServiceLevel("P5D");
|
||||
classification.setPriority(1000);
|
||||
|
||||
String updatedClassification = new JSONObject(classification).toString();
|
||||
|
||||
URL url = new URL(server + port + "/v1/classifications/CLI:100000000000000000000000000000000003");
|
||||
HttpURLConnection con = (HttpURLConnection) url.openConnection();
|
||||
con.setRequestMethod("PUT");
|
||||
con.setRequestProperty("Authorization", "Basic dGVhbWxlYWRfMTp0ZWFtbGVhZF8x");
|
||||
con.setDoOutput(true);
|
||||
con.setRequestProperty("Content-Type", "application/json");
|
||||
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(con.getOutputStream()));
|
||||
out.write(updatedClassification);
|
||||
out.flush();
|
||||
out.close();
|
||||
assertEquals(200, con.getResponseCode());
|
||||
con.disconnect();
|
||||
|
||||
// wait until the next trigger time + 2 seconds to give JobScheduler a chance to run
|
||||
String cron = env.getProperty("taskana.jobscheduler.async.cron");
|
||||
CronTrigger trigger = new CronTrigger(cron);
|
||||
TriggerContext context = new TriggerContext() {
|
||||
|
||||
@Override
|
||||
public Date lastScheduledExecutionTime() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Date lastActualExecutionTime() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Date lastCompletionTime() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Date now = new Date();
|
||||
long delay = trigger.nextExecutionTime(context).getTime() - now.getTime() + 2000;
|
||||
|
||||
LOGGER.info("About to sleep for " + delay / 1000
|
||||
+ " seconds to give JobScheduler a chance to process the classification change");
|
||||
Thread.sleep(delay);
|
||||
LOGGER.info("Sleeping ended. Continuing .... ");
|
||||
|
||||
// verify the classification modified timestamp is after 'before'
|
||||
ResponseEntity<ClassificationResource> repeatedResponse = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/classifications/CLI:100000000000000000000000000000000003",
|
||||
HttpMethod.GET,
|
||||
request,
|
||||
new ParameterizedTypeReference<ClassificationResource>() {
|
||||
|
||||
});
|
||||
|
||||
ClassificationResource modifiedClassificationResource = repeatedResponse.getBody();
|
||||
Classification modifiedClassification = classificationResourceAssembler.toModel(modifiedClassificationResource);
|
||||
|
||||
assertTrue(!before.isAfter(modifiedClassification.getModified()));
|
||||
|
||||
List<String> affectedTasks = new ArrayList<>(
|
||||
Arrays.asList("TKI:000000000000000000000000000000000003", "TKI:000000000000000000000000000000000004",
|
||||
"TKI:000000000000000000000000000000000005", "TKI:000000000000000000000000000000000006",
|
||||
"TKI:000000000000000000000000000000000007", "TKI:000000000000000000000000000000000008",
|
||||
"TKI:000000000000000000000000000000000009", "TKI:000000000000000000000000000000000010",
|
||||
"TKI:000000000000000000000000000000000011", "TKI:000000000000000000000000000000000012",
|
||||
"TKI:000000000000000000000000000000000013", "TKI:000000000000000000000000000000000014",
|
||||
"TKI:000000000000000000000000000000000015", "TKI:000000000000000000000000000000000016",
|
||||
"TKI:000000000000000000000000000000000017", "TKI:000000000000000000000000000000000018",
|
||||
"TKI:000000000000000000000000000000000019", "TKI:000000000000000000000000000000000020",
|
||||
"TKI:000000000000000000000000000000000021", "TKI:000000000000000000000000000000000022",
|
||||
"TKI:000000000000000000000000000000000023", "TKI:000000000000000000000000000000000024",
|
||||
"TKI:000000000000000000000000000000000025", "TKI:000000000000000000000000000000000026",
|
||||
"TKI:000000000000000000000000000000000027", "TKI:000000000000000000000000000000000028",
|
||||
"TKI:000000000000000000000000000000000029", "TKI:000000000000000000000000000000000030",
|
||||
"TKI:000000000000000000000000000000000031", "TKI:000000000000000000000000000000000032",
|
||||
"TKI:000000000000000000000000000000000033", "TKI:000000000000000000000000000000000034",
|
||||
"TKI:000000000000000000000000000000000035", "TKI:000000000000000000000000000000000100",
|
||||
"TKI:000000000000000000000000000000000101", "TKI:000000000000000000000000000000000102",
|
||||
"TKI:000000000000000000000000000000000103"));
|
||||
for (String taskId : affectedTasks) {
|
||||
verifyTaskIsModifiedAfter(taskId, before);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void verifyTaskIsModifiedAfter(String taskId, Instant before) throws InvalidArgumentException {
|
||||
RestTemplate admTemplate = getRestTemplate();
|
||||
HttpHeaders admHeaders = new HttpHeaders();
|
||||
admHeaders.add("Authorization", "Basic YWRtaW46YWRtaW4="); // admin:admin
|
||||
|
||||
HttpEntity<String> admRequest = new HttpEntity<String>(admHeaders);
|
||||
|
||||
ResponseEntity<TaskResource> taskResponse = admTemplate.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/tasks/" + taskId,
|
||||
HttpMethod.GET,
|
||||
admRequest,
|
||||
new ParameterizedTypeReference<TaskResource>() {
|
||||
|
||||
});
|
||||
|
||||
TaskResource taskResource = taskResponse.getBody();
|
||||
Task task = taskResourceAssembler.toModel(taskResource);
|
||||
|
||||
assertTrue(!before.isAfter(task.getModified()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a REST template which is capable of dealing with responses in HAL format
|
||||
*
|
||||
* @return RestTemplate
|
||||
*/
|
||||
private RestTemplate getRestTemplate() {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
mapper.registerModule(new Jackson2HalModule());
|
||||
|
||||
MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
|
||||
converter.setSupportedMediaTypes(MediaType.parseMediaTypes("application/hal+json"));
|
||||
converter.setObjectMapper(mapper);
|
||||
|
||||
RestTemplate template = new RestTemplate(Collections.<HttpMessageConverter<?>> singletonList(converter));
|
||||
return template;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
package pro.taskana.jobs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.hateoas.Link;
|
||||
import org.springframework.hateoas.PagedResources;
|
||||
import org.springframework.hateoas.hal.Jackson2HalModule;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.converter.HttpMessageConverter;
|
||||
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
|
||||
import org.springframework.scheduling.TriggerContext;
|
||||
import org.springframework.scheduling.support.CronTrigger;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import pro.taskana.Task;
|
||||
import pro.taskana.exceptions.InvalidArgumentException;
|
||||
import pro.taskana.exceptions.SystemException;
|
||||
import pro.taskana.rest.RestConfiguration;
|
||||
import pro.taskana.rest.resource.TaskResource;
|
||||
import pro.taskana.rest.resource.TaskSummaryResource;
|
||||
import pro.taskana.rest.resource.assembler.TaskResourceAssembler;
|
||||
import pro.taskana.sampledata.SampleDataGenerator;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = RestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT,
|
||||
properties = {"devMode=true"})
|
||||
public class TaskCleanupJobIntTest {
|
||||
|
||||
@Autowired
|
||||
private TaskResourceAssembler taskResourceAssembler;
|
||||
|
||||
@Autowired
|
||||
private DataSource dataSource;
|
||||
|
||||
@Autowired
|
||||
Environment env;
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TaskCleanupJobIntTest.class);
|
||||
String server = "http://127.0.0.1:";
|
||||
RestTemplate template;
|
||||
HttpEntity<String> request;
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
@LocalServerPort
|
||||
int port;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
resetDb();
|
||||
template = getRestTemplate();
|
||||
headers.add("Authorization", "Basic dGVhbWxlYWRfMTp0ZWFtbGVhZF8x");
|
||||
request = new HttpEntity<String>(headers);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCleanupCompletedTasksJob()
|
||||
throws InterruptedException {
|
||||
|
||||
RestTemplate template = getRestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.add("Authorization", "Basic YWRtaW46YWRtaW4="); // Role Admin
|
||||
HttpEntity<String> request = new HttpEntity<String>(headers);
|
||||
ResponseEntity<PagedResources<TaskSummaryResource>> response = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/tasks", HttpMethod.GET, request,
|
||||
new ParameterizedTypeReference<PagedResources<TaskSummaryResource>>() {
|
||||
});
|
||||
assertNotNull(response.getBody().getLink(Link.REL_SELF));
|
||||
assertEquals(71, response.getBody().getContent().size());
|
||||
|
||||
// wait until the next trigger time + 2 seconds to give JobScheduler a chance to run
|
||||
String cron = env.getProperty("taskana.jobscheduler.async.cron");
|
||||
CronTrigger trigger = new CronTrigger(cron);
|
||||
TriggerContext context = new TriggerContext() {
|
||||
|
||||
@Override
|
||||
public Date lastScheduledExecutionTime() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Date lastActualExecutionTime() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Date lastCompletionTime() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Date now = new Date();
|
||||
long delay = trigger.nextExecutionTime(context).getTime() - now.getTime() + 2000;
|
||||
|
||||
LOGGER.info("About to sleep for " + delay / 1000
|
||||
+ " seconds to give JobScheduler a chance to process the task cleanup");
|
||||
Thread.sleep(delay);
|
||||
LOGGER.info("Sleeping ended. Continuing .... ");
|
||||
|
||||
// verify the 25 completed tasks have been deleted.
|
||||
template = getRestTemplate();
|
||||
headers = new HttpHeaders();
|
||||
headers.add("Authorization", "Basic YWRtaW46YWRtaW4="); // Role Admin
|
||||
request = new HttpEntity<String>(headers);
|
||||
response = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/tasks", HttpMethod.GET, request,
|
||||
new ParameterizedTypeReference<PagedResources<TaskSummaryResource>>() {
|
||||
});
|
||||
assertNotNull(response.getBody().getLink(Link.REL_SELF));
|
||||
// TODO
|
||||
assertEquals(66, response.getBody().getContent().size());
|
||||
}
|
||||
|
||||
private void verifyTaskIsModifiedAfter(String taskId, Instant before) throws InvalidArgumentException {
|
||||
RestTemplate admTemplate = getRestTemplate();
|
||||
HttpHeaders admHeaders = new HttpHeaders();
|
||||
admHeaders.add("Authorization", "Basic YWRtaW46YWRtaW4="); // admin:admin
|
||||
|
||||
HttpEntity<String> admRequest = new HttpEntity<String>(admHeaders);
|
||||
|
||||
ResponseEntity<TaskResource> taskResponse = admTemplate.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/tasks/" + taskId,
|
||||
HttpMethod.GET,
|
||||
admRequest,
|
||||
new ParameterizedTypeReference<TaskResource>() {
|
||||
|
||||
});
|
||||
|
||||
TaskResource taskResource = taskResponse.getBody();
|
||||
Task task = taskResourceAssembler.toModel(taskResource);
|
||||
|
||||
assertTrue(!before.isAfter(task.getModified()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a REST template which is capable of dealing with responses in HAL format
|
||||
*
|
||||
* @return RestTemplate
|
||||
*/
|
||||
private RestTemplate getRestTemplate() {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
mapper.registerModule(new Jackson2HalModule());
|
||||
|
||||
MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
|
||||
converter.setSupportedMediaTypes(MediaType.parseMediaTypes("application/hal+json"));
|
||||
converter.setObjectMapper(mapper);
|
||||
|
||||
RestTemplate template = new RestTemplate(Collections.<HttpMessageConverter<?>> singletonList(converter));
|
||||
return template;
|
||||
}
|
||||
|
||||
public void resetDb() {
|
||||
SampleDataGenerator sampleDataGenerator;
|
||||
try {
|
||||
sampleDataGenerator = new SampleDataGenerator(dataSource);
|
||||
sampleDataGenerator.generateSampleData();
|
||||
} catch (SQLException e) {
|
||||
throw new SystemException("tried to reset DB and caught Exception " + e, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -10,13 +10,8 @@ import java.io.OutputStreamWriter;
|
|||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import org.json.JSONObject;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -39,8 +34,6 @@ import org.springframework.http.MediaType;
|
|||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.converter.HttpMessageConverter;
|
||||
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
|
||||
import org.springframework.scheduling.TriggerContext;
|
||||
import org.springframework.scheduling.support.CronTrigger;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.web.client.HttpClientErrorException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
@ -48,10 +41,8 @@ import org.springframework.web.client.RestTemplate;
|
|||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import pro.taskana.Classification;
|
||||
import pro.taskana.Task;
|
||||
import pro.taskana.exceptions.InvalidArgumentException;
|
||||
import pro.taskana.rest.resource.ClassificationResource;
|
||||
import pro.taskana.rest.resource.ClassificationSummaryResource;
|
||||
import pro.taskana.rest.resource.TaskResource;
|
||||
import pro.taskana.rest.resource.assembler.ClassificationResourceAssembler;
|
||||
|
@ -328,112 +319,6 @@ public class ClassificationControllerIntTest {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateClassificationPrioServiceLevel()
|
||||
throws IOException, InterruptedException, InvalidArgumentException {
|
||||
|
||||
// 1st step: get old classification :
|
||||
Instant before = Instant.now();
|
||||
|
||||
ResponseEntity<ClassificationResource> response = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/classifications/CLI:100000000000000000000000000000000003",
|
||||
HttpMethod.GET,
|
||||
request,
|
||||
new ParameterizedTypeReference<ClassificationResource>() {
|
||||
|
||||
});
|
||||
|
||||
assertNotNull(response.getBody().getLink(Link.REL_SELF));
|
||||
ClassificationResource classification = response.getBody();
|
||||
|
||||
// 2nd step: modify classification and trigger update
|
||||
classification.removeLinks();
|
||||
classification.setServiceLevel("P5D");
|
||||
classification.setPriority(1000);
|
||||
|
||||
String updatedClassification = new JSONObject(classification).toString();
|
||||
|
||||
URL url = new URL(server + port + "/v1/classifications/CLI:100000000000000000000000000000000003");
|
||||
HttpURLConnection con = (HttpURLConnection) url.openConnection();
|
||||
con.setRequestMethod("PUT");
|
||||
con.setRequestProperty("Authorization", "Basic dGVhbWxlYWRfMTp0ZWFtbGVhZF8x");
|
||||
con.setDoOutput(true);
|
||||
con.setRequestProperty("Content-Type", "application/json");
|
||||
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(con.getOutputStream()));
|
||||
out.write(updatedClassification);
|
||||
out.flush();
|
||||
out.close();
|
||||
assertEquals(200, con.getResponseCode());
|
||||
con.disconnect();
|
||||
|
||||
// wait until the next trigger time + 2 seconds to give JobScheduler a chance to run
|
||||
String cron = env.getProperty("taskana.jobscheduler.cron");
|
||||
CronTrigger trigger = new CronTrigger(cron);
|
||||
TriggerContext context = new TriggerContext() {
|
||||
|
||||
@Override
|
||||
public Date lastScheduledExecutionTime() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Date lastActualExecutionTime() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Date lastCompletionTime() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Date now = new Date();
|
||||
long delay = trigger.nextExecutionTime(context).getTime() - now.getTime() + 2000;
|
||||
|
||||
LOGGER.info("About to sleep for " + delay / 1000
|
||||
+ " seconds to give JobScheduler a chance to process the classification change");
|
||||
Thread.sleep(delay);
|
||||
LOGGER.info("Sleeping ended. Continuing .... ");
|
||||
|
||||
// verify the classification modified timestamp is after 'before'
|
||||
ResponseEntity<ClassificationResource> repeatedResponse = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/classifications/CLI:100000000000000000000000000000000003",
|
||||
HttpMethod.GET,
|
||||
request,
|
||||
new ParameterizedTypeReference<ClassificationResource>() {
|
||||
|
||||
});
|
||||
|
||||
ClassificationResource modifiedClassificationResource = repeatedResponse.getBody();
|
||||
Classification modifiedClassification = classificationResourceAssembler.toModel(modifiedClassificationResource);
|
||||
|
||||
assertTrue(!before.isAfter(modifiedClassification.getModified()));
|
||||
|
||||
List<String> affectedTasks = new ArrayList<>(
|
||||
Arrays.asList("TKI:000000000000000000000000000000000003", "TKI:000000000000000000000000000000000004",
|
||||
"TKI:000000000000000000000000000000000005", "TKI:000000000000000000000000000000000006",
|
||||
"TKI:000000000000000000000000000000000007", "TKI:000000000000000000000000000000000008",
|
||||
"TKI:000000000000000000000000000000000009", "TKI:000000000000000000000000000000000010",
|
||||
"TKI:000000000000000000000000000000000011", "TKI:000000000000000000000000000000000012",
|
||||
"TKI:000000000000000000000000000000000013", "TKI:000000000000000000000000000000000014",
|
||||
"TKI:000000000000000000000000000000000015", "TKI:000000000000000000000000000000000016",
|
||||
"TKI:000000000000000000000000000000000017", "TKI:000000000000000000000000000000000018",
|
||||
"TKI:000000000000000000000000000000000019", "TKI:000000000000000000000000000000000020",
|
||||
"TKI:000000000000000000000000000000000021", "TKI:000000000000000000000000000000000022",
|
||||
"TKI:000000000000000000000000000000000023", "TKI:000000000000000000000000000000000024",
|
||||
"TKI:000000000000000000000000000000000025", "TKI:000000000000000000000000000000000026",
|
||||
"TKI:000000000000000000000000000000000027", "TKI:000000000000000000000000000000000028",
|
||||
"TKI:000000000000000000000000000000000029", "TKI:000000000000000000000000000000000030",
|
||||
"TKI:000000000000000000000000000000000031", "TKI:000000000000000000000000000000000032",
|
||||
"TKI:000000000000000000000000000000000033", "TKI:000000000000000000000000000000000034",
|
||||
"TKI:000000000000000000000000000000000035", "TKI:000000000000000000000000000000000100",
|
||||
"TKI:000000000000000000000000000000000101", "TKI:000000000000000000000000000000000102",
|
||||
"TKI:000000000000000000000000000000000103"));
|
||||
for (String taskId : affectedTasks) {
|
||||
verifyTaskIsModifiedAfter(taskId, before);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void verifyTaskIsModifiedAfter(String taskId, Instant before) throws InvalidArgumentException {
|
||||
RestTemplate admTemplate = getRestTemplate();
|
||||
HttpHeaders admHeaders = new HttpHeaders();
|
||||
|
@ -469,7 +354,7 @@ public class ClassificationControllerIntTest {
|
|||
converter.setSupportedMediaTypes(MediaType.parseMediaTypes("application/hal+json"));
|
||||
converter.setObjectMapper(mapper);
|
||||
|
||||
RestTemplate template = new RestTemplate(Collections.<HttpMessageConverter<?>>singletonList(converter));
|
||||
RestTemplate template = new RestTemplate(Collections.<HttpMessageConverter<?>> singletonList(converter));
|
||||
return template;
|
||||
}
|
||||
|
||||
|
|
|
@ -86,15 +86,15 @@ public class TaskControllerIntTest {
|
|||
public void testGetAllTasksByWorkbasketId() {
|
||||
RestTemplate template = getRestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.add("Authorization", "Basic dXNlcl8xXzI6dXNlcl8xXzI="); // user_1_2
|
||||
headers.add("Authorization", "Basic dGVhbWxlYWRfMTp0ZWFtbGVhZF8x"); // teamlead_1
|
||||
HttpEntity<String> request = new HttpEntity<String>(headers);
|
||||
ResponseEntity<PagedResources<TaskSummaryResource>> response = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/tasks?workbasket-id=WBI:100000000000000000000000000000000007",
|
||||
"http://127.0.0.1:" + port + "/v1/tasks?workbasket-id=WBI:100000000000000000000000000000000001",
|
||||
HttpMethod.GET, request,
|
||||
new ParameterizedTypeReference<PagedResources<TaskSummaryResource>>() {
|
||||
});
|
||||
assertNotNull(response.getBody().getLink(Link.REL_SELF));
|
||||
assertEquals(20, response.getBody().getContent().size());
|
||||
assertEquals(22, response.getBody().getContent().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -188,19 +188,21 @@ public class TaskControllerIntTest {
|
|||
headers.add("Authorization", "Basic YWRtaW46YWRtaW4="); // Role Admin
|
||||
HttpEntity<String> request = new HttpEntity<String>(headers);
|
||||
ResponseEntity<PagedResources<TaskSummaryResource>> response = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/tasks?sortBy=por.value&order=desc&page=15&page-size=5", HttpMethod.GET,
|
||||
"http://127.0.0.1:" + port
|
||||
+ "/v1/tasks?state=READY,CLAIMED&sortBy=por.value&order=desc&page=15&page-size=5",
|
||||
HttpMethod.GET,
|
||||
request,
|
||||
new ParameterizedTypeReference<PagedResources<TaskSummaryResource>>() {
|
||||
});
|
||||
assertEquals(3, response.getBody().getContent().size());
|
||||
assertTrue(response.getBody().getLink(Link.REL_LAST).getHref().contains("page=15"));
|
||||
assertEquals("TKI:000000000000000000000000000000000039",
|
||||
assertEquals(1, response.getBody().getContent().size());
|
||||
assertTrue(response.getBody().getLink(Link.REL_LAST).getHref().contains("page=14"));
|
||||
assertEquals("TKI:100000000000000000000000000000000000",
|
||||
response.getBody().getContent().iterator().next().getTaskId());
|
||||
assertNotNull(response.getBody().getLink(Link.REL_SELF));
|
||||
assertTrue(response.getBody()
|
||||
.getLink(Link.REL_SELF)
|
||||
.getHref()
|
||||
.endsWith("/v1/tasks?sortBy=por.value&order=desc&page=15&page-size=5"));
|
||||
.endsWith("/v1/tasks?state=READY,CLAIMED&sortBy=por.value&order=desc&page=15&page-size=5"));
|
||||
assertNotNull(response.getBody().getLink("allTasks"));
|
||||
assertTrue(response.getBody()
|
||||
.getLink("allTasks")
|
||||
|
@ -226,7 +228,7 @@ public class TaskControllerIntTest {
|
|||
new ParameterizedTypeReference<PagedResources<TaskSummaryResource>>() {
|
||||
});
|
||||
assertEquals(25, response.getBody().getContent().size());
|
||||
|
||||
|
||||
response = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/tasks?sortBy=due&order=desc&page=5&page-size=5", HttpMethod.GET,
|
||||
request,
|
||||
|
|
Loading…
Reference in New Issue