Skip to content

Commit

Permalink
treewide: make sure we use ruamel.yaml in a thread safe manner
Browse files Browse the repository at this point in the history
we have multiple tests failing with the following:
```
>       self.stream.write(data)
E       ValueError: I/O operation on closed file.

../scylla/.local/lib/python3.12/site-packages/ruamel/yaml/emitter.py:1249: ValueError
```

seems like it start when we have calls from multiple thread that might be
updating the same files, or sharing the same state.

this change make sure we create `YAML` instances as needed,
so they won't be shared between threads

Ref: https://sourceforge.net/p/ruamel-yaml/tickets/367/
(cherry picked from commit 1e6e8b1)
  • Loading branch information
fruch committed Sep 15, 2024
1 parent 419a9b4 commit f8f225f
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 53 deletions.
4 changes: 1 addition & 3 deletions ccmlib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from ccmlib.common import logger
from ccmlib.utils.version import parse_version

yaml = YAML()
yaml.default_flow_style = False

class Cluster(object):

Expand Down Expand Up @@ -677,7 +675,7 @@ def _update_config(self, install_dir=None):
cluster_config['sni_proxy_listen_port'] = self.sni_proxy_listen_port

with open(filename, 'w') as f:
yaml.dump(cluster_config, f)
YAML().dump(cluster_config, f)

def __update_pids(self, started):
for node, p, _ in started:
Expand Down
4 changes: 1 addition & 3 deletions ccmlib/cluster_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
from ccmlib import repository
from ccmlib.node import Node

yaml = YAML()
yaml.default_flow_style = False

class ClusterFactory():

Expand All @@ -20,7 +18,7 @@ def load(path, name):
cluster_path = os.path.join(path, name)
filename = os.path.join(cluster_path, 'cluster.conf')
with open(filename, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)
try:
install_dir = None
scylla_manager_install_path = data.get('scylla_manager_install_path')
Expand Down
7 changes: 3 additions & 4 deletions ccmlib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
from botocore import UNSIGNED
from botocore.client import Config

yaml = YAML()
yaml.default_flow_style = False

BIN_DIR = "bin"
CASSANDRA_CONF_DIR = "conf"
Expand Down Expand Up @@ -817,6 +815,7 @@ def normalize_interface(itf):

def parse_settings(args):
settings = {}
yaml = YAML()
for s in args:
splitted = s.split(':', 1)
if len(splitted) != 2:
Expand Down Expand Up @@ -888,7 +887,7 @@ def get_version_from_build(install_dir=None, node_path=None):
def get_default_scylla_yaml(install_dir):
scylla_yaml_path = Path(install_dir) / SCYLLA_CONF_DIR / SCYLLA_CONF
with scylla_yaml_path.open() as f:
return yaml.load(f)
return YAML().load(f)

def _get_scylla_version(install_dir):
scylla_version_files = [
Expand Down Expand Up @@ -970,7 +969,7 @@ def is_dse_cluster(path):
cluster_path = os.path.join(path, name)
filename = os.path.join(cluster_path, 'cluster.conf')
with open(filename, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)
if 'dse_dir' in data:
return True
except IOError:
Expand Down
7 changes: 2 additions & 5 deletions ccmlib/dse_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
from ccmlib import common
from ccmlib.node import Node, NodeError

yaml = YAML()
yaml.default_flow_style = False


class DseNode(Node):

Expand Down Expand Up @@ -296,7 +293,7 @@ def import_bin_files(self):
def __update_yaml(self):
conf_file = os.path.join(self.get_path(), 'resources', 'dse', 'conf', 'dse.yaml')
with open(conf_file, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)

data['system_key_directory'] = os.path.join(self.get_path(), 'keys')

Expand All @@ -313,7 +310,7 @@ def __update_yaml(self):
data[name] = full_options[name]

with open(conf_file, 'w') as f:
yaml.dump(data, f)
YAML().dump(data, f)

def _update_log4j(self):
super(DseNode, self)._update_log4j()
Expand Down
12 changes: 5 additions & 7 deletions ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from ccmlib.repository import setup
from ccmlib.utils.version import parse_version

yaml = YAML()
yaml.default_flow_style = False

class Status():
UNINITIALIZED = "UNINITIALIZED"
Expand Down Expand Up @@ -138,7 +136,7 @@ def load(path, name, cluster):
node_path = os.path.join(path, name)
filename = os.path.join(node_path, 'node.conf')
with open(filename, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)
try:
itf = data['interfaces']
initial_token = None
Expand Down Expand Up @@ -1586,12 +1584,12 @@ def _update_config(self):
if self.workload is not None:
values['workload'] = self.workload
with open(filename, 'w') as f:
yaml.dump(values, f)
YAML().dump(values, f)

def __update_yaml(self):
conf_file = os.path.join(self.get_conf_dir(), common.CASSANDRA_CONF)
with open(conf_file, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)

with open(conf_file, 'r') as f:
yaml_text = f.read()
Expand Down Expand Up @@ -1641,7 +1639,7 @@ def __update_yaml(self):
data[name] = full_options[name]

with open(conf_file, 'w') as f:
yaml.dump(data, f)
YAML().dump(data, f)

def _update_log4j(self):
append_pattern = 'log4j.appender.R.File='
Expand Down Expand Up @@ -1963,7 +1961,7 @@ def _clean_win_jmx(self):
def get_conf_option(self, option):
conf_file = os.path.join(self.get_conf_dir(), common.CASSANDRA_CONF)
with open(conf_file, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)

if option in data:
return data[option]
Expand Down
10 changes: 4 additions & 6 deletions ccmlib/scylla_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

SNITCH = 'org.apache.cassandra.locator.GossipingPropertyFileSnitch'

yaml = YAML()
yaml.default_flow_style = False

class ScyllaCluster(Cluster):

Expand Down Expand Up @@ -246,7 +244,7 @@ def _update_config(self, install_dir=None):
filename = os.path.join(self.get_path(), 'cluster.conf')

with open(filename, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)

if self.is_scylla_reloc():
data['scylla_version'] = self.scylla_version
Expand All @@ -255,7 +253,7 @@ def _update_config(self, install_dir=None):
data['scylla_manager_install_path'] = self._scylla_manager.install_dir

with open(filename, 'w') as f:
yaml.dump(data, f)
YAML().dump(data, f)

def sctool(self, cmd):
if self._scylla_manager == None:
Expand Down Expand Up @@ -315,7 +313,7 @@ def _get_api_address(self):
def _update_config(self, install_dir=None):
conf_file = os.path.join(self._get_path(), common.SCYLLAMANAGER_CONF)
with open(conf_file, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)
data['http'] = self._get_api_address()
if not 'database' in data:
data['database'] = {}
Expand Down Expand Up @@ -351,7 +349,7 @@ def _update_config(self, install_dir=None):
for key in keys_to_delete:
del data[key]
with open(conf_file, 'w') as f:
yaml.dump(data, f)
YAML().dump(data, f)

def _copy_config_files(self, install_dir):
conf_dir = os.path.join(install_dir, 'etc')
Expand Down
4 changes: 1 addition & 3 deletions ccmlib/scylla_docker_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

LOGGER = logging.getLogger("ccm")

yaml = YAML()
yaml.default_flow_style = False

class ScyllaDockerCluster(ScyllaCluster):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -139,7 +137,7 @@ def update_yaml(self):
copyfile(src=file_path, dst=os.path.join(keys_dir_path, file_name))
server_encryption_options[key] = os.path.join(self.base_data_path, "keys", file_name)
with open(conf_file, 'w') as f:
yaml.safe_dump(data, f, default_flow_style=False)
YAML().dump(data, f)

def create_docker(self, args):
# TODO: handle smp correctly via the correct param/api (or only via commandline params)
Expand Down
23 changes: 10 additions & 13 deletions ccmlib/scylla_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import logging

import psutil
from ruamel.yaml import YAML
import glob
import re
import requests
from ruamel.yaml import YAML

from ccmlib.common import CASSANDRA_SH, BIN_DIR, wait_for, copy_directory, print_if_standalone

Expand All @@ -35,9 +35,6 @@
from ccmlib.utils.version import parse_version


yaml = YAML(typ='safe')
yaml.default_flow_style = False

class ScyllaNode(Node):

"""
Expand Down Expand Up @@ -117,7 +114,7 @@ def has_jmx(self):

@property
def scylla_yaml(self) -> Dict[str, Any]:
return yaml.load(Path(self.get_conf_dir()) / common.SCYLLA_CONF)
return YAML().load(Path(self.get_conf_dir()) / common.SCYLLA_CONF)

@property
def api_port(self) -> int:
Expand Down Expand Up @@ -383,18 +380,18 @@ def _create_agent_config(self):
data['s3'] = {"endpoint": os.getenv("AWS_S3_ENDPOINT"), "provider": "Minio"}

with open(conf_file, 'w') as f:
yaml.dump(data, f)
YAML().dump(data, f)
return conf_file

def update_agent_config(self, new_settings, restart_agent_after_change=True):
conf_file = os.path.join(self.get_conf_dir(), 'scylla-manager-agent.yaml')
with open(conf_file, 'r') as f:
current_config = yaml.load(f)
current_config = YAML().load(f)

current_config.update(new_settings)

with open(conf_file, 'w') as f:
yaml.dump(current_config, f)
YAML().dump(current_config, f)

if restart_agent_after_change:
self.restart_scylla_manager_agent(gently=True, recreate_config=False)
Expand Down Expand Up @@ -427,7 +424,7 @@ def start_scylla_manager_agent(self, create_config=True):
pid_file.write(str(self._process_agent.pid))

with open(config_file, 'r') as f:
current_config = yaml.load(f)
current_config = YAML().load(f)
# Extracting currently configured port
current_listening_port = int(current_config['https'].split(":")[1])

Expand Down Expand Up @@ -580,7 +577,7 @@ def start(self, join_ring=True, no_wait=False, verbose=False,
# from config file scylla#59
conf_file = os.path.join(self.get_conf_dir(), common.SCYLLA_CONF)
with open(conf_file, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)
jvm_args = jvm_args + ['--api-address', data['api_address']]

args = [launch_bin, '--options-file', options_file, '--log-to-stdout', '1']
Expand Down Expand Up @@ -1155,7 +1152,7 @@ def update_yaml(self):
# TODO: copied from node.py
conf_file = os.path.join(self.get_conf_dir(), common.SCYLLA_CONF)
with open(conf_file, 'r') as f:
data = yaml.load(f)
data = YAML().load(f)

data['cluster_name'] = self.cluster.name
data['auto_bootstrap'] = self.auto_bootstrap
Expand Down Expand Up @@ -1216,7 +1213,7 @@ def update_yaml(self):
if 'alternator_port' in data or 'alternator_https_port' in data:
data['alternator_address'] = data['listen_address']
with open(conf_file, 'w') as f:
yaml.dump(data, f)
YAML().dump(data, f)

# TODO: - for now create a cassandra conf file leaving only
# cassandra config items - this should be removed once tools are
Expand Down Expand Up @@ -1326,7 +1323,7 @@ def update_yaml(self):
cassandra_data[key] = data[key]

with open(cassandra_conf_file, 'w') as f:
yaml.dump(cassandra_data, f)
YAML().dump(cassandra_data, f)

def __update_yaml_dse(self):
raise NotImplementedError('ScyllaNode.__update_yaml_dse')
Expand Down
7 changes: 2 additions & 5 deletions ccmlib/scylla_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@
from typing import NamedTuple, Literal

import requests
from ruamel.yaml import YAML
import packaging.version
from ruamel.yaml import YAML

from ccmlib.common import (
ArgumentError, CCMError, get_default_path, rmdirs, validate_install_dir, get_scylla_version, aws_bucket_ls,
DOWNLOAD_IN_PROGRESS_FILE, print_if_standalone, LockFile, get_installed_scylla_package_hash)
from ccmlib.utils.download import download_file, download_version_from_s3, get_url_hash, save_source_file
from ccmlib.utils.version import parse_version

yaml = YAML()
yaml.default_flow_style = False


GIT_REPO = "http://github.com/scylladb/scylla.git"

Expand Down Expand Up @@ -189,7 +186,7 @@ def read_build_manifest(url):
#
# url-id: 2022-08-29T08:05:34Z
# docker-image-name: scylla-nightly:5.2.0-dev-0.20220829.67c91e8bcd61
return yaml.load(res.content)
return YAML().load(res.content)


def normalize_scylla_version(version):
Expand Down
6 changes: 2 additions & 4 deletions ccmlib/utils/sni_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

logger = logging.getLogger(__name__)

yaml = YAML()
yaml.default_flow_style = False

@contextmanager
def file_or_memory(path=None, data=None):
Expand Down Expand Up @@ -67,7 +65,7 @@ def encode_base64(filename):
currentContext='default')

with open(os.path.join(ssl_dir, 'config_data.yaml'), 'w') as config_file:
yaml.dump(config, config_file)
YAML().dump(config, config_file)

datacenters = {}

Expand All @@ -86,7 +84,7 @@ def encode_base64(filename):
currentContext='default')

with open(os.path.join(ssl_dir, 'config_path.yaml'), 'w') as config_file:
yaml.dump(config, config_file)
YAML().dump(config, config_file)

return os.path.join(ssl_dir, 'config_data.yaml'), os.path.join(ssl_dir, 'config_path.yaml')

Expand Down

0 comments on commit f8f225f

Please sign in to comment.