Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: allow excises to unconditionally be flushable ingests #3897

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,12 @@ func (b *Batch) refreshMemTableSize() error {
}
// This key kind doesn't contribute to the memtable size.
continue
case InternalKeyKindExcise:
if b.minimumFormatMajorVersion < FormatFlushableIngestExcises {
b.minimumFormatMajorVersion = FormatFlushableIngestExcises
}
// This key kind doesn't contribute to the memtable size.
continue
default:
// Note In some circumstances this might be temporary memory
// corruption that can be recovered by discarding the batch and
Expand Down Expand Up @@ -608,7 +614,7 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
b.countRangeDels++
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
b.countRangeKeys++
case InternalKeyKindIngestSST:
case InternalKeyKindIngestSST, InternalKeyKindExcise:
panic("pebble: invalid key kind for batch")
case InternalKeyKindLogData:
// LogData does not contribute to memtable size.
Expand Down Expand Up @@ -1193,6 +1199,28 @@ func (b *Batch) ingestSST(fileNum base.FileNum) {
b.minimumFormatMajorVersion = FormatFlushableIngest
}

// Excise adds the excise span for a flushable ingest containing an excise. The data
// will only be written to the WAL (not added to memtables or sstables).
func (b *Batch) excise(start, end []byte) {
if b.Empty() {
b.ingestedSSTBatch = true
} else if !b.ingestedSSTBatch {
// Batch contains other key kinds.
panic("pebble: invalid call to excise")
}

origMemTableSize := b.memTableSize
b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise)
copy(b.deferredOp.Key, start)
copy(b.deferredOp.Value, end)
// Since excise writes only to the WAL and does not affect the memtable,
// we restore b.memTableSize to its original value. Note that Batch.count
// is not reset because for the InternalKeyKindIngestSST/Excise the count
// is the number of sstable paths which have been added to the batch.
b.memTableSize = origMemTableSize
b.minimumFormatMajorVersion = FormatFlushableIngestExcises
}

// Empty returns true if the batch is empty, and false otherwise.
func (b *Batch) Empty() bool {
return batchrepr.IsEmpty(b.data)
Expand Down
2 changes: 1 addition & 1 deletion batchrepr/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, o
switch kind {
case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete,
base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete,
base.InternalKeyKindDeleteSized:
base.InternalKeyKindDeleteSized, base.InternalKeyKindExcise:
*r, value, ok = DecodeStr(*r)
if !ok {
return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind)
Expand Down
11 changes: 7 additions & 4 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,6 @@ func (d *DB) waitTableStats() {

func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
var exciseSpan KeyRange
var sstContainsExciseTombstone bool
paths := make([]string, 0, len(td.CmdArgs))
for i, arg := range td.CmdArgs {
switch td.CmdArgs[i].Key {
Expand All @@ -1345,14 +1344,14 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
exciseSpan.Start = []byte(fields[0])
exciseSpan.End = []byte(fields[1])
case "contains-excise-tombstone":
sstContainsExciseTombstone = true
case "no-wait":
// Handled by callers.
default:
paths = append(paths, arg.String())
}
}

if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan); err != nil {
return err
}
return nil
Expand All @@ -1361,6 +1360,10 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
paths := make([]string, 0, len(td.CmdArgs))
for _, arg := range td.CmdArgs {
if arg.Key == "no-wait" {
// Handled by callers.
continue
}
paths = append(paths, arg.String())
}

Expand Down
58 changes: 52 additions & 6 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ type ingestedFlushable struct {
hasRangeKeys bool
// exciseSpan is populated if an excise operation should be performed during
// flush.
exciseSpan KeyRange
exciseSpan KeyRange
exciseSeqNum base.SeqNum
}

func newIngestedFlushable(
Expand All @@ -172,6 +173,7 @@ func newIngestedFlushable(
newIters tableNewIters,
newRangeKeyIters keyspanimpl.TableNewSpanIter,
exciseSpan KeyRange,
seqNum base.SeqNum,
) *ingestedFlushable {
if invariants.Enabled {
for i := 1; i < len(files); i++ {
Expand Down Expand Up @@ -200,6 +202,7 @@ func newIngestedFlushable(
slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
hasRangeKeys: hasRangeKeys,
exciseSpan: exciseSpan,
exciseSeqNum: seqNum,
}

return ret
Expand Down Expand Up @@ -245,30 +248,69 @@ func (s *ingestedFlushable) constructRangeDelIter(
// TODO(sumeer): *IterOptions are being ignored, so the index block load for
// the point iterator in constructRangeDeIter is not tracked.
func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
return keyspanimpl.NewLevelIter(
liter := keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare,
s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(),
manifest.KeyTypePoint,
)
if !s.exciseSpan.Valid() {
return liter
}
// We have an excise span to weave into the rangedel iterators.
//
// TODO(bilal): should this be pooled?
miter := &keyspanimpl.MergingIter{}
rdel := keyspan.Span{
Start: s.exciseSpan.Start,
End: s.exciseSpan.End,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}},
}
rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel})
miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter)
return miter
}

// newRangeKeyIter is part of the flushable interface.
func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
if !s.containsRangeKeys() {
return nil
var rkeydelIter keyspan.FragmentIterator
if s.exciseSpan.Valid() {
// We have an excise span to weave into the rangekey iterators.
rkeydel := keyspan.Span{
Start: s.exciseSpan.Start,
End: s.exciseSpan.End,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeKeyDelete)}},
}
rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel})
}

if !s.hasRangeKeys {
if rkeydelIter == nil {
// NB: we have to return the nil literal as opposed to the nil
// value of rkeydelIter, otherwise callers of this function will
// have the return value fail == nil checks.
return nil
}
return rkeydelIter
}

return keyspanimpl.NewLevelIter(
liter := keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange,
)
if rkeydelIter == nil {
return liter
}
// TODO(bilal): should this be pooled?
miter := &keyspanimpl.MergingIter{}
miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rkeydelIter)
return miter
}

// containsRangeKeys is part of the flushable interface.
func (s *ingestedFlushable) containsRangeKeys() bool {
return s.hasRangeKeys
return s.hasRangeKeys || s.exciseSpan.Valid()
}

// inuseBytes is part of the flushable interface.
Expand Down Expand Up @@ -329,6 +371,10 @@ func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
// checks above.
return true
}
if s.exciseSpan.Valid() {
uk := s.exciseSpan.UserKeyBounds()
return uk.Overlaps(s.comparer.Compare, &bounds)
}
return false
}

Expand Down
40 changes: 31 additions & 9 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/datadriven"
Expand Down Expand Up @@ -47,7 +48,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}
reset()

loadFileMeta := func(paths []string) []*fileMetadata {
loadFileMeta := func(paths []string, exciseSpan KeyRange, seqNum base.SeqNum) []*fileMetadata {
d.mu.Lock()
pendingOutputs := make([]base.FileNum, len(paths))
for i := range paths {
Expand All @@ -60,11 +61,17 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// not actually ingesting a file.
lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
if err != nil {
panic(err)
t.Fatal(err)
}
meta := make([]*fileMetadata, len(lr.local))
if exciseSpan.Valid() {
seqNum++
}
for i := range meta {
meta[i] = lr.local[i].fileMetadata
if err := setSeqNumInMetadata(meta[i], seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
t.Fatal(err)
}
}
if len(meta) == 0 {
// All of the sstables to be ingested were empty. Nothing to do.
Expand All @@ -77,8 +84,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}

// Verify the sstables do not overlap.
if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil {
panic("unsorted sstables")
if err := ingestSortAndVerify(d.cmp, lr, exciseSpan); err != nil {
t.Fatal(err)
}

// Hard link the sstables into the DB directory. Since the sstables aren't
Expand All @@ -87,20 +94,21 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil {
panic("couldn't hard link sstables")
t.Fatal(err)
}

// Fsync the directory we added the tables to. We need to do this at some
// point before we update the MANIFEST (via logAndApply), otherwise a crash
// can have the tables referenced in the MANIFEST, but not present in the
// directory.
if err := d.dataDir.Sync(); err != nil {
panic("Couldn't sync data directory")
t.Fatal(err)
}

return meta
}

var seqNum uint64
datadriven.RunTest(t, "testdata/ingested_flushable_api", func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "reset":
Expand All @@ -114,12 +122,26 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
case "flushable":
// Creates an ingestedFlushable over the input files.
paths := make([]string, 0, len(td.CmdArgs))
var exciseSpan KeyRange
startSeqNum := base.SeqNum(seqNum)
for _, arg := range td.CmdArgs {
paths = append(paths, arg.String())
switch arg.Key {
case "excise":
parts := strings.Split(arg.Vals[0], "-")
if len(parts) != 2 {
return fmt.Sprintf("invalid excise range: %s", arg.Vals[0])
}
exciseSpan.Start = []byte(parts[0])
exciseSpan.End = []byte(parts[1])
seqNum++
default:
paths = append(paths, arg.String())
seqNum++
}
}

meta := loadFileMeta(paths)
flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{})
meta := loadFileMeta(paths, exciseSpan, startSeqNum)
flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, base.SeqNum(startSeqNum))
return ""
case "iter":
iter := flushable.newIter(nil)
Expand Down
15 changes: 13 additions & 2 deletions format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ const (
// fields in the Manifest and thus requires a format major version.
FormatSyntheticPrefixSuffix

// FormatFlushableIngestExcises is a format major version that adds support for
// having excises unconditionally being written as flushable ingestions. This
// is implemented through adding a new key kind that can go in the same batches
// as flushable ingested sstables.
FormatFlushableIngestExcises

// TODO(msbutler): add major version for synthetic suffixes

// -- Add new versions here --
Expand Down Expand Up @@ -222,7 +228,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
switch v {
case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted:
return sstable.TableFormatPebblev3
case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix:
case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
FormatFlushableIngestExcises:
return sstable.TableFormatPebblev4
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
Expand All @@ -234,7 +241,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat {
func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat {
switch v {
case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted,
FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix:
FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix,
FormatFlushableIngestExcises:
return sstable.TableFormatPebblev1
default:
panic(fmt.Sprintf("pebble: unsupported format major version: %s", v))
Expand Down Expand Up @@ -271,6 +279,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{
FormatSyntheticPrefixSuffix: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatSyntheticPrefixSuffix)
},
FormatFlushableIngestExcises: func(d *DB) error {
return d.finalizeFormatVersUpgrade(FormatFlushableIngestExcises)
},
}

const formatVersionMarkerName = `format-version`
Expand Down
8 changes: 6 additions & 2 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) {
require.Equal(t, FormatDeleteSizedAndObsolete, FormatMajorVersion(15))
require.Equal(t, FormatVirtualSSTables, FormatMajorVersion(16))
require.Equal(t, FormatSyntheticPrefixSuffix, FormatMajorVersion(17))
require.Equal(t, FormatFlushableIngestExcises, FormatMajorVersion(18))

// When we add a new version, we should add a check for the new version in
// addition to updating these expected values.
require.Equal(t, FormatNewest, FormatMajorVersion(17))
require.Equal(t, internalFormatNewest, FormatMajorVersion(17))
require.Equal(t, FormatNewest, FormatMajorVersion(18))
require.Equal(t, internalFormatNewest, FormatMajorVersion(18))
}

func TestFormatMajorVersion_MigrationDefined(t *testing.T) {
Expand All @@ -53,6 +54,8 @@ func TestRatchetFormat(t *testing.T) {
require.Equal(t, FormatVirtualSSTables, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatSyntheticPrefixSuffix))
require.Equal(t, FormatSyntheticPrefixSuffix, d.FormatMajorVersion())
require.NoError(t, d.RatchetFormatMajorVersion(FormatFlushableIngestExcises))
require.Equal(t, FormatFlushableIngestExcises, d.FormatMajorVersion())

require.NoError(t, d.Close())

Expand Down Expand Up @@ -203,6 +206,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) {
FormatDeleteSizedAndObsolete: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
FormatVirtualSSTables: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
FormatSyntheticPrefixSuffix: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
FormatFlushableIngestExcises: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4},
}

// Valid versions.
Expand Down
Loading
Loading