From 04c77563a6b4830a5d06c416afbfb090c7e914c6 Mon Sep 17 00:00:00 2001 From: Brijesh Lakkad Date: Sat, 20 Aug 2022 12:43:44 -0400 Subject: [PATCH] log module will be used for replication. (async) --- go.mod | 1 + go.sum | 8 +- internal/log/config.go | 9 ++ internal/log/index.go | 93 ++++++++++++++++ internal/log/index_test.go | 55 ++++++++++ internal/log/log.go | 201 +++++++++++++++++++++++++++++++++++ internal/log/log_test.go | 121 +++++++++++++++++++++ internal/log/segment.go | 116 ++++++++++++++++++++ internal/log/segment_test.go | 59 ++++++++++ internal/log/store.go | 104 ++++++++++++++++++ internal/log/store_test.go | 102 ++++++++++++++++++ 11 files changed, 868 insertions(+), 1 deletion(-) create mode 100644 internal/log/config.go create mode 100644 internal/log/index.go create mode 100644 internal/log/index_test.go create mode 100644 internal/log/log.go create mode 100644 internal/log/log_test.go create mode 100644 internal/log/segment.go create mode 100644 internal/log/segment_test.go create mode 100644 internal/log/store.go create mode 100644 internal/log/store_test.go diff --git a/go.mod b/go.mod index 4d0091f..0970782 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.8.0 github.com/travisjeffery/go-dynaport v1.0.0 + github.com/tysonmote/gommap v0.0.2 go.opencensus.io v0.23.0 go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect diff --git a/go.sum b/go.sum index 163d990..88edff3 100644 --- a/go.sum +++ b/go.sum @@ -297,6 +297,7 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -420,6 +421,8 @@ github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw= github.com/travisjeffery/go-dynaport v1.0.0/go.mod h1:0LHuDS4QAx+mAc4ri3WkQdavgVoBIZ7cE9ob17KIAJk= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/tysonmote/gommap v0.0.2 h1:TNTjXaXxiLWuWVTU9BfSb1bAEvfrptf8m5+N3LyTd6Q= +github.com/tysonmote/gommap v0.0.2/go.mod h1:zZKhSp7mLDDzdl8MHbaDEJ3PH9VibPlFXV1t+4wmC00= github.com/umpc/go-sortedmap v0.0.0-20180422175548-64ab94c482f4/go.mod h1:X6iKjXCleSyo/LZzKZ9zDF/ZB2L9gC36I5gLMf32w3M= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -932,8 +935,9 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4= gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= @@ -958,6 +962,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54= +launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/internal/log/config.go b/internal/log/config.go new file mode 100644 index 0000000..2df654d --- /dev/null +++ b/internal/log/config.go @@ -0,0 +1,9 @@ +package log + +type Config struct { + Segment struct { + MaxStoreBytes uint64 + MaxIndexBytes uint64 + InitialOffset uint64 + } +} diff --git a/internal/log/index.go b/internal/log/index.go new file mode 100644 index 0000000..2e25f92 --- /dev/null +++ b/internal/log/index.go @@ -0,0 +1,93 @@ +package log + +import ( + "io" + "os" + + "github.com/tysonmote/gommap" +) + +var ( + offWidth uint64 = 4 + posWidth uint64 = 8 + endWidth = offWidth + posWidth +) + +type index struct { + file *os.File + mmap gommap.MMap + size uint64 +} + +func newIndex(f *os.File, c Config) (*index, error) { + idx := &index{ + file: f, + } + fi, err := os.Stat(f.Name()) + if err != nil { + return nil, err + } + + idx.size = uint64(fi.Size()) + if err = os.Truncate( + f.Name(), int64(c.Segment.MaxIndexBytes), + ); err != nil { + return nil, err + } + + if idx.mmap, err = gommap.Map( + idx.file.Fd(), + gommap.PROT_READ|gommap.PROT_WRITE, + gommap.MAP_SHARED, + ); err != nil { + return nil, err + } + return idx, nil +} + +func (i *index) Close() error { + if err := i.mmap.Sync(gommap.MS_SYNC); err != nil { + return err + } + if err := i.file.Sync(); err != nil { + return err + } + if err := i.file.Truncate(int64(i.size)); err != nil { + return err + } + + return i.file.Close() +} + +func (i *index) Read(in int64) (out uint32, pos uint64, err error) { + if i.size == 0 { + return 0, 0, io.EOF + } + if in == -1 { + out = uint32((i.size / endWidth) - 1) + } else { + out = uint32(in) + } + pos = uint64(out) * endWidth + if i.size < pos+endWidth { + return 0, 0, io.EOF + } + out = enc.Uint32(i.mmap[pos : pos+offWidth]) + pos = enc.Uint64(i.mmap[pos+offWidth : pos+endWidth]) + return out, pos, nil +} + +func (i *index) Write(off uint32, pos uint64) error { + if uint64(len(i.mmap)) < i.size+endWidth { + return io.EOF + } + enc.PutUint32(i.mmap[i.size:i.size+offWidth], off) + enc.PutUint64(i.mmap[i.size+offWidth:i.size+endWidth], pos) + + i.size += uint64(endWidth) + return nil +} + +func (i *index) Name() string { + return i.file.Name() +} diff --git a/internal/log/index_test.go b/internal/log/index_test.go new file mode 100644 index 0000000..c22c7ef --- /dev/null +++ b/internal/log/index_test.go @@ -0,0 +1,55 @@ +package log + +import ( + "io" + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIndex(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "index_test") + require.NoError(t, err) + defer os.Remove(f.Name()) + + c := Config{} + c.Segment.MaxIndexBytes = 1024 + idx, err := newIndex(f, c) + require.NoError(t, err) + _, _, err = idx.Read(-1) + require.Error(t, err) + require.Equal(t, f.Name(), idx.Name()) + + entries := []struct { + Off uint32 + Pos uint64 + }{ + {Off: 0, Pos: 0}, + {Off: 1, Pos: 10}, + } + + for _, want := range entries { + err = idx.Write(want.Off, want.Pos) + require.NoError(t, err) + + _, pos, err := idx.Read(int64(want.Off)) + require.NoError(t, err) + require.Equal(t, want.Pos, pos) + } + + // index and scanner should error when reading past existing entries + _, _, err = idx.Read(int64(len(entries))) + require.Equal(t, io.EOF, err) + _ = idx.Close() + + // index should build its state from the existing file + f, _ = os.OpenFile(f.Name(), os.O_RDWR, 0600) + idx, err = newIndex(f, c) + require.NoError(t, err) + off, pos, err := idx.Read(-1) + require.NoError(t, err) + require.Equal(t, uint32(1), off) + require.Equal(t, entries[1].Pos, pos) +} diff --git a/internal/log/log.go b/internal/log/log.go new file mode 100644 index 0000000..c0af8c2 --- /dev/null +++ b/internal/log/log.go @@ -0,0 +1,201 @@ +package log + +import ( + "errors" + "io" + "io/ioutil" + "os" + "path" + "sort" + "strconv" + "strings" + "sync" +) + +var ( + ErrOffsetOutOfRange = errors.New("the requested offset is outside the log's range.") +) + +type Log struct { + mu sync.RWMutex + + Dir string + Config Config + + activeSegment *segment + segments []*segment +} + +func NewLog(dir string, c Config) (*Log, error) { + if c.Segment.MaxStoreBytes == 0 { + c.Segment.MaxStoreBytes = 1024 + } + if c.Segment.MaxIndexBytes == 0 { + c.Segment.MaxIndexBytes = 1024 + } + l := &Log{ + Dir: dir, + Config: c, + } + + return l, l.setup() +} + +func (l *Log) setup() error { + files, err := ioutil.ReadDir(l.Dir) + if err != nil { + return err + } + var baseOffsets []uint64 + for _, file := range files { + offStr := strings.TrimSuffix( + file.Name(), + path.Ext(file.Name()), + ) + off, _ := strconv.ParseUint(offStr, 10, 0) + baseOffsets = append(baseOffsets, off) + } + sort.Slice(baseOffsets, func(i, j int) bool { + return baseOffsets[i] < baseOffsets[j] + }) + for i := 0; i < len(baseOffsets); i++ { + if err = l.newSegment(baseOffsets[i]); err != nil { + return err + } + // baseOffset contains dup for index and store, so we skip the dup + i++ + } + if l.segments == nil { + if err = l.newSegment(l.Config.Segment.InitialOffset); err != nil { + return err + } + } + return nil +} + +func (l *Log) Append(record *Record) (uint64, error) { + l.mu.Lock() + defer l.mu.Unlock() + // https://github.com/travisjeffery/proglog/issues/6 + off, err := l.activeSegment.Append(record) + if err != nil { + return 0, err + } + if l.activeSegment.IsMaxed() { + err = l.newSegment(off + 1) + } + return off, err +} + +func (l *Log) Read(off uint64) (*Record, error) { + l.mu.RLock() + defer l.mu.RUnlock() + var s *segment + for _, segment := range l.segments { + if segment.baseOffset <= off && off < segment.nextOffset { + s = segment + break + } + } + if s == nil || s.nextOffset <= off { + return nil, ErrOffsetOutOfRange + } + return s.Read(off) +} + +func (l *Log) newSegment(off uint64) error { + s, err := newSegment(l.Dir, off, l.Config) + if err != nil { + return err + } + l.segments = append(l.segments, s) + l.activeSegment = s + return nil +} + +func (l *Log) Close() error { + l.mu.Lock() + defer l.mu.Unlock() + for _, segment := range l.segments { + if err := segment.Close(); err != nil { + return err + } + } + return nil +} + +func (l *Log) Remove() error { + if err := l.Close(); err != nil { + return err + } + return os.RemoveAll(l.Dir) +} + +func (l *Log) Reset() error { + if err := l.Remove(); err != nil { + return err + } + return l.setup() +} + +// To know what nodes have the oldest and newest data and what nodes are falling behind and need to replicate. +func (l *Log) LowestOffset() (uint64, error) { + l.mu.RLock() + defer l.mu.RUnlock() + return l.segments[0].baseOffset, nil +} + +func (l *Log) HighestOffset() (uint64, error) { + l.mu.RLock() + defer l.mu.RUnlock() + off := l.segments[len(l.segments)-1].nextOffset + if off == 0 { + return 0, nil + } + return off - 1, nil +} + +// (In the future, periodically) call Truncate to remove old segments whose data we (hopefully) have processed by then and don’t need anymore. +func (l *Log) Truncate(lowest uint64) error { + l.mu.Lock() + defer l.mu.Unlock() + var segments []*segment + for _, s := range l.segments { + if s.nextOffset <= lowest+1 { + if err := s.Remove(); err != nil { + return err + } + continue + } + segments = append(segments, s) + } + l.segments = segments + return nil +} + +func (l *Log) Reader() io.Reader { + l.mu.RLock() + defer l.mu.RUnlock() + readers := make([]io.Reader, len(l.segments)) + for i, segment := range l.segments { + readers[i] = &originReader{segment.store, 0} + } + return io.MultiReader(readers...) +} + +type originReader struct { + *store + off int64 +} + +func (o *originReader) Read(p []byte) (int, error) { + n, err := o.ReadAt(p, o.off) + o.off += int64(n) + return n, err +} + +type Record struct { + Value []byte + Offset uint64 + Term uint64 +} diff --git a/internal/log/log_test.go b/internal/log/log_test.go new file mode 100644 index 0000000..8870ed9 --- /dev/null +++ b/internal/log/log_test.go @@ -0,0 +1,121 @@ +// START: intro +package log + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLog(t *testing.T) { + for scenario, fn := range map[string]func( + t *testing.T, log *Log, + ){ + "append and read a record succeeds": testAppendRead, + "offset out of range error": testOutOfRangeErr, + "init with existing segments": testInitExisting, + "reader": testReader, + "truncate": testTruncate, + } { + t.Run(scenario, func(t *testing.T) { + dir, err := ioutil.TempDir("", "store-test") + require.NoError(t, err) + defer os.RemoveAll(dir) + + c := Config{} + c.Segment.MaxStoreBytes = 32 + log, err := NewLog(dir, c) + require.NoError(t, err) + + fn(t, log) + }) + } +} + +func testAppendRead(t *testing.T, log *Log) { + append := &Record{ + Value: []byte("hello world"), + } + off, err := log.Append(append) + require.NoError(t, err) + require.Equal(t, uint64(0), off) + + read, err := log.Read(off) + require.NoError(t, err) + require.Equal(t, append.Value, read.Value) +} + +func testOutOfRangeErr(t *testing.T, log *Log) { + read, err := log.Read(1) + require.Nil(t, read) + require.Equal(t, err, ErrOffsetOutOfRange) +} + +func testInitExisting(t *testing.T, o *Log) { + append := &Record{ + Value: []byte("hello world"), + } + for i := 0; i < 3; i++ { + _, err := o.Append(append) + require.NoError(t, err) + } + require.NoError(t, o.Close()) + + off, err := o.LowestOffset() + require.NoError(t, err) + require.Equal(t, uint64(0), off) + off, err = o.HighestOffset() + require.NoError(t, err) + require.Equal(t, uint64(2), off) + + n, err := NewLog(o.Dir, o.Config) + require.NoError(t, err) + + off, err = n.LowestOffset() + require.NoError(t, err) + require.Equal(t, uint64(0), off) + off, err = n.HighestOffset() + require.NoError(t, err) + require.Equal(t, uint64(2), off) +} + +func testReader(t *testing.T, log *Log) { + append := &Record{ + Value: []byte("hello world"), + } + off, err := log.Append(append) + require.NoError(t, err) + require.Equal(t, uint64(0), off) + + reader := log.Reader() + b, err := ioutil.ReadAll(reader) + require.NoError(t, err) + + read := &Record{} + buffer := new(bytes.Buffer) + buffer.Write(b[lenWidth:]) + + err = json.NewDecoder(buffer).Decode(read) + require.NoError(t, err) + require.Equal(t, append.Value, read.Value) +} + +func testTruncate(t *testing.T, log *Log) { + append := &Record{ + Value: []byte("hello world"), + } + for i := 0; i < 3; i++ { + _, err := log.Append(append) + require.NoError(t, err) + } + + err := log.Truncate(1) + require.NoError(t, err) + + _, err = log.Read(0) + require.Error(t, err) +} diff --git a/internal/log/segment.go b/internal/log/segment.go new file mode 100644 index 0000000..c1a5c93 --- /dev/null +++ b/internal/log/segment.go @@ -0,0 +1,116 @@ +// START: intro +package log + +import ( + "fmt" + "os" + "path" +) + +type segment struct { + store *store + index *index + baseOffset, nextOffset uint64 + config Config +} + +func newSegment(dir string, baseOffset uint64, c Config) (*segment, error) { + s := &segment{ + baseOffset: baseOffset, + config: c, + } + var err error + storeFile, err := os.OpenFile( + path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".store")), + os.O_RDWR|os.O_CREATE|os.O_APPEND, + 0644, + ) + if err != nil { + return nil, err + } + if s.store, err = newStore(storeFile); err != nil { + return nil, err + } + indexFile, err := os.OpenFile( + path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".index")), + os.O_RDWR|os.O_CREATE, + 0644, + ) + if err != nil { + return nil, err + } + if s.index, err = newIndex(indexFile, c); err != nil { + return nil, err + } + if off, _, err := s.index.Read(-1); err != nil { + s.nextOffset = baseOffset + } else { + s.nextOffset = baseOffset + uint64(off) + 1 + } + return s, nil +} + +func (s *segment) Append(record *Record) (offset uint64, err error) { + cur := s.nextOffset + record.Offset = cur + _, pos, err := s.store.Append(record) + if err != nil { + return 0, err + } + if err = s.index.Write( + // index offsets are relative to base offset + uint32(s.nextOffset-uint64(s.baseOffset)), + pos, + ); err != nil { + return 0, err + } + s.nextOffset++ + return cur, nil +} + +func (s *segment) Read(off uint64) (*Record, error) { + _, pos, err := s.index.Read(int64(off - s.baseOffset)) + if err != nil { + return nil, err + } + record, err := s.store.Read(pos) + if err != nil { + return nil, err + } + return record, err +} + +func (s *segment) IsMaxed() bool { + return s.store.size >= s.config.Segment.MaxStoreBytes || + s.index.size >= s.config.Segment.MaxIndexBytes +} + +func (s *segment) Close() error { + if err := s.index.Close(); err != nil { + return err + } + if err := s.store.Close(); err != nil { + return err + } + return nil +} + +func (s *segment) Remove() error { + if err := s.Close(); err != nil { + return err + } + if err := os.Remove(s.index.Name()); err != nil { + return err + } + if err := os.Remove(s.store.Name()); err != nil { + return err + } + return nil +} + +func nearestMultiple(j, k uint64) uint64 { + if j >= 0 { + return (j / k) * k + } + return ((j - k + 1) / k) * k +} diff --git a/internal/log/segment_test.go b/internal/log/segment_test.go new file mode 100644 index 0000000..6902998 --- /dev/null +++ b/internal/log/segment_test.go @@ -0,0 +1,59 @@ +package log + +import ( + "io" + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSegment(t *testing.T) { + dir, _ := ioutil.TempDir("", "segment-test") + defer os.RemoveAll(dir) + + want := &Record{Value: []byte("hello world")} + + c := Config{} + c.Segment.MaxStoreBytes = 1024 + c.Segment.MaxIndexBytes = endWidth * 3 + + s, err := newSegment(dir, 16, c) + require.NoError(t, err) + require.Equal(t, uint64(16), s.nextOffset, s.nextOffset) + require.False(t, s.IsMaxed()) + + for i := uint64(0); i < 3; i++ { + off, err := s.Append(want) + require.NoError(t, err) + require.Equal(t, 16+i, off) + + got, err := s.Read(off) + require.NoError(t, err) + require.Equal(t, want.Value, got.Value) + } + + _, err = s.Append(want) + require.Equal(t, io.EOF, err) + + // maxed index + require.True(t, s.IsMaxed()) + + c.Segment.MaxStoreBytes = uint64(len(want.Value) * 3) + c.Segment.MaxIndexBytes = 1024 + + // segment must be closed + s.Close() + + s, err = newSegment(dir, 16, c) + require.NoError(t, err) + // maxed store + require.True(t, s.IsMaxed()) + + err = s.Remove() + require.NoError(t, err) + s, err = newSegment(dir, 16, c) + require.NoError(t, err) + require.False(t, s.IsMaxed()) +} diff --git a/internal/log/store.go b/internal/log/store.go new file mode 100644 index 0000000..e06dfa3 --- /dev/null +++ b/internal/log/store.go @@ -0,0 +1,104 @@ +package log + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/json" + "os" + "sync" +) + +var ( + enc = binary.BigEndian +) + +const ( + lenWidth = 8 +) + +type store struct { + *os.File + mu sync.Mutex + buf *bufio.Writer + size uint64 +} + +func newStore(f *os.File) (*store, error) { + fi, err := os.Stat(f.Name()) + if err != nil { + return nil, err + } + size := uint64(fi.Size()) + return &store{ + File: f, + size: size, + buf: bufio.NewWriter(f), + }, nil +} + +func (s *store) Append(record *Record) (n uint64, pos uint64, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + reqBodyBytes := new(bytes.Buffer) + err = json.NewEncoder(reqBodyBytes).Encode(record) + if err != nil { + return 0, 0, err + } + + pos = s.size + + if err := binary.Write(s.buf, enc, uint64(reqBodyBytes.Len())); err != nil { + return 0, 0, err + } + + w, err := s.buf.Write(reqBodyBytes.Bytes()) + if err != nil { + return 0, 0, err + } + err = s.buf.Flush() + if err != nil { + return 0, 0, err + } + w += lenWidth + s.size += uint64(w) + return uint64(w), pos, nil +} + +func (s *store) Read(pos uint64) (*Record, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if err := s.buf.Flush(); err != nil { + return nil, err + } + size := make([]byte, lenWidth) + if _, err := s.File.ReadAt(size, int64(pos)); err != nil { + return nil, err + } + b := make([]byte, enc.Uint64(size)) + if _, err := s.File.ReadAt(b, int64(pos+lenWidth)); err != nil { + return nil, err + } + record := &Record{} + buffer := new(bytes.Buffer) + buffer.Write(b) + err := json.NewDecoder(buffer).Decode(record) + if err != nil { + return nil, err + } + + return record, nil +} + +func (s *store) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.buf.Flush() + if err != nil { + return err + } + return s.File.Close() +} diff --git a/internal/log/store_test.go b/internal/log/store_test.go new file mode 100644 index 0000000..01a4ae7 --- /dev/null +++ b/internal/log/store_test.go @@ -0,0 +1,102 @@ +package log + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +var ( + write = []byte("hello world") +) + +func TestStoreAppendRead(t *testing.T) { + f, err := ioutil.TempFile("", "store_append_read_test") + require.NoError(t, err) + defer os.Remove(f.Name()) + + s, err := newStore(f) + require.NoError(t, err) + + testAppend(t, s) + testRead(t, s) + + s, err = newStore(f) + require.NoError(t, err) + testRead(t, s) +} + +func testAppend(t *testing.T, s *store) { + t.Helper() + for i := uint64(1); i < 4; i++ { + record := &Record{ + Value: write, + } + n, pos, err := s.Append(record) + require.NoError(t, err) + buffer := new(bytes.Buffer) + json.NewEncoder(buffer).Encode(record) + width := uint64(buffer.Len()) + lenWidth + require.Equal(t, pos+n, width*i) + } +} + +func testRead(t *testing.T, s *store) { + t.Helper() + var pos uint64 + for i := uint64(1); i < 4; i++ { + read, err := s.Read(pos) + require.NoError(t, err) + require.Equal(t, write, read.Value) + + buffer := new(bytes.Buffer) + json.NewEncoder(buffer).Encode(read) + width := uint64(buffer.Len()) + lenWidth + pos += width + } +} + +func TestStoreClose(t *testing.T) { + f, err := ioutil.TempFile("", "store_close_test") + require.NoError(t, err) + defer os.Remove(f.Name()) + + s, err := newStore(f) + require.NoError(t, err) + record := &Record{ + Value: write, + } + _, _, err = s.Append(record) + require.NoError(t, err) + + f, beforeSize, err := openFile(f.Name()) + require.NoError(t, err) + + err = s.Close() + require.NoError(t, err) + + _, afterSize, err := openFile(f.Name()) + require.NoError(t, err) + require.True(t, afterSize == beforeSize) +} + +func openFile(name string) (file *os.File, size int64, err error) { + f, err := os.OpenFile( + name, + os.O_RDWR|os.O_CREATE|os.O_APPEND, + 0644, + ) + if err != nil { + return nil, 0, err + } + + fi, err := f.Stat() + if err != nil { + return nil, 0, err + } + return f, fi.Size(), nil +}