Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Performance improvements in Decider (#3070)
Browse files Browse the repository at this point in the history
* decider performance improvements

* ignore tests - temp

* Update WorkflowExecutor.java

* formatting changes

* fix merge conflicts

* apply formatting

* updates based on the PR comments

* change the visibility of the method to package private

* Update WorkflowExecutor.java

* remove legacy code

* enable tests
  • Loading branch information
v1r3n authored Aug 14, 2022
1 parent af4be6a commit 2d51469
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 242 deletions.
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ dependencies {
testImplementation "org.codehaus.groovy:groovy-all:${revGroovy}"
testImplementation "org.spockframework:spock-core:${revSpock}"
testImplementation "org.spockframework:spock-spring:${revSpock}"
testImplementation "org.junit.vintage:junit-vintage-engine"
}
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
* @param workflowModel the workflowModel for which the payload data needs to be populated from
* external storage (if applicable)
*/
private void populateWorkflowAndTaskPayloadData(WorkflowModel workflowModel) {
public void populateWorkflowAndTaskPayloadData(WorkflowModel workflowModel) {
if (StringUtils.isNotBlank(workflowModel.getExternalInputPayloadStoragePath())) {
Map<String, Object> workflowInputParams =
externalPayloadStorageUtils.downloadPayload(
Expand All @@ -662,7 +662,7 @@ private void populateWorkflowAndTaskPayloadData(WorkflowModel workflowModel) {
workflowModel.getTasks().forEach(this::populateTaskData);
}

private void populateTaskData(TaskModel taskModel) {
public void populateTaskData(TaskModel taskModel) {
if (StringUtils.isNotBlank(taskModel.getExternalOutputPayloadStoragePath())) {
Map<String, Object> outputData =
externalPayloadStorageUtils.downloadPayload(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.time.Duration;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -68,18 +67,6 @@ public class DeciderService {

private final Map<TaskType, TaskMapper> taskMappers;

private final Predicate<TaskModel> isNonPendingTask =
task -> !task.isRetried() && !task.getStatus().equals(SKIPPED) && !task.isExecuted();

private final Predicate<WorkflowModel> containsSuccessfulTerminateTask =
workflow ->
workflow.getTasks().stream()
.anyMatch(
task ->
TERMINATE.name().equals(task.getTaskType())
&& task.getStatus().isTerminal()
&& task.getStatus().isSuccessful());

public DeciderService(
IDGenerator idGenerator,
ParametersUtils parametersUtils,
Expand Down Expand Up @@ -142,19 +129,31 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
return outcome;
}

// Filter the list of tasks and include only tasks that are not retried, not executed
// marked to be skipped and not part of System tasks that is DECISION, FORK, JOIN
// This list will be empty for a new workflow being started
List<TaskModel> pendingTasks =
workflow.getTasks().stream().filter(isNonPendingTask).collect(Collectors.toList());
List<TaskModel> pendingTasks = new ArrayList<>();
Set<String> executedTaskRefNames = new HashSet<>();
boolean hasSuccessfulTerminateTask = false;
for (TaskModel task : workflow.getTasks()) {

// Get all the tasks that have not completed their lifecycle yet
// This list will be empty for a new workflow
Set<String> executedTaskRefNames =
workflow.getTasks().stream()
.filter(TaskModel::isExecuted)
.map(TaskModel::getReferenceTaskName)
.collect(Collectors.toSet());
// Filter the list of tasks and include only tasks that are not retried, not executed,
// not marked to be skipped, and not system tasks such as DECISION, FORK, JOIN.
// This list will be empty for a new workflow being started
if (!task.isRetried() && !task.getStatus().equals(SKIPPED) && !task.isExecuted()) {
pendingTasks.add(task);
}

// Record the reference names of all the tasks that have already been executed
// This set will be empty for a new workflow
if (task.isExecuted()) {
executedTaskRefNames.add(task.getReferenceTaskName());
}

if (TERMINATE.name().equals(task.getTaskType())
&& task.getStatus().isTerminal()
&& task.getStatus().isSuccessful()) {
hasSuccessfulTerminateTask = true;
outcome.terminateTask = task;
}
}

Map<String, TaskModel> tasksToBeScheduled = new LinkedHashMap<>();

Expand Down Expand Up @@ -251,7 +250,7 @@ private DeciderOutcome decide(final WorkflowModel workflow, List<TaskModel> preS
workflow.getWorkflowId());
outcome.tasksToBeScheduled.addAll(unScheduledTasks);
}
if (containsSuccessfulTerminateTask.test(workflow)
if (hasSuccessfulTerminateTask
|| (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow))) {
LOGGER.debug("Marking workflow: {} as complete.", workflow);
outcome.isComplete = true;
Expand Down Expand Up @@ -404,45 +403,56 @@ void updateWorkflowOutput(final WorkflowModel workflow, TaskModel task) {

public boolean checkForWorkflowCompletion(final WorkflowModel workflow)
throws TerminateWorkflowException {
List<TaskModel> allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
return false;
}

if (containsSuccessfulTerminateTask.test(workflow)) {
return true;
Map<String, TaskModel.Status> taskStatusMap = new HashMap<>();
List<TaskModel> nonExecutedTasks = new ArrayList<>();
for (TaskModel task : workflow.getTasks()) {
taskStatusMap.put(task.getReferenceTaskName(), task.getStatus());
if (!task.getStatus().isTerminal()) {
return false;
}

// If there is a TERMINATE task that has been executed successfully then the workflow
// should be marked as completed.
if (TERMINATE.name().equals(task.getTaskType())
&& task.getStatus().isTerminal()
&& task.getStatus().isSuccessful()) {
return true;
}
if (!task.isRetried() || !task.isExecuted()) {
nonExecutedTasks.add(task);
}
}

Map<String, TaskModel.Status> taskStatusMap = new HashMap<>();
workflow.getTasks()
.forEach(task -> taskStatusMap.put(task.getReferenceTaskName(), task.getStatus()));
// If there are no tasks executed, then we are not done yet
if (taskStatusMap.isEmpty()) {
return false;
}

List<WorkflowTask> workflowTasks = workflow.getWorkflowDefinition().getTasks();
boolean allCompletedSuccessfully =
workflowTasks.stream()
.parallel()
.allMatch(
wftask -> {
TaskModel.Status status =
taskStatusMap.get(wftask.getTaskReferenceName());
return status != null
&& status.isSuccessful()
&& status.isTerminal();
});

boolean noPendingTasks =
taskStatusMap.values().stream().allMatch(TaskModel.Status::isTerminal);
for (WorkflowTask wftask : workflowTasks) {
TaskModel.Status status = taskStatusMap.get(wftask.getTaskReferenceName());
if (status == null || !status.isTerminal()) {
return false;
}
// if we reach here, the task has been completed.
// Was the task successful in completion?
if (!status.isSuccessful()) {
return false;
}
}

boolean noPendingSchedule =
workflow.getTasks().stream()
nonExecutedTasks.stream()
.parallel()
.noneMatch(
wftask -> {
String next = getNextTasksToBeScheduled(workflow, wftask);
return next != null && !taskStatusMap.containsKey(next);
});

return allCompletedSuccessfully && noPendingTasks && noPendingSchedule;
return noPendingSchedule;
}

List<TaskModel> getNextTask(WorkflowModel workflow, TaskModel task) {
Expand Down Expand Up @@ -884,6 +894,7 @@ public static class DeciderOutcome {
List<TaskModel> tasksToBeScheduled = new LinkedList<>();
List<TaskModel> tasksToBeUpdated = new LinkedList<>();
boolean isComplete;
TaskModel terminateTask;

private DeciderOutcome() {}
}
Expand Down
Loading

0 comments on commit 2d51469

Please sign in to comment.