diff --git a/ccmlib/node.py b/ccmlib/node.py index 4d6e9f9c..302bdcaf 100644 --- a/ccmlib/node.py +++ b/ccmlib/node.py @@ -15,7 +15,7 @@ import warnings from datetime import datetime import locale -from collections import namedtuple +from collections import defaultdict, namedtuple from ruamel.yaml import YAML @@ -78,6 +78,9 @@ def __decode(self, value): # Groups: 0 = ks, 1 = cf, 2 = tmp or none, 3 = version, 4 = identifier (generation), 4 = "big-" or none, 5 = suffix (Compacted or Data.db) _sstable_regexp = re.compile(r'((?P[^\s-]+)-(?P[^\s-]+)-)?(?Ptmp(link)?-)?(?P[^\s-]+)-(?P[^-]+)-(?Pbig-)?(?P[a-zA-Z]+)\.[a-zA-Z0-9]+$') +# Regexes for parsing nodetool compactionstats +_pending_tasks_pattern = re.compile(r'- (?P\w+)\.(?P\w+): (?P\d+)') +_active_tasks_pattern = re.compile(r'\s*([\w-]+)\s+\w+\s+(?P\w+)\s+(?P\w+)\s+\d+\s+\d+\s+\w+\s+\d+\.\d+%') class Node(object): @@ -743,58 +746,57 @@ def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True return False @staticmethod - def _parse_pending_tasks(output, keyspace, column_family): - # "nodetool compactionstats" prints the compaction stats like: - # pending tasks: 42 - # - ks1.cf1: 13 - # - ks1.cf2: 19 - # - ks2.cf1: 10 - head_pattern = r"pending tasks:\s*(?P\d+)" - tail_pattern = re.compile(r'\-\s+(\w+)\.(\w+):\s*(\d+)') - head, *tail = output.strip().split('\n') + def _parse_tasks(output: str, keyspace: str, column_family: str): + """ + Returns the total number of tasks + + `nodetool compactionstats` prints the compaction stats like: + ``` + pending tasks: 42 + - ks1.cf1: 13 + - ks2.cf2: 19 + - ks3.cf3: 10 + + id compaction type keyspace table completed total unit progress + 55eaee80-7445-11ef-9197-2931a44dadc4 COMPACTION ks3 cf3 32116 55680 keys 57.68% + 55e8f2b0-7445-11ef-b438-2930a44dadc4 COMPACTION ks4 cf4 46789 55936 keys 83.65% + ``` + """ + lines = output.strip().splitlines() + tasks = defaultdict(int) + + for line in lines: + line = line.strip() + if match := _pending_tasks_pattern.match(line): + tasks[(match.group("ks"), match.group("cf"))] += int(match.group("tasks")) + elif match := _active_tasks_pattern.match(line): + tasks[(match.group("ks"), match.group("cf"))] += 1 if keyspace is None and column_family is None: - matched = re.search(head_pattern, head) - if not matched: - raise RuntimeError(f"Cannot find 'pending tasks' in nodetool output.\nOutput: {output}") - return int(matched.group('tasks')) - - # if keyspace or column_family is specified, check active tasks - # of the specified ks.cf instead - def matches(expected, actual): - if expected is None: - return True - return expected == actual + return sum(tasks.values()) + elif keyspace is not None and column_family is None: + return sum(num_tasks for (ks, _), num_tasks in tasks.items() if ks == keyspace) + elif keyspace is not None and column_family is not None: + return tasks.get((keyspace, column_family), 0) - total = 0 - for line in tail: - m = tail_pattern.search(line) - if not m: - break - ks, cf, num = m.groups() - if matches(keyspace, ks) and matches(column_family, cf): - total += int(num) - return total - - def wait_for_compactions(self, - keyspace=None, - column_family=None, - idle_timeout=300): + def wait_for_compactions(self, keyspace: str=None, column_family: str=None, idle_timeout=300): """Wait for all compactions to finish on this node. - :param keyspace: only wait for the compactions performed for specified - keyspace. if not specified, all keyspaces are waited - :param column_family: only wait for the compactions performed for - specified column_family. if not specified, all keyspaces are waited + :param keyspace: only wait for the compactions performed for specified keyspace. + If not specified, all keyspaces are waited. + Must be provided if collumn_family is provided. + :param column_family: only wait for the compactions performed for specified column family. + If not specified, all column families are waited. :param idle_timeout: the time in seconds to wait for progress. - Total time to wait is undeteremined, as long as we observe forward - progress. + Total time to wait is undeteremined, as long as we observe forward progress. """ + if column_family is not None and keyspace is None: + raise ValueError("Cannot search only by column family, need also keyspace") pending_tasks = -1 last_change = None while not last_change or time.time() - last_change < idle_timeout: output, err = self.nodetool("compactionstats", capture_output=True) - n = self._parse_pending_tasks(output, keyspace, column_family) + n = self._parse_tasks(output, keyspace, column_family) # no active tasks, good! if n == 0: return diff --git a/tests/test_internal_functions.py b/tests/test_internal_functions.py index 7dbc0f0c..f69497e1 100644 --- a/tests/test_internal_functions.py +++ b/tests/test_internal_functions.py @@ -1,40 +1,92 @@ +import pytest +import textwrap from ccmlib.node import Node -def test_parse_pending_tasks(): - def verify_result(output, keyspace, column_family, expected): - n = Node._parse_pending_tasks(output, keyspace, column_family) - assert n == expected +# Define the test cases and corresponding outputs +test_cases = [ + { + "id": "only_pending", + "output": textwrap.dedent("""\ + pending tasks: 6 + - system_schema.tables: 1 + - system_schema.columns: 2 + - keyspace1.standard1: 3\ + """), + "expected_tasks": [ + ("system_schema", "tables", 1), + ("system_schema", "columns", 2), + ("system_schema", None, 3), + ("keyspace1", "standard1", 3), + ("keyspace1", None, 3), + (None, None, 6), + ("keyspace1x", None, 0), + ("keyspace1x", "table1x", 0), + ] + }, + { + "id": "pending_and_in_progress", + "output": textwrap.dedent("""\ + pending tasks: 6 + - system_schema.tables: 1 + - system_schema.columns: 2 + - keyspace1.standard1: 3 - def verify_cases(output, cases): - for ks, cf, expected in cases: - verify_result(output, ks, cf, expected) + id compaction type keyspace table completed total unit progress + 8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16% + Active compaction remaining time : n/a\ + """), + "expected_tasks": [ + ("system_schema", "tables", 1), + ("system_schema", "columns", 3), + ("system_schema", None, 4), + ("keyspace1", "standard1", 3), + ("keyspace1", None, 3), + (None, None, 7), + ("keyspace1x", None, 0), + ("keyspace1x", "table1x", 0), + ] + }, + { + "id": "only_in_progress", + "output": textwrap.dedent("""\ + pending tasks: 0 - for output in [ - ''' - pending tasks: 6 - - system_schema.tables: 1 - - system_schema.columns: 2 - - keyspace1.standard1: 3 - ''', - ''' - pending tasks: 6 - - system_schema.tables: 1 - - system_schema.columns: 2 - - keyspace1.standard1: 3 + id compaction type keyspace table completed total unit progress + 8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16% + Active compaction remaining time : n/a\ + """), + "expected_tasks": [ + ("system_schema", "tables", 0), + ("system_schema", "columns", 1), + ("system_schema", None, 1), + ("keyspace1", "standard1", 0), + ("keyspace1", None, 0), + (None, None, 1), + ("keyspace1x", None, 0), + ("keyspace1x", "table1x", 0), + ] + }, + { + "id": "no_tasks", + "output": textwrap.dedent("""\ + pending tasks: 0 + \ + """), + "expected_tasks": [ + ("system_schema", "tables", 0), + ("system_schema", "columns", 0), + ("system_schema", None, 0), + ("keyspace1", "standard1", 0), + ("keyspace1", None, 0), + (None, None, 0), + ("keyspace1x", None, 0), + ("keyspace1x", "table1x", 0), + ] + } +] - id compaction type keyspace table completed total unit progress - 8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 - keys 0.16% - Active compaction remaining time : n/a - ''' - ]: - verify_cases(output, [ - ('system_schema', 'tables', 1), - ('system_schema', 'columns', 2), - ('system_schema', None, 3), - ('keyspace1', 'standard1', 3), - ('keyspace1', None, 3), - (None, None, 6), - ('keyspace1x', None, 0), - ('keyspace1x', 'table1x', 0), - ]) +@pytest.mark.parametrize("output, expected_tasks", [pytest.param(t["output"], t["expected_tasks"], id=t["id"]) for t in test_cases]) +def test_parse_tasks(output, expected_tasks): + for ks, cf, expected in expected_tasks: + n = Node._parse_tasks(output, ks, cf) + assert n == expected, f"Expected {expected} tasks for {ks}.{cf}, but got {n}"