From 42c4889d3634e8d15a99bbba772db2ce4d47472c Mon Sep 17 00:00:00 2001
From: Salomon Popp
Date: Thu, 1 Feb 2024 16:09:23 +0100
Subject: [PATCH] Refactor pipeline filter and add to public API (#405)

---
 docs/docs/user/references/cli-commands.md     |  12 +-
 kpops/cli/main.py                             | 220 ++++++++----------
 kpops/cli/options.py                          |   6 +
 .../base_components/pipeline_component.py     |   2 +
 kpops/pipeline.py                             | 182 +++++++++------
 tests/cli/test_pipeline_steps.py              |  87 -------
 tests/pipeline/test_generate.py               |  77 ++++--
 tests/pipeline/test_pipeline.py               |  75 ++++++
 8 files changed, 359 insertions(+), 302 deletions(-)
 create mode 100644 kpops/cli/options.py
 delete mode 100644 tests/cli/test_pipeline_steps.py
 create mode 100644 tests/pipeline/test_pipeline.py

diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md
index de8a7febe..bfb9108c4 100644
--- a/docs/docs/user/references/cli-commands.md
+++ b/docs/docs/user/references/cli-commands.md
@@ -43,7 +43,7 @@ $ kpops clean [OPTIONS] PIPELINE_PATH
 * `--defaults DIRECTORY`: Path to defaults folder [env var: KPOPS_DEFAULT_PATH]
 * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
 * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
-* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: include]
+* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
 * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
 * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
 * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
@@ -70,7 +70,7 @@ $ kpops deploy [OPTIONS] PIPELINE_PATH
 * `--defaults DIRECTORY`: Path to defaults folder [env var: KPOPS_DEFAULT_PATH]
 * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
 * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
-* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: include]
+* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
 * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
 * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
 * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
@@ -97,7 +97,7 @@ $ kpops destroy [OPTIONS] PIPELINE_PATH
 * `--defaults DIRECTORY`: Path to defaults folder [env var: KPOPS_DEFAULT_PATH]
 * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
 * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
-* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: include]
+* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
 * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
 * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
 * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
@@ -124,6 +124,8 @@ $ kpops generate [OPTIONS] PIPELINE_PATH
 * `--defaults DIRECTORY`: Path to defaults folder [env var: KPOPS_DEFAULT_PATH]
 * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
 * `--output / --no-output`: Enable output printing [default: output]
+* `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
+* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
 * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
 * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
 * `--help`: Show this message and exit.
@@ -149,7 +151,7 @@ $ kpops manifest [OPTIONS] PIPELINE_PATH
 * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
 * `--output / --no-output`: Enable output printing [default: output]
 * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
-* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: include]
+* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
 * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
 * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
 * `--help`: Show this message and exit.
@@ -174,7 +176,7 @@ $ kpops reset [OPTIONS] PIPELINE_PATH
 * `--defaults DIRECTORY`: Path to defaults folder [env var: KPOPS_DEFAULT_PATH]
 * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
 * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
-* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: include]
+* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
 * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
 * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
 * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
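For orientation, the `--steps`/`--filter-type` pair documented above combines like this on the command line — a hypothetical invocation, with the pipeline path and step names standing in for a real project:

```console
$ kpops deploy ./pipeline.yaml --steps my-producer,my-filter --filter-type include
$ KPOPS_PIPELINE_STEPS=my-filter kpops deploy ./pipeline.yaml --filter-type exclude
```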
diff --git a/kpops/cli/main.py b/kpops/cli/main.py
index 3a3936226..bb00b0b3c 100644
--- a/kpops/cli/main.py
+++ b/kpops/cli/main.py
@@ -2,7 +2,6 @@
 
 import asyncio
 import logging
-from enum import Enum
 from pathlib import Path
 from typing import TYPE_CHECKING, Optional
 
@@ -11,6 +10,7 @@
 
 from kpops import __version__
 from kpops.cli.custom_formatter import CustomFormatter
+from kpops.cli.options import FilterType
 from kpops.cli.registry import Registry
 from kpops.component_handlers import ComponentHandlers
 from kpops.component_handlers.kafka_connect.kafka_connect_handler import (
@@ -21,7 +21,7 @@
 from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper
 from kpops.components.base_components.models.resource import Resource
 from kpops.config import ENV_PREFIX, KpopsConfig
-from kpops.pipeline import Pipeline, PipelineGenerator
+from kpops.pipeline import ComponentFilterPredicate, Pipeline, PipelineGenerator
 from kpops.utils.gen_schema import (
     SchemaScope,
     gen_config_schema,
@@ -32,8 +32,6 @@
 from kpops.utils.yaml import print_yaml
 
 if TYPE_CHECKING:
-    from collections.abc import Awaitable, Callable, Coroutine, Iterator
-
     from kpops.components.base_components import PipelineComponent
 
 
@@ -102,13 +100,8 @@
 )
 
 
-class FilterType(str, Enum):
-    INCLUDE = "include"
-    EXCLUDE = "exclude"
-
-
 FILTER_TYPE: FilterType = typer.Option(
-    default=FilterType.INCLUDE.value,
+    default=FilterType.INCLUDE,
     case_sensitive=False,
     help="Whether the --steps option should include/exclude the steps",
 )
@@ -166,64 +159,21 @@ def parse_steps(steps: str) -> set[str]:
     return set(steps.split(","))
 
 
-def get_step_names(steps_to_apply: list[PipelineComponent]) -> list[str]:
-    return [step.name for step in steps_to_apply]
+def is_in_steps(component: PipelineComponent, component_names: set[str]) -> bool:
+    return component.name in component_names
 
 
-def filter_steps_to_apply(
-    pipeline: Pipeline, steps: set[str], filter_type: FilterType
-) -> list[PipelineComponent]:
-    def is_in_steps(component: PipelineComponent) -> bool:
-        return component.name in steps
-
-    log.debug(
-        f"KPOPS_PIPELINE_STEPS is defined with values: {steps} and filter type of {filter_type.value}"
-    )
-    filtered_steps = [
-        component
-        for component in pipeline
-        if (
-            is_in_steps(component)
-            if filter_type is FilterType.INCLUDE
-            else not is_in_steps(component)
-        )
-    ]
-    log.info(f"The following steps are included:\n{get_step_names(filtered_steps)}")
-    return filtered_steps
-
-
-def get_reverse_concurrently_tasks_to_execute(
-    pipeline: Pipeline,
-    steps: str | None,
-    filter_type: FilterType,
-    runner: Callable[[PipelineComponent], Coroutine],
-) -> Awaitable:
-    steps_to_apply = reverse_pipeline_steps(pipeline, steps, filter_type)
-    return pipeline.build_execution_graph_from(list(steps_to_apply), True, runner)
-
-
-def get_concurrently_tasks_to_execute(
-    pipeline: Pipeline,
-    steps: str | None,
-    filter_type: FilterType,
-    runner: Callable[[PipelineComponent], Coroutine],
-) -> Awaitable:
-    steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
-    return pipeline.build_execution_graph_from(steps_to_apply, False, runner)
-
-
-def get_steps_to_apply(
-    pipeline: Pipeline, steps: str | None, filter_type: FilterType
-) -> list[PipelineComponent]:
-    if steps:
-        return filter_steps_to_apply(pipeline, parse_steps(steps), filter_type)
-    return list(pipeline)
+def create_default_step_names_filter_predicate(
+    component_names: set[str], filter_type: FilterType
+) -> ComponentFilterPredicate:
+    def predicate(component: PipelineComponent) -> bool:
+        match filter_type, is_in_steps(component, component_names):
+            case (FilterType.INCLUDE, False) | (FilterType.EXCLUDE, True):
+                return False
+            case _:
+                return True
 
-
-def reverse_pipeline_steps(
-    pipeline: Pipeline, steps: str | None, filter_type: FilterType
-) -> Iterator[PipelineComponent]:
-    return reversed(get_steps_to_apply(pipeline, steps, filter_type))
+    return predicate
 
 
 def log_action(action: str, pipeline_component: PipelineComponent):
@@ -302,6 +252,8 @@ def generate(
     defaults: Optional[Path] = DEFAULT_PATH_OPTION,
     config: Path = CONFIG_PATH_OPTION,
     output: bool = OUTPUT_OPTION,
+    steps: Optional[str] = PIPELINE_STEPS,
+    filter_type: FilterType = FILTER_TYPE,
     environment: Optional[str] = ENVIRONMENT,
     verbose: bool = VERBOSE_OPTION,
 ) -> Pipeline:
@@ -314,6 +266,22 @@ def generate(
     )
 
     pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
+
+    if steps:
+        component_names = parse_steps(steps)
+        log.debug(
+            f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}"
+        )
+
+        predicate = create_default_step_names_filter_predicate(
+            component_names, filter_type
+        )
+        pipeline.filter(predicate)
+
+        def get_step_names(steps_to_apply: list[PipelineComponent]) -> list[str]:
+            return [step.name for step in steps_to_apply]
+
+        log.info(f"Filtered pipeline:\n{get_step_names(pipeline.components)}")
     if output:
         print_yaml(pipeline.to_yaml())
     return pipeline
@@ -340,12 +308,13 @@ def manifest(
         defaults=defaults,
         config=config,
         output=False,
+        steps=steps,
+        filter_type=filter_type,
         environment=environment,
         verbose=verbose,
     )
-    steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
     resources: list[Resource] = []
-    for component in steps_to_apply:
+    for component in pipeline.components:
         resource = component.manifest()
         resources.append(resource)
         if output:
@@ -367,28 +336,28 @@ def deploy(
     verbose: bool = VERBOSE_OPTION,
     parallel: bool = PARALLEL,
 ):
+    pipeline = generate(
+        pipeline_path=pipeline_path,
+        dotenv=dotenv,
+        defaults=defaults,
+        config=config,
+        output=False,
+        steps=steps,
+        filter_type=filter_type,
+        environment=environment,
+        verbose=verbose,
+    )
+
     async def deploy_runner(component: PipelineComponent):
         log_action("Deploy", component)
         await component.deploy(dry_run)
 
     async def async_deploy():
-        kpops_config = create_kpops_config(
-            config,
-            defaults,
-            dotenv,
-            environment,
-            verbose,
-        )
-        pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
-
         if parallel:
-            pipeline_tasks = get_concurrently_tasks_to_execute(
-                pipeline, steps, filter_type, deploy_runner
-            )
+            pipeline_tasks = pipeline.build_execution_graph(deploy_runner)
             await pipeline_tasks
         else:
-            steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
-            for component in steps_to_apply:
+            for component in pipeline.components:
                 await deploy_runner(component)
 
     asyncio.run(async_deploy())
@@ -407,29 +376,30 @@ def destroy(
     verbose: bool = VERBOSE_OPTION,
     parallel: bool = PARALLEL,
 ):
+    pipeline = generate(
+        pipeline_path=pipeline_path,
+        dotenv=dotenv,
+        defaults=defaults,
+        config=config,
+        output=False,
+        steps=steps,
+        filter_type=filter_type,
+        environment=environment,
+        verbose=verbose,
+    )
+
     async def destroy_runner(component: PipelineComponent):
         log_action("Destroy", component)
         await component.destroy(dry_run)
 
     async def async_destroy():
-        kpops_config = create_kpops_config(
-            config,
-            defaults,
-            dotenv,
-            environment,
-            verbose,
-        )
-
-        pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
-
         if parallel:
-            pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
-                pipeline, steps, filter_type, destroy_runner
+            pipeline_tasks = pipeline.build_execution_graph(
+                destroy_runner, reverse=True
             )
             await pipeline_tasks
         else:
-            pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
-            for component in pipeline_steps:
+            for component in reversed(pipeline.components):
                 await destroy_runner(component)
 
     asyncio.run(async_destroy())
@@ -448,28 +418,29 @@ def reset(
     verbose: bool = VERBOSE_OPTION,
     parallel: bool = PARALLEL,
 ):
+    pipeline = generate(
+        pipeline_path=pipeline_path,
+        dotenv=dotenv,
+        defaults=defaults,
+        config=config,
+        output=False,
+        steps=steps,
+        filter_type=filter_type,
+        environment=environment,
+        verbose=verbose,
+    )
+
     async def reset_runner(component: PipelineComponent):
-        log_action("Reset", component)
         await component.destroy(dry_run)
+        log_action("Reset", component)
         await component.reset(dry_run)
 
     async def async_reset():
-        kpops_config = create_kpops_config(
-            config,
-            defaults,
-            dotenv,
-            environment,
-            verbose,
-        )
-        pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
         if parallel:
-            pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
-                pipeline, steps, filter_type, reset_runner
-            )
+            pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True)
             await pipeline_tasks
         else:
-            pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
-            for component in pipeline_steps:
+            for component in reversed(pipeline.components):
                 await reset_runner(component)
 
     asyncio.run(async_reset())
@@ -488,28 +459,29 @@ def clean(
     verbose: bool = VERBOSE_OPTION,
     parallel: bool = PARALLEL,
 ):
+    pipeline = generate(
+        pipeline_path=pipeline_path,
+        dotenv=dotenv,
+        defaults=defaults,
+        config=config,
+        output=False,
+        steps=steps,
+        filter_type=filter_type,
+        environment=environment,
+        verbose=verbose,
+    )
+
     async def clean_runner(component: PipelineComponent):
-        log_action("Clean", component)
         await component.destroy(dry_run)
+        log_action("Clean", component)
         await component.clean(dry_run)
 
     async def async_clean():
-        kpops_config = create_kpops_config(
-            config,
-            defaults,
-            dotenv,
-            environment,
-            verbose,
-        )
-        pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
         if parallel:
-            pipeline_steps = get_reverse_concurrently_tasks_to_execute(
-                pipeline, steps, filter_type, clean_runner
-            )
-            await pipeline_steps
+            pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True)
+            await pipeline_tasks
        else:
-            pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
-            for component in pipeline_steps:
+            for component in reversed(pipeline.components):
                 await clean_runner(component)
 
     asyncio.run(async_clean())
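The `match`-based predicate above folds the old include/exclude branching into a single truth table over `(filter_type, is_in_steps(...))`: keep on `(INCLUDE, hit)` and `(EXCLUDE, miss)`, drop otherwise. A self-contained sketch of the same logic, using minimal stand-ins rather than the real kpops classes:

```python
from dataclasses import dataclass
from enum import Enum


class FilterType(str, Enum):
    INCLUDE = "include"
    EXCLUDE = "exclude"


@dataclass
class Component:  # minimal stand-in for PipelineComponent
    name: str


def make_predicate(names: set[str], filter_type: FilterType):
    def predicate(component: Component) -> bool:
        # drop on (INCLUDE, miss) and (EXCLUDE, hit); keep everything else
        match filter_type, component.name in names:
            case (FilterType.INCLUDE, False) | (FilterType.EXCLUDE, True):
                return False
            case _:
                return True

    return predicate


keep = make_predicate({"converter"}, FilterType.INCLUDE)
assert keep(Component("converter")) is True
assert keep(Component("filter")) is False
```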
diff --git a/kpops/cli/options.py b/kpops/cli/options.py
new file mode 100644
index 000000000..ac176d986
--- /dev/null
+++ b/kpops/cli/options.py
@@ -0,0 +1,6 @@
+from enum import Enum
+
+
+class FilterType(str, Enum):
+    INCLUDE = "include"
+    EXCLUDE = "exclude"
diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py
index 44ef61394..9b1697bcf 100644
--- a/kpops/components/base_components/pipeline_component.py
+++ b/kpops/components/base_components/pipeline_component.py
@@ -86,6 +86,8 @@ def extra_output_topics(self) -> dict[str, str]:
 
     @property
     def id(self) -> str:
+        # TODO: remove "component-" prefix, prefix topics instead
+        # ideally return just self.name
         return f"component-{self.full_name}"
 
     @property
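The `component-` prefix that the TODO wants to drop is what currently keeps component nodes and topic nodes apart: `Pipeline.graph` (next file) stores components under `component.id` and topics under their bare names in one shared `networkx.DiGraph`. A toy illustration of why that namespacing matters, with invented node names:

```python
import networkx as nx

graph = nx.DiGraph()
# a component named "orders" and a topic named "orders" must stay distinct nodes
graph.add_node("component-orders")  # PipelineComponent.id
graph.add_node("orders")            # output topic
graph.add_edge("component-orders", "orders")

assert len(graph.nodes) == 2  # without the prefix both would collapse into one node
```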
diff --git a/kpops/pipeline.py b/kpops/pipeline.py
index 06de8d222..c6fe013a4 100644
--- a/kpops/pipeline.py
+++ b/kpops/pipeline.py
@@ -3,13 +3,13 @@
 import asyncio
 import json
 import logging
-from collections import Counter
+from collections.abc import Callable, Iterable
 from dataclasses import dataclass, field
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, TypeAlias
 
 import networkx as nx
 import yaml
-from pydantic import BaseModel, Field, SerializeAsAny
+from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, computed_field
 
 from kpops.components.base_components.pipeline_component import PipelineComponent
 from kpops.utils.dict_ops import generate_substitution, update_nested_pair
@@ -18,7 +18,7 @@
 from kpops.utils.yaml import load_yaml_file, substitute_nested
 
 if TYPE_CHECKING:
-    from collections.abc import Awaitable, Callable, Coroutine, Iterator
+    from collections.abc import Awaitable, Coroutine, Iterator
     from pathlib import Path
 
     from kpops.cli.registry import Registry
@@ -36,64 +36,74 @@ class ValidationError(Exception):
     pass
 
 
+ComponentFilterPredicate: TypeAlias = Callable[[PipelineComponent], bool]
+
+
 class Pipeline(BaseModel):
     """Pipeline representation."""
 
-    components: list[SerializeAsAny[PipelineComponent]] = Field(
-        default=[], title="Components"
-    )
     graph: nx.DiGraph = Field(default_factory=nx.DiGraph, exclude=True)
-    _component_index: dict[str, PipelineComponent | None] = {}
+    _component_index: dict[str, PipelineComponent] = {}
 
-    class Config:
-        arbitrary_types_allowed = True
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+
+    @computed_field(title="Components")
+    @property
+    def components(self) -> list[SerializeAsAny[PipelineComponent]]:
+        return list(self._component_index.values())
 
     @property
     def last(self) -> PipelineComponent:
         return self.components[-1]
 
-    def find(self, component_name: str) -> PipelineComponent:
-        for component in self.components:
-            if component_name == component.name:
-                return component
-        msg = f"Component {component_name} not found"
-        raise ValueError(msg)
-
-    def __add_to_graph(self, component: PipelineComponent):
+    def add(self, component: PipelineComponent) -> None:
+        if self._component_index.get(component.id) is not None:
+            msg = (
+                f"Pipeline steps must have unique id, '{component.id}' already exists."
+            )
+            raise ValidationError(msg)
         self._component_index[component.id] = component
-        self.graph.add_node(component.id)
+        self.__add_to_graph(component)
 
-        for input_topic in component.inputs:
-            self.__add_input(input_topic, component.id)
+    def remove(self, component_id: str) -> None:
+        self._component_index.pop(component_id)
 
-        for output_topic in component.outputs:
-            self.__add_output(output_topic, component.id)
+    def get(self, component_id: str) -> PipelineComponent | None:
+        return self._component_index.get(component_id)
 
-    def add(self, component: PipelineComponent) -> None:
-        self.components.append(component)
-        self.__add_to_graph(component)
+    def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]:
+        """Find pipeline components matching a custom predicate.
 
-    def __bool__(self) -> bool:
-        return bool(self.components)
+        :param predicate: Predicate function,
+            returns whether the component matches
+        :returns: Iterator of components matching the predicate
+        """
+        for component in self.components:
+            if predicate(component):
+                yield component
 
-    def __iter__(self) -> Iterator[PipelineComponent]:
-        return iter(self.components)
+    def filter(self, predicate: ComponentFilterPredicate) -> None:
+        """Filter pipeline components in place using a custom predicate.
 
-    def __len__(self) -> int:
-        return len(self.components)
+        :param predicate: Predicate function,
+            returns whether the component should be kept
+        """
+        for component in self.components:
+            # remove components not matching the predicate
+            if not predicate(component):
+                self.remove(component.id)
 
     def to_yaml(self) -> str:
         return yaml.dump(
             self.model_dump(mode="json", by_alias=True, exclude_none=True)["components"]
         )
 
-    def build_execution_graph_from(
-        self,
-        components: list[PipelineComponent],
-        reverse: bool,
-        runner: Callable[[PipelineComponent], Coroutine],
+    def build_execution_graph(
+        self, runner: Callable[[PipelineComponent], Coroutine], /, reverse: bool = False
     ) -> Awaitable:
-        sub_graph_nodes = self.__get_graph_nodes(components)
+        sub_graph_nodes = self.__collect_graph_nodes(
+            reversed(self.components) if reverse else self.components
+        )
 
         async def run_parallel_tasks(coroutines: list[Coroutine]) -> None:
             tasks = []
@@ -134,8 +144,33 @@ async def run_graph_tasks(pending_tasks: list[Awaitable]):
 
         return run_graph_tasks(sorted_tasks)
 
+    def __getitem__(self, component_id: str) -> PipelineComponent:
+        try:
+            return self._component_index[component_id]
+        except KeyError as exc:
+            msg = f"Component {component_id} not found"
+            raise ValueError(msg) from exc
+
+    def __bool__(self) -> bool:
+        return bool(self._component_index)
+
+    def __iter__(self) -> Iterator[PipelineComponent]:
+        yield from self._component_index.values()
+
+    def __len__(self) -> int:
+        return len(self.components)
+
+    def __add_to_graph(self, component: PipelineComponent):
+        self.graph.add_node(component.id)
+
+        for input_topic in component.inputs:
+            self.__add_input(input_topic, component.id)
+
+        for output_topic in component.outputs:
+            self.__add_output(output_topic, component.id)
+
     @staticmethod
-    def __get_graph_nodes(components: list[PipelineComponent]) -> Iterator[str]:
+    def __collect_graph_nodes(components: Iterable[PipelineComponent]) -> Iterator[str]:
         for component in components:
             yield component.id
             yield from component.inputs
@@ -144,14 +179,13 @@ def __get_parallel_tasks_from(
         self, layer: list[str], runner: Callable[[PipelineComponent], Coroutine]
     ) -> list[Coroutine]:
-        parallel_tasks = []
-
-        for node_in_layer in layer:
-            component = self._component_index[node_in_layer]
-            if component is not None:
-                parallel_tasks.append(runner(component))
+        def gen_parallel_tasks():
+            for node_in_layer in layer:
+                # check if the node is a component, skip topics
+                if (component := self._component_index.get(node_in_layer)) is not None:
+                    yield runner(component)
 
-        return parallel_tasks
+        return list(gen_parallel_tasks())
 
     def __validate_graph(self) -> None:
         if not nx.is_directed_acyclic_graph(self.graph):
@@ -159,25 +193,15 @@ def __validate_graph(self) -> None:
             raise ValueError(msg)
 
     def validate(self) -> None:
-        self.validate_unique_names()
         self.__validate_graph()
 
-    def __add_output(self, output_topic: str, source: str) -> None:
-        self._component_index[output_topic] = None
-        self.graph.add_node(output_topic)
-        self.graph.add_edge(source, output_topic)
-
-    def __add_input(self, input_topic: str, target: str) -> None:
-        self._component_index[input_topic] = None
-        self.graph.add_node(input_topic)
-        self.graph.add_edge(input_topic, target)
-
-    def validate_unique_names(self) -> None:
-        step_names = [component.full_name for component in self.components]
-        duplicates = [name for name, count in Counter(step_names).items() if count > 1]
-        if duplicates:
-            msg = f"step names should be unique. duplicate step names: {', '.join(duplicates)}"
-            raise ValidationError(msg)
+    def __add_output(self, topic_id: str, source: str) -> None:
+        self.graph.add_node(topic_id)
+        self.graph.add_edge(source, topic_id)
+
+    def __add_input(self, topic_id: str, target: str) -> None:
+        self.graph.add_node(topic_id)
+        self.graph.add_edge(topic_id, target)
 
 
 def create_env_components_index(
@@ -290,6 +314,28 @@ def apply_component(
         :param component_class: Type of pipeline component
        :param component_data: Arguments for instantiation of pipeline component
         """
+
+        def is_name(name: str) -> ComponentFilterPredicate:
+            def predicate(component: PipelineComponent) -> bool:
+                return component.name == name
+
+            return predicate
+
+        # NOTE: temporary until we can just get components by id
+        # performance improvement
+        def find(component_name: str) -> PipelineComponent:
+            """Find component in pipeline by name.
+
+            :param component_name: Name of component to get
+            :returns: Component matching the name
+            :raises ValueError: Component not found
+            """
+            try:
+                return next(self.pipeline.find(is_name(component_name)))
+            except StopIteration as exc:
+                msg = f"Component {component_name} not found"
+                raise ValueError(msg) from exc
+
         component = component_class(
             config=self.config,
             handlers=self.handlers,
@@ -306,13 +352,11 @@ def apply_component(
                 original_from_component_name,
                 from_topic,
             ) in enriched_component.from_.components.items():
-                original_from_component = self.pipeline.find(
-                    original_from_component_name
-                )
+                original_from_component = find(original_from_component_name)
+
                 inflated_from_component = original_from_component.inflate()[-1]
-                resolved_from_component = self.pipeline.find(
-                    inflated_from_component.name
-                )
+                resolved_from_component = find(inflated_from_component.name)
+
                 enriched_component.weave_from_topics(
                     resolved_from_component.to, from_topic
                 )
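`build_execution_graph` now derives its node set from the (possibly filtered) pipeline itself and takes the runner positionally plus a `reverse` flag, instead of a pre-filtered component list. Conceptually it schedules one topological layer at a time so independent components run concurrently. A rough standalone sketch of that layering idea — a toy graph and runner, not the actual kpops scheduler, assuming `nx.topological_generations` (available in recent NetworkX releases):

```python
import asyncio

import networkx as nx

graph = nx.DiGraph()
graph.add_edge("component-producer", "topic-a")  # producer writes topic-a
graph.add_edge("topic-a", "component-filter")    # filter reads topic-a


async def run(node: str) -> None:
    print("running", node)


async def main() -> None:
    # execute layers in topological order; nodes within a layer run concurrently
    for layer in nx.topological_generations(graph):
        components = [node for node in layer if node.startswith("component-")]
        await asyncio.gather(*(run(node) for node in components))


asyncio.run(main())
```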
diff --git a/tests/cli/test_pipeline_steps.py b/tests/cli/test_pipeline_steps.py
deleted file mode 100644
index fe0cfe68e..000000000
--- a/tests/cli/test_pipeline_steps.py
+++ /dev/null
@@ -1,87 +0,0 @@
-from unittest.mock import MagicMock
-
-import pytest
-from polyfactory.factories.pydantic_factory import ModelFactory
-from pytest_mock import MockerFixture
-
-from kpops.cli.main import FilterType, get_steps_to_apply
-from kpops.component_handlers import (
-    ComponentHandlers,
-)
-from kpops.components import PipelineComponent
-from kpops.components.base_components.models.from_section import FromSection
-from kpops.components.base_components.models.to_section import ToSection
-from kpops.pipeline import Pipeline
-
-PREFIX = "example-prefix-"
-
-
-class TestComponentFactory(ModelFactory[PipelineComponent]):
-    to = ToSection()
-    from_ = FromSection()
-    enrich = False
-    validate = False
-    handlers = ComponentHandlers(None, MagicMock(), MagicMock())
-
-
-run_validation = False
-test_component_1 = TestComponentFactory.build(run_validation)
-test_component_2 = TestComponentFactory.build(run_validation)
-test_component_3 = TestComponentFactory.build(run_validation)
-
-test_component_1.name = "example1"
-test_component_2.name = "example2"
-test_component_3.name = "example3"
-
-
-@pytest.fixture(autouse=True)
-def pipeline() -> Pipeline:
-    pipeline = Pipeline()
-    pipeline.add(test_component_1)
-    pipeline.add(test_component_2)
-    pipeline.add(test_component_3)
-    return pipeline
-
-
-@pytest.fixture(autouse=True)
-def log_info(mocker: MockerFixture) -> MagicMock:
-    return mocker.patch("kpops.cli.main.log.info")
-
-
-def tests_filter_steps_to_apply(log_info: MagicMock, pipeline: Pipeline):
-    filtered_steps = get_steps_to_apply(
-        pipeline, "example2,example3", FilterType.INCLUDE
-    )
-
-    assert len(filtered_steps) == 2
-    assert test_component_2 in filtered_steps
-    assert test_component_3 in filtered_steps
-
-    assert log_info.call_count == 1
-    log_info.assert_any_call(
-        "The following steps are included:\n['example2', 'example3']"
-    )
-
-    filtered_steps = get_steps_to_apply(pipeline, None, FilterType.INCLUDE)
-    assert len(filtered_steps) == 3
-
-    filtered_steps = get_steps_to_apply(pipeline, "", FilterType.INCLUDE)
-    assert len(filtered_steps) == 3
-
-
-def tests_filter_steps_to_exclude(log_info: MagicMock, pipeline: Pipeline):
-    filtered_steps = get_steps_to_apply(
-        pipeline, "example2,example3", FilterType.EXCLUDE
-    )
-
-    assert len(filtered_steps) == 1
-    assert test_component_1 in filtered_steps
-
-    assert log_info.call_count == 1
-    log_info.assert_any_call("The following steps are included:\n['example1']")
-
-    filtered_steps = get_steps_to_apply(pipeline, None, FilterType.EXCLUDE)
-    assert len(filtered_steps) == 3
-
-    filtered_steps = get_steps_to_apply(pipeline, "", FilterType.EXCLUDE)
-    assert len(filtered_steps) == 3
/ "first-pipeline" / "pipeline.yaml", + defaults=RESOURCE_PATH, + output=False, + steps="converter,scheduled-producer", + filter_type=FilterType.EXCLUDE, + ) + assert len(pipeline) == 1 + assert pipeline.components[0].type == "filter" + assert log_info.call_count == 1 + log_info.assert_any_call( + "Filtered pipeline:\n['a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name']" + ) def test_load_pipeline(self, snapshot: SnapshotTest): result = runner.invoke( @@ -602,7 +640,13 @@ def test_short_topic_definition(self): assert input_components["component-extra-pattern"]["role"] == "role" def test_kubernetes_app_name_validation(self): - with pytest.raises((ValueError, ParsingException)): + with pytest.raises( + ParsingException, + match="Error enriching filter component illegal_name", + ), pytest.raises( + ValueError, + match="The component name illegal_name is invalid for Kubernetes.", + ): runner.invoke( app, [ @@ -619,8 +663,11 @@ def test_kubernetes_app_name_validation(self): def test_validate_unique_step_names(self): with pytest.raises( + ParsingException, + match="Error enriching pipeline-component component component", + ), pytest.raises( ValidationError, - match="step names should be unique. duplicate step names: resources-pipeline-duplicate-step-names-component", + match="Pipeline steps must have unique id, 'component-resources-pipeline-duplicate-step-names-component' already exists.", ): runner.invoke( app, @@ -698,9 +745,7 @@ async def name_runner(component: PipelineComponent): await asyncio.sleep(sleep_table_components[component.name]) await called_component(component.name) - execution_graph = pipeline.build_execution_graph_from( - list(pipeline.components), False, name_runner - ) + execution_graph = pipeline.build_execution_graph(name_runner) await execution_graph @@ -724,18 +769,18 @@ async def test_subgraph_execution(self): config=RESOURCE_PATH / "parallel-pipeline", ) - list_of_components = list(pipeline.components) - called_component = AsyncMock() async def name_runner(component: PipelineComponent): await called_component(component.name) - execution_graph = pipeline.build_execution_graph_from( - [list_of_components[0], list_of_components[3], list_of_components[6]], - False, - name_runner, - ) + pipeline.remove(pipeline.components[8].id) + pipeline.remove(pipeline.components[7].id) + pipeline.remove(pipeline.components[5].id) + pipeline.remove(pipeline.components[4].id) + pipeline.remove(pipeline.components[2].id) + pipeline.remove(pipeline.components[1].id) + execution_graph = pipeline.build_execution_graph(name_runner) await execution_graph @@ -771,9 +816,7 @@ async def name_runner(component: PipelineComponent): await asyncio.sleep(sleep_table_components[component.name]) await called_component(component.name) - execution_graph = pipeline.build_execution_graph_from( - list(pipeline.components), True, name_runner - ) + execution_graph = pipeline.build_execution_graph(name_runner, reverse=True) await execution_graph diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py new file mode 100644 index 000000000..d0c53667b --- /dev/null +++ b/tests/pipeline/test_pipeline.py @@ -0,0 +1,75 @@ +from unittest.mock import MagicMock + +import pytest +from polyfactory.factories.pydantic_factory import ModelFactory + +from kpops.cli.main import create_default_step_names_filter_predicate +from kpops.cli.options import FilterType +from kpops.component_handlers import ( + 
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
new file mode 100644
index 000000000..d0c53667b
--- /dev/null
+++ b/tests/pipeline/test_pipeline.py
@@ -0,0 +1,75 @@
+from unittest.mock import MagicMock
+
+import pytest
+from polyfactory.factories.pydantic_factory import ModelFactory
+
+from kpops.cli.main import create_default_step_names_filter_predicate
+from kpops.cli.options import FilterType
+from kpops.component_handlers import (
+    ComponentHandlers,
+)
+from kpops.components import PipelineComponent
+from kpops.components.base_components.models.from_section import FromSection
+from kpops.components.base_components.models.to_section import ToSection
+from kpops.pipeline import Pipeline
+
+PREFIX = "example-prefix-"
+
+
+class TestComponentFactory(ModelFactory[PipelineComponent]):
+    to = ToSection()
+    from_ = FromSection()
+    enrich = False
+    validate = False
+    handlers = ComponentHandlers(None, MagicMock(), MagicMock())
+
+
+run_validation = False
+test_component_1 = TestComponentFactory.build(run_validation)
+test_component_2 = TestComponentFactory.build(run_validation)
+test_component_3 = TestComponentFactory.build(run_validation)
+
+test_component_1.name = "example1"
+test_component_2.name = "example2"
+test_component_3.name = "example3"
+
+
+class TestPipeline:
+    @pytest.fixture(autouse=True)
+    def pipeline(self) -> Pipeline:
+        pipeline = Pipeline()
+        pipeline.add(test_component_1)
+        pipeline.add(test_component_2)
+        pipeline.add(test_component_3)
+        return pipeline
+
+    def test_filter_include(self, pipeline: Pipeline):
+        predicate = create_default_step_names_filter_predicate(
+            {"example2", "example3"}, FilterType.INCLUDE
+        )
+        pipeline.filter(predicate)
+        assert len(pipeline.components) == 2
+        assert test_component_2 in pipeline.components
+        assert test_component_3 in pipeline.components
+
+    def test_filter_include_empty(self, pipeline: Pipeline):
+        predicate = create_default_step_names_filter_predicate(
+            set(), FilterType.INCLUDE
+        )
+        pipeline.filter(predicate)
+        assert len(pipeline.components) == 0
+
+    def test_filter_exclude(self, pipeline: Pipeline):
+        predicate = create_default_step_names_filter_predicate(
+            {"example2", "example3"}, FilterType.EXCLUDE
+        )
+        pipeline.filter(predicate)
+        assert len(pipeline.components) == 1
+        assert test_component_1 in pipeline.components
+
+    def test_filter_exclude_empty(self, pipeline: Pipeline):
+        predicate = create_default_step_names_filter_predicate(
+            set(), FilterType.EXCLUDE
+        )
+        pipeline.filter(predicate)
+        assert len(pipeline.components) == 3
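Taken together, the filter is now reachable from the Python API as well as the CLI. A hedged end-to-end sketch, based on the calls exercised in the tests above (the path and step names are placeholders for a real project):

```python
from pathlib import Path

import kpops
from kpops.cli.options import FilterType

# generate applies the steps filter itself when `steps` is passed
pipeline = kpops.generate(
    Path("pipeline.yaml"),  # placeholder path
    output=False,
    steps="my-step",
    filter_type=FilterType.INCLUDE,
)

# arbitrary predicates can be applied afterwards through the public API
pipeline.filter(lambda component: not component.name.endswith("-producer"))
```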