Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Combined input/output data if additional payload is added when updating task/workflow (#3214)
Browse files Browse the repository at this point in the history
  • Loading branch information
jxu-nflx authored Sep 2, 2022
1 parent 037b8a6 commit 6413a12
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public Map<String, Object> downloadPayload(String path) {
* per {@link ConductorProperties}
*/
public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
if (!shouldUpload(entity, payloadType)) return;

long threshold = 0L;
long maxThreshold = 0L;
Map<String, Object> payload = new HashMap<>();
Expand Down Expand Up @@ -232,4 +234,23 @@ void failWorkflow(WorkflowModel workflow, PayloadType payloadType, String errorM
}
throw new TerminateWorkflowException(errorMsg);
}

@VisibleForTesting
<T> boolean shouldUpload(T entity, PayloadType payloadType) {
    // Upload is only needed when the relevant raw payload actually carries data;
    // an empty map means there is nothing new to push to external storage.
    if (entity instanceof TaskModel) {
        TaskModel task = (TaskModel) entity;
        return payloadType == PayloadType.TASK_INPUT
                ? !task.getRawInputData().isEmpty()
                : !task.getRawOutputData().isEmpty();
    }
    // NOTE(review): assumes any non-TaskModel entity is a WorkflowModel — TODO confirm callers.
    WorkflowModel workflow = (WorkflowModel) entity;
    return payloadType == PayloadType.WORKFLOW_INPUT
            ? !workflow.getRawInput().isEmpty()
            : !workflow.getRawOutput().isEmpty();
}
}
20 changes: 18 additions & 2 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,15 @@ public void setStatus(Status status) {

@JsonIgnore
public Map<String, Object> getInputData() {
    // Guard clauses: when only one of the two maps holds data, return it as-is.
    if (inputPayload.isEmpty()) {
        return inputData;
    }
    if (inputData.isEmpty()) {
        return inputPayload;
    }
    // Both maps hold data: fold the downloaded external payload into the inline
    // input (payload entries win on key collision) and drop the merged copy so
    // the combination happens at most once.
    inputData.putAll(inputPayload);
    inputPayload = new HashMap<>();
    return inputData;
}

@JsonIgnore
Expand Down Expand Up @@ -389,7 +397,15 @@ public void setWorkerId(String workerId) {

@JsonIgnore
public Map<String, Object> getOutputData() {
    // Guard clauses: when only one of the two maps holds data, return it as-is.
    if (outputPayload.isEmpty()) {
        return outputData;
    }
    if (outputData.isEmpty()) {
        return outputPayload;
    }
    // Both maps hold data: fold the downloaded external payload into the inline
    // output (payload entries win on key collision) and drop the merged copy so
    // the combination happens at most once.
    outputData.putAll(outputPayload);
    outputPayload = new HashMap<>();
    return outputData;
}

@JsonIgnore
Expand Down
20 changes: 18 additions & 2 deletions core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,15 @@ public void setTasks(List<TaskModel> tasks) {

@JsonIgnore
public Map<String, Object> getInput() {
    // Guard clauses: when only one of the two maps holds data, return it as-is.
    if (inputPayload.isEmpty()) {
        return input;
    }
    if (input.isEmpty()) {
        return inputPayload;
    }
    // Both maps hold data: fold the downloaded external payload into the inline
    // input (payload entries win on key collision) and drop the merged copy so
    // the combination happens at most once.
    input.putAll(inputPayload);
    inputPayload = new HashMap<>();
    return input;
}

@JsonIgnore
Expand All @@ -191,7 +199,15 @@ public void setInput(Map<String, Object> input) {

@JsonIgnore
public Map<String, Object> getOutput() {
    // Guard clauses: when only one of the two maps holds data, return it as-is.
    if (outputPayload.isEmpty()) {
        return output;
    }
    if (output.isEmpty()) {
        return outputPayload;
    }
    // Both maps hold data: fold the downloaded external payload into the inline
    // output (payload entries win on key collision) and drop the merged copy so
    // the combination happens at most once.
    output.putAll(outputPayload);
    outputPayload = new HashMap<>();
    return output;
}

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,32 @@ public void testFailWorkflowWithOutputPayload() {
assertTrue(workflow.getOutput().isEmpty());
assertEquals(WorkflowModel.Status.FAILED, workflow.getStatus());
}

@Test
public void testShouldUpload() {
    // A non-empty payload: shouldUpload must report true for every payload type.
    Map<String, Object> payload = new HashMap<>();
    payload.put("key1", "value1");
    payload.put("key2", "value2");

    TaskModel task = new TaskModel();
    task.setInputData(payload);
    task.setOutputData(payload);

    WorkflowModel workflow = new WorkflowModel();
    workflow.setInput(payload);
    workflow.setOutput(payload);

    assertTrue(
            externalPayloadStorageUtils.shouldUpload(
                    task, ExternalPayloadStorage.PayloadType.TASK_INPUT));
    assertTrue(
            externalPayloadStorageUtils.shouldUpload(
                    task, ExternalPayloadStorage.PayloadType.TASK_OUTPUT));
    // FIX: the workflow payload types must be checked against the WorkflowModel
    // instance. The original passed `task` for all four assertions, leaving the
    // `workflow` variable unused and the WorkflowModel branch of shouldUpload()
    // entirely untested.
    assertTrue(
            externalPayloadStorageUtils.shouldUpload(
                    workflow, ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT));
    assertTrue(
            externalPayloadStorageUtils.shouldUpload(
                    workflow, ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.netflix.conductor.test.utils.UserTask

import spock.lang.Shared

import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPayload
import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedLargePayloadTask
import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask

Expand Down Expand Up @@ -754,6 +755,103 @@ class ExternalPayloadStorageSpec extends AbstractSpecification {
}
}

// Integration regression test for the combined inline/external output behavior:
// a task's output is first set via an external payload path, updated again with
// no new data (must not re-upload), then completed with additional inline output
// (must re-upload the merged result). The second task stays inline throughout.
def "Test update task output multiple times using external payload storage"() {
    given: "An existing simple workflow definition"
    metadataService.getWorkflowDef(LINEAR_WORKFLOW_T1_T2, 1)

    when: "the workflow is started"
    def workflowInstanceId = workflowExecutor.startWorkflow(LINEAR_WORKFLOW_T1_T2, 1, 'multi_task_update_external_storage', new HashMap<String, Object>(), null, null, null)

    then: "verify that the workflow is in a RUNNING state"
    with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
        status == Workflow.WorkflowStatus.RUNNING
        tasks.size() == 1
        tasks[0].taskType == 'integration_task_1'
        tasks[0].status == Task.Status.SCHEDULED
    }

    when: "poll and update 'integration_task_1' with external payload storage output"
    String taskOutputPath = uploadLargeTaskOutput()
    workflowTestUtil.pollAndUpdateTask('integration_task_1', 'task1.integration.worker', taskOutputPath, null, 1)

    then: "verify that 'integration_task1's output is updated correctly"
    with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
        status == Workflow.WorkflowStatus.RUNNING
        tasks.size() == 1
        tasks[0].taskType == 'integration_task_1'
        // IN_PROGRESS update with callbackAfterSeconds puts the task back to SCHEDULED
        tasks[0].status == Task.Status.SCHEDULED
        // inline output must stay empty when the payload lives in external storage
        tasks[0].outputData.isEmpty()
        tasks[0].externalOutputPayloadStoragePath == taskOutputPath
    }

    when: "poll and update 'integration_task_1' with no additional output"
    workflowTestUtil.pollAndUpdateTask('integration_task_1', 'task1.integration.worker', null, null, 1)

    then: "verify that 'integration_task1's output is updated correctly"
    with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
        status == Workflow.WorkflowStatus.RUNNING
        tasks.size() == 1
        tasks[0].taskType == 'integration_task_1'
        tasks[0].status == Task.Status.SCHEDULED
        tasks[0].outputData.isEmpty()
        // no duplicate upload
        tasks[0].externalOutputPayloadStoragePath == taskOutputPath
    }

    when: "poll and complete 'integration_task_1' with additional output"
    Map<String, Object> output = ['k1': 'v1', 'k2': 'v2']
    workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker', output, 1)

    then: "verify that 'integration_task1 is complete and output is updated correctly"
    with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
        status == Workflow.WorkflowStatus.RUNNING
        tasks.size() == 2
        tasks[0].taskType == 'integration_task_1'
        tasks[0].status == Task.Status.COMPLETED
        tasks[0].outputData.isEmpty()
        // upload again with additional output
        tasks[0].externalOutputPayloadStoragePath != taskOutputPath
        verifyPayload(output, mockExternalPayloadStorage.downloadPayload(tasks[0].externalOutputPayloadStoragePath))

        tasks[1].taskType == 'integration_task_2'
        tasks[1].status == Task.Status.SCHEDULED
    }

    when: "poll and update 'integration_task_2' with output"
    Map<String, Object> output1 = ['k1': 'v1', 'k2': 'v2']
    workflowTestUtil.pollAndUpdateTask('integration_task_2', 'task1.integration.worker', null, output1, 1)

    then: "verify that 'integration_task2's output is updated correctly"
    with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
        status == Workflow.WorkflowStatus.RUNNING
        tasks.size() == 2
        tasks[1].taskType == 'integration_task_2'
        tasks[1].status == Task.Status.SCHEDULED
        // small payload stays inline; no external storage path is assigned
        tasks[1].externalOutputPayloadStoragePath == null
        verifyPayload(output1, tasks[1].outputData)
    }

    when: "poll and complete 'integration_task_2' with additional output"
    Map<String, Object> output2 = ['k3': 'v3', 'k4': 'v4']
    workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task1.integration.worker', output2, 1)

    then: "verify that 'integration_task2 is complete and output is updated correctly"
    with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
        status == Workflow.WorkflowStatus.COMPLETED
        tasks.size() == 2

        tasks[0].taskType == 'integration_task_1'
        tasks[0].status == Task.Status.COMPLETED
        tasks[0].outputData.isEmpty()

        tasks[1].taskType == 'integration_task_2'
        tasks[1].status == Task.Status.COMPLETED
        tasks[1].externalOutputPayloadStoragePath == null
        // the final inline output must be the union of both updates
        output1.putAll(output2)
        verifyPayload(output1, tasks[1].outputData)
    }
}

private String uploadLargeTaskOutput() {
String taskOutputPath = "${UUID.randomUUID()}.json"
mockExternalPayloadStorage.upload(taskOutputPath, mockExternalPayloadStorage.readOutputDotJson(), 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,23 @@ class WorkflowTestUtil {
return new Tuple(polledIntegrationTask)
}

/**
 * Polls a task and sends an IN_PROGRESS update (callback after 1s), either
 * pointing its output at an external payload storage path or merging the given
 * inline output params. Exactly one of the two output arguments is honored,
 * with the external path taking precedence.
 */
Tuple pollAndUpdateTask(String taskName, String workerId, String outputPayloadPath, Map<String, Object> outputParams = null, int waitAtEndSeconds = 0) {
    def polledTask = workflowExecutionService.poll(taskName, workerId)
    def result = new TaskResult(polledTask)
    result.status = TaskResult.Status.IN_PROGRESS
    result.callbackAfterSeconds = 1
    if (outputPayloadPath) {
        // external storage wins: clear inline output and record the path
        result.outputData = null
        result.externalOutputPayloadStoragePath = outputPayloadPath
    } else if (outputParams) {
        outputParams.each { k, v -> result.outputData[k] = v }
    }
    workflowExecutionService.updateTask(result)
    return waitAtEndSecondsAndReturn(waitAtEndSeconds, polledTask)
}

/**
* A helper method intended to be used in the <tt>then:</tt> block of the spock test feature, ideally intended to be called after either:
* pollAndCompleteTask function or pollAndFailTask function
Expand All @@ -322,4 +339,12 @@ class WorkflowTestUtil {
def polledIntegrationTask = completedTaskAndAck[0] as Task
assert polledIntegrationTask
}

/**
 * Asserts that every entry of {@code expected} is present in {@code payload}
 * with an equal value. Extra keys in {@code payload} are ignored.
 */
static void verifyPayload(Map<String, Object> expected, Map<String, Object> payload) {
    for (entry in expected.entrySet()) {
        assert payload.containsKey(entry.key)
        assert payload[entry.key] == entry.value
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,17 @@ public Map<String, Object> curateDynamicForkLargePayload() {
}
return dynamicForkLargePayload;
}

/**
 * Downloads and deserializes the payload stored at the given path.
 *
 * @param path location of the payload in the mock external storage
 * @return the payload as a map, or an empty map when the path yields no stream
 *     or the content cannot be parsed
 */
public Map<String, Object> downloadPayload(String path) {
    // FIX: use try-with-resources — the original never closed the stream
    // returned by download(path), leaking it on every call.
    try (InputStream inputStream = download(path)) {
        if (inputStream != null) {
            return objectMapper.readValue(inputStream, Map.class);
        }
    } catch (IOException e) {
        LOGGER.error("Error in downloading payload for path {}", path, e);
    }
    return new HashMap<>();
}
}

0 comments on commit 6413a12

Please sign in to comment.