Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix repair float intensity #3688

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/command/flag/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (fl *Intensity) String() string {

// Set implements pflag.Value.
func (fl *Intensity) Set(s string) error {
errValidation := errors.New("intensity must be an integer >= 1 or a decimal between (0,1)")
errValidation := errors.New("intensity must be a non-negative integer")

f, err := strconv.ParseFloat(s, 64)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/service/repair/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ package repair
// controller informs generator about the amount of ranges that can be repaired
// on a given replica set. Returns 0 ranges when repair shouldn't be scheduled.
type controller interface {
TryBlock(replicaSet []string) (ranges int)
TryBlock(replicaSet []string) (ranges Intensity)
Unblock(replicaSet []string)
Busy() bool
}

type intensityChecker interface {
Intensity() int
Intensity() Intensity
Parallel() int
MaxParallel() int
ReplicaSetMaxIntensity(replicaSet []string) int
ReplicaSetMaxIntensity(replicaSet []string) Intensity
}

// rowLevelRepairController is a specialised controller for row-level repair.
Expand All @@ -37,15 +37,15 @@ func newRowLevelRepairController(i intensityChecker) *rowLevelRepairController {
}
}

func (c *rowLevelRepairController) TryBlock(replicaSet []string) int {
func (c *rowLevelRepairController) TryBlock(replicaSet []string) Intensity {
if !c.shouldBlock(replicaSet) {
return 0
}
c.block(replicaSet)

i := c.intensity.Intensity()
if max := c.intensity.ReplicaSetMaxIntensity(replicaSet); i == maxIntensity || max < i {
i = max
if maxI := c.intensity.ReplicaSetMaxIntensity(replicaSet); i == maxIntensity || maxI < i {
i = maxI
}
return i
}
Expand Down
20 changes: 6 additions & 14 deletions pkg/service/repair/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
node6 = "192.168.1.6"
)

maxRangesPerHost := map[string]int{
maxRangesPerHost := map[string]Intensity{
node1: 20,
node2: 19,
node3: 18,
Expand All @@ -33,7 +33,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
return &intensityHandler{
logger: log.Logger{},
maxHostIntensity: maxRangesPerHost,
intensity: atomic.NewFloat64(defaultIntensity),
intensity: atomic.NewInt64(int64(defaultIntensity)),
maxParallel: 3,
parallel: atomic.NewInt64(defaultParallel),
poolController: workerpool.New[*worker, job, jobResult](context.Background(), func(ctx context.Context, id int) *worker {
Expand Down Expand Up @@ -69,9 +69,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
ih.maxParallel = maxParallel
c := newRowLevelRepairController(ih)

if err := ih.SetIntensity(context.Background(), expectedNrOfRanges); err != nil {
t.Fatalf("unexpected error = {%v}", err)
}
ih.SetIntensity(context.Background(), expectedNrOfRanges)
if rangesCount := c.TryBlock(replicaSet); rangesCount != expectedNrOfRanges {
t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", expectedNrOfRanges, rangesCount)
}
Expand All @@ -87,9 +85,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
ih.maxParallel = maxParallel
c := newRowLevelRepairController(ih)

if err := ih.SetIntensity(context.Background(), float64(expectedNrOfRanges)); err != nil {
t.Fatalf("unexpected error = {%v}", err)
}
ih.SetIntensity(context.Background(), expectedNrOfRanges)
if rangesCount := c.TryBlock(replicaSet); rangesCount != expectedNrOfRanges {
t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", expectedNrOfRanges, rangesCount)
}
Expand All @@ -106,9 +102,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
ih.maxParallel = maxParallel
c := newRowLevelRepairController(ih)

if err := ih.SetIntensity(context.Background(), intensity); err != nil {
t.Fatalf("unexpected error = {%v}", err)
}
ih.SetIntensity(context.Background(), intensity)
if rangesCount := c.TryBlock(replicaSet); rangesCount != minRangesInParallel {
t.Fatalf("expected to return {%d} ranges to repair, but got {%d}", minRangesInParallel, rangesCount)
}
Expand All @@ -122,9 +116,7 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
ih.maxParallel = maxParallel
c := newRowLevelRepairController(ih)

if err := ih.SetParallel(context.Background(), 1); err != nil {
t.Fatalf("unexpected error {%v}", err)
}
ih.SetParallel(context.Background(), 1)
if rangesCount := c.TryBlock(replicaSet1); rangesCount == 0 {
t.Fatal("expected to let in, but was denied")
}
Expand Down
42 changes: 29 additions & 13 deletions pkg/service/repair/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,33 @@ import (
// Unit represents keyspace and its tables.
type Unit = ksfilter.Unit

// Intensity represents parsed internal intensity.
type Intensity int

// NewIntensity returns Intensity.
func NewIntensity(i int) Intensity {
return Intensity(i)
}

// NewIntensityFromDeprecated returns Intensity parsed from deprecated float value.
func NewIntensityFromDeprecated(i float64) Intensity {
if 0 < i && i < 1 {
return defaultIntensity
}
return Intensity(i)
}

// Target specifies what shall be repaired.
type Target struct {
Units []Unit `json:"units"`
DC []string `json:"dc"`
Host string `json:"host,omitempty"`
IgnoreHosts []string `json:"ignore_hosts,omitempty"`
FailFast bool `json:"fail_fast"`
Continue bool `json:"continue"`
Intensity float64 `json:"intensity"`
Parallel int `json:"parallel"`
SmallTableThreshold int64 `json:"small_table_threshold"`
Units []Unit `json:"units"`
DC []string `json:"dc"`
Host string `json:"host,omitempty"`
IgnoreHosts []string `json:"ignore_hosts,omitempty"`
FailFast bool `json:"fail_fast"`
Continue bool `json:"continue"`
Intensity Intensity `json:"intensity"`
Parallel int `json:"parallel"`
SmallTableThreshold int64 `json:"small_table_threshold"`
// Cache for repair plan so that it does not have to be generated
// in both GetTarget and Repair functions.
plan *plan `json:"-"`
Expand All @@ -48,7 +64,7 @@ func defaultTaskProperties() *taskProperties {
Keyspace: []string{"*", "!system_traces"},

Continue: true,
Intensity: defaultIntensity,
Intensity: float64(defaultIntensity),

// Consider 1GB table as small by default.
SmallTableThreshold: 1 * 1024 * 1024 * 1024,
Expand All @@ -64,7 +80,7 @@ type Run struct {
DC []string
Host string
Parallel int
Intensity int
Intensity Intensity
PrevID uuid.UUID
StartTime time.Time
EndTime time.Time
Expand Down Expand Up @@ -169,8 +185,8 @@ type Progress struct {
Host string `json:"host"`
Hosts []HostProgress `json:"hosts"`
Tables []TableProgress `json:"tables"`
MaxIntensity float64 `json:"max_intensity"`
Intensity float64 `json:"intensity"`
MaxIntensity Intensity `json:"max_intensity"`
Intensity Intensity `json:"intensity"`
MaxParallel int `json:"max_parallel"`
Parallel int `json:"parallel"`
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/service/repair/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type plan struct {

SkippedKeyspaces []string
MaxParallel int
MaxHostIntensity map[string]int
MaxHostIntensity map[string]Intensity
HostTableSize map[scyllaclient.HostKeyspaceTable]int64
}

Expand Down Expand Up @@ -296,22 +296,22 @@ func (p *plan) KeyspaceRangesMap() map[string]int64 {
return out
}

func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]int {
out := make(map[string]int, len(shards))
func hostMaxRanges(shards map[string]uint, memory map[string]int64) map[string]Intensity {
out := make(map[string]Intensity, len(shards))
for h, sh := range shards {
out[h] = maxRepairRangesInParallel(sh, memory[h])
}
return out
}

func maxRepairRangesInParallel(shards uint, totalMemory int64) int {
func maxRepairRangesInParallel(shards uint, totalMemory int64) Intensity {
const MiB = 1024 * 1024
memoryPerShard := totalMemory / int64(shards)
max := int(0.1 * float64(memoryPerShard) / (32 * MiB) / 4)
if max == 0 {
max = 1
}
return max
return NewIntensity(max)
}

// keyspacePlan describes repair schedule and state for keyspace.
Expand Down Expand Up @@ -340,20 +340,20 @@ func (kp keyspacePlan) IsTableRepaired(tabIdx int) bool {
}

// GetRangesToRepair returns at most cnt ranges of table owned by replica set.
func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx, cnt int) []scyllaclient.TokenRange {
func (kp keyspacePlan) GetRangesToRepair(repIdx, tabIdx int, intensity Intensity) []scyllaclient.TokenRange {
rep := kp.Replicas[repIdx]
tp := kp.Tables[tabIdx]

// Return all ranges for optimized or deleted table
if tp.Optimize || tp.Deleted {
cnt = len(rep.Ranges)
intensity = NewIntensity(len(rep.Ranges))
}

var out []scyllaclient.TokenRange
for _, r := range rep.Ranges {
if tp.MarkRange(repIdx, r) {
out = append(out, r)
if len(out) >= cnt {
if NewIntensity(len(out)) >= intensity {
break
}
}
Expand Down
Loading