Skip to content

Commit

Permalink
ccmlib: do not pass or preserve thrift related info
Browse files Browse the repository at this point in the history
thrift was deprecated in both ScyllaDB and Cassandra. so there is
no need to pass it around. to be compatible with existing users of
the ccmlib, the thrift related parameters are preserved in public
intefaces, but they are not passed down anymore. in some places,
we enforce that the host on which thrift protocol is served should
be identical to that of binary. and actually, scylla always use
the same host address for both thrift and binary protocols. so we
replace the address like `self.network_interfaces['thrift'][0]`
with `self.network_interfaces['binary'][0]`.

Refs scylladb/scylladb#3811
Refs scylladb/scylladb#18416

Signed-off-by: Kefu Chai <[email protected]>
  • Loading branch information
tchaikov committed Apr 29, 2024
1 parent 60d6c89 commit 14cd022
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 63 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ After that execute:

ccm start

That will start 3 nodes on IP 127.0.0.[1, 2, 3] on port 9160 for thrift, port
That will start 3 nodes on IP 127.0.0.[1, 2, 3] on port 9042 for native transport, port
7000 for the internal cluster communication and ports 7100, 7200 and 7300 for JMX.
You can check that the cluster is correctly set up with

Expand Down Expand Up @@ -297,9 +297,9 @@ how to use ccmlib follows:
cluster.populate(3).start()
[node1, node2, node3] = cluster.nodelist()

# do some tests on the cluster/nodes. To connect to a node through thrift,
# do some tests on the cluster/nodes. To connect to a node through native protocol,
# the host and port to a node is available through
# node.network_interfaces['thrift']
# node.network_interfaces['binary]

cluster.flush()
node2.compact()
Expand Down
13 changes: 6 additions & 7 deletions ccmlib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,10 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N

def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None):
ipformat = self.get_ipformat()
binary = None
if parse_version(self.version()) >= parse_version('1.2'):
binary = self.get_binary_interface(i)
binary = self.get_binary_interface(i)
node = self.create_node(name=f'node{i}',
auto_bootstrap=auto_bootstrap,
thrift_interface=self.get_thrift_interface(i),
thrift_interface=None,
storage_interface=self.get_storage_interface(i),
jmx_port=str(self.get_node_jmx_port(i)),
remote_debug_port=str(self.get_debug_port(i) if debug else 0),
Expand All @@ -346,7 +344,7 @@ def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add
return node

def create_node(self, name, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None):
return Node(name, self, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface)
return Node(name, self, auto_bootstrap, None, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface)

def get_ipprefix(self):
return self.ipprefix if self.ipprefix is not None else '127.0.0.'
Expand All @@ -361,7 +359,7 @@ def get_binary_interface(self, nodeid):
return (self.get_node_ip(nodeid), 9042)

def get_thrift_interface(self, nodeid):
return (self.get_node_ip(nodeid), 9160)
raise NotImplementedError('thrift not supported')

def get_storage_interface(self, nodeid):
return (self.get_node_ip(nodeid), 7000)
Expand Down Expand Up @@ -491,9 +489,10 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_
if no_wait and not verbose:
time.sleep(2) # waiting 2 seconds to check for early errors and for the pid to be set
else:
assert parse_version(self.version()) >= parse_version("2.2")
for node, p, mark in started:
try:
start_message = "Listening for thrift clients..." if parse_version(self.version()) < parse_version("2.2") else "Starting listening for CQL clients"
start_message = "Starting listening for CQL clients"
node.watch_log_for(start_message, timeout=60, process=p, verbose=verbose, from_mark=mark)
except RuntimeError:
return None
Expand Down
18 changes: 5 additions & 13 deletions ccmlib/cmds/cluster_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,29 +321,21 @@ def get_parser(self):
def validate(self, parser, options, args):
Cmd.validate(self, parser, options, args, node_name=True, load_cluster=True, load_node=False)

if options.itfs is None and (options.thrift_itf is None or options.storage_itf is None or options.binary_itf is None):
if options.itfs is None and (options.storage_itf is None or options.binary_itf is None):
options.itfs = self.cluster.get_node_ip(len(self.cluster.nodelist())+1)

if options.thrift_itf is None:
options.thrift_itf = options.itfs
if options.storage_itf is None:
options.storage_itf = options.itfs
if options.binary_itf is None:
options.binary_itf = options.itfs

self.thrift = common.parse_interface(options.thrift_itf, 9160)
self.storage = common.parse_interface(options.storage_itf, 7000)
self.binary = common.parse_interface(options.binary_itf, 9042)

if self.binary[0] != self.thrift[0]:
print('Cannot set a binary address different from the thrift one', file=sys.stderr)
sys.exit(1)

used_binary_ips = [node.network_interfaces['binary'][0] for node in self.cluster.nodelist()]
used_thrift_ips = [node.network_interfaces['thrift'][0] for node in self.cluster.nodelist()]
used_storage_ips = [node.network_interfaces['storage'][0] for node in self.cluster.nodelist()]

if self.binary[0] in used_binary_ips or self.thrift[0] in used_thrift_ips or self.storage[0] in used_storage_ips:
if self.binary[0] in used_binary_ips or self.storage[0] in used_storage_ips:
print("One of the ips is already in use choose another.", file=sys.stderr)
parser.print_help()
sys.exit(1)
Expand All @@ -368,11 +360,11 @@ def run(self):
node_class = ScyllaDockerNode
else:
node_class = ScyllaNode
node = node_class(self.name, self.cluster, self.options.bootstrap, self.thrift, self.storage, self.jmx_port, self.remote_debug_port, self.initial_token, binary_interface=self.binary)
node = node_class(self.name, self.cluster, self.options.bootstrap, None, self.storage, self.jmx_port, self.remote_debug_port, self.initial_token, binary_interface=self.binary)
elif self.options.dse_node:
node = DseNode(self.name, self.cluster, self.options.bootstrap, self.thrift, self.storage, self.jmx_port, self.remote_debug_port, self.initial_token, binary_interface=self.binary)
node = DseNode(self.name, self.cluster, self.options.bootstrap, None, self.storage, self.jmx_port, self.remote_debug_port, self.initial_token, binary_interface=self.binary)
else:
node = Node(self.name, self.cluster, self.options.bootstrap, self.thrift, self.storage, self.jmx_port, self.remote_debug_port, self.initial_token, binary_interface=self.binary)
node = Node(self.name, self.cluster, self.options.bootstrap, None, self.storage, self.jmx_port, self.remote_debug_port, self.initial_token, binary_interface=self.binary)
self.cluster.add(node, self.options.is_seed, self.options.data_center)
except common.ArgumentError as e:
print(str(e), file=sys.stderr)
Expand Down
6 changes: 4 additions & 2 deletions ccmlib/dse_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def hasOpscenter(self):
return os.path.exists(os.path.join(self.get_path(), 'opscenter'))

def create_node(self, name, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None):
return DseNode(name, self, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface)
return DseNode(name, self, auto_bootstrap, None, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface)

def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_other_notice=False, jvm_args=None, profile_options=None, quiet_start=False):
if jvm_args is None:
Expand Down Expand Up @@ -80,7 +80,9 @@ def write_opscenter_cluster_config(self):
os.makedirs(cluster_conf)
if len(self.seeds) > 0:
seed = self.seeds[0]
(seed_ip, seed_port) = seed.network_interfaces['thrift']
# NOTE: should be use api_port, not storage_port. but we don't
# test DSE.
(seed_ip, seed_port) = seed.network_interfaces['storage']
seed_jmx = seed.jmx_port
with open(os.path.join(cluster_conf, self.name + '.conf'), 'w+') as f:
f.write('[jmx]\n')
Expand Down
8 changes: 4 additions & 4 deletions ccmlib/dse_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class DseNode(Node):
Provides interactions to a DSE node.
"""

def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None):
super(DseNode, self).__init__(name, cluster, auto_bootstrap, thrift_interface, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface)
def __init__(self, name, cluster, auto_bootstrap, _, storage_interface, jmx_port, remote_debug_port, initial_token, save=True, binary_interface=None):
super(DseNode, self).__init__(name, cluster, auto_bootstrap, None, storage_interface, jmx_port, remote_debug_port, initial_token, save, binary_interface)
self.get_cassandra_version()
if self.cluster.hasOpscenter():
self._copy_agent()
Expand Down Expand Up @@ -341,7 +341,7 @@ def __generate_server_xml(self):
with open(server_xml, 'w+') as f:
f.write('<Server port="8005" shutdown="SHUTDOWN">\n')
f.write(' <Service name="Solr">\n')
f.write(f" <Connector port=\"8983\" address=\"{self.network_interfaces['thrift'][0]}\" protocol=\"HTTP/1.1\" connectionTimeout=\"20000\" maxThreads = \"200\" URIEncoding=\"UTF-8\"/>\n")
f.write(f" <Connector port=\"8983\" address=\"{self.network_interfaces['binary'][0]}\" protocol=\"HTTP/1.1\" connectionTimeout=\"20000\" maxThreads = \"200\" URIEncoding=\"UTF-8\"/>\n")
f.write(' <Engine name="Solr" defaultHost="localhost">\n')
f.write(' <Host name="localhost" appBase="../solr/web"\n')
f.write(' unpackWARs="true" autoDeploy="true"\n')
Expand Down Expand Up @@ -391,7 +391,7 @@ def _write_agent_address_yaml(self, agent_dir):
address_yaml = os.path.join(agent_dir, 'conf', 'address.yaml')
if not os.path.exists(address_yaml):
with open(address_yaml, 'w+') as f:
(ip, port) = self.network_interfaces['thrift']
(ip, port) = self.network_interfaces['binary']
jmx = self.jmx_port
f.write('stomp_interface: 127.0.0.1\n')
f.write(f'local_interface: {ip}\n')
Expand Down
25 changes: 8 additions & 17 deletions ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_inte
self.cluster = cluster
self.status = Status.UNINITIALIZED
self.auto_bootstrap = auto_bootstrap
self.network_interfaces = {'thrift': common.normalize_interface(thrift_interface),
'storage': common.normalize_interface(storage_interface),
self.network_interfaces = {'storage': common.normalize_interface(storage_interface),
'binary': common.normalize_interface(binary_interface)}
self.jmx_port = jmx_port
self.remote_debug_port = remote_debug_port
Expand Down Expand Up @@ -148,7 +147,7 @@ def load(path, name, cluster):
binary_interface = None
if 'binary' in itf and itf['binary'] is not None:
binary_interface = tuple(itf['binary'])
node = cluster.create_node(data['name'], data['auto_bootstrap'], tuple(itf['thrift']), tuple(itf['storage']), data[
node = cluster.create_node(data['name'], data['auto_bootstrap'], None, tuple(itf['storage']), data[
'jmx_port'], remote_debug_port, initial_token, save=False, binary_interface=binary_interface)
node.status = data['status']
if 'pid' in data:
Expand Down Expand Up @@ -300,7 +299,6 @@ def show(self, only_status=False, show_cluster=True):
if show_cluster:
print(f"{indent}{'cluster'}={self.cluster.name}")
print(f"{indent}{'auto_bootstrap'}={self.auto_bootstrap}")
print(f"{indent}{'thrift'}={self.network_interfaces['thrift']}")
if self.network_interfaces['binary'] is not None:
print(f"{indent}{'binary'}={self.network_interfaces['binary']}")
print(f"{indent}{'storage'}={self.network_interfaces['storage']}")
Expand Down Expand Up @@ -886,7 +884,7 @@ def sqoop(self, sqoop_options=[]):
def bulkload(self, options):
loader_bin = common.join_bin(self.get_path(), 'bin', 'sstableloader')
env = self.get_env()
host, port = self.network_interfaces['thrift']
host, port = self.network_interfaces['binary']
args = ['-d', host, '-p', str(port)]
os.execve(loader_bin, [common.platform_binary('sstableloader')] + args + options, env)

Expand All @@ -910,11 +908,8 @@ def run_cqlsh(self, cmds=None, show_output=False, cqlsh_options=None, return_out
if extra_env:
env.update(extra_env)

host = self.network_interfaces['thrift'][0]
if self.get_base_cassandra_version() >= 2.1:
port = self.network_interfaces['binary'][1]
else:
port = self.network_interfaces['thrift'][1]
assert self.get_base_cassandra_version() >= 2.1
host, port = self.network_interfaces['binary']
args = cqlsh_options + [host, str(port)] if '--cloudconf' not in cqlsh_options else cqlsh_options
sys.stdout.flush()
if cmds is None:
Expand Down Expand Up @@ -1516,10 +1511,9 @@ def __clean_bat(self):

# escape the double quotes in name of the class directories
class_dir_pattern = "set CASSANDRA_CLASSPATH="
main_classes = "\\\"%CASSANDRA_HOME%\\build\\classes\\main\\\";"
thrift_classes = "\\\"%CASSANDRA_HOME%\\build\\classes\\thrift\\\""
main_classes = "\\\"%CASSANDRA_HOME%\\build\\classes\\main\\\""
common.replace_in_file(bat_file, class_dir_pattern, "set CASSANDRA_CLASSPATH=%CLASSPATH%;" +
main_classes + thrift_classes)
main_classes)

# background the server process and grab the pid
run_text = "\\\"%JAVA_HOME%\\bin\\java\\\" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% \\\"%CASSANDRA_MAIN%\\\""
Expand Down Expand Up @@ -1600,10 +1594,7 @@ def __update_yaml(self):
# cassandra 0.8
data['seed_provider'][0]['parameters'][0]['seeds'] = ','.join(self.cluster.get_seeds())
data['listen_address'], data['storage_port'] = self.network_interfaces['storage']
if self.get_base_cassandra_version() >= 4.0:
data['rpc_address'], data['native_transport_port'] = self.network_interfaces['thrift']
else:
data['rpc_address'], data['rpc_port'] = self.network_interfaces['thrift']
data['rpc_address'], data['native_transport_port'] = self.network_interfaces['binary']
if self.network_interfaces['binary'] is not None and self.get_base_cassandra_version() >= 1.2:
_, data['native_transport_port'] = self.network_interfaces['binary']

Expand Down
2 changes: 1 addition & 1 deletion ccmlib/scylla_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get_node_jmx_port(self, nodeid):
def create_node(self, name, auto_bootstrap, thrift_interface,
storage_interface, jmx_port, remote_debug_port,
initial_token, save=True, binary_interface=None):
return ScyllaNode(name, self, auto_bootstrap, thrift_interface,
return ScyllaNode(name, self, auto_bootstrap, None,
storage_interface, jmx_port, remote_debug_port,
initial_token, save, binary_interface, scylla_manager=self._scylla_manager)

Expand Down
5 changes: 2 additions & 3 deletions ccmlib/scylla_docker_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def create_node(self, name, auto_bootstrap, thrift_interface,
storage_interface, jmx_port, remote_debug_port,
initial_token, save=True, binary_interface=None):

return ScyllaDockerNode(name, self, auto_bootstrap, thrift_interface,
return ScyllaDockerNode(name, self, auto_bootstrap, None,
storage_interface, jmx_port, remote_debug_port,
initial_token, save=save, binary_interface=binary_interface,
scylla_manager=self._scylla_manager)
Expand Down Expand Up @@ -150,7 +150,7 @@ def create_docker(self, args):
if not self.pid:
node1 = self.cluster.nodelist()[0]
if not self.name == node1.name:
seeds = f"--seeds {node1.network_interfaces['thrift'][0]}"
seeds = f"--seeds {node1.network_interfaces['storage'][0]}"
else:
seeds = ''
scylla_yaml = self.read_scylla_yaml()
Expand Down Expand Up @@ -243,7 +243,6 @@ def show(self, only_status=False, show_cluster=True):
if show_cluster:
print(f"{indent}{'cluster'}={self.cluster.name}")
print(f"{indent}{'auto_bootstrap'}={self.auto_bootstrap}")
print(f"{indent}{'thrift'}={self.network_interfaces['thrift']}")
if self.network_interfaces['binary'] is not None:
print(f"{indent}{'binary'}={self.network_interfaces['binary']}")
print(f"{indent}{'storage'}={self.network_interfaces['storage']}")
Expand Down
16 changes: 3 additions & 13 deletions ccmlib/scylla_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, name, cluster, auto_bootstrap, thrift_interface,
self._relative_repos_root = None
self._has_jmx = None
super().__init__(name, cluster, auto_bootstrap,
thrift_interface, storage_interface,
None, storage_interface,
jmx_port, remote_debug_port,
initial_token, save, binary_interface)
self.__global_log_level = 'info'
Expand Down Expand Up @@ -1151,11 +1151,8 @@ def update_yaml(self):
','.join(self.cluster.get_seeds(node=self)))
data['listen_address'], data['storage_port'] = (
self.network_interfaces['storage'])
data['rpc_address'], data['rpc_port'] = (
self.network_interfaces['thrift'])
if (self.network_interfaces['binary'] is not None and
self.get_base_cassandra_version() >= 1.2):
_, data['native_transport_port'] = self.network_interfaces['binary']
assert self.network_interfaces['binary'] is not None
data['rpc_address'], data['native_transport_port'] = self.network_interfaces['binary']

# Use "workdir,W" instead of "workdir", because scylla defines this option this way
# and dtests compares names of used options with the names defined in scylla.
Expand Down Expand Up @@ -1286,12 +1283,6 @@ def update_yaml(self):
'rpc_interface': 0,
'rpc_interface_prefer_ipv6': 0,
'rpc_keepalive': 0,
'rpc_max_threads': 0,
'rpc_min_threads': 0,
'rpc_port': 0,
'rpc_recv_buff_size_in_bytes': 0,
'rpc_send_buff_size_in_bytes': 0,
'rpc_server_type': 0,
'seed_provider': 0,
'server_encryption_options': 0,
'snapshot_before_compaction': 0,
Expand All @@ -1302,7 +1293,6 @@ def update_yaml(self):
'storage_port': 0,
'stream_throughput_outbound_megabits_per_sec': 0,
'streaming_socket_timeout_in_ms': 0,
'thrift_framed_transport_size_in_mb': 0,
'tombstone_failure_threshold': 0,
'tombstone_warn_threshold': 0,
'trickle_fsync': 0,
Expand Down

0 comments on commit 14cd022

Please sign in to comment.