[3.5] Backport TestLessorRenewExtendPileup race condition fix #18570

Merged
merged 1 commit on Sep 9, 2024
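The change below moves the lease-revoke rate from a package-level variable into per-lessor state. Previously `TestLessorRenewExtendPileup` lowered the shared `leaseRevokeRate` global and restored it with a `defer`, while lessor code read the same variable from its own goroutines; presumably that concurrent write/read is the race this backport fixes. With the patch, the rate lives on the `lessor` struct and is injected through an unexported `LessorConfig.leaseRevokeRate` field that falls back to `defaultLeaseRevokeRate` (1000) when left at zero.

As a minimal, self-contained sketch of that zero-value-defaulting pattern (illustrative names only, not etcd's actual types):

```go
package main

import "fmt"

// defaultRevokeRate stands in for defaultLeaseRevokeRate in the diff.
const defaultRevokeRate = 1000

// Config mirrors the idea of LessorConfig.leaseRevokeRate: a knob whose
// zero value means "use the default".
type Config struct {
    RevokeRate int
}

// lessorLike keeps a per-instance copy of the rate, so a test can lower it
// without touching shared package state that other goroutines read.
type lessorLike struct {
    revokeRate int
}

func newLessorLike(cfg Config) *lessorLike {
    rate := cfg.RevokeRate
    if rate == 0 {
        rate = defaultRevokeRate // default applies when the caller sets nothing
    }
    return &lessorLike{revokeRate: rate}
}

func main() {
    fmt.Println(newLessorLike(Config{}).revokeRate)               // 1000: production path
    fmt.Println(newLessorLike(Config{RevokeRate: 10}).revokeRate) // 10: what a test would pass
}
```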
20 changes: 15 additions & 5 deletions server/lease/lessor.go
@@ -44,8 +44,8 @@ var v3_6 = semver.Version{Major: 3, Minor: 6}
 var (
     forever = time.Time{}

-    // maximum number of leases to revoke per second; configurable for tests
-    leaseRevokeRate = 1000
+    // default number of leases to revoke per second; configurable for tests
+    defaultLeaseRevokeRate = 1000

     // maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
     leaseCheckpointRate = 1000
@@ -172,6 +172,9 @@ type lessor struct {
     // requests for shorter TTLs are extended to the minimum TTL.
     minLeaseTTL int64

+    // maximum number of leases to revoke per second
+    leaseRevokeRate int
+
     expiredC chan []*Lease
     // stopC is a channel whose closure indicates that the lessor should be stopped.
     stopC chan struct{}
@@ -200,6 +203,8 @@ type LessorConfig struct {
     CheckpointInterval         time.Duration
     ExpiredLeasesRetryInterval time.Duration
     CheckpointPersist          bool
+
+    leaseRevokeRate int
 }

 func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
@@ -209,19 +214,24 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
 func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
     checkpointInterval := cfg.CheckpointInterval
     expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
+    leaseRevokeRate := cfg.leaseRevokeRate
     if checkpointInterval == 0 {
         checkpointInterval = defaultLeaseCheckpointInterval
     }
     if expiredLeaseRetryInterval == 0 {
         expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
     }
+    if leaseRevokeRate == 0 {
+        leaseRevokeRate = defaultLeaseRevokeRate
+    }
     l := &lessor{
         leaseMap:                  make(map[LeaseID]*Lease),
         itemMap:                   make(map[LeaseItem]LeaseID),
         leaseExpiredNotifier:      newLeaseExpiredNotifier(),
         leaseCheckpointHeap:       make(LeaseQueue, 0),
         b:                         b,
         minLeaseTTL:               cfg.MinLeaseTTL,
+        leaseRevokeRate:           leaseRevokeRate,
         checkpointInterval:        checkpointInterval,
         expiredLeaseRetryInterval: expiredLeaseRetryInterval,
         checkpointPersist:         cfg.CheckpointPersist,
@@ -474,7 +484,7 @@ func (le *lessor) Promote(extend time.Duration) {
         le.scheduleCheckpointIfNeeded(l)
     }

-    if len(le.leaseMap) < leaseRevokeRate {
+    if len(le.leaseMap) < le.leaseRevokeRate {
         // no possibility of lease pile-up
         return
     }
@@ -488,7 +498,7 @@ func (le *lessor) Promote(extend time.Duration) {
     expires := 0
     // have fewer expires than the total revoke rate so piled up leases
     // don't consume the entire revoke limit
-    targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
+    targetExpiresPerSecond := (3 * le.leaseRevokeRate) / 4
     for _, l := range leases {
         remaining := l.Remaining()
         if remaining > nextWindow {
@@ -627,7 +637,7 @@ func (le *lessor) revokeExpiredLeases() {
     var ls []*Lease

     // rate limit
-    revokeLimit := leaseRevokeRate / 2
+    revokeLimit := le.leaseRevokeRate / 2

     le.mu.RLock()
     if le.isPrimary() {
18 changes: 8 additions & 10 deletions server/lease/lessor_test.go
@@ -291,17 +291,15 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
 // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
 // expire at the same time.
 func TestLessorRenewExtendPileup(t *testing.T) {
-    oldRevokeRate := leaseRevokeRate
-    defer func() { leaseRevokeRate = oldRevokeRate }()
+    leaseRevokeRate := 10
     lg := zap.NewNop()
-    leaseRevokeRate = 10

     dir, be := NewTestBackend(t)
     defer os.RemoveAll(dir)

-    le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
+    le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate})
     ttl := int64(10)
-    for i := 1; i <= leaseRevokeRate*10; i++ {
+    for i := 1; i <= le.leaseRevokeRate*10; i++ {
         if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
             t.Fatal(err)
         }
@@ -318,7 +316,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
     bcfg.Path = filepath.Join(dir, "be")
     be = backend.New(bcfg)
     defer be.Close()
-    le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
+    le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate})
     defer le.Stop()

     // extend after recovery should extend expiration on lease pile-up
@@ -333,11 +331,11 @@ func TestLessorRenewExtendPileup(t *testing.T) {

     for i := ttl; i < ttl+20; i++ {
         c := windowCounts[i]
-        if c > leaseRevokeRate {
-            t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c)
+        if c > le.leaseRevokeRate {
+            t.Errorf("expected at most %d expiring at %ds, got %d", le.leaseRevokeRate, i, c)
         }
-        if c < leaseRevokeRate/2 {
-            t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c)
+        if c < le.leaseRevokeRate/2 {
+            t.Errorf("expected at least %d expiring at %ds, got %d", le.leaseRevokeRate/2, i, c)
         }
     }
 }
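Since the new `LessorConfig` field is unexported, only tests inside the lease package can set it; external callers of `NewLessor` are unaffected. A run along the lines of `go test -race -run TestLessorRenewExtendPileup ./server/lease/` (path inferred from the file headers above) should exercise the fixed test under the race detector.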