Skip to content

Commit

Permalink
fix(repair): get rid of float intensity in internals
Browse files Browse the repository at this point in the history
There are still some bugs related to float intensity (one of them is described in the issue below). To get rid of them, we should tolerate float intensity only at the entry points and remove it completely from the SM repair internals (both the service and the progress display).

We keep float intensity in task properties and in swagger endpoint parameters, but we convert it to int intensity as the very first step in the internals, which happens in:
- GetTarget (from task properties)
- sctool repair control endpoint (from query param)

Fixes #3665
  • Loading branch information
Michal-Leszczynski committed Jan 16, 2024
1 parent 9067fba commit 0acf5b0
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 91 deletions.
2 changes: 1 addition & 1 deletion pkg/command/flag/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (fl *Intensity) String() string {

// Set implements pflag.Value.
func (fl *Intensity) Set(s string) error {
errValidation := errors.New("intensity must be an integer >= 1 or a decimal between (0,1)")
errValidation := errors.New("intensity must be a non-negative integer")

f, err := strconv.ParseFloat(s, 64)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/service/repair/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ package repair
// controller informs generator about the amount of ranges that can be repaired
// on a given replica set. Returns 0 ranges when repair shouldn't be scheduled.
type controller interface {
// TryBlock reports the amount of ranges for replicaSet; 0 means that
// repair shouldn't be scheduled on it.
TryBlock(replicaSet []string) (ranges int)
TryBlock(replicaSet []string) (ranges Intensity)
// Unblock takes a replica set; presumably it releases one previously
// admitted by TryBlock — confirm against the implementation.
Unblock(replicaSet []string)
// Busy reports a boolean state of the controller; its exact meaning is
// defined by the implementation, which is not visible here.
Busy() bool
}

// intensityChecker exposes the intensity and parallel settings that
// rowLevelRepairController consults when deciding how many ranges to return.
type intensityChecker interface {
// Intensity returns the currently configured intensity.
Intensity() int
Intensity() Intensity
// Parallel and MaxParallel return the parallel setting and its limit
// (semantics defined by the implementation, not visible here).
Parallel() int
MaxParallel() int
// ReplicaSetMaxIntensity returns the value that TryBlock uses as the upper
// bound on intensity for replicaSet.
ReplicaSetMaxIntensity(replicaSet []string) int
ReplicaSetMaxIntensity(replicaSet []string) Intensity
}

// rowLevelRepairController is a specialised controller for row-level repair.
Expand All @@ -37,15 +37,15 @@ func newRowLevelRepairController(i intensityChecker) *rowLevelRepairController {
}
}

// TryBlock returns 0, without calling block, when shouldBlock reports false
// for replicaSet. Otherwise it calls block on replicaSet and returns the
// configured intensity, replaced by the replica set's max intensity when the
// configured value equals maxIntensity or exceeds that max.
func (c *rowLevelRepairController) TryBlock(replicaSet []string) int {
func (c *rowLevelRepairController) TryBlock(replicaSet []string) Intensity {
if !c.shouldBlock(replicaSet) {
return 0
}
c.block(replicaSet)

// Start from the configured intensity and cap it by what this replica set
// allows; the maxIntensity value always resolves to the replica set's max.
i := c.intensity.Intensity()
if max := c.intensity.ReplicaSetMaxIntensity(replicaSet); i == maxIntensity || max < i {
i = max
if maxI := c.intensity.ReplicaSetMaxIntensity(replicaSet); i == maxIntensity || maxI < i {
i = maxI
}
return i
}
Expand Down
20 changes: 6 additions & 14 deletions pkg/service/repair/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
node6 = "192.168.1.6"
)

maxRangesPerHost := map[string]int{
maxRangesPerHost := map[string]Intensity{
node1: 20,
node2: 19,
node3: 18,
Expand All @@ -33,7 +33,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
return &intensityHandler{
logger: log.Logger{},
maxHostIntensity: maxRangesPerHost,
intensity: atomic.NewFloat64(defaultIntensity),
intensity: atomic.NewInt64(int64(defaultIntensity)),
maxParallel: 3,
parallel: atomic.NewInt64(defaultParallel),
poolController: workerpool.New[*worker, job, jobResult](context.Background(), func(ctx context.Context, id int) *worker {
Expand Down Expand Up @@ -69,9 +69,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
ih.maxParallel = maxParallel
c := newRowLevelRepairController(ih)

if err := ih.SetIntensity(context.Background(), expectedNrOfRanges); err != nil {
t.Fatalf("unexpected error = {%v}", err)
}
ih.SetIntensity(context.Background(), expectedNrOfRanges)
if rangesCount := c.TryBlock(replicaSet); rangesCount != expectedNrOfRanges {
t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", expectedNrOfRanges, rangesCount)
}
Expand All @@ -87,9 +85,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
ih.maxParallel = maxParallel
c := newRowLevelRepairController(ih)

if err := ih.SetIntensity(context.Background(), float64(expectedNrOfRanges)); err != nil {
t.Fatalf("unexpected error = {%v}", err)
}
ih.SetIntensity(context.Background(), expectedNrOfRanges)
if rangesCount := c.TryBlock(replicaSet); rangesCount != expectedNrOfRanges {
t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", expectedNrOfRanges, rangesCount)
}
Expand All @@ -106,9 +102,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
ih.maxParallel = maxParallel
c := newRowLevelRepairController(ih)

if err := ih.SetIntensity(context.Background(), intensity); err != nil {
t.Fatalf("unexpected error = {%v}", err)
}
ih.SetIntensity(context.Background(), intensity)
if rangesCount := c.TryBlock(replicaSet); rangesCount != minRangesInParallel {
t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", minRangesInParallel, rangesCount)
}
Expand All @@ -122,9 +116,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
ih.maxParallel = maxParallel
c := newRowLevelRepairController(ih)

if err := ih.SetParallel(context.Background(), 1); err != nil {
t.Fatalf("unexpected error {%v}", err)
}
ih.SetParallel(context.Background(), 1)
if rangesCount := c.TryBlock(replicaSet1); rangesCount == 0 {
t.Fatal("expected to let in, but was denied")
}
Expand Down
42 changes: 29 additions & 13 deletions pkg/service/repair/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,33 @@ import (
// Unit represents keyspace and its tables.
type Unit = ksfilter.Unit

// Intensity is the integer form of repair intensity used internally,
// obtained after the user-facing value has been parsed.
type Intensity int

// NewIntensity wraps a plain int as an Intensity without altering its value.
func NewIntensity(i int) Intensity {
	wrapped := Intensity(i)
	return wrapped
}

// NewIntensityFromDeprecated converts a deprecated float intensity into
// Intensity. Values strictly between 0 and 1 map to defaultIntensity; every
// other value is converted directly, which discards its fractional part.
func NewIntensityFromDeprecated(i float64) Intensity {
	isFraction := i > 0 && i < 1
	if !isFraction {
		return Intensity(i)
	}
	return defaultIntensity
}

// Target specifies what shall be repaired.
type Target struct {
Units []Unit `json:"units"`
DC []string `json:"dc"`
Host string `json:"host,omitempty"`
IgnoreHosts []string `json:"ignore_hosts,omitempty"`
FailFast bool `json:"fail_fast"`
Continue bool `json:"continue"`
Intensity float64 `json:"intensity"`
Parallel int `json:"parallel"`
SmallTableThreshold int64 `json:"small_table_threshold"`
Units []Unit `json:"units"`
DC []string `json:"dc"`
Host string `json:"host,omitempty"`
IgnoreHosts []string `json:"ignore_hosts,omitempty"`
FailFast bool `json:"fail_fast"`
Continue bool `json:"continue"`
Intensity Intensity `json:"intensity"`
Parallel int `json:"parallel"`
SmallTableThreshold int64 `json:"small_table_threshold"`
// Cache for repair plan so that it does not have to be generated
// in both GetTarget and Repair functions.
plan *plan `json:"-"`
Expand All @@ -48,7 +64,7 @@ func defaultTaskProperties() *taskProperties {
Keyspace: []string{"*", "!system_traces"},

Continue: true,
Intensity: defaultIntensity,
Intensity: float64(defaultIntensity),

// Consider 1GB table as small by default.
SmallTableThreshold: 1 * 1024 * 1024 * 1024,
Expand All @@ -64,7 +80,7 @@ type Run struct {
DC []string
Host string
Parallel int
Intensity int
Intensity Intensity
PrevID uuid.UUID
StartTime time.Time
EndTime time.Time
Expand Down Expand Up @@ -169,8 +185,8 @@ type Progress struct {
Host string `json:"host"`
Hosts []HostProgress `json:"hosts"`
Tables []TableProgress `json:"tables"`
MaxIntensity float64 `json:"max_intensity"`
Intensity float64 `json:"intensity"`
MaxIntensity Intensity `json:"max_intensity"`
Intensity Intensity `json:"intensity"`
MaxParallel int `json:"max_parallel"`
Parallel int `json:"parallel"`
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/service/repair/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type plan struct {

SkippedKeyspaces []string
MaxParallel int
MaxHostIntensity map[string]int
MaxHostIntensity map[string]Intensity
HostTableSize map[scyllaclient.HostKeyspaceTable]int64
}

Expand Down Expand Up @@ -296,22 +296,22 @@ func (p *plan) KeyspaceRangesMap() map[string]int64 {
return out
}

// hostMaxRanges builds a per-host map by calling maxRepairRangesInParallel
// with each host's shard count and its entry in memory. The hosts are the keys
// of shards; a host absent from memory is evaluated with a memory value of 0.
func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]int {
out := make(map[string]int, len(shards))
func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]Intensity {
out := make(map[string]Intensity, len(shards))
for h, sh := range shards {
out[h] = maxRepairRangesInParallel(sh, memory[h])
}
return out
}

// maxRepairRangesInParallel derives a limit from the per-shard memory
// (totalMemory / shards): 10% of it, divided by 32 MiB and then by 4,
// truncated to an integer and never less than 1.
// NOTE(review): the integer division panics when shards is 0 — presumably
// callers always pass a positive shard count; confirm.
func maxRepairRangesInParallel(shards uint, totalMemory int64) int {
func maxRepairRangesInParallel(shards uint, totalMemory int64) Intensity {
const MiB = 1024 * 1024
memoryPerShard := totalMemory / int64(shards)
max := int(0.1 * float64(memoryPerShard) / (32 * MiB) / 4)
// Always allow at least one range, even with very little memory per shard.
if max == 0 {
max = 1
}
return max
return NewIntensity(max)
}

// keyspacePlan describes repair schedule and state for keyspace.
Expand Down Expand Up @@ -340,20 +340,20 @@ func (kp keyspacePlan) IsTableRepaired(tabIdx int) bool {
}

// GetRangesToRepair returns at most cnt ranges of table owned by replica set.
func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx, cnt int) []scyllaclient.TokenRange {
func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx int, intensity Intensity) []scyllaclient.TokenRange {
rep := kp.Replicas[repIdx]
tp := kp.Tables[tabIdx]

// Return all ranges for optimized or deleted table
if tp.Optimize || tp.Deleted {
cnt = len(rep.Ranges)
intensity = NewIntensity(len(rep.Ranges))
}

var out []scyllaclient.TokenRange
for _, r := range rep.Ranges {
if tp.MarkRange(repIdx, r) {
out = append(out, r)
if len(out) >= cnt {
if NewIntensity(len(out)) >= intensity {
break
}
}
Expand Down
Loading

0 comments on commit 0acf5b0

Please sign in to comment.