Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: fix out-of-order terms #47

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,17 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() }

func (l *raftLog) lastTerm() uint64 {
t, err := l.term(l.lastIndex())
_, term := l.tip()
return term
}

func (l *raftLog) tip() (index, term uint64) {
index = l.lastIndex()
t, err := l.term(index)
if err != nil {
l.logger.Panicf("unexpected error when getting the last term (%v)", err)
}
return t
return index, t
}

func (l *raftLog) term(i uint64) (uint64, error) {
Expand Down
15 changes: 14 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,10 @@ func (r *raft) reset(term uint64) {
}

func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
li := r.raftLog.lastIndex()
li, lt := r.raftLog.tip()
if r.Term < lt {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after the first successful appendEntry lt == r.Term, because es[i].Term = r.Term on line 765. So, I don't think this check should be here. It probably should happen at the end of raft.newRaft().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

r.logger.Panicf("%x appending out-of-order term: %d < %d", r.id, r.Term, lt)
Copy link

@lavacat lavacat Apr 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you aren't checking Terms in es, so msg is a bit confusing. Maybe appending out-of-order term -> last raft log entry has term greater than raft struct

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about using warning instead of panic?

}
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
Expand Down Expand Up @@ -1727,6 +1730,16 @@ func (r *raft) restore(s pb.Snapshot) bool {
return false
}

// Another defense-in-depth: the follower is seeing a snapshot at a bigger
// term, but hasn't updated its own term.
if s.Metadata.Term > r.Term {
r.logger.Warningf("%x attempted to restore snapshot at term %d while being at earlier term %d; "+
"should transition to follower at a larger term first",
r.id, s.Metadata.Term, r.Term)
r.becomeFollower(s.Metadata.Term, None)
return false
}

// More defense-in-depth: throw away snapshot if recipient is not in the
// config. This shouldn't ever happen (at the time of writing) but lots of
// code here and there assumes that r.id is in the progress tracker.
Expand Down
22 changes: 12 additions & 10 deletions raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package raft
import (
"testing"

"github.com/stretchr/testify/require"

pb "go.etcd.io/raft/v3/raftpb"
)

Expand All @@ -33,8 +35,8 @@ var (
func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeFollower(testingSnap.Metadata.Term, 0)
require.True(t, sm.restore(testingSnap))
sm.becomeCandidate()
sm.becomeLeader()

Expand All @@ -51,8 +53,8 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
func TestPendingSnapshotPauseReplication(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeFollower(testingSnap.Metadata.Term, 0)
require.True(t, sm.restore(testingSnap))
sm.becomeCandidate()
sm.becomeLeader()

Expand All @@ -68,8 +70,8 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
func TestSnapshotFailure(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeFollower(testingSnap.Metadata.Term, 0)
require.True(t, sm.restore(testingSnap))
sm.becomeCandidate()
sm.becomeLeader()

Expand All @@ -91,8 +93,8 @@ func TestSnapshotFailure(t *testing.T) {
func TestSnapshotSucceed(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeFollower(testingSnap.Metadata.Term, 0)
require.True(t, sm.restore(testingSnap))
sm.becomeCandidate()
sm.becomeLeader()

Expand All @@ -114,8 +116,8 @@ func TestSnapshotSucceed(t *testing.T) {
func TestSnapshotAbort(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeFollower(testingSnap.Metadata.Term, 0)
require.True(t, sm.restore(testingSnap))
sm.becomeCandidate()
sm.becomeLeader()

Expand Down
65 changes: 24 additions & 41 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,11 +1366,11 @@ func TestHandleHeartbeat(t *testing.T) {
// TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response.
func TestHandleHeartbeatResp(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
storage.SetHardState(pb.HardState{Term: 3, Vote: 1, Commit: 3})
storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these entries necessary for the test? Maybe both storage.SetHardState and storage.Append can be removed to keep things simpler

sm := newTestRaft(1, 5, 1, storage)
sm.becomeCandidate()
sm.becomeLeader()
sm.raftLog.commitTo(sm.raftLog.lastIndex())

// A heartbeat response from a node that is behind; re-send MsgApp
sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
Expand Down Expand Up @@ -2916,9 +2916,8 @@ func TestRestore(t *testing.T) {

storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
if ok := sm.restore(s); !ok {
t.Fatal("restore fail, want succeed")
}
sm.becomeFollower(s.Metadata.Term, 0)
require.True(t, sm.restore(s))

if sm.raftLog.lastIndex() != s.Metadata.Index {
t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
Expand Down Expand Up @@ -2955,9 +2954,8 @@ func TestRestoreWithLearner(t *testing.T) {

storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3))
sm := newTestLearnerRaft(3, 8, 2, storage)
if ok := sm.restore(s); !ok {
t.Error("restore fail, want succeed")
}
sm.becomeFollower(s.Metadata.Term, 0)
require.True(t, sm.restore(s))

if sm.raftLog.lastIndex() != s.Metadata.Index {
t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
Expand Down Expand Up @@ -3001,9 +2999,8 @@ func TestRestoreWithVotersOutgoing(t *testing.T) {

storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
if ok := sm.restore(s); !ok {
t.Fatal("restore fail, want succeed")
}
sm.becomeFollower(s.Metadata.Term, 0)
require.True(t, sm.restore(s))

if sm.raftLog.lastIndex() != s.Metadata.Index {
t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
Expand Down Expand Up @@ -3047,13 +3044,9 @@ func TestRestoreVoterToLearner(t *testing.T) {

storage := newTestMemoryStorage(withPeers(1, 2, 3))
sm := newTestRaft(3, 10, 1, storage)

if sm.isLearner {
t.Errorf("%x is learner, want not", sm.id)
}
if ok := sm.restore(s); !ok {
t.Error("restore failed unexpectedly")
}
sm.becomeFollower(s.Metadata.Term, 0)
require.True(t, !sm.isLearner)
require.True(t, sm.restore(s))
}

// TestRestoreLearnerPromotion checks that a learner can become to a follower after
Expand All @@ -3069,18 +3062,10 @@ func TestRestoreLearnerPromotion(t *testing.T) {

storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3))
sm := newTestLearnerRaft(3, 10, 1, storage)

if !sm.isLearner {
t.Errorf("%x is not learner, want yes", sm.id)
}

if ok := sm.restore(s); !ok {
t.Error("restore fail, want succeed")
}

if sm.isLearner {
t.Errorf("%x is learner, want not", sm.id)
}
require.True(t, sm.isLearner)
sm.becomeFollower(s.Metadata.Term, 0)
require.True(t, sm.restore(s))
require.True(t, !sm.isLearner)
}

// TestLearnerReceiveSnapshot tests that a learner can receive a snpahost from leader
Expand All @@ -3096,13 +3081,13 @@ func TestLearnerReceiveSnapshot(t *testing.T) {

store := newTestMemoryStorage(withPeers(1), withLearners(2))
n1 := newTestLearnerRaft(1, 10, 1, store)
n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))

n1.restore(s)
n1.becomeFollower(s.Metadata.Term, 0)
require.True(t, n1.restore(s))
snap := n1.raftLog.nextUnstableSnapshot()
store.ApplySnapshot(*snap)
n1.appliedSnap(snap)

n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
nt := newNetwork(n1, n2)

setRandomizedElectionTimeout(n1, n1.electionTimeout)
Expand Down Expand Up @@ -3132,6 +3117,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) {
ConfState: pb.ConfState{Voters: []uint64{1, 2}},
},
}
sm.becomeFollower(s.Metadata.Term, 0)

// ignore snapshot
if ok := sm.restore(s); ok {
Expand Down Expand Up @@ -3162,8 +3148,8 @@ func TestProvideSnap(t *testing.T) {
}
storage := newTestMemoryStorage(withPeers(1))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(s)

sm.becomeFollower(s.Metadata.Term, 0)
require.True(t, sm.restore(s))
sm.becomeCandidate()
sm.becomeLeader()

Expand Down Expand Up @@ -3192,8 +3178,8 @@ func TestIgnoreProvidingSnap(t *testing.T) {
}
storage := newTestMemoryStorage(withPeers(1))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(s)

sm.becomeFollower(s.Metadata.Term, 0)
require.True(t, sm.restore(s))
sm.becomeCandidate()
sm.becomeLeader()

Expand All @@ -3218,14 +3204,11 @@ func TestRestoreFromSnapMsg(t *testing.T) {
ConfState: pb.ConfState{Voters: []uint64{1, 2}},
},
}
m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 11, Snapshot: s}

sm := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
sm.Step(m)

if sm.lead != uint64(1) {
t.Errorf("sm.lead = %d, want 1", sm.lead)
}
require.Equal(t, uint64(1), sm.lead)

// TODO(bdarnell): what should this test?
}
Expand Down