Skip to content

Commit

Permalink
log module will be used for replication. (async)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brijeshlakkad committed Aug 20, 2022
1 parent 2665fe6 commit 04c7756
Show file tree
Hide file tree
Showing 11 changed files with 868 additions and 1 deletion.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.8.0
github.com/travisjeffery/go-dynaport v1.0.0
github.com/tysonmote/gommap v0.0.2
go.opencensus.io v0.23.0
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
8 changes: 7 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -420,6 +421,8 @@ github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t
github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=
github.com/travisjeffery/go-dynaport v1.0.0/go.mod h1:0LHuDS4QAx+mAc4ri3WkQdavgVoBIZ7cE9ob17KIAJk=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/tysonmote/gommap v0.0.2 h1:TNTjXaXxiLWuWVTU9BfSb1bAEvfrptf8m5+N3LyTd6Q=
github.com/tysonmote/gommap v0.0.2/go.mod h1:zZKhSp7mLDDzdl8MHbaDEJ3PH9VibPlFXV1t+4wmC00=
github.com/umpc/go-sortedmap v0.0.0-20180422175548-64ab94c482f4/go.mod h1:X6iKjXCleSyo/LZzKZ9zDF/ZB2L9gC36I5gLMf32w3M=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -932,8 +935,9 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4=
gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
Expand All @@ -958,6 +962,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 h1:Izowp2XBH6Ya6rv+hqbceQyw/gSGoXfH/UPoTGduL54=
launchpad.net/gocheck v0.0.0-20140225173054-000000000087/go.mod h1:hj7XX3B/0A+80Vse0e+BUHsHMTEhd0O4cpUHr/e/BUM=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
Expand Down
9 changes: 9 additions & 0 deletions internal/log/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package log

type Config struct {
Segment struct {
MaxStoreBytes uint64
MaxIndexBytes uint64
InitialOffset uint64
}
}
93 changes: 93 additions & 0 deletions internal/log/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package log

import (
"io"
"os"

"github.com/tysonmote/gommap"
)

var (
offWidth uint64 = 4
posWidth uint64 = 8
endWidth = offWidth + posWidth
)

type index struct {
file *os.File
mmap gommap.MMap
size uint64
}

func newIndex(f *os.File, c Config) (*index, error) {
idx := &index{
file: f,
}
fi, err := os.Stat(f.Name())
if err != nil {
return nil, err
}

idx.size = uint64(fi.Size())
if err = os.Truncate(
f.Name(), int64(c.Segment.MaxIndexBytes),
); err != nil {
return nil, err
}

if idx.mmap, err = gommap.Map(
idx.file.Fd(),
gommap.PROT_READ|gommap.PROT_WRITE,
gommap.MAP_SHARED,
); err != nil {
return nil, err
}
return idx, nil
}

func (i *index) Close() error {
if err := i.mmap.Sync(gommap.MS_SYNC); err != nil {
return err
}
if err := i.file.Sync(); err != nil {
return err
}
if err := i.file.Truncate(int64(i.size)); err != nil {
return err
}

return i.file.Close()
}

func (i *index) Read(in int64) (out uint32, pos uint64, err error) {
if i.size == 0 {
return 0, 0, io.EOF
}
if in == -1 {
out = uint32((i.size / endWidth) - 1)
} else {
out = uint32(in)
}
pos = uint64(out) * endWidth
if i.size < pos+endWidth {
return 0, 0, io.EOF
}
out = enc.Uint32(i.mmap[pos : pos+offWidth])
pos = enc.Uint64(i.mmap[pos+offWidth : pos+endWidth])
return out, pos, nil
}

func (i *index) Write(off uint32, pos uint64) error {
if uint64(len(i.mmap)) < i.size+endWidth {
return io.EOF
}
enc.PutUint32(i.mmap[i.size:i.size+offWidth], off)
enc.PutUint64(i.mmap[i.size+offWidth:i.size+endWidth], pos)

i.size += uint64(endWidth)
return nil
}

func (i *index) Name() string {
return i.file.Name()
}
55 changes: 55 additions & 0 deletions internal/log/index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package log

import (
"io"
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/require"
)

func TestIndex(t *testing.T) {
f, err := ioutil.TempFile(os.TempDir(), "index_test")
require.NoError(t, err)
defer os.Remove(f.Name())

c := Config{}
c.Segment.MaxIndexBytes = 1024
idx, err := newIndex(f, c)
require.NoError(t, err)
_, _, err = idx.Read(-1)
require.Error(t, err)
require.Equal(t, f.Name(), idx.Name())

entries := []struct {
Off uint32
Pos uint64
}{
{Off: 0, Pos: 0},
{Off: 1, Pos: 10},
}

for _, want := range entries {
err = idx.Write(want.Off, want.Pos)
require.NoError(t, err)

_, pos, err := idx.Read(int64(want.Off))
require.NoError(t, err)
require.Equal(t, want.Pos, pos)
}

// index and scanner should error when reading past existing entries
_, _, err = idx.Read(int64(len(entries)))
require.Equal(t, io.EOF, err)
_ = idx.Close()

// index should build its state from the existing file
f, _ = os.OpenFile(f.Name(), os.O_RDWR, 0600)
idx, err = newIndex(f, c)
require.NoError(t, err)
off, pos, err := idx.Read(-1)
require.NoError(t, err)
require.Equal(t, uint32(1), off)
require.Equal(t, entries[1].Pos, pos)
}
Loading

0 comments on commit 04c7756

Please sign in to comment.