From f8f225f8eb187150ef101ec2b7ae4af5de30f31d Mon Sep 17 00:00:00 2001 From: Israel Fruchter Date: Wed, 14 Aug 2024 21:22:49 +0300 Subject: [PATCH] treewide: make sure we use ruamel.yaml in a thread safe manner we have multiple tests failing with the following: ``` > self.stream.write(data) E ValueError: I/O operation on closed file. ../scylla/.local/lib/python3.12/site-packages/ruamel/yaml/emitter.py:1249: ValueError ``` seems like it start when we have calls from multiple thread that might be updating the same files, or sharing the same state. this change make sure we create `YAML` instances as needed, so they won't be shared between threads Ref: https://sourceforge.net/p/ruamel-yaml/tickets/367/ (cherry picked from commit 1e6e8b190f1554d68b75067fd93b0aa5d259d9cb) --- ccmlib/cluster.py | 4 +--- ccmlib/cluster_factory.py | 4 +--- ccmlib/common.py | 7 +++---- ccmlib/dse_node.py | 7 ++----- ccmlib/node.py | 12 +++++------- ccmlib/scylla_cluster.py | 10 ++++------ ccmlib/scylla_docker_cluster.py | 4 +--- ccmlib/scylla_node.py | 23 ++++++++++------------- ccmlib/scylla_repository.py | 7 ++----- ccmlib/utils/sni_proxy.py | 6 ++---- 10 files changed, 31 insertions(+), 53 deletions(-) diff --git a/ccmlib/cluster.py b/ccmlib/cluster.py index a2ff75b2..eff426fc 100644 --- a/ccmlib/cluster.py +++ b/ccmlib/cluster.py @@ -15,8 +15,6 @@ from ccmlib.common import logger from ccmlib.utils.version import parse_version -yaml = YAML() -yaml.default_flow_style = False class Cluster(object): @@ -677,7 +675,7 @@ def _update_config(self, install_dir=None): cluster_config['sni_proxy_listen_port'] = self.sni_proxy_listen_port with open(filename, 'w') as f: - yaml.dump(cluster_config, f) + YAML().dump(cluster_config, f) def __update_pids(self, started): for node, p, _ in started: diff --git a/ccmlib/cluster_factory.py b/ccmlib/cluster_factory.py index 7e108969..23c39e22 100644 --- a/ccmlib/cluster_factory.py +++ b/ccmlib/cluster_factory.py @@ -10,8 +10,6 @@ from ccmlib import repository from ccmlib.node import Node -yaml = YAML() -yaml.default_flow_style = False class ClusterFactory(): @@ -20,7 +18,7 @@ def load(path, name): cluster_path = os.path.join(path, name) filename = os.path.join(cluster_path, 'cluster.conf') with open(filename, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) try: install_dir = None scylla_manager_install_path = data.get('scylla_manager_install_path') diff --git a/ccmlib/common.py b/ccmlib/common.py index 668563e0..9352c170 100644 --- a/ccmlib/common.py +++ b/ccmlib/common.py @@ -27,8 +27,6 @@ from botocore import UNSIGNED from botocore.client import Config -yaml = YAML() -yaml.default_flow_style = False BIN_DIR = "bin" CASSANDRA_CONF_DIR = "conf" @@ -817,6 +815,7 @@ def normalize_interface(itf): def parse_settings(args): settings = {} + yaml = YAML() for s in args: splitted = s.split(':', 1) if len(splitted) != 2: @@ -888,7 +887,7 @@ def get_version_from_build(install_dir=None, node_path=None): def get_default_scylla_yaml(install_dir): scylla_yaml_path = Path(install_dir) / SCYLLA_CONF_DIR / SCYLLA_CONF with scylla_yaml_path.open() as f: - return yaml.load(f) + return YAML().load(f) def _get_scylla_version(install_dir): scylla_version_files = [ @@ -970,7 +969,7 @@ def is_dse_cluster(path): cluster_path = os.path.join(path, name) filename = os.path.join(cluster_path, 'cluster.conf') with open(filename, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) if 'dse_dir' in data: return True except IOError: diff --git a/ccmlib/dse_node.py b/ccmlib/dse_node.py index 12d54198..c1bef3f7 100644 --- a/ccmlib/dse_node.py +++ b/ccmlib/dse_node.py @@ -14,9 +14,6 @@ from ccmlib import common from ccmlib.node import Node, NodeError -yaml = YAML() -yaml.default_flow_style = False - class DseNode(Node): @@ -296,7 +293,7 @@ def import_bin_files(self): def __update_yaml(self): conf_file = os.path.join(self.get_path(), 'resources', 'dse', 'conf', 'dse.yaml') with open(conf_file, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) data['system_key_directory'] = os.path.join(self.get_path(), 'keys') @@ -313,7 +310,7 @@ def __update_yaml(self): data[name] = full_options[name] with open(conf_file, 'w') as f: - yaml.dump(data, f) + YAML().dump(data, f) def _update_log4j(self): super(DseNode, self)._update_log4j() diff --git a/ccmlib/node.py b/ccmlib/node.py index b9480084..373c536a 100644 --- a/ccmlib/node.py +++ b/ccmlib/node.py @@ -24,8 +24,6 @@ from ccmlib.repository import setup from ccmlib.utils.version import parse_version -yaml = YAML() -yaml.default_flow_style = False class Status(): UNINITIALIZED = "UNINITIALIZED" @@ -138,7 +136,7 @@ def load(path, name, cluster): node_path = os.path.join(path, name) filename = os.path.join(node_path, 'node.conf') with open(filename, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) try: itf = data['interfaces'] initial_token = None @@ -1586,12 +1584,12 @@ def _update_config(self): if self.workload is not None: values['workload'] = self.workload with open(filename, 'w') as f: - yaml.dump(values, f) + YAML().dump(values, f) def __update_yaml(self): conf_file = os.path.join(self.get_conf_dir(), common.CASSANDRA_CONF) with open(conf_file, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) with open(conf_file, 'r') as f: yaml_text = f.read() @@ -1641,7 +1639,7 @@ def __update_yaml(self): data[name] = full_options[name] with open(conf_file, 'w') as f: - yaml.dump(data, f) + YAML().dump(data, f) def _update_log4j(self): append_pattern = 'log4j.appender.R.File=' @@ -1963,7 +1961,7 @@ def _clean_win_jmx(self): def get_conf_option(self, option): conf_file = os.path.join(self.get_conf_dir(), common.CASSANDRA_CONF) with open(conf_file, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) if option in data: return data[option] diff --git a/ccmlib/scylla_cluster.py b/ccmlib/scylla_cluster.py index a155989b..82f1d577 100644 --- a/ccmlib/scylla_cluster.py +++ b/ccmlib/scylla_cluster.py @@ -19,8 +19,6 @@ SNITCH = 'org.apache.cassandra.locator.GossipingPropertyFileSnitch' -yaml = YAML() -yaml.default_flow_style = False class ScyllaCluster(Cluster): @@ -246,7 +244,7 @@ def _update_config(self, install_dir=None): filename = os.path.join(self.get_path(), 'cluster.conf') with open(filename, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) if self.is_scylla_reloc(): data['scylla_version'] = self.scylla_version @@ -255,7 +253,7 @@ def _update_config(self, install_dir=None): data['scylla_manager_install_path'] = self._scylla_manager.install_dir with open(filename, 'w') as f: - yaml.dump(data, f) + YAML().dump(data, f) def sctool(self, cmd): if self._scylla_manager == None: @@ -315,7 +313,7 @@ def _get_api_address(self): def _update_config(self, install_dir=None): conf_file = os.path.join(self._get_path(), common.SCYLLAMANAGER_CONF) with open(conf_file, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) data['http'] = self._get_api_address() if not 'database' in data: data['database'] = {} @@ -351,7 +349,7 @@ def _update_config(self, install_dir=None): for key in keys_to_delete: del data[key] with open(conf_file, 'w') as f: - yaml.dump(data, f) + YAML().dump(data, f) def _copy_config_files(self, install_dir): conf_dir = os.path.join(install_dir, 'etc') diff --git a/ccmlib/scylla_docker_cluster.py b/ccmlib/scylla_docker_cluster.py index 04cae3eb..180aca97 100644 --- a/ccmlib/scylla_docker_cluster.py +++ b/ccmlib/scylla_docker_cluster.py @@ -12,8 +12,6 @@ LOGGER = logging.getLogger("ccm") -yaml = YAML() -yaml.default_flow_style = False class ScyllaDockerCluster(ScyllaCluster): def __init__(self, *args, **kwargs): @@ -139,7 +137,7 @@ def update_yaml(self): copyfile(src=file_path, dst=os.path.join(keys_dir_path, file_name)) server_encryption_options[key] = os.path.join(self.base_data_path, "keys", file_name) with open(conf_file, 'w') as f: - yaml.safe_dump(data, f, default_flow_style=False) + YAML().dump(data, f) def create_docker(self, args): # TODO: handle smp correctly via the correct param/api (or only via commandline params) diff --git a/ccmlib/scylla_node.py b/ccmlib/scylla_node.py index 7178eff5..ad20c036 100644 --- a/ccmlib/scylla_node.py +++ b/ccmlib/scylla_node.py @@ -19,10 +19,10 @@ import logging import psutil -from ruamel.yaml import YAML import glob import re import requests +from ruamel.yaml import YAML from ccmlib.common import CASSANDRA_SH, BIN_DIR, wait_for, copy_directory, print_if_standalone @@ -35,9 +35,6 @@ from ccmlib.utils.version import parse_version -yaml = YAML(typ='safe') -yaml.default_flow_style = False - class ScyllaNode(Node): """ @@ -117,7 +114,7 @@ def has_jmx(self): @property def scylla_yaml(self) -> Dict[str, Any]: - return yaml.load(Path(self.get_conf_dir()) / common.SCYLLA_CONF) + return YAML().load(Path(self.get_conf_dir()) / common.SCYLLA_CONF) @property def api_port(self) -> int: @@ -383,18 +380,18 @@ def _create_agent_config(self): data['s3'] = {"endpoint": os.getenv("AWS_S3_ENDPOINT"), "provider": "Minio"} with open(conf_file, 'w') as f: - yaml.dump(data, f) + YAML().dump(data, f) return conf_file def update_agent_config(self, new_settings, restart_agent_after_change=True): conf_file = os.path.join(self.get_conf_dir(), 'scylla-manager-agent.yaml') with open(conf_file, 'r') as f: - current_config = yaml.load(f) + current_config = YAML().load(f) current_config.update(new_settings) with open(conf_file, 'w') as f: - yaml.dump(current_config, f) + YAML().dump(current_config, f) if restart_agent_after_change: self.restart_scylla_manager_agent(gently=True, recreate_config=False) @@ -427,7 +424,7 @@ def start_scylla_manager_agent(self, create_config=True): pid_file.write(str(self._process_agent.pid)) with open(config_file, 'r') as f: - current_config = yaml.load(f) + current_config = YAML().load(f) # Extracting currently configured port current_listening_port = int(current_config['https'].split(":")[1]) @@ -580,7 +577,7 @@ def start(self, join_ring=True, no_wait=False, verbose=False, # from config file scylla#59 conf_file = os.path.join(self.get_conf_dir(), common.SCYLLA_CONF) with open(conf_file, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) jvm_args = jvm_args + ['--api-address', data['api_address']] args = [launch_bin, '--options-file', options_file, '--log-to-stdout', '1'] @@ -1155,7 +1152,7 @@ def update_yaml(self): # TODO: copied from node.py conf_file = os.path.join(self.get_conf_dir(), common.SCYLLA_CONF) with open(conf_file, 'r') as f: - data = yaml.load(f) + data = YAML().load(f) data['cluster_name'] = self.cluster.name data['auto_bootstrap'] = self.auto_bootstrap @@ -1216,7 +1213,7 @@ def update_yaml(self): if 'alternator_port' in data or 'alternator_https_port' in data: data['alternator_address'] = data['listen_address'] with open(conf_file, 'w') as f: - yaml.dump(data, f) + YAML().dump(data, f) # TODO: - for now create a cassandra conf file leaving only # cassandra config items - this should be removed once tools are @@ -1326,7 +1323,7 @@ def update_yaml(self): cassandra_data[key] = data[key] with open(cassandra_conf_file, 'w') as f: - yaml.dump(cassandra_data, f) + YAML().dump(cassandra_data, f) def __update_yaml_dse(self): raise NotImplementedError('ScyllaNode.__update_yaml_dse') diff --git a/ccmlib/scylla_repository.py b/ccmlib/scylla_repository.py index 40a6c728..bb7aa43a 100644 --- a/ccmlib/scylla_repository.py +++ b/ccmlib/scylla_repository.py @@ -14,8 +14,8 @@ from typing import NamedTuple, Literal import requests -from ruamel.yaml import YAML import packaging.version +from ruamel.yaml import YAML from ccmlib.common import ( ArgumentError, CCMError, get_default_path, rmdirs, validate_install_dir, get_scylla_version, aws_bucket_ls, @@ -23,9 +23,6 @@ from ccmlib.utils.download import download_file, download_version_from_s3, get_url_hash, save_source_file from ccmlib.utils.version import parse_version -yaml = YAML() -yaml.default_flow_style = False - GIT_REPO = "http://github.com/scylladb/scylla.git" @@ -189,7 +186,7 @@ def read_build_manifest(url): # # url-id: 2022-08-29T08:05:34Z # docker-image-name: scylla-nightly:5.2.0-dev-0.20220829.67c91e8bcd61 - return yaml.load(res.content) + return YAML().load(res.content) def normalize_scylla_version(version): diff --git a/ccmlib/utils/sni_proxy.py b/ccmlib/utils/sni_proxy.py index ea683e5e..f529f280 100644 --- a/ccmlib/utils/sni_proxy.py +++ b/ccmlib/utils/sni_proxy.py @@ -18,8 +18,6 @@ logger = logging.getLogger(__name__) -yaml = YAML() -yaml.default_flow_style = False @contextmanager def file_or_memory(path=None, data=None): @@ -67,7 +65,7 @@ def encode_base64(filename): currentContext='default') with open(os.path.join(ssl_dir, 'config_data.yaml'), 'w') as config_file: - yaml.dump(config, config_file) + YAML().dump(config, config_file) datacenters = {} @@ -86,7 +84,7 @@ def encode_base64(filename): currentContext='default') with open(os.path.join(ssl_dir, 'config_path.yaml'), 'w') as config_file: - yaml.dump(config, config_file) + YAML().dump(config, config_file) return os.path.join(ssl_dir, 'config_data.yaml'), os.path.join(ssl_dir, 'config_path.yaml')