From 75b0ac838e39129e5dfc2a0d59a53a10746f6f90 Mon Sep 17 00:00:00 2001 From: Florian Loch Date: Wed, 14 Feb 2024 13:07:26 +0100 Subject: [PATCH] refactor: improve performance by prefixing lines when syncing (where it causes no slowdown), not when exporting (where it causes massive slowdown) --- export.go | 30 ++---------------------------- export_test.go | 6 +++--- storage.go | 24 ++++++++++-------------- sync.go | 38 +++++++++++++++++++++++++++++++++++++- sync_test.go | 46 +++++++++++++++++++++++----------------------- 5 files changed, 75 insertions(+), 69 deletions(-) diff --git a/export.go b/export.go index 0caa5b1..d76e959 100644 --- a/export.go +++ b/export.go @@ -1,7 +1,6 @@ package hibpsync import ( - "bufio" "fmt" "io" ) @@ -19,8 +18,8 @@ func export(from, to int64, store storage, w io.Writer) error { } defer dataReader.Close() - if err := copyAndPrefixLines(dataReader, w, rangePrefix); err != nil { - return fmt.Errorf("copying data for range %q: %w", rangePrefix, err) + if _, err := io.Copy(w, dataReader); err != nil { + return fmt.Errorf("writing data for range %q: %w", rangePrefix, err) } if i+1 < to { @@ -43,28 +42,3 @@ func export(from, to int64, store storage, w io.Writer) error { return nil } - -func copyAndPrefixLines(in io.Reader, out io.Writer, prefix string) error { - firstLine := true - - scanner := bufio.NewScanner(in) - for scanner.Scan() { - if !firstLine { - if _, err := out.Write(lineSeparator); err != nil { - return fmt.Errorf("writing line separator to export writer: %w", err) - } - } - - firstLine = false - - if _, err := out.Write([]byte(prefix)); err != nil { - return fmt.Errorf("writing prefix to export writer: %w", err) - } - - if _, err := out.Write(scanner.Bytes()); err != nil { - return fmt.Errorf("writing suffix and count to export writer: %w", err) - } - } - - return nil -} diff --git a/export_test.go b/export_test.go index 3a09e21..0d689f8 100644 --- a/export_test.go +++ b/export_test.go @@ -12,9 +12,9 @@ func TestExport(t 
*testing.T) { ctrl := gomock.NewController(t) storageMock := NewMockstorage(ctrl) - storageMock.EXPECT().LoadData("00000").Return(io.NopCloser(bytes.NewReader([]byte("suffix:counter11\nsuffix:counter12"))), nil) - storageMock.EXPECT().LoadData("00001").Return(io.NopCloser(bytes.NewReader([]byte("suffix:counter2"))), nil) - storageMock.EXPECT().LoadData("00002").Return(io.NopCloser(bytes.NewReader([]byte("suffix:counter3"))), nil) + storageMock.EXPECT().LoadData("00000").Return(io.NopCloser(bytes.NewReader([]byte("00000suffix:counter11\n00000suffix:counter12"))), nil) + storageMock.EXPECT().LoadData("00001").Return(io.NopCloser(bytes.NewReader([]byte("00001suffix:counter2"))), nil) + storageMock.EXPECT().LoadData("00002").Return(io.NopCloser(bytes.NewReader([]byte("00002suffix:counter3"))), nil) buf := bytes.NewBuffer([]byte{}) diff --git a/storage.go b/storage.go index 5c66ee3..2b4ef17 100644 --- a/storage.go +++ b/storage.go @@ -2,6 +2,7 @@ package hibpsync import ( "bufio" + "errors" "fmt" "io" "os" @@ -76,8 +77,9 @@ func (f *fsStorage) LoadData(key string) (io.ReadCloser, error) { return nil, fmt.Errorf("opening file %q: %w", f.filePath(key), err) } - if err := skipLine(file); err != nil { - file.Close() + if err := skipLine(file); err != nil && errors.Is(err, io.EOF) { + defer file.Close() + return nil, fmt.Errorf("skipping etag line in file %q: %w", f.filePath(key), err) } @@ -89,21 +91,15 @@ func skipLine(reader io.ReadSeeker) error { br := bufio.NewReader(reader) // Read until the first newline character - _, err := br.ReadString('\n') - if err != nil && err != io.EOF { - return err - } - - // Get the current offset - offset, err := reader.Seek(0, io.SeekCurrent) + firstBytes, err := br.ReadBytes('\n') if err != nil { - return err + return fmt.Errorf("reading first line: %w", err) } - // Seek back to the beginning of the file - _, err = reader.Seek(offset, io.SeekStart) - if err != nil { - return err + // The buffered reader most likely will have read more 
from the reader than just the first line, so we need to + // seek back to the beginning of the second line. + if _, err = reader.Seek(int64(len(firstBytes)), io.SeekStart); err != nil { + return fmt.Errorf("seeking to beginning of second line: %w", err) } return nil diff --git a/sync.go b/sync.go index 0e45924..31a9a80 100644 --- a/sync.go +++ b/sync.go @@ -1,6 +1,8 @@ package hibpsync import ( + "bufio" + "bytes" "errors" "fmt" "github.com/alitto/pond" @@ -42,7 +44,12 @@ func sync(from, to int64, client *hibpClient, store storage, pool *pond.WorkerPo } if !resp.NotModified { - if err := store.Save(rangePrefix, resp.ETag, resp.Data); err != nil { + prefixedLines, err := prefixLines(resp.Data, rangePrefix) + if err != nil { + return fmt.Errorf("prefixing lines: %w", err) + } + + if err := store.Save(rangePrefix, resp.ETag, prefixedLines); err != nil { return fmt.Errorf("saving range: %w", err) } } @@ -84,6 +91,35 @@ func toRangeString(i int64) string { return fmt.Sprintf("%05X", i) } +func prefixLines(in []byte, prefix string) ([]byte, error) { + firstLine := true + + // Actually, we know that the size will be: len(in) + rows * len(prefix) + // But we do not know the number of rows - so starting from len(in) seems to be a good choice. 
+ out := bytes.NewBuffer(make([]byte, 0, len(in))) + + scanner := bufio.NewScanner(bytes.NewReader(in)) + for scanner.Scan() { + if !firstLine { + if _, err := out.Write(lineSeparator); err != nil { + return nil, fmt.Errorf("adding line separator: %w", err) + } + } + + firstLine = false + + if _, err := out.Write([]byte(prefix)); err != nil { + return nil, fmt.Errorf("adding prefix: %w", err) + } + + if _, err := out.Write(scanner.Bytes()); err != nil { + return nil, fmt.Errorf("adding suffix and counter: %w", err) + } + } + + return out.Bytes(), nil +} + func lowestInFlight(inFlight mapset.Set[int64], to int64) int64 { lowest := int64(math.MaxInt64) diff --git a/sync_test.go b/sync_test.go index e8fe923..6e7885a 100644 --- a/sync_test.go +++ b/sync_test.go @@ -22,63 +22,63 @@ func TestSync(t *testing.T) { Get("/range/00000"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data1") + BodyString("suffix1") gock.New(baseURL). Get("/range/00001"). MatchHeader("If-None-Match", "etag received earlier"). Reply(http.StatusNotModified). AddHeader("ETag", "etag received earlier"). - BodyString("data2") + BodyString("suffix2") gock.New(baseURL). Get("/range/00002"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data3") + BodyString("suffix31:2\nsuffix32:3") gock.New(baseURL). Get("/range/00003"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data4") + BodyString("suffix4") gock.New(baseURL). Get("/range/00004"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data5") + BodyString("suffix5") gock.New(baseURL). Get("/range/00005"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data6") + BodyString("suffix6") gock.New(baseURL). Get("/range/00006"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data7") + BodyString("suffix7") gock.New(baseURL). Get("/range/00007"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data8") + BodyString("suffix8") gock.New(baseURL). Get("/range/00008"). Reply(200). AddHeader("ETag", "etag"). 
- BodyString("data9") + BodyString("suffix9") gock.New(baseURL). Get("/range/00009"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data10") + BodyString("suffix10") gock.New(baseURL). Get("/range/0000A"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data11") + BodyString("suffix11") gock.New(baseURL). Get("/range/0000B"). Reply(200). AddHeader("ETag", "etag"). - BodyString("data12") + BodyString("suffix12") client := &hibpClient{ endpoint: defaultEndpoint, @@ -89,29 +89,29 @@ func TestSync(t *testing.T) { storageMock := NewMockstorage(ctrl) storageMock.EXPECT().LoadETag("00000").Return("", nil) - storageMock.EXPECT().Save("00000", "etag", []byte("data1")).Return(nil) + storageMock.EXPECT().Save("00000", "etag", []byte("00000suffix1")).Return(nil) storageMock.EXPECT().LoadETag("00001").Return("etag received earlier", nil) // 00001 does not need to be written as its ETag has not changed storageMock.EXPECT().LoadETag("00002").Return("", nil) - storageMock.EXPECT().Save("00002", "etag", []byte("data3")).Return(nil) + storageMock.EXPECT().Save("00002", "etag", []byte("00002suffix31:2\n00002suffix32:3")).Return(nil) storageMock.EXPECT().LoadETag("00003").Return("", nil) - storageMock.EXPECT().Save("00003", "etag", []byte("data4")).Return(nil) + storageMock.EXPECT().Save("00003", "etag", []byte("00003suffix4")).Return(nil) storageMock.EXPECT().LoadETag("00004").Return("", nil) - storageMock.EXPECT().Save("00004", "etag", []byte("data5")).Return(nil) + storageMock.EXPECT().Save("00004", "etag", []byte("00004suffix5")).Return(nil) storageMock.EXPECT().LoadETag("00005").Return("", nil) - storageMock.EXPECT().Save("00005", "etag", []byte("data6")).Return(nil) + storageMock.EXPECT().Save("00005", "etag", []byte("00005suffix6")).Return(nil) storageMock.EXPECT().LoadETag("00006").Return("", nil) - storageMock.EXPECT().Save("00006", "etag", []byte("data7")).Return(nil) + storageMock.EXPECT().Save("00006", "etag", []byte("00006suffix7")).Return(nil) 
storageMock.EXPECT().LoadETag("00007").Return("", nil) - storageMock.EXPECT().Save("00007", "etag", []byte("data8")).Return(nil) + storageMock.EXPECT().Save("00007", "etag", []byte("00007suffix8")).Return(nil) storageMock.EXPECT().LoadETag("00008").Return("", nil) - storageMock.EXPECT().Save("00008", "etag", []byte("data9")).Return(nil) + storageMock.EXPECT().Save("00008", "etag", []byte("00008suffix9")).Return(nil) storageMock.EXPECT().LoadETag("00009").Return("", nil) - storageMock.EXPECT().Save("00009", "etag", []byte("data10")).Return(nil) + storageMock.EXPECT().Save("00009", "etag", []byte("00009suffix10")).Return(nil) storageMock.EXPECT().LoadETag("0000A").Return("", nil) - storageMock.EXPECT().Save("0000A", "etag", []byte("data11")).Return(nil) + storageMock.EXPECT().Save("0000A", "etag", []byte("0000Asuffix11")).Return(nil) storageMock.EXPECT().LoadETag("0000B").Return("", nil) - storageMock.EXPECT().Save("0000B", "etag", []byte("data12")).Return(nil) + storageMock.EXPECT().Save("0000B", "etag", []byte("0000Bsuffix12")).Return(nil) var callCounter atomic.Int64