diff --git a/db.go b/db.go index 63667dd9d7..1469fb76da 100644 --- a/db.go +++ b/db.go @@ -2727,7 +2727,7 @@ func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) { metrics := d.mu.log.writer.Metrics() d.mu.Lock() - if err := d.mu.log.metrics.LogWriterMetrics.Merge(metrics); err != nil { + if err := d.mu.log.metrics.LogWriterMetrics.Merge(&metrics); err != nil { d.opts.Logger.Errorf("metrics error: %s", err) } diff --git a/record/log_writer.go b/record/log_writer.go index 95a5dd2846..6668746e00 100644 --- a/record/log_writer.go +++ b/record/log_writer.go @@ -1001,10 +1001,15 @@ func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) { return p[r:] } -// Metrics must be called after Close. The callee will no longer modify the -// returned LogWriterMetrics. -func (w *LogWriter) Metrics() *LogWriterMetrics { - return w.flusher.metrics +// Metrics must typically be called after Close, since the callee will no +// longer modify the returned LogWriterMetrics. It is also current if there is +// nothing left to flush in the flush loop, but that is an implementation +// detail that callers should not rely on. +func (w *LogWriter) Metrics() LogWriterMetrics { + w.flusher.Lock() + defer w.flusher.Unlock() + m := *w.flusher.metrics + return m } // LogWriterMetrics contains misc metrics for the log writer. diff --git a/vfs/errorfs/errorfs.go b/vfs/errorfs/errorfs.go index 56177f67c1..479703a747 100644 --- a/vfs/errorfs/errorfs.go +++ b/vfs/errorfs/errorfs.go @@ -81,6 +81,8 @@ const ( OpFileStat // OpFileSync describes a file sync operation. OpFileSync + // OpFileSyncData describes a file sync operation. + OpFileSyncData // OpFileFlush describes a file flush operation. OpFileFlush ) @@ -90,7 +92,7 @@ func (o OpKind) ReadOrWrite() OpReadWrite { switch o { case OpOpen, OpOpenDir, OpList, OpStat, OpGetDiskUsage, OpFileRead, OpFileReadAt, OpFileStat: return OpIsRead - case OpCreate, OpLink, OpRemove, OpRemoveAll, OpRename, OpReuseForWrite, OpMkdirAll, OpLock, OpFileClose, OpFileWrite, OpFileWriteAt, OpFileSync, OpFileFlush, OpFilePreallocate: + case OpCreate, OpLink, OpRemove, OpRemoveAll, OpRename, OpReuseForWrite, OpMkdirAll, OpLock, OpFileClose, OpFileWrite, OpFileWriteAt, OpFileSync, OpFileSyncData, OpFileFlush, OpFilePreallocate: return OpIsWrite default: panic(fmt.Sprintf("unrecognized op %v\n", o)) @@ -526,7 +528,9 @@ func (f *errorFile) Sync() error { } func (f *errorFile) SyncData() error { - // TODO(jackson): Consider error injection. + if err := f.inj.MaybeError(Op{Kind: OpFileSyncData, Path: f.path}); err != nil { + return err + } return f.file.SyncData() } diff --git a/wal/failover_manager.go b/wal/failover_manager.go new file mode 100644 index 0000000000..f86b5f53d2 --- /dev/null +++ b/wal/failover_manager.go @@ -0,0 +1,64 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package wal + +import ( + "sync" + "time" + + "github.com/cockroachdb/pebble/vfs" +) + +type dirIndex int + +const ( + primaryDirIndex dirIndex = iota + secondaryDirIndex + numDirIndices +) + +type dirAndFileHandle struct { + Dir + vfs.File +} + +// switchableWriter is a subset of failoverWriter needed by failoverMonitor. 
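+// The monitor is expected to poll ongoingLatencyOrErrorForCurDir and, when the
+// observed latency or error crosses its thresholds, call switchToNewDir so
+// that subsequent writes are directed at the other directory.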
+type switchableWriter interface { + switchToNewDir(dirAndFileHandle) error + ongoingLatencyOrErrorForCurDir() (time.Duration, error) +} + +type stopper struct { + quiescer chan struct{} // Closed when quiescing + wg sync.WaitGroup +} + +func newStopper() *stopper { + return &stopper{ + quiescer: make(chan struct{}), + } +} + +func (s *stopper) runAsync(f func()) { + s.wg.Add(1) + go func() { + f() + s.wg.Done() + }() +} + +// shouldQuiesce returns a channel which will be closed when stop() has been +// invoked and outstanding goroutines should begin to quiesce. +func (s *stopper) shouldQuiesce() <-chan struct{} { + return s.quiescer +} + +func (s *stopper) stop() { + close(s.quiescer) + s.wg.Wait() +} + +// Make lint happy. +var _ = (&stopper{}).shouldQuiesce diff --git a/wal/failover_writer.go b/wal/failover_writer.go new file mode 100644 index 0000000000..4a432856b7 --- /dev/null +++ b/wal/failover_writer.go @@ -0,0 +1,800 @@ +// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package wal + +import ( + "io" + "sync" + "sync/atomic" + "time" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/record" + "github.com/cockroachdb/pebble/vfs" + "github.com/prometheus/client_golang/prometheus" +) + +// recordQueueEntry is an entry in recordQueue. +type recordQueueEntry struct { + p []byte + opts SyncOptions +} + +const initialBufferLen = 8192 + +// recordQueue is a variable-size single-producer multiple-consumer queue. It +// is not lock-free, but most operations only need mu.RLock. It needs a mutex +// to grow the size, since there is no upper bound on the number of queued +// records (which are all the records that are not synced, and will need to be +// written again in case of failover). Additionally, it needs a mutex to +// atomically grab a snapshot of the queued records and provide them to a new +// LogWriter that is being switched to. +type recordQueue struct { + // Only held for reading for all pop operations and most push operations. + // Held for writing when buffer needs to be grown or when switching to a new + // writer. + mu sync.RWMutex + + // queue is [tail, head). tail is the oldest entry and head is the index for + // the next entry. + // + // Consumers: atomically read and write tail in pop (using + // compare-and-swap). This is not the usual kind of queue consumer since + // they already know the index that they are popping exists, hence don't + // need to look at head. + // + // Producer: atomically reads tail in push. Writes to head. + // + // Based on the above we only need tail to be atomic. However, the producer + // also populates entries in buffer, whose values need to be seen by the + // consumers when doing a pop, which means they need to synchronize using a + // release and acquire memory barrier pair, where the push does the release + // and the pop does the acquire. For this reason we make head also atomic + // and merge head and tail into a single atomic, so that the store of head + // in push and the load of tail in pop accomplishes this release-acquire + // pair. + // + // All updates to headTail hold mu at least for reading. So when mu is held + // for writing, there is a guarantee that headTail is not being updated. + // + // head is most-significant 32 bits and tail is least-significant 32 bits. 
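+	//
+	// For example, head=3 and tail=1 pack into 0x0000_0003_0000_0001, and the
+	// queue then holds the two entries at buffer indices 1 and 2.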
+ headTail atomic.Uint64 + + // Access to buffer requires at least RLock. + buffer []recordQueueEntry + + lastTailObservedByProducer uint32 + + // Read requires RLock. + writer *record.LogWriter + + // When writer != nil, this is the return value of the last call to + // SyncRecordGeneralized. It is updated in (a) WriteRecord calls push, using + // only RLock (since WriteRecord is externally synchronized), (b) + // snapshotAndSwitchWriter, using Lock. (b) excludes (a). + lastLogSize int64 +} + +func (q *recordQueue) init() { + *q = recordQueue{ + buffer: make([]recordQueueEntry, initialBufferLen), + } +} + +// NB: externally synchronized, i.e., no concurrent push calls. +func (q *recordQueue) push( + p []byte, + opts SyncOptions, + latestLogSizeInWriteRecord int64, + latestWriterInWriteRecord *record.LogWriter, +) (index uint32, writer *record.LogWriter, lastLogSize int64) { + ht := q.headTail.Load() + h, t := unpackHeadTail(ht) + n := int(h - t) + if len(q.buffer) == n { + // Full + m := 2 * n + newBuffer := make([]recordQueueEntry, m) + for i := int(t); i < int(h); i++ { + newBuffer[i%m] = q.buffer[i%n] + } + q.mu.Lock() + q.buffer = newBuffer + q.mu.Unlock() + } + q.mu.RLock() + q.buffer[h] = recordQueueEntry{ + p: p, + opts: opts, + } + // Reclaim memory for consumed entries. We couldn't do that in pop since + // multiple consumers are popping using CAS and that immediately transfers + // ownership to the producer. + for i := q.lastTailObservedByProducer; i < t; i++ { + q.buffer[i] = recordQueueEntry{} + } + q.lastTailObservedByProducer = t + q.headTail.Add(1 << headTailBits) + writer = q.writer + if writer == latestWriterInWriteRecord { + // WriteRecord has written to this writer since the switch. + q.lastLogSize = latestLogSizeInWriteRecord + } + // Else writer is a new writer that was switched to, so ignore the + // latestLogSizeInWriteRecord. + + lastLogSize = q.lastLogSize + q.mu.RUnlock() + return h, writer, lastLogSize +} + +func (q *recordQueue) length() int { + ht := q.headTail.Load() + h, t := unpackHeadTail(ht) + return int(h - t) +} + +// Pops all entries. Must be called only after the last push returns. +func (q *recordQueue) popAll(err error) (numRecords int, numSyncsPopped int) { + ht := q.headTail.Load() + h, t := unpackHeadTail(ht) + n := int(h - t) + if n == 0 { + return 0, 0 + } + return n, q.pop(h-1, err, false) +} + +// Pops all entries up to and including index. The remaining queue is +// [index+1, head). +// +// NB: we could slightly simplify to only have the latest writer be able to +// pop. This would avoid the CAS below, but it seems better to reduce the +// amount of queued work regardless of who has successfully written it. +func (q *recordQueue) pop(index uint32, err error, runCb bool) (numSyncsPopped int) { + var buf [512]SyncOptions + // Tail can increase, and numEntriesToPop decrease, due to competition with + // other consumers. Head can increase due to the concurrent producer. + headTailEntriesToPop := func() (ht uint64, h uint32, t uint32, numEntriesToPop int) { + ht = q.headTail.Load() + h, t = unpackHeadTail(ht) + tail := int(t) + numEntriesToPop = int(index) - tail + 1 + return ht, h, t, numEntriesToPop + } + ht, head, tail, numEntriesToPop := headTailEntriesToPop() + if numEntriesToPop <= 0 { + return 0 + } + var b []SyncOptions + if numEntriesToPop <= len(buf) { + b = buf[:numEntriesToPop] + } else { + // Do allocation before acquiring the mutex. 
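+		// Doing the allocation here keeps the RLock critical section short; if
+		// other consumers win the CAS race below, part of b simply goes unused.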
+		b = make([]SyncOptions, numEntriesToPop)
+	}
+	q.mu.RLock()
+	n := len(q.buffer)
+	for i := 0; i < numEntriesToPop; i++ {
+		// Grab all the possible entries before doing CAS, since successful CAS
+		// will also release those buffer slots to the producer.
+		b[i] = q.buffer[(i+int(tail))%n].opts
+	}
+	// CAS, with retry loop, since this pop can race with other consumers.
+	for {
+		newHT := makeHeadTail(head, index+1)
+		if q.headTail.CompareAndSwap(ht, newHT) {
+			break
+		}
+		ht, head, _, numEntriesToPop = headTailEntriesToPop()
+		if numEntriesToPop <= 0 {
+			break
+		}
+	}
+	q.mu.RUnlock()
+
+	// The current value of numEntriesToPop is the number of entries that were
+	// popped.
+	if numEntriesToPop <= 0 {
+		return 0
+	}
+	bufLen := len(b)
+	// [0, bufLen-numEntriesToPop) were not popped, since this pop raced with
+	// other consumers that popped those entries.
+	for i := bufLen - numEntriesToPop; i < bufLen; i++ {
+		if b[i].Done != nil {
+			numSyncsPopped++
+			if err != nil {
+				*b[i].Err = err
+			}
+			b[i].Done.Done()
+		}
+	}
+	return numSyncsPopped
+}
+
+func (q *recordQueue) snapshotAndSwitchWriter(
+	writer *record.LogWriter,
+	snapshotFunc func(firstIndex uint32, entries []recordQueueEntry) (logSize int64),
+) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	q.writer = writer
+	h, t := unpackHeadTail(q.headTail.Load())
+	n := h - t
+	if n > 0 {
+		m := uint32(len(q.buffer))
+		b := make([]recordQueueEntry, n)
+		for i := t; i < h; i++ {
+			b[i-t] = q.buffer[i%m]
+		}
+		q.lastLogSize = snapshotFunc(t, b)
+	}
+}
+
+// getLastIndex is used by failoverWriter.Close.
+func (q *recordQueue) getLastIndex() (lastIndex int64) {
+	h, _ := unpackHeadTail(q.headTail.Load())
+	return int64(h) - 1
+}
+
+const headTailBits = 32
+
+func unpackHeadTail(ht uint64) (head, tail uint32) {
+	const mask = 1<<headTailBits - 1
+	head = uint32((ht >> headTailBits) & mask)
+	tail = uint32(ht & mask)
+	return head, tail
+}
+
+func makeHeadTail(head, tail uint32) uint64 {
+	return (uint64(head) << headTailBits) | uint64(tail)
+}
+
+// Maximum number of physical log files when writing a virtual WAL. Arbitrarily
+// chosen value. Setting this to 2 will not simplify the code. We make this a
+// constant since we want a fixed size array for writer.writers.
+const maxPhysicalLogs = 10
+
+// failoverWriter is the implementation of Writer in failover mode. No Writer
+// method blocks for IO, except for Close.
+//
+// Loosely speaking, Close blocks until all records are successfully written
+// and synced to some log writer. Monitoring of log writer latency and errors
+// continues after Close is called, which means failoverWriter can be switched
+// to a new log writer after Close is called, to unblock Close.
+//
+// More precisely, Close does not block if there is an error in creating or
+// closing the latest LogWriter when close was called. This is because errors
+// are considered indicative of misconfiguration, and the user of
+// failoverWriter can dampen switching when observing errors (e.g. see
+// failoverMonitor), so close does not assume any liveness of calls to
+// switchToNewDir when such errors occur. Since the caller (see db.go) treats
+// an error on Writer.Close as fatal, this does mean that failoverWriter has
+// limited ability to mask errors (its primary task is to mask high latency).
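+//
+// The expected call sequence is newFailoverWriter, any number of WriteRecord
+// calls, then Close; switchToNewDir may be interleaved at any point by the
+// monitor, including, as described above, after Close has been called.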
+type failoverWriter struct { + opts failoverWriterOpts + q recordQueue + writers [maxPhysicalLogs]logWriterAndRecorder + mu struct { + sync.Mutex + // cond is signaled when the latest LogWriter is set in writers (or there + // is a creation error), or when the latest LogWriter is successfully + // closed. It is waited on in Close. We don't use channels and select + // since what Close is waiting on is dynamic based on the local state in + // Close, so using Cond is simpler. + cond *sync.Cond + // nextWriterIndex is advanced before creating the *LogWriter. That is, a + // slot is reserved by taking the current value of nextWriterIndex and + // incrementing it, and then the *LogWriter for that slot is created. When + // newFailoverWriter returns, nextWriterIndex = 1. + // + // The latest *LogWriter is (will be) at nextWriterIndex-1. + // + // INVARIANT: nextWriterIndex <= len(writers) + nextWriterIndex logNameIndex + closed bool + // metrics is initialized in Close. Currently we just use the metrics from + // the latest writer after it is closed, since in the common case with + // only one writer, that writer's flush loop will have finished and the + // metrics will be current. With multiple writers, these metrics can be + // quite inaccurate. The WriteThroughput metric includes an IdleDuration, + // which can be high for a writer that was switched away from, and + // therefore not indicative of overall work being done by the + // failoverWriter. The PendingBufferLen and SyncQueueLen are similarly + // inaccurate once there is no more work being given to a writer. We could + // add a method to LogWriter to stop sampling metrics when it is not the + // latest writer. Then we could aggregate all these metrics across all + // writers. + // + // Note that CockroachDB does not use these metrics in any meaningful way. + // + // TODO(sumeer): do the improved solution outlined above. + metrics record.LogWriterMetrics + } + // State for computing logical offset. The cumulative offset state is in + // offset. Each time we call SyncRecordGeneralized from WriteRecord, we + // compute the delta from the size returned by this LogWriter now, and the + // size returned by this LogWriter in the previous call to + // SyncRecordGeneralized. That previous call to SyncRecordGeneralized may + // have happened from WriteRecord, or asynchronously during a switch. So + // that previous call state requires synchronization and is maintained in + // recordQueue. The offset is incremented by this delta without any + // synchronization, since we rely on external synchronization (like the + // standaloneWriter). + logicalOffset struct { + latestWriterInWriteRecord *record.LogWriter + latestLogSizeInWriteRecord int64 + offset int64 + // Transitions once from false => true when there is a non-nil writer. + notEstimatedOffset bool + } + psiForWriteRecordBacking record.PendingSyncIndex + psiForSwitchBacking record.PendingSyncIndex +} + +type logWriterAndRecorder struct { + // This may never become non-nil, if when the LogWriter was finally created, + // it was no longer the latest writer. Additionally, if there was an error + // in creating the writer, w will remain nil and createError will be set. + w *record.LogWriter + // createError is set if there is an error creating the writer. This is + // useful in Close since we need to know when the work for creating the + // latest writer is done, whether it resulted in success or not. 
+	createError error
+	r           latencyAndErrorRecorder
+}
+
+var _ Writer = &failoverWriter{}
+
+var _ switchableWriter = &failoverWriter{}
+
+type failoverWriterOpts struct {
+	wn     NumWAL
+	logger base.Logger
+
+	// Options that feed into SyncingFileOptions.
+	noSyncOnClose   bool
+	bytesPerSync    int
+	preallocateSize func() int
+
+	// Options for record.LogWriter.
+	minSyncInterval func() time.Duration
+	fsyncLatency    prometheus.Histogram
+	queueSemChan    chan struct{}
+	stopper         *stopper
+
+	writerCreatedForTest chan<- struct{}
+}
+
+func newFailoverWriter(
+	opts failoverWriterOpts, initialDir dirAndFileHandle,
+) (*failoverWriter, error) {
+	ww := &failoverWriter{
+		opts: opts,
+	}
+	ww.q.init()
+	ww.mu.cond = sync.NewCond(&ww.mu)
+	// The initial record.LogWriter creation also happens via a call to
+	// switchToNewDir, since we don't want it to block newFailoverWriter.
+	err := ww.switchToNewDir(initialDir)
+	if err != nil {
+		// Switching limit cannot be exceeded when creating.
+		panic(err)
+	}
+	return ww, nil
+}
+
+// WriteRecord implements Writer.
+func (ww *failoverWriter) WriteRecord(p []byte, opts SyncOptions) (logicalOffset int64, err error) {
+	recordIndex, writer, lastLogSize := ww.q.push(
+		p, opts, ww.logicalOffset.latestLogSizeInWriteRecord, ww.logicalOffset.latestWriterInWriteRecord)
+	if writer == nil {
+		// Don't have a record.LogWriter yet, so use an estimate. This estimate
+		// will get overwritten.
+		ww.logicalOffset.offset += int64(len(p))
+		return ww.logicalOffset.offset, nil
+	}
+	// INVARIANT: writer != nil.
+	notEstimatedOffset := ww.logicalOffset.notEstimatedOffset
+	if !notEstimatedOffset {
+		ww.logicalOffset.notEstimatedOffset = true
+	}
+	ww.psiForWriteRecordBacking = record.PendingSyncIndex{Index: record.NoSyncIndex}
+	if opts.Done != nil {
+		ww.psiForWriteRecordBacking.Index = int64(recordIndex)
+	}
+	ww.logicalOffset.latestLogSizeInWriteRecord, err = writer.SyncRecordGeneralized(p, &ww.psiForWriteRecordBacking)
+	ww.logicalOffset.latestWriterInWriteRecord = writer
+	if notEstimatedOffset {
+		delta := ww.logicalOffset.latestLogSizeInWriteRecord - lastLogSize
+		ww.logicalOffset.offset += delta
+	} else {
+		// Overwrite the estimate. This is a best-effort improvement in that it is
+		// accurate for the common case where writer is the first LogWriter.
+		// Consider a failover scenario where there was no LogWriter for the first
+		// 10 records, so they are all accumulated as an estimate. Then the first
+		// LogWriter successfully writes and syncs the first 5 records and gets
+		// stuck. A switch happens to a second LogWriter that is handed the
+		// remaining 5 records, and the 11th record arrives via a WriteRecord.
+		// The transition from !notEstimatedOffset to notEstimatedOffset will
+		// happen on this 11th record, and the logic here will use the length of
+		// the second LogWriter, which does not reflect the full length.
+		//
+		// TODO(sumeer): try to make this more correct, without adding much more
+		// complexity, and without adding synchronization.
+		ww.logicalOffset.offset = ww.logicalOffset.latestLogSizeInWriteRecord
+	}
+	return ww.logicalOffset.offset, err
+}
+
+// switchToNewDir starts switching to dir. It implements switchableWriter. All
+// work is async, and a non-nil error is returned only if the switching limit
+// is exceeded.
+func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
+	ww.mu.Lock()
+	// Can have a late switchToNewDir call if the failoverMonitor has not yet
+	// been told that the writer is closed. Ignore.
+ if ww.mu.closed { + ww.mu.Unlock() + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + return nil + } + // writerIndex is the slot for this writer. + writerIndex := ww.mu.nextWriterIndex + if int(writerIndex) == len(ww.writers) { + ww.mu.Unlock() + return errors.Errorf("exceeded switching limit") + } + ww.mu.nextWriterIndex++ + ww.mu.Unlock() + + // Creation is async. + ww.opts.stopper.runAsync(func() { + // TODO(sumeer): recycling of logs. + filename := dir.FS.PathJoin(dir.Dirname, makeLogFilename(ww.opts.wn, writerIndex)) + recorderAndWriter := &ww.writers[writerIndex].r + var file vfs.File + // handleErrFunc is called when err != nil. It handles the multiple IO error + // cases below. + handleErrFunc := func(err error) { + if file != nil { + file.Close() + } + ww.mu.Lock() + defer ww.mu.Unlock() + ww.writers[writerIndex].createError = err + ww.mu.cond.Signal() + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + } + var err error + // Create file. + recorderAndWriter.writeStart() + file, err = dir.FS.Create(filename) + recorderAndWriter.writeEnd(err) + // TODO(sumeer): should we fatal if primary dir? At some point it is better + // to fatal instead of continuing to failover. + // base.MustExist(dir.FS, filename, ww.opts.logger, err) + if err != nil { + handleErrFunc(err) + return + } + // Sync dir. + recorderAndWriter.writeStart() + err = dir.Sync() + recorderAndWriter.writeEnd(err) + if err != nil { + handleErrFunc(err) + return + } + // Wrap in a syncingFile. + syncingFile := vfs.NewSyncingFile(file, vfs.SyncingFileOptions{ + NoSyncOnClose: ww.opts.noSyncOnClose, + BytesPerSync: ww.opts.bytesPerSync, + PreallocateSize: ww.opts.preallocateSize(), + }) + // Wrap in the latencyAndErrorRecorder. + recorderAndWriter.setWriter(syncingFile) + + // Using NumWAL as the DiskFileNum is fine since it is used only as + // EOF trailer for safe log recycling. Even though many log files can + // map to a single NumWAL, a file used for NumWAL n at index m will + // never get recycled for NumWAL n at a later index (since recycling + // happens when n as a whole is obsolete). + w := record.NewLogWriter(recorderAndWriter, base.DiskFileNum(ww.opts.wn), + record.LogWriterConfig{ + WALMinSyncInterval: ww.opts.minSyncInterval, + WALFsyncLatency: ww.opts.fsyncLatency, + QueueSemChan: ww.opts.queueSemChan, + ExternalSyncQueueCallback: ww.doneSyncCallback, + }) + closeWriter := func() bool { + ww.mu.Lock() + defer ww.mu.Unlock() + if writerIndex+1 != ww.mu.nextWriterIndex || ww.mu.closed { + // Not the latest writer or the writer was closed while this async + // creation was ongoing. + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + return true + } + // Latest writer. + ww.writers[writerIndex].w = w + ww.mu.cond.Signal() + // NB: snapshotAndSwitchWriter does not block on IO, since + // SyncRecordGeneralized does no IO. + ww.q.snapshotAndSwitchWriter(w, + func(firstIndex uint32, entries []recordQueueEntry) (logSize int64) { + for i := range entries { + ww.psiForSwitchBacking = record.PendingSyncIndex{Index: record.NoSyncIndex} + if entries[i].opts.Done != nil { + ww.psiForSwitchBacking.Index = int64(firstIndex) + int64(i) + } + var err error + logSize, err = w.SyncRecordGeneralized(entries[i].p, &ww.psiForSwitchBacking) + if err != nil { + // TODO(sumeer): log periodically. The err will also surface via + // the latencyAndErrorRecorder, so if a switch is possible, it + // will be done. 
+ ww.opts.logger.Errorf("%s", err) + } + } + return logSize + }) + if ww.opts.writerCreatedForTest != nil { + ww.opts.writerCreatedForTest <- struct{}{} + } + return false + }() + if closeWriter { + // Never wrote anything to this writer so don't care about the + // returned error. + ww.opts.stopper.runAsync(func() { + _ = w.Close() + }) + } + }) + return nil +} + +// doneSyncCallback is the record.ExternalSyncQueueCallback called by +// record.LogWriter. +// +// recordQueue is popped from only when some work requests a sync (and +// successfully syncs). In the worst case, if no syncs are requested, we could +// queue all the records needed to fill up a memtable in the recordQueue. This +// can have two negative effects: (a) in the case of failover, we need to +// replay all the data in the current mutable memtable, which takes more time, +// (b) the memory usage is proportional to the size of the memtable. We ignore +// these negatives since, (a) users like CockroachDB regularly sync, and (b) +// the default memtable size is only 64MB. +func (ww *failoverWriter) doneSyncCallback(doneSync record.PendingSyncIndex, err error) { + if err != nil { + // Don't pop anything since we can retry after switching to a new + // LogWriter. + return + } + // NB: harmless after Close returns since numSyncsPopped will be 0. + numSyncsPopped := ww.q.pop(uint32(doneSync.Index), err, true) + if ww.opts.queueSemChan != nil { + for i := 0; i < numSyncsPopped; i++ { + <-ww.opts.queueSemChan + } + } +} + +// ongoingLatencyOrErrorForCurDir implements switchableWriter. +func (ww *failoverWriter) ongoingLatencyOrErrorForCurDir() (time.Duration, error) { + ww.mu.Lock() + defer ww.mu.Unlock() + if ww.mu.closed { + return 0, nil + } + return ww.writers[ww.mu.nextWriterIndex-1].r.ongoingLatencyOrError() +} + +// Close implements Writer. +// +// NB: getOngoingLatencyOrErrorForLatestWriter and switchToNewDir can be +// called after Close is called, and there is also a possibility that they get +// called after Close returns and before failoverMonitor knows that the +// failoverWriter is closed. +// +// doneSyncCallback can be called anytime after Close returns since there +// could be stuck writes that finish arbitrarily later. +// +// See the long comment about Close behavior where failoverWriter is declared. +func (ww *failoverWriter) Close() (logicalOffset int64, err error) { + logicalOffset = ww.logicalOffset.offset + // [0, closeCalledCount) have had LogWriter.Close called (though may not + // have finished) or the LogWriter will never be non-nil. Either way, they + // have been "processed". + closeCalledCount := logNameIndex(0) + // lastWriterState is the state for the last writer, for which we are + // waiting for LogWriter.Close to finish or for creation to be unsuccessful. + // What is considered the last writer can change. All state is protected by + // ww.mu. + type lastWriterState struct { + index logNameIndex + closed bool + err error + metrics record.LogWriterMetrics + } + var lastWriter lastWriterState + lastRecordIndex := record.PendingSyncIndex{Index: ww.q.getLastIndex()} + ww.mu.Lock() + defer ww.mu.Unlock() + // Every iteration starts and ends with the mutex held. + // + // Invariant: ww.mu.nextWriterIndex >= 1. + // + // We will loop until we have closed the lastWriter (and use + // lastPossibleWriter.err). 
We also need to call close on all LogWriters + // that will not close themselves, i.e., those that have already been + // created and installed in failoverWriter.writers (this set may change + // while failoverWriter.Close runs). + for !lastWriter.closed { + numWriters := ww.mu.nextWriterIndex + if numWriters > closeCalledCount { + // INVARIANT: numWriters > closeCalledCount. + lastWriter = lastWriterState{ + index: numWriters - 1, + } + // Try to process [closeCalledCount, numWriters). Will surely process + // [closeCalledCount, numWriters-1), since those writers are either done + // initializing, or will close themselves. The writer at numWriters-1 we + // can only process if it is done initializing, else we will iterate + // again. + for i := closeCalledCount; i < numWriters; i++ { + w := ww.writers[i].w + cErr := ww.writers[i].createError + // Is the current index the last writer. If yes, this is also the last + // loop iteration. + isLastWriter := i == lastWriter.index + if w != nil { + // Can close it, so extend closeCalledCount. + closeCalledCount = i + 1 + if isLastWriter { + // We may care about its error and when it finishes closing. + index := i + ww.opts.stopper.runAsync(func() { + // Last writer(s) (since new writers can be created and become + // last, as we iterate) are guaranteed to have seen the last + // record (since it was queued before Close was called). It is + // possible that a writer got created after the last record was + // dequeued and before this fact was realized by Close. In that + // case we will harmlessly tell it that it synced that last + // record, though it has already been written and synced by + // another writer. + err := w.CloseWithLastQueuedRecord(lastRecordIndex) + ww.mu.Lock() + defer ww.mu.Unlock() + if lastWriter.index == index { + lastWriter.closed = true + lastWriter.err = err + lastWriter.metrics = w.Metrics() + ww.mu.cond.Signal() + } + }) + } else { + // Don't care about the returned error since all the records we + // relied on this writer for were already successfully written. + ww.opts.stopper.runAsync(func() { + _ = w.CloseWithLastQueuedRecord(record.PendingSyncIndex{Index: record.NoSyncIndex}) + }) + } + } else if cErr != nil { + // Have processed it, so extend closeCalledCount. + closeCalledCount = i + 1 + if isLastWriter { + lastWriter.closed = true + lastWriter.err = cErr + lastWriter.metrics = record.LogWriterMetrics{} + } + // Else, ignore. + } else { + if !isLastWriter { + // Not last writer, so will close itself. + closeCalledCount = i + 1 + } + // Else, last writer, so we may have to close it. + } + } + } + if !lastWriter.closed { + // Either waiting for creation of last writer or waiting for the close + // to finish, or something else to become the last writer. + ww.mu.cond.Wait() + } + } + err = lastWriter.err + ww.mu.metrics = lastWriter.metrics + ww.mu.closed = true + _, _ = ww.q.popAll(err) + return logicalOffset, err +} + +// Metrics implements writer. +func (ww *failoverWriter) Metrics() record.LogWriterMetrics { + ww.mu.Lock() + defer ww.mu.Unlock() + return ww.mu.metrics +} + +// latencyAndErrorRecorder records ongoing write and sync operations and errors +// in those operations. record.LogWriter cannot continue functioning after any +// error, so all errors are considered permanent. +// +// writeStart/writeEnd are used directly when creating a file. 
After the file +// is successfully created, setWriter turns latencyAndErrorRecorder into an +// implementation of writerSyncerCloser that will record for the Write and +// Sync methods. +type latencyAndErrorRecorder struct { + ongoingOperationStart atomic.Int64 + error atomic.Pointer[error] + writerSyncerCloser +} + +type writerSyncerCloser interface { + io.Writer + io.Closer + Sync() error +} + +func (r *latencyAndErrorRecorder) writeStart() { + r.ongoingOperationStart.Store(time.Now().UnixNano()) +} + +func (r *latencyAndErrorRecorder) writeEnd(err error) { + if err != nil { + ptr := &err + r.error.Store(ptr) + } + r.ongoingOperationStart.Store(0) +} + +func (r *latencyAndErrorRecorder) setWriter(w writerSyncerCloser) { + r.writerSyncerCloser = w +} + +func (r *latencyAndErrorRecorder) ongoingLatencyOrError() (time.Duration, error) { + startTime := r.ongoingOperationStart.Load() + var latency time.Duration + if startTime != 0 { + l := time.Now().UnixNano() - startTime + if l < 0 { + l = 0 + } + latency = time.Duration(l) + } + errPtr := r.error.Load() + var err error + if errPtr != nil { + err = *errPtr + } + return latency, err +} + +// Sync implements writerSyncerCloser. +func (r *latencyAndErrorRecorder) Sync() error { + r.writeStart() + err := r.writerSyncerCloser.Sync() + r.writeEnd(err) + return err +} + +// Write implements io.Writer. +func (r *latencyAndErrorRecorder) Write(p []byte) (n int, err error) { + r.writeStart() + n, err = r.writerSyncerCloser.Write(p) + r.writeEnd(err) + return n, err +} diff --git a/wal/failover_writer_test.go b/wal/failover_writer_test.go new file mode 100644 index 0000000000..3c9ff28bc8 --- /dev/null +++ b/wal/failover_writer_test.go @@ -0,0 +1,662 @@ +package wal + +import ( + "flag" + "fmt" + "io" + "math" + "math/rand" + "slices" + "strings" + "sync" + "testing" + "time" + + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/record" + "github.com/cockroachdb/pebble/vfs" + "github.com/cockroachdb/pebble/vfs/errorfs" + "github.com/stretchr/testify/require" +) + +type closeKind uint8 + +const ( + closeSync closeKind = iota + closeAsync + waitForCloseToFinish + noneOfTheAbove +) + +func TestFailoverWriter(t *testing.T) { + datadriven.Walk(t, "testdata/failover_writer", func(t *testing.T, path string) { + memFS := vfs.NewStrictMem() + dirs := [numDirIndices]dirAndFileHandle{ + {Dir: Dir{Dirname: "pri"}}, + {Dir: Dir{Dirname: "sec"}}, + } + var testDirs [numDirIndices]dirAndFileHandle + for i, dir := range dirs { + require.NoError(t, memFS.MkdirAll(dir.Dirname, 0755)) + f, err := memFS.OpenDir("") + require.NoError(t, err) + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) + testDirs[i].Dir = dir.Dir + } + setDirsFunc := func(t *testing.T, fs vfs.FS, dirs *[numDirIndices]dirAndFileHandle) { + for i := range *dirs { + f := (*dirs)[i].File + if f != nil { + _ = f.Close() + } + (*dirs)[i].FS = fs + f, err := fs.OpenDir((*dirs)[i].Dirname) + require.NoError(t, err) + (*dirs)[i].File = f + } + } + setDirsFunc(t, memFS, &dirs) + setDirsFunc(t, memFS, &testDirs) + dirIndex := 0 + + printLogFiles := func(b *strings.Builder, num NumWAL) { + memFS.ResetToSyncedState() + type filenameAndFS struct { + name string + fs vfs.FS + } + var filenames []filenameAndFS + prefix := base.DiskFileNum(num).String() + for i := range dirs { + fns, err := dirs[i].FS.List(dirs[i].Dirname) + require.NoError(t, err) + for _, fn := range fns { + if strings.HasPrefix(fn, 
prefix) { + filenames = append(filenames, filenameAndFS{ + name: dirs[i].FS.PathJoin(dirs[i].Dirname, fn), + fs: dirs[i].FS, + }) + } + } + } + slices.SortFunc(filenames, func(a, b filenameAndFS) int { + return strings.Compare(a.name, b.name) + }) + if len(filenames) > 0 { + fmt.Fprintf(b, "log files:\n") + } + for _, fn := range filenames { + fmt.Fprintf(b, " %s\n", fn.name) + func() { + f, err := fn.fs.Open(fn.name) + require.NoError(t, err) + defer f.Close() + rr := record.NewReader(f, base.DiskFileNum(num)) + for { + offset := rr.Offset() + r, err := rr.Next() + if err == nil { + var bb strings.Builder + _, err = io.Copy(&bb, r) + if err == nil { + fmt.Fprintf(b, " %d: %s\n", offset, bb.String()) + } + } + if err != nil { + fmt.Fprintf(b, " %s\n", err.Error()) + break + } + } + }() + } + } + var w *failoverWriter + waitForQueueLen := func(t *testing.T, qlen int) { + for { + n := w.q.length() + require.LessOrEqual(t, qlen, n) + if qlen != n { + time.Sleep(10 * time.Millisecond) + } else { + return + } + } + } + checkLogWriters := func(t *testing.T, b *strings.Builder) { + if w == nil { + return + } + fmt.Fprintf(b, "log writers:\n") + for i := logNameIndex(0); i < w.mu.nextWriterIndex; i++ { + rLatency, rErr := w.writers[i].r.ongoingLatencyOrError() + require.Equal(t, time.Duration(0), rLatency) + if w.writers[i].createError != nil { + require.Equal(t, rErr, w.writers[i].createError) + } + errStr := "no error" + if rErr != nil { + errStr = rErr.Error() + } + fmt.Fprintf(b, " writer %d: %s\n", i, errStr) + } + } + var nextWALNum NumWAL + queueSemChanCap := 100 + queueSemChan := make(chan struct{}, queueSemChanCap) + countSem := func() int { + return queueSemChanCap - len(queueSemChan) + } + var stopper *stopper + var logWriterCreated chan struct{} + var syncs []SyncOptions + resetStateAfterClose := func(t *testing.T) { + done := false + for !done { + select { + case <-queueSemChan: + default: + done = true + } + } + syncs = nil + w = nil + dirIndex = 0 + setDirsFunc(t, memFS, &testDirs) + } + var ( + closeSemCount int + closeErr error + closeWG *sync.WaitGroup + closeOffset int64 + ) + datadriven.RunTest(t, path, + func(t *testing.T, td *datadriven.TestData) string { + closeFunc := func(closeKind closeKind, stopGoroutines bool) string { + if closeKind != waitForCloseToFinish { + closeSemCount = queueSemChanCap + closeErr = nil + closeWG = nil + closeOffset = 0 + } + if td.HasArg("sem-count") { + td.ScanArgs(t, "sem-count", &closeSemCount) + } + if closeKind == waitForCloseToFinish { + closeWG.Wait() + } else if closeKind == closeAsync { + closeWG = &sync.WaitGroup{} + closeWG.Add(1) + go func() { + closeOffset, closeErr = w.Close() + closeWG.Done() + }() + return "" + } else if closeKind == closeSync { + closeOffset, closeErr = w.Close() + } + var b strings.Builder + if closeKind != noneOfTheAbove { + // Print the close error and the record dispositions. + errStr := "ok" + if closeErr != nil { + errStr = closeErr.Error() + } + fmt.Fprintf(&b, "close: %s, offset: %d\n", errStr, closeOffset) + if len(syncs) > 0 { + fmt.Fprintf(&b, "records:\n") + } + for i := range syncs { + infoStr := "no sync" + if syncs[i].Done != nil { + infoStr = "synced" + // Should already be done. 
+ syncs[i].Done.Wait() + err := *syncs[i].Err + if err != nil { + infoStr = fmt.Sprintf("sync error %s", err.Error()) + } + } + fmt.Fprintf(&b, " record %d: %s\n", i, infoStr) + } + metrics := w.Metrics() + fmt.Fprintf(&b, "write bytes metric: %d\n", metrics.WriteThroughput.Bytes) + if metrics.WriteThroughput.Bytes > 0 { + require.Less(t, time.Duration(0), metrics.WriteThroughput.WorkDuration) + } + } + if stopGoroutines { + // We expect the Close to complete without stopping all the + // goroutines. But for deterministic log file output we stop all + // goroutines. + stopper.stop() + printLogFiles(&b, nextWALNum-1) + checkLogWriters(t, &b) + require.Equal(t, closeSemCount, countSem()) + resetStateAfterClose(t) + } + return b.String() + } + createWriter := func(noWaitForLogWriterCreation bool) { + wn := nextWALNum + nextWALNum++ + var err error + stopper = newStopper() + logWriterCreated = make(chan struct{}, 100) + w, err = newFailoverWriter(failoverWriterOpts{ + wn: wn, + preallocateSize: func() int { return 0 }, + queueSemChan: queueSemChan, + stopper: stopper, + writerCreatedForTest: logWriterCreated, + }, testDirs[dirIndex]) + require.NoError(t, err) + if !noWaitForLogWriterCreation { + <-logWriterCreated + } + } + switch td.Cmd { + case "init": + var injs []errorfs.Injector + var noWriter bool + for _, cmdArg := range td.CmdArgs { + switch cmdArg.Key { + case "inject-errors": + if len(injs) != 0 { + return "duplicate inject-errors" + } + injs = make([]errorfs.Injector, len(cmdArg.Vals)) + for i := 0; i < len(cmdArg.Vals); i++ { + inj, err := errorfs.ParseDSL(cmdArg.Vals[i]) + if err != nil { + return fmt.Sprintf("%s: %s", cmdArg.Vals[i], err.Error()) + } + injs[i] = inj + } + case "no-writer": + noWriter = true + default: + return fmt.Sprintf("unknown arg %s", cmdArg.Key) + } + } + fs := vfs.FS(memFS) + if len(injs) != 0 { + fs = errorfs.Wrap(memFS, errorfs.Any(injs...)) + } + fs = newBlockingFS(fs) + setDirsFunc(t, fs, &testDirs) + if !noWriter { + createWriter(false) + } + return "" + + case "create-writer-after-init": + noWaitForLogWriterCreation := false + if td.HasArg("no-wait") { + noWaitForLogWriterCreation = true + } + createWriter(noWaitForLogWriterCreation) + return "" + + case "write": + var synco SyncOptions + var doSync bool + td.ScanArgs(t, "sync", &doSync) + if doSync { + wg := &sync.WaitGroup{} + wg.Add(1) + synco = SyncOptions{ + Done: wg, + Err: new(error), + } + queueSemChan <- struct{}{} + } + syncs = append(syncs, synco) + var value string + td.ScanArgs(t, "value", &value) + offset, err := w.WriteRecord([]byte(value), synco) + require.NoError(t, err) + // The offset can be non-deterministic depending on which LogWriter + // is being written to, so print it only when requested. 
+ if td.HasArg("print-offset") { + return fmt.Sprintf("offset: %d\n", offset) + } + return "" + + case "wait-for-queue": + var qlen int + td.ScanArgs(t, "length", &qlen) + waitForQueueLen(t, qlen) + return "" + + case "switch": + noWaitForLogWriterCreation := false + if td.HasArg("no-wait") { + noWaitForLogWriterCreation = true + } + dirIndex = (dirIndex + 1) % 2 + err := w.switchToNewDir(testDirs[dirIndex]) + if err == nil { + if !noWaitForLogWriterCreation { + <-logWriterCreated + } + return "ok" + } + return err.Error() + + case "close": + return closeFunc(closeSync, true) + + case "close-async": + return closeFunc(closeAsync, false) + + case "ongoing-latency": + var index int + td.ScanArgs(t, "writer-index", &index) + for i := 0; i < 10; i++ { + time.Sleep(time.Duration(i+1) * time.Millisecond) + d, _ := w.writers[index].r.ongoingLatencyOrError() + if d > 0 { + return "found ongoing" + } + } + return "no ongoing" + + case "wait-for-close": + stopGoroutines := true + if td.HasArg("do-not-stop-goroutines") { + stopGoroutines = false + } + return closeFunc(waitForCloseToFinish, stopGoroutines) + + case "stop-goroutines-after-close": + return closeFunc(noneOfTheAbove, true) + + case "blocking-conf": + var filename string + td.ScanArgs(t, "filename", &filename) + var conf blockingConf + if td.HasArg("create") { + conf |= blockingCreate + } + if td.HasArg("write") { + conf |= blockingWrite + } + if td.HasArg("sync") { + conf |= blockingSync + } + if td.HasArg("close") { + conf |= blockingClose + } + if td.HasArg("open-dir") { + conf |= blockingOpenDir + } + testDirs[0].FS.(*blockingFS).setConf(filename, conf) + return fmt.Sprintf("%s: 0b%b", filename, uint8(conf)) + + case "wait-for-and-unblock": + var filename string + td.ScanArgs(t, "filename", &filename) + testDirs[0].FS.(*blockingFS).waitForBlockAndUnblock(filename) + return "" + + case "sleep": + time.Sleep(time.Millisecond) + return "" + + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) + }) +} + +type blockingFS struct { + vfs.FS + mu struct { + sync.Mutex + conf map[string]confAndState + } +} + +type blockingConf uint32 + +type confAndState struct { + blockingConf + block chan struct{} +} + +const ( + blockingCreate blockingConf = 1 + blockingWrite blockingConf = 1 << 1 + blockingSync blockingConf = 1 << 2 + blockingClose blockingConf = 1 << 3 + blockingOpenDir blockingConf = 1 << 4 + blockingAll blockingConf = math.MaxUint32 +) + +var _ vfs.FS = &blockingFS{} + +func newBlockingFS(fs vfs.FS) *blockingFS { + bfs := &blockingFS{ + FS: fs, + } + bfs.mu.conf = make(map[string]confAndState) + return bfs +} + +func (fs *blockingFS) setConf(baseFilename string, conf blockingConf) { + fs.mu.Lock() + defer fs.mu.Unlock() + cs, ok := fs.mu.conf[baseFilename] + if ok { + close(cs.block) + } + if conf == 0 { + delete(fs.mu.conf, baseFilename) + return + } + fs.mu.conf[baseFilename] = confAndState{ + blockingConf: conf, + block: make(chan struct{}), + } +} + +func (fs *blockingFS) waitForBlockAndUnblock(baseFilename string) { + fs.mu.Lock() + cs, ok := fs.mu.conf[baseFilename] + if !ok { + panic(errors.AssertionFailedf("no conf for %s", baseFilename)) + } + fs.mu.Unlock() + cs.block <- struct{}{} +} + +func (fs *blockingFS) maybeBlock(baseFilename string, op blockingConf) { + fs.mu.Lock() + cs, ok := fs.mu.conf[baseFilename] + fs.mu.Unlock() + if ok && cs.blockingConf&op != 0 { + <-cs.block + } +} + +func (fs *blockingFS) Create(name string) (vfs.File, error) { + baseFilename := fs.FS.PathBase(name) + 
fs.maybeBlock(baseFilename, blockingCreate) + f, err := fs.FS.Create(name) + if err != nil { + return nil, err + } + return &blockingFile{baseFilename: baseFilename, File: f, fs: fs}, nil +} + +func (fs *blockingFS) OpenDir(name string) (vfs.File, error) { + baseFilename := fs.FS.PathBase(name) + fs.maybeBlock(baseFilename, blockingOpenDir) + f, err := fs.FS.OpenDir(name) + if err != nil { + return nil, err + } + return &blockingFile{baseFilename: baseFilename, File: f, fs: fs}, nil +} + +type blockingFile struct { + baseFilename string + vfs.File + fs *blockingFS +} + +var _ vfs.File = blockingFile{} + +func (f blockingFile) Write(p []byte) (n int, err error) { + f.fs.maybeBlock(f.baseFilename, blockingWrite) + return f.File.Write(p) +} + +func (f blockingFile) Sync() error { + f.fs.maybeBlock(f.baseFilename, blockingSync) + return f.File.Sync() +} + +func (f blockingFile) SyncData() error { + f.fs.maybeBlock(f.baseFilename, blockingSync) + return f.File.SyncData() +} + +func (f blockingFile) Close() error { + f.fs.maybeBlock(f.baseFilename, blockingClose) + return f.File.Close() +} + +// TestConcurrentWritersWithManyRecords tests (a) resizing of the recordQueue, +// (b) resizing of the SyncOptions in recordQueue.pop, (c) competition to pop +// with CAS failure, and resulting retries. (c) is observable in this test by +// adding print statements in recordQueue.pop. +func TestConcurrentWritersWithManyRecords(t *testing.T) { + seed := *seed + if seed == 0 { + seed = time.Now().UnixNano() + t.Logf("seed: %d", seed) + } + rng := rand.New(rand.NewSource(seed)) + records := make([][]byte, 20<<10) + recordsMap := map[string]int{} + for i := range records { + records[i] = make([]byte, 50+rng.Intn(100)) + for { + randStr(records[i], rng) + if _, ok := recordsMap[string(records[i])]; ok { + continue + } else { + recordsMap[string(records[i])] = i + break + } + } + } + const numLogWriters = 4 + memFS := vfs.NewStrictMem() + dirs := [numDirIndices]dirAndFileHandle{{Dir: Dir{Dirname: "pri"}}, {Dir: Dir{Dirname: "sec"}}} + for _, dir := range dirs { + require.NoError(t, memFS.MkdirAll(dir.Dirname, 0755)) + f, err := memFS.OpenDir("") + require.NoError(t, err) + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) + } + bFS := newBlockingFS(memFS) + for i := range dirs { + dirs[i].FS = bFS + f, err := bFS.OpenDir(dirs[i].Dirname) + require.NoError(t, err) + dirs[i].File = f + } + for i := 0; i < numLogWriters; i++ { + bFS.setConf(makeLogFilename(0, logNameIndex(i)), blockingWrite) + } + stopper := newStopper() + logWriterCreated := make(chan struct{}, 100) + queueSemChan := make(chan struct{}, len(records)) + dirIndex := 0 + ww, err := newFailoverWriter(failoverWriterOpts{ + wn: 0, + preallocateSize: func() int { return 0 }, + queueSemChan: queueSemChan, + stopper: stopper, + writerCreatedForTest: logWriterCreated, + }, dirs[dirIndex]) + require.NoError(t, err) + wg := &sync.WaitGroup{} + switchInterval := len(records) / 4 + for i := 0; i < len(records); i++ { + queueSemChan <- struct{}{} + wg.Add(1) + synco := SyncOptions{Done: wg, Err: new(error)} + _, err := ww.WriteRecord(records[i], synco) + require.NoError(t, err) + if i > 0 && i%switchInterval == 0 { + dirIndex = (dirIndex + 1) % 2 + ww.switchToNewDir(dirs[dirIndex]) + } + } + time.Sleep(5 * time.Millisecond) + for i := 0; i < numLogWriters; i++ { + bFS.setConf(makeLogFilename(0, logNameIndex(i)), 0) + } + _, err = ww.Close() + require.NoError(t, err) + wg.Wait() + require.Equal(t, 0, len(queueSemChan)) + type indexInterval struct { + 
first, last int + } + for i := 0; i < numLogWriters; i++ { + func() { + f, err := memFS.Open(memFS.PathJoin(dirs[i%2].Dirname, makeLogFilename(0, logNameIndex(i)))) + if err != nil { + t.Logf("file %d: %s", i, err.Error()) + return + } + defer f.Close() + rr := record.NewReader(f, base.DiskFileNum(0)) + interval := indexInterval{} + for { + r, err := rr.Next() + if err != nil { + require.Equal(t, io.EOF, err) + break + } + var bb strings.Builder + _, err = io.Copy(&bb, r) + require.NoError(t, err) + index, ok := recordsMap[bb.String()] + require.True(t, ok) + if interval.first == interval.last { + interval.first = index + interval.last = index + 1 + } else { + require.Equal(t, interval.last, index) + interval.last++ + } + } + require.Equal(t, 0, interval.first) + if i == numLogWriters-1 { + require.Equal(t, len(records), interval.last) + } + }() + } +} + +var seed = flag.Int64("seed", 0, "a pseudorandom number generator seed") + +func randStr(fill []byte, rng *rand.Rand) { + const letters = "abcdefghijklmnopqrstuvwxyz" + const lettersLen = len(letters) + for i := 0; i < len(fill); i++ { + fill[i] = letters[rng.Intn(lettersLen)] + } +} + +// TODO(sumeer): randomized error injection and delay injection test. diff --git a/wal/standalone_manager.go b/wal/standalone_manager.go index 475fe00013..a17e90f6be 100644 --- a/wal/standalone_manager.go +++ b/wal/standalone_manager.go @@ -359,6 +359,6 @@ func (w *standaloneWriter) Close() (logicalOffset int64, err error) { } // Metrics implements Writer. -func (w *standaloneWriter) Metrics() *record.LogWriterMetrics { +func (w *standaloneWriter) Metrics() record.LogWriterMetrics { return w.w.Metrics() } diff --git a/wal/testdata/failover_writer/blocking b/wal/testdata/failover_writer/blocking new file mode 100644 index 0000000000..da31f18f9a --- /dev/null +++ b/wal/testdata/failover_writer/blocking @@ -0,0 +1,358 @@ +init +---- + +blocking-conf filename=000000.log sync +---- +000000.log: 0b100 + +write sync=true value=woolly +---- + +close-async +---- + +ongoing-latency writer-index=0 +---- +found ongoing + +wait-for-and-unblock filename=000000.log +---- + +blocking-conf filename=000000.log +---- +000000.log: 0b0 + +wait-for-close +---- +close: ok, offset: 17 +records: + record 0: synced +write bytes metric: 28 +log files: + pri/000000.log + 0: woolly + EOF +log writers: + writer 0: no error + +# More complex blocking. +init +---- + +# Sync is blocked on first log file. +blocking-conf filename=000001.log sync +---- +000001.log: 0b100 + +# Write wants a sync. +write sync=true value=woolly +---- + +# Wait until log writer is blocked on sync and indicating ongoing latency. +ongoing-latency writer-index=0 +---- +found ongoing + +# Unblock the sync. +wait-for-and-unblock filename=000001.log +---- + +# Queue length drops to 0. +wait-for-queue length=0 +---- + +# Write is blocked on first log file. +blocking-conf filename=000001.log write +---- +000001.log: 0b10 + +# Write does not want a sync, but the file write is blocked. +write sync=false value=mammoth print-offset +---- +offset: 35 + +# See if first log writer is blocked on write and indicating ongoing latency. +# Because we did not request a sync, the log writer is not trying to do a +# write. But the record is in the log writer's buffer. +ongoing-latency writer-index=0 +---- +no ongoing + +# Block writes on the second log file too, which we haven't created yet. +blocking-conf filename=000001-001.log write +---- +000001-001.log: 0b10 + +# Switch to second log file. 
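+# The record written without sync is still in the record queue, so it is
+# expected to be replayed to the new log writer.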
+switch +---- +ok + +# Ensure second log writer is blocked on write and indicating ongoing latency. +ongoing-latency writer-index=1 +---- +found ongoing + +# Close can complete when second log writer writes the second record, but it +# is blocked. +close-async +---- + +# Unblock writes on second log file +wait-for-and-unblock filename=000001-001.log +---- + +blocking-conf filename=000001-001.log +---- +000001-001.log: 0b0 + +# Queue length drops to 0. +wait-for-queue length=0 +---- + +# Ensure close succeeds. First writer is still blocked. +wait-for-close do-not-stop-goroutines +---- +close: ok, offset: 35 +records: + record 0: synced + record 1: no sync +write bytes metric: 29 + +# Do a noop switch. +switch +---- +ok + +# First log writer is still trying to close, but blocked on the write. +ongoing-latency writer-index=0 +---- +found ongoing + +# Unblock first log writer. +wait-for-and-unblock filename=000001.log +---- + +blocking-conf filename=000001.log +---- +000001.log: 0b0 + +# Everyone is unblocked, so we can stop and wait for all goroutines to stop. +stop-goroutines-after-close +---- +log files: + pri/000001.log + 0: woolly + 17: mammoth + EOF + sec/000001-001.log + 0: mammoth + EOF +log writers: + writer 0: no error + writer 1: no error + +# Block the creation of the writer. Initial offsets will be estimates equal to +# the length of the records. +init no-writer +---- + +blocking-conf filename=000002.log create +---- +000002.log: 0b1 + +create-writer-after-init no-wait +---- + +write sync=true value=woolly print-offset +---- +offset: 6 + +write sync=true value=sheep print-offset +---- +offset: 11 + +wait-for-and-unblock filename=000002.log +---- + +blocking-conf filename=000002.log +---- +000002.log: 0b0 + +wait-for-queue length=0 +---- + +# Offset is now accurate, accounting for all three records. +write sync=false value=yak print-offset +---- +offset: 47 + +close +---- +close: ok, offset: 47 +records: + record 0: synced + record 1: synced + record 2: no sync +write bytes metric: 58 +log files: + pri/000002.log + 0: woolly + 17: sheep + 33: yak + EOF +log writers: + writer 0: no error + +# Two writers. Second writer is blocked on creation when close is called, so +# close has to iterate. Also, first writer gets created after second writer +# creation starts, so closes itself. +init no-writer +---- + +blocking-conf filename=000003.log create +---- +000003.log: 0b1 + +create-writer-after-init no-wait +---- + +blocking-conf filename=000003-001.log create +---- +000003-001.log: 0b1 + +switch no-wait +---- +ok + +write sync=true value=woolly print-offset +---- +offset: 6 + +ongoing-latency writer-index=0 +---- +found ongoing + +ongoing-latency writer-index=1 +---- +found ongoing + +wait-for-and-unblock filename=000003.log +---- + +ongoing-latency writer-index=1 +---- +found ongoing + +close-async +---- + +sleep +---- + +wait-for-and-unblock filename=000003-001.log +---- + +wait-for-close +---- +close: ok, offset: 6 +records: + record 0: synced +write bytes metric: 28 +log files: + pri/000003.log + EOF + sec/000003-001.log + 0: woolly + EOF +log writers: + writer 0: no error + writer 1: no error + +# Two writers. Second writer is has already been created when close is called, +# but first writer has not been created. So first writer will close itself. +# Have to iterate in close for second writer to close. 
+init no-writer +---- + +blocking-conf filename=000004.log create +---- +000004.log: 0b1 + +create-writer-after-init no-wait +---- + +blocking-conf filename=000004-001.log write +---- +000004-001.log: 0b10 + +# Second log writer is created. +switch +---- +ok + +write sync=true value=mammoth print-offset +---- +offset: 18 + +# Waiting in create. +ongoing-latency writer-index=0 +---- +found ongoing + +# Waiting in write. +ongoing-latency writer-index=1 +---- +found ongoing + +# Let the write proceed in second writer. +wait-for-and-unblock filename=000004-001.log +---- + +ongoing-latency writer-index=0 +---- +found ongoing + +close-async +---- + +sleep +---- + +# Unblock the writing of the EOF trailer. +wait-for-and-unblock filename=000004-001.log +---- + +blocking-conf filename=000004-001.log +---- +000004-001.log: 0b0 + +wait-for-close do-not-stop-goroutines +---- +close: ok, offset: 18 +records: + record 0: synced +write bytes metric: 29 + +# First writer is still blocked. +ongoing-latency writer-index=0 +---- +found ongoing + +# Unblock first writer. +wait-for-and-unblock filename=000004.log +---- + +stop-goroutines-after-close +---- +log files: + pri/000004.log + EOF + sec/000004-001.log + 0: mammoth + EOF +log writers: + writer 0: no error + writer 1: no error + +# TODO(sumeer): More blocking test cases. diff --git a/wal/testdata/failover_writer/errors b/wal/testdata/failover_writer/errors new file mode 100644 index 0000000000..713c1b1fe4 --- /dev/null +++ b/wal/testdata/failover_writer/errors @@ -0,0 +1,388 @@ +# Switch once with tail of first log equal to the head of the second log. This +# is because the record at the tail did not request sync, so stayed in the +# queue when the switch happened, and was replayed. +init +---- + +write sync=true value=woolly +---- + +write sync=false value=sheep +---- + +wait-for-queue length=1 +---- + +switch +---- +ok + +write sync=false value=yak print-offset +---- +offset: 47 + +close +---- +close: ok, offset: 47 +records: + record 0: synced + record 1: no sync + record 2: no sync +write bytes metric: 41 +log files: + pri/000000.log + 0: woolly + 17: sheep + EOF + sec/000000-001.log + 0: sheep + 16: yak + EOF +log writers: + writer 0: no error + writer 1: no error + +# Error is injected on create of first log file. +init inject-errors=((ErrInjected (And Writes (PathMatch "*/000001.log") (OnIndex 0)))) +---- + +# The offset is the cumulative length of the records since there is no writer. +write sync=true value=woolly print-offset +---- +offset: 6 + +# The offset is the cumulative length of the records since there is no writer. +write sync=true value=sheep print-offset +---- +offset: 11 + +switch +---- +ok + +close +---- +close: ok, offset: 11 +records: + record 0: synced + record 1: synced +write bytes metric: 44 +log files: + sec/000001-001.log + 0: woolly + 17: sheep + EOF +log writers: + writer 0: injected error + writer 1: no error + +# Error is injected on write of first (or first and second) record in first +# log file. 
+init inject-errors=((ErrInjected (And Writes (PathMatch "*/000002.log") (OnIndex 1)))) +---- + +write sync=true value=woolly +---- + +write sync=true value=mammoth +---- + +switch +---- +ok + +close +---- +close: ok, offset: 35 +records: + record 0: synced + record 1: synced +write bytes metric: 46 +log files: + pri/000002.log + EOF + sec/000002-001.log + 0: woolly + 17: mammoth + EOF +log writers: + writer 0: injected error + writer 1: no error + +# Error is injected on sync of first (or first and second) record in first log +# file. +init inject-errors=((ErrInjected (And Writes (PathMatch "*/000003.log") (OnIndex 2)))) +---- + +write sync=true value=woolly +---- + +write sync=true value=sheep +---- + +switch +---- +ok + +close +---- +close: ok, offset: 33 +records: + record 0: synced + record 1: synced +write bytes metric: 44 +log files: + pri/000003.log + EOF + sec/000003-001.log + 0: woolly + 17: sheep + EOF +log writers: + writer 0: injected error + writer 1: no error + +# Error is injected on create of first log file. Close does not block. +init inject-errors=((ErrInjected (And Writes (PathMatch "*/000004.log") (OnIndex 0)))) +---- + +close +---- +close: injected error, offset: 0 +write bytes metric: 0 +log writers: + writer 0: injected error + +# Error is injected on the EOF write that happens in LogWriter.Close. Even +# though all records have already been synced, an error is exposed since the +# EOF trailer was not written. +init inject-errors=((ErrInjected (And Writes (PathMatch "*/000005.log") (OnIndex 3)))) +---- + +write sync=true value=woolly +---- + +wait-for-queue length=0 +---- + +close +---- +close: injected error, offset: 17 +records: + record 0: synced +write bytes metric: 17 +log files: + pri/000005.log + 0: woolly + EOF +log writers: + writer 0: injected error + +# Error is injected on all writer creation. +init inject-errors=((ErrInjected (And Writes (PathMatch "*/*.log")))) +---- + +write sync=true value=woolly print-offset +---- +offset: 6 + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +exceeded switching limit + +close sem-count=99 +---- +close: injected error, offset: 6 +records: + record 0: sync error injected error +write bytes metric: 0 +log writers: + writer 0: injected error + writer 1: injected error + writer 2: injected error + writer 3: injected error + writer 4: injected error + writer 5: injected error + writer 6: injected error + writer 7: injected error + writer 8: injected error + writer 9: injected error + +# Same as previous with record that did not request sync. +init inject-errors=((ErrInjected (And Writes (PathMatch "*/*.log")))) +---- + +write sync=false value=woolly print-offset +---- +offset: 6 + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +switch +---- +ok + +close +---- +close: injected error, offset: 6 +records: + record 0: no sync +write bytes metric: 0 +log writers: + writer 0: injected error + writer 1: injected error + writer 2: injected error + writer 3: injected error + writer 4: injected error + writer 5: injected error + writer 6: injected error + writer 7: injected error + writer 8: injected error + writer 9: injected error + +# Injected error on sync of the primary dir. 
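+# The error hits the directory sync performed right after creating the log
+# file, so writer 0 is never installed and the queued record is replayed by
+# writer 1 on the secondary.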
+init inject-errors=((ErrInjected (And Writes (PathMatch "pri") (OnIndex 0)))) +---- + +write sync=false value=woolly +---- + +switch +---- +ok + +close +---- +close: ok, offset: 6 +records: + record 0: no sync +write bytes metric: 28 +log files: + sec/000008-001.log + 0: woolly + EOF +log writers: + writer 0: injected error + writer 1: no error + +# Injected error on sync of the primary and secondary dir once. Switching back +# to the primary directory works. +init inject-errors=((ErrInjected (And Writes (Or (And (PathMatch "pri") (OnIndex 0)) (And (PathMatch "sec") (OnIndex 0)))))) +---- + +write sync=true value=woolly +---- + +switch +---- +ok + +switch +---- +ok + +close +---- +close: ok, offset: 6 +records: + record 0: synced +write bytes metric: 28 +log files: + pri/000009-002.log + 0: woolly + EOF + pri/000009.log + EOF +log writers: + writer 0: injected error + writer 1: injected error + writer 2: no error + +# Error is injected on one writer creation. Close does not wait for more +# switches to happen. +init inject-errors=((ErrInjected (And Writes (PathMatch "*/*.log")))) +---- + +write sync=true value=woolly print-offset +---- +offset: 6 + +switch +---- +ok + +close sem-count=99 +---- +close: injected error, offset: 6 +records: + record 0: sync error injected error +write bytes metric: 0 +log writers: + writer 0: injected error + writer 1: injected error diff --git a/wal/wal.go b/wal/wal.go index 90f3174b91..201c9f8a4d 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -138,7 +138,9 @@ type Options struct { QueueSemChan chan struct{} // ElevatedWriteStallThresholdLag is the duration for which an elevated - // threshold should continue after a switch back to the primary dir. + // threshold should continue after a switch back to the primary dir. This is + // because we may have accumulated many unflushed memtables and flushing + // them can take some time. Maybe set to 60s. ElevatedWriteStallThresholdLag time.Duration // Logger for logging. @@ -284,7 +286,7 @@ type Writer interface { Close() (logicalOffset int64, err error) // Metrics must be called after Close. The callee will no longer modify the // returned LogWriterMetrics. - Metrics() *record.LogWriterMetrics + Metrics() record.LogWriterMetrics } // Reader reads a virtual WAL.