Skip to content

Commit

Permalink
feat: add zstd compression, cuts required disk space in half without …
Browse files Browse the repository at this point in the history
…throughput penalties
  • Loading branch information
FlorianLoch committed Feb 14, 2024
1 parent d1e35a9 commit 2806bcb
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 19 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/h2non/gock v1.2.0
github.com/hashicorp/go-retryablehttp v0.7.5
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213
github.com/klauspost/compress v1.17.6
github.com/schollz/progressbar/v3 v3.14.1
go.uber.org/mock v0.4.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213 h1:qGQQKEcAR99REcMpsXCp3lJ03zYT1PkRd3kQGPn9GVg=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
Expand Down
17 changes: 12 additions & 5 deletions lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ const (
type ProgressFunc func(lowest, current, to, processed, remaining int64) error

type config struct {
dataDir string
endpoint string
minWorkers int
progressFn ProgressFunc
stateFile io.ReadWriteSeeker
dataDir string
endpoint string
minWorkers int
progressFn ProgressFunc
stateFile io.ReadWriteSeeker
noCompression bool
}

type Option func(*config)
Expand Down Expand Up @@ -61,6 +62,12 @@ func WithProgressFn(progressFn ProgressFunc) Option {
}
}

func WithNoCompression() Option {
return func(c *config) {
c.noCompression = true
}
}

func Sync(options ...Option) error {
config := &config{
dataDir: defaultDataDir,
Expand Down
89 changes: 75 additions & 14 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"errors"
"fmt"
"github.com/klauspost/compress/zstd"
"io"
"os"
"path"
Expand All @@ -21,48 +22,80 @@ type storage interface {
}

type fsStorage struct {
dataDir string
writeLock syncPkg.Mutex
dataDir string
writeLock syncPkg.Mutex
doNotUseCompression bool
}

var _ storage = (*fsStorage)(nil)

func (f *fsStorage) Save(key, etag string, data []byte) error {
// We need to synchronize calls to Save because we don't want to create the same parent directory for several files
// at the same time.
f.writeLock.Lock()
defer f.writeLock.Unlock()

if err := os.MkdirAll(f.subDir(key), dirMode); err != nil {
if err := f.createDirs(key); err != nil {
return fmt.Errorf("creating data directory: %w", err)
}

filePath := f.filePath(key)

file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("creating file %q: %w", filePath, err)
}
defer file.Close()

if _, err := file.WriteString(etag + "\n"); err != nil {
var w io.Writer = file

// We use the default compression level as non-scientific tests have shown that it's by far the best trade-off
// between compression ratio and speed.
if !f.doNotUseCompression {
enc, err := zstd.NewWriter(file)
if err != nil {
return fmt.Errorf("creating zstd writer: %w", err)
}
defer enc.Close()

w = enc
}

if _, err := w.Write([]byte(etag + "\n")); err != nil {
return fmt.Errorf("writing etag to file %q: %w", filePath, err)
}

if _, err := file.Write(data); err != nil {
if _, err := w.Write(data); err != nil {
return fmt.Errorf("writing data to file %q: %w", filePath, err)
}

return nil
}

func (f *fsStorage) createDirs(key string) error {
// We need to synchronize calls to Save because we don't want to create the same parent directory for several files
// at the same time.
f.writeLock.Lock()
defer f.writeLock.Unlock()

return os.MkdirAll(f.subDir(key), dirMode)
}

func (f *fsStorage) LoadETag(key string) (string, error) {
file, err := os.Open(f.filePath(key))
if err != nil {
return "", fmt.Errorf("opening file %q: %w", f.filePath(key), err)
}
defer file.Close()

etag, err := bufio.NewReader(file).ReadString('\n')
var r io.Reader = file

if !f.doNotUseCompression {
dec, err := zstd.NewReader(file)
if err != nil {
return "", fmt.Errorf("creating zstd reader: %w", err)
}
defer dec.Close()

r = dec
}

etag, err := bufio.NewReader(r).ReadString('\n')
if err != nil {
return "", fmt.Errorf("reading etag from file %q: %w", f.filePath(key), err)
}
Expand All @@ -77,19 +110,43 @@ func (f *fsStorage) LoadData(key string) (io.ReadCloser, error) {
return nil, fmt.Errorf("opening file %q: %w", f.filePath(key), err)
}

var (
r io.Reader = file
dec *zstd.Decoder
)

if !f.doNotUseCompression {
dec, err = zstd.NewReader(file)
if err != nil {
return nil, fmt.Errorf("creating zstd reader: %w", err)
}

r = dec
}

// Create a new buffered reader for efficient reading
bufReader := bufio.NewReaderSize(file, 64*1024) // 64KB buffer - this should fit the whole file
bufReader := bufio.NewReaderSize(r, 64*1024) // 64KB buffer - this should fit the whole file

// Skip the first line containing the etag
if _, _, err := bufReader.ReadLine(); err != nil && !errors.Is(err, io.EOF) {
defer file.Close()
if dec != nil {
defer dec.Close()
}

return nil, fmt.Errorf("skipping etag line in file %q: %w", f.filePath(key), err)
}

return &closableReader{
Reader: bufReader,
Closer: file,
closeFn: func() error {
defer file.Close()
if dec != nil {
defer dec.Close()
}

return nil
},
}, nil
}

Expand All @@ -104,5 +161,9 @@ func (f *fsStorage) filePath(key string) string {

type closableReader struct {
io.Reader
io.Closer
closeFn func() error
}

func (c *closableReader) Close() error {
return c.closeFn()
}

0 comments on commit 2806bcb

Please sign in to comment.