From f09a31776b45144e00a5cebfecdcd72f10ac598a Mon Sep 17 00:00:00 2001 From: Martin Rojas Miguel Angel Date: Thu, 21 Jun 2018 10:33:14 +0200 Subject: [PATCH] TSK-567 Add transactional support {Part1} --- .../pro/taskana/BulkOperationResults.java | 17 ++++ .../java/pro/taskana/impl/JobTaskRunner.java | 83 ++++++++++++++----- .../acceptance/task/JobTaskRunnerAccTest.java | 7 +- .../java/pro/taskana/rest/JobScheduler.java | 8 +- 4 files changed, 86 insertions(+), 29 deletions(-) diff --git a/lib/taskana-core/src/main/java/pro/taskana/BulkOperationResults.java b/lib/taskana-core/src/main/java/pro/taskana/BulkOperationResults.java index 1373387a7..52dbd531d 100644 --- a/lib/taskana-core/src/main/java/pro/taskana/BulkOperationResults.java +++ b/lib/taskana-core/src/main/java/pro/taskana/BulkOperationResults.java @@ -112,4 +112,21 @@ public class BulkOperationResults { } } } + + /** + * Map from any exception type to Exception. + * + * @return map of errors which can“t be null. + */ + public BulkOperationResults mapBulkOperationResults() { + BulkOperationResults bulkLogMapped = new BulkOperationResults<>(); + + List failedIds = this.getFailedIds(); + for (K id : failedIds) { + bulkLogMapped.addError(id, (Exception) this.getErrorForId(id)); + } + + return bulkLogMapped; + + } } diff --git a/lib/taskana-core/src/main/java/pro/taskana/impl/JobTaskRunner.java b/lib/taskana-core/src/main/java/pro/taskana/impl/JobTaskRunner.java index 5a18626a6..a92510dec 100644 --- a/lib/taskana-core/src/main/java/pro/taskana/impl/JobTaskRunner.java +++ b/lib/taskana-core/src/main/java/pro/taskana/impl/JobTaskRunner.java @@ -8,11 +8,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pro.taskana.BulkOperationResults; -import pro.taskana.TaskService; import pro.taskana.TaskSummary; import pro.taskana.TaskanaEngine; +import pro.taskana.TaskanaTransactionProvider; import pro.taskana.TimeInterval; -import pro.taskana.exceptions.TaskanaException; +import pro.taskana.exceptions.InvalidArgumentException; /** * This is the runner for Tasks jobs. @@ -22,37 +22,78 @@ import pro.taskana.exceptions.TaskanaException; public class JobTaskRunner { private static final Logger LOGGER = LoggerFactory.getLogger(TaskServiceImpl.class); - private TaskanaEngineImpl taskanaEngine; - private TaskServiceImpl taskService; + private TaskServiceImpl taskanaService; + private TaskanaTransactionProvider> txProvider; + private int maxRetryOperations = 3; + private int batchSize = 50; - public JobTaskRunner(TaskanaEngine taskanaEngine, TaskService taskService) { - this.taskanaEngine = (TaskanaEngineImpl) taskanaEngine; - this.taskService = (TaskServiceImpl) taskService; + public JobTaskRunner(TaskanaEngine taskanaEngine) { + this.taskanaService = (TaskServiceImpl) taskanaEngine.getTaskService(); } - public BulkOperationResults runCleanCompletedTasks(Instant untilDate) { + public BulkOperationResults runCleanCompletedTasks(Instant untilDate) { return cleanCompletedTasks(untilDate); } - private BulkOperationResults cleanCompletedTasks(Instant untilDate) { + public void registerTransactionProvider( + TaskanaTransactionProvider> txProvider) { + this.txProvider = txProvider; + } + + private BulkOperationResults cleanCompletedTasks(Instant untilDate) { LOGGER.info("entry to RunCompletedTasks({})", untilDate.toString()); - BulkOperationResults bulkLog = new BulkOperationResults<>(); - try { - List tasksIds = new ArrayList<>(); - List tasksCompleted = taskService.createTaskQuery() - .completedWithin(new TimeInterval(null, untilDate)) - .list(); + BulkOperationResults bulkLog = new BulkOperationResults<>(); + int attempt = 0; + + List tasksCompleted = getTasksCompleted(untilDate); + List tasksIds = new ArrayList<>(); + while (tasksCompleted.size() != 0 && attempt < maxRetryOperations) { tasksCompleted.stream().forEach(task -> { tasksIds.add(task.getTaskId()); - LOGGER.info("task id deleted: {}", task.getTaskId()); + LOGGER.info("task id to be deleted: {}", task.getTaskId()); }); - bulkLog = taskService.deleteTasks(tasksIds); - } catch (Exception e) { - taskanaEngine.returnConnection(); - LOGGER.info("exit from RunCompletedTasks(). Returning result {} ", bulkLog); + bulkLog = executeTransactionalDeleting(tasksIds); + attempt = getAttempt(bulkLog, attempt); + tasksCompleted = getTasksCompleted(untilDate); } - LOGGER.info("exit from RunCompletedTasks({})", untilDate.toString()); + + LOGGER.info("exit from RunCompletedTasks({}). Returning result: " + bulkLog, untilDate.toString()); return bulkLog; } + + private List getTasksCompleted(Instant untilDate) { + return taskanaService.createTaskQuery() + .completedWithin(new TimeInterval(null, untilDate)) + .list(0, batchSize); + } + + private BulkOperationResults executeTransactionalDeleting(List tasksIds) { + if (txProvider == null) { + return doDeleteTasks(tasksIds); + } + return txProvider.executeInTransaction(() -> doDeleteTasks(tasksIds)); + } + + private BulkOperationResults doDeleteTasks(List tasksIds) { + if (tasksIds.isEmpty()) { + return new BulkOperationResults<>(); + } + + try { + return taskanaService.deleteTasks(tasksIds).mapBulkOperationResults(); + } catch (InvalidArgumentException e) { + LOGGER.error("could not delete next tasksIds batch: {}, error:" + e.getMessage(), + String.join(",", tasksIds)); + return new BulkOperationResults<>(); + } + } + + private int getAttempt(BulkOperationResults bulkLog, int attempt) { + if (!bulkLog.getErrorMap().isEmpty()) { + return ++attempt; + } + return 0; + } + } diff --git a/lib/taskana-core/src/test/java/acceptance/task/JobTaskRunnerAccTest.java b/lib/taskana-core/src/test/java/acceptance/task/JobTaskRunnerAccTest.java index a90db8bb5..1725b59b6 100644 --- a/lib/taskana-core/src/test/java/acceptance/task/JobTaskRunnerAccTest.java +++ b/lib/taskana-core/src/test/java/acceptance/task/JobTaskRunnerAccTest.java @@ -28,7 +28,6 @@ import pro.taskana.exceptions.InvalidStateException; import pro.taskana.exceptions.NotAuthorizedException; import pro.taskana.exceptions.TaskAlreadyExistException; import pro.taskana.exceptions.TaskNotFoundException; -import pro.taskana.exceptions.TaskanaException; import pro.taskana.exceptions.WorkbasketNotFoundException; import pro.taskana.impl.JobTaskRunner; import pro.taskana.security.JAASRunner; @@ -53,14 +52,14 @@ public class JobTaskRunnerAccTest extends AbstractAccTest { @Test public void shouldCleanCompletedTasksUntilDate() { - JobTaskRunner runner = new JobTaskRunner(taskanaEngine, taskService); + JobTaskRunner runner = new JobTaskRunner(taskanaEngine); Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN) .atZone(ZoneId.systemDefault()) .minusDays(14) .toInstant(); List tasksCompletedUntilDateBefore = getTaskCompletedUntilDate(completeUntilDate); - BulkOperationResults results = runner.runCleanCompletedTasks(completeUntilDate); + BulkOperationResults results = runner.runCleanCompletedTasks(completeUntilDate); List tasksCompletedUntilDateAfter = getTaskCompletedUntilDate(completeUntilDate); assertFalse(results.containsErrors()); @@ -84,7 +83,7 @@ public class JobTaskRunnerAccTest extends AbstractAccTest { taskService.claim(createdTask.getId()); taskService.completeTask(createdTask.getId()); - JobTaskRunner runner = new JobTaskRunner(taskanaEngine, taskService); + JobTaskRunner runner = new JobTaskRunner(taskanaEngine); Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN) .atZone(ZoneId.systemDefault()) .minusDays(14) diff --git a/rest/taskana-rest-spring-example/src/main/java/pro/taskana/rest/JobScheduler.java b/rest/taskana-rest-spring-example/src/main/java/pro/taskana/rest/JobScheduler.java index 39834fca5..7b2b7f36f 100644 --- a/rest/taskana-rest-spring-example/src/main/java/pro/taskana/rest/JobScheduler.java +++ b/rest/taskana-rest-spring-example/src/main/java/pro/taskana/rest/JobScheduler.java @@ -17,7 +17,6 @@ import org.springframework.stereotype.Component; import pro.taskana.BulkOperationResults; import pro.taskana.TaskanaEngine; import pro.taskana.TaskanaTransactionProvider; -import pro.taskana.exceptions.TaskanaException; import pro.taskana.impl.JobRunner; import pro.taskana.impl.JobTaskRunner; import pro.taskana.impl.util.LoggerUtils; @@ -63,14 +62,15 @@ public class JobScheduler { @Scheduled(cron = "0 0 0 * * *") public void triggerTaskCompletedCleanUpJob() { LOGGER.info("triggerTaskCompletedCleanUpJob"); - JobTaskRunner runner = new JobTaskRunner(taskanaEngine, taskanaEngine.getTaskService()); + JobTaskRunner runner = new JobTaskRunner(taskanaEngine); + runner.registerTransactionProvider(springTransactionProvider); Instant completeUntilDate = LocalDateTime.of(LocalDate.now(), LocalTime.MIN) .atZone(ZoneId.systemDefault()) .minusDays(untilDays) .toInstant(); - BulkOperationResults result = runner.runCleanCompletedTasks(completeUntilDate); - Map errors = result.getErrorMap(); + BulkOperationResults result = runner.runCleanCompletedTasks(completeUntilDate); + Map errors = result.getErrorMap(); LOGGER.info("triggerTaskCompletedCleanUpJob Completed Result = {} ", LoggerUtils.mapToString(errors)); }