Skip to content

Commit

Permalink
refactor: improve performance by prefixing lines when syncing (where …
Browse files Browse the repository at this point in the history
…is causes no slowdown), not when exporting (where it causes massive slowdown)
  • Loading branch information
FlorianLoch committed Feb 14, 2024
1 parent 50e6714 commit 75b0ac8
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 69 deletions.
30 changes: 2 additions & 28 deletions export.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package hibpsync

import (
"bufio"
"fmt"
"io"
)
Expand All @@ -19,8 +18,8 @@ func export(from, to int64, store storage, w io.Writer) error {
}
defer dataReader.Close()

if err := copyAndPrefixLines(dataReader, w, rangePrefix); err != nil {
return fmt.Errorf("copying data for range %q: %w", rangePrefix, err)
if _, err := io.Copy(w, dataReader); err != nil {
return fmt.Errorf("writing data for range %q: %w", rangePrefix, err)
}

if i+1 < to {
Expand All @@ -43,28 +42,3 @@ func export(from, to int64, store storage, w io.Writer) error {

return nil
}

// copyAndPrefixLines reads in line by line and writes every line to out with
// prefix in front of it. lineSeparator is written between consecutive lines;
// nothing is written after the last line.
//
// It returns an error if any write to out fails or if reading from in fails.
// The latter includes bufio.ErrTooLong when a single line exceeds the
// scanner's default token size limit.
func copyAndPrefixLines(in io.Reader, out io.Writer, prefix string) error {
	// Convert once instead of allocating a fresh slice for every line.
	prefixBytes := []byte(prefix)
	firstLine := true

	scanner := bufio.NewScanner(in)
	for scanner.Scan() {
		if !firstLine {
			if _, err := out.Write(lineSeparator); err != nil {
				return fmt.Errorf("writing line separator to export writer: %w", err)
			}
		}

		firstLine = false

		if _, err := out.Write(prefixBytes); err != nil {
			return fmt.Errorf("writing prefix to export writer: %w", err)
		}

		if _, err := out.Write(scanner.Bytes()); err != nil {
			return fmt.Errorf("writing suffix and count to export writer: %w", err)
		}
	}

	// Scan returns false both at EOF and on failure; only Err distinguishes the
	// two. Without this check a read error would silently truncate the output.
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("reading lines from export source: %w", err)
	}

	return nil
}
6 changes: 3 additions & 3 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ func TestExport(t *testing.T) {
ctrl := gomock.NewController(t)
storageMock := NewMockstorage(ctrl)

storageMock.EXPECT().LoadData("00000").Return(io.NopCloser(bytes.NewReader([]byte("suffix:counter11\nsuffix:counter12"))), nil)
storageMock.EXPECT().LoadData("00001").Return(io.NopCloser(bytes.NewReader([]byte("suffix:counter2"))), nil)
storageMock.EXPECT().LoadData("00002").Return(io.NopCloser(bytes.NewReader([]byte("suffix:counter3"))), nil)
storageMock.EXPECT().LoadData("00000").Return(io.NopCloser(bytes.NewReader([]byte("00000suffix:counter11\n00000suffix:counter12"))), nil)
storageMock.EXPECT().LoadData("00001").Return(io.NopCloser(bytes.NewReader([]byte("00001suffix:counter2"))), nil)
storageMock.EXPECT().LoadData("00002").Return(io.NopCloser(bytes.NewReader([]byte("00002suffix:counter3"))), nil)

buf := bytes.NewBuffer([]byte{})

Expand Down
24 changes: 10 additions & 14 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hibpsync

import (
"bufio"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -76,8 +77,9 @@ func (f *fsStorage) LoadData(key string) (io.ReadCloser, error) {
return nil, fmt.Errorf("opening file %q: %w", f.filePath(key), err)
}

if err := skipLine(file); err != nil {
file.Close()
if err := skipLine(file); err != nil && errors.Is(err, io.EOF) {
defer file.Close()

return nil, fmt.Errorf("skipping etag line in file %q: %w", f.filePath(key), err)
}

Expand All @@ -89,21 +91,15 @@ func skipLine(reader io.ReadSeeker) error {
br := bufio.NewReader(reader)

// Read until the first newline character
_, err := br.ReadString('\n')
if err != nil && err != io.EOF {
return err
}

// Get the current offset
offset, err := reader.Seek(0, io.SeekCurrent)
firstBytes, err := br.ReadBytes('\n')
if err != nil {
return err
return fmt.Errorf("reading first line: %w", err)
}

// Seek back to the beginning of the file
_, err = reader.Seek(offset, io.SeekStart)
if err != nil {
return err
// The buffered reader most likely will have read more from the reader than just the first line, so we need to
// seek back to the beginning of the second line.
if _, err = reader.Seek(int64(len(firstBytes)), io.SeekStart); err != nil {
return fmt.Errorf("seeking to beginning of second line: %w", err)
}

return nil
Expand Down
38 changes: 37 additions & 1 deletion sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package hibpsync

import (
"bufio"
"bytes"
"errors"
"fmt"
"github.com/alitto/pond"
Expand Down Expand Up @@ -42,7 +44,12 @@ func sync(from, to int64, client *hibpClient, store storage, pool *pond.WorkerPo
}

if !resp.NotModified {
if err := store.Save(rangePrefix, resp.ETag, resp.Data); err != nil {
prefixedLines, err := prefixLines(resp.Data, rangePrefix)
if err != nil {
return fmt.Errorf("prefixing lines: %w", err)
}

if err := store.Save(rangePrefix, resp.ETag, prefixedLines); err != nil {
return fmt.Errorf("saving range: %w", err)
}
}
Expand Down Expand Up @@ -84,6 +91,35 @@ func toRangeString(i int64) string {
return fmt.Sprintf("%05X", i)
}

// prefixLines returns a copy of in where every line starts with prefix. Lines
// are joined with lineSeparator; no separator is appended after the last line.
// For empty input the result is an empty, non-nil slice.
//
// It returns an error if scanning in fails, e.g. bufio.ErrTooLong when a single
// line exceeds the scanner's default token size limit.
func prefixLines(in []byte, prefix string) ([]byte, error) {
	// The output holds the input plus one prefix per line. Counting newlines
	// gives the number of lines (at most one too many when the input ends with
	// a newline), which is a close enough estimate to avoid regrowing the buffer.
	rows := bytes.Count(in, []byte{'\n'}) + 1
	out := bytes.NewBuffer(make([]byte, 0, len(in)+rows*len(prefix)))

	firstLine := true

	scanner := bufio.NewScanner(bytes.NewReader(in))
	for scanner.Scan() {
		// Writes to a bytes.Buffer always return a nil error, so there is
		// nothing to check on the calls below.
		if !firstLine {
			out.Write(lineSeparator)
		}

		firstLine = false

		out.WriteString(prefix)
		out.Write(scanner.Bytes())
	}

	// Scan returns false both at the end of the input and on failure; only Err
	// distinguishes the two. Without this check truncated data would be
	// returned as if it were complete.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scanning lines: %w", err)
	}

	return out.Bytes(), nil
}

func lowestInFlight(inFlight mapset.Set[int64], to int64) int64 {
lowest := int64(math.MaxInt64)

Expand Down
46 changes: 23 additions & 23 deletions sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,63 +22,63 @@ func TestSync(t *testing.T) {
Get("/range/00000").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data1")
BodyString("suffix1")
gock.New(baseURL).
Get("/range/00001").
MatchHeader("If-None-Match", "etag received earlier").
Reply(http.StatusNotModified).
AddHeader("ETag", "etag received earlier").
BodyString("data2")
BodyString("suffix2")
gock.New(baseURL).
Get("/range/00002").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data3")
BodyString("suffix31:2\nsuffix32:3")
gock.New(baseURL).
Get("/range/00003").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data4")
BodyString("suffix4")
gock.New(baseURL).
Get("/range/00004").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data5")
BodyString("suffix5")
gock.New(baseURL).
Get("/range/00005").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data6")
BodyString("suffix6")
gock.New(baseURL).
Get("/range/00006").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data7")
BodyString("suffix7")
gock.New(baseURL).
Get("/range/00007").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data8")
BodyString("suffix8")
gock.New(baseURL).
Get("/range/00008").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data9")
BodyString("suffix9")
gock.New(baseURL).
Get("/range/00009").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data10")
BodyString("suffix10")
gock.New(baseURL).
Get("/range/0000A").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data11")
BodyString("suffix11")
gock.New(baseURL).
Get("/range/0000B").
Reply(200).
AddHeader("ETag", "etag").
BodyString("data12")
BodyString("suffix12")

client := &hibpClient{
endpoint: defaultEndpoint,
Expand All @@ -89,29 +89,29 @@ func TestSync(t *testing.T) {
storageMock := NewMockstorage(ctrl)

storageMock.EXPECT().LoadETag("00000").Return("", nil)
storageMock.EXPECT().Save("00000", "etag", []byte("data1")).Return(nil)
storageMock.EXPECT().Save("00000", "etag", []byte("00000suffix1")).Return(nil)
storageMock.EXPECT().LoadETag("00001").Return("etag received earlier", nil)
// 00001 does not need to be written as its ETag has not changed
storageMock.EXPECT().LoadETag("00002").Return("", nil)
storageMock.EXPECT().Save("00002", "etag", []byte("data3")).Return(nil)
storageMock.EXPECT().Save("00002", "etag", []byte("00002suffix31:2\n00002suffix32:3")).Return(nil)
storageMock.EXPECT().LoadETag("00003").Return("", nil)
storageMock.EXPECT().Save("00003", "etag", []byte("data4")).Return(nil)
storageMock.EXPECT().Save("00003", "etag", []byte("00003suffix4")).Return(nil)
storageMock.EXPECT().LoadETag("00004").Return("", nil)
storageMock.EXPECT().Save("00004", "etag", []byte("data5")).Return(nil)
storageMock.EXPECT().Save("00004", "etag", []byte("00004suffix5")).Return(nil)
storageMock.EXPECT().LoadETag("00005").Return("", nil)
storageMock.EXPECT().Save("00005", "etag", []byte("data6")).Return(nil)
storageMock.EXPECT().Save("00005", "etag", []byte("00005suffix6")).Return(nil)
storageMock.EXPECT().LoadETag("00006").Return("", nil)
storageMock.EXPECT().Save("00006", "etag", []byte("data7")).Return(nil)
storageMock.EXPECT().Save("00006", "etag", []byte("00006suffix7")).Return(nil)
storageMock.EXPECT().LoadETag("00007").Return("", nil)
storageMock.EXPECT().Save("00007", "etag", []byte("data8")).Return(nil)
storageMock.EXPECT().Save("00007", "etag", []byte("00007suffix8")).Return(nil)
storageMock.EXPECT().LoadETag("00008").Return("", nil)
storageMock.EXPECT().Save("00008", "etag", []byte("data9")).Return(nil)
storageMock.EXPECT().Save("00008", "etag", []byte("00008suffix9")).Return(nil)
storageMock.EXPECT().LoadETag("00009").Return("", nil)
storageMock.EXPECT().Save("00009", "etag", []byte("data10")).Return(nil)
storageMock.EXPECT().Save("00009", "etag", []byte("00009suffix10")).Return(nil)
storageMock.EXPECT().LoadETag("0000A").Return("", nil)
storageMock.EXPECT().Save("0000A", "etag", []byte("data11")).Return(nil)
storageMock.EXPECT().Save("0000A", "etag", []byte("0000Asuffix11")).Return(nil)
storageMock.EXPECT().LoadETag("0000B").Return("", nil)
storageMock.EXPECT().Save("0000B", "etag", []byte("data12")).Return(nil)
storageMock.EXPECT().Save("0000B", "etag", []byte("0000Bsuffix12")).Return(nil)

var callCounter atomic.Int64

Expand Down

0 comments on commit 75b0ac8

Please sign in to comment.