diff --git a/flushable.go b/flushable.go index c8b0c43a671..dfb263e7cef 100644 --- a/flushable.go +++ b/flushable.go @@ -5,9 +5,11 @@ package pebble import ( + "bytes" "context" "fmt" "io" + "slices" "sync/atomic" "time" @@ -318,32 +320,33 @@ func (s *ingestedFlushable) computePossibleOverlaps( } } -// bufferedSSTables holds a set of in-memory sstables produced by a flush. -// Buffering flushed state reduces write amplification by making it more likely -// that we're able to drop KVs before they reach disk. -type bufferedSSTables struct { +// flushableBufferedSSTables holds a set of in-memory sstables produced by a +// flush. Buffering flushed state reduces write amplification by making it more +// likely that we're able to drop KVs before they reach disk. +type flushableBufferedSSTables struct { metas []*fileMetadata readers []*sstable.Reader } var ( - // Assert that *bufferedSSTables implements the flushable interface. - _ flushable = (*bufferedSSTables)(nil) - // Assert that *bufferedSSTables implements the objectCreator interface. - _ objectCreator = (*bufferedSSTables)(nil) + // Assert that *flushableBufferedSSTables implements the flushable + // interface. + _ flushable = (*flushableBufferedSSTables)(nil) ) // newIter is part of the flushable interface. -func (b *bufferedSSTables) newIter(o *IterOptions) internalIterator { +func (b *flushableBufferedSSTables) newIter(o *IterOptions) internalIterator { panic("TODO") } // newFlushIter is part of the flushable interface. -func (b *bufferedSSTables) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator { +func (b *flushableBufferedSSTables) newFlushIter( + o *IterOptions, bytesFlushed *uint64, +) internalIterator { panic("TODO") } -func (b *bufferedSSTables) constructRangeDelIter( +func (b *flushableBufferedSSTables) constructRangeDelIter( file *manifest.FileMetadata, _ keyspan.SpanIterOptions, ) (keyspan.FragmentIterator, error) { panic("TODO") @@ -353,12 +356,12 @@ func (b *bufferedSSTables) constructRangeDelIter( // // TODO(sumeer): *IterOptions are being ignored, so the index block load for // the point iterator in constructRangeDeIter is not tracked. -func (b *bufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { +func (b *flushableBufferedSSTables) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { panic("TODO") } // newRangeKeyIter is part of the flushable interface. -func (b *bufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { +func (b *flushableBufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { if !b.containsRangeKeys() { return nil } @@ -366,33 +369,42 @@ func (b *bufferedSSTables) newRangeKeyIter(o *IterOptions) keyspan.FragmentItera } // containsRangeKeys is part of the flushable interface. -func (b *bufferedSSTables) containsRangeKeys() bool { +func (b *flushableBufferedSSTables) containsRangeKeys() bool { panic("TODO") } // inuseBytes is part of the flushable interface. -func (b *bufferedSSTables) inuseBytes() uint64 { +func (b *flushableBufferedSSTables) inuseBytes() uint64 { panic("TODO") } // totalBytes is part of the flushable interface. -func (b *bufferedSSTables) totalBytes() uint64 { +func (b *flushableBufferedSSTables) totalBytes() uint64 { panic("TODO") } // readyForFlush is part of the flushable interface. -func (b *bufferedSSTables) readyForFlush() bool { +func (b *flushableBufferedSSTables) readyForFlush() bool { // Buffered sstables are always ready for flush; they're immutable. return true } // computePossibleOverlaps is part of the flushable interface. -func (b *bufferedSSTables) computePossibleOverlaps( +func (b *flushableBufferedSSTables) computePossibleOverlaps( fn func(bounded) shouldContinue, bounded ...bounded, ) { panic("TODO") } +type bufferedSSTables struct { + curr bytes.Buffer + bufs [][]byte + metas []*fileMetadata +} + +// Assert that *bufferedSSTables implements the objectCreator interface. +var _ objectCreator = (*bufferedSSTables)(nil) + // Create implements the objectCreator interface. func (b *bufferedSSTables) Create( ctx context.Context, @@ -418,6 +430,32 @@ func (b *bufferedSSTables) Sync() error { panic("TODO") } +// Assert that bufferedSSTables implements objstorage.Writable. +// +// A flush writes files sequentially, so the bufferedSSTables type implements +// Writable directly, serving as the destination for writes across all sstables +// written by the flush. +var _ objstorage.Writable = (*bufferedSSTables)(nil) + +// Finish implements objstorage.Writable. +func (o *bufferedSSTables) Write(p []byte) error { + _, err := o.curr.Write(p) + o.curr.Reset() + return err +} + +// Finish implements objstorage.Writable. +func (o *bufferedSSTables) Finish() error { + o.bufs = append(o.bufs, slices.Clone(o.curr.Bytes())) + o.curr.Reset() + return nil +} + +// Abort implements objstorage.Writable. +func (o *bufferedSSTables) Abort() { + o.curr.Reset() +} + // computePossibleOverlapsGenericImpl is an implemention of the flushable // interface's computePossibleOverlaps function for flushable implementations // with only in-memory state that do not have special requirements and should