Skip to content

Commit

Permalink
Added a flag to indicate snapshot store and migrated file snapshots t…
Browse files Browse the repository at this point in the history
…o bbolt when necessary
  • Loading branch information
wraymo committed May 3, 2022
1 parent bf46eed commit c0c14b0
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 36 deletions.
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ type Config struct {
TLSConfig *tls.Config
// RaftConfig provides any necessary configuration for the Raft server.
RaftConfig *raft.Config
// When UseBoltSnapshot is set, use bbolt as snapshot store instead of FileSnapshot
UseBoltSnapshot bool
}
5 changes: 3 additions & 2 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func NewHRaftDispatcherWithLogger(config *Config, logger *zap.Logger) (*HRaftDis
MaxPool: 5,
Logger: nil,
},
Enforcer: config.Enforcer,
RaftConfig: config.RaftConfig,
Enforcer: config.Enforcer,
RaftConfig: config.RaftConfig,
UseBoltSnapshot: config.UseBoltSnapshot,
}
s, err := store.NewStore(logger, storeConfig)
if err != nil {
Expand Down
54 changes: 29 additions & 25 deletions store/boltstore/bolt_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,29 +424,52 @@ func (s *BoltSnapshotSink) Write(b []byte) (int, error) {

// Close is used to indicate a successful end.
func (s *BoltSnapshotSink) Close() error {
tx, err := s.boltStore.conn.Begin(true)
return s.boltStore.PutSnapshot(s.meta, s.contents.Bytes())
}

// Implement the sort interface for []*SnapshotMeta.
func (s snapMetaSlice) Len() int {
return len(s)
}

func (s snapMetaSlice) Less(i, j int) bool {
if s[i].Term != s[j].Term {
return s[i].Term < s[j].Term
}
if s[i].Index != s[j].Index {
return s[i].Index < s[j].Index
}
return s[i].ID < s[j].ID
}

func (s snapMetaSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (b *BoltStore) PutSnapshot(meta raft.SnapshotMeta, state []byte) error {
tx, err := b.conn.Begin(true)
if err != nil {
return err
}
defer tx.Rollback()

metaBucket := tx.Bucket(dbSnapMeta)
buf, err := encodeMsgPack(s.meta)
buf, err := encodeMsgPack(meta)
if err != nil {
return err
}
if err = metaBucket.Put([]byte(s.meta.ID), buf.Bytes()); err != nil {
if err = metaBucket.Put([]byte(meta.ID), buf.Bytes()); err != nil {
return err
}

dataBucket := tx.Bucket(dbSnapData)
if err = dataBucket.Put([]byte(s.meta.ID), s.contents.Bytes()); err != nil {
if err = dataBucket.Put([]byte(meta.ID), state); err != nil {
return err
}

// reap any snapshots beyond the retain count.
snapMeta := s.boltStore.getSnapshot(tx)
for i := s.boltStore.retain; i < len(snapMeta); i++ {
snapMeta := b.getSnapshot(tx)
for i := b.retain; i < len(snapMeta); i++ {
if err := metaBucket.Delete([]byte(snapMeta[i].ID)); err != nil {
return err
}
Expand All @@ -459,25 +482,6 @@ func (s *BoltSnapshotSink) Close() error {
return tx.Commit()
}

// Implement the sort interface for []*SnapshotMeta.
func (s snapMetaSlice) Len() int {
return len(s)
}

func (s snapMetaSlice) Less(i, j int) bool {
if s[i].Term != s[j].Term {
return s[i].Term < s[j].Term
}
if s[i].Index != s[j].Index {
return s[i].Index < s[j].Index
}
return s[i].ID < s[j].ID
}

func (s snapMetaSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

// Sync performs an fsync on the database file handle. This is not necessary
// under normal operation unless NoSync is enabled, in which this forces the
// database file to sync against the disk.
Expand Down
93 changes: 91 additions & 2 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package store

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/casbin/hraft-dispatcher/store/boltstore"
Expand Down Expand Up @@ -47,6 +51,7 @@ type Store struct {
stableStore raft.StableStore
fms raft.FSM
boltStore *boltstore.BoltStore
useBoltSnapshot bool

enforcer casbin.IDistributedEnforcer

Expand All @@ -62,6 +67,7 @@ type Config struct {
NetworkTransportConfig *raft.NetworkTransportConfig
Enforcer casbin.IDistributedEnforcer
RaftConfig *raft.Config
UseBoltSnapshot bool
}

// NewStore return a instance of Store.
Expand All @@ -73,6 +79,7 @@ func NewStore(logger *zap.Logger, config *Config) (*Store, error) {
networkTransportConfig: config.NetworkTransportConfig,
enforcer: config.Enforcer,
raftConfig: config.RaftConfig,
useBoltSnapshot: config.UseBoltSnapshot,
}

return s, nil
Expand Down Expand Up @@ -116,11 +123,25 @@ func (s *Store) Start(enableBootstrap bool) error {
s.logger.Error("failed to new bolt store", zap.Error(err), zap.String("path", dbPath))
return err
}

s.boltStore = boltDB
s.logStore = boltDB
s.snapshotStore = boltDB
s.stableStore = boltDB

if s.useBoltSnapshot {
// Try to migrate previous file snapshots to bbolt if necessary
if err := s.checkAndMigrateFileSnapshot(); err != nil {
s.logger.Error("failed to migrate previous file snapshots to bbolt")
return err
}
s.snapshotStore = boltDB
} else {
fileSnapshots, err := raft.NewFileSnapshotStore(s.dataDir, retainSnapshotCount, os.Stderr)
if err != nil {
s.logger.Error("failed to new file snapshot store", zap.Error(err), zap.String("raftData", s.dataDir))
return err
}
s.snapshotStore = fileSnapshots
}
}

fsm, err := NewFSM(s.logger, s.dataDir, s.enforcer)
Expand Down Expand Up @@ -230,6 +251,74 @@ func (s *Store) DataDir() string {
return s.dataDir
}

func (s *Store) checkAndMigrateFileSnapshot() error {
const (
snapPath = "snapshots"
metaFilePath = "meta.json"
stateFilePath = "state.bin"
tmpSuffix = ".tmp"
)

// Get snapshot directory
snapDir := filepath.Join(s.dataDir, snapPath)
if stat, err := os.Stat(snapDir); err == nil && stat.IsDir() {
snapshots, err := ioutil.ReadDir(snapDir)
if err != nil {
return err
}

for _, snap := range snapshots {
// Ignore any files
if !snap.IsDir() {
continue
}

// Ignore any temporary snapshots
dirName := snap.Name()
if strings.HasSuffix(dirName, tmpSuffix) {
continue
}

metaPath := filepath.Join(snapDir, dirName, metaFilePath)
fh, err := os.Open(metaPath)
if err != nil {
continue
}

// Read the metadata of the snapshot
buffered := bufio.NewReader(fh)
meta := &raft.SnapshotMeta{}
dec := json.NewDecoder(buffered)
if err := dec.Decode(meta); err != nil {
fh.Close()
continue
}
fh.Close()

// Make sure we can understand this version.
if meta.Version < raft.SnapshotVersionMin || meta.Version > raft.SnapshotVersionMax {
continue
}

// Read the contents of the snapshot
statePath := filepath.Join(snapDir, dirName, stateFilePath)
state, err := ioutil.ReadFile(statePath)
if err != nil {
continue
}

// Put FileSnapshot into bbolt
if err := s.boltStore.PutSnapshot(*meta, state); err != nil {
continue
}
}

os.RemoveAll(snapDir)
}

return nil
}

// applyProtoMessage applies a proto message.
func (s *Store) applyProtoMessage(m proto.Message) error {
cmd, err := proto.Marshal(m)
Expand Down
91 changes: 84 additions & 7 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"crypto/x509"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -134,7 +136,7 @@ func TestStore_SingleNode(t *testing.T) {
raftID := "node-leader"
raftAddress := GetLocalIP() + ":6790"

store, err := newStore(enforcer, raftID, raftAddress, true)
store, err := newStore("", enforcer, raftID, raftAddress, true, false)
assert.NoError(t, err)
defer store.Stop()
defer os.RemoveAll(store.DataDir())
Expand Down Expand Up @@ -267,14 +269,14 @@ func TestStore_MultipleNode(t *testing.T) {
leaderID := "node-leader"
followerID := "node-follower"

leaderStore, err := newStore(leaderEnforcer, leaderID, leaderAddress, true)
leaderStore, err := newStore("", leaderEnforcer, leaderID, leaderAddress, true, false)
assert.NoError(t, err)
defer leaderStore.Stop()

err = leaderStore.WaitLeader()
assert.NoError(t, err)

followerStore, err := newStore(followerEnforcer, followerID, followerAddress, false)
followerStore, err := newStore("", followerEnforcer, followerID, followerAddress, false, false)
assert.NoError(t, err)
defer followerStore.Stop()

Expand Down Expand Up @@ -423,10 +425,84 @@ func TestStore_MultipleNode(t *testing.T) {
})
}

func newStore(enforcer casbin.IDistributedEnforcer, id string, address string, enableBootstrap bool) (*Store, error) {
func TestStore_MigrateFileSnapshot(t *testing.T) {
dir, err := ioutil.TempDir("", "casbin-hraft-")
if err != nil {
return nil, err
assert.NoError(t, err)

fileSnapshot, err := raft.NewFileSnapshotStore(dir, retainSnapshotCount, os.Stderr)
assert.NoError(t, err)
_, trans := raft.NewInmemTransport(raft.NewInmemAddr())

ctl := gomock.NewController(t)
defer ctl.Finish()

enforcer := mocks.NewMockIDistributedEnforcer(ctl)
p, err := NewPolicyOperator(zap.NewExample(), dir, enforcer)
assert.NoError(t, err)

enforcer.EXPECT().AddPoliciesSelf(nil, "p", "p", [][]string{{"role:admin", "/", "*"}, {"role:user", "/", "GET"}}).Return([][]string{{"role:admin", "/", "*"}, {"role:user", "/", "GET"}}, nil)
err = p.AddPolicies("p", "p", [][]string{{"role:admin", "/", "*"}, {"role:user", "/", "GET"}})
assert.NoError(t, err)
b, err := p.Backup()
assert.NoError(t, err)
size := len(b)
p.db.Close()

for i := 0; i < 3; i++ {
var sink raft.SnapshotSink
sink, err = fileSnapshot.Create(1, uint64(i), 3, raft.Configuration{}, 0, trans)
assert.NoError(t, err)
_, err := sink.Write(b)
assert.NoError(t, err)
err = sink.Close()
assert.NoError(t, err)
}

snaps, err := fileSnapshot.List()
assert.NoError(t, err)
assert.Equal(t, retainSnapshotCount, len(snaps))

raftID := "node-leader"
raftAddress := GetLocalIP() + ":6790"

enforcer.EXPECT().ClearPolicySelf(nil)
enforcer.EXPECT().AddPoliciesSelf(nil, "p", "p", [][]string{{"role:admin", "/", "*"}})
enforcer.EXPECT().AddPoliciesSelf(nil, "p", "p", [][]string{{"role:user", "/", "GET"}})

// Create store with bbolt snapshots
store, err := newStore(dir, enforcer, raftID, raftAddress, true, true)
assert.NoError(t, err)
defer store.Stop()
defer os.RemoveAll(store.DataDir())

// Make sure file snapshot directory does not exist
const snapPath = "snapshots"
snapDir := filepath.Join(dir, snapPath)
_, err = os.Stat(snapDir)
assert.Error(t, err)

// Make sure the data is consistent
snaps, err = store.snapshotStore.List()
assert.NoError(t, err)
assert.Equal(t, retainSnapshotCount, len(snaps))

for k, v := range snaps {
assert.Equal(t, uint64(2-k), v.Index)
assert.Equal(t, uint64(3), v.Term)
assert.Equal(t, true, reflect.DeepEqual(v.Configuration, raft.Configuration{}))
assert.Equal(t, uint64(0), v.ConfigurationIndex)
assert.Equal(t, int64(size), v.Size)
}
}

func newStore(dataDir string, enforcer casbin.IDistributedEnforcer, id string, address string, enableBootstrap, useBoltSnapshot bool) (*Store, error) {
dir := dataDir
if dir == "" {
var err error
dir, err = ioutil.TempDir("", "casbin-hraft-")
if err != nil {
return nil, err
}
}

tlsConfig, err := GetTLSConfig()
Expand All @@ -453,7 +529,8 @@ func newStore(enforcer casbin.IDistributedEnforcer, id string, address string, e
MaxPool: 5,
Timeout: 10 * time.Second,
},
Enforcer: enforcer,
Enforcer: enforcer,
UseBoltSnapshot: useBoltSnapshot,
})
if err != nil {
return nil, err
Expand Down

0 comments on commit c0c14b0

Please sign in to comment.