From 2806bcb98ec2b58736aec61c93420f40261bdf1f Mon Sep 17 00:00:00 2001 From: Florian Loch Date: Wed, 14 Feb 2024 18:11:11 +0100 Subject: [PATCH] feat: add zstd compression, cuts required disk space in half without throughput penalties --- go.mod | 1 + go.sum | 2 ++ lib.go | 17 ++++++++--- storage.go | 89 +++++++++++++++++++++++++++++++++++++++++++++--------- 4 files changed, 90 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index aa0b1d2..e045203 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/h2non/gock v1.2.0 github.com/hashicorp/go-retryablehttp v0.7.5 github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 + github.com/klauspost/compress v1.17.6 github.com/schollz/progressbar/v3 v3.14.1 go.uber.org/mock v0.4.0 ) diff --git a/go.sum b/go.sum index 8af5605..6ec21a8 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,8 @@ github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 h1:qGQQKEcAR99REcMpsXCp3lJ03zYT1PkRd3kQGPn9GVg= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= +github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI= +github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= diff --git a/lib.go b/lib.go index f737f1d..62907a4 100644 --- a/lib.go +++ b/lib.go @@ -22,11 +22,12 @@ const ( type ProgressFunc func(lowest, current, to, processed, remaining int64) error type config struct { - dataDir string - endpoint string - minWorkers int - progressFn ProgressFunc - stateFile io.ReadWriteSeeker + dataDir string + endpoint string + minWorkers int + progressFn ProgressFunc + stateFile io.ReadWriteSeeker + noCompression bool } type Option func(*config) @@ -61,6 +62,12 @@ func WithProgressFn(progressFn ProgressFunc) Option { } } +func WithNoCompression() Option { + return func(c *config) { + c.noCompression = true + } +} + func Sync(options ...Option) error { config := &config{ dataDir: defaultDataDir, diff --git a/storage.go b/storage.go index fda253e..dcac537 100644 --- a/storage.go +++ b/storage.go @@ -4,6 +4,7 @@ import ( "bufio" "errors" "fmt" + "github.com/klauspost/compress/zstd" "io" "os" "path" @@ -21,40 +22,60 @@ type storage interface { } type fsStorage struct { - dataDir string - writeLock syncPkg.Mutex + dataDir string + writeLock syncPkg.Mutex + doNotUseCompression bool } var _ storage = (*fsStorage)(nil) func (f *fsStorage) Save(key, etag string, data []byte) error { - // We need to synchronize calls to Save because we don't want to create the same parent directory for several files - // at the same time. - f.writeLock.Lock() - defer f.writeLock.Unlock() - - if err := os.MkdirAll(f.subDir(key), dirMode); err != nil { + if err := f.createDirs(key); err != nil { return fmt.Errorf("creating data directory: %w", err) } filePath := f.filePath(key) + file, err := os.Create(filePath) if err != nil { return fmt.Errorf("creating file %q: %w", filePath, err) } defer file.Close() - if _, err := file.WriteString(etag + "\n"); err != nil { + var w io.Writer = file + + // We use the default compression level as non-scientific tests have shown that it's by far the best trade-off + // between compression ratio and speed. + if !f.doNotUseCompression { + enc, err := zstd.NewWriter(file) + if err != nil { + return fmt.Errorf("creating zstd writer: %w", err) + } + defer enc.Close() + + w = enc + } + + if _, err := w.Write([]byte(etag + "\n")); err != nil { return fmt.Errorf("writing etag to file %q: %w", filePath, err) } - if _, err := file.Write(data); err != nil { + if _, err := w.Write(data); err != nil { return fmt.Errorf("writing data to file %q: %w", filePath, err) } return nil } +func (f *fsStorage) createDirs(key string) error { + // We need to synchronize calls to Save because we don't want to create the same parent directory for several files + // at the same time. + f.writeLock.Lock() + defer f.writeLock.Unlock() + + return os.MkdirAll(f.subDir(key), dirMode) +} + func (f *fsStorage) LoadETag(key string) (string, error) { file, err := os.Open(f.filePath(key)) if err != nil { @@ -62,7 +83,19 @@ func (f *fsStorage) LoadETag(key string) (string, error) { } defer file.Close() - etag, err := bufio.NewReader(file).ReadString('\n') + var r io.Reader = file + + if !f.doNotUseCompression { + dec, err := zstd.NewReader(file) + if err != nil { + return "", fmt.Errorf("creating zstd reader: %w", err) + } + defer dec.Close() + + r = dec + } + + etag, err := bufio.NewReader(r).ReadString('\n') if err != nil { return "", fmt.Errorf("reading etag from file %q: %w", f.filePath(key), err) } @@ -77,19 +110,43 @@ func (f *fsStorage) LoadData(key string) (io.ReadCloser, error) { return nil, fmt.Errorf("opening file %q: %w", f.filePath(key), err) } + var ( + r io.Reader = file + dec *zstd.Decoder + ) + + if !f.doNotUseCompression { + dec, err = zstd.NewReader(file) + if err != nil { + return nil, fmt.Errorf("creating zstd reader: %w", err) + } + + r = dec + } + // Create a new buffered reader for efficient reading - bufReader := bufio.NewReaderSize(file, 64*1024) // 64KB buffer - this should fit the whole file + bufReader := bufio.NewReaderSize(r, 64*1024) // 64KB buffer - this should fit the whole file // Skip the first line containing the etag if _, _, err := bufReader.ReadLine(); err != nil && !errors.Is(err, io.EOF) { defer file.Close() + if dec != nil { + defer dec.Close() + } return nil, fmt.Errorf("skipping etag line in file %q: %w", f.filePath(key), err) } return &closableReader{ Reader: bufReader, - Closer: file, + closeFn: func() error { + defer file.Close() + if dec != nil { + defer dec.Close() + } + + return nil + }, }, nil } @@ -104,5 +161,9 @@ func (f *fsStorage) filePath(key string) string { type closableReader struct { io.Reader - io.Closer + closeFn func() error +} + +func (c *closableReader) Close() error { + return c.closeFn() }