Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 6.0] ccmlib/node: check both pending and active tasks when waiting for compactions #612

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 44 additions & 42 deletions ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import warnings
from datetime import datetime
import locale
from collections import namedtuple
from collections import defaultdict, namedtuple

from ruamel.yaml import YAML

Expand Down Expand Up @@ -78,6 +78,9 @@ def __decode(self, value):

# Groups: 0 = ks, 1 = cf, 2 = tmp or none, 3 = version, 4 = identifier (generation), 5 = "big-" or none, 6 = suffix (Compacted or Data.db)
_sstable_regexp = re.compile(r'((?P<keyspace>[^\s-]+)-(?P<cf>[^\s-]+)-)?(?P<tmp>tmp(link)?-)?(?P<version>[^\s-]+)-(?P<identifier>[^-]+)-(?P<big>big-)?(?P<suffix>[a-zA-Z]+)\.[a-zA-Z0-9]+$')
# Regexes for parsing nodetool compactionstats
_pending_tasks_pattern = re.compile(r'- (?P<ks>\w+)\.(?P<cf>\w+): (?P<tasks>\d+)')
_active_tasks_pattern = re.compile(r'\s*([\w-]+)\s+\w+\s+(?P<ks>\w+)\s+(?P<cf>\w+)\s+\d+\s+\d+\s+\w+\s+\d+\.\d+%')


class Node(object):
Expand Down Expand Up @@ -743,58 +746,57 @@ def stop(self, wait=True, wait_other_notice=False, other_nodes=None, gently=True
return False

@staticmethod
def _parse_pending_tasks(output, keyspace, column_family):
# "nodetool compactionstats" prints the compaction stats like:
# pending tasks: 42
# - ks1.cf1: 13
# - ks1.cf2: 19
# - ks2.cf1: 10
head_pattern = r"pending tasks:\s*(?P<tasks>\d+)"
tail_pattern = re.compile(r'\-\s+(\w+)\.(\w+):\s*(\d+)')
head, *tail = output.strip().split('\n')
def _parse_tasks(output: str, keyspace: str, column_family: str):
"""
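Returns the total number of compaction tasks (pending plus active), optionally restricted to a keyspace or to a keyspace/column family pair.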

`nodetool compactionstats` prints the compaction stats like:
```
pending tasks: 42
- ks1.cf1: 13
- ks2.cf2: 19
- ks3.cf3: 10

id compaction type keyspace table completed total unit progress
55eaee80-7445-11ef-9197-2931a44dadc4 COMPACTION ks3 cf3 32116 55680 keys 57.68%
55e8f2b0-7445-11ef-b438-2930a44dadc4 COMPACTION ks4 cf4 46789 55936 keys 83.65%
```
"""
lines = output.strip().splitlines()
tasks = defaultdict(int)

for line in lines:
line = line.strip()
if match := _pending_tasks_pattern.match(line):
tasks[(match.group("ks"), match.group("cf"))] += int(match.group("tasks"))
elif match := _active_tasks_pattern.match(line):
tasks[(match.group("ks"), match.group("cf"))] += 1

if keyspace is None and column_family is None:
matched = re.search(head_pattern, head)
if not matched:
raise RuntimeError(f"Cannot find 'pending tasks' in nodetool output.\nOutput: {output}")
return int(matched.group('tasks'))

# if keyspace or column_family is specified, check active tasks
# of the specified ks.cf instead
def matches(expected, actual):
if expected is None:
return True
return expected == actual
return sum(tasks.values())
elif keyspace is not None and column_family is None:
return sum(num_tasks for (ks, _), num_tasks in tasks.items() if ks == keyspace)
elif keyspace is not None and column_family is not None:
return tasks.get((keyspace, column_family), 0)

total = 0
for line in tail:
m = tail_pattern.search(line)
if not m:
break
ks, cf, num = m.groups()
if matches(keyspace, ks) and matches(column_family, cf):
total += int(num)
return total

def wait_for_compactions(self,
keyspace=None,
column_family=None,
idle_timeout=300):
def wait_for_compactions(self, keyspace: str=None, column_family: str=None, idle_timeout=300):
"""Wait for all compactions to finish on this node.

:param keyspace: only wait for the compactions performed for specified
keyspace. if not specified, all keyspaces are waited
:param column_family: only wait for the compactions performed for
specified column_family. if not specified, all keyspaces are waited
:param keyspace: only wait for the compactions performed for specified keyspace.
If not specified, all keyspaces are waited.
Must be provided if column_family is provided.
:param column_family: only wait for the compactions performed for specified column family.
If not specified, all column families are waited.
:param idle_timeout: the time in seconds to wait for progress.
Total time to wait is undetermined, as long as we observe forward
progress.
Total time to wait is undetermined, as long as we observe forward progress.
"""
if column_family is not None and keyspace is None:
raise ValueError("Cannot search only by column family, need also keyspace")
pending_tasks = -1
last_change = None
while not last_change or time.time() - last_change < idle_timeout:
output, err = self.nodetool("compactionstats", capture_output=True)
n = self._parse_pending_tasks(output, keyspace, column_family)
n = self._parse_tasks(output, keyspace, column_family)
# no pending or active tasks, good!
if n == 0:
return
Expand Down
122 changes: 87 additions & 35 deletions tests/test_internal_functions.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,92 @@
import pytest
import textwrap
from ccmlib.node import Node

def test_parse_pending_tasks():
def verify_result(output, keyspace, column_family, expected):
n = Node._parse_pending_tasks(output, keyspace, column_family)
assert n == expected
# Define the test cases and corresponding outputs
test_cases = [
{
"id": "only_pending",
"output": textwrap.dedent("""\
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3\
"""),
"expected_tasks": [
("system_schema", "tables", 1),
("system_schema", "columns", 2),
("system_schema", None, 3),
("keyspace1", "standard1", 3),
("keyspace1", None, 3),
(None, None, 6),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "pending_and_in_progress",
"output": textwrap.dedent("""\
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3

def verify_cases(output, cases):
for ks, cf, expected in cases:
verify_result(output, ks, cf, expected)
id compaction type keyspace table completed total unit progress
8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16%
Active compaction remaining time : n/a\
"""),
"expected_tasks": [
("system_schema", "tables", 1),
("system_schema", "columns", 3),
("system_schema", None, 4),
("keyspace1", "standard1", 3),
("keyspace1", None, 3),
(None, None, 7),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "only_in_progress",
"output": textwrap.dedent("""\
pending tasks: 0

for output in [
'''
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3
''',
'''
pending tasks: 6
- system_schema.tables: 1
- system_schema.columns: 2
- keyspace1.standard1: 3
id compaction type keyspace table completed total unit progress
8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640 keys 0.16%
Active compaction remaining time : n/a\
"""),
"expected_tasks": [
("system_schema", "tables", 0),
("system_schema", "columns", 1),
("system_schema", None, 1),
("keyspace1", "standard1", 0),
("keyspace1", None, 0),
(None, None, 1),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
},
{
"id": "no_tasks",
"output": textwrap.dedent("""\
pending tasks: 0
\
"""),
"expected_tasks": [
("system_schema", "tables", 0),
("system_schema", "columns", 0),
("system_schema", None, 0),
("keyspace1", "standard1", 0),
("keyspace1", None, 0),
(None, None, 0),
("keyspace1x", None, 0),
("keyspace1x", "table1x", 0),
]
}
]

id compaction type keyspace table completed total unit progress
8e1f2d90-a252-11ee-a7f4-1bf9ae4e6ffd COMPACTION system_schema columns 1 640
keys 0.16%
Active compaction remaining time : n/a
'''
]:
verify_cases(output, [
('system_schema', 'tables', 1),
('system_schema', 'columns', 2),
('system_schema', None, 3),
('keyspace1', 'standard1', 3),
('keyspace1', None, 3),
(None, None, 6),
('keyspace1x', None, 0),
('keyspace1x', 'table1x', 0),
])
@pytest.mark.parametrize("output, expected_tasks", [pytest.param(t["output"], t["expected_tasks"], id=t["id"]) for t in test_cases])
def test_parse_tasks(output, expected_tasks):
for ks, cf, expected in expected_tasks:
n = Node._parse_tasks(output, ks, cf)
assert n == expected, f"Expected {expected} tasks for {ks}.{cf}, but got {n}"
Loading