Skip to content

Commit

Permalink
Merge pull request #2709 from DaoDaoNoCode/jira-rhoaieng-5349
Browse files Browse the repository at this point in the history
Show all the nodes in pipeline details topology
  • Loading branch information
openshift-merge-bot[bot] authored Apr 12, 2024
2 parents d39f001 + 5d77285 commit a0e5d7f
Showing 1 changed file with 89 additions and 98 deletions.
187 changes: 89 additions & 98 deletions frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PipelineRunKFv2, PipelineSpecVariable } from '~/concepts/pipelines/kfTypes';
import { PipelineRunKFv2, PipelineSpecVariable, TaskKF } from '~/concepts/pipelines/kfTypes';
import { createNode } from '~/concepts/topology';
import { PipelineNodeModelExpanded } from '~/concepts/topology/types';
import { createArtifactNode } from '~/concepts/topology/utils';
Expand All @@ -13,103 +13,104 @@ import {
} from './parseUtils';
import { KubeFlowTaskTopology } from './pipelineTaskTypes';

const EMPTY_STATE: KubeFlowTaskTopology = { taskMap: {}, nodes: [] };

export const usePipelineTaskTopology = (
spec?: PipelineSpecVariable,
run?: PipelineRunKFv2,
): KubeFlowTaskTopology => {
if (!spec) {
return EMPTY_STATE;
return { taskMap: {}, nodes: [] };
}
const pipelineSpec = spec.pipeline_spec ?? spec;

const {
components,
deploymentSpec: { executors },
root: {
dag: { tasks },
dag: { tasks: rootTasks },
},
} = pipelineSpec;
const { run_details: runDetails } = run || {};

const componentArtifactMap = parseComponentsForArtifactRelationship(components);
const taskArtifactMap = parseTasksForArtifactRelationship(tasks);

return Object.entries(tasks).reduce<KubeFlowTaskTopology>((acc, [taskId, taskValue]) => {
const taskName = taskValue.taskInfo.name;

const componentRef = taskValue.componentRef.name;
const component = components[componentRef];
const artifactsInComponent = componentArtifactMap[componentRef];
const isGroupNode = !!component?.dag;

const executorLabel = component?.executorLabel;
const executor = executorLabel ? executors[executorLabel] : undefined;

const status = parseRuntimeInfo(taskId, runDetails);

const newTaskMapEntries: KubeFlowTaskTopology['taskMap'] = {};
const nodes: PipelineNodeModelExpanded[] = [];
const runAfter: string[] = taskValue.dependentTasks ?? [];

if (artifactsInComponent) {
const artifactNodeData = taskArtifactMap[taskId];

Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => {
const label = artifactKey;
const { artifactId } =
artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {};

// if no node needs it as an input, we don't really need a well known id
const id = artifactId ?? artifactKey;

nodes.push(
createArtifactNode({
id,
label,
artifactType: data.schemaTitle,
runAfter: [taskId],
status: translateStatusForNode(status?.state),
}),
const nodes: PipelineNodeModelExpanded[] = [];
const taskMap: KubeFlowTaskTopology['taskMap'] = {};

const createNodes = (tasks: Record<string, TaskKF>, parentTask?: string) => {
const taskArtifactMap = parseTasksForArtifactRelationship(tasks);
Object.entries(tasks).forEach(([taskId, taskValue]) => {
const taskName = taskValue.taskInfo.name;

const componentRef = taskValue.componentRef.name;
const component = components[componentRef];
const artifactsInComponent = componentArtifactMap[componentRef];
const isGroupNode = !!component?.dag;

const executorLabel = component?.executorLabel;
const executor = executorLabel ? executors[executorLabel] : undefined;

const status = parseRuntimeInfo(taskId, runDetails);

const runAfter: string[] = taskValue.dependentTasks ?? [];

if (artifactsInComponent) {
const artifactNodeData = taskArtifactMap[taskId];

Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => {
const label = artifactKey;
const { artifactId } =
artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {};

// if no node needs it as an input, we don't really need a well known id
const id = artifactId ?? artifactKey;

nodes.push(
createArtifactNode({
id,
label,
artifactType: data.schemaTitle,
runAfter: [taskId],
status: translateStatusForNode(status?.state),
}),
);

taskMap[id] = {
type: 'artifact',
name: label,
inputs: {
artifacts: [{ label: id, type: composeArtifactType(data) }],
},
};
});
}

// This task
taskMap[taskId] = {
type: isGroupNode ? 'groupTask' : 'task',
name: taskName,
steps: executor ? [executor.container] : undefined,
inputs: parseInputOutput(component?.inputDefinitions),
outputs: parseInputOutput(component?.outputDefinitions),
status,
volumeMounts: parseVolumeMounts(spec.platform_spec, executorLabel),
};
if (taskValue.dependentTasks) {
// This task's runAfters may need artifact relationships -- find those artifactIds
runAfter.push(
...taskValue.dependentTasks
.map((dependantTaskId) => {
const art = taskArtifactMap[dependantTaskId];
return art ? art.map((v) => v.artifactId) : null;
})
.filter((v): v is string[] => !!v)
.flat(),
);
} else if (parentTask) {
// Create an edge from the grouped task to its parent task
// Prevent the node floating on the topology
// This logic could be removed once we have the stacked node to better deal with groups
runAfter.push(parentTask);
}

newTaskMapEntries[id] = {
type: 'artifact',
name: label,
inputs: {
artifacts: [{ label: id, type: composeArtifactType(data) }],
},
};
});
}

// This task
newTaskMapEntries[taskId] = {
type: isGroupNode ? 'groupTask' : 'task',
name: taskName,
steps: executor ? [executor.container] : undefined,
inputs: parseInputOutput(component?.inputDefinitions),
outputs: parseInputOutput(component?.outputDefinitions),
status,
volumeMounts: parseVolumeMounts(spec.platform_spec, executorLabel),
};
if (taskValue.dependentTasks) {
// This task's runAfters may need artifact relationships -- find those artifactIds
runAfter.push(
...taskValue.dependentTasks
.map((dependantTaskId) => {
const art = taskArtifactMap[dependantTaskId];
return art ? art.map((v) => v.artifactId) : null;
})
.filter((v): v is string[] => !!v)
.flat(),
);
}

// This task's rendering information
if (isGroupNode) {
// TODO: handle group nodes
nodes.push(
createNode({
id: taskId,
Expand All @@ -118,23 +119,13 @@ export const usePipelineTaskTopology = (
status: translateStatusForNode(status?.state),
}),
);
} else {
nodes.push(
createNode({
id: taskId,
label: taskName,
runAfter,
status: translateStatusForNode(status?.state),
}),
);
}

return {
taskMap: {
...acc.taskMap,
...newTaskMapEntries,
},
nodes: [...acc.nodes, ...nodes],
};
}, EMPTY_STATE);
// This task's rendering information
if (isGroupNode) {
// TODO: better handle group nodes
createNodes(component.dag.tasks, taskId);
}
});
};
createNodes(rootTasks);
return { nodes, taskMap };
};

0 comments on commit a0e5d7f

Please sign in to comment.