Skip to content

Commit

Permalink
sstable: add ComparePrev, SetSnapshotPinnedProperties to RawWriter
Browse files Browse the repository at this point in the history
Extend the RawWriter interface to support two additional methods required by
compactions. The RawRowWriter.UnsafeLastPointUserKey method is adapted into
ComparePrev so that it may be implemented by RawColumnWriter without
materializing the previous key.
  • Loading branch information
jbowens committed Sep 27, 2024
1 parent b910a19 commit 12dc887
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 21 deletions.
2 changes: 1 addition & 1 deletion compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2666,7 +2666,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
// compaction or flush.
func (d *DB) newCompactionOutput(
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
) (objstorage.ObjectMetadata, *sstable.RawRowWriter, CPUWorkHandle, error) {
) (objstorage.ObjectMetadata, sstable.RawWriter, CPUWorkHandle, error) {
d.mu.Lock()
diskFileNum := d.mu.versions.getNextDiskFileNum()
d.mu.Unlock()
Expand Down
10 changes: 5 additions & 5 deletions internal/compact/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r *Runner) MoreDataToWrite() bool {
// Result.Tables. Should only be called if MoreDataToWrite() returned true.
//
// WriteTable always closes the Writer.
func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.RawRowWriter) {
func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw sstable.RawWriter) {
if r.err != nil {
panic("error already encountered")
}
Expand All @@ -159,7 +159,7 @@ func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.RawRo
r.tables[len(r.tables)-1].WriterMeta = *writerMeta
}

func (r *Runner) writeKeysToTable(tw *sstable.RawRowWriter) (splitKey []byte, _ error) {
func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ error) {
firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
if r.key != nil && firstKey == nil {
firstKey = r.key.UserKey
Expand All @@ -171,13 +171,13 @@ func (r *Runner) writeKeysToTable(tw *sstable.RawRowWriter) (splitKey []byte, _
r.cmp, firstKey, r.TableSplitLimit(firstKey),
r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
)
lastUserKeyFn := func() []byte {
return tw.UnsafeLastPointUserKey()
comparePrev := func(k []byte) int {
return tw.ComparePrev(k)
}
var pinnedKeySize, pinnedValueSize, pinnedCount uint64
key, value := r.key, r.value
for ; key != nil; key, value = r.iter.Next() {
if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) {
if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), comparePrev) {
break
}

Expand Down
8 changes: 4 additions & 4 deletions internal/compact/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *OutputSplitter) setNextBoundary(nextGrandparent *manifest.FileMetadata)
// key. It is passed the current estimated file size and a function that can be
// used to retrieve the previous user key.
//
// The lastUserKeyFn function is used to guarantee no split user keys, without
// The comparePrevFn function is used to guarantee no split user keys, without
// OutputSplitter copying each key internally. It is not performance sensitive,
// as it is only called once we decide to split.
//
Expand All @@ -206,7 +206,7 @@ func (s *OutputSplitter) setNextBoundary(nextGrandparent *manifest.FileMetadata)
//
// INVARIANT: nextUserKey must match the current frontier.
func (s *OutputSplitter) ShouldSplitBefore(
nextUserKey []byte, estimatedFileSize uint64, lastUserKeyFn func() []byte,
nextUserKey []byte, estimatedFileSize uint64, comparePrevFn func([]byte) int,
) ShouldSplit {
if invariants.Enabled && s.splitKey != nil {
panic("ShouldSplitBefore called after it returned SplitNow")
Expand Down Expand Up @@ -255,10 +255,10 @@ func (s *OutputSplitter) ShouldSplitBefore(
}

// TODO(radu): it would make for a cleaner interface if we didn't rely on a
// lastUserKeyFn. We could make a copy of the key here and split at the next
// comparePrevFn. We could make a copy of the key here and split at the next
// user key that is different; the main difficulty is that various tests
// expect 1 key per output table if the target file size is very small.
if lastKey := lastUserKeyFn(); lastKey == nil || s.cmp(lastKey, nextUserKey) != 0 {
if comparePrevFn(nextUserKey) != 0 {
s.splitKey = slices.Clone(nextUserKey)
return SplitNow
}
Expand Down
2 changes: 1 addition & 1 deletion internal/compact/splitting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestOutputSplitter(t *testing.T) {
if i > 0 || rand.Intn(2) == 0 {
f.Advance([]byte(key))
}
if s.ShouldSplitBefore([]byte(key), estimatedSize, func() []byte { return []byte(last) }) {
if s.ShouldSplitBefore([]byte(key), estimatedSize, func(k []byte) int { return f.cmp(k, []byte(last)) }) {
return fmt.Sprintf("%s %d: split at %q", key, estimatedSize, s.SplitKey())
}
last = key
Expand Down
24 changes: 24 additions & 0 deletions sstable/colblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,30 @@ func (w *RawColumnWriter) EstimatedSize() uint64 {
return sz
}

// ComparePrev compares the provided user to the last point key written to the
// writer. The returned value is equivalent to Compare(key, prevKey) where
// prevKey is the last point key written to the writer.
//
// If no key has been written yet, ComparePrev returns +1.
//
// Must not be called after Writer is closed.
func (w *RawColumnWriter) ComparePrev(k []byte) int {
if w == nil || w.dataBlock.Rows() == 0 {
return +1
}
return int(w.dataBlock.KeyWriter.ComparePrev(k).UserKeyComparison)
}

// SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
// be used internally by Pebble.
func (w *RawColumnWriter) SetSnapshotPinnedProperties(
pinnedKeyCount, pinnedKeySize, pinnedValueSize uint64,
) {
w.props.SnapshotPinnedKeys = pinnedKeyCount
w.props.SnapshotPinnedKeySize = pinnedKeySize
w.props.SnapshotPinnedValueSize = pinnedValueSize
}

// Metadata returns the metadata for the finished sstable. Only valid to call
// after the sstable has been finished.
func (w *RawColumnWriter) Metadata() (*WriterMetadata, error) {
Expand Down
19 changes: 9 additions & 10 deletions sstable/rowblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1550,19 +1550,18 @@ func (w *RawRowWriter) assertFormatCompatibility() error {
return nil
}

// UnsafeLastPointUserKey returns the last point key written to the writer to
// which this option was passed during creation. The returned key points
// directly into a buffer belonging to the Writer. The value's lifetime ends the
// next time a point key is added to the Writer.
// ComparePrev compares the provided user to the last point key written to the
// writer. The returned value is equivalent to Compare(key, prevKey) where
// prevKey is the last point key written to the writer.
//
// If no key has been written yet, ComparePrev returns +1.
//
// Must not be called after Writer is closed.
func (w *RawRowWriter) UnsafeLastPointUserKey() []byte {
if w != nil && w.dataBlockBuf.dataBlock.EntryCount() >= 1 {
// w.dataBlockBuf.dataBlock.curKey is guaranteed to point to the last point key
// which was added to the Writer.
return w.dataBlockBuf.dataBlock.CurUserKey()
func (w *RawRowWriter) ComparePrev(k []byte) int {
if w == nil || w.dataBlockBuf.dataBlock.EntryCount() == 0 {
return +1
}
return nil
return w.compare(k, w.dataBlockBuf.dataBlock.CurUserKey())
}

// EncodeSpan encodes the keys in the given span. The span can contain either
Expand Down
11 changes: 11 additions & 0 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,17 @@ type RawWriter interface {
// EstimatedSize returns the estimated size of the sstable being written if
// a call to Close() was made without adding additional keys.
EstimatedSize() uint64
// ComparePrev compares the provided user to the last point key written to the
// writer. The returned value is equivalent to Compare(key, prevKey) where
// prevKey is the last point key written to the writer.
//
// If no key has been written yet, ComparePrev returns +1.
//
// Must not be called after Writer is closed.
ComparePrev(k []byte) int
// SetSnapshotPinnedProperties sets the properties for pinned keys. Should only
// be used internally by Pebble.
SetSnapshotPinnedProperties(keyCount, keySize, valueSize uint64)
// Close finishes writing the table and closes the underlying file that the
// table was written to.
Close() error
Expand Down

0 comments on commit 12dc887

Please sign in to comment.