From eb6bfc6966a2e90589986c8f6fd808d6d97ab1e9 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 26 Jan 2024 02:22:07 +0000 Subject: [PATCH 1/3] raft: advance commit index safely This change makes the commit index advancement in handleHeartbeat safe. Previously, a follower would attempt to update the commit index to whichever was sent in the MsgHeartbeat message. Out-of-bound indices would crash the node. It is always safe to advance a commit index if the follower's log is "in sync" with the leader, i.e. when its log is guaranteed to be a prefix of the leader's log. This becomes true when the first MsgApp append message succeeds. At the moment, the leader will never send a commit index that exceeds the follower's log size. However, this may change in future. This change is a defence-in-depth. Signed-off-by: Pavel Kalinnikov --- raft.go | 14 +++++++++++++- raft_test.go | 17 +++++++++++++---- rawnode_test.go | 1 + 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/raft.go b/raft.go index 5a150562..de58f014 100644 --- a/raft.go +++ b/raft.go @@ -375,6 +375,11 @@ type raft struct { // the leader id lead uint64 + // logSynced is true if this node's log is guaranteed to be a prefix of the + // leader's log at this term. Always true for the leader. Always false for a + // candidate. For a follower, becomes true the first time a MsgApp append to + // the log succeeds. + logSynced bool // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. leadTransferee uint64 @@ -763,6 +768,7 @@ func (r *raft) reset(term uint64) { r.Vote = None } r.lead = None + r.logSynced = false r.electionElapsed = 0 r.heartbeatElapsed = 0 @@ -908,6 +914,7 @@ func (r *raft) becomeLeader() { r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id + r.logSynced = true // the leader's log is in sync with itself r.state = StateLeader // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is @@ -1735,6 +1742,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.logSynced = true // from now on, the log is a prefix of the leader's log r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1770,7 +1778,11 @@ func (r *raft) handleAppendEntries(m pb.Message) { } func (r *raft) handleHeartbeat(m pb.Message) { - r.raftLog.commitTo(m.Commit) + // If our log is not a prefix of the leader's log, it is unsafe to advance the + // commit index, because the entries at this index may mismatch. + if r.logSynced { + r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex())) + } r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..fd3739ae 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1332,11 +1332,18 @@ func TestHandleMsgApp(t *testing.T) { func TestHandleHeartbeat(t *testing.T) { commit := uint64(2) tests := []struct { - m pb.Message - wCommit uint64 + m pb.Message + logSynced bool + wCommit uint64 }{ - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1}, - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, true, commit + 1}, + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, true, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, false, commit}, + + // Increase the commit index only if the log is in sync with the leader. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, false, commit}, + // Do not increase the commit index beyond our log size. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, true, commit + 1}, } for i, tt := range tests { @@ -1345,6 +1352,8 @@ func TestHandleHeartbeat(t *testing.T) { sm := newTestRaft(1, 5, 1, storage) sm.becomeFollower(2, 2) sm.raftLog.commitTo(commit) + sm.logSynced = tt.logSynced + sm.handleHeartbeat(tt.m) if sm.raftLog.committed != tt.wCommit { t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) diff --git a/rawnode_test.go b/rawnode_test.go index bca5e64c..eef09764 100644 --- a/rawnode_test.go +++ b/rawnode_test.go @@ -952,6 +952,7 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { if err != nil { t.Fatal(err) } + rawNode.raft.logSynced = true // needed to be able to advance the commit index for highestApplied := uint64(0); highestApplied != 11; { rd := rawNode.Ready() From 9bffaa4e4af02fd5575d859fb418ff20d72af28f Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 25 Jan 2024 21:55:55 +0000 Subject: [PATCH 2/3] testdata: add a lagging commit test This commit adds a test demonstrating the effect of delayed commit on a follower node after a network hiccup between the leader and this follower. In the described scenario, after the moment of committing an entry on the leader, it takes HeartbeatInterval + 3/2 * RTT until the follower learns this entry is committed. This is suboptimal, and could take HeartbeatInverval + 1/2 * RTT if the leader didn't cut the commit index at Progress.Match before sending it to the follower. Signed-off-by: Pavel Kalinnikov --- testdata/lagging_commit.txt | 174 ++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 testdata/lagging_commit.txt diff --git a/testdata/lagging_commit.txt b/testdata/lagging_commit.txt new file mode 100644 index 00000000..8f8ba336 --- /dev/null +++ b/testdata/lagging_commit.txt @@ -0,0 +1,174 @@ +# This test demonstrates the effect of delayed commit on a follower node after a +# network hiccup between the leader and this follower. + +# Skip logging the boilerplate. Set up a raft group of 3 nodes, and elect node 1 +# as the leader. Nodes 2 and 3 are the followers. +log-level none +---- +ok + +add-nodes 3 voters=(1,2,3) index=10 +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok + +# Propose a couple of entries. +propose 1 data1 +---- +ok + +propose 1 data2 +---- +ok + +process-ready 1 +---- +ok + +# The interesting part starts below. +log-level debug +---- +ok + +deliver-msgs 2 3 +---- +1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data1"] +1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "data2"] +1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data1"] +1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "data2"] + +process-ready 3 +---- +Ready MustSync=true: +Entries: +1/12 EntryNormal "data1" +1/13 EntryNormal "data2" +Messages: +3->1 MsgAppResp Term:1 Log:0/12 +3->1 MsgAppResp Term:1 Log:0/13 + +# Suppose there is a network blip which prevents the leader learning that the +# follower 3 has appended the proposed entries to the log. +deliver-msgs drop=(1) +---- +dropped: 3->1 MsgAppResp Term:1 Log:0/12 +dropped: 3->1 MsgAppResp Term:1 Log:0/13 + +# In the meantime, the entries are committed, and the leader sends the commit +# index to all the followers. +stabilize 1 2 +---- +> 2 handling Ready + Ready MustSync=true: + Entries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" + Messages: + 2->1 MsgAppResp Term:1 Log:0/12 + 2->1 MsgAppResp Term:1 Log:0/13 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/12 + 2->1 MsgAppResp Term:1 Log:0/13 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:13 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" + Messages: + 1->2 MsgApp Term:1 Log:1/13 Commit:12 + 1->3 MsgApp Term:1 Log:1/13 Commit:12 + 1->2 MsgApp Term:1 Log:1/13 Commit:13 + 1->3 MsgApp Term:1 Log:1/13 Commit:13 +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/13 Commit:12 + 1->2 MsgApp Term:1 Log:1/13 Commit:13 +> 2 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:13 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" + Messages: + 2->1 MsgAppResp Term:1 Log:0/13 + 2->1 MsgAppResp Term:1 Log:0/13 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/13 + 2->1 MsgAppResp Term:1 Log:0/13 + +# The network blip prevents the follower 3 from learning that the previously +# appended entries are now committed. +deliver-msgs drop=(3) +---- +dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:12 +dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:13 + +# The network blip ends here. + +status 1 +---- +1: StateReplicate match=13 next=14 +2: StateReplicate match=13 next=14 +3: StateReplicate match=11 next=14 inflight=2 + +# The leader still observes that the entries are in-flight to the follower 3, +# since it hasn't heard from it. Nothing triggers updating the follower's +# commit index, so we have to wait up to the full heartbeat interval before +# the leader sends the commit index. +tick-heartbeat 1 +---- +ok + +# However, the leader does not push the real commit index to the follower 3. It +# cuts the commit index at the Progress.Match mark, because it thinks that it is +# unsafe to send a commit index higher than that. +process-ready 1 +---- +Ready MustSync=false: +Messages: +1->2 MsgHeartbeat Term:1 Log:0/0 Commit:13 +1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 + +# Since the heartbeat message does not bump the follower's commit index, it will +# take another roundtrip with the leader to update it. As such, the total time +# it takes for the follower to learn the commit index is: +# +# delay = HeartbeatInterval + 3/2 * RTT +# +# This is suboptimal. It could have taken HeartbeatInterval + 1/2 * RTT, if the +# leader sent the up-to-date commit index in the heartbeat message. +# +# See https://github.com/etcd-io/raft/issues/138 which aims to fix this. +stabilize 1 3 +---- +> 3 receiving messages + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 receiving messages + 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/13 Commit:13 +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/13 Commit:13 +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:13 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" + Messages: + 3->1 MsgAppResp Term:1 Log:0/13 +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:0/13 From ffbd5f78545ac5f26ad23f1071c1133cd2d99251 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 26 Jan 2024 03:08:18 +0000 Subject: [PATCH 3/3] raft: send up-to-date commit index in heartbeats TODO: describe why it is now safe Signed-off-by: Pavel Kalinnikov --- raft.go | 15 +++------------ raft_test.go | 13 ++----------- testdata/async_storage_writes_append_aba_race.txt | 14 +++++++------- testdata/lagging_commit.txt | 14 ++++++++------ testdata/replicate_pause.txt | 4 ++-- testdata/snapshot_succeed_via_app_resp.txt | 4 ++-- 6 files changed, 24 insertions(+), 40 deletions(-) diff --git a/raft.go b/raft.go index de58f014..85ffbdf5 100644 --- a/raft.go +++ b/raft.go @@ -672,21 +672,12 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { - // Attach the commit as min(to.matched, r.committed). - // When the leader sends out heartbeat message, - // the receiver(follower) might not be matched with the leader - // or it might not have all the committed entries. - // The leader MUST NOT forward the follower's commit to - // an unmatched index. - commit := min(r.trk.Progress[to].Match, r.raftLog.committed) - m := pb.Message{ + r.send(pb.Message{ To: to, Type: pb.MsgHeartbeat, - Commit: commit, + Commit: r.raftLog.committed, Context: ctx, - } - - r.send(m) + }) } // bcastAppend sends RPC, with entries to all peers that are not up-to-date diff --git a/raft_test.go b/raft_test.go index fd3739ae..c2e591d2 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2699,10 +2699,6 @@ func TestBcastBeat(t *testing.T) { if len(msgs) != 2 { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } - wantCommitMap := map[uint64]uint64{ - 2: min(sm.raftLog.committed, sm.trk.Progress[2].Match), - 3: min(sm.raftLog.committed, sm.trk.Progress[3].Match), - } for i, m := range msgs { if m.Type != pb.MsgHeartbeat { t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat) @@ -2713,13 +2709,8 @@ func TestBcastBeat(t *testing.T) { if m.LogTerm != 0 { t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0) } - if wantCommitMap[m.To] == 0 { - t.Fatalf("#%d: unexpected to %d", i, m.To) - } else { - if m.Commit != wantCommitMap[m.To] { - t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To]) - } - delete(wantCommitMap, m.To) + if m.Commit != sm.raftLog.committed { + t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, sm.raftLog.committed) } if len(m.Entries) != 0 { t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries)) diff --git a/testdata/async_storage_writes_append_aba_race.txt b/testdata/async_storage_writes_append_aba_race.txt index 4b68330f..01a23b58 100644 --- a/testdata/async_storage_writes_append_aba_race.txt +++ b/testdata/async_storage_writes_append_aba_race.txt @@ -346,16 +346,16 @@ process-ready 4 ---- Ready MustSync=false: Messages: -4->1 MsgHeartbeat Term:3 Log:0/0 -4->2 MsgHeartbeat Term:3 Log:0/0 -4->3 MsgHeartbeat Term:3 Log:0/0 -4->5 MsgHeartbeat Term:3 Log:0/0 -4->6 MsgHeartbeat Term:3 Log:0/0 -4->7 MsgHeartbeat Term:3 Log:0/0 +4->1 MsgHeartbeat Term:3 Log:0/0 Commit:11 +4->2 MsgHeartbeat Term:3 Log:0/0 Commit:11 +4->3 MsgHeartbeat Term:3 Log:0/0 Commit:11 +4->5 MsgHeartbeat Term:3 Log:0/0 Commit:11 +4->6 MsgHeartbeat Term:3 Log:0/0 Commit:11 +4->7 MsgHeartbeat Term:3 Log:0/0 Commit:11 deliver-msgs 1 ---- -4->1 MsgHeartbeat Term:3 Log:0/0 +4->1 MsgHeartbeat Term:3 Log:0/0 Commit:11 INFO 1 [term: 2] received a MsgHeartbeat message with higher term from 4 [term: 3] INFO 1 became follower at term 3 diff --git a/testdata/lagging_commit.txt b/testdata/lagging_commit.txt index 8f8ba336..5ad33ae4 100644 --- a/testdata/lagging_commit.txt +++ b/testdata/lagging_commit.txt @@ -134,7 +134,7 @@ process-ready 1 Ready MustSync=false: Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:13 -1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 +1->3 MsgHeartbeat Term:1 Log:0/0 Commit:13 # Since the heartbeat message does not bump the follower's commit index, it will # take another roundtrip with the leader to update it. As such, the total time @@ -146,12 +146,18 @@ Messages: # leader sent the up-to-date commit index in the heartbeat message. # # See https://github.com/etcd-io/raft/issues/138 which aims to fix this. +# +# Now this is fixed! stabilize 1 3 ---- > 3 receiving messages - 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:13 > 3 handling Ready Ready MustSync=false: + HardState Term:1 Vote:1 Commit:13 + CommittedEntries: + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 > 1 receiving messages @@ -164,10 +170,6 @@ stabilize 1 3 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 3 handling Ready Ready MustSync=false: - HardState Term:1 Vote:1 Commit:13 - CommittedEntries: - 1/12 EntryNormal "data1" - 1/13 EntryNormal "data2" Messages: 3->1 MsgAppResp Term:1 Log:0/13 > 1 receiving messages diff --git a/testdata/replicate_pause.txt b/testdata/replicate_pause.txt index d9cee59f..0d4936b8 100644 --- a/testdata/replicate_pause.txt +++ b/testdata/replicate_pause.txt @@ -129,14 +129,14 @@ stabilize 1 Ready MustSync=false: Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17 - 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:17 stabilize 2 3 ---- > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17 > 3 receiving messages - 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:17 > 2 handling Ready Ready MustSync=false: Messages: diff --git a/testdata/snapshot_succeed_via_app_resp.txt b/testdata/snapshot_succeed_via_app_resp.txt index 80ed3646..17b1887e 100644 --- a/testdata/snapshot_succeed_via_app_resp.txt +++ b/testdata/snapshot_succeed_via_app_resp.txt @@ -64,14 +64,14 @@ process-ready 1 Ready MustSync=false: Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 -1->3 MsgHeartbeat Term:1 Log:0/0 +1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 # Iterate until no more work is done by the new peer. It receives the heartbeat # and responds. stabilize 3 ---- > 3 receiving messages - 1->3 MsgHeartbeat Term:1 Log:0/0 + 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 INFO 3 [term: 0] received a MsgHeartbeat message with higher term from 1 [term: 1] INFO 3 became follower at term 1 > 3 handling Ready