From 9067fba379ea920e241f49ff61172fe65e4a5c5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 15 Jan 2024 19:08:26 +0100 Subject: [PATCH 1/2] test(repair): add tests with deprecated float intensity --- .../repair/service_repair_integration_test.go | 54 +++++++++++++++---- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/pkg/service/repair/service_repair_integration_test.go b/pkg/service/repair/service_repair_integration_test.go index 7abd105166..8a71091b61 100644 --- a/pkg/service/repair/service_repair_integration_test.go +++ b/pkg/service/repair/service_repair_integration_test.go @@ -270,6 +270,7 @@ func (h *repairTestHelper) assertParallelIntensity(parallel, intensity int) { if err != nil { h.T.Fatal(err) } + if p.Parallel != parallel || int(p.Intensity) != intensity { h.T.Fatalf("Expected parallel %d, intensity %d, got parallel %d, intensity %d", parallel, intensity, p.Parallel, int(p.Intensity)) } @@ -1416,10 +1417,18 @@ func TestServiceRepairIntegration(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + const ( + propParallel = 0 + propIntensity = 0 + controlParallel = 1 + controlIntensity = 1 + deprecatedControlIntensity = 0.5 + ) + Print("When: run repair") props := singleUnit(map[string]any{ - "parallel": 1, - "intensity": 1, + "parallel": propParallel, + "intensity": propIntensity, "small_table_threshold": -1, }) h.runRepair(ctx, props) @@ -1428,18 +1437,18 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertRunning(shortWait) Print("Then: assert parallel/intensity from properties") - h.assertParallelIntensity(1, 1) + h.assertParallelIntensity(propParallel, propIntensity) Print("And: control parallel and intensity") - if err := h.service.SetParallel(ctx, h.ClusterID, 0); err != nil { + if err := h.service.SetParallel(ctx, h.ClusterID, controlParallel); err != nil { t.Fatal(err) } - if err := h.service.SetIntensity(ctx, h.ClusterID, 0); err != nil { + if err := h.service.SetIntensity(ctx, h.ClusterID, deprecatedControlIntensity); err != nil { t.Fatal(err) } Print("Then: assert parallel/intensity from control") - h.assertParallelIntensity(0, 0) + h.assertParallelIntensity(controlParallel, controlIntensity) Print("And: repair is stopped") cancel() @@ -1458,13 +1467,13 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertRunning(shortWait) Print("Then: assert resumed, running parallel/intensity from control") - h.assertParallelIntensity(0, 0) + h.assertParallelIntensity(controlParallel, controlIntensity) Print("Then: repair is done") h.assertDone(3 * longWait) Print("Then: assert resumed, finished parallel/intensity from control") - h.assertParallelIntensity(0, 0) + h.assertParallelIntensity(controlParallel, controlIntensity) Print("And: run fresh repair") h.RunID = uuid.NewTime() @@ -1474,13 +1483,13 @@ func TestServiceRepairIntegration(t *testing.T) { h.assertRunning(shortWait) Print("Then: assert fresh, running parallel/intensity from properties") - h.assertParallelIntensity(1, 1) + h.assertParallelIntensity(propParallel, propIntensity) Print("Then: repair is done") h.assertDone(3 * longWait) Print("Then: assert fresh, finished repair parallel/intensity from control") - h.assertParallelIntensity(1, 1) + h.assertParallelIntensity(propParallel, propIntensity) }) t.Run("repair restart no continue", func(t *testing.T) { @@ -1907,6 +1916,31 @@ func TestServiceRepairIntegration(t *testing.T) { } }) + t.Run("deprecated float intensity", func(t *testing.T) { + h := 
newRepairTestHelper(t, session, defaultConfig())
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		Print("When: run repair")
+		h.runRepair(ctx, multipleUnits(map[string]any{
+			"intensity": 0.5,
+		}))
+
+		Print("Then: repair is running")
+		h.assertRunning(shortWait)
+
+		p, err := h.service.GetProgress(ctx, h.ClusterID, h.TaskID, h.RunID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if p.Intensity != 1 {
+			t.Fatalf("Expected default intensity (1) when using float value, got: %v", p.Intensity)
+		}
+
+		Print("And: repair is done")
+		h.assertDone(longWait)
+	})
+
 	t.Run("repair status context timeout", func(t *testing.T) {
 		h := newRepairTestHelper(t, session, defaultConfig())
 		ctx, cancel := context.WithCancel(context.Background())

From 0acf5b09984b4a694afe86abf550dd639095a39f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl>
Date: Mon, 15 Jan 2024 19:16:09 +0100
Subject: [PATCH 2/2] fix(repair): get rid of float intensity in internals

There are still some bugs related to using float intensity (one of them
is described in the issue below). To get rid of them, we tolerate float
intensity only at the entry points and remove it completely from SM
repair internals (both in the service and in the progress display).

We keep float intensity in task properties and in swagger endpoint
parameters, but we convert it to int intensity as the first step in the
internals, which happens in:
- GetTarget (from task properties)
- sctool repair control endpoint (from query param)

Fixes #3665
---
 pkg/command/flag/type.go              |  2 +-
 pkg/service/repair/controller.go      | 12 ++--
 pkg/service/repair/controller_test.go | 20 ++-----
 pkg/service/repair/model.go           | 42 +++++++++-----
 pkg/service/repair/plan.go            | 16 +++---
 pkg/service/repair/service.go         | 79 ++++++++++-----------------
 6 files changed, 80 insertions(+), 91 deletions(-)

diff --git a/pkg/command/flag/type.go b/pkg/command/flag/type.go
index c9943a076f..e60122cf9e 100644
--- a/pkg/command/flag/type.go
+++ b/pkg/command/flag/type.go
@@ -238,7 +238,7 @@ func (fl *Intensity) String() string {
 
 // Set implements pflag.Value.
 func (fl *Intensity) Set(s string) error {
-	errValidation := errors.New("intensity must be an integer >= 1 or a decimal between (0,1)")
+	errValidation := errors.New("intensity must be a non-negative integer")
 
 	f, err := strconv.ParseFloat(s, 64)
 	if err != nil {
diff --git a/pkg/service/repair/controller.go b/pkg/service/repair/controller.go
index 7798625356..1f66de98ee 100644
--- a/pkg/service/repair/controller.go
+++ b/pkg/service/repair/controller.go
@@ -5,16 +5,16 @@ package repair
 // controller informs generator about the amount of ranges that can be repaired
 // on a given replica set. Returns 0 ranges when repair shouldn't be scheduled.
 type controller interface {
-	TryBlock(replicaSet []string) (ranges int)
+	TryBlock(replicaSet []string) (ranges Intensity)
 	Unblock(replicaSet []string)
 	Busy() bool
 }
 
 type intensityChecker interface {
-	Intensity() int
+	Intensity() Intensity
 	Parallel() int
 	MaxParallel() int
-	ReplicaSetMaxIntensity(replicaSet []string) int
+	ReplicaSetMaxIntensity(replicaSet []string) Intensity
 }
 
 // rowLevelRepairController is a specialised controller for row-level repair.
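
The conversion strategy described in the commit message is implemented by NewIntensityFromDeprecated, added to pkg/service/repair/model.go later in this patch: floats strictly between 0 and 1 (the old fractional semantics) fall back to the default intensity of 1, while other values are truncated to an integer. A minimal, self-contained sketch of that mapping; the sample values are illustrative and not taken from the patch:

package main

import "fmt"

// fromDeprecated mirrors the mapping of NewIntensityFromDeprecated from model.go:
// a deprecated fractional intensity falls back to the default of 1, any other value
// is truncated to an integer (0 keeps its special "max intensity" meaning).
func fromDeprecated(f float64) int {
	const defaultIntensity = 1
	if 0 < f && f < 1 {
		return defaultIntensity
	}
	return int(f)
}

func main() {
	fmt.Println(fromDeprecated(0.5)) // 1: deprecated float maps to the default
	fmt.Println(fromDeprecated(0))   // 0: still means "max intensity"
	fmt.Println(fromDeprecated(2))   // 2: integers pass through unchanged
}
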
@@ -37,15 +37,15 @@ func newRowLevelRepairController(i intensityChecker) *rowLevelRepairController { } } -func (c *rowLevelRepairController) TryBlock(replicaSet []string) int { +func (c *rowLevelRepairController) TryBlock(replicaSet []string) Intensity { if !c.shouldBlock(replicaSet) { return 0 } c.block(replicaSet) i := c.intensity.Intensity() - if max := c.intensity.ReplicaSetMaxIntensity(replicaSet); i == maxIntensity || max < i { - i = max + if maxI := c.intensity.ReplicaSetMaxIntensity(replicaSet); i == maxIntensity || maxI < i { + i = maxI } return i } diff --git a/pkg/service/repair/controller_test.go b/pkg/service/repair/controller_test.go index 9a7b35ccc6..4b95c18424 100644 --- a/pkg/service/repair/controller_test.go +++ b/pkg/service/repair/controller_test.go @@ -21,7 +21,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) { node6 = "192.168.1.6" ) - maxRangesPerHost := map[string]int{ + maxRangesPerHost := map[string]Intensity{ node1: 20, node2: 19, node3: 18, @@ -33,7 +33,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) { return &intensityHandler{ logger: log.Logger{}, maxHostIntensity: maxRangesPerHost, - intensity: atomic.NewFloat64(defaultIntensity), + intensity: atomic.NewInt64(int64(defaultIntensity)), maxParallel: 3, parallel: atomic.NewInt64(defaultParallel), poolController: workerpool.New[*worker, job, jobResult](context.Background(), func(ctx context.Context, id int) *worker { @@ -69,9 +69,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) { ih.maxParallel = maxParallel c := newRowLevelRepairController(ih) - if err := ih.SetIntensity(context.Background(), expectedNrOfRanges); err != nil { - t.Fatalf("unexpected error = {%v}", err) - } + ih.SetIntensity(context.Background(), expectedNrOfRanges) if rangesCount := c.TryBlock(replicaSet); rangesCount != expectedNrOfRanges { t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", expectedNrOfRanges, rangesCount) } @@ -87,9 +85,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) { ih.maxParallel = maxParallel c := newRowLevelRepairController(ih) - if err := ih.SetIntensity(context.Background(), float64(expectedNrOfRanges)); err != nil { - t.Fatalf("unexpected error = {%v}", err) - } + ih.SetIntensity(context.Background(), expectedNrOfRanges) if rangesCount := c.TryBlock(replicaSet); rangesCount != expectedNrOfRanges { t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", expectedNrOfRanges, rangesCount) } @@ -106,9 +102,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) { ih.maxParallel = maxParallel c := newRowLevelRepairController(ih) - if err := ih.SetIntensity(context.Background(), intensity); err != nil { - t.Fatalf("unexpected error = {%v}", err) - } + ih.SetIntensity(context.Background(), intensity) if rangesCount := c.TryBlock(replicaSet); rangesCount != minRangesInParallel { t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", minRangesInParallel, rangesCount) } @@ -122,9 +116,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) { ih.maxParallel = maxParallel c := newRowLevelRepairController(ih) - if err := ih.SetParallel(context.Background(), 1); err != nil { - t.Fatalf("unexpected error {%v}", err) - } + ih.SetParallel(context.Background(), 1) if rangesCount := c.TryBlock(replicaSet1); rangesCount == 0 { t.Fatal("expected to let in, but was denied") } diff --git a/pkg/service/repair/model.go b/pkg/service/repair/model.go index 448c86fe52..6872a3f98b 100644 --- a/pkg/service/repair/model.go +++ 
b/pkg/service/repair/model.go @@ -13,17 +13,33 @@ import ( // Unit represents keyspace and its tables. type Unit = ksfilter.Unit +// Intensity represents parsed internal intensity. +type Intensity int + +// NewIntensity returns Intensity. +func NewIntensity(i int) Intensity { + return Intensity(i) +} + +// NewIntensityFromDeprecated returns Intensity parsed from deprecated float value. +func NewIntensityFromDeprecated(i float64) Intensity { + if 0 < i && i < 1 { + return defaultIntensity + } + return Intensity(i) +} + // Target specifies what shall be repaired. type Target struct { - Units []Unit `json:"units"` - DC []string `json:"dc"` - Host string `json:"host,omitempty"` - IgnoreHosts []string `json:"ignore_hosts,omitempty"` - FailFast bool `json:"fail_fast"` - Continue bool `json:"continue"` - Intensity float64 `json:"intensity"` - Parallel int `json:"parallel"` - SmallTableThreshold int64 `json:"small_table_threshold"` + Units []Unit `json:"units"` + DC []string `json:"dc"` + Host string `json:"host,omitempty"` + IgnoreHosts []string `json:"ignore_hosts,omitempty"` + FailFast bool `json:"fail_fast"` + Continue bool `json:"continue"` + Intensity Intensity `json:"intensity"` + Parallel int `json:"parallel"` + SmallTableThreshold int64 `json:"small_table_threshold"` // Cache for repair plan so that it does not have to be generated // in both GetTarget and Repair functions. plan *plan `json:"-"` @@ -48,7 +64,7 @@ func defaultTaskProperties() *taskProperties { Keyspace: []string{"*", "!system_traces"}, Continue: true, - Intensity: defaultIntensity, + Intensity: float64(defaultIntensity), // Consider 1GB table as small by default. SmallTableThreshold: 1 * 1024 * 1024 * 1024, @@ -64,7 +80,7 @@ type Run struct { DC []string Host string Parallel int - Intensity int + Intensity Intensity PrevID uuid.UUID StartTime time.Time EndTime time.Time @@ -169,8 +185,8 @@ type Progress struct { Host string `json:"host"` Hosts []HostProgress `json:"hosts"` Tables []TableProgress `json:"tables"` - MaxIntensity float64 `json:"max_intensity"` - Intensity float64 `json:"intensity"` + MaxIntensity Intensity `json:"max_intensity"` + Intensity Intensity `json:"intensity"` MaxParallel int `json:"max_parallel"` Parallel int `json:"parallel"` } diff --git a/pkg/service/repair/plan.go b/pkg/service/repair/plan.go index bd83f91e16..07bd779913 100644 --- a/pkg/service/repair/plan.go +++ b/pkg/service/repair/plan.go @@ -20,7 +20,7 @@ type plan struct { SkippedKeyspaces []string MaxParallel int - MaxHostIntensity map[string]int + MaxHostIntensity map[string]Intensity HostTableSize map[scyllaclient.HostKeyspaceTable]int64 } @@ -296,22 +296,22 @@ func (p *plan) KeyspaceRangesMap() map[string]int64 { return out } -func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]int { - out := make(map[string]int, len(shards)) +func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]Intensity { + out := make(map[string]Intensity, len(shards)) for h, sh := range shards { out[h] = maxRepairRangesInParallel(sh, memory[h]) } return out } -func maxRepairRangesInParallel(shards uint, totalMemory int64) int { +func maxRepairRangesInParallel(shards uint, totalMemory int64) Intensity { const MiB = 1024 * 1024 memoryPerShard := totalMemory / int64(shards) max := int(0.1 * float64(memoryPerShard) / (32 * MiB) / 4) if max == 0 { max = 1 } - return max + return NewIntensity(max) } // keyspacePlan describes repair schedule and state for keyspace. 
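
The per-host caps that feed MaxHostIntensity still come from the memory-based formula in maxRepairRangesInParallel above: roughly 10% of per-shard memory, divided by 32 MiB per range, with a safety factor of 4 and a floor of 1. A quick worked example under assumed hardware (16 GiB of memory and 8 shards, figures chosen for illustration only):

package main

import "fmt"

// maxRanges reproduces the arithmetic of maxRepairRangesInParallel from plan.go so the
// numbers can be checked in isolation; only the input figures in main are assumptions.
func maxRanges(shards uint, totalMemory int64) int {
	const MiB = 1024 * 1024
	memoryPerShard := totalMemory / int64(shards)
	n := int(0.1 * float64(memoryPerShard) / (32 * MiB) / 4)
	if n == 0 {
		n = 1
	}
	return n
}

func main() {
	// 16 GiB / 8 shards = 2 GiB per shard; 0.1*2048 MiB / 32 MiB / 4 = 1.6, truncated to 1.
	fmt.Println(maxRanges(8, 16*1024*1024*1024)) // 1
}
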
@@ -340,20 +340,20 @@ func (kp keyspacePlan) IsTableRepaired(tabIdx int) bool { } // GetRangesToRepair returns at most cnt ranges of table owned by replica set. -func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx, cnt int) []scyllaclient.TokenRange { +func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx int, intensity Intensity) []scyllaclient.TokenRange { rep := kp.Replicas[repIdx] tp := kp.Tables[tabIdx] // Return all ranges for optimized or deleted table if tp.Optimize || tp.Deleted { - cnt = len(rep.Ranges) + intensity = NewIntensity(len(rep.Ranges)) } var out []scyllaclient.TokenRange for _, r := range rep.Ranges { if tp.MarkRange(repIdx, r) { out = append(out, r) - if len(out) >= cnt { + if NewIntensity(len(out)) >= intensity { break } } diff --git a/pkg/service/repair/service.go b/pkg/service/repair/service.go index 38804c56cf..b9f36ed279 100644 --- a/pkg/service/repair/service.go +++ b/pkg/service/repair/service.go @@ -89,7 +89,7 @@ func (s *Service) GetTarget(ctx context.Context, clusterID uuid.UUID, properties Host: props.Host, FailFast: props.FailFast, Continue: props.Continue, - Intensity: props.Intensity, + Intensity: NewIntensityFromDeprecated(props.Intensity), Parallel: props.Parallel, SmallTableThreshold: props.SmallTableThreshold, } @@ -210,7 +210,7 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID DC: target.DC, Host: target.Host, Parallel: target.Parallel, - Intensity: int(target.Intensity), + Intensity: target.Intensity, StartTime: timeutc.Now(), } if err := s.putRun(run); err != nil { @@ -258,12 +258,8 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID defer cleanup() // Set controlled parameters - if err := s.SetParallel(ctx, clusterID, run.Parallel); err != nil { - return errors.Wrap(err, "set initial parallel") - } - if err := s.SetIntensity(ctx, clusterID, float64(run.Intensity)); err != nil { - return errors.Wrap(err, "set initial intensity") - } + ih.SetParallel(ctx, run.Parallel) + ih.SetIntensity(ctx, run.Intensity) // Give generator the ability to read parallel/intensity and // to submit and receive results from worker pool. 
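
GetRangesToRepair now takes its budget as an Intensity: optimized and deleted tables claim every range of the replica, and otherwise the loop stops once the budget is reached. A toy sketch of just that capping behaviour, leaving out the MarkRange bookkeeping that skips already repaired ranges (types and data are simplified placeholders, not the real scyllaclient types):

package main

import "fmt"

// tokenRange stands in for scyllaclient.TokenRange in this sketch.
type tokenRange struct{ start, end int64 }

// rangesToRepair keeps collecting ranges until the intensity budget is hit; repairAll
// corresponds to the Optimize/Deleted case where the budget is lifted to the full range count.
func rangesToRepair(ranges []tokenRange, intensity int, repairAll bool) []tokenRange {
	if repairAll {
		intensity = len(ranges)
	}
	var out []tokenRange
	for _, r := range ranges {
		out = append(out, r)
		if len(out) >= intensity {
			break
		}
	}
	return out
}

func main() {
	ranges := []tokenRange{{0, 10}, {10, 20}, {20, 30}}
	fmt.Println(len(rangesToRepair(ranges, 2, false))) // 2: capped by intensity
	fmt.Println(len(rangesToRepair(ranges, 2, true)))  // 3: optimized/deleted tables take all ranges
}
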
@@ -318,14 +314,14 @@ func (s *Service) killAllRepairs(ctx context.Context, client *scyllaclient.Clien } func (s *Service) newIntensityHandler(ctx context.Context, clusterID, taskID, runID uuid.UUID, - maxHostIntensity map[string]int, maxParallel int, poolController sizeSetter, + maxHostIntensity map[string]Intensity, maxParallel int, poolController sizeSetter, ) (ih *intensityHandler, cleanup func()) { ih = &intensityHandler{ taskID: taskID, runID: runID, logger: s.logger.Named("control"), maxHostIntensity: maxHostIntensity, - intensity: &atomic.Float64{}, + intensity: &atomic.Int64{}, maxParallel: maxParallel, parallel: &atomic.Int64{}, poolController: poolController, @@ -385,18 +381,18 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid return Progress{}, errors.Wrap(err, "aggregate progress") } p.Parallel = run.Parallel - p.Intensity = float64(run.Intensity) + p.Intensity = run.Intensity // Set max parallel/intensity only for running tasks s.mu.Lock() if ih, ok := s.intensityHandlers[clusterID]; ok { - maxI := 0 + maxI := NewIntensity(0) for _, v := range ih.MaxHostIntensity() { if maxI < v { maxI = v } } - p.MaxIntensity = float64(maxI) + p.MaxIntensity = maxI p.MaxParallel = ih.MaxParallel() } s.mu.Unlock() @@ -415,10 +411,10 @@ func (s *Service) SetIntensity(ctx context.Context, clusterID uuid.UUID, intensi if !ok { return errors.Wrap(service.ErrNotFound, "repair task") } - - if err := ih.SetIntensity(ctx, intensity); err != nil { - return errors.Wrap(err, "set intensity") + if intensity < 0 { + return service.ErrValidate(errors.Errorf("setting invalid intensity value %.2f", intensity)) } + ih.SetIntensity(ctx, NewIntensityFromDeprecated(intensity)) err := table.RepairRun.UpdateBuilder("intensity").Query(s.session).BindMap(qb.M{ "cluster_id": clusterID, @@ -438,10 +434,10 @@ func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel if !ok { return errors.Wrap(service.ErrNotFound, "repair task") } - - if err := ih.SetParallel(ctx, parallel); err != nil { - return errors.Wrap(err, "set parallel") + if parallel < 0 { + return service.ErrValidate(errors.Errorf("setting invalid parallel value %d", parallel)) } + ih.SetParallel(ctx, parallel) err := table.RepairRun.UpdateBuilder("parallel").Query(s.session).BindMap(qb.M{ "cluster_id": clusterID, @@ -460,37 +456,28 @@ type intensityHandler struct { taskID uuid.UUID runID uuid.UUID logger log.Logger - maxHostIntensity map[string]int - intensity *atomic.Float64 + maxHostIntensity map[string]Intensity + intensity *atomic.Int64 maxParallel int parallel *atomic.Int64 poolController sizeSetter } const ( - maxIntensity = 0 - defaultIntensity = 1 - defaultParallel = 0 - chanSize = 10000 + maxIntensity Intensity = 0 + defaultIntensity Intensity = 1 + defaultParallel = 0 + chanSize = 10000 ) // SetIntensity sets the value of '--intensity' flag. -func (i *intensityHandler) SetIntensity(ctx context.Context, intensity float64) error { - if intensity < 0 { - return service.ErrValidate(errors.Errorf("setting invalid intensity value %.2f", intensity)) - } - +func (i *intensityHandler) SetIntensity(ctx context.Context, intensity Intensity) { i.logger.Info(ctx, "Setting repair intensity", "value", intensity, "previous", i.intensity.Load()) - i.intensity.Store(intensity) - return nil + i.intensity.Store(int64(intensity)) } // SetParallel sets the value of '--parallel' flag. 
-func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) error { - if parallel < 0 { - return service.ErrValidate(errors.Errorf("setting invalid parallel value %d", parallel)) - } - +func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) { i.logger.Info(ctx, "Setting repair parallel", "value", parallel, "previous", i.parallel.Load()) i.parallel.Store(int64(parallel)) if parallel == defaultParallel { @@ -498,11 +485,10 @@ func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) error } else { i.poolController.SetSize(parallel) } - return nil } -func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) int { - out := math.MaxInt +func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity { + out := NewIntensity(math.MaxInt) for _, rep := range replicaSet { if ranges := i.maxHostIntensity[rep]; ranges < out { out = ranges @@ -512,18 +498,13 @@ func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) int { } // MaxHostIntensity returns max_token_ranges_in_parallel per host. -func (i *intensityHandler) MaxHostIntensity() map[string]int { +func (i *intensityHandler) MaxHostIntensity() map[string]Intensity { return i.maxHostIntensity } -// Intensity returns effective intensity. -func (i *intensityHandler) Intensity() int { - intensity := i.intensity.Load() - // Deprecate float intensity - if 0 < intensity && intensity < 1 { - intensity = defaultIntensity - } - return int(intensity) +// Intensity returns stored value for intensity. +func (i *intensityHandler) Intensity() Intensity { + return NewIntensity(int(i.intensity.Load())) } // MaxParallel returns maximal achievable parallelism.
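
Together with the TryBlock change in controller.go earlier in this patch, the handler above keeps intensity resolution purely integer: ReplicaSetMaxIntensity is the minimum per-host capacity across the replica set, and the requested intensity is clamped to it, with 0 meaning use that maximum. A simplified, self-contained illustration of that resolution; the host capacities below are made-up values:

package main

import (
	"fmt"
	"math"
)

// replicaSetMax mirrors ReplicaSetMaxIntensity: the replica set can only repair as many
// ranges in parallel as its weakest host allows.
func replicaSetMax(maxHostIntensity map[string]int, replicaSet []string) int {
	out := math.MaxInt
	for _, h := range replicaSet {
		if v := maxHostIntensity[h]; v < out {
			out = v
		}
	}
	return out
}

// effectiveRanges mirrors the clamping done in TryBlock: 0 (maxIntensity) or anything
// above the replica set limit resolves to that limit.
func effectiveRanges(intensity int, maxHostIntensity map[string]int, replicaSet []string) int {
	const maxIntensity = 0
	if m := replicaSetMax(maxHostIntensity, replicaSet); intensity == maxIntensity || m < intensity {
		return m
	}
	return intensity
}

func main() {
	capacity := map[string]int{"192.168.1.1": 20, "192.168.1.2": 19, "192.168.1.3": 18}
	rs := []string{"192.168.1.1", "192.168.1.3"}
	fmt.Println(effectiveRanges(0, capacity, rs))  // 18: intensity 0 resolves to the replica set max
	fmt.Println(effectiveRanges(5, capacity, rs))  // 5: below the limit, used as-is
	fmt.Println(effectiveRanges(50, capacity, rs)) // 18: clamped to the replica set limit
}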