Skip to content

Commit

Permalink
db: allow excises to unconditionally be flushable ingests
Browse files Browse the repository at this point in the history
Previously, we'd only allow IngestAndExcise to become flushable
ingests if we had a guarantee from the caller that the ingestion
contained rangedel/rangekeydel overlapping the excise span. This
change reworks flushable ingests to return rangedels/rangekeydels
from their own iterators, even if one does not exist in the sstables
themselves. This, plus the update to the WAL to be able to persist
the excise span, allows for IngestAndExcise to allow flushable
ingests in more cases than before.

Also resolves TODOs in replayWAL() around properly sequencing
flushable ingests so their sstables can go in levels lower than
L0. This sequencing was necessary to make excises work correctly;
as excises require an up-to-date view of the version to work
correctly.

Fixes #2676.
  • Loading branch information
itsbilal committed Sep 25, 2024
1 parent b34a393 commit d73ab80
Show file tree
Hide file tree
Showing 28 changed files with 515 additions and 161 deletions.
30 changes: 29 additions & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,12 @@ func (b *Batch) refreshMemTableSize() error {
}
// This key kind doesn't contribute to the memtable size.
continue
case InternalKeyKindExcise:
if b.minimumFormatMajorVersion < FormatFlushableIngestExcises {
b.minimumFormatMajorVersion = FormatFlushableIngestExcises
}
// This key kind doesn't contribute to the memtable size.
continue
default:
// Note In some circumstances this might be temporary memory
// corruption that can be recovered by discarding the batch and
Expand Down Expand Up @@ -608,7 +614,7 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
b.countRangeDels++
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
b.countRangeKeys++
case InternalKeyKindIngestSST:
case InternalKeyKindIngestSST, InternalKeyKindExcise:
panic("pebble: invalid key kind for batch")
case InternalKeyKindLogData:
// LogData does not contribute to memtable size.
Expand Down Expand Up @@ -1193,6 +1199,28 @@ func (b *Batch) ingestSST(fileNum base.FileNum) {
b.minimumFormatMajorVersion = FormatFlushableIngest
}

// Excise adds the excise span for a flushable ingest containing an excise. The data
// will only be written to the WAL (not added to memtables or sstables).
func (b *Batch) excise(start, end []byte) {
if b.Empty() {
b.ingestedSSTBatch = true
} else if !b.ingestedSSTBatch {
// Batch contains other key kinds.
panic("pebble: invalid call to excise")
}

origMemTableSize := b.memTableSize
b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise)
copy(b.deferredOp.Key, start)
copy(b.deferredOp.Value, end)
// Since excise writes only to the WAL and does not affect the memtable,
// we restore b.memTableSize to its original value. Note that Batch.count
// is not reset because for the InternalKeyKindIngestSST/Excise the count
// is the number of sstable paths which have been added to the batch.
b.memTableSize = origMemTableSize
b.minimumFormatMajorVersion = FormatFlushableIngestExcises
}

// Empty returns true if the batch is empty, and false otherwise.
func (b *Batch) Empty() bool {
return batchrepr.IsEmpty(b.data)
Expand Down
2 changes: 1 addition & 1 deletion batchrepr/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, o
switch kind {
case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete,
base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete,
base.InternalKeyKindDeleteSized:
base.InternalKeyKindDeleteSized, base.InternalKeyKindExcise:
*r, value, ok = DecodeStr(*r)
if !ok {
return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
Expand Down
11 changes: 7 additions & 4 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,6 @@ func (d *DB) waitTableStats() {

func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
var exciseSpan KeyRange
var sstContainsExciseTombstone bool
paths := make([]string, 0, len(td.CmdArgs))
for i, arg := range td.CmdArgs {
switch td.CmdArgs[i].Key {
Expand All @@ -1345,14 +1344,14 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
exciseSpan.Start = []byte(fields[0])
exciseSpan.End = []byte(fields[1])
case "contains-excise-tombstone":
sstContainsExciseTombstone = true
case "no-wait":
// Handled by callers.
default:
paths = append(paths, arg.String())
}
}

if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan); err != nil {
return err
}
return nil
Expand All @@ -1361,6 +1360,10 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
paths := make([]string, 0, len(td.CmdArgs))
for _, arg := range td.CmdArgs {
if arg.Key == "no-wait" {
// Handled by callers.
continue
}
paths = append(paths, arg.String())
}

Expand Down
58 changes: 52 additions & 6 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ type ingestedFlushable struct {
hasRangeKeys bool
// exciseSpan is populated if an excise operation should be performed during
// flush.
exciseSpan KeyRange
exciseSpan KeyRange
exciseSeqNum base.SeqNum
}

func newIngestedFlushable(
Expand All @@ -172,6 +173,7 @@ func newIngestedFlushable(
newIters tableNewIters,
newRangeKeyIters keyspanimpl.TableNewSpanIter,
exciseSpan KeyRange,
seqNum base.SeqNum,
) *ingestedFlushable {
if invariants.Enabled {
for i := 1; i < len(files); i++ {
Expand Down Expand Up @@ -200,6 +202,7 @@ func newIngestedFlushable(
slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
hasRangeKeys: hasRangeKeys,
exciseSpan: exciseSpan,
exciseSeqNum: seqNum,
}

return ret
Expand Down Expand Up @@ -245,30 +248,69 @@ func (s *ingestedFlushable) constructRangeDelIter(
// TODO(sumeer): *IterOptions are being ignored, so the index block load for
// the point iterator in constructRangeDeIter is not tracked.
func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
return keyspanimpl.NewLevelIter(
liter := keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare,
s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(),
manifest.KeyTypePoint,
)
if !s.exciseSpan.Valid() {
return liter
}
// We have an excise span to weave into the rangedel iterators.
//
// TODO(bilal): should this be pooled?
miter := &keyspanimpl.MergingIter{}
rdel := keyspan.Span{
Start: s.exciseSpan.Start,
End: s.exciseSpan.End,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}},
}
rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel})
miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter)
return miter
}

// newRangeKeyIter is part of the flushable interface.
func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
if !s.containsRangeKeys() {
return nil
var rkeydelIter keyspan.FragmentIterator
if s.exciseSpan.Valid() {
// We have an excise span to weave into the rangekey iterators.
rkeydel := keyspan.Span{
Start: s.exciseSpan.Start,
End: s.exciseSpan.End,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeKeyDelete)}},
}
rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel})
}

if !s.hasRangeKeys {
if rkeydelIter == nil {
// NB: we have to return the nil literal as opposed to the nil
// value of rkeydelIter, otherwise callers of this function will
// have the return value fail == nil checks.
return nil
}
return rkeydelIter
}

return keyspanimpl.NewLevelIter(
liter := keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange,
)
if rkeydelIter == nil {
return liter
}
// TODO(bilal): should this be pooled?
miter := &keyspanimpl.MergingIter{}
miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rkeydelIter)
return miter
}

// containsRangeKeys is part of the flushable interface.
func (s *ingestedFlushable) containsRangeKeys() bool {
return s.hasRangeKeys
return s.hasRangeKeys || s.exciseSpan.Valid()
}

// inuseBytes is part of the flushable interface.
Expand Down Expand Up @@ -329,6 +371,10 @@ func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
// checks above.
return true
}
if s.exciseSpan.Valid() {
uk := s.exciseSpan.UserKeyBounds()
return uk.Overlaps(s.comparer.Compare, &bounds)
}
return false
}

Expand Down
40 changes: 31 additions & 9 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/datadriven"
Expand Down Expand Up @@ -47,7 +48,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}
reset()

loadFileMeta := func(paths []string) []*fileMetadata {
loadFileMeta := func(paths []string, exciseSpan KeyRange, seqNum base.SeqNum) []*fileMetadata {
d.mu.Lock()
pendingOutputs := make([]base.FileNum, len(paths))
for i := range paths {
Expand All @@ -60,11 +61,17 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// not actually ingesting a file.
lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
if err != nil {
panic(err)
t.Fatal(err)
}
meta := make([]*fileMetadata, len(lr.local))
if exciseSpan.Valid() {
seqNum++
}
for i := range meta {
meta[i] = lr.local[i].fileMetadata
if err := setSeqNumInMetadata(meta[i], seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
t.Fatal(err)
}
}
if len(meta) == 0 {
// All of the sstables to be ingested were empty. Nothing to do.
Expand All @@ -77,8 +84,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}

// Verify the sstables do not overlap.
if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil {
panic("unsorted sstables")
if err := ingestSortAndVerify(d.cmp, lr, exciseSpan); err != nil {
t.Fatal(err)
}

// Hard link the sstables into the DB directory. Since the sstables aren't
Expand All @@ -87,20 +94,21 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil {
panic("couldn't hard link sstables")
t.Fatal(err)
}

// Fsync the directory we added the tables to. We need to do this at some
// point before we update the MANIFEST (via logAndApply), otherwise a crash
// can have the tables referenced in the MANIFEST, but not present in the
// directory.
if err := d.dataDir.Sync(); err != nil {
panic("Couldn't sync data directory")
t.Fatal(err)
}

return meta
}

var seqNum uint64
datadriven.RunTest(t, "testdata/ingested_flushable_api", func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "reset":
Expand All @@ -114,12 +122,26 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
case "flushable":
// Creates an ingestedFlushable over the input files.
paths := make([]string, 0, len(td.CmdArgs))
var exciseSpan KeyRange
startSeqNum := base.SeqNum(seqNum)
for _, arg := range td.CmdArgs {
paths = append(paths, arg.String())
switch arg.Key {
case "excise":
parts := strings.Split(arg.Vals[0], "-")
if len(parts) != 2 {
return fmt.Sprintf("invalid excise range: %s", arg.Vals[0])
}
exciseSpan.Start = []byte(parts[0])
exciseSpan.End = []byte(parts[1])
seqNum++
default:
paths = append(paths, arg.String())
seqNum++
}
}

meta := loadFileMeta(paths)
flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{})
meta := loadFileMeta(paths, exciseSpan, startSeqNum)
flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, base.SeqNum(startSeqNum))
return ""
case "iter":
iter := flushable.newIter(nil)
Expand Down
15 changes: 13 additions & 2 deletions format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ const (
// fields in the Manifest and thus requires a format major version.
FormatSyntheticPrefixSuffix

// FormatFlushableIngestExcises is a format major version that adds support for
// having excises unconditionally being written as flushable ingestions. This
// is implemented through adding a new key kind that can go in the same batches
// as flushable ingested sstables.
FormatFlushableIngestExcises

// TODO(msbutler): add major version for synthetic suffixes

// -- Add new versions here --
Expand Down Expand Up @@ -222,7 +228,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
switch v {
case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted:
return sstable.TableFormatPebblev3
case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix:
case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
FormatFlushableIngestExcises:
return sstable.TableFormatPebblev4
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
Expand All @@ -234,7 +241,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat {
switch v {
case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted,
FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix:
FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
FormatFlushableIngestExcises:
return sstable.TableFormatPebblev1
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
Expand Down Expand Up @@ -271,6 +279,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatSyntheticPrefixSuffix: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatSyntheticPrefixSuffix)
},
FormatFlushableIngestExcises: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatFlushableIngestExcises)
},
}

const formatVersionMarkerName = `format-version`
Expand Down
8 changes: 6 additions & 2 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) {
require.Equal(t, FormatDeleteSizedAndObsolete, FormatMajorVersion(15))
require.Equal(t, FormatVirtualSSTables, FormatMajorVersion(16))
require.Equal(t, FormatSyntheticPrefixSuffix, FormatMajorVersion(17))
require.Equal(t, FormatFlushableIngestExcises, FormatMajorVersion(18))

// When we add a new version, we should add a check for the new version in
// addition to updating these expected values.
require.Equal(t, FormatNewest, FormatMajorVersion(17))
require.Equal(t, internalFormatNewest, FormatMajorVersion(17))
require.Equal(t, FormatNewest, FormatMajorVersion(18))
require.Equal(t, internalFormatNewest, FormatMajorVersion(18))
}

func TestFormatMajorVersion_MigrationDefined(t *testing.T) {
Expand All @@ -53,6 +54,8 @@ func TestRatchetFormat(t *testing.T) {
require.Equal(t, FormatVirtualSSTables, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatSyntheticPrefixSuffix))
require.Equal(t, FormatSyntheticPrefixSuffix, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatFlushableIngestExcises))
require.Equal(t, FormatFlushableIngestExcises, d.FormatMajorVersion())

require.NoError(t, d.Close())

Expand Down Expand Up @@ -203,6 +206,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
FormatDeleteSizedAndObsolete: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
FormatVirtualSSTables: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
FormatSyntheticPrefixSuffix: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
FormatFlushableIngestExcises: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
}

// Valid versions.
Expand Down
Loading

0 comments on commit d73ab80

Please sign in to comment.