diff --git a/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts b/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts index 0536c80cb1..11b8506480 100644 --- a/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts +++ b/frontend/src/concepts/pipelines/topology/usePipelineTaskTopology.ts @@ -1,4 +1,4 @@ -import { PipelineRunKFv2, PipelineSpecVariable } from '~/concepts/pipelines/kfTypes'; +import { PipelineRunKFv2, PipelineSpecVariable, TaskKF } from '~/concepts/pipelines/kfTypes'; import { createNode } from '~/concepts/topology'; import { PipelineNodeModelExpanded } from '~/concepts/topology/types'; import { createArtifactNode } from '~/concepts/topology/utils'; @@ -13,14 +13,12 @@ import { } from './parseUtils'; import { KubeFlowTaskTopology } from './pipelineTaskTypes'; -const EMPTY_STATE: KubeFlowTaskTopology = { taskMap: {}, nodes: [] }; - export const usePipelineTaskTopology = ( spec?: PipelineSpecVariable, run?: PipelineRunKFv2, ): KubeFlowTaskTopology => { if (!spec) { - return EMPTY_STATE; + return { taskMap: {}, nodes: [] }; } const pipelineSpec = spec.pipeline_spec ?? spec; @@ -28,88 +26,91 @@ export const usePipelineTaskTopology = ( components, deploymentSpec: { executors }, root: { - dag: { tasks }, + dag: { tasks: rootTasks }, }, } = pipelineSpec; const { run_details: runDetails } = run || {}; const componentArtifactMap = parseComponentsForArtifactRelationship(components); - const taskArtifactMap = parseTasksForArtifactRelationship(tasks); - - return Object.entries(tasks).reduce((acc, [taskId, taskValue]) => { - const taskName = taskValue.taskInfo.name; - - const componentRef = taskValue.componentRef.name; - const component = components[componentRef]; - const artifactsInComponent = componentArtifactMap[componentRef]; - const isGroupNode = !!component?.dag; - - const executorLabel = component?.executorLabel; - const executor = executorLabel ? executors[executorLabel] : undefined; - - const status = parseRuntimeInfo(taskId, runDetails); - - const newTaskMapEntries: KubeFlowTaskTopology['taskMap'] = {}; - const nodes: PipelineNodeModelExpanded[] = []; - const runAfter: string[] = taskValue.dependentTasks ?? []; - - if (artifactsInComponent) { - const artifactNodeData = taskArtifactMap[taskId]; - - Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => { - const label = artifactKey; - const { artifactId } = - artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {}; - - // if no node needs it as an input, we don't really need a well known id - const id = artifactId ?? artifactKey; - - nodes.push( - createArtifactNode({ - id, - label, - artifactType: data.schemaTitle, - runAfter: [taskId], - status: translateStatusForNode(status?.state), - }), + const nodes: PipelineNodeModelExpanded[] = []; + const taskMap: KubeFlowTaskTopology['taskMap'] = {}; + + const createNodes = (tasks: Record, parentTask?: string) => { + const taskArtifactMap = parseTasksForArtifactRelationship(tasks); + Object.entries(tasks).forEach(([taskId, taskValue]) => { + const taskName = taskValue.taskInfo.name; + + const componentRef = taskValue.componentRef.name; + const component = components[componentRef]; + const artifactsInComponent = componentArtifactMap[componentRef]; + const isGroupNode = !!component?.dag; + + const executorLabel = component?.executorLabel; + const executor = executorLabel ? executors[executorLabel] : undefined; + + const status = parseRuntimeInfo(taskId, runDetails); + + const runAfter: string[] = taskValue.dependentTasks ?? []; + + if (artifactsInComponent) { + const artifactNodeData = taskArtifactMap[taskId]; + + Object.entries(artifactsInComponent).forEach(([artifactKey, data]) => { + const label = artifactKey; + const { artifactId } = + artifactNodeData?.find((a) => artifactKey === a.outputArtifactKey) ?? {}; + + // if no node needs it as an input, we don't really need a well known id + const id = artifactId ?? artifactKey; + + nodes.push( + createArtifactNode({ + id, + label, + artifactType: data.schemaTitle, + runAfter: [taskId], + status: translateStatusForNode(status?.state), + }), + ); + + taskMap[id] = { + type: 'artifact', + name: label, + inputs: { + artifacts: [{ label: id, type: composeArtifactType(data) }], + }, + }; + }); + } + + // This task + taskMap[taskId] = { + type: isGroupNode ? 'groupTask' : 'task', + name: taskName, + steps: executor ? [executor.container] : undefined, + inputs: parseInputOutput(component?.inputDefinitions), + outputs: parseInputOutput(component?.outputDefinitions), + status, + volumeMounts: parseVolumeMounts(spec.platform_spec, executorLabel), + }; + if (taskValue.dependentTasks) { + // This task's runAfters may need artifact relationships -- find those artifactIds + runAfter.push( + ...taskValue.dependentTasks + .map((dependantTaskId) => { + const art = taskArtifactMap[dependantTaskId]; + return art ? art.map((v) => v.artifactId) : null; + }) + .filter((v): v is string[] => !!v) + .flat(), ); + } else if (parentTask) { + // Create an edge from the grouped task to its parent task + // Prevent the node floating on the topology + // This logic could be removed once we have the stacked node to better deal with groups + runAfter.push(parentTask); + } - newTaskMapEntries[id] = { - type: 'artifact', - name: label, - inputs: { - artifacts: [{ label: id, type: composeArtifactType(data) }], - }, - }; - }); - } - - // This task - newTaskMapEntries[taskId] = { - type: isGroupNode ? 'groupTask' : 'task', - name: taskName, - steps: executor ? [executor.container] : undefined, - inputs: parseInputOutput(component?.inputDefinitions), - outputs: parseInputOutput(component?.outputDefinitions), - status, - volumeMounts: parseVolumeMounts(spec.platform_spec, executorLabel), - }; - if (taskValue.dependentTasks) { - // This task's runAfters may need artifact relationships -- find those artifactIds - runAfter.push( - ...taskValue.dependentTasks - .map((dependantTaskId) => { - const art = taskArtifactMap[dependantTaskId]; - return art ? art.map((v) => v.artifactId) : null; - }) - .filter((v): v is string[] => !!v) - .flat(), - ); - } - - // This task's rendering information - if (isGroupNode) { - // TODO: handle group nodes nodes.push( createNode({ id: taskId, @@ -118,23 +119,13 @@ export const usePipelineTaskTopology = ( status: translateStatusForNode(status?.state), }), ); - } else { - nodes.push( - createNode({ - id: taskId, - label: taskName, - runAfter, - status: translateStatusForNode(status?.state), - }), - ); - } - - return { - taskMap: { - ...acc.taskMap, - ...newTaskMapEntries, - }, - nodes: [...acc.nodes, ...nodes], - }; - }, EMPTY_STATE); + // This task's rendering information + if (isGroupNode) { + // TODO: better handle group nodes + createNodes(component.dag.tasks, taskId); + } + }); + }; + createNodes(rootTasks); + return { nodes, taskMap }; };