diff --git a/pkg/command/flag/type.go b/pkg/command/flag/type.go
index c9943a076f..e60122cf9e 100644
--- a/pkg/command/flag/type.go
+++ b/pkg/command/flag/type.go
@@ -238,7 +238,7 @@ func (fl *Intensity) String() string {
 
 // Set implements pflag.Value.
 func (fl *Intensity) Set(s string) error {
-	errValidation := errors.New("intensity must be an integer >= 1 or a decimal between (0,1)")
+	errValidation := errors.New("intensity must be a non-negative integer")
 
 	f, err := strconv.ParseFloat(s, 64)
 	if err != nil {
diff --git a/pkg/service/repair/controller.go b/pkg/service/repair/controller.go
index 7798625356..1f66de98ee 100644
--- a/pkg/service/repair/controller.go
+++ b/pkg/service/repair/controller.go
@@ -5,16 +5,16 @@ package repair
 // controller informs generator about the amount of ranges that can be repaired
 // on a given replica set. Returns 0 ranges when repair shouldn't be scheduled.
 type controller interface {
-	TryBlock(replicaSet []string) (ranges int)
+	TryBlock(replicaSet []string) (ranges Intensity)
 	Unblock(replicaSet []string)
 	Busy() bool
 }
 
 type intensityChecker interface {
-	Intensity() int
+	Intensity() Intensity
 	Parallel() int
 	MaxParallel() int
-	ReplicaSetMaxIntensity(replicaSet []string) int
+	ReplicaSetMaxIntensity(replicaSet []string) Intensity
 }
 
 // rowLevelRepairController is a specialised controller for row-level repair.
@@ -37,15 +37,15 @@ func newRowLevelRepairController(i intensityChecker) *rowLevelRepairController {
 	}
 }
 
-func (c *rowLevelRepairController) TryBlock(replicaSet []string) int {
+func (c *rowLevelRepairController) TryBlock(replicaSet []string) Intensity {
 	if !c.shouldBlock(replicaSet) {
 		return 0
 	}
 	c.block(replicaSet)
 
 	i := c.intensity.Intensity()
-	if max := c.intensity.ReplicaSetMaxIntensity(replicaSet); i == maxIntensity || max < i {
-		i = max
+	if maxI := c.intensity.ReplicaSetMaxIntensity(replicaSet); i == maxIntensity || maxI < i {
+		i = maxI
 	}
 	return i
 }
diff --git a/pkg/service/repair/controller_test.go b/pkg/service/repair/controller_test.go
index 9a7b35ccc6..4b95c18424 100644
--- a/pkg/service/repair/controller_test.go
+++ b/pkg/service/repair/controller_test.go
@@ -21,7 +21,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
 		node6 = "192.168.1.6"
 	)
 
-	maxRangesPerHost := map[string]int{
+	maxRangesPerHost := map[string]Intensity{
 		node1: 20,
 		node2: 19,
 		node3: 18,
@@ -33,7 +33,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
 		return &intensityHandler{
 			logger:           log.Logger{},
 			maxHostIntensity: maxRangesPerHost,
-			intensity:        atomic.NewFloat64(defaultIntensity),
+			intensity:        atomic.NewInt64(int64(defaultIntensity)),
 			maxParallel:      3,
 			parallel:         atomic.NewInt64(defaultParallel),
 			poolController: workerpool.New[*worker, job, jobResult](context.Background(), func(ctx context.Context, id int) *worker {
@@ -69,9 +69,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
 		ih.maxParallel = maxParallel
 		c := newRowLevelRepairController(ih)
 
-		if err := ih.SetIntensity(context.Background(), expectedNrOfRanges); err != nil {
-			t.Fatalf("unexpected error = {%v}", err)
-		}
+		ih.SetIntensity(context.Background(), expectedNrOfRanges)
 		if rangesCount := c.TryBlock(replicaSet); rangesCount != expectedNrOfRanges {
 			t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", expectedNrOfRanges, rangesCount)
 		}
@@ -87,9 +85,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
 		ih.maxParallel = maxParallel
 		c := newRowLevelRepairController(ih)
 
-		if err := ih.SetIntensity(context.Background(), float64(expectedNrOfRanges)); err != nil {
-			t.Fatalf("unexpected error = {%v}", err)
-		}
+		ih.SetIntensity(context.Background(), expectedNrOfRanges)
 		if rangesCount := c.TryBlock(replicaSet); rangesCount != expectedNrOfRanges {
 			t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", expectedNrOfRanges, rangesCount)
 		}
@@ -106,9 +102,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
 		ih.maxParallel = maxParallel
 		c := newRowLevelRepairController(ih)
 
-		if err := ih.SetIntensity(context.Background(), intensity); err != nil {
-			t.Fatalf("unexpected error = {%v}", err)
-		}
+		ih.SetIntensity(context.Background(), intensity)
 		if rangesCount := c.TryBlock(replicaSet); rangesCount != minRangesInParallel {
 			t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", minRangesInParallel, rangesCount)
 		}
@@ -122,9 +116,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
 		ih.maxParallel = maxParallel
 		c := newRowLevelRepairController(ih)
 
-		if err := ih.SetParallel(context.Background(), 1); err != nil {
-			t.Fatalf("unexpected error {%v}", err)
-		}
+		ih.SetParallel(context.Background(), 1)
 		if rangesCount := c.TryBlock(replicaSet1); rangesCount == 0 {
 			t.Fatal("expected to let in, but was denied")
 		}
diff --git a/pkg/service/repair/model.go b/pkg/service/repair/model.go
index 448c86fe52..6872a3f98b 100644
--- a/pkg/service/repair/model.go
+++ b/pkg/service/repair/model.go
@@ -13,17 +13,33 @@ import (
 // Unit represents keyspace and its tables.
 type Unit = ksfilter.Unit
 
+// Intensity represents parsed internal intensity.
+type Intensity int
+
+// NewIntensity returns Intensity.
+func NewIntensity(i int) Intensity {
+	return Intensity(i)
+}
+
+// NewIntensityFromDeprecated returns Intensity parsed from deprecated float value.
+func NewIntensityFromDeprecated(i float64) Intensity {
+	if 0 < i && i < 1 {
+		return defaultIntensity
+	}
+	return Intensity(i)
+}
+
 // Target specifies what shall be repaired.
 type Target struct {
-	Units               []Unit   `json:"units"`
-	DC                  []string `json:"dc"`
-	Host                string   `json:"host,omitempty"`
-	IgnoreHosts         []string `json:"ignore_hosts,omitempty"`
-	FailFast            bool     `json:"fail_fast"`
-	Continue            bool     `json:"continue"`
-	Intensity           float64  `json:"intensity"`
-	Parallel            int      `json:"parallel"`
-	SmallTableThreshold int64    `json:"small_table_threshold"`
+	Units               []Unit    `json:"units"`
+	DC                  []string  `json:"dc"`
+	Host                string    `json:"host,omitempty"`
+	IgnoreHosts         []string  `json:"ignore_hosts,omitempty"`
+	FailFast            bool      `json:"fail_fast"`
+	Continue            bool      `json:"continue"`
+	Intensity           Intensity `json:"intensity"`
+	Parallel            int       `json:"parallel"`
+	SmallTableThreshold int64     `json:"small_table_threshold"`
 	// Cache for repair plan so that it does not have to be generated
 	// in both GetTarget and Repair functions.
 	plan *plan `json:"-"`
@@ -48,7 +64,7 @@ func defaultTaskProperties() *taskProperties {
 		Keyspace: []string{"*", "!system_traces"},
 		Continue: true,
 
-		Intensity: defaultIntensity,
+		Intensity: float64(defaultIntensity),
 
 		// Consider 1GB table as small by default.
 		SmallTableThreshold: 1 * 1024 * 1024 * 1024,
@@ -64,7 +80,7 @@ type Run struct {
 	DC        []string
 	Host      string
 	Parallel  int
-	Intensity int
+	Intensity Intensity
 	PrevID    uuid.UUID
 	StartTime time.Time
 	EndTime   time.Time
@@ -169,8 +185,8 @@ type Progress struct {
 	Host         string          `json:"host"`
 	Hosts        []HostProgress  `json:"hosts"`
 	Tables       []TableProgress `json:"tables"`
-	MaxIntensity float64         `json:"max_intensity"`
-	Intensity    float64         `json:"intensity"`
+	MaxIntensity Intensity       `json:"max_intensity"`
+	Intensity    Intensity       `json:"intensity"`
 	MaxParallel  int             `json:"max_parallel"`
 	Parallel     int             `json:"parallel"`
 }
diff --git a/pkg/service/repair/plan.go b/pkg/service/repair/plan.go
index bd83f91e16..07bd779913 100644
--- a/pkg/service/repair/plan.go
+++ b/pkg/service/repair/plan.go
@@ -20,7 +20,7 @@ type plan struct {
 
 	SkippedKeyspaces []string
 	MaxParallel      int
-	MaxHostIntensity map[string]int
+	MaxHostIntensity map[string]Intensity
 	HostTableSize    map[scyllaclient.HostKeyspaceTable]int64
 }
@@ -296,22 +296,22 @@ func (p *plan) KeyspaceRangesMap() map[string]int64 {
 	return out
 }
 
-func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]int {
-	out := make(map[string]int, len(shards))
+func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]Intensity {
+	out := make(map[string]Intensity, len(shards))
 	for h, sh := range shards {
 		out[h] = maxRepairRangesInParallel(sh, memory[h])
 	}
 	return out
 }
 
-func maxRepairRangesInParallel(shards uint, totalMemory int64) int {
+func maxRepairRangesInParallel(shards uint, totalMemory int64) Intensity {
 	const MiB = 1024 * 1024
 	memoryPerShard := totalMemory / int64(shards)
 	max := int(0.1 * float64(memoryPerShard) / (32 * MiB) / 4)
 	if max == 0 {
 		max = 1
 	}
-	return max
+	return NewIntensity(max)
 }
 
 // keyspacePlan describes repair schedule and state for keyspace.
@@ -340,20 +340,20 @@ func (kp keyspacePlan) IsTableRepaired(tabIdx int) bool {
 }
 
 // GetRangesToRepair returns at most cnt ranges of table owned by replica set.
-func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx, cnt int) []scyllaclient.TokenRange {
+func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx int, intensity Intensity) []scyllaclient.TokenRange {
 	rep := kp.Replicas[repIdx]
 	tp := kp.Tables[tabIdx]
 
 	// Return all ranges for optimized or deleted table
 	if tp.Optimize || tp.Deleted {
-		cnt = len(rep.Ranges)
+		intensity = NewIntensity(len(rep.Ranges))
 	}
 
 	var out []scyllaclient.TokenRange
 	for _, r := range rep.Ranges {
 		if tp.MarkRange(repIdx, r) {
 			out = append(out, r)
-			if len(out) >= cnt {
+			if NewIntensity(len(out)) >= intensity {
 				break
 			}
 		}
diff --git a/pkg/service/repair/service.go b/pkg/service/repair/service.go
index 38804c56cf..b9f36ed279 100644
--- a/pkg/service/repair/service.go
+++ b/pkg/service/repair/service.go
@@ -89,7 +89,7 @@ func (s *Service) GetTarget(ctx context.Context, clusterID uuid.UUID, properties
 		Host:                props.Host,
 		FailFast:            props.FailFast,
 		Continue:            props.Continue,
-		Intensity:           props.Intensity,
+		Intensity:           NewIntensityFromDeprecated(props.Intensity),
 		Parallel:            props.Parallel,
 		SmallTableThreshold: props.SmallTableThreshold,
 	}
@@ -210,7 +210,7 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID
 		DC:        target.DC,
 		Host:      target.Host,
 		Parallel:  target.Parallel,
-		Intensity: int(target.Intensity),
+		Intensity: target.Intensity,
 		StartTime: timeutc.Now(),
 	}
 	if err := s.putRun(run); err != nil {
@@ -258,12 +258,8 @@ func (s *Service) Repair(ctx context.Context, clusterID, taskID, runID uuid.UUID
 	defer cleanup()
 
 	// Set controlled parameters
-	if err := s.SetParallel(ctx, clusterID, run.Parallel); err != nil {
-		return errors.Wrap(err, "set initial parallel")
-	}
-	if err := s.SetIntensity(ctx, clusterID, float64(run.Intensity)); err != nil {
-		return errors.Wrap(err, "set initial intensity")
-	}
+	ih.SetParallel(ctx, run.Parallel)
+	ih.SetIntensity(ctx, run.Intensity)
 
 	// Give generator the ability to read parallel/intensity and
 	// to submit and receive results from worker pool.
@@ -318,14 +314,14 @@ func (s *Service) killAllRepairs(ctx context.Context, client *scyllaclient.Clien
 }
 
 func (s *Service) newIntensityHandler(ctx context.Context, clusterID, taskID, runID uuid.UUID,
-	maxHostIntensity map[string]int, maxParallel int, poolController sizeSetter,
+	maxHostIntensity map[string]Intensity, maxParallel int, poolController sizeSetter,
 ) (ih *intensityHandler, cleanup func()) {
 	ih = &intensityHandler{
 		taskID:           taskID,
 		runID:            runID,
 		logger:           s.logger.Named("control"),
 		maxHostIntensity: maxHostIntensity,
-		intensity:        &atomic.Float64{},
+		intensity:        &atomic.Int64{},
 		maxParallel:      maxParallel,
 		parallel:         &atomic.Int64{},
 		poolController:   poolController,
@@ -385,18 +381,18 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid
 		return Progress{}, errors.Wrap(err, "aggregate progress")
 	}
 	p.Parallel = run.Parallel
-	p.Intensity = float64(run.Intensity)
+	p.Intensity = run.Intensity
 
 	// Set max parallel/intensity only for running tasks
 	s.mu.Lock()
 	if ih, ok := s.intensityHandlers[clusterID]; ok {
-		maxI := 0
+		maxI := NewIntensity(0)
 		for _, v := range ih.MaxHostIntensity() {
 			if maxI < v {
 				maxI = v
 			}
 		}
-		p.MaxIntensity = float64(maxI)
+		p.MaxIntensity = maxI
 		p.MaxParallel = ih.MaxParallel()
 	}
 	s.mu.Unlock()
@@ -415,10 +411,10 @@ func (s *Service) SetIntensity(ctx context.Context, clusterID uuid.UUID, intensi
 	if !ok {
 		return errors.Wrap(service.ErrNotFound, "repair task")
 	}
-
-	if err := ih.SetIntensity(ctx, intensity); err != nil {
-		return errors.Wrap(err, "set intensity")
+	if intensity < 0 {
+		return service.ErrValidate(errors.Errorf("setting invalid intensity value %.2f", intensity))
 	}
+	ih.SetIntensity(ctx, NewIntensityFromDeprecated(intensity))
 
 	err := table.RepairRun.UpdateBuilder("intensity").Query(s.session).BindMap(qb.M{
 		"cluster_id": clusterID,
@@ -438,10 +434,10 @@ func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel
 	if !ok {
 		return errors.Wrap(service.ErrNotFound, "repair task")
 	}
-
-	if err := ih.SetParallel(ctx, parallel); err != nil {
-		return errors.Wrap(err, "set parallel")
+	if parallel < 0 {
+		return service.ErrValidate(errors.Errorf("setting invalid parallel value %d", parallel))
 	}
+	ih.SetParallel(ctx, parallel)
 
 	err := table.RepairRun.UpdateBuilder("parallel").Query(s.session).BindMap(qb.M{
 		"cluster_id": clusterID,
@@ -460,37 +456,28 @@ type intensityHandler struct {
 	taskID           uuid.UUID
 	runID            uuid.UUID
 	logger           log.Logger
-	maxHostIntensity map[string]int
-	intensity        *atomic.Float64
+	maxHostIntensity map[string]Intensity
+	intensity        *atomic.Int64
 	maxParallel      int
 	parallel         *atomic.Int64
 	poolController   sizeSetter
 }
 
 const (
-	maxIntensity     = 0
-	defaultIntensity = 1
-	defaultParallel  = 0
-	chanSize         = 10000
+	maxIntensity     Intensity = 0
+	defaultIntensity Intensity = 1
+	defaultParallel            = 0
+	chanSize                   = 10000
 )
 
 // SetIntensity sets the value of '--intensity' flag.
-func (i *intensityHandler) SetIntensity(ctx context.Context, intensity float64) error {
-	if intensity < 0 {
-		return service.ErrValidate(errors.Errorf("setting invalid intensity value %.2f", intensity))
-	}
-
+func (i *intensityHandler) SetIntensity(ctx context.Context, intensity Intensity) {
 	i.logger.Info(ctx, "Setting repair intensity", "value", intensity, "previous", i.intensity.Load())
-	i.intensity.Store(intensity)
-	return nil
+	i.intensity.Store(int64(intensity))
 }
 
 // SetParallel sets the value of '--parallel' flag.
-func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) error {
-	if parallel < 0 {
-		return service.ErrValidate(errors.Errorf("setting invalid parallel value %d", parallel))
-	}
-
+func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) {
 	i.logger.Info(ctx, "Setting repair parallel", "value", parallel, "previous", i.parallel.Load())
 	i.parallel.Store(int64(parallel))
 	if parallel == defaultParallel {
@@ -498,11 +485,10 @@ func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) error
 	} else {
 		i.poolController.SetSize(parallel)
 	}
-	return nil
 }
 
-func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) int {
-	out := math.MaxInt
+func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity {
+	out := NewIntensity(math.MaxInt)
 	for _, rep := range replicaSet {
 		if ranges := i.maxHostIntensity[rep]; ranges < out {
 			out = ranges
@@ -512,18 +498,13 @@ func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) int {
 }
 
 // MaxHostIntensity returns max_token_ranges_in_parallel per host.
-func (i *intensityHandler) MaxHostIntensity() map[string]int {
+func (i *intensityHandler) MaxHostIntensity() map[string]Intensity {
 	return i.maxHostIntensity
 }
 
-// Intensity returns effective intensity.
-func (i *intensityHandler) Intensity() int {
-	intensity := i.intensity.Load()
-	// Deprecate float intensity
-	if 0 < intensity && intensity < 1 {
-		intensity = defaultIntensity
-	}
-	return int(intensity)
+// Intensity returns stored value for intensity.
+func (i *intensityHandler) Intensity() Intensity {
+	return NewIntensity(int(i.intensity.Load()))
 }
 
 // MaxParallel returns maximal achievable parallelism.
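For context on the behavioral change introduced by this patch: deprecated fractional intensity values in (0,1) now collapse to the default intensity of 1, while 0 keeps its "max intensity" meaning and integer values pass through. Below is a minimal standalone sketch (not part of the patch) that duplicates the NewIntensityFromDeprecated helper from pkg/service/repair/model.go purely to illustrate that mapping.

package main

import "fmt"

// Intensity mirrors the type introduced in pkg/service/repair/model.go.
type Intensity int

const defaultIntensity Intensity = 1

// newIntensityFromDeprecated mirrors the patch's mapping: deprecated
// fractional intensities in (0,1) fall back to the default, everything
// else is truncated to an integer Intensity.
func newIntensityFromDeprecated(i float64) Intensity {
	if 0 < i && i < 1 {
		return defaultIntensity
	}
	return Intensity(i)
}

func main() {
	// 0 -> 0 (max), 0.5 -> 1 (deprecated fraction), 1 -> 1, 2.5 -> 2, 16 -> 16.
	for _, v := range []float64{0, 0.5, 1, 2.5, 16} {
		fmt.Printf("intensity %v -> %d\n", v, newIntensityFromDeprecated(v))
	}
}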