TSK-567 Add transactional support {Part1}

This commit is contained in:
Martin Rojas Miguel Angel 2018-06-21 10:33:14 +02:00 committed by Holger Hagen
parent 8b060b2a39
commit f09a31776b
4 changed files with 86 additions and 29 deletions

View File

@ -112,4 +112,21 @@ public class BulkOperationResults<K, V> {
} }
} }
} }
/**
* Map from any exception type to Exception.
*
* @return map of errors which can´t be null.
*/
public BulkOperationResults<K, Exception> mapBulkOperationResults() {
BulkOperationResults<K, Exception> bulkLogMapped = new BulkOperationResults<>();
List<K> failedIds = this.getFailedIds();
for (K id : failedIds) {
bulkLogMapped.addError(id, (Exception) this.getErrorForId(id));
}
return bulkLogMapped;
}
} }

View File

@ -8,11 +8,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import pro.taskana.BulkOperationResults; import pro.taskana.BulkOperationResults;
import pro.taskana.TaskService;
import pro.taskana.TaskSummary; import pro.taskana.TaskSummary;
import pro.taskana.TaskanaEngine; import pro.taskana.TaskanaEngine;
import pro.taskana.TaskanaTransactionProvider;
import pro.taskana.TimeInterval; import pro.taskana.TimeInterval;
import pro.taskana.exceptions.TaskanaException; import pro.taskana.exceptions.InvalidArgumentException;
/** /**
* This is the runner for Tasks jobs. * This is the runner for Tasks jobs.
@ -22,37 +22,78 @@ import pro.taskana.exceptions.TaskanaException;
public class JobTaskRunner { public class JobTaskRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceImpl.class); private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceImpl.class);
private TaskanaEngineImpl taskanaEngine; private TaskServiceImpl taskanaService;
private TaskServiceImpl taskService; private TaskanaTransactionProvider<BulkOperationResults<String, Exception>> txProvider;
private int maxRetryOperations = 3;
private int batchSize = 50;
public JobTaskRunner(TaskanaEngine taskanaEngine, TaskService taskService) { public JobTaskRunner(TaskanaEngine taskanaEngine) {
this.taskanaEngine = (TaskanaEngineImpl) taskanaEngine; this.taskanaService = (TaskServiceImpl) taskanaEngine.getTaskService();
this.taskService = (TaskServiceImpl) taskService;
} }
public BulkOperationResults<String, TaskanaException> runCleanCompletedTasks(Instant untilDate) { public BulkOperationResults<String, Exception> runCleanCompletedTasks(Instant untilDate) {
return cleanCompletedTasks(untilDate); return cleanCompletedTasks(untilDate);
} }
private BulkOperationResults<String, TaskanaException> cleanCompletedTasks(Instant untilDate) { public void registerTransactionProvider(
TaskanaTransactionProvider<BulkOperationResults<String, Exception>> txProvider) {
this.txProvider = txProvider;
}
private BulkOperationResults<String, Exception> cleanCompletedTasks(Instant untilDate) {
LOGGER.info("entry to RunCompletedTasks({})", untilDate.toString()); LOGGER.info("entry to RunCompletedTasks({})", untilDate.toString());
BulkOperationResults<String, TaskanaException> bulkLog = new BulkOperationResults<>(); BulkOperationResults<String, Exception> bulkLog = new BulkOperationResults<>();
try { int attempt = 0;
List<String> tasksIds = new ArrayList<>();
List<TaskSummary> tasksCompleted = taskService.createTaskQuery() List<TaskSummary> tasksCompleted = getTasksCompleted(untilDate);
.completedWithin(new TimeInterval(null, untilDate)) List<String> tasksIds = new ArrayList<>();
.list(); while (tasksCompleted.size() != 0 && attempt < maxRetryOperations) {
tasksCompleted.stream().forEach(task -> { tasksCompleted.stream().forEach(task -> {
tasksIds.add(task.getTaskId()); tasksIds.add(task.getTaskId());
LOGGER.info("task id deleted: {}", task.getTaskId()); LOGGER.info("task id to be deleted: {}", task.getTaskId());
}); });
bulkLog = taskService.deleteTasks(tasksIds); bulkLog = executeTransactionalDeleting(tasksIds);
} catch (Exception e) { attempt = getAttempt(bulkLog, attempt);
taskanaEngine.returnConnection(); tasksCompleted = getTasksCompleted(untilDate);
LOGGER.info("exit from RunCompletedTasks(). Returning result {} ", bulkLog);
} }
LOGGER.info("exit from RunCompletedTasks({})", untilDate.toString());
LOGGER.info("exit from RunCompletedTasks({}). Returning result: " + bulkLog, untilDate.toString());
return bulkLog; return bulkLog;
} }
private List<TaskSummary> getTasksCompleted(Instant untilDate) {
return taskanaService.createTaskQuery()
.completedWithin(new TimeInterval(null, untilDate))
.list(0, batchSize);
}
private BulkOperationResults<String, Exception> executeTransactionalDeleting(List<String> tasksIds) {
if (txProvider == null) {
return doDeleteTasks(tasksIds);
}
return txProvider.executeInTransaction(() -> doDeleteTasks(tasksIds));
}
private BulkOperationResults<String, Exception> doDeleteTasks(List<String> tasksIds) {
if (tasksIds.isEmpty()) {
return new BulkOperationResults<>();
}
try {
return taskanaService.deleteTasks(tasksIds).mapBulkOperationResults();
} catch (InvalidArgumentException e) {
LOGGER.error("could not delete next tasksIds batch: {}, error:" + e.getMessage(),
String.join(",", tasksIds));
return new BulkOperationResults<>();
}
}
private int getAttempt(BulkOperationResults<String, Exception> bulkLog, int attempt) {
if (!bulkLog.getErrorMap().isEmpty()) {
return ++attempt;
}
return 0;
}
} }

View File

@ -28,7 +28,6 @@ import pro.taskana.exceptions.InvalidStateException;
import pro.taskana.exceptions.NotAuthorizedException; import pro.taskana.exceptions.NotAuthorizedException;
import pro.taskana.exceptions.TaskAlreadyExistException; import pro.taskana.exceptions.TaskAlreadyExistException;
import pro.taskana.exceptions.TaskNotFoundException; import pro.taskana.exceptions.TaskNotFoundException;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.exceptions.WorkbasketNotFoundException; import pro.taskana.exceptions.WorkbasketNotFoundException;
import pro.taskana.impl.JobTaskRunner; import pro.taskana.impl.JobTaskRunner;
import pro.taskana.security.JAASRunner; import pro.taskana.security.JAASRunner;
@ -53,14 +52,14 @@ public class JobTaskRunnerAccTest extends AbstractAccTest {
@Test @Test
public void shouldCleanCompletedTasksUntilDate() { public void shouldCleanCompletedTasksUntilDate() {
JobTaskRunner runner = new JobTaskRunner(taskanaEngine, taskService); JobTaskRunner runner = new JobTaskRunner(taskanaEngine);
Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN) Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
.atZone(ZoneId.systemDefault()) .atZone(ZoneId.systemDefault())
.minusDays(14) .minusDays(14)
.toInstant(); .toInstant();
List<TaskSummary> tasksCompletedUntilDateBefore = getTaskCompletedUntilDate(completeUntilDate); List<TaskSummary> tasksCompletedUntilDateBefore = getTaskCompletedUntilDate(completeUntilDate);
BulkOperationResults<String, TaskanaException> results = runner.runCleanCompletedTasks(completeUntilDate); BulkOperationResults<String, Exception> results = runner.runCleanCompletedTasks(completeUntilDate);
List<TaskSummary> tasksCompletedUntilDateAfter = getTaskCompletedUntilDate(completeUntilDate); List<TaskSummary> tasksCompletedUntilDateAfter = getTaskCompletedUntilDate(completeUntilDate);
assertFalse(results.containsErrors()); assertFalse(results.containsErrors());
@ -84,7 +83,7 @@ public class JobTaskRunnerAccTest extends AbstractAccTest {
taskService.claim(createdTask.getId()); taskService.claim(createdTask.getId());
taskService.completeTask(createdTask.getId()); taskService.completeTask(createdTask.getId());
JobTaskRunner runner = new JobTaskRunner(taskanaEngine, taskService); JobTaskRunner runner = new JobTaskRunner(taskanaEngine);
Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN) Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
.atZone(ZoneId.systemDefault()) .atZone(ZoneId.systemDefault())
.minusDays(14) .minusDays(14)

View File

@ -17,7 +17,6 @@ import org.springframework.stereotype.Component;
import pro.taskana.BulkOperationResults; import pro.taskana.BulkOperationResults;
import pro.taskana.TaskanaEngine; import pro.taskana.TaskanaEngine;
import pro.taskana.TaskanaTransactionProvider; import pro.taskana.TaskanaTransactionProvider;
import pro.taskana.exceptions.TaskanaException;
import pro.taskana.impl.JobRunner; import pro.taskana.impl.JobRunner;
import pro.taskana.impl.JobTaskRunner; import pro.taskana.impl.JobTaskRunner;
import pro.taskana.impl.util.LoggerUtils; import pro.taskana.impl.util.LoggerUtils;
@ -63,14 +62,15 @@ public class JobScheduler {
@Scheduled(cron = "0 0 0 * * *") @Scheduled(cron = "0 0 0 * * *")
public void triggerTaskCompletedCleanUpJob() { public void triggerTaskCompletedCleanUpJob() {
LOGGER.info("triggerTaskCompletedCleanUpJob"); LOGGER.info("triggerTaskCompletedCleanUpJob");
JobTaskRunner runner = new JobTaskRunner(taskanaEngine, taskanaEngine.getTaskService()); JobTaskRunner runner = new JobTaskRunner(taskanaEngine);
runner.registerTransactionProvider(springTransactionProvider);
Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN) Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN)
.atZone(ZoneId.systemDefault()) .atZone(ZoneId.systemDefault())
.minusDays(untilDays) .minusDays(untilDays)
.toInstant(); .toInstant();
BulkOperationResults<String, TaskanaException> result = runner.runCleanCompletedTasks(completeUntilDate); BulkOperationResults<String, Exception> result = runner.runCleanCompletedTasks(completeUntilDate);
Map<String, TaskanaException> errors = result.getErrorMap(); Map<String, Exception> errors = result.getErrorMap();
LOGGER.info("triggerTaskCompletedCleanUpJob Completed Result = {} ", LoggerUtils.mapToString(errors)); LOGGER.info("triggerTaskCompletedCleanUpJob Completed Result = {} ", LoggerUtils.mapToString(errors));
} }