TSK-511 Add transaction slicing to job runner
This commit is contained in:
parent
5a76d17a54
commit
ae9f00ddc4
|
@ -18,7 +18,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class BulkOperationResults<K, V> {
|
||||
|
||||
private Map<K, V> errorMap = new HashMap<K, V>();
|
||||
private Map<K, V> errorMap = new HashMap<>();
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(BulkOperationResults.class);
|
||||
|
||||
/**
|
||||
|
@ -105,9 +105,11 @@ public class BulkOperationResults<K, V> {
|
|||
* the other log
|
||||
*/
|
||||
public void addAllErrors(BulkOperationResults<K, V> log) {
|
||||
List<K> failedIds = log.getFailedIds();
|
||||
for (K id : failedIds) {
|
||||
addError(id, log.getErrorForId(id));
|
||||
if (log != null && log.containsErrors()) {
|
||||
List<K> failedIds = log.getFailedIds();
|
||||
for (K id : failedIds) {
|
||||
addError(id, log.getErrorForId(id));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
package pro.taskana;
|
||||
|
||||
/**
|
||||
* represents a callable Object.
|
||||
*
|
||||
* @param <T>
|
||||
* the type of the returned objects.
|
||||
* @author bbr
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface TaskanaCallable<T> {
|
||||
|
||||
T call();
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package pro.taskana;
|
||||
|
||||
/**
|
||||
* This class provides support for transactions.
|
||||
*
|
||||
* @author bbr
|
||||
* @param <T>
|
||||
* the type of the returned objects.
|
||||
*/
|
||||
public interface TaskanaTransactionProvider<T> {
|
||||
|
||||
T executeInTransaction(TaskanaCallable<T> action);
|
||||
}
|
|
@ -44,6 +44,8 @@ public class TaskanaEngineConfiguration {
|
|||
private static final String H2_DRIVER = "org.h2.Driver";
|
||||
private static final String TASKANA_PROPERTIES = "/taskana.properties";
|
||||
private static final String TASKANA_ROLES_SEPARATOR = "|";
|
||||
private static final String TASKANA_JOB_TASK_UPDATES_PER_TRANSACTION = "taskana.job.max.task.updates.per.transaction";
|
||||
private static final String TASKANA_JOB_RETRIES_FOR_FAILED_TASK_UPDATES = "taskana.job.max.retries.for.failed.task.updates";
|
||||
private static final String TASKANA_DOMAINS_PROPERTY = "taskana.domains";
|
||||
private static final String TASKANA_CLASSIFICATION_TYPES_PROPERTY = "taskana.classification.types";
|
||||
private static final String TASKANA_CLASSIFICATION_CATEGORIES_PROPERTY = "taskana.classification.categories";
|
||||
|
@ -68,6 +70,10 @@ public class TaskanaEngineConfiguration {
|
|||
private boolean germanPublicHolidaysEnabled;
|
||||
private List<LocalDate> customHolidays;
|
||||
|
||||
// Properties for task-update Job execution on classification change
|
||||
private int maxNumberOfTaskUpdatesPerTransaction;
|
||||
private int maxNumberOfRetriesOfFailedTaskUpdates;
|
||||
|
||||
// List of configured domain names
|
||||
protected List<String> domains = new ArrayList<String>();
|
||||
|
||||
|
@ -117,11 +123,32 @@ public class TaskanaEngineConfiguration {
|
|||
LOGGER.debug("Reading taskana configuration from {} with role separator {}", propertiesFile, rolesSeparator);
|
||||
Properties props = readPropertiesFromFile(propertiesFile);
|
||||
initTaskanaRoles(props, rolesSeparator);
|
||||
initJobParameters(props);
|
||||
initDomains(props);
|
||||
initClassificationTypes(props);
|
||||
initClassificationCategories(props);
|
||||
}
|
||||
|
||||
private void initJobParameters(Properties props) {
|
||||
String taskUpdates = props.getProperty(TASKANA_JOB_TASK_UPDATES_PER_TRANSACTION);
|
||||
if (taskUpdates == null || taskUpdates.isEmpty()) {
|
||||
maxNumberOfTaskUpdatesPerTransaction = 50;
|
||||
} else {
|
||||
maxNumberOfTaskUpdatesPerTransaction = Integer.parseInt(taskUpdates);
|
||||
}
|
||||
|
||||
String retries = props.getProperty(TASKANA_JOB_RETRIES_FOR_FAILED_TASK_UPDATES);
|
||||
if (retries == null || retries.isEmpty()) {
|
||||
maxNumberOfRetriesOfFailedTaskUpdates = 3;
|
||||
} else {
|
||||
maxNumberOfRetriesOfFailedTaskUpdates = Integer.parseInt(retries);
|
||||
}
|
||||
|
||||
LOGGER.debug(
|
||||
"Configured number of task updates per transaction: {}, number of retries of failed task updates: {}",
|
||||
maxNumberOfTaskUpdatesPerTransaction, maxNumberOfRetriesOfFailedTaskUpdates);
|
||||
}
|
||||
|
||||
private void initDomains(Properties props) {
|
||||
String domainNames = props.getProperty(TASKANA_DOMAINS_PROPERTY);
|
||||
if (domainNames != null && !domainNames.isEmpty()) {
|
||||
|
@ -173,6 +200,7 @@ public class TaskanaEngineConfiguration {
|
|||
if (key != null) {
|
||||
roleMap.put(key, roleMemberSet);
|
||||
} else {
|
||||
LOGGER.error("Internal System error when processing role property {}.", propertyName);
|
||||
throw new SystemException(
|
||||
"Internal System error when processing role property " + propertyName);
|
||||
}
|
||||
|
@ -202,6 +230,7 @@ public class TaskanaEngineConfiguration {
|
|||
LOGGER.debug("Role properties were loaded from file {}.", propertiesFile);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("caught IOException when processing properties file {}.", propertiesFile);
|
||||
throw new SystemException("internal System error when processing properties file " + propertiesFile,
|
||||
e.getCause());
|
||||
}
|
||||
|
@ -271,6 +300,14 @@ public class TaskanaEngineConfiguration {
|
|||
return this.propertiesFileName;
|
||||
}
|
||||
|
||||
public int getMaxNumberOfTaskUpdatesPerTransaction() {
|
||||
return maxNumberOfTaskUpdatesPerTransaction;
|
||||
}
|
||||
|
||||
public int getMaxNumberOfRetriesOfFailedTaskUpdates() {
|
||||
return maxNumberOfRetriesOfFailedTaskUpdates;
|
||||
}
|
||||
|
||||
public void setPropertiesFileName(String propertiesFileName) {
|
||||
this.propertiesFileName = propertiesFileName;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
package pro.taskana.impl;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.impl.util.LoggerUtils;
|
||||
import pro.taskana.mappings.AttachmentMapper;
|
||||
import pro.taskana.mappings.ClassificationMapper;
|
||||
import pro.taskana.mappings.JobMapper;
|
||||
import pro.taskana.mappings.TaskMapper;
|
||||
|
||||
/**
|
||||
* This class executes a job of type CLASSIFICATIONCHANGEDJOB.
|
||||
*
|
||||
* @author bbr
|
||||
*/
|
||||
|
||||
public class ClassificationChangedJobExecutor implements SingleJobExecutor {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ClassificationChangedJobExecutor.class);
|
||||
|
||||
private TaskanaEngineImpl taskanaEngine;
|
||||
private Job job;
|
||||
private String classificationId;
|
||||
private boolean priorityChanged;
|
||||
private boolean serviceLevelChanged;
|
||||
private TaskMapper taskMapper;
|
||||
private ClassificationMapper classificationMapper;
|
||||
private AttachmentMapper attachmentMapper;
|
||||
|
||||
@Override
|
||||
public BulkOperationResults<String, Exception> runSingleJob(Job job, TaskanaEngineImpl taskanaEngine) {
|
||||
|
||||
this.job = job;
|
||||
this.taskanaEngine = taskanaEngine;
|
||||
this.taskMapper = taskanaEngine.getSqlSession().getMapper(TaskMapper.class);
|
||||
this.classificationMapper = taskanaEngine.getSqlSession().getMapper(ClassificationMapper.class);
|
||||
this.attachmentMapper = taskanaEngine.getSqlSession().getMapper(AttachmentMapper.class);
|
||||
Map<String, String> args = job.getArguments();
|
||||
classificationId = args.get(CLASSIFICATION_ID);
|
||||
priorityChanged = Boolean.parseBoolean(args.get(PRIORITY_CHANGED));
|
||||
serviceLevelChanged = Boolean.parseBoolean(args.get(SERVICE_LEVEL_CHANGED));
|
||||
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
|
||||
bulkLog.addAllErrors(findAffectedTasksAndScheduleUpdateJobs());
|
||||
|
||||
return bulkLog;
|
||||
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> findAffectedTasksAndScheduleUpdateJobs() {
|
||||
List<TaskSummaryImpl> tasks = taskMapper.findTasksAffectedByClassificationChange(classificationId);
|
||||
List<String> taskIdsFromAttachments = attachmentMapper
|
||||
.findTaskIdsAffectedByClassificationChange(classificationId);
|
||||
|
||||
List<String> filteredTaskIdsFromAttachments = taskIdsFromAttachments.isEmpty() ? new ArrayList<>()
|
||||
: taskMapper.filterTaskIdsForNotCompleted(taskIdsFromAttachments);
|
||||
|
||||
Set<String> affectedTaskIds = new HashSet<>(filteredTaskIdsFromAttachments);
|
||||
for (TaskSummaryImpl task : tasks) {
|
||||
affectedTaskIds.add(task.getTaskId());
|
||||
}
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("the following tasks are affected by the update of classification {} : {}", classificationId,
|
||||
LoggerUtils.setToString(affectedTaskIds));
|
||||
}
|
||||
int batchSize = taskanaEngine.getConfiguration().getMaxNumberOfTaskUpdatesPerTransaction();
|
||||
List<List<String>> affectedTaskBatches = JobRunner.partition(affectedTaskIds, batchSize);
|
||||
for (List<String> taskIdBatch : affectedTaskBatches) {
|
||||
Map<String, String> args = new HashMap<>();
|
||||
if (!taskIdBatch.isEmpty()) {
|
||||
String taskIds = String.join(",", taskIdBatch);
|
||||
args.put(ClassificationChangedJobExecutor.TASKIDS, taskIds);
|
||||
args.put(CLASSIFICATION_ID, classificationId);
|
||||
args.put(PRIORITY_CHANGED, new Boolean(priorityChanged).toString());
|
||||
args.put(SERVICE_LEVEL_CHANGED, new Boolean(serviceLevelChanged).toString());
|
||||
Job job = new Job();
|
||||
job.setCreated(Instant.now());
|
||||
job.setState(Job.State.READY);
|
||||
job.setRetryCount(0);
|
||||
job.setType(Job.Type.UPDATETASKSJOB);
|
||||
job.setExecutor(TaskUpdateJobExecutor.class.getName());
|
||||
job.setArguments(args);
|
||||
taskanaEngine.getSqlSession().getMapper(JobMapper.class).insertJob(job);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -8,6 +8,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.ibatis.exceptions.PersistenceException;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -98,7 +99,7 @@ public class ClassificationServiceImpl implements ClassificationService {
|
|||
}
|
||||
|
||||
private void addClassificationToRootDomain(ClassificationImpl classificationImpl) {
|
||||
if (!classificationImpl.getDomain().equals("")) {
|
||||
if (!Objects.equals(classificationImpl.getDomain(), "")) {
|
||||
boolean doesExist = true;
|
||||
String idBackup = classificationImpl.getId();
|
||||
String domainBackup = classificationImpl.getDomain();
|
||||
|
@ -178,26 +179,22 @@ public class ClassificationServiceImpl implements ClassificationService {
|
|||
}
|
||||
classificationMapper.update(classificationImpl);
|
||||
boolean priorityChanged = oldClassification.getPriority() != classification.getPriority();
|
||||
boolean serviceLevelChanged = true;
|
||||
if (oldClassification.getServiceLevel() == null) {
|
||||
if (classification.getServiceLevel() == null) {
|
||||
serviceLevelChanged = false;
|
||||
}
|
||||
} else if (oldClassification.getServiceLevel().equals(classification.getServiceLevel())) {
|
||||
serviceLevelChanged = false;
|
||||
}
|
||||
|
||||
boolean serviceLevelChanged = !Objects.equals(oldClassification.getServiceLevel(),
|
||||
classification.getServiceLevel());
|
||||
|
||||
if (priorityChanged || serviceLevelChanged) {
|
||||
Map<String, String> args = new HashMap<>();
|
||||
args.put(TaskUpdateOnClassificationChangeExecutor.CLASSIFICATION_ID, classificationImpl.getId());
|
||||
args.put(TaskUpdateOnClassificationChangeExecutor.PRIORITY_CHANGED, String.valueOf(priorityChanged));
|
||||
args.put(TaskUpdateOnClassificationChangeExecutor.SERVICE_LEVEL_CHANGED,
|
||||
args.put(TaskUpdateJobExecutor.CLASSIFICATION_ID, classificationImpl.getId());
|
||||
args.put(TaskUpdateJobExecutor.PRIORITY_CHANGED, String.valueOf(priorityChanged));
|
||||
args.put(TaskUpdateJobExecutor.SERVICE_LEVEL_CHANGED,
|
||||
String.valueOf(serviceLevelChanged));
|
||||
Job job = new Job();
|
||||
job.setCreated(Instant.now());
|
||||
job.setState(Job.State.READY);
|
||||
job.setExecutor(TaskUpdateOnClassificationChangeExecutor.class.getName());
|
||||
job.setExecutor(ClassificationChangedJobExecutor.class.getName());
|
||||
job.setArguments(args);
|
||||
job.setType(Job.Type.CLASSIFICATIONCHANGEDJOB);
|
||||
taskanaEngine.getSqlSession().getMapper(JobMapper.class).insertJob(job);
|
||||
}
|
||||
|
||||
|
|
|
@ -15,9 +15,18 @@ public class Job {
|
|||
private Instant started;
|
||||
private Instant completed;
|
||||
private State state;
|
||||
private Type type;
|
||||
private int retryCount;
|
||||
private String executor;
|
||||
private String errors;
|
||||
Map<String, String> arguments;
|
||||
|
||||
public Job() {
|
||||
created = Instant.now();
|
||||
state = State.READY;
|
||||
retryCount = 0;
|
||||
}
|
||||
|
||||
public Integer getJobId() {
|
||||
return jobId;
|
||||
}
|
||||
|
@ -74,6 +83,30 @@ public class Job {
|
|||
this.arguments = arguments;
|
||||
}
|
||||
|
||||
public Type getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(Type type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public int getRetryCount() {
|
||||
return retryCount;
|
||||
}
|
||||
|
||||
public void setRetryCount(int retryCount) {
|
||||
this.retryCount = retryCount;
|
||||
}
|
||||
|
||||
public String getErrors() {
|
||||
return errors;
|
||||
}
|
||||
|
||||
public void setErrors(String errors) {
|
||||
this.errors = errors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
|
@ -87,10 +120,16 @@ public class Job {
|
|||
builder.append(completed);
|
||||
builder.append(", state=");
|
||||
builder.append(state);
|
||||
builder.append(", type=");
|
||||
builder.append(type);
|
||||
builder.append(", retryCount=");
|
||||
builder.append(retryCount);
|
||||
builder.append(", executor=");
|
||||
builder.append(executor);
|
||||
builder.append(", arguments=");
|
||||
builder.append(arguments);
|
||||
builder.append(", errors=");
|
||||
builder.append(errors);
|
||||
builder.append("]");
|
||||
return builder.toString();
|
||||
}
|
||||
|
@ -107,4 +146,11 @@ public class Job {
|
|||
COMPLETED
|
||||
}
|
||||
|
||||
/**
|
||||
* This enum controls the type of a job.
|
||||
*/
|
||||
public enum Type {
|
||||
CLASSIFICATIONCHANGEDJOB,
|
||||
UPDATETASKSJOB;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,20 @@
|
|||
package pro.taskana.impl;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaEngine;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
import pro.taskana.exceptions.SystemException;
|
||||
import pro.taskana.impl.util.LoggerUtils;
|
||||
import pro.taskana.mappings.JobMapper;
|
||||
|
||||
/**
|
||||
|
@ -17,26 +24,48 @@ import pro.taskana.mappings.JobMapper;
|
|||
*/
|
||||
public class JobRunner {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceImpl.class);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(JobRunner.class);
|
||||
private TaskanaEngineImpl taskanaEngine;
|
||||
private JobMapper jobMapper;
|
||||
private int maxRetryCount;
|
||||
private TaskanaTransactionProvider<BulkOperationResults<String, Exception>> txProvider;
|
||||
|
||||
public JobRunner(TaskanaEngine taskanaEngine) {
|
||||
this.taskanaEngine = (TaskanaEngineImpl) taskanaEngine;
|
||||
jobMapper = this.taskanaEngine.getSqlSession().getMapper(JobMapper.class);
|
||||
maxRetryCount = taskanaEngine.getConfiguration().getMaxNumberOfRetriesOfFailedTaskUpdates();
|
||||
txProvider = null;
|
||||
}
|
||||
|
||||
public void registerTransactionProvider(
|
||||
TaskanaTransactionProvider<BulkOperationResults<String, Exception>> txProvider) {
|
||||
this.txProvider = txProvider;
|
||||
}
|
||||
|
||||
public BulkOperationResults<String, Exception> runJobs() {
|
||||
LOGGER.info("entry to runJobs()");
|
||||
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
|
||||
Job currentlyProcessedJob = null;
|
||||
try {
|
||||
taskanaEngine.openConnection();
|
||||
List<Job> jobs = jobMapper.findJobsToRun();
|
||||
for (Job job : jobs) {
|
||||
BulkOperationResults<String, Exception> log = runSingleJob(job);
|
||||
bulkLog.addAllErrors(log);
|
||||
List<Job> jobs = findJobsToRun();
|
||||
while (!jobs.isEmpty()) { // run as long as Jobs are available for processing
|
||||
for (Job job : jobs) {
|
||||
currentlyProcessedJob = job;
|
||||
processAJob(bulkLog, job);
|
||||
}
|
||||
jobs = findJobsToRun();
|
||||
}
|
||||
return bulkLog;
|
||||
} catch (Exception e) {
|
||||
if (currentlyProcessedJob != null) {
|
||||
bulkLog.addError("JobId:" + currentlyProcessedJob.getJobId(), e);
|
||||
setJobFailed(currentlyProcessedJob, bulkLog);
|
||||
return bulkLog;
|
||||
} else {
|
||||
LOGGER.error("tried to run jobs and caught exception {} ", e);
|
||||
bulkLog.addError("unknown", e);
|
||||
return bulkLog;
|
||||
}
|
||||
} finally {
|
||||
taskanaEngine.returnConnection();
|
||||
LOGGER.info("exit from runJobs(). Returning result {} ", bulkLog);
|
||||
|
@ -44,10 +73,228 @@ public class JobRunner {
|
|||
|
||||
}
|
||||
|
||||
private List<Job> findJobsToRun() {
|
||||
final List<Job> result = new ArrayList<>();
|
||||
if (txProvider != null) {
|
||||
txProvider.executeInTransaction(() -> { // each job in its own transaction
|
||||
try {
|
||||
taskanaEngine.openConnection();
|
||||
doFindJobsToRun(result);
|
||||
return null;
|
||||
} finally {
|
||||
taskanaEngine.returnConnection();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
doFindJobsToRun(result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> doFindJobsToRun(List<Job> jobs) {
|
||||
List<Job> found = taskanaEngine.getSqlSession().getMapper(JobMapper.class).findJobsToRun();
|
||||
jobs.addAll(found);
|
||||
return null;
|
||||
}
|
||||
|
||||
private void processAJob(BulkOperationResults<String, Exception> bulkLog, Job job) {
|
||||
BulkOperationResults<String, Exception> log;
|
||||
try {
|
||||
if (txProvider != null) {
|
||||
log = txProvider.executeInTransaction(() -> { // each job in its own transaction
|
||||
try {
|
||||
taskanaEngine.openConnection();
|
||||
return runSingleJob(job);
|
||||
} finally {
|
||||
taskanaEngine.returnConnection();
|
||||
}
|
||||
});
|
||||
|
||||
} else {
|
||||
log = runSingleJob(job);
|
||||
}
|
||||
if (log != null && log.containsErrors()
|
||||
&& Job.Type.UPDATETASKSJOB.equals(job.getType())) {
|
||||
handleRetryForFailuresFromBulkOperationResult(bulkLog, job, log);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// transaction was rolled back -> split job into 2 half sized jobs
|
||||
if (job.getRetryCount() < maxRetryCount) {
|
||||
rescheduleBisectedJob(bulkLog, job);
|
||||
} else {
|
||||
List<String> objectIds;
|
||||
if (job.getType().equals(Job.Type.UPDATETASKSJOB)) {
|
||||
String taskIdsAsString = job.getArguments().get(SingleJobExecutor.TASKIDS);
|
||||
objectIds = Arrays.asList(taskIdsAsString.split(","));
|
||||
} else if (job.getType().equals(Job.Type.CLASSIFICATIONCHANGEDJOB)) {
|
||||
String classificationId = job.getArguments().get(SingleJobExecutor.CLASSIFICATION_ID);
|
||||
objectIds = Arrays.asList(classificationId);
|
||||
} else {
|
||||
throw new SystemException("Unknown Jobtype " + job.getType() + " encountered.");
|
||||
}
|
||||
for (String objectId : objectIds) {
|
||||
bulkLog.addError(objectId, e);
|
||||
}
|
||||
setJobFailed(job, bulkLog);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setJobFailed(Job job, BulkOperationResults<String, Exception> bulkLog) {
|
||||
try {
|
||||
if (txProvider != null) {
|
||||
txProvider.executeInTransaction(() -> { // each job in its own transaction
|
||||
try {
|
||||
taskanaEngine.openConnection();
|
||||
return doSetJobFailed(job, bulkLog);
|
||||
} finally {
|
||||
taskanaEngine.returnConnection();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
doSetJobFailed(job, bulkLog);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// transaction was rolled back -> log an Error
|
||||
LOGGER.error("attempted to set job {} to failed, but caught Exception {}", job, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> doSetJobFailed(Job job,
|
||||
BulkOperationResults<String, Exception> bulkLog) {
|
||||
job.setState(Job.State.FAILED);
|
||||
if (job.getStarted() == null) {
|
||||
job.setStarted(Instant.now());
|
||||
}
|
||||
if (bulkLog.containsErrors()) {
|
||||
Map<String, Exception> errors = bulkLog.getErrorMap();
|
||||
job.setErrors(LoggerUtils.mapToString(errors));
|
||||
}
|
||||
taskanaEngine.getSqlSession().getMapper(JobMapper.class).update(job);
|
||||
return null;
|
||||
}
|
||||
|
||||
private void handleRetryForFailuresFromBulkOperationResult(BulkOperationResults<String, Exception> bulkLog, Job job,
|
||||
BulkOperationResults<String, Exception> errorLogForThisJob) {
|
||||
if (job.getRetryCount() < maxRetryCount) {
|
||||
if (errorLogForThisJob.containsErrors()) {
|
||||
List<String> failedTasks = errorLogForThisJob.getFailedIds();
|
||||
if (!failedTasks.isEmpty()) { // some tasks failed to be processed
|
||||
LOGGER.error("Errors occurred when running job {}. Processing will be retried", job);
|
||||
scheduleRetryJob(job, failedTasks);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
bulkLog.addAllErrors(errorLogForThisJob);
|
||||
setJobFailed(job, errorLogForThisJob);
|
||||
}
|
||||
}
|
||||
|
||||
private void rescheduleBisectedJob(BulkOperationResults<String, Exception> bulkLog, Job job) {
|
||||
// the transaction that processed the job was rolled back.
|
||||
try {
|
||||
if (txProvider != null) {
|
||||
txProvider.executeInTransaction(() -> { // each job in its own transaction
|
||||
try {
|
||||
taskanaEngine.openConnection();
|
||||
return doRescheduleBisectedJob(job);
|
||||
} finally {
|
||||
taskanaEngine.returnConnection();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
doRescheduleBisectedJob(job);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// transaction was rolled back -> log an Error
|
||||
LOGGER.error("attempted to reschedule bisected jobs for {}, but caught Exception {}", job, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> doRescheduleBisectedJob(Job job) {
|
||||
if (job.getType().equals(Job.Type.UPDATETASKSJOB)) { // split the job in halves
|
||||
Map<String, String> args = job.getArguments();
|
||||
String taskIdsString = args.get(SingleJobExecutor.TASKIDS);
|
||||
List<String> taskIds = Arrays.asList(taskIdsString.split(","));
|
||||
int size = taskIds.size();
|
||||
if (size >= 2) {
|
||||
int halfSize = size % 2 == 0 ? size / 2 : (size / 2 + 1);
|
||||
List<List<String>> taskIdListsForNewJobs = partition(taskIds, halfSize);
|
||||
// now schedule new tasks
|
||||
|
||||
for (List<String> halfSizedTaskIds : taskIdListsForNewJobs) {
|
||||
Job newJob = new Job();
|
||||
newJob.setCreated(Instant.now());
|
||||
if (halfSize > 1) {
|
||||
newJob.setRetryCount(0);
|
||||
} else {
|
||||
newJob.setRetryCount(job.getRetryCount() + 1);
|
||||
}
|
||||
newJob.setState(Job.State.READY);
|
||||
newJob.setType(job.getType());
|
||||
args.put(SingleJobExecutor.TASKIDS, String.join(",", halfSizedTaskIds));
|
||||
newJob.setArguments(args);
|
||||
newJob.setCreated(Instant.now());
|
||||
newJob.setExecutor(job.getExecutor());
|
||||
taskanaEngine.getSqlSession().getMapper(JobMapper.class).insertJob(newJob);
|
||||
}
|
||||
LOGGER.debug("doRescheduleBisectedJob deleting job {} ", job);
|
||||
taskanaEngine.getSqlSession().getMapper(JobMapper.class).delete(job);
|
||||
}
|
||||
} else { // take care that the job is re-executed
|
||||
job.setState(Job.State.READY);
|
||||
job.setRetryCount(job.getRetryCount() + 1);
|
||||
taskanaEngine.getSqlSession().getMapper(JobMapper.class).update(job);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void scheduleRetryJob(Job job, List<String> failedTasks) {
|
||||
if (job.getType().equals(Job.Type.UPDATETASKSJOB)) {
|
||||
try {
|
||||
if (txProvider != null) {
|
||||
txProvider.executeInTransaction(() -> { // each job in its own transaction
|
||||
try {
|
||||
taskanaEngine.openConnection();
|
||||
return doScheduleRetryJob(job, failedTasks);
|
||||
} finally {
|
||||
taskanaEngine.returnConnection();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
doScheduleRetryJob(job, failedTasks);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// transaction was rolled back -> log an Error
|
||||
LOGGER.error("attempted to reschedule bisected jobs for {}, but caught Exception {}", job, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> doScheduleRetryJob(Job job, List<String> failedTasks) {
|
||||
LOGGER.debug("entry to doScheduleRetryJob for job {} and failedTasks {}", job,
|
||||
LoggerUtils.listToString(failedTasks));
|
||||
Map<String, String> args = job.getArguments();
|
||||
Job newJob = new Job();
|
||||
newJob.setCreated(Instant.now());
|
||||
newJob.setRetryCount(job.getRetryCount() + 1);
|
||||
newJob.setState(Job.State.READY);
|
||||
newJob.setType(job.getType());
|
||||
args.put(SingleJobExecutor.TASKIDS, String.join(",", failedTasks));
|
||||
newJob.setArguments(args);
|
||||
newJob.setExecutor(job.getExecutor());
|
||||
taskanaEngine.getSqlSession().getMapper(JobMapper.class).insertJob(newJob);
|
||||
LOGGER.debug("doScheduleRetryJob deleting job {} and scheduling {} ", job, newJob);
|
||||
taskanaEngine.getSqlSession().getMapper(JobMapper.class).delete(job);
|
||||
return null;
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> runSingleJob(Job job) {
|
||||
LOGGER.debug("entry to runSingleJob(job = {})", job);
|
||||
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
|
||||
if (Job.State.READY.equals(job.getState())) {
|
||||
BulkOperationResults<String, Exception> bulkLog;
|
||||
if (job.getStarted() == null) {
|
||||
job.setStarted(Instant.now());
|
||||
}
|
||||
job.setState(Job.State.RUNNING);
|
||||
|
@ -55,26 +302,35 @@ public class JobRunner {
|
|||
SingleJobExecutor executor;
|
||||
try {
|
||||
executor = (SingleJobExecutor) Class.forName(job.getExecutor()).newInstance();
|
||||
bulkLog = executor.runSingleJob(job, taskanaEngine);
|
||||
|
||||
} catch (Exception e) {
|
||||
bulkLog.addError("JobId:" + job.getJobId(), e);
|
||||
job.setCompleted(Instant.now());
|
||||
job.setState(Job.State.FAILED);
|
||||
jobMapper.update(job);
|
||||
return bulkLog;
|
||||
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
|
||||
LOGGER.error("When attempting to load class {} caught Exception {} ", job.getExecutor(), e);
|
||||
throw new SystemException("When attempting to load class " + job.getExecutor() + " caught Exception " + e);
|
||||
}
|
||||
bulkLog = executor.runSingleJob(job, taskanaEngine);
|
||||
|
||||
if (!bulkLog.containsErrors()) {
|
||||
LOGGER.debug("runSingleJob deletin job {} ", job);
|
||||
jobMapper.delete(job);
|
||||
}
|
||||
job.setCompleted(Instant.now());
|
||||
job.setState(Job.State.COMPLETED);
|
||||
jobMapper.update(job);
|
||||
|
||||
LOGGER.debug("exit from runSingleJob");
|
||||
if (bulkLog.containsErrors()) {
|
||||
LOGGER.error("Errors occurred when running job {}.", job);
|
||||
for (String id : bulkLog.getFailedIds()) {
|
||||
LOGGER.error(id + bulkLog.getErrorForId(id));
|
||||
}
|
||||
}
|
||||
return bulkLog;
|
||||
}
|
||||
|
||||
static <T> List<List<T>> partition(Collection<T> members, int maxSize) {
|
||||
List<List<T>> result = new ArrayList<>();
|
||||
List<T> internal = new ArrayList<>();
|
||||
for (T member : members) {
|
||||
internal.add(member);
|
||||
if (internal.size() == maxSize) {
|
||||
result.add(internal);
|
||||
internal = new ArrayList<>();
|
||||
}
|
||||
}
|
||||
if (!internal.isEmpty()) {
|
||||
result.add(internal);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,5 +9,10 @@ import pro.taskana.BulkOperationResults;
|
|||
*/
|
||||
public interface SingleJobExecutor {
|
||||
|
||||
String TASKIDS = "taskIds";
|
||||
String CLASSIFICATION_ID = "classificationId";
|
||||
String PRIORITY_CHANGED = "priorityChanged";
|
||||
String SERVICE_LEVEL_CHANGED = "serviceLevelChanged";
|
||||
|
||||
BulkOperationResults<String, Exception> runSingleJob(Job job, TaskanaEngineImpl taskanaEngine);
|
||||
}
|
||||
|
|
|
@ -301,7 +301,7 @@ public class TaskServiceImpl implements TaskService {
|
|||
TaskImpl task = (TaskImpl) taskToCreate;
|
||||
try {
|
||||
taskanaEngine.openConnection();
|
||||
if (task.getId() != null && !task.getId().equals("")) {
|
||||
if (task.getId() != null && !"".equals(task.getId())) {
|
||||
throw new TaskAlreadyExistException(task.getId());
|
||||
} else {
|
||||
LOGGER.debug("Task {} cannot be be found, so it can be created.", task.getId());
|
||||
|
@ -1416,7 +1416,6 @@ public class TaskServiceImpl implements TaskService {
|
|||
|
||||
Classification classification = classificationService.getClassification(classificationId);
|
||||
task.setClassificationSummary(classification.asSummary());
|
||||
|
||||
PrioDurationHolder prioDurationFromAttachments = handleAttachmentsOnClassificationUpdate(task);
|
||||
|
||||
updateClassificationRelatedProperties(task, task, prioDurationFromAttachments);
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package pro.taskana.impl;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.impl.util.LoggerUtils;
|
||||
|
||||
/**
|
||||
* This class performs task updates if a classification is changed.
|
||||
*
|
||||
* @author bbr
|
||||
*/
|
||||
public class TaskUpdateJobExecutor implements SingleJobExecutor {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TaskUpdateJobExecutor.class);
|
||||
|
||||
private TaskanaEngineImpl taskanaEngine;
|
||||
private String classificationId;
|
||||
private List<String> affectedTaskIds;
|
||||
|
||||
public TaskUpdateJobExecutor() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkOperationResults<String, Exception> runSingleJob(Job job, TaskanaEngineImpl taskanaEngine) {
|
||||
this.taskanaEngine = taskanaEngine;
|
||||
Map<String, String> args = job.getArguments();
|
||||
String taskIdsString = args.get(TASKIDS);
|
||||
affectedTaskIds = Arrays.asList(taskIdsString.split(","));
|
||||
|
||||
classificationId = args.get(CLASSIFICATION_ID);
|
||||
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
|
||||
bulkLog.addAllErrors(handleAffectedTasks(job));
|
||||
|
||||
return bulkLog;
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> handleAffectedTasks(Job job) {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("the following tasks will be updated by the current job {}",
|
||||
LoggerUtils.listToString(affectedTaskIds));
|
||||
}
|
||||
TaskServiceImpl taskService = (TaskServiceImpl) taskanaEngine.getTaskService();
|
||||
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
|
||||
for (String taskId : affectedTaskIds) {
|
||||
try {
|
||||
bulkLog.addAllErrors(taskService.classificationChanged(taskId, classificationId));
|
||||
} catch (Exception e) {
|
||||
bulkLog.addError(taskId, e);
|
||||
}
|
||||
}
|
||||
return bulkLog;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,82 +0,0 @@
|
|||
package pro.taskana.impl;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.impl.util.LoggerUtils;
|
||||
import pro.taskana.mappings.AttachmentMapper;
|
||||
import pro.taskana.mappings.ClassificationMapper;
|
||||
import pro.taskana.mappings.TaskMapper;
|
||||
|
||||
/**
|
||||
* This class performs task updates if a classification is changed.
|
||||
*
|
||||
* @author bbr
|
||||
*/
|
||||
public class TaskUpdateOnClassificationChangeExecutor implements SingleJobExecutor {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceImpl.class);
|
||||
public static final String CLASSIFICATION_ID = "classificationId";
|
||||
public static final String PRIORITY_CHANGED = "priorityChanged";
|
||||
public static final String SERVICE_LEVEL_CHANGED = "serviceLevelChanged";
|
||||
|
||||
private TaskanaEngineImpl taskanaEngine;
|
||||
private Job job;
|
||||
private String classificationId;
|
||||
private boolean priorityChanged;
|
||||
private boolean serviceLevelChanged;
|
||||
private TaskMapper taskMapper;
|
||||
private ClassificationMapper classificationMapper;
|
||||
private AttachmentMapper attachmentMapper;
|
||||
|
||||
public TaskUpdateOnClassificationChangeExecutor() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkOperationResults<String, Exception> runSingleJob(Job job, TaskanaEngineImpl taskanaEngine) {
|
||||
this.job = job;
|
||||
this.taskanaEngine = taskanaEngine;
|
||||
this.taskMapper = taskanaEngine.getSqlSession().getMapper(TaskMapper.class);
|
||||
this.classificationMapper = taskanaEngine.getSqlSession().getMapper(ClassificationMapper.class);
|
||||
this.attachmentMapper = taskanaEngine.getSqlSession().getMapper(AttachmentMapper.class);
|
||||
Map<String, String> args = job.getArguments();
|
||||
classificationId = args.get(CLASSIFICATION_ID);
|
||||
priorityChanged = Boolean.getBoolean(args.get(PRIORITY_CHANGED));
|
||||
serviceLevelChanged = Boolean.getBoolean(args.get(SERVICE_LEVEL_CHANGED));
|
||||
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
|
||||
bulkLog.addAllErrors(handleAffectedTasks());
|
||||
|
||||
return bulkLog;
|
||||
}
|
||||
|
||||
private BulkOperationResults<String, Exception> handleAffectedTasks() {
|
||||
List<TaskSummaryImpl> tasks = taskMapper.findTasksAffectedByClassificationChange(classificationId);
|
||||
List<String> taskIdsFromAttachments = attachmentMapper
|
||||
.findTaskIdsAffectedByClassificationChange(classificationId);
|
||||
List<String> filteredTaskIdsFromAttachments = taskMapper.filterTaskIdsForNotCompleted(taskIdsFromAttachments);
|
||||
|
||||
Set<String> affectedTaskIds = new HashSet<>(filteredTaskIdsFromAttachments);
|
||||
for (TaskSummaryImpl task : tasks) {
|
||||
affectedTaskIds.add(task.getTaskId());
|
||||
}
|
||||
LOGGER.debug("the following tasks are affected by the update of classification {} : {}", classificationId,
|
||||
LoggerUtils.setToString(affectedTaskIds));
|
||||
TaskServiceImpl taskService = (TaskServiceImpl) taskanaEngine.getTaskService();
|
||||
BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
|
||||
for (String taskId : affectedTaskIds) {
|
||||
try {
|
||||
bulkLog.addAllErrors(taskService.classificationChanged(taskId, classificationId));
|
||||
} catch (Exception e) {
|
||||
bulkLog.addError(taskId, e);
|
||||
}
|
||||
}
|
||||
return bulkLog;
|
||||
}
|
||||
|
||||
}
|
|
@ -3,6 +3,7 @@ package pro.taskana.mappings;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.ibatis.annotations.Delete;
|
||||
import org.apache.ibatis.annotations.Insert;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Result;
|
||||
|
@ -19,7 +20,7 @@ import pro.taskana.impl.persistence.MapTypeHandler;
|
|||
public interface JobMapper {
|
||||
|
||||
@Insert("<script>"
|
||||
+ "INSERT INTO TASKANA.JOB (JOB_ID, CREATED, STARTED, COMPLETED, STATE, EXECUTOR, ARGUMENTS) "
|
||||
+ "INSERT INTO TASKANA.JOB (JOB_ID, CREATED, STARTED, COMPLETED, STATE, TYPE, RETRY_COUNT, EXECUTOR, ERRORS, ARGUMENTS) "
|
||||
+ "VALUES ("
|
||||
+ "<choose>"
|
||||
+ "<when test=\"_databaseId == 'db2'\">"
|
||||
|
@ -29,29 +30,40 @@ public interface JobMapper {
|
|||
+ "nextval('TASKANA.JOB_SEQ')"
|
||||
+ "</otherwise>"
|
||||
+ "</choose>"
|
||||
+ ", #{job.created}, #{job.started}, #{job.completed}, #{job.state}, #{job.executor}, #{job.arguments,javaType=java.util.Map,typeHandler=pro.taskana.impl.persistence.MapTypeHandler} )"
|
||||
+ ", #{job.created}, #{job.started}, #{job.completed}, #{job.state}, #{job.type}, #{job.retryCount}, #{job.executor}, #{job.errors}, #{job.arguments,javaType=java.util.Map,typeHandler=pro.taskana.impl.persistence.MapTypeHandler} )"
|
||||
+ "</script>")
|
||||
void insertJob(@Param("job") Job job);
|
||||
|
||||
@Select("SELECT JOB_ID, CREATED, STARTED, COMPLETED, STATE, EXECUTOR, ARGUMENTS "
|
||||
@Select("<script> SELECT JOB_ID, CREATED, STARTED, COMPLETED, STATE, TYPE, RETRY_COUNT, EXECUTOR, ERRORS, ARGUMENTS "
|
||||
+ "FROM TASKANA.JOB "
|
||||
+ "WHERE STATE IN ( 'READY') "
|
||||
+ "ORDER BY JOB_ID ")
|
||||
+ "ORDER BY JOB_ID "
|
||||
+ "<if test=\"_databaseId == 'db2'\">with UR </if> "
|
||||
+ "</script>")
|
||||
@Results(value = {
|
||||
@Result(property = "jobId", column = "JOB_ID"),
|
||||
@Result(property = "created", column = "CREATED"),
|
||||
@Result(property = "started", column = "STARTED"),
|
||||
@Result(property = "completed", column = "COMPLETED"),
|
||||
@Result(property = "state", column = "STATE"),
|
||||
@Result(property = "type", column = "TYPE"),
|
||||
@Result(property = "retryCount", column = "RETRY_COUNT"),
|
||||
@Result(property = "executor", column = "EXECUTOR"),
|
||||
@Result(property = "errors", column = "ERRORS"),
|
||||
@Result(property = "arguments", column = "ARGUMENTS",
|
||||
javaType = Map.class, typeHandler = MapTypeHandler.class)
|
||||
})
|
||||
List<Job> findJobsToRun();
|
||||
|
||||
@Update(
|
||||
value = "UPDATE TASKANA.JOB SET CREATED = #{created}, STARTED = #{started}, COMPLETED = #{completed}, STATE = #{state}, EXECUTOR = #{executor}, "
|
||||
value = "UPDATE TASKANA.JOB SET CREATED = #{created}, STARTED = #{started}, COMPLETED = #{completed}, STATE = #{state}, "
|
||||
+ "TYPE = #{type}, RETRY_COUNT = #{retryCount}, EXECUTOR = #{executor}, "
|
||||
+ "ERRORS = #{errors}, "
|
||||
+ "ARGUMENTS = #{arguments,jdbcType=CLOB ,javaType=java.util.Map,typeHandler=pro.taskana.impl.persistence.MapTypeHandler} "
|
||||
+ "where JOB_ID = #{jobId}")
|
||||
void update(Job job);
|
||||
|
||||
@Delete(
|
||||
value = "DELETE FROM TASKANA.JOB WHERE JOB_ID = #{jobId}")
|
||||
void delete(Job job);
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ CREATE TABLE TASKANA.TASKANA_SCHEMA_VERSION(
|
|||
VERSION VARCHAR(255) NOT NULL,
|
||||
PRIMARY KEY (ID)
|
||||
);
|
||||
INSERT INTO TASKANA.TASKANA_SCHEMA_VERSION VALUES ('1', '0.1.5');
|
||||
INSERT INTO TASKANA.TASKANA_SCHEMA_VERSION VALUES ('1', '0.9.2');
|
||||
|
||||
CREATE TABLE TASKANA.CLASSIFICATION(
|
||||
ID CHAR(40) NOT NULL,
|
||||
|
@ -173,7 +173,10 @@ CREATE TABLE TASKANA.JOB(
|
|||
STARTED TIMESTAMP NULL,
|
||||
COMPLETED TIMESTAMP NULL,
|
||||
STATE VARCHAR(32) NULL,
|
||||
TYPE VARCHAR(32) NULL,
|
||||
RETRY_COUNT INTEGER NOT NULL,
|
||||
EXECUTOR VARCHAR(128) NOT NULL,
|
||||
ERRORS VARCHAR(4096) NULL,
|
||||
ARGUMENTS CLOB NULL,
|
||||
PRIMARY KEY (JOB_ID)
|
||||
);
|
||||
|
|
|
@ -7,3 +7,6 @@ taskana.domains= Domain_A , DOMAIN_B
|
|||
|
||||
taskana.classification.types= TASK , document
|
||||
taskana.classification.categories= EXTERNAL , manual, autoMAtic ,Process
|
||||
|
||||
taskana.job.max.task.updates.per.transaction=5
|
||||
taskana.job.max.retries.for.failed.task.updates=3
|
||||
|
|
|
@ -27,7 +27,7 @@ import pro.taskana.sampledata.SampleDataGenerator;
|
|||
*/
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
@Import(RestConfiguration.class)
|
||||
@Import(SampleConfiguration.class)
|
||||
public class ExampleRestApplication {
|
||||
|
||||
@Autowired
|
||||
|
|
|
@ -1,16 +1,17 @@
|
|||
package pro.taskana.rest;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaEngine;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
import pro.taskana.impl.JobRunner;
|
||||
import pro.taskana.impl.util.LoggerUtils;
|
||||
|
||||
|
@ -23,17 +24,30 @@ import pro.taskana.impl.util.LoggerUtils;
|
|||
public class JobScheduler {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(JobScheduler.class);
|
||||
private static AtomicBoolean jobRunning = new AtomicBoolean(false);
|
||||
|
||||
@Autowired
|
||||
private TaskanaEngine taskanaEngine;
|
||||
|
||||
@Scheduled(fixedRate = 60000)
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void triggerJobs() {
|
||||
JobRunner runner = new JobRunner(taskanaEngine);
|
||||
LOGGER.info("Running Jobs");
|
||||
BulkOperationResults<String, Exception> result = runner.runJobs();
|
||||
Map<String, Exception> errors = result.getErrorMap();
|
||||
LOGGER.info("Job run completed. Result = {} ", LoggerUtils.mapToString(errors));
|
||||
}
|
||||
@Autowired
|
||||
TaskanaTransactionProvider<BulkOperationResults<String, Exception>> springTransactionProvider;
|
||||
|
||||
@Scheduled(fixedRate = 60000)
|
||||
public void triggerJobs() {
|
||||
boolean otherJobActive = jobRunning.getAndSet(true);
|
||||
if (!otherJobActive) { // only one job should be active at any time
|
||||
try {
|
||||
JobRunner runner = new JobRunner(taskanaEngine);
|
||||
runner.registerTransactionProvider(springTransactionProvider);
|
||||
LOGGER.info("Running Jobs");
|
||||
BulkOperationResults<String, Exception> result = runner.runJobs();
|
||||
Map<String, Exception> errors = result.getErrorMap();
|
||||
LOGGER.info("Job run completed. Result = {} ", LoggerUtils.mapToString(errors));
|
||||
} finally {
|
||||
jobRunning.set(false);
|
||||
}
|
||||
} else {
|
||||
LOGGER.info("Don't run Jobs because already another JobRunner is running");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
package pro.taskana.rest;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Import;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
|
||||
/**
|
||||
* Configuration class for Spring sample application.
|
||||
*/
|
||||
@Import(RestConfiguration.class)
|
||||
public class SampleConfiguration {
|
||||
|
||||
@Bean
|
||||
public TaskanaTransactionProvider<BulkOperationResults<String, Exception>> springTransactionProvider() {
|
||||
return new SpringTransactionProvider();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package pro.taskana.rest;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import pro.taskana.BulkOperationResults;
|
||||
import pro.taskana.TaskanaCallable;
|
||||
import pro.taskana.TaskanaTransactionProvider;
|
||||
|
||||
@Component
|
||||
public class SpringTransactionProvider implements TaskanaTransactionProvider<BulkOperationResults<String, Exception>> {
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public BulkOperationResults<String, Exception> executeInTransaction(
|
||||
TaskanaCallable<BulkOperationResults<String, Exception>> action) {
|
||||
return action.call();
|
||||
}
|
||||
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
logging.level.pro.taskana=DEBUG
|
||||
l###logging.level.org.springframework=DEBUG
|
||||
### logging.level.org.springframework=DEBUG
|
||||
######## Taskana DB #######
|
||||
datasource.url=jdbc:h2:mem:taskana;IGNORECASE=TRUE;LOCK_MODE=0;INIT=CREATE SCHEMA IF NOT EXISTS TASKANA
|
||||
datasource.driverClassName=org.h2.Driver
|
||||
|
|
|
@ -5,3 +5,6 @@ taskana.roles.monitor=john|teamlead_2 | monitor
|
|||
taskana.domains=DOMAIN_A,DOMAIN_B,DOMAIN_C
|
||||
taskana.classification.types=TASK,DOCUMENT
|
||||
taskana.classification.categories= EXTERNAL , manual, autoMAtic ,Process
|
||||
|
||||
taskana.job.max.task.updates.per.transaction=5
|
||||
taskana.job.max.retries.for.failed.task.updates=3
|
||||
|
|
|
@ -9,11 +9,19 @@ import java.io.IOException;
|
|||
import java.io.OutputStreamWriter;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.json.JSONObject;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
|
||||
import org.springframework.boot.web.server.LocalServerPort;
|
||||
|
@ -37,12 +45,27 @@ import org.springframework.web.client.RestTemplate;
|
|||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import pro.taskana.Classification;
|
||||
import pro.taskana.Task;
|
||||
import pro.taskana.exceptions.InvalidArgumentException;
|
||||
import pro.taskana.exceptions.NotAuthorizedException;
|
||||
import pro.taskana.rest.resource.ClassificationResource;
|
||||
import pro.taskana.rest.resource.ClassificationSummaryResource;
|
||||
import pro.taskana.rest.resource.TaskResource;
|
||||
import pro.taskana.rest.resource.assembler.ClassificationResourceAssembler;
|
||||
import pro.taskana.rest.resource.assembler.TaskResourceAssembler;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = RestConfiguration.class, webEnvironment = WebEnvironment.RANDOM_PORT, properties = {"devMode=true"})
|
||||
public class ClassificationControllerIntTest {
|
||||
|
||||
@Autowired
|
||||
private ClassificationResourceAssembler classificationResourceAssembler;
|
||||
|
||||
@Autowired
|
||||
private TaskResourceAssembler taskResourceAssembler;
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ClassificationControllerIntTest.class);
|
||||
String server = "http://127.0.0.1:";
|
||||
RestTemplate template;
|
||||
HttpEntity<String> request;
|
||||
|
@ -212,6 +235,110 @@ public class ClassificationControllerIntTest {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateClassificationPrioServiceLevel()
|
||||
throws IOException, InterruptedException, NotAuthorizedException, InvalidArgumentException {
|
||||
|
||||
// 1st step: get old classification :
|
||||
Instant before = Instant.now();
|
||||
|
||||
ResponseEntity<ClassificationResource> response = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/classifications/CLI:100000000000000000000000000000000003",
|
||||
HttpMethod.GET,
|
||||
request,
|
||||
new ParameterizedTypeReference<ClassificationResource>() {
|
||||
|
||||
});
|
||||
|
||||
assertNotNull(response.getBody().getLink(Link.REL_SELF));
|
||||
ClassificationResource classification = response.getBody();
|
||||
|
||||
// 2nd step: modify classification and trigger update
|
||||
classification.removeLinks();
|
||||
classification.setServiceLevel("P5D");
|
||||
classification.setPriority(1000);
|
||||
|
||||
String updatedClassification = new JSONObject(classification).toString();
|
||||
|
||||
URL url = new URL(server + port + "/v1/classifications/CLI:100000000000000000000000000000000003");
|
||||
HttpURLConnection con = (HttpURLConnection) url.openConnection();
|
||||
con.setRequestMethod("PUT");
|
||||
con.setRequestProperty("Authorization", "Basic dGVhbWxlYWRfMTp0ZWFtbGVhZF8x");
|
||||
con.setDoOutput(true);
|
||||
con.setRequestProperty("Content-Type", "application/json");
|
||||
BufferedWriter out = new BufferedWriter(new OutputStreamWriter(con.getOutputStream()));
|
||||
out.write(updatedClassification);
|
||||
out.flush();
|
||||
out.close();
|
||||
assertEquals(200, con.getResponseCode());
|
||||
con.disconnect();
|
||||
|
||||
// wait a minute to give JobScheduler a chance to run
|
||||
LOGGER.info("About to sleep for 70 seconds to give JobScheduler a chance to process the classification change");
|
||||
Thread.sleep(70000);
|
||||
LOGGER.info("Sleeping ended. Continuing .... ");
|
||||
|
||||
// verify the classification modified timestamp is after 'before'
|
||||
ResponseEntity<ClassificationResource> repeatedResponse = template.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/classifications/CLI:100000000000000000000000000000000003",
|
||||
HttpMethod.GET,
|
||||
request,
|
||||
new ParameterizedTypeReference<ClassificationResource>() {
|
||||
|
||||
});
|
||||
|
||||
ClassificationResource modifiedClassificationResource = repeatedResponse.getBody();
|
||||
Classification modifiedClassification = classificationResourceAssembler.toModel(modifiedClassificationResource);
|
||||
|
||||
assertTrue(!before.isAfter(modifiedClassification.getModified()));
|
||||
|
||||
List<String> affectedTasks = new ArrayList<>(
|
||||
Arrays.asList("TKI:000000000000000000000000000000000003", "TKI:000000000000000000000000000000000004",
|
||||
"TKI:000000000000000000000000000000000005", "TKI:000000000000000000000000000000000006",
|
||||
"TKI:000000000000000000000000000000000007", "TKI:000000000000000000000000000000000008",
|
||||
"TKI:000000000000000000000000000000000009", "TKI:000000000000000000000000000000000010",
|
||||
"TKI:000000000000000000000000000000000011", "TKI:000000000000000000000000000000000012",
|
||||
"TKI:000000000000000000000000000000000013", "TKI:000000000000000000000000000000000014",
|
||||
"TKI:000000000000000000000000000000000015", "TKI:000000000000000000000000000000000016",
|
||||
"TKI:000000000000000000000000000000000017", "TKI:000000000000000000000000000000000018",
|
||||
"TKI:000000000000000000000000000000000019", "TKI:000000000000000000000000000000000020",
|
||||
"TKI:000000000000000000000000000000000021", "TKI:000000000000000000000000000000000022",
|
||||
"TKI:000000000000000000000000000000000023", "TKI:000000000000000000000000000000000024",
|
||||
"TKI:000000000000000000000000000000000025", "TKI:000000000000000000000000000000000026",
|
||||
"TKI:000000000000000000000000000000000027", "TKI:000000000000000000000000000000000028",
|
||||
"TKI:000000000000000000000000000000000029", "TKI:000000000000000000000000000000000030",
|
||||
"TKI:000000000000000000000000000000000031", "TKI:000000000000000000000000000000000032",
|
||||
"TKI:000000000000000000000000000000000033", "TKI:000000000000000000000000000000000034",
|
||||
"TKI:000000000000000000000000000000000035", "TKI:000000000000000000000000000000000100",
|
||||
"TKI:000000000000000000000000000000000101", "TKI:000000000000000000000000000000000102",
|
||||
"TKI:000000000000000000000000000000000103"));
|
||||
for (String taskId : affectedTasks) {
|
||||
verifyTaskIsModifiedAfter(taskId, before);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void verifyTaskIsModifiedAfter(String taskId, Instant before) throws InvalidArgumentException {
|
||||
RestTemplate admTemplate = getRestTemplate();
|
||||
HttpHeaders admHeaders = new HttpHeaders();
|
||||
admHeaders.add("Authorization", "Basic YWRtaW46YWRtaW4="); // admin:admin
|
||||
|
||||
HttpEntity<String> admRequest = new HttpEntity<String>(admHeaders);
|
||||
|
||||
ResponseEntity<TaskResource> taskResponse = admTemplate.exchange(
|
||||
"http://127.0.0.1:" + port + "/v1/tasks/" + taskId,
|
||||
HttpMethod.GET,
|
||||
admRequest,
|
||||
new ParameterizedTypeReference<TaskResource>() {
|
||||
|
||||
});
|
||||
|
||||
TaskResource taskResource = taskResponse.getBody();
|
||||
Task task = taskResourceAssembler.toModel(taskResource);
|
||||
|
||||
assertTrue(!before.isAfter(task.getModified()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a REST template which is capable of dealing with responses in HAL format
|
||||
*
|
||||
|
@ -226,7 +353,7 @@ public class ClassificationControllerIntTest {
|
|||
converter.setSupportedMediaTypes(MediaType.parseMediaTypes("application/hal+json"));
|
||||
converter.setObjectMapper(mapper);
|
||||
|
||||
RestTemplate template = new RestTemplate(Collections.<HttpMessageConverter<?>>singletonList(converter));
|
||||
RestTemplate template = new RestTemplate(Collections.<HttpMessageConverter<?>> singletonList(converter));
|
||||
return template;
|
||||
}
|
||||
|
||||
|
|
|
@ -205,4 +205,55 @@ public class ClassificationResource extends ResourceSupport {
|
|||
public void setCustom8(String custom8) {
|
||||
this.custom8 = custom8;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("ClassificationResource [classificationId=");
|
||||
builder.append(classificationId);
|
||||
builder.append(", key=");
|
||||
builder.append(key);
|
||||
builder.append(", parentId=");
|
||||
builder.append(parentId);
|
||||
builder.append(", category=");
|
||||
builder.append(category);
|
||||
builder.append(", type=");
|
||||
builder.append(type);
|
||||
builder.append(", domain=");
|
||||
builder.append(domain);
|
||||
builder.append(", isValidInDomain=");
|
||||
builder.append(isValidInDomain);
|
||||
builder.append(", created=");
|
||||
builder.append(created);
|
||||
builder.append(", modified=");
|
||||
builder.append(modified);
|
||||
builder.append(", name=");
|
||||
builder.append(name);
|
||||
builder.append(", description=");
|
||||
builder.append(description);
|
||||
builder.append(", priority=");
|
||||
builder.append(priority);
|
||||
builder.append(", serviceLevel=");
|
||||
builder.append(serviceLevel);
|
||||
builder.append(", applicationEntryPoint=");
|
||||
builder.append(applicationEntryPoint);
|
||||
builder.append(", custom1=");
|
||||
builder.append(custom1);
|
||||
builder.append(", custom2=");
|
||||
builder.append(custom2);
|
||||
builder.append(", custom3=");
|
||||
builder.append(custom3);
|
||||
builder.append(", custom4=");
|
||||
builder.append(custom4);
|
||||
builder.append(", custom5=");
|
||||
builder.append(custom5);
|
||||
builder.append(", custom6=");
|
||||
builder.append(custom6);
|
||||
builder.append(", custom7=");
|
||||
builder.append(custom7);
|
||||
builder.append(", custom8=");
|
||||
builder.append(custom8);
|
||||
builder.append("]");
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue