Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not rename files on mmap failure #25340

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
wal.syncDelay = time.Duration(opt.Config.WALFsyncDelay)
}

fs := NewFileStore(path, etags)
fs := NewFileStore(path, etags, WithMadviseWillNeed(opt.Config.TSMWillNeed))
fs.openLimiter = opt.OpenLimiter
if opt.FileStoreObserver != nil {
fs.WithObserver(opt.FileStoreObserver)
}
fs.tsmMMAPWillNeed = opt.Config.TSMWillNeed

cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), etags)

Expand Down
43 changes: 27 additions & 16 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,8 @@ type FileStore struct {
currentGeneration int
dir string

files []TSMFile
tsmMMAPWillNeed bool // If true then the kernel will be advised MMAP_WILLNEED for TSM files.
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.
files []TSMFile
openLimiter limiter.Fixed // limit the number of concurrent opening TSM files.

logger *zap.Logger // Logger to be used for important messages
traceLogger *zap.Logger // Logger to be used when trace-logging is on.
Expand All @@ -198,6 +197,8 @@ type FileStore struct {
// newReaderBlockCount keeps track of the current new reader block requests.
// If non-zero, no new TSMReader objects may be created.
newReaderBlockCount int

readerOptions []tsmReaderOption
}

// FileStat holds information about a TSM file on disk.
Expand Down Expand Up @@ -234,7 +235,7 @@ func (f FileStat) ContainsKey(key []byte) bool {
}

// NewFileStore returns a new instance of FileStore based on the given directory.
func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
func NewFileStore(dir string, tags tsdb.EngineTags, options ...tsmReaderOption) *FileStore {
logger := zap.NewNop()
fs := &FileStore{
dir: dir,
Expand All @@ -250,6 +251,7 @@ func NewFileStore(dir string, tags tsdb.EngineTags) *FileStore {
obs: noFileStoreObserver{},
parseFileName: DefaultParseFileName,
copyFiles: runtime.GOOS == "windows",
readerOptions: options,
}
fs.purger.fileStore = fs
return fs
Expand Down Expand Up @@ -616,28 +618,37 @@ func (f *FileStore) Open(ctx context.Context) error {
defer f.openLimiter.Release()

start := time.Now()
df, err := NewTSMReader(file, WithMadviseWillNeed(f.tsmMMAPWillNeed))
df, err := NewTSMReader(file, f.readerOptions...)
f.logger.Info("Opened file",
zap.String("path", file.Name()),
zap.Int("id", idx),
zap.Duration("duration", time.Since(start)))

// If we are unable to read a TSM file then log the error, rename
// the file, and continue loading the shard without it.
// If we are unable to read a TSM file then log the error.
if err != nil {
if cerr := file.Close(); cerr != nil {
f.logger.Error("Error closing TSM file after error", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(cerr))
}
// If the file is corrupt, rename it and
// continue loading the shard without it.
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %w", file.Name(), e)}
if errors.Is(err, MmapError{}) {
// An MmapError may indicate we have insufficient
// handles for the mmap call, in which case the file should
// be left untouched, and the vm.max_map_count be raised.
f.logger.Error("Cannot read TSM file, system limit for vm.max_map_count may be too low",
zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
readerC <- &res{r: df, err: fmt.Errorf("cannot read file %s, system limit for vm.max_map_count may be too low: %v", file.Name(), err)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use a %w here instead of %v

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The master-1.x code uses %v also. My preference would be to get this PR merged and backported to 2.7, then do a PR to fix this in master-1.x and propagate it through the branches.

return
} else {
// If the file is corrupt, rename it and
// continue loading the shard without it.
f.logger.Error("Cannot read corrupt tsm file, renaming", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(err))
if e := os.Rename(file.Name(), file.Name()+"."+BadTSMFileExtension); e != nil {
f.logger.Error("Cannot rename corrupt tsm file", zap.String("path", file.Name()), zap.Int("id", idx), zap.Error(e))
readerC <- &res{r: df, err: fmt.Errorf("cannot rename corrupt file %s: %v", file.Name(), e)}
return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %v", file.Name(), err)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%w instead of %v

return
}
readerC <- &res{r: df, err: fmt.Errorf("cannot read corrupt file %s: %w", file.Name(), err)}
return
}
df.WithObserver(f.obs)
readerC <- &res{r: df}
Expand Down Expand Up @@ -920,7 +931,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF
}
}

tsm, err := NewTSMReader(fd, WithMadviseWillNeed(f.tsmMMAPWillNeed))
tsm, err := NewTSMReader(fd, f.readerOptions...)
if err != nil {
if newName != oldName {
if err1 := os.Rename(newName, oldName); err1 != nil {
Expand Down
114 changes: 114 additions & 0 deletions tsdb/engine/tsm1/file_store_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package tsm1

import (
"github.com/influxdata/influxdb/v2/tsdb"
)

var TestMmapInitFailOption = func(err error) tsmReaderOption {
return func(r *TSMReader) {
r.accessor = &badBlockAccessor{error: err}
}
}

type badBlockAccessor struct {
error
initCalled bool
}

func (b *badBlockAccessor) init() (*indirectIndex, error) {
b.initCalled = true
return nil, b.error
}

func (b *badBlockAccessor) read(key []byte, timestamp int64) ([]Value, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readAll(key []byte) ([]Value, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readBlock(entry *IndexEntry, values []Value) ([]Value, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readFloatBlock(entry *IndexEntry, values *[]FloatValue) ([]FloatValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readFloatArrayBlock(entry *IndexEntry, values *tsdb.FloatArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readIntegerBlock(entry *IndexEntry, values *[]IntegerValue) ([]IntegerValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readIntegerArrayBlock(entry *IndexEntry, values *tsdb.IntegerArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readUnsignedBlock(entry *IndexEntry, values *[]UnsignedValue) ([]UnsignedValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readUnsignedArrayBlock(entry *IndexEntry, values *tsdb.UnsignedArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readStringBlock(entry *IndexEntry, values *[]StringValue) ([]StringValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readStringArrayBlock(entry *IndexEntry, values *tsdb.StringArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readBooleanBlock(entry *IndexEntry, values *[]BooleanValue) ([]BooleanValue, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readBooleanArrayBlock(entry *IndexEntry, values *tsdb.BooleanArray) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) readBytes(entry *IndexEntry, buf []byte) (uint32, []byte, error) {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) rename(path string) error {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) path() string {
//TODO implement me
panic("implement me")
}

func (b *badBlockAccessor) close() error {
if !b.initCalled {
panic("close called without an init call")
}
b.initCalled = false
return nil
}

func (b *badBlockAccessor) free() error {
//TODO implement me
panic("implement me")
}
37 changes: 37 additions & 0 deletions tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
Expand Down Expand Up @@ -2412,6 +2413,42 @@ func TestFileStore_Open(t *testing.T) {
}
}

func TestFileStore_OpenFail(t *testing.T) {
var err error
dir := t.TempDir()

// Create a TSM file...
data := keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}}

files, err := newFileDir(t, dir, data)
if err != nil {
fatal(t, "creating test files", err)
}
assert.Equal(t, 1, len(files))
f := files[0]

const mmapErrMsg = "mmap failure in test"
const fullMmapErrMsg = "system limit for vm.max_map_count may be too low: " + mmapErrMsg
// With an mmap failure, the files should all be left where they are, because they are not corrupt
openFail(t, dir, fullMmapErrMsg, tsm1.NewMmapError(fmt.Errorf(mmapErrMsg)))
assert.FileExistsf(t, f, "file not found, but should not have been moved for mmap failure")

// With a non-mmap failure, the file failing to open should be moved aside
const otherErrMsg = "some Random Init Failure"
openFail(t, dir, otherErrMsg, fmt.Errorf(otherErrMsg))
assert.NoFileExistsf(t, f, "file found, but should have been moved for open failure")
assert.FileExistsf(t, f+"."+tsm1.BadTSMFileExtension, "file not found, but should have been moved here for open failure")
}

func openFail(t *testing.T, dir string, fullErrMsg string, initErr error) {
fs := tsm1.NewFileStore(dir, tsdb.EngineTags{}, tsm1.TestMmapInitFailOption(initErr))
err := fs.Open(context.Background())
assert.Error(t, err)
assert.Contains(t, err.Error(), fullErrMsg)
defer func() { assert.NoError(t, fs.Close(), "unexpected error on FileStore.Close") }()
assert.Equal(t, 0, fs.Count(), "file count mismatch")
}

func TestFileStore_Remove(t *testing.T) {
dir := t.TempDir()

Expand Down
36 changes: 30 additions & 6 deletions tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsm1
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -218,6 +219,7 @@ var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption {
}
}

// TODO(DSB) - add a tsmReaderOption in a test call that has the mmmapAccessor mock a failure
// NewTSMReader returns a new TSMReader from the given file.
func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
t := &TSMReader{}
Expand All @@ -231,15 +233,17 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) {
}
t.size = stat.Size()
t.lastModified = stat.ModTime().UnixNano()
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
if t.accessor == nil {
t.accessor = &mmapAccessor{
f: f,
mmapWillNeed: t.madviseWillNeed,
}
}

index, err := t.accessor.init()
if err != nil {
_ = t.accessor.close()
return nil, err
cerr := t.accessor.close()
return nil, errors.Join(err, cerr)
}

t.index = index
Expand Down Expand Up @@ -1314,6 +1318,24 @@ type mmapAccessor struct {
index *indirectIndex
}

type MmapError struct {
error
}

func (m *MmapError) Unwrap() error {
return m.error
}

func (m MmapError) Is(e error) bool {
_, oks := e.(MmapError)
_, okp := e.(*MmapError)
return oks || okp
}

func NewMmapError(e error) MmapError {
return MmapError{error: e}
}

func (m *mmapAccessor) init() (*indirectIndex, error) {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -1335,7 +1357,9 @@ func (m *mmapAccessor) init() (*indirectIndex, error) {

m.b, err = mmap(m.f, 0, int(stat.Size()))
if err != nil {
return nil, err
// Wrap the error to let callers know this was an error
// from mmap, and may indicate vm.max_map_count is too low
return nil, NewMmapError(err)
}
if len(m.b) < 8 {
return nil, fmt.Errorf("mmapAccessor: byte slice too small for indirectIndex")
Expand Down