diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index d397612af9c..7777cff61fe 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -92,6 +92,9 @@ type raftNode struct { // a chan to send out readState readStateC chan raft.ReadState + // keep track of snapshots being created + snapshotTracker SnapshotTracker + // utility ticker *time.Ticker // contention detectors for raft heartbeat message @@ -136,12 +139,13 @@ func newRaftNode(cfg raftNodeConfig) *raftNode { raftNodeConfig: cfg, // set up contention detectors for raft heartbeat message. // expect to send a heartbeat within 2 heartbeat intervals. - td: contention.NewTimeoutDetector(2 * cfg.heartbeat), - readStateC: make(chan raft.ReadState, 1), - msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), - applyc: make(chan toApply), - stopped: make(chan struct{}), - done: make(chan struct{}), + td: contention.NewTimeoutDetector(2 * cfg.heartbeat), + readStateC: make(chan raft.ReadState, 1), + snapshotTracker: *newSnapshotTracker(), + msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), + applyc: make(chan toApply), + stopped: make(chan struct{}), + done: make(chan struct{}), } if r.heartbeat == 0 { r.ticker = &time.Ticker{} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a6f25548ca1..c061cae2c95 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -17,6 +17,7 @@ package etcdserver import ( "context" "encoding/json" + goerrors "errors" "expvar" "fmt" "math" @@ -963,6 +964,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) { <-apply.notifyc s.triggerSnapshot(ep) + s.compactRaftLog(ep.appliedi) select { // snapshot requested via send() case m := <-s.r.msgSnapC: @@ -2126,7 +2128,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // the go routine created below. s.KV().Commit() + s.r.snapshotTracker.Track(snapi) + s.GoAttach(func() { + defer func() { + s.r.snapshotTracker.UnTrack(snapi) + }() + lg := s.Logger() // For backward compatibility, generate v2 snapshot from v3 state. @@ -2154,28 +2162,44 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { "saved snapshot", zap.Uint64("snapshot-index", snap.Metadata.Index), ) + }) +} - // When sending a snapshot, etcd will pause compaction. - // After receives a snapshot, the slow follower needs to get all the entries right after - // the snapshot sent to catch up. If we do not pause compaction, the log entries right after - // the snapshot sent might already be compacted. It happens when the snapshot takes long time - // to send and save. Pausing compaction avoids triggering a snapshot sending cycle. - if atomic.LoadInt64(&s.inflightSnapshots) != 0 { - lg.Info("skip compaction since there is an inflight snapshot") - return - } +func (s *EtcdServer) compactRaftLog(appliedi uint64) { + lg := s.Logger() - // keep some in memory log entries for slow followers. - compacti := uint64(1) - if snapi > s.Cfg.SnapshotCatchUpEntries { - compacti = snapi - s.Cfg.SnapshotCatchUpEntries - } + // When sending a snapshot, etcd will pause compaction. + // After receives a snapshot, the slow follower needs to get all the entries right after + // the snapshot sent to catch up. If we do not pause compaction, the log entries right after + // the snapshot sent might already be compacted. It happens when the snapshot takes long time + // to send and save. Pausing compaction avoids triggering a snapshot sending cycle. + if atomic.LoadInt64(&s.inflightSnapshots) != 0 { + lg.Info("skip compaction since there is an inflight snapshot") + return + } + + // keep some in memory log entries for slow followers. + compacti := uint64(0) + if appliedi > s.Cfg.SnapshotCatchUpEntries { + compacti = appliedi - s.Cfg.SnapshotCatchUpEntries + } + + // if there are snapshots being created, compact the raft log up to the minimum snapshot index. + if minSpani, err := s.r.snapshotTracker.MinSnapi(); err == nil && minSpani < appliedi && minSpani > s.Cfg.SnapshotCatchUpEntries { + compacti = minSpani - s.Cfg.SnapshotCatchUpEntries + } - err = s.r.raftStorage.Compact(compacti) + // no need to compact if compacti == 0 + if compacti == 0 { + return + } + + s.GoAttach(func() { + err := s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. // raft log might already been compact. - if err == raft.ErrCompacted { + if goerrors.Is(err, raft.ErrCompacted) { return } lg.Panic("failed to compact", zap.Error(err)) diff --git a/server/etcdserver/snapshot_tracker.go b/server/etcdserver/snapshot_tracker.go new file mode 100644 index 00000000000..271ab53999e --- /dev/null +++ b/server/etcdserver/snapshot_tracker.go @@ -0,0 +1,95 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "cmp" + "container/heap" + "errors" + "sync" +) + +// SnapshotTracker keeps track of all ongoing snapshot creation. To safeguard ongoing snapshot creation, +// only compact the raft log up to the minimum snapshot index in the track. +type SnapshotTracker struct { + h minHeap[uint64] + mu *sync.Mutex +} + +func newSnapshotTracker() *SnapshotTracker { + return &SnapshotTracker{ + h: minHeap[uint64]{}, + mu: new(sync.Mutex), + } +} + +// MinSnapi returns the minimum snapshot index in the track or an error if the tracker is empty. +func (st *SnapshotTracker) MinSnapi() (uint64, error) { + st.mu.Lock() + defer st.mu.Unlock() + if st.h.Len() == 0 { + return 0, errors.New("SnapshotTracker is empty") + } + return st.h[0], nil +} + +// Track adds a snapi to the tracker. Make sure to call UnTrack once the snapshot has been created. +func (st *SnapshotTracker) Track(snapi uint64) { + st.mu.Lock() + defer st.mu.Unlock() + heap.Push(&st.h, snapi) +} + +// UnTrack removes 'snapi' from the tracker. No action taken if 'snapi' is not found. +func (st *SnapshotTracker) UnTrack(snapi uint64) { + st.mu.Lock() + defer st.mu.Unlock() + + for i := 0; i < len((*st).h); i++ { + if (*st).h[i] == snapi { + heap.Remove(&st.h, i) + return + } + } +} + +// minHeap implements the heap.Interface for E. +type minHeap[E interface { + cmp.Ordered +}] []E + +func (h minHeap[_]) Len() int { + return len(h) +} + +func (h minHeap[_]) Less(i, j int) bool { + return h[i] < h[j] +} + +func (h minHeap[_]) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *minHeap[E]) Push(x any) { + *h = append(*h, x.(E)) +} + +func (h *minHeap[E]) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/server/etcdserver/snapshot_tracker_test.go b/server/etcdserver/snapshot_tracker_test.go new file mode 100644 index 00000000000..098c518e51d --- /dev/null +++ b/server/etcdserver/snapshot_tracker_test.go @@ -0,0 +1,138 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "container/heap" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSnapTracker_MinSnapi(t *testing.T) { + st := *newSnapshotTracker() + + _, err := st.MinSnapi() + assert.NotNil(t, err, "SnapshotTracker should be empty initially") + + st.Track(10) + minSnapi, err := st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the only tracked snapshot index") + + st.Track(5) + minSnapi, err = st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(5), minSnapi, "MinSnapi should return the minimum tracked snapshot index") + + st.UnTrack(5) + minSnapi, err = st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the remaining tracked snapshot index") +} + +func TestSnapTracker_Track(t *testing.T) { + st := *newSnapshotTracker() + st.Track(20) + st.Track(10) + st.Track(15) + + assert.Equal(t, 3, st.h.Len(), "SnapshotTracker should have 3 snapshots tracked") + + minSnapi, err := st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the minimum tracked snapshot index") +} + +func TestSnapTracker_UnTrack(t *testing.T) { + st := *newSnapshotTracker() + st.Track(20) + st.Track(30) + st.Track(40) + // track another snapshot with the same index + st.Track(20) + + st.UnTrack(30) + assert.Equal(t, 3, st.h.Len()) + + minSnapi, err := st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(20), minSnapi) + + st.UnTrack(20) + assert.Equal(t, 2, st.h.Len()) + + minSnapi, err = st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(20), minSnapi) + + st.UnTrack(20) + minSnapi, err = st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(40), minSnapi) + + st.UnTrack(40) + _, err = st.MinSnapi() + assert.NotNil(t, err) +} + +func newMinHeap(elements ...uint64) minHeap[uint64] { + h := minHeap[uint64](elements) + heap.Init(&h) + return h +} + +func TestMinHeapLen(t *testing.T) { + h := newMinHeap(3, 2, 1) + assert.Equal(t, 3, h.Len()) +} + +func TestMinHeapLess(t *testing.T) { + h := newMinHeap(3, 2, 1) + assert.True(t, h.Less(0, 1)) +} + +func TestMinHeapSwap(t *testing.T) { + h := newMinHeap(3, 2, 1) + h.Swap(0, 1) + assert.Equal(t, uint64(2), h[0]) + assert.Equal(t, uint64(1), h[1]) + assert.Equal(t, uint64(3), h[2]) +} + +func TestMinHeapPushPop(t *testing.T) { + h := newMinHeap(3, 2) + heap.Push(&h, uint64(1)) + assert.Equal(t, 3, h.Len()) + + got := heap.Pop(&h).(uint64) + assert.Equal(t, uint64(1), got) +} + +func TestMinHeapEmpty(t *testing.T) { + h := minHeap[uint64]{} + assert.Equal(t, 0, h.Len()) +} + +func TestMinHeapSingleElement(t *testing.T) { + h := newMinHeap(uint64(1)) + assert.Equal(t, 1, h.Len()) + + heap.Push(&h, uint64(2)) + assert.Equal(t, 2, h.Len()) + + got := heap.Pop(&h) + assert.Equal(t, uint64(1), got) +}