TSK-629: Hamonized all job handling.

This commit is contained in:
Holger Hagen 2018-07-25 11:37:32 +02:00 committed by Martin Rojas Miguel Angel
parent 3e0334ee6f
commit 2981dad384
34 changed files with 754 additions and 771 deletions

View File

@ -0,0 +1,15 @@
package pro.taskana;
import pro.taskana.jobs.ScheduledJob;
/**
* Service to manage the TASKANA jobs.
*/
public interface JobService {
/**
* Create a schedule a new job.
*/
ScheduledJob createJob(ScheduledJob job);
}

View File

@ -38,6 +38,13 @@ public interface TaskanaEngine {
*/
ClassificationService getClassificationService();
/**
* The JobService can be user for all job operations.
*
* @return the JobService
*/
JobService getJobService();
/**
* The Taskana configuration.
*
@ -104,4 +111,5 @@ public interface TaskanaEngine {
AUTOCOMMIT,
EXPLICIT
}
}

View File

@ -6,6 +6,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
@ -46,11 +48,14 @@ public class TaskanaEngineConfiguration {
private static final String TASKANA_ROLES_SEPARATOR = "|";
private static final String TASKANA_JOB_BATCHSIZE = "taskana.jobs.batchSize";
private static final String TASKANA_JOB_RETRIES = "taskana.jobs.maxRetries";
private static final String TASKANA_JOB_TASK_CLEANUP_RUN_EVERY = "taskana.jobs.cleanup.runEvery";
private static final String TASKANA_JOB_TASK_CLEANUP_FIRST_RUN = "taskana.jobs.cleanup.firstRunAt";
private static final String TASKANA_JOB_TASK_CLEANUP_MINIMUM_AGE = "taskana.jobs.cleanup.minimumAge";
private static final String TASKANA_DOMAINS_PROPERTY = "taskana.domains";
private static final String TASKANA_CLASSIFICATION_TYPES_PROPERTY = "taskana.classification.types";
private static final String TASKANA_CLASSIFICATION_CATEGORIES_PROPERTY = "taskana.classification.categories";
protected static final String TASKANA_SCHEMA_VERSION = "0.9.2"; // must match the VERSION value in table
protected static final String TASKANA_SCHEMA_VERSION = "1.0.2"; // must match the VERSION value in table
// TASKANA.TASKANA_SCHEMA_VERSION
// Taskana properties file
@ -73,10 +78,15 @@ public class TaskanaEngineConfiguration {
private boolean germanPublicHolidaysEnabled;
private List<LocalDate> customHolidays;
// Properties for task-update Job execution on classification change
// Properties for generalo job execution
private int jobBatchSize = 100;
private int maxNumberOfJobRetries = 3;
// Properties for the cleanup job
private Instant taskCleanupJobFirstRun = Instant.parse("2018-01-01T00:00:00Z");
private Duration taskCleanupJobRunEvery = Duration.parse("P1D");
private Duration taskCleanupJobMinimumAge = Duration.parse("P14D");
// List of configured domain names
protected List<String> domains = new ArrayList<String>();
@ -140,17 +150,60 @@ public class TaskanaEngineConfiguration {
private void initJobParameters(Properties props) {
String jobBatchSizeProperty = props.getProperty(TASKANA_JOB_BATCHSIZE);
if (jobBatchSizeProperty != null && !jobBatchSizeProperty.isEmpty()) {
jobBatchSize = Integer.parseInt(jobBatchSizeProperty);
try {
jobBatchSize = Integer.parseInt(jobBatchSizeProperty);
} catch (Exception e) {
LOGGER.warn("Could not parse jobBatchSizeProperty ({}). Using default. Exception: {} ",
jobBatchSizeProperty, e.getMessage());
}
}
String maxNumberOfJobRetriesProperty = props.getProperty(TASKANA_JOB_RETRIES);
if (maxNumberOfJobRetriesProperty != null && !maxNumberOfJobRetriesProperty.isEmpty()) {
maxNumberOfJobRetries = Integer.parseInt(maxNumberOfJobRetriesProperty);
try {
maxNumberOfJobRetries = Integer.parseInt(maxNumberOfJobRetriesProperty);
} catch (Exception e) {
LOGGER.warn("Could not parse maxNumberOfJobRetriesProperty ({}). Using default. Exception: {} ",
maxNumberOfJobRetriesProperty, e.getMessage());
}
}
LOGGER.debug(
"Configured number of task updates per transaction: {}, number of retries of failed task updates: {}",
jobBatchSize, maxNumberOfJobRetries);
String taskCleanupJobFirstRunProperty = props.getProperty(TASKANA_JOB_TASK_CLEANUP_FIRST_RUN);
if (taskCleanupJobFirstRunProperty != null && !taskCleanupJobFirstRunProperty.isEmpty()) {
try {
taskCleanupJobFirstRun = Instant.parse(taskCleanupJobFirstRunProperty);
} catch (Exception e) {
LOGGER.warn("Could not parse taskCleanupJobFirstRunProperty ({}). Using default. Exception: {} ",
taskCleanupJobFirstRunProperty, e.getMessage());
}
}
String taskCleanupJobRunEveryProperty = props.getProperty(TASKANA_JOB_TASK_CLEANUP_RUN_EVERY);
if (taskCleanupJobRunEveryProperty != null && !taskCleanupJobRunEveryProperty.isEmpty()) {
try {
taskCleanupJobRunEvery = Duration.parse(taskCleanupJobRunEveryProperty);
} catch (Exception e) {
LOGGER.warn("Could not parse taskCleanupJobRunEveryProperty ({}). Using default. Exception: {} ",
taskCleanupJobRunEveryProperty, e.getMessage());
}
}
String taskCleanupJobMinimumAgeProperty = props.getProperty(TASKANA_JOB_TASK_CLEANUP_MINIMUM_AGE);
if (taskCleanupJobMinimumAgeProperty != null && !taskCleanupJobMinimumAgeProperty.isEmpty()) {
try {
taskCleanupJobMinimumAge = Duration.parse(taskCleanupJobMinimumAgeProperty);
} catch (Exception e) {
LOGGER.warn("Could not parse taskCleanupJobMinimumAgeProperty ({}). Using default. Exception: {} ",
taskCleanupJobMinimumAgeProperty, e.getMessage());
}
}
LOGGER.debug("Configured number of task updates per transaction: {}", jobBatchSize);
LOGGER.debug("Number of retries of failed task updates: {}", maxNumberOfJobRetries);
LOGGER.debug("TaskCleanupJob configuration: first run at {}", taskCleanupJobFirstRun);
LOGGER.debug("TaskCleanupJob configuration: runs every {}", taskCleanupJobRunEvery);
LOGGER.debug("TaskCleanupJob configuration: minimum age of tasks to be cleanup up is {}",
taskCleanupJobMinimumAge);
}
private void initDomains(Properties props) {
@ -372,6 +425,18 @@ public class TaskanaEngineConfiguration {
this.classificationCategories = classificationCategories;
}
public Instant getTaskCleanupJobFirstRun() {
return taskCleanupJobFirstRun;
}
public Duration getTaskCleanupJobRunEvery() {
return taskCleanupJobRunEvery;
}
public Duration getTaskCleanupJobMinimumAge() {
return taskCleanupJobMinimumAge;
}
/**
* Helper method to determine whether all access ids (user Id and group ids) should be used in lower case.
*
@ -380,4 +445,5 @@ public class TaskanaEngineConfiguration {
public static boolean shouldUseLowerCaseForAccessIds() {
return true;
}
}

View File

@ -1,97 +0,0 @@
package pro.taskana.impl;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.BulkOperationResults;
import pro.taskana.impl.util.LoggerUtils;
import pro.taskana.mappings.AttachmentMapper;
import pro.taskana.mappings.ClassificationMapper;
import pro.taskana.mappings.JobMapper;
import pro.taskana.mappings.TaskMapper;
/**
* This class executes a job of type CLASSIFICATIONCHANGEDJOB.
*
* @author bbr
*/
public class ClassificationChangedJobExecutor implements SingleJobExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(ClassificationChangedJobExecutor.class);
private TaskanaEngineImpl taskanaEngine;
private Job job;
private String classificationId;
private boolean priorityChanged;
private boolean serviceLevelChanged;
private TaskMapper taskMapper;
private ClassificationMapper classificationMapper;
private AttachmentMapper attachmentMapper;
@Override
public BulkOperationResults<String, Exception> runSingleJob(Job job, TaskanaEngineImpl taskanaEngine) {
this.job = job;
this.taskanaEngine = taskanaEngine;
this.taskMapper = taskanaEngine.getSqlSession().getMapper(TaskMapper.class);
this.classificationMapper = taskanaEngine.getSqlSession().getMapper(ClassificationMapper.class);
this.attachmentMapper = taskanaEngine.getSqlSession().getMapper(AttachmentMapper.class);
Map<String, String> args = job.getArguments();
classificationId = args.get(CLASSIFICATION_ID);
priorityChanged = Boolean.parseBoolean(args.get(PRIORITY_CHANGED));
serviceLevelChanged = Boolean.parseBoolean(args.get(SERVICE_LEVEL_CHANGED));
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
bulkLog.addAllErrors(findAffectedTasksAndScheduleUpdateJobs());
return bulkLog;
}
private BulkOperationResults<String, Exception> findAffectedTasksAndScheduleUpdateJobs() {
List<TaskSummaryImpl> tasks = taskMapper.findTasksAffectedByClassificationChange(classificationId);
List<String> taskIdsFromAttachments = attachmentMapper
.findTaskIdsAffectedByClassificationChange(classificationId);
List<String> filteredTaskIdsFromAttachments = taskIdsFromAttachments.isEmpty() ? new ArrayList<>()
: taskMapper.filterTaskIdsForNotCompleted(taskIdsFromAttachments);
Set<String> affectedTaskIds = new HashSet<>(filteredTaskIdsFromAttachments);
for (TaskSummaryImpl task : tasks) {
affectedTaskIds.add(task.getTaskId());
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("the following tasks are affected by the update of classification {} : {}", classificationId,
LoggerUtils.setToString(affectedTaskIds));
}
int batchSize = taskanaEngine.getConfiguration().getMaxNumberOfTaskUpdatesPerTransaction();
List<List<String>> affectedTaskBatches = JobRunner.partition(affectedTaskIds, batchSize);
for (List<String> taskIdBatch : affectedTaskBatches) {
Map<String, String> args = new HashMap<>();
if (!taskIdBatch.isEmpty()) {
String taskIds = String.join(",", taskIdBatch);
args.put(ClassificationChangedJobExecutor.TASKIDS, taskIds);
args.put(PRIORITY_CHANGED, new Boolean(priorityChanged).toString());
args.put(SERVICE_LEVEL_CHANGED, new Boolean(serviceLevelChanged).toString());
Job job = new Job();
job.setCreated(Instant.now());
job.setState(Job.State.READY);
job.setRetryCount(0);
job.setType(Job.Type.UPDATETASKSJOB);
job.setExecutor(TaskUpdateJobExecutor.class.getName());
job.setArguments(args);
taskanaEngine.getSqlSession().getMapper(JobMapper.class).insertJob(job);
}
}
return null;
}
}

View File

@ -29,8 +29,9 @@ import pro.taskana.exceptions.DomainNotFoundException;
import pro.taskana.exceptions.InvalidArgumentException;
import pro.taskana.exceptions.NotAuthorizedException;
import pro.taskana.impl.util.IdGenerator;
import pro.taskana.jobs.ClassificationChangedJob;
import pro.taskana.jobs.ScheduledJob;
import pro.taskana.mappings.ClassificationMapper;
import pro.taskana.mappings.JobMapper;
import pro.taskana.mappings.TaskMapper;
/**
@ -217,17 +218,14 @@ public class ClassificationServiceImpl implements ClassificationService {
if (priorityChanged || serviceLevelChanged) {
Map<String, String> args = new HashMap<>();
args.put(TaskUpdateJobExecutor.CLASSIFICATION_ID, classificationImpl.getId());
args.put(TaskUpdateJobExecutor.PRIORITY_CHANGED, String.valueOf(priorityChanged));
args.put(TaskUpdateJobExecutor.SERVICE_LEVEL_CHANGED,
args.put(ClassificationChangedJob.CLASSIFICATION_ID, classificationImpl.getId());
args.put(ClassificationChangedJob.PRIORITY_CHANGED, String.valueOf(priorityChanged));
args.put(ClassificationChangedJob.SERVICE_LEVEL_CHANGED,
String.valueOf(serviceLevelChanged));
Job job = new Job();
job.setCreated(Instant.now());
job.setState(Job.State.READY);
job.setExecutor(ClassificationChangedJobExecutor.class.getName());
ScheduledJob job = new ScheduledJob();
job.setArguments(args);
job.setType(Job.Type.CLASSIFICATIONCHANGEDJOB);
taskanaEngine.getSqlSession().getMapper(JobMapper.class).insertJob(job);
job.setType(ScheduledJob.Type.CLASSIFICATIONCHANGEDJOB);
taskanaEngine.getJobService().createJob(job);
}
LOGGER.debug("Method updateClassification() updated the classification {}.",

View File

@ -1,341 +0,0 @@
package pro.taskana.impl;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.BulkOperationResults;
import pro.taskana.TaskanaEngine;
import pro.taskana.exceptions.SystemException;
import pro.taskana.impl.util.LoggerUtils;
import pro.taskana.mappings.JobMapper;
import pro.taskana.transaction.TaskanaTransactionProvider;
/**
* This is the runner for all jobs scheduled in the Job table.
*
* @author bbr
*/
public class JobRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(JobRunner.class);
private TaskanaEngineImpl taskanaEngine;
private JobMapper jobMapper;
private int maxRetryCount;
private TaskanaTransactionProvider<Object> txProvider;
public JobRunner(TaskanaEngine taskanaEngine) {
this.taskanaEngine = (TaskanaEngineImpl) taskanaEngine;
jobMapper = this.taskanaEngine.getSqlSession().getMapper(JobMapper.class);
maxRetryCount = taskanaEngine.getConfiguration().getMaxNumberOfJobRetries();
txProvider = null;
}
public void registerTransactionProvider(
TaskanaTransactionProvider<Object> txProvider) {
this.txProvider = txProvider;
}
public BulkOperationResults<String, Exception> runJobs() {
LOGGER.info("entry to runJobs()");
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
Job currentlyProcessedJob = null;
try {
List<Job> jobs = findJobsToRun();
while (!jobs.isEmpty()) { // run as long as Jobs are available for processing
for (Job job : jobs) {
currentlyProcessedJob = job;
processAJob(bulkLog, job);
}
jobs = findJobsToRun();
}
return bulkLog;
} catch (Exception e) {
if (currentlyProcessedJob != null) {
bulkLog.addError("JobId:" + currentlyProcessedJob.getJobId(), e);
setJobFailed(currentlyProcessedJob, bulkLog);
return bulkLog;
} else {
LOGGER.error("tried to run jobs and caught exception {} ", e);
bulkLog.addError("unknown", e);
return bulkLog;
}
} finally {
taskanaEngine.returnConnection();
LOGGER.info("exit from runJobs(). Returning result {} ", bulkLog);
}
}
private List<Job> findJobsToRun() {
final List<Job> result = new ArrayList<>();
if (txProvider != null) {
txProvider.executeInTransaction(() -> { // each job in its own transaction
try {
taskanaEngine.openConnection();
doFindJobsToRun(result);
return null;
} finally {
taskanaEngine.returnConnection();
}
});
} else {
doFindJobsToRun(result);
}
return result;
}
private BulkOperationResults<String, Exception> doFindJobsToRun(List<Job> jobs) {
List<Job> found = taskanaEngine.getSqlSession().getMapper(JobMapper.class).findJobsToRun();
jobs.addAll(found);
return null;
}
private void processAJob(BulkOperationResults<String, Exception> bulkLog, Job job) {
BulkOperationResults<String, Exception> log;
try {
if (txProvider != null) {
log = (BulkOperationResults<String, Exception>) txProvider.executeInTransaction(() -> { // each job in
// its own
// transaction
try {
taskanaEngine.openConnection();
return runSingleJob(job);
} finally {
taskanaEngine.returnConnection();
}
});
} else {
log = runSingleJob(job);
}
if (log != null && log.containsErrors()
&& Job.Type.UPDATETASKSJOB.equals(job.getType())) {
handleRetryForFailuresFromBulkOperationResult(bulkLog, job, log);
}
} catch (Exception e) {
// transaction was rolled back -> split job into 2 half sized jobs
LOGGER.warn("Processing of job " + job.getJobId() + " failed. Trying to split it up into two pieces...", e);
if (job.getRetryCount() < maxRetryCount) {
rescheduleBisectedJob(bulkLog, job);
} else {
List<String> objectIds;
if (job.getType().equals(Job.Type.UPDATETASKSJOB)) {
String taskIdsAsString = job.getArguments().get(SingleJobExecutor.TASKIDS);
objectIds = Arrays.asList(taskIdsAsString.split(","));
} else if (job.getType().equals(Job.Type.CLASSIFICATIONCHANGEDJOB)) {
String classificationId = job.getArguments().get(SingleJobExecutor.CLASSIFICATION_ID);
objectIds = Arrays.asList(classificationId);
} else {
throw new SystemException("Unknown Jobtype " + job.getType() + " encountered.");
}
for (String objectId : objectIds) {
bulkLog.addError(objectId, e);
}
setJobFailed(job, bulkLog);
}
}
}
private void setJobFailed(Job job, BulkOperationResults<String, Exception> bulkLog) {
try {
if (txProvider != null) {
txProvider.executeInTransaction(() -> { // each job in its own transaction
try {
taskanaEngine.openConnection();
return doSetJobFailed(job, bulkLog);
} finally {
taskanaEngine.returnConnection();
}
});
} else {
doSetJobFailed(job, bulkLog);
}
} catch (Exception e) {
// transaction was rolled back -> log an Error
LOGGER.error("attempted to set job {} to failed, but caught Exception {}", job, e);
}
}
private BulkOperationResults<String, Exception> doSetJobFailed(Job job,
BulkOperationResults<String, Exception> bulkLog) {
job.setState(Job.State.FAILED);
if (job.getStarted() == null) {
job.setStarted(Instant.now());
}
if (bulkLog.containsErrors()) {
Map<String, Exception> errors = bulkLog.getErrorMap();
job.setErrors(LoggerUtils.mapToString(errors));
}
taskanaEngine.getSqlSession().getMapper(JobMapper.class).update(job);
return null;
}
private void handleRetryForFailuresFromBulkOperationResult(BulkOperationResults<String, Exception> bulkLog, Job job,
BulkOperationResults<String, Exception> errorLogForThisJob) {
if (job.getRetryCount() < maxRetryCount) {
if (errorLogForThisJob.containsErrors()) {
List<String> failedTasks = errorLogForThisJob.getFailedIds();
if (!failedTasks.isEmpty()) { // some tasks failed to be processed
LOGGER.error("Errors occurred when running job {}. Processing will be retried", job);
scheduleRetryJob(job, failedTasks);
}
}
} else {
bulkLog.addAllErrors(errorLogForThisJob);
setJobFailed(job, errorLogForThisJob);
}
}
private void rescheduleBisectedJob(BulkOperationResults<String, Exception> bulkLog, Job job) {
// the transaction that processed the job was rolled back.
try {
if (txProvider != null) {
txProvider.executeInTransaction(() -> { // each job in its own transaction
try {
taskanaEngine.openConnection();
return doRescheduleBisectedJob(job);
} finally {
taskanaEngine.returnConnection();
}
});
} else {
doRescheduleBisectedJob(job);
}
} catch (Exception e) {
// transaction was rolled back -> log an Error
LOGGER.error("attempted to reschedule bisected jobs for {}, but caught Exception {}", job, e);
}
}
private BulkOperationResults<String, Exception> doRescheduleBisectedJob(Job job) {
if (job.getType().equals(Job.Type.UPDATETASKSJOB)) { // split the job in halves
Map<String, String> args = job.getArguments();
String taskIdsString = args.get(SingleJobExecutor.TASKIDS);
List<String> taskIds = Arrays.asList(taskIdsString.split(","));
int size = taskIds.size();
if (size >= 2) {
int halfSize = size % 2 == 0 ? size / 2 : (size / 2 + 1);
List<List<String>> taskIdListsForNewJobs = partition(taskIds, halfSize);
// now schedule new tasks
for (List<String> halfSizedTaskIds : taskIdListsForNewJobs) {
Job newJob = new Job();
newJob.setCreated(Instant.now());
if (halfSize > 1) {
newJob.setRetryCount(0);
} else {
newJob.setRetryCount(job.getRetryCount() + 1);
}
newJob.setState(Job.State.READY);
newJob.setType(job.getType());
args.put(SingleJobExecutor.TASKIDS, String.join(",", halfSizedTaskIds));
newJob.setArguments(args);
newJob.setCreated(Instant.now());
newJob.setExecutor(job.getExecutor());
taskanaEngine.getSqlSession().getMapper(JobMapper.class).insertJob(newJob);
}
LOGGER.debug("doRescheduleBisectedJob deleting job {} ", job);
taskanaEngine.getSqlSession().getMapper(JobMapper.class).delete(job);
}
} else { // take care that the job is re-executed
job.setState(Job.State.READY);
job.setRetryCount(job.getRetryCount() + 1);
taskanaEngine.getSqlSession().getMapper(JobMapper.class).update(job);
}
return null;
}
private void scheduleRetryJob(Job job, List<String> failedTasks) {
if (job.getType().equals(Job.Type.UPDATETASKSJOB)) {
try {
if (txProvider != null) {
txProvider.executeInTransaction(() -> { // each job in its own transaction
try {
taskanaEngine.openConnection();
return doScheduleRetryJob(job, failedTasks);
} finally {
taskanaEngine.returnConnection();
}
});
} else {
doScheduleRetryJob(job, failedTasks);
}
} catch (Exception e) {
// transaction was rolled back -> log an Error
LOGGER.error("attempted to reschedule bisected jobs for {}, but caught Exception {}", job, e);
}
}
}
private BulkOperationResults<String, Exception> doScheduleRetryJob(Job job, List<String> failedTasks) {
LOGGER.debug("entry to doScheduleRetryJob for job {} and failedTasks {}", job,
LoggerUtils.listToString(failedTasks));
Map<String, String> args = job.getArguments();
Job newJob = new Job();
newJob.setCreated(Instant.now());
newJob.setRetryCount(job.getRetryCount() + 1);
newJob.setState(Job.State.READY);
newJob.setType(job.getType());
args.put(SingleJobExecutor.TASKIDS, String.join(",", failedTasks));
newJob.setArguments(args);
newJob.setExecutor(job.getExecutor());
taskanaEngine.getSqlSession().getMapper(JobMapper.class).insertJob(newJob);
LOGGER.debug("doScheduleRetryJob deleting job {} and scheduling {} ", job, newJob);
taskanaEngine.getSqlSession().getMapper(JobMapper.class).delete(job);
return null;
}
private BulkOperationResults<String, Exception> runSingleJob(Job job) {
LOGGER.debug("entry to runSingleJob(job = {})", job);
BulkOperationResults<String, Exception> bulkLog;
if (job.getStarted() == null) {
job.setStarted(Instant.now());
}
job.setState(Job.State.RUNNING);
jobMapper.update(job);
SingleJobExecutor executor;
try {
executor = (SingleJobExecutor) Class.forName(job.getExecutor()).newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
LOGGER.error("When attempting to load class {} caught Exception {} ", job.getExecutor(), e);
throw new SystemException(
"When attempting to load class " + job.getExecutor() + " caught Exception " + e.getMessage(),
e);
}
bulkLog = executor.runSingleJob(job, taskanaEngine);
if (!bulkLog.containsErrors()) {
LOGGER.debug("runSingleJob deletin job {} ", job);
jobMapper.delete(job);
}
LOGGER.debug("exit from runSingleJob");
return bulkLog;
}
static <T> List<List<T>> partition(Collection<T> members, int maxSize) {
List<List<T>> result = new ArrayList<>();
List<T> internal = new ArrayList<>();
for (T member : members) {
internal.add(member);
if (internal.size() == maxSize) {
result.add(internal);
internal = new ArrayList<>();
}
}
if (!internal.isEmpty()) {
result.add(internal);
}
return result;
}
}

View File

@ -0,0 +1,101 @@
package pro.taskana.impl;
import java.time.Instant;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.JobService;
import pro.taskana.TaskanaEngine;
import pro.taskana.jobs.ScheduledJob;
import pro.taskana.mappings.JobMapper;
/**
* Controls all job activities.
*/
public class JobServiceImpl implements JobService {
public static final Integer JOB_DEFAULT_PRIORITY = 50;
public static final long DEFAULT_LOCK_EXPIRATION_PERIOD = 60000;
private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class);
private JobMapper jobMapper;
private TaskanaEngineImpl taskanaEngineImpl;
public JobServiceImpl(TaskanaEngine taskanaEngine, JobMapper jobMapper) {
super();
this.taskanaEngineImpl = (TaskanaEngineImpl) taskanaEngine;
this.jobMapper = jobMapper;
}
@Override
public ScheduledJob createJob(ScheduledJob job) {
LOGGER.debug("Entry to createJob({})", job);
try {
taskanaEngineImpl.openConnection();
job = initializeJobDefault(job);
jobMapper.insertJob(job);
LOGGER.debug("Created job {}", job);
} finally {
taskanaEngineImpl.returnConnection();
}
LOGGER.debug("Exit from createJob");
return job;
}
public ScheduledJob lockJob(ScheduledJob job, String owner) {
LOGGER.debug("entry to lockJob(jobId = {}, owner = {})", job.getJobId(), owner);
try {
taskanaEngineImpl.openConnection();
job.setLockedBy(owner);
job.setLockExpires(Instant.now().plusMillis(DEFAULT_LOCK_EXPIRATION_PERIOD));
job.setRetryCount(job.getRetryCount() - 1);
jobMapper.update(job);
LOGGER.debug("Job {} locked. Remaining retries: {}", job.getJobId(), job.getRetryCount());
} finally {
taskanaEngineImpl.returnConnection();
LOGGER.debug("exit from lockJob()");
}
return job;
}
public List<ScheduledJob> findJobsToRun() {
LOGGER.debug("entry to findJobsToRun");
List<ScheduledJob> availableJobs;
try {
taskanaEngineImpl.openConnection();
availableJobs = jobMapper.findJobsToRun();
LOGGER.debug("Found available jobs: {}", availableJobs);
} finally {
taskanaEngineImpl.returnConnection();
LOGGER.debug("exit from findJobsToRun()");
}
return availableJobs;
}
public void deleteJob(ScheduledJob job) {
LOGGER.debug("entry to deleteJob(jobId = {})", job.getJobId());
try {
taskanaEngineImpl.openConnection();
jobMapper.delete(job);
LOGGER.debug("Deleted job: {}", job);
} finally {
taskanaEngineImpl.returnConnection();
LOGGER.debug("exit from deleteJob()");
}
}
private ScheduledJob initializeJobDefault(ScheduledJob job) {
job.setCreated(Instant.now());
job.setState(ScheduledJob.State.READY);
job.setPriority(JOB_DEFAULT_PRIORITY);
if (job.getDue() == null) {
job.setDue(Instant.now());
}
job.setRetryCount(taskanaEngineImpl.getConfiguration().getMaxNumberOfJobRetries());
LOGGER.debug("Job after initialization: {}", job);
return job;
}
}

View File

@ -1,18 +0,0 @@
package pro.taskana.impl;
import pro.taskana.BulkOperationResults;
/**
* This interface must be implemented by classes that execut a single job.
*
* @author bbr
*/
public interface SingleJobExecutor {
String TASKIDS = "taskIds";
String CLASSIFICATION_ID = "classificationId";
String PRIORITY_CHANGED = "priorityChanged";
String SERVICE_LEVEL_CHANGED = "serviceLevelChanged";
BulkOperationResults<String, Exception> runSingleJob(Job job, TaskanaEngineImpl taskanaEngine);
}

View File

@ -144,7 +144,7 @@ public class TaskServiceImpl implements TaskService {
private Task cancelClaim(String taskId, boolean forceUnclaim)
throws TaskNotFoundException, InvalidStateException, InvalidOwnerException, NotAuthorizedException {
String userId = CurrentUserContext.getUserid();
LOGGER.debug("entry to cancelClaim(taskId = {}), userId = {}, forceUnclaim = {}", taskId, userId,
LOGGER.debug("entry to cancelClaim(taskId = {}), userId = {}, forceUnclaim = {})", taskId, userId,
forceUnclaim);
TaskImpl task = null;
try {
@ -1391,15 +1391,15 @@ public class TaskServiceImpl implements TaskService {
}
}
BulkOperationResults<String, Exception> refreshPriorityAndDueDate(String taskId)
public void refreshPriorityAndDueDate(String taskId)
throws ClassificationNotFoundException {
LOGGER.debug("entry to classificationChanged(taskId = {})", taskId);
LOGGER.debug("entry to refreshPriorityAndDueDate(taskId = {})", taskId);
TaskImpl task = null;
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
try {
taskanaEngine.openConnection();
if (taskId == null || taskId.isEmpty()) {
return bulkLog;
return;
}
task = taskMapper.findById(taskId);
@ -1420,10 +1420,9 @@ public class TaskServiceImpl implements TaskService {
task.setModified(Instant.now());
taskMapper.update(task);
return bulkLog;
} finally {
taskanaEngine.returnConnection();
LOGGER.debug("exit from deleteTask(). ");
LOGGER.debug("exit from refreshPriorityAndDueDate(). ");
}
}

View File

@ -1,58 +0,0 @@
package pro.taskana.impl;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.BulkOperationResults;
import pro.taskana.impl.util.LoggerUtils;
/**
* This class performs task updates if a classification is changed.
*
* @author bbr
*/
public class TaskUpdateJobExecutor implements SingleJobExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskUpdateJobExecutor.class);
private TaskanaEngineImpl taskanaEngine;
private List<String> affectedTaskIds;
public TaskUpdateJobExecutor() {
}
@Override
public BulkOperationResults<String, Exception> runSingleJob(Job job, TaskanaEngineImpl taskanaEngine) {
this.taskanaEngine = taskanaEngine;
Map<String, String> args = job.getArguments();
String taskIdsString = args.get(TASKIDS);
affectedTaskIds = Arrays.asList(taskIdsString.split(","));
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
bulkLog.addAllErrors(handleAffectedTasks(job));
return bulkLog;
}
private BulkOperationResults<String, Exception> handleAffectedTasks(Job job) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("the following tasks will be updated by the current job {}",
LoggerUtils.listToString(affectedTaskIds));
}
TaskServiceImpl taskService = (TaskServiceImpl) taskanaEngine.getTaskService();
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
for (String taskId : affectedTaskIds) {
try {
bulkLog.addAllErrors(taskService.refreshPriorityAndDueDate(taskId));
} catch (Exception e) {
bulkLog.addError(taskId, e);
}
}
return bulkLog;
}
}

View File

@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.ClassificationService;
import pro.taskana.JobService;
import pro.taskana.TaskMonitorService;
import pro.taskana.TaskService;
import pro.taskana.TaskanaEngine;
@ -153,6 +154,12 @@ public class TaskanaEngineImpl implements TaskanaEngine {
session.getMapper(TaskMapper.class));
}
@Override
public JobService getJobService() {
SqlSession session = this.sessionManager;
return new JobServiceImpl(this, session.getMapper(JobMapper.class));
}
@Override
public TaskanaEngineConfiguration getConfiguration() {
return this.taskanaEngineConfiguration;
@ -389,4 +396,5 @@ public class TaskanaEngineImpl implements TaskanaEngine {
public boolean domainExists(String domain) {
return getConfiguration().getDomains().contains(domain);
}
}

View File

@ -1,16 +1,54 @@
package pro.taskana.jobs;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import pro.taskana.TaskanaEngine;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.impl.TaskanaEngineImpl;
/**
* Abstract base for all background jobs of TASKANA.
*/
public abstract class AbstractTaskanaJob implements TaskanaJob {
protected TaskanaEngine taskanaEngine;
protected TaskanaEngineImpl taskanaEngineImpl;
protected ScheduledJob scheduledJob;
public AbstractTaskanaJob(TaskanaEngine taskanaEngine) {
this.taskanaEngine = taskanaEngine;
public AbstractTaskanaJob(TaskanaEngine taskanaEngine, ScheduledJob job) {
this.taskanaEngineImpl = (TaskanaEngineImpl) taskanaEngine;
this.scheduledJob = job;
}
public static TaskanaJob createFromScheduledJob(TaskanaEngine engine, ScheduledJob job) throws TaskanaException {
switch (job.getType()) {
case CLASSIFICATIONCHANGEDJOB:
return new ClassificationChangedJob(engine, job);
case UPDATETASKSJOB:
return new TaskRefreshJob(engine, job);
case TASKCLEANUPJOB:
return new TaskCleanupJob(engine, job);
default:
throw new TaskanaException(
"No matching job found for " + job.getType() + " of ScheduledJob " + job.getJobId() + ".");
}
}
<T> List<List<T>> partition(Collection<T> members, int maxSize) {
List<List<T>> result = new ArrayList<>();
List<T> internal = new ArrayList<>();
for (T member : members) {
internal.add(member);
if (internal.size() == maxSize) {
result.add(internal);
internal = new ArrayList<>();
}
}
if (!internal.isEmpty()) {
result.add(internal);
}
return result;
}
}

View File

@ -0,0 +1,101 @@
package pro.taskana.jobs;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.TaskState;
import pro.taskana.TaskSummary;
import pro.taskana.TaskanaEngine;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.impl.util.LoggerUtils;
/**
* This class executes a job of type CLASSIFICATIONCHANGEDJOB.
*
* @author bbr
*/
public class ClassificationChangedJob extends AbstractTaskanaJob {
private static final Logger LOGGER = LoggerFactory.getLogger(ClassificationChangedJob.class);
public static final String TASK_IDS = "taskIds";
public static final String CLASSIFICATION_ID = "classificationId";
public static final String PRIORITY_CHANGED = "priorityChanged";
public static final String SERVICE_LEVEL_CHANGED = "serviceLevelChanged";
private String classificationId;
private boolean priorityChanged;
private boolean serviceLevelChanged;
public ClassificationChangedJob(TaskanaEngine engine, ScheduledJob job) {
super(engine, job);
Map<String, String> args = job.getArguments();
classificationId = args.get(CLASSIFICATION_ID);
priorityChanged = Boolean.parseBoolean(args.get(PRIORITY_CHANGED));
serviceLevelChanged = Boolean.parseBoolean(args.get(SERVICE_LEVEL_CHANGED));
}
@Override
public void run() throws TaskanaException {
LOGGER.info("Running ClassificationChangedJob for classification ({})", classificationId);
try {
Set<String> affectedTaskIds = findTasksIdsAffectedByClassificationChange();
scheduleTaskRefreshJobs(affectedTaskIds);
LOGGER.info("ClassificationChangedJob ended successfully.");
} catch (Exception e) {
throw new TaskanaException("Error while processing ClassificationChangedJob.", e);
}
}
private Set<String> findTasksIdsAffectedByClassificationChange() {
List<TaskSummary> tasks = taskanaEngineImpl.getTaskService()
.createTaskQuery()
.classificationIdIn(classificationId)
.stateIn(TaskState.READY, TaskState.CLAIMED)
.list();
// TODO - implement once the attachment filter is available
// List<String> taskIdsFromAttachments = attachmentMapper
// .findTaskIdsAffectedByClassificationChange(classificationId);
// List<String> filteredTaskIdsFromAttachments = taskIdsFromAttachments.isEmpty() ? new ArrayList<>()
// : taskMapper.filterTaskIdsForNotCompleted(taskIdsFromAttachments);
// Set<String> affectedTaskIds = new HashSet<>(filteredTaskIdsFromAttachments);
Set<String> affectedTaskIds = new HashSet<>();
for (TaskSummary task : tasks) {
affectedTaskIds.add(task.getTaskId());
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("the following tasks are affected by the update of classification {} : {}", classificationId,
LoggerUtils.setToString(affectedTaskIds));
}
return affectedTaskIds;
}
private void scheduleTaskRefreshJobs(Set<String> affectedTaskIds) {
int batchSize = taskanaEngineImpl.getConfiguration().getMaxNumberOfTaskUpdatesPerTransaction();
List<List<String>> affectedTaskBatches = partition(affectedTaskIds, batchSize);
LOGGER.debug("Creating {} TaskRefreshJobs out of {} affected tasks with a maximum number of {} tasks each. ",
affectedTaskBatches.size(), affectedTaskIds.size(), batchSize);
for (List<String> taskIdBatch : affectedTaskBatches) {
Map<String, String> args = new HashMap<>();
if (!taskIdBatch.isEmpty()) {
String taskIds = String.join(",", affectedTaskIds);
args.put(TASK_IDS, taskIds);
args.put(PRIORITY_CHANGED, new Boolean(priorityChanged).toString());
args.put(SERVICE_LEVEL_CHANGED, new Boolean(serviceLevelChanged).toString());
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.UPDATETASKSJOB);
job.setArguments(args);
taskanaEngineImpl.getJobService().createJob(job);
}
}
}
}

View File

@ -1,10 +1,18 @@
package pro.taskana.jobs;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.TaskanaEngine;
import pro.taskana.exceptions.SystemException;
import pro.taskana.impl.JobServiceImpl;
import pro.taskana.impl.TaskServiceImpl;
import pro.taskana.impl.TaskanaEngineImpl;
import pro.taskana.transaction.TaskanaTransactionProvider;
/**
@ -13,13 +21,15 @@ import pro.taskana.transaction.TaskanaTransactionProvider;
public class JobRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceImpl.class);
private TaskanaEngine taskanaEngine;
private TaskanaEngineImpl taskanaEngine;
private JobServiceImpl jobService;
private TaskanaTransactionProvider<Object> txProvider;
private int maxRetryCount;
private int attempt = 0;
public JobRunner(TaskanaEngine taskanaEngine) {
this.taskanaEngine = taskanaEngine;
this.taskanaEngine = (TaskanaEngineImpl) taskanaEngine;
jobService = (JobServiceImpl) taskanaEngine.getJobService();
maxRetryCount = taskanaEngine.getConfiguration().getMaxNumberOfJobRetries();
}
@ -28,35 +38,103 @@ public class JobRunner {
this.txProvider = txProvider;
}
public void runJob(TaskanaJob job) throws Exception {
public void runJobs() {
LOGGER.info("entry to runJobs()");
try {
List<ScheduledJob> jobsToRun = findAndLockJobsToRun();
for (ScheduledJob scheduledJob : jobsToRun) {
runJobTransactionally(scheduledJob);
}
} catch (Exception e) {
LOGGER.error("Error occured whle running jobs: ", e);
} finally {
LOGGER.info("exit from runJobs().");
}
}
private List<ScheduledJob> findAndLockJobsToRun() {
List<ScheduledJob> availableJobs = jobService.findJobsToRun();
List<ScheduledJob> lockedJobs = new ArrayList<ScheduledJob>();
for (ScheduledJob job : availableJobs) {
lockedJobs.add(lockJobTransactionally(job));
}
return lockedJobs;
}
private ScheduledJob lockJobTransactionally(ScheduledJob job) {
ScheduledJob lockedJob = null;
if (txProvider != null) {
txProvider.executeInTransaction(() -> {
try {
job.run();
return null;
} catch (Exception e) {
LOGGER.warn("Exception caught while processing job transactionally. ", e);
throw new RuntimeException(e);
}
lockedJob = (ScheduledJob) txProvider.executeInTransaction(() -> {
return lockJob(job);
});
} else {
job.run();
lockedJob = lockJob(job);
}
LOGGER.debug("Locked job: {}", lockedJob);
return lockedJob;
}
private ScheduledJob lockJob(ScheduledJob job) {
String hostAddress = "UNKNOWN_ADDRESS";
try {
hostAddress = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
}
job.setLockedBy(
hostAddress + " - " + Thread.currentThread().getName());
String owner = hostAddress + " - " + Thread.currentThread().getName();
ScheduledJob lockedJob = jobService.lockJob(job, owner);
return lockedJob;
}
private void runJobTransactionally(ScheduledJob scheduledJob) {
try {
if (txProvider != null) {
txProvider.executeInTransaction(() -> {
runScheduledJob(scheduledJob);
return null;
});
} else {
runScheduledJob(scheduledJob);
}
jobService.deleteJob(scheduledJob);
} catch (Exception e) {
e.printStackTrace();
// transaction was rolled back -> split job into 2 half sized jobs
LOGGER.warn(
"Processing of job " + scheduledJob.getJobId() + " failed. Trying to split it up into two pieces...",
e);
// rescheduleBisectedJob(bulkLog, job);
// List<String> objectIds;
// if (job.getType().equals(ScheduledJob.Type.UPDATETASKSJOB)) {
// String taskIdsAsString = job.getArguments().get(SingleJobExecutor.TASKIDS);
// objectIds = Arrays.asList(taskIdsAsString.split(","));
// } else if (job.getType().equals(ScheduledJob.Type.CLASSIFICATIONCHANGEDJOB)) {
// String classificationId = job.getArguments().get(SingleJobExecutor.CLASSIFICATION_ID);
// objectIds = Arrays.asList(classificationId);
// } else {
// throw new SystemException("Unknown Jobtype " + job.getType() + " encountered.");
// }
// for (String objectId : objectIds) {
// bulkLog.addError(objectId, e);
// }
// setJobFailed(job, bulkLog);
}
}
public void runJobWithRetries(TaskanaJob job) throws Exception {
private void runScheduledJob(ScheduledJob scheduledJob) {
LOGGER.debug("entry to runScheduledJob(job = {})", scheduledJob);
try {
runJob(job);
TaskanaJob job = AbstractTaskanaJob.createFromScheduledJob(taskanaEngine, scheduledJob);
job.run();
} catch (Exception e) {
LOGGER.warn("Job failed due to an Exception.", e);
if (attempt < maxRetryCount) {
attempt++;
LOGGER.info("Retrying for the {} time.", attempt);
runJobWithRetries(job);
} else {
throw e;
}
LOGGER.error("Error running job: {} ", scheduledJob.getType(), e);
throw new SystemException(
"When attempting to load class " + scheduledJob.getType() + " caught Exception " + e.getMessage(),
e);
}
LOGGER.debug("exit from runScheduledJob");
}
}

View File

@ -1,4 +1,4 @@
package pro.taskana.impl;
package pro.taskana.jobs;
import java.time.Instant;
import java.util.Map;
@ -8,20 +8,20 @@ import java.util.Map;
*
* @author bbr
*/
public class Job {
public class ScheduledJob {
private Integer jobId;
private Integer priority;
private Instant created;
private Instant started;
private Instant completed;
private Instant due;
private State state;
private String lockedBy;
private Instant lockExpires;
private Type type;
private int retryCount;
private String executor;
private String errors;
Map<String, String> arguments;
public Job() {
public ScheduledJob() {
created = Instant.now();
state = State.READY;
retryCount = 0;
@ -35,6 +35,14 @@ public class Job {
this.jobId = jobId;
}
public Integer getPriority() {
return priority;
}
public void setPriority(Integer priority) {
this.priority = priority;
}
public Instant getCreated() {
return created;
}
@ -43,20 +51,12 @@ public class Job {
this.created = created;
}
public Instant getStarted() {
return started;
public Instant getDue() {
return due;
}
public void setStarted(Instant started) {
this.started = started;
}
public Instant getCompleted() {
return completed;
}
public void setCompleted(Instant completed) {
this.completed = completed;
public void setDue(Instant due) {
this.due = due;
}
public State getState() {
@ -67,12 +67,20 @@ public class Job {
this.state = state;
}
public String getExecutor() {
return executor;
public String getLockedBy() {
return lockedBy;
}
public void setExecutor(String executor) {
this.executor = executor;
public void setLockedBy(String lockedBy) {
this.lockedBy = lockedBy;
}
public Instant getLockExpires() {
return lockExpires;
}
public void setLockExpires(Instant lockExpires) {
this.lockExpires = lockExpires;
}
public Map<String, String> getArguments() {
@ -99,40 +107,29 @@ public class Job {
this.retryCount = retryCount;
}
public String getErrors() {
return errors;
}
public void setErrors(String errors) {
this.errors = errors;
if (this.errors != null && this.errors.length() > 4096) {
this.errors = errors.substring(0, 4095);
}
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("Job [jobId=");
builder.append("ScheduledJob [jobId=");
builder.append(jobId);
builder.append(", priority=");
builder.append(priority);
builder.append(", created=");
builder.append(created);
builder.append(", started=");
builder.append(started);
builder.append(", completed=");
builder.append(completed);
builder.append(", due=");
builder.append(due);
builder.append(", state=");
builder.append(state);
builder.append(", lockedBy=");
builder.append(lockedBy);
builder.append(", lockExpires=");
builder.append(lockExpires);
builder.append(", type=");
builder.append(type);
builder.append(", retryCount=");
builder.append(retryCount);
builder.append(", executor=");
builder.append(executor);
builder.append(", arguments=");
builder.append(arguments);
builder.append(", errors=");
builder.append(errors);
builder.append("]");
return builder.toString();
}
@ -144,9 +141,7 @@ public class Job {
*/
public enum State {
READY,
RUNNING,
FAILED,
COMPLETED
FAILED
}
/**
@ -154,6 +149,7 @@ public class Job {
*/
public enum Type {
CLASSIFICATIONCHANGEDJOB,
UPDATETASKSJOB;
UPDATETASKSJOB,
TASKCLEANUPJOB;
}
}

View File

@ -1,5 +1,6 @@
package pro.taskana.jobs;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
@ -22,22 +23,25 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskCleanupJob.class);
// Parameter
private Instant completedBefore;
private Instant firstRun;
private Duration runEvery;
private Duration minimumAge;
// Results
private BulkOperationResults<String, TaskanaException> results;
public TaskCleanupJob(TaskanaEngine taskanaEngine, Instant completedBefore) {
super(taskanaEngine);
this.completedBefore = completedBefore;
public TaskCleanupJob(TaskanaEngine taskanaEngine, ScheduledJob scheduledJob) {
super(taskanaEngine, scheduledJob);
firstRun = taskanaEngine.getConfiguration().getTaskCleanupJobFirstRun();
runEvery = taskanaEngine.getConfiguration().getTaskCleanupJobRunEvery();
minimumAge = taskanaEngine.getConfiguration().getTaskCleanupJobMinimumAge();
}
@Override
public void run() throws TaskanaException {
Instant completedBefore = Instant.now().minus(minimumAge);
LOGGER.info("Running job to delete all tasks completed before ({})", completedBefore.toString());
try {
List<TaskSummary> tasksCompletedBefore = getTasksCompletedBefore(completedBefore);
deleteTasks(tasksCompletedBefore);
scheduleNextCleanupJob();
LOGGER.info("Job ended successfully.");
} catch (InvalidArgumentException e) {
throw new TaskanaException("Error while processing TaskCleanupJob.", e);
@ -45,7 +49,7 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
}
private List<TaskSummary> getTasksCompletedBefore(Instant untilDate) {
return taskanaEngine.getTaskService()
return taskanaEngineImpl.getTaskService()
.createTaskQuery()
.completedWithin(new TimeInterval(null, untilDate))
.list();
@ -55,11 +59,41 @@ public class TaskCleanupJob extends AbstractTaskanaJob {
List<String> tasksIdsToBeDeleted = tasksToBeDeleted.stream()
.map(task -> task.getTaskId())
.collect(Collectors.toList());
results = taskanaEngine.getTaskService().deleteTasks(tasksIdsToBeDeleted);
BulkOperationResults<String, TaskanaException> results = taskanaEngineImpl.getTaskService()
.deleteTasks(tasksIdsToBeDeleted);
LOGGER.info("{} tasks deleted.", tasksIdsToBeDeleted.size() - results.getFailedIds().size());
for (String failedId : results.getFailedIds()) {
LOGGER.warn("Task with id {} could not be deleted. Reason: {}", failedId, results.getErrorForId(failedId));
}
}
public void scheduleNextCleanupJob() {
LOGGER.debug("Entry to scheduleNextCleanupJob.");
ScheduledJob job = new ScheduledJob();
job.setType(ScheduledJob.Type.TASKCLEANUPJOB);
job.setDue(getNextDueForTaskCleanupJob());
taskanaEngineImpl.getJobService().createJob(job);
LOGGER.debug("Exit from scheduleNextCleanupJob.");
}
private Instant getNextDueForTaskCleanupJob() {
Instant nextRunAt = firstRun;
while (nextRunAt.isBefore(Instant.now())) {
nextRunAt = nextRunAt.plus(runEvery);
}
LOGGER.info("Scheduling next run of the TaskCleanupJob for {}", nextRunAt);
return nextRunAt;
}
/**
* Initializes the TaskCleanupJob schedule. <br>
* All scheduled cleanup jobs are cancelled/deleted and a new one is scheduled.
*
* @param taskanaEngine
*/
public static void initializeSchedule(TaskanaEngine taskanaEngine) {
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null);
job.scheduleNextCleanupJob();
}
}

View File

@ -0,0 +1,53 @@
package pro.taskana.jobs;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pro.taskana.TaskanaEngine;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.impl.TaskServiceImpl;
/**
* This class executes a job of type CLASSIFICATIONCHANGEDJOB.
*
* @author bbr
*/
public class TaskRefreshJob extends AbstractTaskanaJob {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskRefreshJob.class);
public static final String ARG_TASK_IDS = "taskIds";
private List<String> affectedTaskIds;
public TaskRefreshJob(TaskanaEngine engine, ScheduledJob job) {
super(engine, job);
Map<String, String> args = job.getArguments();
String taskIdsString = args.get(ARG_TASK_IDS);
affectedTaskIds = Arrays.asList(taskIdsString.split(","));
}
@Override
public void run() throws TaskanaException {
LOGGER.info("Running TaskRefreshJob for {} tasks", affectedTaskIds.size());
try {
TaskServiceImpl taskService = (TaskServiceImpl) taskanaEngineImpl.getTaskService();
for (String taskId : affectedTaskIds) {
try {
taskService.refreshPriorityAndDueDate(taskId);
} catch (Exception e) {
LOGGER.warn("Task {} could not be refreshed because of exception: {}", taskId, e.getMessage());
}
}
LOGGER.info("TaskRefreshJob ended successfully.");
} catch (Exception e) {
throw new TaskanaException("Error while processing TaskRefreshJob.", e);
}
}
}

View File

@ -11,8 +11,8 @@ import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import pro.taskana.impl.Job;
import pro.taskana.impl.persistence.MapTypeHandler;
import pro.taskana.jobs.ScheduledJob;
/**
* This class is the mybatis mapping of the JOB table.
@ -20,50 +20,50 @@ import pro.taskana.impl.persistence.MapTypeHandler;
public interface JobMapper {
@Insert("<script>"
+ "INSERT INTO TASKANA.JOB (JOB_ID, CREATED, STARTED, COMPLETED, STATE, TYPE, RETRY_COUNT, EXECUTOR, ERRORS, ARGUMENTS) "
+ "INSERT INTO TASKANA.SCHEDULED_JOB (JOB_ID, PRIORITY, CREATED, DUE, STATE, LOCKED_BY, LOCK_EXPIRES, TYPE, RETRY_COUNT, ARGUMENTS) "
+ "VALUES ("
+ "<choose>"
+ "<when test=\"_databaseId == 'db2'\">"
+ "TASKANA.JOB_SEQ.NEXTVAL"
+ "TASKANA.SCHEDULED_JOB_SEQ.NEXTVAL"
+ "</when>"
+ "<otherwise>"
+ "nextval('TASKANA.JOB_SEQ')"
+ "nextval('TASKANA.SCHEDULED_JOB_SEQ')"
+ "</otherwise>"
+ "</choose>"
+ ", #{job.created}, #{job.started}, #{job.completed}, #{job.state}, #{job.type}, #{job.retryCount}, #{job.executor}, #{job.errors}, #{job.arguments,javaType=java.util.Map,typeHandler=pro.taskana.impl.persistence.MapTypeHandler} )"
+ ", #{job.priority}, #{job.created}, #{job.due}, #{job.state}, #{job.lockedBy}, #{job.lockExpires}, #{job.type}, #{job.retryCount}, #{job.arguments,javaType=java.util.Map,typeHandler=pro.taskana.impl.persistence.MapTypeHandler} )"
+ "</script>")
void insertJob(@Param("job") Job job);
void insertJob(@Param("job") ScheduledJob job);
@Select("<script> SELECT JOB_ID, CREATED, STARTED, COMPLETED, STATE, TYPE, RETRY_COUNT, EXECUTOR, ERRORS, ARGUMENTS "
+ "FROM TASKANA.JOB "
+ "WHERE STATE IN ( 'READY') "
+ "ORDER BY JOB_ID "
@Select("<script> SELECT JOB_ID, PRIORITY, CREATED, DUE, STATE, LOCKED_BY, LOCK_EXPIRES, TYPE, RETRY_COUNT, ARGUMENTS "
+ "FROM TASKANA.SCHEDULED_JOB "
+ "WHERE STATE IN ( 'READY') AND (DUE is null OR DUE &lt; CURRENT_TIMESTAMP) AND (LOCK_EXPIRES is null OR LOCK_EXPIRES &lt; CURRENT_TIMESTAMP) AND RETRY_COUNT > 0 "
+ "ORDER BY PRIORITY DESC "
+ "<if test=\"_databaseId == 'db2'\">with UR </if> "
+ "</script>")
@Results(value = {
@Result(property = "jobId", column = "JOB_ID"),
@Result(property = "priority", column = "PRIORITY"),
@Result(property = "created", column = "CREATED"),
@Result(property = "started", column = "STARTED"),
@Result(property = "completed", column = "COMPLETED"),
@Result(property = "due", column = "DUE"),
@Result(property = "state", column = "STATE"),
@Result(property = "lockedBy", column = "LOCKED_BY"),
@Result(property = "lockExpires", column = "LOCK_EXPIRES"),
@Result(property = "type", column = "TYPE"),
@Result(property = "retryCount", column = "RETRY_COUNT"),
@Result(property = "executor", column = "EXECUTOR"),
@Result(property = "errors", column = "ERRORS"),
@Result(property = "arguments", column = "ARGUMENTS",
javaType = Map.class, typeHandler = MapTypeHandler.class)
})
List<Job> findJobsToRun();
List<ScheduledJob> findJobsToRun();
@Update(
value = "UPDATE TASKANA.JOB SET CREATED = #{created}, STARTED = #{started}, COMPLETED = #{completed}, STATE = #{state}, "
+ "TYPE = #{type}, RETRY_COUNT = #{retryCount}, EXECUTOR = #{executor}, "
+ "ERRORS = #{errors}, "
value = "UPDATE TASKANA.SCHEDULED_JOB SET CREATED = #{created}, PRIORITY = #{priority}, DUE = #{due}, STATE = #{state}, "
+ "LOCKED_BY = #{lockedBy}, LOCK_EXPIRES = #{lockExpires}, TYPE = #{type}, RETRY_COUNT = #{retryCount}, "
+ "ARGUMENTS = #{arguments,jdbcType=CLOB ,javaType=java.util.Map,typeHandler=pro.taskana.impl.persistence.MapTypeHandler} "
+ "where JOB_ID = #{jobId}")
void update(Job job);
void update(ScheduledJob job);
@Delete(
value = "DELETE FROM TASKANA.JOB WHERE JOB_ID = #{jobId}")
void delete(Job job);
value = "DELETE FROM TASKANA.SCHEDULED_JOB WHERE JOB_ID = #{jobId}")
void delete(ScheduledJob job);
}

View File

@ -6,7 +6,7 @@ CREATE TABLE TASKANA.TASKANA_SCHEMA_VERSION(
PRIMARY KEY (ID)
);
-- The VERSION value must match the value of TaskanaEngineConfiguration.TASKANA_SCHEMA_VERSION
INSERT INTO TASKANA.TASKANA_SCHEMA_VERSION VALUES ('1', '0.9.2');
INSERT INTO TASKANA.TASKANA_SCHEMA_VERSION VALUES ('1', '1.0.2');
CREATE TABLE TASKANA.CLASSIFICATION(
ID CHAR(40) NOT NULL,
@ -171,21 +171,21 @@ CREATE TABLE TASKANA.ATTACHMENT(
CONSTRAINT ATT_CLASS FOREIGN KEY (CLASSIFICATION_ID) REFERENCES TASKANA.CLASSIFICATION ON DELETE NO ACTION
);
CREATE TABLE TASKANA.JOB(
CREATE TABLE TASKANA.SCHEDULED_JOB(
JOB_ID INTEGER NOT NULL,
PRIORITY INTEGER NULL,
CREATED TIMESTAMP NULL,
STARTED TIMESTAMP NULL,
COMPLETED TIMESTAMP NULL,
DUE TIMESTAMP NULL,
STATE VARCHAR(32) NULL,
LOCKED_BY VARCHAR(32) NULL,
LOCK_EXPIRES TIMESTAMP NULL,
TYPE VARCHAR(32) NULL,
RETRY_COUNT INTEGER NOT NULL,
EXECUTOR VARCHAR(128) NOT NULL,
ERRORS VARCHAR(4096) NULL,
ARGUMENTS TEXT NULL,
PRIMARY KEY (JOB_ID)
);
CREATE SEQUENCE TASKANA.JOB_SEQ
CREATE SEQUENCE TASKANA.SCHEDULED_JOB_SEQ
MINVALUE 1
START WITH 1
INCREMENT BY 1

View File

@ -4,7 +4,7 @@ CREATE TABLE TASKANA.TASKANA_SCHEMA_VERSION(
PRIMARY KEY (ID)
);
-- The VERSION value must match the value of TaskanaEngineConfiguration.TASKANA_SCHEMA_VERSION
INSERT INTO TASKANA.TASKANA_SCHEMA_VERSION VALUES ('1', '0.9.2');
INSERT INTO TASKANA.TASKANA_SCHEMA_VERSION VALUES ('1', '1.0.2');
CREATE TABLE TASKANA.CLASSIFICATION(
ID CHAR(40) NOT NULL,
@ -169,21 +169,21 @@ CREATE TABLE TASKANA.ATTACHMENT(
CONSTRAINT ATT_CLASS FOREIGN KEY (CLASSIFICATION_ID) REFERENCES TASKANA.CLASSIFICATION ON DELETE NO ACTION
);
CREATE TABLE TASKANA.JOB(
CREATE TABLE TASKANA.SCHEDULED_JOB(
JOB_ID INTEGER NOT NULL,
PRIORITY INTEGER NULL,
CREATED TIMESTAMP NULL,
STARTED TIMESTAMP NULL,
COMPLETED TIMESTAMP NULL,
DUE TIMESTAMP NULL,
STATE VARCHAR(32) NULL,
LOCKED_BY VARCHAR(32) NULL,
LOCK_EXPIRES TIMESTAMP NULL,
TYPE VARCHAR(32) NULL,
RETRY_COUNT INTEGER NOT NULL,
EXECUTOR VARCHAR(128) NOT NULL,
ERRORS VARCHAR(4096) NULL,
ARGUMENTS CLOB NULL,
PRIMARY KEY (JOB_ID)
);
CREATE SEQUENCE TASKANA.JOB_SEQ
CREATE SEQUENCE TASKANA.SCHEDULED_JOB_SEQ
MINVALUE 1
START WITH 1
INCREMENT BY 1

View File

@ -28,9 +28,9 @@ import pro.taskana.exceptions.InvalidArgumentException;
import pro.taskana.exceptions.NotAuthorizedException;
import pro.taskana.exceptions.TaskNotFoundException;
import pro.taskana.impl.DaysToWorkingDaysConverter;
import pro.taskana.impl.JobRunner;
import pro.taskana.impl.TaskImpl;
import pro.taskana.impl.report.impl.TimeIntervalColumnHeader;
import pro.taskana.jobs.JobRunner;
import pro.taskana.security.JAASRunner;
import pro.taskana.security.WithAccessId;
@ -216,6 +216,8 @@ public class UpdateClassificationAccTest extends AbstractAccTest {
classificationService.updateClassification(classification);
Thread.sleep(100);
JobRunner runner = new JobRunner(taskanaEngine);
// need to run jobs twice, since the first job creates a second one.
runner.runJobs();
runner.runJobs();
// Get and check the new value
Classification updatedClassification = classificationService
@ -224,30 +226,33 @@ public class UpdateClassificationAccTest extends AbstractAccTest {
assertTrue(!modifiedBefore.isAfter(updatedClassification.getModified()));
List<String> affectedTasks = new ArrayList<>(
Arrays.asList("TKI:000000000000000000000000000000000000", "TKI:000000000000000000000000000000000003",
"TKI:000000000000000000000000000000000004", "TKI:000000000000000000000000000000000005",
"TKI:000000000000000000000000000000000006", "TKI:000000000000000000000000000000000007",
"TKI:000000000000000000000000000000000008", "TKI:000000000000000000000000000000000009",
"TKI:000000000000000000000000000000000010", "TKI:000000000000000000000000000000000011",
"TKI:000000000000000000000000000000000012", "TKI:000000000000000000000000000000000013",
"TKI:000000000000000000000000000000000014", "TKI:000000000000000000000000000000000015",
"TKI:000000000000000000000000000000000016", "TKI:000000000000000000000000000000000017",
"TKI:000000000000000000000000000000000018", "TKI:000000000000000000000000000000000019",
"TKI:000000000000000000000000000000000020", "TKI:000000000000000000000000000000000021",
"TKI:000000000000000000000000000000000022", "TKI:000000000000000000000000000000000023",
"TKI:000000000000000000000000000000000024", "TKI:000000000000000000000000000000000025",
"TKI:000000000000000000000000000000000026", "TKI:000000000000000000000000000000000027",
"TKI:000000000000000000000000000000000028", "TKI:000000000000000000000000000000000029",
"TKI:000000000000000000000000000000000030", "TKI:000000000000000000000000000000000031",
"TKI:000000000000000000000000000000000032", "TKI:000000000000000000000000000000000033",
"TKI:000000000000000000000000000000000034", "TKI:000000000000000000000000000000000035",
"TKI:000000000000000000000000000000000053", "TKI:000000000000000000000000000000000054",
"TKI:000000000000000000000000000000000055", "TKI:000000000000000000000000000000000100",
"TKI:000000000000000000000000000000000101", "TKI:000000000000000000000000000000000102",
"TKI:000000000000000000000000000000000103"));
List<String> indirectlyAffectedTasks = new ArrayList<>(Arrays.asList(
"TKI:000000000000000000000000000000000000", "TKI:000000000000000000000000000000000053",
"TKI:000000000000000000000000000000000054", "TKI:000000000000000000000000000000000055"));
Arrays.asList("TKI:000000000000000000000000000000000003", "TKI:000000000000000000000000000000000004"));
// TODO - resume old behaviour after attachment query is possible.
// List<String> affectedTasks = new ArrayList<>(
// Arrays.asList("TKI:000000000000000000000000000000000000", "TKI:000000000000000000000000000000000003",
// "TKI:000000000000000000000000000000000004", "TKI:000000000000000000000000000000000005",
// "TKI:000000000000000000000000000000000006", "TKI:000000000000000000000000000000000007",
// "TKI:000000000000000000000000000000000008", "TKI:000000000000000000000000000000000009",
// "TKI:000000000000000000000000000000000010", "TKI:000000000000000000000000000000000011",
// "TKI:000000000000000000000000000000000012", "TKI:000000000000000000000000000000000013",
// "TKI:000000000000000000000000000000000014", "TKI:000000000000000000000000000000000015",
// "TKI:000000000000000000000000000000000016", "TKI:000000000000000000000000000000000017",
// "TKI:000000000000000000000000000000000018", "TKI:000000000000000000000000000000000019",
// "TKI:000000000000000000000000000000000020", "TKI:000000000000000000000000000000000021",
// "TKI:000000000000000000000000000000000022", "TKI:000000000000000000000000000000000023",
// "TKI:000000000000000000000000000000000024", "TKI:000000000000000000000000000000000025",
// "TKI:000000000000000000000000000000000026", "TKI:000000000000000000000000000000000027",
// "TKI:000000000000000000000000000000000028", "TKI:000000000000000000000000000000000029",
// "TKI:000000000000000000000000000000000030", "TKI:000000000000000000000000000000000031",
// "TKI:000000000000000000000000000000000032", "TKI:000000000000000000000000000000000033",
// "TKI:000000000000000000000000000000000034", "TKI:000000000000000000000000000000000035",
// "TKI:000000000000000000000000000000000053", "TKI:000000000000000000000000000000000054",
// "TKI:000000000000000000000000000000000055", "TKI:000000000000000000000000000000000100",
// "TKI:000000000000000000000000000000000101", "TKI:000000000000000000000000000000000102",
// "TKI:000000000000000000000000000000000103"));
// List<String> indirectlyAffectedTasks = new ArrayList<>(Arrays.asList(
// "TKI:000000000000000000000000000000000000", "TKI:000000000000000000000000000000000053",
// "TKI:000000000000000000000000000000000054", "TKI:000000000000000000000000000000000055"));
TaskService taskService = taskanaEngine.getTaskService();
@ -256,7 +261,7 @@ public class UpdateClassificationAccTest extends AbstractAccTest {
for (String taskId : affectedTasks) {
Task task = taskService.getTask(taskId);
assertTrue(task.getModified().isAfter(before));
assertTrue("Task " + task.getId() + " has not been refreshed.", task.getModified().isAfter(before));
assertTrue(task.getPriority() == 1000);
// the following excluded tasks are affected via attachments. The task or an attachment still has a service
// level below 15 days
@ -264,9 +269,9 @@ public class UpdateClassificationAccTest extends AbstractAccTest {
if (taskId.equals("TKI:000000000000000000000000000000000008")) {
long calendarDays = converter.convertWorkingDaysToDays(task.getPlanned(), 8);
assertTrue(task.getDue().equals(task.getPlanned().plus(Duration.ofDays(calendarDays))));
} else if (indirectlyAffectedTasks.contains(taskId)) {
long calendarDays = converter.convertWorkingDaysToDays(task.getPlanned(), 1);
assertTrue(task.getDue().equals(task.getPlanned().plus(Duration.ofDays(calendarDays))));
// } else if (indirectlyAffectedTasks.contains(taskId)) {
// long calendarDays = converter.convertWorkingDaysToDays(task.getPlanned(), 1);
// assertTrue(task.getDue().equals(task.getPlanned().plus(Duration.ofDays(calendarDays))));
} else {
long calendarDays = converter.convertWorkingDaysToDays(task.getPlanned(), 15);
assertTrue(task.getDue().equals(task.getPlanned().plus(Duration.ofDays(calendarDays))));

View File

@ -24,7 +24,6 @@ import pro.taskana.exceptions.NotAuthorizedException;
import pro.taskana.exceptions.TaskAlreadyExistException;
import pro.taskana.exceptions.TaskNotFoundException;
import pro.taskana.exceptions.WorkbasketNotFoundException;
import pro.taskana.jobs.JobRunner;
import pro.taskana.jobs.TaskCleanupJob;
import pro.taskana.security.JAASRunner;
import pro.taskana.security.WithAccessId;
@ -49,14 +48,13 @@ public class TaskCleanupJobAccTest extends AbstractAccTest {
long totalTasksCount = taskService.createTaskQuery().count();
assertEquals(72, totalTasksCount);
JobRunner runner = new JobRunner(taskanaEngine);
Instant completedBefore = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
.atZone(ZoneId.systemDefault())
.minusDays(14)
.toInstant();
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, completedBefore);
runner.runJob(job);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null);
job.run();
totalTasksCount = taskService.createTaskQuery().count();
assertEquals(66, totalTasksCount);
@ -70,14 +68,13 @@ public class TaskCleanupJobAccTest extends AbstractAccTest {
Task createdTask = createAndCompleteTask();
JobRunner runner = new JobRunner(taskanaEngine);
Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
.atZone(ZoneId.systemDefault())
.minusDays(14)
.toInstant();
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, completeUntilDate);
runner.runJob(job);
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, null);
job.run();
Task completedCreatedTask = taskService.getTask(createdTask.getId());
assertNotNull(completedCreatedTask);

View File

@ -26,12 +26,14 @@ import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import pro.taskana.Classification;
import pro.taskana.JobService;
import pro.taskana.exceptions.ClassificationAlreadyExistException;
import pro.taskana.exceptions.ClassificationNotFoundException;
import pro.taskana.exceptions.ConcurrencyException;
import pro.taskana.exceptions.DomainNotFoundException;
import pro.taskana.exceptions.InvalidArgumentException;
import pro.taskana.exceptions.NotAuthorizedException;
import pro.taskana.jobs.ScheduledJob;
import pro.taskana.mappings.ClassificationMapper;
import pro.taskana.mappings.JobMapper;
@ -61,6 +63,9 @@ public class ClassificationServiceImplTest {
@Mock
private SqlSession sqlSessionMock;
@Mock
private JobService jobServiceMock;
@Before
public void setup() {
doNothing().when(taskanaEngineImplMock).openConnection();
@ -285,8 +290,7 @@ public class ClassificationServiceImplTest {
ClassificationImpl oldClassification = (ClassificationImpl) createDummyClassification();
oldClassification.setModified(now);
doReturn(oldClassification).when(cutSpy).getClassification(classification.getKey(), classification.getDomain());
doReturn(sqlSessionMock).when(taskanaEngineImplMock).getSqlSession();
doReturn(new JobRunnerMock()).when(sqlSessionMock).getMapper(any());
doReturn(jobServiceMock).when(taskanaEngineImplMock).getJobService();
cutSpy.updateClassification(classification);
@ -444,28 +448,29 @@ public class ClassificationServiceImplTest {
classificationImpl.setParentKey("");
return classificationImpl;
}
/**
* This is the mock of a jobRunner.
*/
private class JobRunnerMock implements JobMapper {
@Override
public void insertJob(Job job) {
public void insertJob(ScheduledJob job) {
}
@Override
public List<Job> findJobsToRun() {
public List<ScheduledJob> findJobsToRun() {
return null;
}
@Override
public void update(Job job) {
public void update(ScheduledJob job) {
}
@Override
public void delete(Job job) {
public void delete(ScheduledJob job) {
}
}

View File

@ -6,5 +6,6 @@ DELETE FROM TASKANA.WORKBASKET;
DELETE FROM TASKANA.DISTRIBUTION_TARGETS;
DELETE FROM TASKANA.CLASSIFICATION;
DELETE FROM TASKANA.OBJECT_REFERENCE;
DELETE FROM TASKANA.JOB;
-- do not clean JOB table
-- DELETE FROM TASKANA.SCHEDULED_JOB;
COMMIT;

View File

@ -6,6 +6,6 @@ DROP TABLE TASKANA.WORKBASKET;
DROP TABLE TASKANA.DISTRIBUTION_TARGETS;
DROP TABLE TASKANA.CLASSIFICATION;
DROP TABLE TASKANA.OBJECT_REFERENCE;
DROP TABLE TASKANA.JOB;
DROP SEQUENCE TASKANA.JOB_SEQ;
DROP TABLE TASKANA.SCHEDULED_JOB;
DROP SEQUENCE TASKANA.SCHEDULED_JOB_SEQ;
COMMIT;

View File

@ -10,3 +10,6 @@ taskana.classification.categories= EXTERNAL , manual, autoMAtic ,Process
taskana.jobs.maxRetries=3
taskana.jobs.batchSize=50
taskana.jobs.cleanup.runEvery=P1D
taskana.jobs.cleanup.firstRunAt=2018-07-25T08:00:00Z
taskana.jobs.cleanup.minimumAge=P14D

View File

@ -5,3 +5,8 @@ taskana.roles.businessadmin=max|Moritz|businessadmin
taskana.domains= Domain_A , DOMAIN_B
taskana.classification.types= TASK , document
taskana.classification.categories= EXTERNAL , manual, autoMAtic ,Process
taskana.jobs.cleanup.schedule=0 0 3 * * *
taskana.jobs.cleanup.runEvery=P1D
taskana.jobs.cleanup.firstRunAt=2018-07-25T08:00:00Z
taskana.jobs.cleanup.minimumAge=P14D

View File

@ -3,16 +3,10 @@ package pro.taskana.jobs;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.security.auth.Subject;
import org.slf4j.Logger;
@ -21,10 +15,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import pro.taskana.BulkOperationResults;
import pro.taskana.TaskanaEngine;
import pro.taskana.TaskanaRole;
import pro.taskana.impl.util.LoggerUtils;
import pro.taskana.security.UserPrincipal;
import pro.taskana.transaction.TaskanaTransactionProvider;
@ -34,9 +26,7 @@ import pro.taskana.transaction.TaskanaTransactionProvider;
@Component
public class JobScheduler {
private final long untilDays = 14;
private static final Logger LOGGER = LoggerFactory.getLogger(JobScheduler.class);
private static AtomicBoolean jobRunning = new AtomicBoolean(false);
@Autowired
private TaskanaEngine taskanaEngine;
@ -44,41 +34,28 @@ public class JobScheduler {
@Autowired
TaskanaTransactionProvider<Object> springTransactionProvider;
@PostConstruct
public void scheduleCleanupJob() {
LOGGER.debug("Entry to scheduleCleanupJob.");
TaskCleanupJob.initializeSchedule(taskanaEngine);
LOGGER.debug("Exit from scheduleCleanupJob.");
}
@Scheduled(cron = "${taskana.jobscheduler.async.cron}")
public void triggerJobs() {
LOGGER.info("AsyncJobs started.");
boolean otherJobActive = jobRunning.getAndSet(true);
if (!otherJobActive) { // only one job should be active at any time
try {
pro.taskana.impl.JobRunner runner = new pro.taskana.impl.JobRunner(taskanaEngine);
runner.registerTransactionProvider(springTransactionProvider);
LOGGER.info("Running Jobs");
BulkOperationResults<String, Exception> result = runner.runJobs();
Map<String, Exception> errors = result.getErrorMap();
LOGGER.info("AsyncJobs completed. Result = {} ", LoggerUtils.mapToString(errors));
} finally {
jobRunning.set(false);
}
} else {
LOGGER.info("AsyncJobs: Don't run Jobs because already another JobRunner is running");
}
}
@Scheduled(cron = "${taskana.jobscheduler.cleanup.cron}")
public void triggerTaskCleanupJob() {
LOGGER.info("CleanupJob started.");
try {
runTaskCleanupJobAsAdmin();
LOGGER.info("CleanupJob completed.");
runAsyncJobsAsAdmin();
LOGGER.info("AsyncJobs completed.");
} catch (PrivilegedActionException e) {
LOGGER.error("CleanupJob failed.", e);
LOGGER.info("AsyncJobs failed.", e);
}
}
/*
* Creates an admin subject and runs the job using the subject.
*/
private void runTaskCleanupJobAsAdmin() throws PrivilegedActionException {
private void runAsyncJobsAsAdmin() throws PrivilegedActionException {
Subject.doAs(getAdminSubject(),
new PrivilegedExceptionAction<Object>() {
@ -88,14 +65,8 @@ public class JobScheduler {
try {
JobRunner runner = new JobRunner(taskanaEngine);
runner.registerTransactionProvider(springTransactionProvider);
Instant completedBefore = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
.atZone(ZoneId.systemDefault())
.minusDays(untilDays)
.toInstant();
TaskCleanupJob job = new TaskCleanupJob(taskanaEngine, completedBefore);
runner.runJobWithRetries(job);
LOGGER.info("Running Jobs");
runner.runJobs();
return "Successful";
} catch (Throwable e) {
throw new Exception(e);

View File

@ -29,7 +29,6 @@ taskana.ldap.minSearchForLength=3
taskana.ldap.maxNumberOfReturnedAccessIds=50
####### JobScheduler cron expression that specifies when the JobSchedler runs
taskana.jobscheduler.async.cron=0 * * * * *
taskana.jobscheduler.cleanup.cron=0 0 0 * * *
####### cache static resources properties
spring.resources.cache.cachecontrol.cache-private=true
####### tomcat is not detecting the x-forward headers from bluemix as a trustworthy proxy

View File

@ -6,5 +6,6 @@ DELETE FROM TASKANA.WORKBASKET;
DELETE FROM TASKANA.DISTRIBUTION_TARGETS;
DELETE FROM TASKANA.CLASSIFICATION;
DELETE FROM TASKANA.OBJECT_REFERENCE;
DELETE FROM TASKANA.JOB;
-- do not clean JOB table
-- DELETE FROM TASKANA.SCHEDULED_JOB;
COMMIT;

View File

@ -6,6 +6,6 @@ DROP TABLE TASKANA.WORKBASKET;
DROP TABLE TASKANA.DISTRIBUTION_TARGETS;
DROP TABLE TASKANA.CLASSIFICATION;
DROP TABLE TASKANA.OBJECT_REFERENCE;
DROP TABLE TASKANA.JOB;
DROP SEQUENCE TASKANA.JOB_SEQ;
DROP TABLE TASKANA.SCHEDULED_JOB;
DROP SEQUENCE TASKANA.SCHEDULED_JOB_SEQ;
COMMIT;

View File

@ -8,3 +8,6 @@ taskana.classification.categories= EXTERNAL , manual, autoMAtic ,Process
taskana.jobs.maxRetries=3
taskana.jobs.batchSize=50
taskana.jobs.cleanup.runEvery=P1D
taskana.jobs.cleanup.firstRunAt=2018-07-25T08:00:00Z
taskana.jobs.cleanup.minimumAge=P14D

View File

@ -13,7 +13,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.json.JSONObject;
@ -37,8 +36,6 @@ import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
@ -121,28 +118,7 @@ public class AsyncUpdateJobIntTest {
assertEquals(200, con.getResponseCode());
con.disconnect();
// wait until the next trigger time + 2 seconds to give JobScheduler a chance to run
String cron = env.getProperty("taskana.jobscheduler.async.cron");
CronTrigger trigger = new CronTrigger(cron);
TriggerContext context = new TriggerContext() {
@Override
public Date lastScheduledExecutionTime() {
return null;
}
@Override
public Date lastActualExecutionTime() {
return null;
}
@Override
public Date lastCompletionTime() {
return null;
}
};
Date now = new Date();
long delay = trigger.nextExecutionTime(context).getTime() - now.getTime() + 2000;
long delay = 16000;
LOGGER.info("About to sleep for " + delay / 1000
+ " seconds to give JobScheduler a chance to process the classification change");
@ -207,7 +183,7 @@ public class AsyncUpdateJobIntTest {
TaskResource taskResource = taskResponse.getBody();
Task task = taskResourceAssembler.toModel(taskResource);
assertTrue(!before.isAfter(task.getModified()));
assertTrue("Task " + task.getId() + " has not been refreshed.", !before.isAfter(task.getModified()));
}
/**

View File

@ -0,0 +1,36 @@
logging.level.pro.taskana=DEBUG
### logging.level.org.springframework=DEBUG
######## Taskana DB #######
datasource.url=jdbc:h2:mem:taskana;IGNORECASE=TRUE;LOCK_MODE=0;INIT=CREATE SCHEMA IF NOT EXISTS TASKANA
datasource.driverClassName=org.h2.Driver
datasource.username=sa
datasource.password=sa
####### property that control rest api security deploy use true for no security.
devMode=false
####### control LDAP usage
taskana.ldap.useLdap=false
####### properties to connect to LDAP
taskana.ldap.serverUrl=ldap://localhost:10389
taskana.ldap.bindDn=uid=admin,ou=system
taskana.ldap.bindPassword=secret
taskana.ldap.baseDn=o=TaskanaTest
####### properties that control search for users and groups
taskana.ldap.userSearchBase=ou=people
taskana.ldap.userSearchFilterName=objectclass
taskana.ldap.userSearchFilterValue=person
taskana.ldap.userFirstnameAttribute=givenName
taskana.ldap.userLastnameAttribute=sn
taskana.ldap.userIdAttribute=uid
taskana.ldap.groupSearchBase=ou=groups
taskana.ldap.groupSearchFilterName=objectclass
taskana.ldap.groupSearchFilterValue=groupOfUniqueNames
taskana.ldap.groupNameAttribute=cn
taskana.ldap.minSearchForLength=3
taskana.ldap.maxNumberOfReturnedAccessIds=50
####### JobScheduler cron expression that specifies when the JobSchedler runs
taskana.jobscheduler.async.cron=0/5 * * * * *
####### cache static resources properties
spring.resources.cache.cachecontrol.cache-private=true
####### tomcat is not detecting the x-forward headers from bluemix as a trustworthy proxy
server.tomcat.internal-proxies=.*
server.use-forward-headers=true