Skip to content

Commit

Permalink
wal: failoverWriter, for switching across a sequence of record.LogWriters
Browse files Browse the repository at this point in the history

Except for Close, failoverWriter methods do not block on IO. Close can
block on IO if we run out of failover slots (10 slots) and new LogWriters
cannot be created.

Informs #3230

Informs CRDB-35401
  • Loading branch information
sumeerbhola committed Feb 11, 2024
1 parent 3e083df commit 38e3430
Show file tree
Hide file tree
Showing 10 changed files with 2,293 additions and 10 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2727,7 +2727,7 @@ func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
metrics := d.mu.log.writer.Metrics()

d.mu.Lock()
if err := d.mu.log.metrics.LogWriterMetrics.Merge(metrics); err != nil {
if err := d.mu.log.metrics.LogWriterMetrics.Merge(&metrics); err != nil {
d.opts.Logger.Errorf("metrics error: %s", err)
}

Expand Down
13 changes: 9 additions & 4 deletions record/log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,10 +1001,15 @@ func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
return p[r:]
}

// Metrics must be called after Close. The callee will no longer modify the
// returned LogWriterMetrics.
func (w *LogWriter) Metrics() *LogWriterMetrics {
return w.flusher.metrics
// Metrics returns a snapshot of the writer's LogWriterMetrics, copied by value
// while the flusher lock is held. Callers should normally invoke it only after
// Close, at which point the writer no longer modifies the metrics. The
// snapshot also happens to be current whenever the flush loop has nothing left
// to flush, but that is an implementation detail callers must not rely on.
func (w *LogWriter) Metrics() LogWriterMetrics {
	w.flusher.Lock()
	defer w.flusher.Unlock()
	return *w.flusher.metrics
}

// LogWriterMetrics contains misc metrics for the log writer.
Expand Down
8 changes: 6 additions & 2 deletions vfs/errorfs/errorfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ const (
OpFileStat
// OpFileSync describes a file sync operation.
OpFileSync
// OpFileSyncData describes a file data-sync (SyncData) operation.
OpFileSyncData
// OpFileFlush describes a file flush operation.
OpFileFlush
)
Expand All @@ -90,7 +92,7 @@ func (o OpKind) ReadOrWrite() OpReadWrite {
switch o {
case OpOpen, OpOpenDir, OpList, OpStat, OpGetDiskUsage, OpFileRead, OpFileReadAt, OpFileStat:
return OpIsRead
case OpCreate, OpLink, OpRemove, OpRemoveAll, OpRename, OpReuseForWrite, OpMkdirAll, OpLock, OpFileClose, OpFileWrite, OpFileWriteAt, OpFileSync, OpFileFlush, OpFilePreallocate:
case OpCreate, OpLink, OpRemove, OpRemoveAll, OpRename, OpReuseForWrite, OpMkdirAll, OpLock, OpFileClose, OpFileWrite, OpFileWriteAt, OpFileSync, OpFileSyncData, OpFileFlush, OpFilePreallocate:
return OpIsWrite
default:
panic(fmt.Sprintf("unrecognized op %v\n", o))
Expand Down Expand Up @@ -526,7 +528,9 @@ func (f *errorFile) Sync() error {
}

// SyncData first consults the injector with an OpFileSyncData op for this
// file's path. If the injector yields an error, that error is returned and the
// wrapped file is left untouched; otherwise the call is forwarded to the
// wrapped file's SyncData.
func (f *errorFile) SyncData() error {
	injected := f.inj.MaybeError(Op{Kind: OpFileSyncData, Path: f.path})
	if injected != nil {
		return injected
	}
	return f.file.SyncData()
}

Expand Down
64 changes: 64 additions & 0 deletions wal/failover_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package wal

import (
"sync"
"time"

"github.com/cockroachdb/pebble/vfs"
)

// dirIndex is an integer index whose named values are declared in the const
// block below.
// NOTE(review): presumably it selects between the primary and secondary WAL
// directories used for failover — confirm against the code that uses it.
type dirIndex int

const (
// primaryDirIndex is the first index (0).
primaryDirIndex dirIndex = iota
// secondaryDirIndex is the second index (1).
secondaryDirIndex
// numDirIndices equals the number of indices declared above (2).
// NOTE(review): presumably meant for sizing arrays indexed by dirIndex —
// confirm.
numDirIndices
)

// dirAndFileHandle bundles a Dir with a vfs.File by embedding both.
// NOTE(review): presumably the File is an open handle on the directory
// described by Dir — confirm at the construction site.
type dirAndFileHandle struct {
Dir
vfs.File
}

// switchableWriter is a subset of failoverWriter needed by failoverMonitor.
type switchableWriter interface {
// switchToNewDir is given the dirAndFileHandle to switch to and returns an
// error on failure.
// NOTE(review): the exact switching semantics are defined by failoverWriter,
// which is not visible here.
switchToNewDir(dirAndFileHandle) error
// ongoingLatencyOrErrorForCurDir returns a duration and an error.
// NOTE(review): presumably the latency of the operation currently in
// progress on the current directory, or the error it has hit — confirm
// against the failoverWriter implementation.
ongoingLatencyOrErrorForCurDir() (time.Duration, error)
}

// stopper tracks goroutines started through runAsync and provides a channel
// that is closed to tell them to wind down.
type stopper struct {
	quiescer chan struct{} // Closed when quiescing
	wg       sync.WaitGroup
}

// newStopper returns a stopper with an open (not yet closed) quiesce channel
// and no tracked goroutines.
func newStopper() *stopper {
	return &stopper{
		quiescer: make(chan struct{}),
	}
}

// runAsync runs f on a new goroutine that is registered with the stopper's
// WaitGroup, so that stop waits for it to finish.
func (s *stopper) runAsync(f func()) {
	s.wg.Add(1)
	go func() {
		// Deferred so the WaitGroup is released even when f leaves the
		// goroutine via runtime.Goexit; otherwise stop would block forever.
		defer s.wg.Done()
		f()
	}()
}

// shouldQuiesce returns a channel which will be closed when stop() has been
// invoked and outstanding goroutines should begin to quiesce.
func (s *stopper) shouldQuiesce() <-chan struct{} {
	return s.quiescer
}

// stop closes the quiesce channel and then blocks until every goroutine
// started with runAsync has exited. It must be called at most once: a second
// call would close an already-closed channel and panic.
func (s *stopper) stop() {
	close(s.quiescer)
	s.wg.Wait()
}

// Make lint happy.
var _ = (&stopper{}).shouldQuiesce
Loading

0 comments on commit 38e3430

Please sign in to comment.