From d73ab80fd7de09fe5da05a523944bd5b71c10059 Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Tue, 20 Aug 2024 10:40:36 -0400 Subject: [PATCH] db: allow excises to unconditionally be flushable ingests Previously, we'd only allow IngestAndExcise to become flushable ingests if we had a guarantee from the caller that the ingestion contained rangedel/rangekeydel overlapping the excise span. This change reworks flushable ingests to return rangedels/rangekeydels from their own iterators, even if one does not exist in the sstables themselves. This, plus the update to the WAL to be able to persist the excise span, allows for IngestAndExcise to allow flushable ingests in more cases than before. Also resolves TODOs in replayWAL() around properly sequencing flushable ingests so their sstables can go in levels lower than L0. This sequencing was necessary to make excises work correctly; as excises require an up-to-date view of the version to work correctly. Fixes #2676. --- batch.go | 30 +++++++++- batchrepr/reader.go | 2 +- data_test.go | 11 ++-- flushable.go | 58 +++++++++++++++++-- flushable_test.go | 40 +++++++++++--- format_major_version.go | 15 ++++- format_major_version_test.go | 8 ++- ingest.go | 52 +++++++++++------ ingest_test.go | 41 +++++++++++--- internal.go | 1 + internal/base/internal.go | 22 +++++++- internal/base/internal_test.go | 2 +- mem_table.go | 4 +- metamorphic/generator.go | 11 ++-- metamorphic/ops.go | 24 +++----- metamorphic/parser.go | 2 +- open.go | 24 ++++++-- open_test.go | 2 +- testdata/checkpoint | 50 +++++++++-------- testdata/checkpoint_shared | 28 ++++++---- testdata/concurrent_excise | 48 ++++++++++++++-- testdata/determinism | 4 +- testdata/event_listener | 11 +++- testdata/excise | 98 ++++++++++++++++++++++++++++----- testdata/flushable_ingest | 14 ++--- testdata/ingested_flushable_api | 70 +++++++++++++++++++++-- tool/db.go | 2 +- tool/wal.go | 2 + 28 files changed, 515 insertions(+), 161 deletions(-) diff --git a/batch.go b/batch.go index 556874e2eb..1c95d4ef97 100644 --- a/batch.go +++ b/batch.go @@ -552,6 +552,12 @@ func (b *Batch) refreshMemTableSize() error { } // This key kind doesn't contribute to the memtable size. continue + case InternalKeyKindExcise: + if b.minimumFormatMajorVersion < FormatFlushableIngestExcises { + b.minimumFormatMajorVersion = FormatFlushableIngestExcises + } + // This key kind doesn't contribute to the memtable size. + continue default: // Note In some circumstances this might be temporary memory // corruption that can be recovered by discarding the batch and @@ -608,7 +614,7 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error { b.countRangeDels++ case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete: b.countRangeKeys++ - case InternalKeyKindIngestSST: + case InternalKeyKindIngestSST, InternalKeyKindExcise: panic("pebble: invalid key kind for batch") case InternalKeyKindLogData: // LogData does not contribute to memtable size. @@ -1193,6 +1199,28 @@ func (b *Batch) ingestSST(fileNum base.FileNum) { b.minimumFormatMajorVersion = FormatFlushableIngest } +// Excise adds the excise span for a flushable ingest containing an excise. The data +// will only be written to the WAL (not added to memtables or sstables). +func (b *Batch) excise(start, end []byte) { + if b.Empty() { + b.ingestedSSTBatch = true + } else if !b.ingestedSSTBatch { + // Batch contains other key kinds. 
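// An excise record may only be added to an empty batch or to a batch that
// already holds flushable-ingest (InternalKeyKindIngestSST) entries; any other
// combination is a programming error, mirroring the restriction documented on
// InternalKeyKindExcise.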
+ panic("pebble: invalid call to excise") + } + + origMemTableSize := b.memTableSize + b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise) + copy(b.deferredOp.Key, start) + copy(b.deferredOp.Value, end) + // Since excise writes only to the WAL and does not affect the memtable, + // we restore b.memTableSize to its original value. Note that Batch.count + // is not reset because for the InternalKeyKindIngestSST/Excise the count + // is the number of sstable paths which have been added to the batch. + b.memTableSize = origMemTableSize + b.minimumFormatMajorVersion = FormatFlushableIngestExcises +} + // Empty returns true if the batch is empty, and false otherwise. func (b *Batch) Empty() bool { return batchrepr.IsEmpty(b.data) diff --git a/batchrepr/reader.go b/batchrepr/reader.go index 652e94b8b5..cceb2be397 100644 --- a/batchrepr/reader.go +++ b/batchrepr/reader.go @@ -97,7 +97,7 @@ func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, o switch kind { case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete, - base.InternalKeyKindDeleteSized: + base.InternalKeyKindDeleteSized, base.InternalKeyKindExcise: *r, value, ok = DecodeStr(*r) if !ok { return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind) diff --git a/data_test.go b/data_test.go index 3679a39dea..453a0e6948 100644 --- a/data_test.go +++ b/data_test.go @@ -1331,7 +1331,6 @@ func (d *DB) waitTableStats() { func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { var exciseSpan KeyRange - var sstContainsExciseTombstone bool paths := make([]string, 0, len(td.CmdArgs)) for i, arg := range td.CmdArgs { switch td.CmdArgs[i].Key { @@ -1345,14 +1344,14 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { } exciseSpan.Start = []byte(fields[0]) exciseSpan.End = []byte(fields[1]) - case "contains-excise-tombstone": - sstContainsExciseTombstone = true + case "no-wait": + // Handled by callers. default: paths = append(paths, arg.String()) } } - if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil { + if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan); err != nil { return err } return nil @@ -1361,6 +1360,10 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { paths := make([]string, 0, len(td.CmdArgs)) for _, arg := range td.CmdArgs { + if arg.Key == "no-wait" { + // Handled by callers. + continue + } paths = append(paths, arg.String()) } diff --git a/flushable.go b/flushable.go index c014bb9b11..c1ddb639fd 100644 --- a/flushable.go +++ b/flushable.go @@ -163,7 +163,8 @@ type ingestedFlushable struct { hasRangeKeys bool // exciseSpan is populated if an excise operation should be performed during // flush. 
- exciseSpan KeyRange + exciseSpan KeyRange + exciseSeqNum base.SeqNum } func newIngestedFlushable( @@ -172,6 +173,7 @@ func newIngestedFlushable( newIters tableNewIters, newRangeKeyIters keyspanimpl.TableNewSpanIter, exciseSpan KeyRange, + seqNum base.SeqNum, ) *ingestedFlushable { if invariants.Enabled { for i := 1; i < len(files); i++ { @@ -200,6 +202,7 @@ func newIngestedFlushable( slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files), hasRangeKeys: hasRangeKeys, exciseSpan: exciseSpan, + exciseSeqNum: seqNum, } return ret @@ -245,30 +248,69 @@ func (s *ingestedFlushable) constructRangeDelIter( // TODO(sumeer): *IterOptions are being ignored, so the index block load for // the point iterator in constructRangeDeIter is not tracked. func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { - return keyspanimpl.NewLevelIter( + liter := keyspanimpl.NewLevelIter( context.TODO(), keyspan.SpanIterOptions{}, s.comparer.Compare, s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypePoint, ) + if !s.exciseSpan.Valid() { + return liter + } + // We have an excise span to weave into the rangedel iterators. + // + // TODO(bilal): should this be pooled? + miter := &keyspanimpl.MergingIter{} + rdel := keyspan.Span{ + Start: s.exciseSpan.Start, + End: s.exciseSpan.End, + Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}}, + } + rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel}) + miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter) + return miter } // newRangeKeyIter is part of the flushable interface. func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { - if !s.containsRangeKeys() { - return nil + var rkeydelIter keyspan.FragmentIterator + if s.exciseSpan.Valid() { + // We have an excise span to weave into the rangekey iterators. + rkeydel := keyspan.Span{ + Start: s.exciseSpan.Start, + End: s.exciseSpan.End, + Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeKeyDelete)}}, + } + rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel}) + } + + if !s.hasRangeKeys { + if rkeydelIter == nil { + // NB: we have to return the nil literal as opposed to the nil + // value of rkeydelIter, otherwise callers of this function will + // have the return value fail == nil checks. + return nil + } + return rkeydelIter } - return keyspanimpl.NewLevelIter( + liter := keyspanimpl.NewLevelIter( context.TODO(), keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters, s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange, ) + if rkeydelIter == nil { + return liter + } + // TODO(bilal): should this be pooled? + miter := &keyspanimpl.MergingIter{} + miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rkeydelIter) + return miter } // containsRangeKeys is part of the flushable interface. func (s *ingestedFlushable) containsRangeKeys() bool { - return s.hasRangeKeys + return s.hasRangeKeys || s.exciseSpan.Valid() } // inuseBytes is part of the flushable interface. @@ -329,6 +371,10 @@ func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool { // checks above. 
return true } + if s.exciseSpan.Valid() { + uk := s.exciseSpan.UserKeyBounds() + return uk.Overlaps(s.comparer.Compare, &bounds) + } return false } diff --git a/flushable_test.go b/flushable_test.go index f8ee20580b..3be5fd90ba 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "strings" "testing" "github.com/cockroachdb/datadriven" @@ -47,7 +48,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } reset() - loadFileMeta := func(paths []string) []*fileMetadata { + loadFileMeta := func(paths []string, exciseSpan KeyRange, seqNum base.SeqNum) []*fileMetadata { d.mu.Lock() pendingOutputs := make([]base.FileNum, len(paths)) for i := range paths { @@ -60,11 +61,17 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // not actually ingesting a file. lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs) if err != nil { - panic(err) + t.Fatal(err) } meta := make([]*fileMetadata, len(lr.local)) + if exciseSpan.Valid() { + seqNum++ + } for i := range meta { meta[i] = lr.local[i].fileMetadata + if err := setSeqNumInMetadata(meta[i], seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { + t.Fatal(err) + } } if len(meta) == 0 { // All of the sstables to be ingested were empty. Nothing to do. @@ -77,8 +84,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } // Verify the sstables do not overlap. - if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil { - panic("unsorted sstables") + if err := ingestSortAndVerify(d.cmp, lr, exciseSpan); err != nil { + t.Fatal(err) } // Hard link the sstables into the DB directory. Since the sstables aren't @@ -87,7 +94,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // fall back to copying, and if that fails we undo our work and return an // error. if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil { - panic("couldn't hard link sstables") + t.Fatal(err) } // Fsync the directory we added the tables to. We need to do this at some @@ -95,12 +102,13 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // can have the tables referenced in the MANIFEST, but not present in the // directory. if err := d.dataDir.Sync(); err != nil { - panic("Couldn't sync data directory") + t.Fatal(err) } return meta } + var seqNum uint64 datadriven.RunTest(t, "testdata/ingested_flushable_api", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { case "reset": @@ -114,12 +122,26 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { case "flushable": // Creates an ingestedFlushable over the input files. 
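// When an excise span is supplied via excise=<start>-<end>, it consumes the
// first sequence number and the sstables receive the subsequent ones,
// mirroring newIngestedFlushableEntry.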
paths := make([]string, 0, len(td.CmdArgs)) + var exciseSpan KeyRange + startSeqNum := base.SeqNum(seqNum) for _, arg := range td.CmdArgs { - paths = append(paths, arg.String()) + switch arg.Key { + case "excise": + parts := strings.Split(arg.Vals[0], "-") + if len(parts) != 2 { + return fmt.Sprintf("invalid excise range: %s", arg.Vals[0]) + } + exciseSpan.Start = []byte(parts[0]) + exciseSpan.End = []byte(parts[1]) + seqNum++ + default: + paths = append(paths, arg.String()) + seqNum++ + } } - meta := loadFileMeta(paths) - flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{}) + meta := loadFileMeta(paths, exciseSpan, startSeqNum) + flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, base.SeqNum(startSeqNum)) return "" case "iter": iter := flushable.newIter(nil) diff --git a/format_major_version.go b/format_major_version.go index 90ef9f0b6a..dd62f32ff4 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -185,6 +185,12 @@ const ( // fields in the Manifest and thus requires a format major version. FormatSyntheticPrefixSuffix + // FormatFlushableIngestExcises is a format major version that adds support for + // having excises unconditionally being written as flushable ingestions. This + // is implemented through adding a new key kind that can go in the same batches + // as flushable ingested sstables. + FormatFlushableIngestExcises + // TODO(msbutler): add major version for synthetic suffixes // -- Add new versions here -- @@ -222,7 +228,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { switch v { case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted: return sstable.TableFormatPebblev3 - case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix: + case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix, + FormatFlushableIngestExcises: return sstable.TableFormatPebblev4 default: panic(fmt.Sprintf("pebble: unsupported format major version: %s", v)) @@ -234,7 +241,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat { switch v { case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted, - FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix: + FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix, + FormatFlushableIngestExcises: return sstable.TableFormatPebblev1 default: panic(fmt.Sprintf("pebble: unsupported format major version: %s", v)) @@ -271,6 +279,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatSyntheticPrefixSuffix: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatSyntheticPrefixSuffix) }, + FormatFlushableIngestExcises: func(d *DB) error { + return d.finalizeFormatVersUpgrade(FormatFlushableIngestExcises) + }, } const formatVersionMarkerName = `format-version` diff --git a/format_major_version_test.go b/format_major_version_test.go index 3cf01976c4..364f36197c 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -24,11 +24,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) { require.Equal(t, FormatDeleteSizedAndObsolete, FormatMajorVersion(15)) require.Equal(t, FormatVirtualSSTables, FormatMajorVersion(16)) require.Equal(t, FormatSyntheticPrefixSuffix, FormatMajorVersion(17)) + require.Equal(t, FormatFlushableIngestExcises, 
FormatMajorVersion(18)) // When we add a new version, we should add a check for the new version in // addition to updating these expected values. - require.Equal(t, FormatNewest, FormatMajorVersion(17)) - require.Equal(t, internalFormatNewest, FormatMajorVersion(17)) + require.Equal(t, FormatNewest, FormatMajorVersion(18)) + require.Equal(t, internalFormatNewest, FormatMajorVersion(18)) } func TestFormatMajorVersion_MigrationDefined(t *testing.T) { @@ -53,6 +54,8 @@ func TestRatchetFormat(t *testing.T) { require.Equal(t, FormatVirtualSSTables, d.FormatMajorVersion()) require.NoError(t, d.RatchetFormatMajorVersion(FormatSyntheticPrefixSuffix)) require.Equal(t, FormatSyntheticPrefixSuffix, d.FormatMajorVersion()) + require.NoError(t, d.RatchetFormatMajorVersion(FormatFlushableIngestExcises)) + require.Equal(t, FormatFlushableIngestExcises, d.FormatMajorVersion()) require.NoError(t, d.Close()) @@ -203,6 +206,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { FormatDeleteSizedAndObsolete: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, FormatVirtualSSTables: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, FormatSyntheticPrefixSuffix: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, + FormatFlushableIngestExcises: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, } // Valid versions. diff --git a/ingest.go b/ingest.go index f43c1a8e53..0fd2088698 100644 --- a/ingest.go +++ b/ingest.go @@ -141,7 +141,7 @@ func ingestSynthesizeShared( // a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a // correct bound, we just use the maximum key kind (which sorts first). // Similarly, we use the smallest key kind for the largest key. - smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMax) + smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMaxForSSTable) largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0) if sm.LargestPointKey.IsExclusiveSentinel() { largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey) @@ -217,12 +217,12 @@ func ingestLoad1External( if e.EndKeyIsInclusive { meta.ExtendPointKeyBounds( opts.Comparer.Compare, - base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax), + base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable), base.MakeInternalKey(largestCopy, 0, 0)) } else { meta.ExtendPointKeyBounds( opts.Comparer.Compare, - base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax), + base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable), base.MakeRangeDeleteSentinelKey(largestCopy)) } } @@ -1038,7 +1038,7 @@ func (d *DB) Ingest(ctx context.Context, paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, false, nil /* external */) + _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, nil /* external */) return err } @@ -1126,7 +1126,7 @@ func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperati if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(ctx, paths, nil, KeyRange{}, false, nil) + return d.ingest(ctx, paths, nil, KeyRange{}, nil) } // IngestExternalFiles does the same as IngestWithStats, and additionally @@ -1146,7 +1146,7 @@ func (d *DB) IngestExternalFiles( if d.opts.Experimental.RemoteStorage == nil { return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage 
configured") } - return d.ingest(ctx, nil, nil, KeyRange{}, false, external) + return d.ingest(ctx, nil, nil, KeyRange{}, external) } // IngestAndExcise does the same as IngestWithStats, and additionally accepts a @@ -1165,7 +1165,6 @@ func (d *DB) IngestAndExcise( shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange, - sstsContainExciseTombstone bool, ) (IngestOperationStats, error) { if err := d.closed.Load(); err != nil { panic(err) @@ -1188,13 +1187,24 @@ func (d *DB) IngestAndExcise( v, FormatMinForSharedObjects, ) } - return d.ingest(ctx, paths, shared, exciseSpan, sstsContainExciseTombstone, external) + return d.ingest(ctx, paths, shared, exciseSpan, external) } // Both DB.mu and commitPipeline.mu must be held while this is called. func (d *DB) newIngestedFlushableEntry( meta []*fileMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange, ) (*flushableEntry, error) { + // If there's an excise being done atomically with the same ingest, we + // assign the lowest sequence number in the set of sequence numbers for this + // ingestion to the excise. Note that we've already allocated fileCount+1 + // sequence numbers in this case. + // + // This mimics the behaviour in the non-flushable ingest case (see the callsite + // for ingestUpdateSeqNum). + fileSeqNumStart := seqNum + if exciseSpan.Valid() { + fileSeqNumStart = seqNum + 1 // the first seqNum is reserved for the excise. + } // Update the sequence number for all of the sstables in the // metadata. Writing the metadata to the manifest when the // version edit is applied is the mechanism that persists the @@ -1204,12 +1214,12 @@ func (d *DB) newIngestedFlushableEntry( // time, then we'll lose the ingest sequence number information. But this // information will also be reconstructed on node restart. for i, m := range meta { - if err := setSeqNumInMetadata(m, seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { + if err := setSeqNumInMetadata(m, fileSeqNumStart+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { return nil, err } } - f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan) + f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, seqNum) // NB: The logNum/seqNum are the WAL number which we're writing this entry // to and the sequence number within the WAL which we'll write this entry @@ -1243,6 +1253,9 @@ func (d *DB) handleIngestAsFlushable( meta []*fileMetadata, seqNum base.SeqNum, exciseSpan KeyRange, ) error { b := d.NewBatch() + if exciseSpan.Valid() { + b.excise(exciseSpan.Start, exciseSpan.End) + } for _, m := range meta { b.ingestSST(m.FileNum) } @@ -1272,6 +1285,11 @@ func (d *DB) handleIngestAsFlushable( d.mu.Lock() } + // The excise span is going to outlive this ingestion call. Copy it. 
+ exciseSpan = KeyRange{ + Start: slices.Clone(exciseSpan.Start), + End: slices.Clone(exciseSpan.End), + } entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan) if err != nil { return err @@ -1314,7 +1332,6 @@ func (d *DB) ingest( paths []string, shared []SharedSSTMeta, exciseSpan KeyRange, - sstsContainExciseTombstone bool, external []ExternalFile, ) (IngestOperationStats, error) { if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil { @@ -1524,17 +1541,16 @@ func (d *DB) ingest( hasRemoteFiles := len(shared) > 0 || len(external) > 0 canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest && (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) && - !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles + !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles && + (!exciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises) - if !canIngestFlushable || (exciseSpan.Valid() && !sstsContainExciseTombstone) { + if !canIngestFlushable { // We're not able to ingest as a flushable, // so we must synchronously flush. // - // TODO(bilal): Currently, if any of the files being ingested are shared or - // there's an excise span present, we cannot use flushable ingests and need - // to wait synchronously. Either remove this caveat by fleshing out - // flushable ingest logic to also account for these cases, or remove this - // comment. Tracking issue: https://github.com/cockroachdb/pebble/issues/2676 + // TODO(bilal): Currently, if any of the files being ingested are shared, + // we cannot use flushable ingests and need + // to wait synchronously. if mem.flushable == d.mu.mem.mutable { err = d.makeRoomForWrite(nil) } diff --git a/ingest_test.go b/ingest_test.go index de39271742..b80112a415 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -431,7 +431,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { ) cache := NewCache(0) defer func() { - if !closed { + if d != nil && !closed { require.NoError(t, d.Close()) } cache.Unref() @@ -443,6 +443,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { reset := func(strictMem bool) { if d != nil && !closed { require.NoError(t, d.Close()) + d = nil } blockFlush = false @@ -710,9 +711,19 @@ func TestExcise(t *testing.T) { case "ingest": flushed = false + noWait := false + for i := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "no-wait": + noWait = true + } + } if err := runIngestCmd(td, d, mem); err != nil { return err.Error() } + if noWait { + return "" + } // Wait for a possible flush. d.mu.Lock() for d.mu.compact.flushing { @@ -726,12 +737,26 @@ func TestExcise(t *testing.T) { case "ingest-and-excise": flushed = false - d.mu.Lock() - prevFlushableIngests := d.mu.versions.metrics.Flush.AsIngestCount - d.mu.Unlock() + noWait := false + for i := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "no-wait": + noWait = true + } + } + var prevFlushableIngests uint64 + if !noWait { + d.mu.Lock() + prevFlushableIngests = d.mu.versions.metrics.Flush.AsIngestCount + d.mu.Unlock() + } + if err := runIngestAndExciseCmd(td, d, mem); err != nil { return err.Error() } + if noWait { + return "" + } // Wait for a possible flush. 
d.mu.Lock() for d.mu.compact.flushing { @@ -1127,7 +1152,7 @@ func testIngestSharedImpl( require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1358,7 +1383,7 @@ func TestSimpleIngestShared(t *testing.T) { } _, err = d.IngestAndExcise( context.Background(), []string{}, []SharedSSTMeta{sharedSSTMeta}, nil, /* external */ - KeyRange{Start: []byte("d"), End: []byte("ee")}, false) + KeyRange{Start: []byte("d"), End: []byte("ee")}) require.NoError(t, err) // TODO(bilal): Once reading of shared sstables is in, verify that the values @@ -1628,7 +1653,7 @@ func TestConcurrentExcise(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -2063,7 +2088,7 @@ func TestIngestExternal(t *testing.T) { ) require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d external SSTs", len(externalFiles)) diff --git a/internal.go b/internal.go index 3e32434717..d6997c6bce 100644 --- a/internal.go +++ b/internal.go @@ -29,6 +29,7 @@ const ( InternalKeyKindRangeKeyMax = base.InternalKeyKindRangeKeyMax InternalKeyKindIngestSST = base.InternalKeyKindIngestSST InternalKeyKindDeleteSized = base.InternalKeyKindDeleteSized + InternalKeyKindExcise = base.InternalKeyKindExcise InternalKeyKindInvalid = base.InternalKeyKindInvalid ) diff --git a/internal/base/internal.go b/internal/base/internal.go index baf47a6395..0fc719f4b5 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -126,8 +126,8 @@ const ( // InternalKeyKindIngestSST is used to distinguish a batch that corresponds to // the WAL entry for ingested sstables that are added to the flushable - // queue. This InternalKeyKind cannot appear, amongst other key kinds in a - // batch, or in an sstable. + // queue. This InternalKeyKind cannot appear amongst other key kinds in a + // batch (with the exception of alongside InternalKeyKindExcise), or in an sstable. InternalKeyKindIngestSST InternalKeyKind = 22 // InternalKeyKindDeleteSized keys behave identically to @@ -137,6 +137,14 @@ const ( // heuristics, but is not required to be accurate for correctness. InternalKeyKindDeleteSized InternalKeyKind = 23 + // InternalKeyKindExcise is used to persist the Excise part of an IngestAndExcise + // to a WAL. An Excise is similar to a RangeDel+RangeKeyDel combined, in that it + // deletes all point and range keys in a given key range while also immediately + // truncating sstables to exclude this key span. 
This InternalKeyKind cannot + // appear amongst other key kinds in a batch (with the exception of alongside + // InternalKeyKindIngestSST), or in an sstable. + InternalKeyKindExcise InternalKeyKind = 24 + // This maximum value isn't part of the file format. Future extensions may // increase this value. // @@ -146,7 +154,13 @@ const ( // which sorts 'less than or equal to' any other valid internalKeyKind, when // searching for any kind of internal key formed by a certain user key and // seqNum. - InternalKeyKindMax InternalKeyKind = 23 + InternalKeyKindMax InternalKeyKind = 24 + + // InternalKeyKindMaxForSSTable is the largest valid key kind that can exist + // in an SSTable. This should usually equal InternalKeyKindMax, except + // if the current InternalKeyKindMax is a kind that is never added to an + // SSTable or memtable (eg. InternalKeyKindExcise). + InternalKeyKindMaxForSSTable InternalKeyKind = InternalKeyKindDeleteSized // Internal to the sstable format. Not exposed by any sstable iterator. // Declared here to prevent definition of valid key kinds that set this bit. @@ -191,6 +205,7 @@ var internalKeyKindNames = []string{ InternalKeyKindRangeKeyDelete: "RANGEKEYDEL", InternalKeyKindIngestSST: "INGESTSST", InternalKeyKindDeleteSized: "DELSIZED", + InternalKeyKindExcise: "EXCISE", InternalKeyKindInvalid: "INVALID", } @@ -290,6 +305,7 @@ var kindsMap = map[string]InternalKeyKind{ "RANGEKEYDEL": InternalKeyKindRangeKeyDelete, "INGESTSST": InternalKeyKindIngestSST, "DELSIZED": InternalKeyKindDeleteSized, + "EXCISE": InternalKeyKindExcise, } // ParseSeqNum parses the string representation of a sequence number. diff --git a/internal/base/internal_test.go b/internal/base/internal_test.go index d06b789741..626a7ec86f 100644 --- a/internal/base/internal_test.go +++ b/internal/base/internal_test.go @@ -41,7 +41,7 @@ func TestInvalidInternalKey(t *testing.T) { "\x01\x02\x03\x04\x05\x06\x07", "foo", "foo\x08\x07\x06\x05\x04\x03\x02", - "foo\x18\x07\x06\x05\x04\x03\x02\x01", + "foo\x19\x07\x06\x05\x04\x03\x02\x01", } for _, tc := range testCases { k := DecodeInternalKey([]byte(tc)) diff --git a/mem_table.go b/mem_table.go index d2a08421ca..277a83537d 100644 --- a/mem_table.go +++ b/mem_table.go @@ -230,8 +230,8 @@ func (m *memTable) apply(batch *Batch, seqNum base.SeqNum) error { // Don't increment seqNum for LogData, since these are not applied // to the memtable. 
seqNum-- - case InternalKeyKindIngestSST: - panic("pebble: cannot apply ingested sstable key kind to memtable") + case InternalKeyKindIngestSST, InternalKeyKindExcise: + panic("pebble: cannot apply ingested sstable or excise kind keys to memtable") default: err = ins.Add(&m.skl, ikey, value) } diff --git a/metamorphic/generator.go b/metamorphic/generator.go index a582935622..cb40aab21f 100644 --- a/metamorphic/generator.go +++ b/metamorphic/generator.go @@ -1300,12 +1300,11 @@ func (g *generator) writerIngestAndExcise() { } g.add(&ingestAndExciseOp{ - dbID: dbID, - batchID: batchID, - derivedDBID: derivedDBID, - exciseStart: start, - exciseEnd: end, - sstContainsExciseTombstone: g.rng.Intn(2) == 0, + dbID: dbID, + batchID: batchID, + derivedDBID: derivedDBID, + exciseStart: start, + exciseEnd: end, }) } diff --git a/metamorphic/ops.go b/metamorphic/ops.go index 1033738945..7c5ff02f02 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -879,11 +879,10 @@ func (o *ingestOp) keys() []*[]byte { return nil } func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil } type ingestAndExciseOp struct { - dbID objID - batchID objID - derivedDBID objID - exciseStart, exciseEnd []byte - sstContainsExciseTombstone bool + dbID objID + batchID objID + derivedDBID objID + exciseStart, exciseEnd []byte } func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { @@ -899,13 +898,6 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { return } - if o.sstContainsExciseTombstone { - // Add a rangedel and rangekeydel to the batch. This ensures it'll end up - // inside the sstable. Note that all entries in the sstable will have the - // same sequence number, so the ordering within the batch doesn't matter. - err = firstError(err, b.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts)) - err = firstError(err, b.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts)) - } path, writerMeta, err2 := buildForIngest(t, o.dbID, b, 0 /* i */) if err2 != nil { h.Recordf("Build(%s) // %v", o.batchID, err2) @@ -923,7 +915,7 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { _, err := db.IngestAndExcise(context.Background(), []string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{ Start: o.exciseStart, End: o.exciseEnd, - }, o.sstContainsExciseTombstone) + }) return err })) } else { @@ -956,7 +948,7 @@ func (o *ingestAndExciseOp) syncObjs() objIDSlice { } func (o *ingestAndExciseOp) String() string { - return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q, %t /* sstContainsExciseTombstone */)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd, o.sstContainsExciseTombstone) + return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd) } func (o *ingestAndExciseOp) keys() []*[]byte { @@ -1976,7 +1968,7 @@ func (r *replicateOp) runSharedReplicate( return } - _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false) + _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}) h.Recordf("%s // %v", r, err) } @@ -2039,7 +2031,7 @@ func (r *replicateOp) runExternalReplicate( return } - _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false /* sstContainsExciseTombstone */) + _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, 
externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}) h.Recordf("%s // %v", r, err) } diff --git a/metamorphic/parser.go b/metamorphic/parser.go index 833c3b4534..46924a7787 100644 --- a/metamorphic/parser.go +++ b/metamorphic/parser.go @@ -76,7 +76,7 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) { case *ingestOp: return &t.dbID, nil, []interface{}{&t.batchIDs} case *ingestAndExciseOp: - return &t.dbID, nil, []interface{}{&t.batchID, &t.exciseStart, &t.exciseEnd, &t.sstContainsExciseTombstone} + return &t.dbID, nil, []interface{}{&t.batchID, &t.exciseStart, &t.exciseEnd} case *ingestExternalFilesOp: return &t.dbID, nil, []interface{}{&t.objs} case *initOp: diff --git a/open.go b/open.go index 3670ceaaae..f274b4d5fd 100644 --- a/open.go +++ b/open.go @@ -769,6 +769,7 @@ func (d *DB) replayIngestedFlushable( seqNum := b.SeqNum() fileNums := make([]base.DiskFileNum, 0, b.Count()) + var exciseSpan KeyRange addFileNum := func(encodedFileNum []byte) { fileNum, n := binary.Uvarint(encodedFileNum) if n <= 0 { @@ -778,17 +779,25 @@ func (d *DB) replayIngestedFlushable( } for i := 0; i < int(b.Count()); i++ { - kind, encodedKey, _, ok, err := br.Next() + kind, key, val, ok, err := br.Next() if err != nil { return nil, err } - if kind != InternalKeyKindIngestSST { + if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise { panic("pebble: invalid batch key kind") } if !ok { panic("pebble: invalid batch count") } - addFileNum(encodedKey) + if kind == base.InternalKeyKindExcise { + if exciseSpan.Valid() { + panic("pebble: multiple excise spans in a single batch") + } + exciseSpan.Start = slices.Clone(key) + exciseSpan.End = slices.Clone(val) + continue + } + addFileNum(key) } if _, _, _, ok, err := br.Next(); err != nil { @@ -811,11 +820,14 @@ func (d *DB) replayIngestedFlushable( } numFiles := len(meta) + if exciseSpan.Valid() { + numFiles++ + } if uint32(numFiles) != b.Count() { panic("pebble: couldn't load all files in WAL entry") } - return d.newIngestedFlushableEntry(meta, seqNum, logNum, KeyRange{}) + return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan) } // replayWAL replays the edits in the specified WAL. If the DB is in read @@ -947,8 +959,8 @@ func (d *DB) replayWAL( br := b.Reader() if kind, _, _, ok, err := br.Next(); err != nil { return nil, 0, err - } else if ok && kind == InternalKeyKindIngestSST { - // We're in the flushable ingests case. + } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) { + // We're in the flushable ingests (+ possibly excises) case. // // Ingests require an up-to-date view of the LSM to determine the target // level of ingested sstables, and to accurately compute excises. 
Instead of diff --git a/open_test.go b/open_test.go index fbee9af41e..8a9510b354 100644 --- a/open_test.go +++ b/open_test.go @@ -331,7 +331,7 @@ func TestNewDBFilenames(t *testing.T) { "LOCK", "MANIFEST-000001", "OPTIONS-000003", - "marker.format-version.000004.017", + "marker.format-version.000005.018", "marker.manifest.000001.MANIFEST-000001", }, } diff --git a/testdata/checkpoint b/testdata/checkpoint index d69dba834c..ef8088a3cc 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -35,6 +35,10 @@ create: db/marker.format-version.000004.017 close: db/marker.format-version.000004.017 remove: db/marker.format-version.000003.016 sync: db +create: db/marker.format-version.000005.018 +close: db/marker.format-version.000005.018 +remove: db/marker.format-version.000004.017 +sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -97,9 +101,9 @@ close: . open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint1/marker.format-version.000001.017 -close: checkpoints/checkpoint1/marker.format-version.000001.017 +create: checkpoints/checkpoint1/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.018 +close: checkpoints/checkpoint1/marker.format-version.000001.018 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 link: db/000005.sst -> checkpoints/checkpoint1/000005.sst @@ -137,9 +141,9 @@ close: checkpoints open-dir: checkpoints/checkpoint2 link: db/OPTIONS-000003 -> checkpoints/checkpoint2/OPTIONS-000003 open-dir: checkpoints/checkpoint2 -create: checkpoints/checkpoint2/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint2/marker.format-version.000001.017 -close: checkpoints/checkpoint2/marker.format-version.000001.017 +create: checkpoints/checkpoint2/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.018 +close: checkpoints/checkpoint2/marker.format-version.000001.018 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 link: db/000007.sst -> checkpoints/checkpoint2/000007.sst @@ -172,9 +176,9 @@ close: checkpoints open-dir: checkpoints/checkpoint3 link: db/OPTIONS-000003 -> checkpoints/checkpoint3/OPTIONS-000003 open-dir: checkpoints/checkpoint3 -create: checkpoints/checkpoint3/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint3/marker.format-version.000001.017 -close: checkpoints/checkpoint3/marker.format-version.000001.017 +create: checkpoints/checkpoint3/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.018 +close: checkpoints/checkpoint3/marker.format-version.000001.018 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 link: db/000005.sst -> checkpoints/checkpoint3/000005.sst @@ -258,7 +262,7 @@ list db LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -268,7 +272,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly @@ -335,7 +339,7 @@ list checkpoints/checkpoint2 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 
+marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint2 readonly @@ -377,7 +381,7 @@ list checkpoints/checkpoint3 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint3 readonly @@ -491,9 +495,9 @@ close: checkpoints open-dir: checkpoints/checkpoint4 link: db/OPTIONS-000003 -> checkpoints/checkpoint4/OPTIONS-000003 open-dir: checkpoints/checkpoint4 -create: checkpoints/checkpoint4/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint4/marker.format-version.000001.017 -close: checkpoints/checkpoint4/marker.format-version.000001.017 +create: checkpoints/checkpoint4/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint4/marker.format-version.000001.018 +close: checkpoints/checkpoint4/marker.format-version.000001.018 sync: checkpoints/checkpoint4 close: checkpoints/checkpoint4 link: db/000010.sst -> checkpoints/checkpoint4/000010.sst @@ -581,7 +585,7 @@ list db LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 @@ -596,9 +600,9 @@ close: checkpoints open-dir: checkpoints/checkpoint5 link: db/OPTIONS-000003 -> checkpoints/checkpoint5/OPTIONS-000003 open-dir: checkpoints/checkpoint5 -create: checkpoints/checkpoint5/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint5/marker.format-version.000001.017 -close: checkpoints/checkpoint5/marker.format-version.000001.017 +create: checkpoints/checkpoint5/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint5/marker.format-version.000001.018 +close: checkpoints/checkpoint5/marker.format-version.000001.018 sync: checkpoints/checkpoint5 close: checkpoints/checkpoint5 link: db/000010.sst -> checkpoints/checkpoint5/000010.sst @@ -694,9 +698,9 @@ close: checkpoints open-dir: checkpoints/checkpoint6 link: db/OPTIONS-000003 -> checkpoints/checkpoint6/OPTIONS-000003 open-dir: checkpoints/checkpoint6 -create: checkpoints/checkpoint6/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint6/marker.format-version.000001.017 -close: checkpoints/checkpoint6/marker.format-version.000001.017 +create: checkpoints/checkpoint6/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint6/marker.format-version.000001.018 +close: checkpoints/checkpoint6/marker.format-version.000001.018 sync: checkpoints/checkpoint6 close: checkpoints/checkpoint6 link: db/000011.sst -> checkpoints/checkpoint6/000011.sst diff --git a/testdata/checkpoint_shared b/testdata/checkpoint_shared index af0a597441..157e050a37 100644 --- a/testdata/checkpoint_shared +++ b/testdata/checkpoint_shared @@ -23,6 +23,10 @@ sync: db create: db/marker.format-version.000001.017 close: db/marker.format-version.000001.017 sync: db +create: db/marker.format-version.000002.018 +close: db/marker.format-version.000002.018 +remove: db/marker.format-version.000001.017 +sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -85,9 +89,9 @@ close: . 
open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint1/marker.format-version.000001.017 -close: checkpoints/checkpoint1/marker.format-version.000001.017 +create: checkpoints/checkpoint1/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.018 +close: checkpoints/checkpoint1/marker.format-version.000001.018 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -134,9 +138,9 @@ close: checkpoints open-dir: checkpoints/checkpoint2 link: db/OPTIONS-000003 -> checkpoints/checkpoint2/OPTIONS-000003 open-dir: checkpoints/checkpoint2 -create: checkpoints/checkpoint2/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint2/marker.format-version.000001.017 -close: checkpoints/checkpoint2/marker.format-version.000001.017 +create: checkpoints/checkpoint2/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.018 +close: checkpoints/checkpoint2/marker.format-version.000001.018 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -179,9 +183,9 @@ close: checkpoints open-dir: checkpoints/checkpoint3 link: db/OPTIONS-000003 -> checkpoints/checkpoint3/OPTIONS-000003 open-dir: checkpoints/checkpoint3 -create: checkpoints/checkpoint3/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint3/marker.format-version.000001.017 -close: checkpoints/checkpoint3/marker.format-version.000001.017 +create: checkpoints/checkpoint3/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.018 +close: checkpoints/checkpoint3/marker.format-version.000001.018 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -238,7 +242,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000002.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 @@ -248,7 +252,7 @@ list checkpoints/checkpoint1 MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 @@ -300,7 +304,7 @@ list checkpoints/checkpoint2 MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 diff --git a/testdata/concurrent_excise b/testdata/concurrent_excise index 270aaae5c8..281767fe90 100644 --- a/testdata/concurrent_excise +++ b/testdata/concurrent_excise @@ -130,7 +130,7 @@ switch 1 ---- ok -# The below file-only snapshot should be errored out by the concurrent excise. +# The below file-only snapshot should not error out, despite a concurrent excise. batch set d something @@ -237,7 +237,7 @@ L6: build ext5 set bb something set b something -del b-c +del-range b c ---- lsm @@ -257,6 +257,19 @@ batch set bb new ---- +iter +first +next +next +next +next +---- +a: (new, .) +bb: (new, .) +d: (new, .) +e: (new, .) +. 
+ lsm ---- L0.0: @@ -266,9 +279,19 @@ L6: 000005:[a#10,SET-a#10,SET] 000006:[e#11,SET-e#11,SET] -ingest-and-excise ext5 excise=b-c contains-excise-tombstone +ingest-and-excise ext5 excise=b-c +---- + +iter +first +next +next +next ---- -flushable ingest +a: (new, .) +b: (something, .) +bb: (something, .) +d: (new, .) lsm ---- @@ -277,7 +300,7 @@ L0.0: 000010:[d#13,SET-e#14,SET] L6: 000005:[a#10,SET-a#10,SET] - 000012:[b#16,SET-bb#16,SET] + 000012:[b#17,RANGEDEL-c#inf,RANGEDEL] 000006:[e#11,SET-e#11,SET] unblock c1 @@ -288,7 +311,20 @@ compact a-z ---- ok +iter +first +next +next +next +next +---- +a: (new, .) +b: (something, .) +bb: (something, .) +d: (new, .) +e: (new, .) + lsm ---- L6: - 000016:[a#0,SET-e#0,SET] + 000015:[a#0,SET-e#0,SET] diff --git a/testdata/determinism b/testdata/determinism index f7a30ce981..e1026ba04b 100644 --- a/testdata/determinism +++ b/testdata/determinism @@ -58,11 +58,11 @@ set alpha 5 ---- 7:batch -ingest-and-excise contains-excise-tombstone excise=a-b ext-ab +ingest-and-excise excise=a-b ext-ab ---- 8:ingest-and-excise -ingest-and-excise contains-excise-tombstone excise=b-c ext-bc +ingest-and-excise excise=b-c ext-bc ---- 9:ingest-and-excise diff --git a/testdata/event_listener b/testdata/event_listener index 77c34a2a27..44118fd4ef 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -46,6 +46,11 @@ close: db/marker.format-version.000004.017 remove: db/marker.format-version.000003.016 sync: db upgraded to format version: 017 +create: db/marker.format-version.000005.018 +close: db/marker.format-version.000005.018 +remove: db/marker.format-version.000004.017 +sync: db +upgraded to format version: 018 create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -353,9 +358,9 @@ close: . open-dir: checkpoint link: db/OPTIONS-000003 -> checkpoint/OPTIONS-000003 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.017 -sync-data: checkpoint/marker.format-version.000001.017 -close: checkpoint/marker.format-version.000001.017 +create: checkpoint/marker.format-version.000001.018 +sync-data: checkpoint/marker.format-version.000001.018 +close: checkpoint/marker.format-version.000001.018 sync: checkpoint close: checkpoint link: db/000013.sst -> checkpoint/000013.sst diff --git a/testdata/excise b/testdata/excise index b6492ee658..ec3e0dff5d 100644 --- a/testdata/excise +++ b/testdata/excise @@ -542,8 +542,7 @@ c: (something, .) . -# Test to verify that IngestAndExcise now uses flushableIngest when directed to -# do so using a flag that is passed down by the caller. +# Test to verify that IngestAndExcise now uses flushableIngest. 
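+# With this change, the sstables and the excise span are written to the WAL as
+# a single batch and queued as a flushable; the flushable's iterators surface
+# the excise as synthesized range deletions until it is flushed.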
reset ---- @@ -625,18 +624,18 @@ set f something del b-e ---- -ingest-and-excise ext5 excise=b-e contains-excise-tombstone +ingest-and-excise ext5 excise=b-e ---- -flushable ingest +memtable flushed lsm ---- L0.0: - 000010:[b#17,SET-f#17,SET] - 000013:[x#15,SET-x#15,SET] + 000010:[b#18,SET-f#18,SET] + 000012:[x#15,SET-x#15,SET] L6: - 000014(000009):[a#0,SET-a#0,SET] - 000015(000009):[g#0,SET-g#0,SET] + 000013(000009):[a#0,SET-a#0,SET] + 000014(000009):[g#0,SET-g#0,SET] iter lower=c upper=e last @@ -667,7 +666,7 @@ compact a z lsm ---- L6: - 000018:[a#0,SET-x#0,SET] + 000017:[a#0,SET-x#0,SET] batch commit ---- @@ -683,17 +682,17 @@ set f somethingElse del b-e ---- -ingest-and-excise ext5 ext6 excise=b-e contains-excise-tombstone +ingest-and-excise ext5 ext6 excise=b-e ---- lsm ---- L0.0: - 000019:[a#22,SET-a#22,SET] - 000020:[b#23,SET-f#23,SET] + 000018:[a#22,SET-a#22,SET] + 000019:[b#23,SET-f#23,SET] L6: - 000021(000018):[a#0,SET-aa#0,SET] - 000022(000018):[f#0,SET-x#0,SET] + 000020(000017):[a#0,SET-aa#0,SET] + 000021(000017):[f#0,SET-x#0,SET] iter lower=a upper=f first @@ -709,3 +708,74 @@ b: (somethingElse, .) c: (somethingElse, .) . . + +# Two ovelrapping ingestions wait on one another even if +# the overlap is only on the excise span. + +reset +---- + +batch +set a foo +set b bar +set bb neverseen +set c baz +---- + +build ext7 +set b foo +set c bar +---- + +ingest-and-excise ext7 excise=b-g no-wait +---- + +build ext8 +set d gee +set e fee +---- + +ingest ext8 no-wait +---- + +iter +first +next +next +next +next +next +---- +a: (foo, .) +b: (foo, .) +c: (bar, .) +d: (gee, .) +e: (fee, .) +. + +flush +---- + +lsm +---- +L0.0: + 000007(000006):[a#10,SET-a#10,SET] +L6: + 000004:[b#15,SET-c#15,SET] + 000008:[d#16,SET-e#16,SET] + + +iter +first +next +next +next +next +next +---- +a: (foo, .) +b: (foo, .) +c: (bar, .) +d: (gee, .) +e: (fee, .) +. diff --git a/testdata/flushable_ingest b/testdata/flushable_ingest index cf5d2d3596..731538b30f 100644 --- a/testdata/flushable_ingest +++ b/testdata/flushable_ingest @@ -60,7 +60,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 # Test basic WAL replay @@ -81,7 +81,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open @@ -390,7 +390,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 close @@ -410,7 +410,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open @@ -443,7 +443,7 @@ MANIFEST-000001 MANIFEST-000011 OPTIONS-000014 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000002.MANIFEST-000011 # Make sure that the new mutable memtable can accept writes. 
@@ -586,7 +586,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 close @@ -605,7 +605,7 @@ MANIFEST-000001 OPTIONS-000003 ext ext1 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open diff --git a/testdata/ingested_flushable_api b/testdata/ingested_flushable_api index 5d36168efe..a54cca56b4 100644 --- a/testdata/ingested_flushable_api +++ b/testdata/ingested_flushable_api @@ -40,7 +40,7 @@ iter rangekeyIter ---- -d-g:{(#0,RANGEKEYSET,1,val1)} +d-g:{(#1,RANGEKEYSET,1,val1)} containsRangeKey ---- @@ -65,8 +65,8 @@ iter rangedelIter ---- -a-j:{(#0,RANGEDEL)} -o-z:{(#0,RANGEDEL)} +a-j:{(#2,RANGEDEL)} +o-z:{(#2,RANGEDEL)} rangekeyIter ---- @@ -93,15 +93,73 @@ flushable ext4 iter ---- -k#0,SET +k#3,SET rangekeyIter ---- -y-z:{(#0,RANGEKEYSET,1,val1)} +y-z:{(#3,RANGEKEYSET,1,val1)} rangedelIter ---- -a-j:{(#0,RANGEDEL)} +a-j:{(#3,RANGEDEL)} + +readyForFlush +---- +true + +containsRangeKey +---- +true + +build ext5 +set a aa +set k kk +range-key-set y z 1 val1 +---- + +flushable ext5 excise=a-g +---- + +iter +---- +a#5,SET +k#5,SET + +rangekeyIter +---- +a-g:{(#4,RANGEKEYDEL)} +y-z:{(#5,RANGEKEYSET,1,val1)} + +rangedelIter +---- +a-g:{(#4,RANGEDEL)} + +readyForFlush +---- +true + +containsRangeKey +---- +true + +build ext6 +range-key-set a c 1 val1 +---- + +flushable ext6 excise=d-g +---- + +iter +---- + +rangekeyIter +---- +a-c:{(#7,RANGEKEYSET,1,val1)} +d-g:{(#6,RANGEKEYDEL)} + +rangedelIter +---- +d-g:{(#6,RANGEDEL)} readyForFlush ---- diff --git a/tool/db.go b/tool/db.go index 51396f52d9..2f7a2bd5bd 100644 --- a/tool/db.go +++ b/tool/db.go @@ -625,7 +625,7 @@ func (d *dbT) runExcise(cmd *cobra.Command, args []string) { return } - _, err = db.IngestAndExcise(context.Background(), []string{path}, nil, nil, span, true /* sstsContainExciseTombstone */) + _, err = db.IngestAndExcise(context.Background(), []string{path}, nil, nil, span) if err != nil { fmt.Fprintf(stderr, "Error excising: %s\n", err) return diff --git a/tool/wal.go b/tool/wal.go index 27a279b850..4d0ffde5ab 100644 --- a/tool/wal.go +++ b/tool/wal.go @@ -271,6 +271,8 @@ func (w *walT) dumpBatch( case base.InternalKeyKindIngestSST: fileNum, _ := binary.Uvarint(ukey) fmt.Fprintf(stdout, "%s", base.FileNum(fileNum)) + case base.InternalKeyKindExcise: + fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtKey.fn(value)) case base.InternalKeyKindSingleDelete: fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey)) case base.InternalKeyKindSetWithDelete:
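// Illustrative caller-side sketch: with the sstsContainExciseTombstone
// parameter removed, a replication-style IngestAndExcise call now looks
// roughly like the following (ctx, db, sstPath, startKey and endKey are
// placeholder names):
//
//	_, err := db.IngestAndExcise(ctx, []string{sstPath}, nil /* shared */,
//		nil /* external */, pebble.KeyRange{Start: startKey, End: endKey})
//	if err != nil {
//		return err
//	}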