[BUG] DataCatalog problem with ThreadRunner on kedro >=0.19.7 #4191

Open
andresrq-mckinsey opened this issue Sep 24, 2024 · 5 comments
Labels
Issue: Bug Report 🐞 Bug that needs to be fixed

Comments

@andresrq-mckinsey

andresrq-mckinsey commented Sep 24, 2024

Description

Starting with kedro >= 0.19.7, a change was introduced to the DataCatalog in PR #3990, adding __repr__ to _FrozenDatasets. This causes a "dictionary changed size during iteration" error when using the ThreadRunner in scenarios that handle large volumes of data.

Context

There are three components that interact to produce this error:

  • The pluggy/_tracing.py module has built-in hook tracing that passes the DataCatalog to logger.debug
    • This always executes, even if the logger level is not DEBUG, because the message is formatted before the logging call (see the sketch after this list)
    • It runs for every node (and therefore in every thread created by the ThreadRunner), since it fires as the after_node_run [hook] log from pluggy
    • Output example:
       2024-09-20 16:52:20,439 [DEBUG] - /databricks/python/lib/python3.10/site-packages/pluggy/_tracing.py @ kedro.framework.hooks.manager:34 23365 139967147775552 -       after_node_run [hook]
                 node: some_func([input_dataset]) -> [output_dataset]
                 catalog: {'dataset_name': "<dataset_class>", ...}
      
  • MemoryDatasets not previously declared in catalog.yml are added to the DataCatalog's self.datasets property once a node finishes, to keep track of them
  • The ThreadRunner runs every node in its own thread
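
As a side note on the first component: the trace message is formatted eagerly before logging filters anything, which is why the repr work (and the race) happens even when DEBUG output is disabled. Below is a simplified, hypothetical sketch of that path, modelled on pluggy's _format_message shown in the traceback further down; all names here are illustrative, not pluggy's actual API.

# Hypothetical sketch: the f-string below calls str()/repr() on every hook
# argument (including the DataCatalog) before logger.debug decides whether
# the record is emitted, so the formatting work happens at any log level.
import logging

logger = logging.getLogger("kedro.framework.hooks.manager")
logger.setLevel(logging.INFO)  # DEBUG records are filtered out...

def writer(message: str) -> None:
    logger.debug(message)

def trace_hook(hook_name: str, kwargs: dict) -> None:
    lines = [f"{hook_name} [hook]\n"]
    for name, value in kwargs.items():
        # ...but str(value) still falls back to DataCatalog.__repr__ here
        lines.append(f"    {name}: {value}\n")
    writer("".join(lines))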

The flow is as follows:

  1. When using a ThreadRunner, every node runs in its own thread, so the before_node_run, on_node_error and after_node_run hooks also run in parallel.
  2. After a node finishes, the debug logging that _tracing.py performs on after_node_run tries to format the entire catalog object (see DataCatalog.__repr__), which in turn triggers _FrozenDatasets.__repr__.
  3. Finally, any output dataset not previously declared in the catalog.yml is added to the DataCatalog as a MemoryDataset during the dataset saving process (runner.py:530 in _run_node_sequential: catalog.save(name, data) -> data_catalog.py:579 in save: dataset = self._get_dataset(name) -> data_catalog.py:453 in _get_dataset: self.add(dataset_name, dataset)).

This was not a problem before the implementation of __repr__ in #3990, because the entire dictionary was printed in one go (step 2 was effectively atomic). Now the representation is built with formatting by iterating over the keys of self._original_names; if one node reaches step 2 at the same time another node's MemoryDataset is being saved and added to the DataCatalog (step 3), the keys of self._original_names change during iteration, which triggers the error.
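
To see the failure mode in isolation, here is a minimal sketch of the race that is independent of kedro (all names are illustrative): one thread formats a large dict the way __repr__ iterates self._original_names, while another thread keeps adding entries, mimicking catalog.add() registering MemoryDatasets as nodes finish.

# Minimal, kedro-independent sketch of the race (illustrative names only)
import threading

catalog_entries = {f"dataset_{i}": "<SomeDataset>" for i in range(100_000)}

def format_catalog() -> str:
    # Iterates over the live dict, like __repr__ iterating self._original_names
    return "".join(f"{name}: {value}\n" for name, value in catalog_entries.items())

def keep_adding() -> None:
    # Mimics catalog.add() registering MemoryDatasets while other nodes finish
    for i in range(100_000):
        catalog_entries[f"memory_dataset_{i}"] = "<MemoryDataset>"

adder = threading.Thread(target=keep_adding)
adder.start()
try:
    format_catalog()
except RuntimeError as exc:
    print(exc)  # typically: dictionary changed size during iteration
adder.join()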

How to fix this

There are two ways I know of to fix this problem: a kedro fix or a user fix.

  1. The kedro fix is to take a shallow copy of the dictionary inside _FrozenDatasets.__repr__(self), so that if new items get added, the thread currently iterating over self._original_names does not see the change (a stopgap that applies the same change via a monkey-patch is sketched below, after the user fix).
  class _FrozenDatasets:
    ...
    def __repr__(self) -> str:
        datasets_repr = {}
        datasets = copy.copy(self._original_names)  # <-- the change (requires `import copy` at module level)
        for ds_name in datasets.keys():
            datasets_repr[ds_name] = self.__dict__[
                _sub_nonword_chars(ds_name)
            ].__repr__()

        return pprint.pformat(datasets_repr, sort_dicts=False)
  2. For the user fix, in case this change is not added to the kedro repository for whatever reason, or it takes longer than expected, users can take advantage of the dataset factories from the OmegaConfigLoader to preemptively declare all missing in-between-nodes datasets, thus avoiding the step-3 problem generated by threading.

In the catalog.yml

"{dataset_name}":
   type: MemoryDataset
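
Until a fix is released, the option-1 change can also be applied from the project side by monkey-patching the private class, e.g. at the top of settings.py. This is only a stopgap sketch: it relies on the private names _FrozenDatasets and _sub_nonword_chars visible in the traceback below, so it may break between kedro versions.

# Stopgap sketch: applies the option-1 change by monkey-patching kedro's
# private _FrozenDatasets.__repr__ (relies on kedro internals, may break
# between versions).
import copy
import pprint

from kedro.io.data_catalog import _FrozenDatasets, _sub_nonword_chars

def _safe_repr(self) -> str:
    # Snapshot the mapping so concurrent catalog.add() calls cannot change
    # its size while we iterate.
    datasets = copy.copy(self._original_names)
    datasets_repr = {}
    for ds_name in datasets:
        datasets_repr[ds_name] = self.__dict__[_sub_nonword_chars(ds_name)].__repr__()
    return pprint.pformat(datasets_repr, sort_dicts=False)

_FrozenDatasets.__repr__ = _safe_repr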

Steps to Reproduce

I've tried to reproduce this problem on a kedro starter project, but those projects do not operate on datasets big enough to show the behavior. The needed ingredients are dataset outputs not declared in the catalog.yml while using the ThreadRunner, and a big DataCatalog that takes time to print.

Expected Result

The pipeline should work as it did in previous kedro versions.

Actual Result

The pipeline fails with the following error:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/Project_package/databricks_run.py:159, in main(var_1, var_2, var_3, var_4, var_5, var_6)
    153 with KedroSession.create(
    154     env=env,
    155     conf_source=conf_source,
    156     extra_params=extra_params,
    157 ) as session:
--> 158     session.run(pipeline_name=pipeline, runner="ThreadRunner")

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/kedro/framework/session/session.py:398, in KedroSession.run(self, pipeline_name, tags, runner, node_names, from_nodes, to_nodes, from_inputs, to_outputs, load_versions, namespace)
    393 hook_manager.hook.before_pipeline_run(
    394     run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
    395 )
    397 try:
--> 398     run_result = runner.run(
    399         filtered_pipeline, catalog, hook_manager, session_id
    400     )
    401     self._run_called = True
    402 except Exception as error:

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/kedro/runner/runner.py:121, in AbstractRunner.run(self, pipeline, catalog, hook_manager, session_id)
    117 if self._is_async:
    118     self._logger.info(
    119         "Asynchronous mode is enabled for loading and saving data"
    120     )
--> 121 self._run(pipeline, catalog, hook_or_null_manager, session_id)  # type: ignore[arg-type]
    123 self._logger.info("Pipeline execution completed successfully.")
    125 return {ds_name: catalog.load(ds_name) for ds_name in free_outputs}

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/kedro/runner/thread_runner.py:136, in ThreadRunner._run(self, pipeline, catalog, hook_manager, session_id)
    134 for future in done:
    135     try:
--> 136         node = future.result()
    137     except Exception:
    138         self._suggest_resume_scenario(pipeline, done_nodes, catalog)

File /usr/lib/python3.10/concurrent/futures/_base.py:451, in Future.result(self, timeout)
    449     raise CancelledError()
    450 elif self._state == FINISHED:
--> 451     return self.__get_result()
    453 self._condition.wait(timeout)
    455 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /usr/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File /usr/lib/python3.10/concurrent/futures/thread.py:58, in _WorkItem.run(self)
     55     return
     57 try:
---> 58     result = self.fn(*self.args, **self.kwargs)
     59 except BaseException as exc:
     60     self.future.set_exception(exc)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/kedro/runner/runner.py:417, in run_node(node, catalog, hook_manager, is_async, session_id)
    415     node = _run_node_async(node, catalog, hook_manager, session_id)
    416 else:
--> 417     node = _run_node_sequential(node, catalog, hook_manager, session_id)
    419 for name in node.confirms:
    420     catalog.confirm(name)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/kedro/runner/runner.py:510, in _run_node_sequential(node, catalog, hook_manager, session_id)
    505 additional_inputs = _collect_inputs_from_hook(
    506     node, catalog, inputs, is_async, hook_manager, session_id=session_id
    507 )
    508 inputs.update(additional_inputs)
--> 510 outputs = _call_node_run(
    511     node, catalog, inputs, is_async, hook_manager, session_id=session_id
    512 )
    514 items: Iterable = outputs.items()
    515 # if all outputs are iterators, then the node is a generator node

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/kedro/runner/runner.py:477, in _call_node_run(node, catalog, inputs, is_async, hook_manager, session_id)
    468     hook_manager.hook.on_node_error(
    469         error=exc,
    470         node=node,
   (...)
    474         session_id=session_id,
    475     )
    476     raise exc
--> 477 hook_manager.hook.after_node_run(
    478     node=node,
    479     catalog=catalog,
    480     inputs=inputs,
    481     outputs=outputs,
    482     is_async=is_async,
    483     session_id=session_id,
    484 )
    485 return outputs

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/pluggy/_hooks.py:513, in HookCaller.__call__(self, **kwargs)
    511 firstresult = self.spec.opts.get("firstresult", False) if self.spec else False
    512 # Copy because plugins may register other plugins during iteration (#438).
--> 513 return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/pluggy/_manager.py:120, in PluginManager._hookexec(self, hook_name, methods, kwargs, firstresult)
    111 def _hookexec(
    112     self,
    113     hook_name: str,
   (...)
    118     # called from all hookcaller instances.
    119     # enable_tracing will set its own wrapping function at self._inner_hookexec
--> 120     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/pluggy/_manager.py:475, in PluginManager.add_hookcall_monitoring.<locals>.traced_hookexec(hook_name, hook_impls, caller_kwargs, firstresult)
    469 def traced_hookexec(
    470     hook_name: str,
    471     hook_impls: Sequence[HookImpl],
    472     caller_kwargs: Mapping[str, object],
    473     firstresult: bool,
    474 ) -> object | list[object]:
--> 475     before(hook_name, hook_impls, caller_kwargs)
    476     outcome = Result.from_call(
    477         lambda: oldcall(hook_name, hook_impls, caller_kwargs, firstresult)
    478     )
    479     after(outcome, hook_name, hook_impls, caller_kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/pluggy/_manager.py:500, in PluginManager.enable_tracing.<locals>.before(hook_name, methods, kwargs)
    496 def before(
    497     hook_name: str, methods: Sequence[HookImpl], kwargs: Mapping[str, object]
    498 ) -> None:
    499     hooktrace.root.indent += 1
--> 500     hooktrace(hook_name, kwargs)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/pluggy/_tracing.py:70, in TagTracerSub.__call__(self, *args)
     69 def __call__(self, *args: object) -> None:
---> 70     self.root._processmessage(self.tags, args)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/pluggy/_tracing.py:45, in TagTracer._processmessage(self, tags, args)
     43 def _processmessage(self, tags: tuple[str, ...], args: tuple[object, ...]) -> None:
     44     if self._writer is not None and args:
---> 45         self._writer(self._format_message(tags, args))
     46     try:
     47         processor = self._tags2proc[tags]

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/pluggy/_tracing.py:39, in TagTracer._format_message(self, tags, args)
     36 lines = ["{}{} [{}]\n".format(indent, content, ":".join(tags))]
     38 for name, value in extra.items():
---> 39     lines.append(f"{indent}    {name}: {value}\n")
     41 return "".join(lines)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/kedro/io/data_catalog.py:223, in DataCatalog.__repr__(self)
    222 def __repr__(self) -> str:
--> 223     return self.datasets.__repr__()

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-255cb2cb-5cae-4332-a7bb-fb5f24ca142f/lib/python3.10/site-packages/kedro/io/data_catalog.py:143, in _FrozenDatasets.__repr__(self)
    141 def __repr__(self) -> str:
    142     datasets_repr = {}
--> 143     for ds_name in self._original_names.keys():
    144         datasets_repr[ds_name] = self.__dict__[
    145             _sub_nonword_chars(ds_name)
    146         ].__repr__()
    148     return pprint.pformat(datasets_repr, sort_dicts=False)

RuntimeError: dictionary changed size during iteration

Your Environment

  • Kedro version used (pip show kedro or kedro -V): 0.19.8 but also happens in 0.19.7 (when the change was introduced)
  • Python version used (python -V): 3.11
  • Operating system and version: Databricks DBR 13.3LTS
@andresrq-mckinsey changed the title from "DataCatalog problem with ThreadRunner kedro on >=0.19.7" to "[BUG] DataCatalog problem with ThreadRunner kedro on >=0.19.7" on Sep 24, 2024
@andresrq-mckinsey changed the title from "[BUG] DataCatalog problem with ThreadRunner kedro on >=0.19.7" to "[BUG] DataCatalog problem with ThreadRunner on kedro >=0.19.7" on Sep 24, 2024
@cramirez98

I'm encountering the same issue while trying to run a pipeline in Databricks after upgrading from kedro 0.19.3.

@ankatiyar added the "Issue: Bug Report 🐞 Bug that needs to be fixed" label on Sep 25, 2024
@ankatiyar
Contributor

Hey @andresrq-mckinsey and @cramirez98, thanks for reporting this! I've added it to our backlog!

Also cc'ing @ElenaKhaustova to see whether any changes in the new catalog already address this.

@ElenaKhaustova
Contributor

Hey all! Thank you for reporting the issue.

As previously discussed in the support channel, this bug will be solved by the release of the new catalog, KedroDataCatalog, essentially in the next Kedro version. It's already merged to the main branch, and @andresrq-mckinsey confirmed it solves the problem.

For now, one can use the main branch, which already includes KedroDataCatalog. To do that, modify settings.py and run commands as usual:

# settings.py

from kedro.io import KedroDataCatalog
DATA_CATALOG_CLASS = KedroDataCatalog

A temporary alternative is to stick to the older Kedro version 0.19.6, from before this issue appeared.

@ankatiyar
Contributor

@ElenaKhaustova shall we mark this as resolved and close it, or does it make sense to keep it open?

@ElenaKhaustova
Contributor

ElenaKhaustova commented Sep 25, 2024

@ElenaKhaustova shall we mark this as resolved and close it, or does it make sense to keep it open?

Let's keep it open at least until the release, in case we decide to fix it for the old catalog.
