Skip to content

Commit

Permalink
colblk: implement HideObsoletePoints in DataBlockIter
Browse files Browse the repository at this point in the history
Benchmark for the no-transform path before/after:
```
name                                                                                old time/op    new time/op    delta
CockroachDataBlockIterShort/AlphaLen=8,Prefix=8,Shared=4,ValueLen=8/Next-10           12.2ns ± 0%    12.5ns ± 3%  +2.38%  (p=0.008 n=5+5)
CockroachDataBlockIterShort/AlphaLen=8,Prefix=8,Shared=4,ValueLen=8/SeekGE-10          113ns ± 0%     114ns ± 0%  +0.80%  (p=0.008 n=5+5)
CockroachDataBlockIterShort/AlphaLen=8,Prefix=128,Shared=64,ValueLen=128/Next-10      10.1ns ± 0%    10.3ns ± 0%  +2.30%  (p=0.016 n=5+4)
CockroachDataBlockIterShort/AlphaLen=8,Prefix=128,Shared=64,ValueLen=128/SeekGE-10    89.4ns ± 0%    90.3ns ± 0%  +1.08%  (p=0.008 n=5+5)
```

Transforms benchmarks:
```
name                                                                                                  time/op
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=8,Shared=4,ValueLen=8/Next-10                      12.3ns ± 1%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=8,Shared=4,ValueLen=8/SeekGE-10                     113ns ± 0%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=8,Shared=4,ValueLen=8,SynthSeqNum/Next-10          12.5ns ± 0%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=8,Shared=4,ValueLen=8,SynthSeqNum/SeekGE-10         114ns ± 1%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=8,Shared=4,ValueLen=8,HideObsolete/Next-10         13.0ns ± 1%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=8,Shared=4,ValueLen=8,HideObsolete/SeekGE-10        117ns ± 1%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=128,Shared=64,ValueLen=128/Next-10                 10.3ns ± 0%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=128,Shared=64,ValueLen=128/SeekGE-10               90.5ns ± 1%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=128,Shared=64,ValueLen=128,SynthSeqNum/Next-10     10.5ns ± 0%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=128,Shared=64,ValueLen=128,SynthSeqNum/SeekGE-10   90.9ns ± 1%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=128,Shared=64,ValueLen=128,HideObsolete/Next-10    10.8ns ± 0%
CockroachDataBlockIterTransforms/AlphaLen=8,Prefix=128,Shared=64,ValueLen=128,HideObsolete/SeekGE-10  93.9ns ± 0%
```
  • Loading branch information
RaduBerinde committed Sep 29, 2024
1 parent d93251c commit d926b52
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 156 deletions.
7 changes: 5 additions & 2 deletions sstable/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,11 @@ type IndexBlockIterator interface {
// preferable.
type IterTransforms struct {
// SyntheticSeqNum, if set, overrides the sequence number in all keys. It is
// set if the sstable was ingested or it is foregin.
SyntheticSeqNum SyntheticSeqNum
// set if the sstable was ingested or it is foreign.
SyntheticSeqNum SyntheticSeqNum
// HideObsoletePoints, if true, skips over obsolete points during iteration.
// This is the norm when the sstable is foreign or the largest sequence number
// of the sstable is below the one we are reading.
HideObsoletePoints bool
SyntheticPrefix SyntheticPrefix
SyntheticSuffix SyntheticSuffix
Expand Down
43 changes: 38 additions & 5 deletions sstable/colblk/cockroach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"unsafe"

"github.com/cockroachdb/crlib/crbytes"
"github.com/cockroachdb/crlib/crstrings"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/crdbtest"
"github.com/cockroachdb/pebble/sstable/block"
Expand Down Expand Up @@ -461,7 +462,7 @@ func BenchmarkCockroachDataBlockIterFull(b *testing.B) {
ValueLen: valueLen,
}
b.Run(cfg.String(), func(b *testing.B) {
benchmarkCockroachDataBlockIter(b, cfg)
benchmarkCockroachDataBlockIter(b, cfg, block.IterTransforms{})
})
}
}
Expand Down Expand Up @@ -492,11 +493,40 @@ var shortBenchConfigs = []benchConfig{
func BenchmarkCockroachDataBlockIterShort(b *testing.B) {
for _, cfg := range shortBenchConfigs {
b.Run(cfg.String(), func(b *testing.B) {
benchmarkCockroachDataBlockIter(b, cfg)
benchmarkCockroachDataBlockIter(b, cfg, block.IterTransforms{})
})
}
}

func BenchmarkCockroachDataBlockIterTransforms(b *testing.B) {
transforms := []struct {
description string
transforms block.IterTransforms
}{
{},
{
description: "SynthSeqNum",
transforms: block.IterTransforms{
SyntheticSeqNum: 1234,
},
},
{
description: "HideObsolete",
transforms: block.IterTransforms{
HideObsoletePoints: true,
},
},
}
for _, cfg := range shortBenchConfigs {
for _, t := range transforms {
name := cfg.String() + crstrings.If(t.description != "", ","+t.description)
b.Run(name, func(b *testing.B) {
benchmarkCockroachDataBlockIter(b, cfg, t.transforms)
})
}
}
}

type benchConfig struct {
crdbtest.KeyConfig
ValueLen int
Expand All @@ -506,7 +536,9 @@ func (cfg benchConfig) String() string {
return fmt.Sprintf("%s,ValueLen=%d", cfg.KeyConfig, cfg.ValueLen)
}

func benchmarkCockroachDataBlockIter(b *testing.B, cfg benchConfig) {
func benchmarkCockroachDataBlockIter(
b *testing.B, cfg benchConfig, transforms block.IterTransforms,
) {
const targetBlockSize = 32 << 10
seed := uint64(time.Now().UnixNano())
rng := rand.New(rand.NewSource(seed))
Expand All @@ -519,7 +551,8 @@ func benchmarkCockroachDataBlockIter(b *testing.B, cfg benchConfig) {
for w.Size() < targetBlockSize {
ik := base.MakeInternalKey(keys[count], base.SeqNum(rng.Uint64n(uint64(base.SeqNumMax))), base.InternalKeyKindSet)
kcmp := w.KeyWriter.ComparePrev(ik.UserKey)
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp, false /* isObsolete */)
isObsolete := rng.Intn(20) == 0
w.Add(ik, values[count], block.InPlaceValuePrefix(kcmp.PrefixEqual()), kcmp, isObsolete)
count++
}
serializedBlock, _ := w.Finish(w.Rows(), w.Size())
Expand All @@ -528,7 +561,7 @@ func benchmarkCockroachDataBlockIter(b *testing.B, cfg benchConfig) {
reader.Init(cockroachKeySchema, serializedBlock)
it.Init(&reader, cockroachKeySchema.NewKeySeeker(), getLazyValuer(func([]byte) base.LazyValue {
return base.LazyValue{ValueOrHandle: []byte("mock external value")}
}), block.IterTransforms{})
}), transforms)
avgRowSize := float64(len(serializedBlock)) / float64(count)

b.Run("Next", func(b *testing.B) {
Expand Down
164 changes: 141 additions & 23 deletions sstable/colblk/data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,11 @@ type DataBlockIter struct {
row int
kv base.InternalKV
kvRow int // the row currently held in kv

// nextObsoletePoint is the row index of the first obsolete point after i.row.
// It is used to optimize skipping of obsolete points during forward
// iteration.
nextObsoletePoint int
}

// Init initializes the data block iterator, configuring it to read from the
Expand All @@ -750,9 +755,10 @@ func (i *DataBlockIter) Init(
getLazyValuer block.GetLazyValueForPrefixAndValueHandler,
transforms block.IterTransforms,
) error {
numRows := int(r.r.header.Rows)
*i = DataBlockIter{
r: r,
maxRow: int(r.r.header.Rows) - 1,
maxRow: numRows - 1,
keySeeker: keyIterator,
getLazyValuer: getLazyValuer,
transforms: transforms,
Expand All @@ -761,6 +767,10 @@ func (i *DataBlockIter) Init(
kv: base.InternalKV{},
keyIter: PrefixBytesIter{},
}
if i.transforms.HideObsoletePoints && r.isObsolete.SeekSetBitGE(0) == numRows {
// There are no obsolete points in the block; don't bother checking.
i.transforms.HideObsoletePoints = false
}
// Allocate a keyIter buffer that's large enough to hold the largest user
// key in the block with 1 byte to spare (so that pointer arithmetic is
// never pointing beyond the allocation, which would violate Go rules).
Expand All @@ -787,7 +797,21 @@ func (i *DataBlockIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.Interna
if flags.TrySeekUsingNext() {
searchDir = +1
}
return i.decodeRow(i.keySeeker.SeekGE(key, i.row, searchDir))
i.row = i.keySeeker.SeekGE(key, i.row, searchDir)
if i.transforms.HideObsoletePoints {
if i.row < i.maxRow {
i.nextObsoletePoint = i.r.isObsolete.SeekSetBitGE(i.row)
} else {
i.nextObsoletePoint = i.maxRow + 1
}
if i.atObsoletePointForward() {
i.skipObsoletePointsForward()
if i.row > i.maxRow {
return nil
}
}
}
return i.decodeRow()
}

// SeekPrefixGE implements the base.InternalIterator interface.
Expand All @@ -806,18 +830,47 @@ func (i *DataBlockIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags)

// SeekLT implements the base.InternalIterator interface.
func (i *DataBlockIter) SeekLT(key []byte, _ base.SeekLTFlags) *base.InternalKV {
row := i.keySeeker.SeekGE(key, i.row, 0 /* searchDir */)
return i.decodeRow(row - 1)
i.row = i.keySeeker.SeekGE(key, i.row, 0 /* searchDir */) - 1
if i.transforms.HideObsoletePoints {
i.nextObsoletePoint = i.r.isObsolete.SeekSetBitGE(max(i.row, 0))
if i.atObsoletePointBackward() {
i.skipObsoletePointsBackward()
if i.row < 0 {
return nil
}
}
}
return i.decodeRow()
}

// First implements the base.InternalIterator interface.
func (i *DataBlockIter) First() *base.InternalKV {
return i.decodeRow(0)
i.row = 0
if i.transforms.HideObsoletePoints {
i.nextObsoletePoint = i.r.isObsolete.SeekSetBitGE(0)
if i.atObsoletePointForward() {
i.skipObsoletePointsForward()
if i.row > i.maxRow {
return nil
}
}
}
return i.decodeRow()
}

// Last implements the base.InternalIterator interface.
func (i *DataBlockIter) Last() *base.InternalKV {
return i.decodeRow(i.r.BlockReader().Rows() - 1)
i.row = i.maxRow
if i.transforms.HideObsoletePoints {
i.nextObsoletePoint = i.maxRow + 1
if i.atObsoletePointBackward() {
i.skipObsoletePointsBackward()
if i.row < 0 {
return nil
}
}
}
return i.decodeRow()
}

// Next advances to the next KV pair in the block.
Expand All @@ -828,6 +881,12 @@ func (i *DataBlockIter) Next() *base.InternalKV {
return nil
}
i.row++
if i.transforms.HideObsoletePoints && i.atObsoletePointForward() {
i.skipObsoletePointsForward()
if i.row > i.maxRow {
return nil
}
}
// Inline decodeKey().
i.kv.K = base.InternalKey{
UserKey: i.keySeeker.MaterializeUserKey(&i.keyIter, i.kvRow, i.row),
Expand Down Expand Up @@ -856,27 +915,91 @@ func (i *DataBlockIter) Next() *base.InternalKV {
// organized as a slice of 64-bit words. If a 64-bit word in the bitmap is zero
// then all of the rows corresponding to the bits in that word have the same
// prefix and we can skip ahead. If a row is non-zero a small bit of bit
// shifting and masking combined with bits.TrailingZeros64 can identify the the
// shifting and masking combined with bits.TrailingZeros64 can identify the
// next bit that is set after the current row. The bitmap uses 1 bit/row (0.125
// bytes/row). A 32KB block containing 1300 rows (25 bytes/row) would need a
// bitmap of 21 64-bit words. Even in the worst case where every word is 0 this
// bitmap can be scanned in ~20 ns (1 ns/word) leading to a total NextPrefix
// time of ~30 ns if a row is found and decodeRow are called. In more normal
// cases, NextPrefix takes ~15% longer that a single Next call.
//
// For comparision, the rowblk nextPrefixV3 optimizations work by setting a bit
// For comparison, the rowblk nextPrefixV3 optimizations work by setting a bit
// in the value prefix byte that indicates that the current key has the same
// prefix as the previous key. Additionally, a bit is stolen from the restart
// table entries indicating whether a restart table entry has the same key
// prefix as the previous entry. Checking the value prefix byte bit requires
// locating that byte which requires decoding 3 varints per key/value pair.
func (i *DataBlockIter) NextPrefix(_ []byte) *base.InternalKV {
return i.decodeRow(i.r.prefixChanged.SeekSetBitGE(i.row + 1))
i.row = i.r.prefixChanged.SeekSetBitGE(i.row + 1)
if i.transforms.HideObsoletePoints {
i.nextObsoletePoint = i.r.isObsolete.SeekSetBitGE(i.row)
if i.atObsoletePointForward() {
i.skipObsoletePointsForward()
}
}

return i.decodeRow()
}

// Prev moves the iterator to the previous KV pair in the block.
func (i *DataBlockIter) Prev() *base.InternalKV {
return i.decodeRow(i.row - 1)
i.row--
if i.transforms.HideObsoletePoints && i.atObsoletePointBackward() {
i.skipObsoletePointsBackward()
if i.row < 0 {
return nil
}
}
return i.decodeRow()
}

// atObsoletePointForward returns true if i.row is an obsolete point. It is
// separate from skipObsoletePointsForward() because that method does not
// inline. It can only be used during forward iteration (i.e. i.row was
// incremented).
//
//gcassert:inline
func (i *DataBlockIter) atObsoletePointForward() bool {
return i.row == i.nextObsoletePoint && i.row <= i.maxRow
}

func (i *DataBlockIter) skipObsoletePointsForward() {
if invariants.Enabled {
i.atObsoletePointCheck()
}
i.row = i.r.isObsolete.SeekUnsetBitGE(i.row)
if i.row <= i.maxRow {
i.nextObsoletePoint = i.r.isObsolete.SeekSetBitGE(i.row)
} else {
i.nextObsoletePoint = i.maxRow + 1
}
}

// atObsoletePointBackward returns true if i.row is an obsolete point. It is
// separate from skipObsoletePointsBackward() because that method does not
// inline. It can only be used during reverse iteration (i.e. i.row was
// decremented).
//
//gcassert:inline
func (i *DataBlockIter) atObsoletePointBackward() bool {
return i.row >= 0 && i.r.isObsolete.At(i.row)
}

func (i *DataBlockIter) skipObsoletePointsBackward() {
if invariants.Enabled {
i.atObsoletePointCheck()
}
i.row = i.r.isObsolete.SeekUnsetBitLE(i.row)
i.nextObsoletePoint = i.row + 1
}

func (i *DataBlockIter) atObsoletePointCheck() {
// We extract this code into a separate function to avoid getting a spurious
// error from GCAssert about At not being inlined because it is compiled out
// altogether in non-invariant builds.
if !i.transforms.HideObsoletePoints || !i.r.isObsolete.At(i.row) {
panic("expected obsolete point")
}
}

// Error implements the base.InternalIterator interface. A DataBlockIter is
Expand Down Expand Up @@ -906,20 +1029,15 @@ func (i *DataBlockIter) DebugTree(tp treeprinter.Node) {
tp.Childf("%T(%p)", i, i)
}

func (i *DataBlockIter) decodeRow(row int) *base.InternalKV {
// decodeRow decodes i.row into i.kv. If i.row is invalid, it returns nil.
func (i *DataBlockIter) decodeRow() *base.InternalKV {
switch {
case row < 0:
i.row = -1
return nil
case row >= i.r.BlockReader().Rows():
i.row = i.r.BlockReader().Rows()
case i.row < 0 || i.row > i.maxRow:
return nil
case i.kvRow == row:
i.row = row
case i.kvRow == i.row:
// Already synthesized the kv at row.
return &i.kv
default:
i.row = row
// Inline decodeKey().
i.kv.K = base.InternalKey{
UserKey: i.keySeeker.MaterializeUserKey(&i.keyIter, i.kvRow, i.row),
Expand All @@ -929,14 +1047,14 @@ func (i *DataBlockIter) decodeRow(row int) *base.InternalKV {
i.kv.K.SetSeqNum(base.SeqNum(n))
}
// Inline i.r.values.At(row).
startOffset := i.r.values.offsets.At(row)
v := unsafe.Slice((*byte)(i.r.values.ptr(startOffset)), i.r.values.offsets.At(row+1)-startOffset)
if i.r.isValueExternal.At(row) {
startOffset := i.r.values.offsets.At(i.row)
v := unsafe.Slice((*byte)(i.r.values.ptr(startOffset)), i.r.values.offsets.At(i.row+1)-startOffset)
if i.r.isValueExternal.At(i.row) {
i.kv.V = i.getLazyValuer.GetLazyValueForPrefixAndValueHandle(v)
} else {
i.kv.V = base.MakeInPlaceValue(v)
}
i.kvRow = row
i.kvRow = i.row
return &i.kv
}
}
Expand Down
Loading

0 comments on commit d926b52

Please sign in to comment.