From cc6eb8896196dd0b97b6a41a5b5ccecc434a06f8 Mon Sep 17 00:00:00 2001
From: Prateek Rungta
Date: Wed, 16 May 2018 22:41:16 -0400
Subject: [PATCH 1/2] Re-work segment.Fields/Terms to use iterators

---
 .excludemetalint                              |   2 +-
 generated-source-files.mk                     |  20 ++-
 index/segment/fs/fs_mock.go                   |  25 +++-
 index/segment/fs/fst_terms_iterator.go        | 124 ++++++++++++++++++
 index/segment/fs/options.go                   | 106 +++++++++++++++
 index/segment/fs/segment.go                   | 102 +++++---------
 index/segment/fs/writer.go                    |  71 +++++++---
 index/segment/fs/writer_reader_test.go        |  56 +++++---
 index/segment/mem/bytes_slice_iterator.go     |  84 ++++++++++++
 index/segment/mem/concurrent_postings_map.go  |   6 +-
 .../mem/concurrent_postings_map_test.go       |  24 +++-
 index/segment/mem/options.go                  |  45 +++++--
 index/segment/mem/segment.go                  |   4 +-
 index/segment/mem/segment_test.go             |   6 +-
 index/segment/mem/terms_dict.go               |   9 +-
 index/segment/mem/terms_dict_test.go          |   6 +-
 index/segment/mem/types.go                    |   5 +-
 index/segment/segment_mock.go                 |  16 +--
 index/segment/types.go                        |  33 ++++-
 persist/reader.go                             |   2 +-
 persist/reader_test.go                        |   8 +-
 x/bytes/slice_arraypool_gen.go                | 107 +++++++++++++++
 22 files changed, 698 insertions(+), 163 deletions(-)
 create mode 100644 index/segment/fs/fst_terms_iterator.go
 create mode 100644 index/segment/fs/options.go
 create mode 100644 index/segment/mem/bytes_slice_iterator.go
 create mode 100644 x/bytes/slice_arraypool_gen.go

diff --git a/.excludemetalint b/.excludemetalint
index 8edf32e..44d2b57 100644
--- a/.excludemetalint
+++ b/.excludemetalint
@@ -2,4 +2,4 @@ vendor/
 generated/
 _mock.go
 .pb.go
-index/segment/mem/ids_map_gen.go
+_gen.go
diff --git a/generated-source-files.mk b/generated-source-files.mk
index a7acae7..ef27267 100644
--- a/generated-source-files.mk
+++ b/generated-source-files.mk
@@ -17,7 +17,7 @@ install-m3x-repo: install-glide install-generics-bin
 
 # Generation rule for all generated types
 .PHONY: genny-all
-genny-all: genny-map-all
+genny-all: genny-map-all genny-arraypool-all
 
 # Tests that all currently generated types match their contents if they were regenerated
 .PHONY: test-genny-all
@@ -115,3 +115,21 @@ genny-map-segment-fs-fst-terms-offset: install-m3x-repo
 		rename_type_prefix=fstTermsOffsets
 	# Rename generated map file
 	mv -f $(m3ninx_package_path)/index/segment/fs/map_gen.go $(m3ninx_package_path)/index/segment/fs/fst_terms_offsets_map_gen.go
+
+
+# generation rule for all generated arraypools
+.PHONY: genny-arraypool-all
+genny-arraypool-all: \
+	genny-arraypool-bytes-slice-array-pool
+
+# arraypool generation rule for ./x/bytes.SliceArrayPool
+.PHONY: genny-arraypool-bytes-slice-array-pool
+genny-arraypool-bytes-slice-array-pool: install-m3x-repo
+	cd $(m3x_package_path) && make genny-arraypool \
+		pkg=bytes \
+		elem_type=[]byte \
+		target_package=$(m3ninx_package)/x/bytes \
+		out_file=slice_arraypool_gen.go \
+		rename_type_prefix=Slice \
+		rename_type_middle=Slice \
+		rename_constructor=NewSliceArrayPool
diff --git a/index/segment/fs/fs_mock.go b/index/segment/fs/fs_mock.go
index e3e01c6..377f7c6 100644
--- a/index/segment/fs/fs_mock.go
+++ b/index/segment/fs/fs_mock.go
@@ -192,9 +192,9 @@ func (m *MockSegment) EXPECT() *MockSegmentMockRecorder {
 }
 
 // AllDocs mocks base method
-func (m *MockSegment) AllDocs() (doc.Iterator, error) {
+func (m *MockSegment) AllDocs() (index.IDDocIterator, error) {
 	ret := m.ctrl.Call(m, "AllDocs")
-	ret0, _ := ret[0].(doc.Iterator)
+	ret0, _ := ret[0].(index.IDDocIterator)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
@@ -229,6 +229,19 @@ func (mr *MockSegmentMockRecorder)
ContainsID(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ContainsID", reflect.TypeOf((*MockSegment)(nil).ContainsID), arg0) } +// Doc mocks base method +func (m *MockSegment) Doc(arg0 postings.ID) (doc.Document, error) { + ret := m.ctrl.Call(m, "Doc", arg0) + ret0, _ := ret[0].(doc.Document) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Doc indicates an expected call of Doc +func (mr *MockSegmentMockRecorder) Doc(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Doc", reflect.TypeOf((*MockSegment)(nil).Doc), arg0) +} + // Docs mocks base method func (m *MockSegment) Docs(arg0 postings.List) (doc.Iterator, error) { ret := m.ctrl.Call(m, "Docs", arg0) @@ -243,9 +256,9 @@ func (mr *MockSegmentMockRecorder) Docs(arg0 interface{}) *gomock.Call { } // Fields mocks base method -func (m *MockSegment) Fields() ([][]byte, error) { +func (m *MockSegment) Fields() (segment.FieldsIterator, error) { ret := m.ctrl.Call(m, "Fields") - ret0, _ := ret[0].([][]byte) + ret0, _ := ret[0].(segment.FieldsIterator) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -320,9 +333,9 @@ func (mr *MockSegmentMockRecorder) Size() *gomock.Call { } // Terms mocks base method -func (m *MockSegment) Terms(arg0 []byte) ([][]byte, error) { +func (m *MockSegment) Terms(arg0 []byte) (segment.TermsIterator, error) { ret := m.ctrl.Call(m, "Terms", arg0) - ret0, _ := ret[0].([][]byte) + ret0, _ := ret[0].(segment.TermsIterator) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/index/segment/fs/fst_terms_iterator.go b/index/segment/fs/fst_terms_iterator.go new file mode 100644 index 0000000..da1c7eb --- /dev/null +++ b/index/segment/fs/fst_terms_iterator.go @@ -0,0 +1,124 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package fs + +import ( + sgmt "github.com/m3db/m3ninx/index/segment" + "github.com/m3db/m3x/context" + xerrors "github.com/m3db/m3x/errors" + + "github.com/couchbase/vellum" +) + +type newFstTermsIterOpts struct { + ctx context.Context + opts Options + fst *vellum.FST + finalizeFST bool +} + +func (o *newFstTermsIterOpts) Close() error { + o.ctx.Close() + if o.finalizeFST { + return o.fst.Close() + } + return nil +} + +func newFSTTermsIter(opts newFstTermsIterOpts) *fstTermsIter { + return &fstTermsIter{iterOpts: opts} +} + +// nolint: maligned +type fstTermsIter struct { + iterOpts newFstTermsIterOpts + + initialized bool + iter *vellum.FSTIterator + err error + done bool + + current []byte + currentValue uint64 +} + +var _ sgmt.OrderedBytesSliceIterator = &fstTermsIter{} + +func (f *fstTermsIter) Next() bool { + if f.done || f.err != nil { + return false + } + + var err error + + if !f.initialized { + f.initialized = true + f.iter = &vellum.FSTIterator{} + err = f.iter.Reset(f.iterOpts.fst, minByteKey, maxByteKey, nil) + } else { + err = f.iter.Next() + } + + if err != nil { + f.done = true + if err != vellum.ErrIteratorDone { + f.err = err + } + return false + } + + nextBytes, nextValue := f.iter.Current() + // taking a copy of the bytes to avoid referring to mmap'd memory + // TODO(prateek): back this by a bytes pool. + f.current = append([]byte(nil), nextBytes...) + f.currentValue = nextValue + return true +} + +func (f *fstTermsIter) CurrentExtended() ([]byte, uint64) { + return f.current, f.currentValue +} + +func (f *fstTermsIter) Current() []byte { + return f.current +} + +func (f *fstTermsIter) Err() error { + return f.err +} + +func (f *fstTermsIter) Close() error { + f.current = nil + + multiErr := xerrors.MultiError{} + + if f.iter != nil { + multiErr = multiErr.Add(f.iter.Close()) + } + f.iter = nil + + multiErr = multiErr.Add(f.iterOpts.Close()) + return multiErr.FinalError() +} + +type noOpCloser struct{} + +func (n noOpCloser) Close() error { return nil } diff --git a/index/segment/fs/options.go b/index/segment/fs/options.go new file mode 100644 index 0000000..358c0ad --- /dev/null +++ b/index/segment/fs/options.go @@ -0,0 +1,106 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package fs + +import ( + "github.com/m3db/m3ninx/postings" + "github.com/m3db/m3ninx/postings/roaring" + "github.com/m3db/m3ninx/x/bytes" + + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/pool" +) + +const ( + defaultBytesArrayPoolCapacity = 1024 +) + +// Options is a collection of knobs for a fs segment. +type Options interface { + // SetInstrumentOptions sets the instrument options. + SetInstrumentOptions(value instrument.Options) Options + + // InstrumentOptions returns the instrument options. + InstrumentOptions() instrument.Options + + // SetBytesSliceArrayPool sets the bytes slice array pool. + SetBytesSliceArrayPool(value bytes.SliceArrayPool) Options + + // BytesSliceArrayPool returns the bytes slice array pool. + BytesSliceArrayPool() bytes.SliceArrayPool + + // SetPostingsListPool sets the postings list pool. + SetPostingsListPool(value postings.Pool) Options + + // PostingsListPool returns the postings list pool. + PostingsListPool() postings.Pool +} + +type opts struct { + iopts instrument.Options + bytesSliceArrPool bytes.SliceArrayPool + postingsPool postings.Pool +} + +// NewOptions returns new options. +func NewOptions() Options { + arrPool := bytes.NewSliceArrayPool(bytes.SliceArrayPoolOpts{ + Capacity: defaultBytesArrayPoolCapacity, + Options: pool.NewObjectPoolOptions(), + }) + arrPool.Init() + + return &opts{ + iopts: instrument.NewOptions(), + bytesSliceArrPool: arrPool, + postingsPool: postings.NewPool(nil, roaring.NewPostingsList), + } +} + +func (o *opts) SetInstrumentOptions(v instrument.Options) Options { + opts := *o + opts.iopts = v + return &opts +} + +func (o *opts) InstrumentOptions() instrument.Options { + return o.iopts +} + +func (o *opts) SetBytesSliceArrayPool(value bytes.SliceArrayPool) Options { + opts := *o + opts.bytesSliceArrPool = value + return &opts +} + +func (o *opts) BytesSliceArrayPool() bytes.SliceArrayPool { + return o.bytesSliceArrPool +} + +func (o *opts) SetPostingsListPool(v postings.Pool) Options { + opts := *o + opts.postingsPool = v + return &opts +} + +func (o *opts) PostingsListPool() postings.Pool { + return o.postingsPool +} diff --git a/index/segment/fs/segment.go b/index/segment/fs/segment.go index 0587f4f..42540db 100644 --- a/index/segment/fs/segment.go +++ b/index/segment/fs/segment.go @@ -30,12 +30,14 @@ import ( "github.com/m3db/m3ninx/doc" "github.com/m3db/m3ninx/generated/proto/fswriter" "github.com/m3db/m3ninx/index" + sgmt "github.com/m3db/m3ninx/index/segment" "github.com/m3db/m3ninx/index/segment/fs/encoding" "github.com/m3db/m3ninx/index/segment/fs/encoding/docs" "github.com/m3db/m3ninx/postings" "github.com/m3db/m3ninx/postings/pilosa" "github.com/m3db/m3ninx/postings/roaring" "github.com/m3db/m3ninx/x" + "github.com/m3db/m3x/context" "github.com/couchbase/vellum" vregex "github.com/couchbaselabs/vellum/regexp" @@ -95,13 +97,8 @@ func (sd SegmentData) Validate() error { return nil } -// NewSegmentOpts represent the collection of knobs used by the Segment. -type NewSegmentOpts struct { - PostingsListPool postings.Pool -} - // NewSegment returns a new Segment backed by the provided options. 
-func NewSegment(data SegmentData, opts NewSegmentOpts) (Segment, error) {
+func NewSegment(data SegmentData, opts Options) (Segment, error) {
 	if err := data.Validate(); err != nil {
 		return nil, err
 	}
@@ -132,6 +129,7 @@ func NewSegment(data SegmentData, opts NewSegmentOpts) (Segment, error) {
 	docsDataReader := docs.NewDataReader(data.DocsData)
 
 	return &fsSegment{
+		ctx:             context.NewContext(),
 		fieldsFST:       fieldsFST,
 		docsDataReader:  docsDataReader,
 		docsIndexReader: docsIndexReader,
@@ -146,14 +144,13 @@ func NewSegment(data SegmentData, opts NewSegmentOpts) (Segment, error) {
 type fsSegment struct {
 	sync.RWMutex
-	closed bool
-
+	ctx       context.Context
+	closed    bool
 	fieldsFST *vellum.FST
 
 	docsDataReader  *docs.DataReader
 	docsIndexReader *docs.IndexReader
-
-	data SegmentData
-	opts NewSegmentOpts
+	data            SegmentData
+	opts            Options
 
 	numDocs        int64
 	startInclusive postings.ID
@@ -209,24 +206,29 @@ func (r *fsSegment) Close() error {
 		return errReaderClosed
 	}
 	r.closed = true
+	// ensure all references that depend on this segment are released
+	r.ctx.BlockingClose()
 	return r.fieldsFST.Close()
 }
 
-// FOLLOWUP(prateek): really need to change the types returned in Fields() and Terms()
-// to be iterators to allow us to pool the bytes being returned to the user. Otherwise
-// we're forced to copy these massive slices every time. Tracking this under
-// https://github.com/m3db/m3ninx/issues/66
-func (r *fsSegment) Fields() ([][]byte, error) {
+func (r *fsSegment) Fields() (sgmt.FieldsIterator, error) {
 	r.RLock()
 	defer r.RUnlock()
 	if r.closed {
 		return nil, errReaderClosed
 	}
 
-	return r.allKeys(r.fieldsFST)
+	newCtx := context.NewContext()
+	r.ctx.DependsOn(newCtx)
+	return newFSTTermsIter(newFstTermsIterOpts{
+		ctx:         newCtx,
+		opts:        r.opts,
+		fst:         r.fieldsFST,
+		finalizeFST: false,
+	}), nil
 }
 
-func (r *fsSegment) Terms(field []byte) ([][]byte, error) {
+func (r *fsSegment) Terms(field []byte) (sgmt.TermsIterator, error) {
 	r.RLock()
 	defer r.RUnlock()
 	if r.closed {
@@ -238,19 +240,14 @@ func (r *fsSegment) Terms(field []byte) ([][]byte, error) {
 		return nil, err
 	}
 
-	fstCloser := x.NewSafeCloser(termsFST)
-	defer fstCloser.Close()
-
-	terms, err := r.allKeys(termsFST)
-	if err != nil {
-		return nil, err
-	}
-
-	if err := fstCloser.Close(); err != nil {
-		return nil, err
-	}
-
-	return terms, nil
+	newCtx := context.NewContext()
+	r.ctx.DependsOn(newCtx)
+	return newFSTTermsIter(newFstTermsIterOpts{
+		ctx:         newCtx,
+		opts:        r.opts,
+		fst:         termsFST,
+		finalizeFST: true,
+	}), nil
 }
 
 func (r *fsSegment) MatchTerm(field []byte, term []byte) (postings.List, error) {
@@ -274,7 +271,7 @@ func (r *fsSegment) MatchTerm(field []byte, term []byte) (postings.List, error)
 	if !exists {
 		// i.e.
we don't know anything about the term, so can early return an empty postings list - return r.opts.PostingsListPool.Get(), nil + return r.opts.PostingsListPool().Get(), nil } pl, err := r.retrievePostingsListWithRLock(postingsOffset) @@ -308,7 +305,7 @@ func (r *fsSegment) MatchRegexp(field []byte, regexp []byte, compiled *regexp.Re var ( fstCloser = x.NewSafeCloser(termsFST) - pl = r.opts.PostingsListPool.Get() + pl = r.opts.PostingsListPool().Get() iter, iterErr = termsFST.Search(re, minByteKey, maxByteKey) iterCloser = x.NewSafeCloser(iter) ) @@ -356,7 +353,7 @@ func (r *fsSegment) MatchAll() (postings.MutableList, error) { return nil, errReaderClosed } - pl := r.opts.PostingsListPool.Get() + pl := r.opts.PostingsListPool().Get() pl.AddRange(r.startInclusive, r.endExclusive) return pl, nil @@ -406,32 +403,6 @@ func (r *fsSegment) retrievePostingsListWithRLock(postingsOffset uint64) (postin return pilosa.Unmarshal(postingsBytes, roaring.NewPostingsList) } -func (r *fsSegment) allKeys(fst *vellum.FST) ([][]byte, error) { - num := fst.Len() - keys := make([][]byte, 0, num) - - iter, iterErr := fst.Iterator(minByteKey, maxByteKey) - safeCloser := x.NewSafeCloser(iter) - defer safeCloser.Close() - - for { - if iterErr == vellum.ErrIteratorDone { - break - } - if iterErr != nil { - return nil, iterErr - } - key, _ := iter.Current() - keys = append(keys, r.copyBytes(key)) - iterErr = iter.Next() - } - - if err := safeCloser.Close(); err != nil { - return nil, err - } - return keys, nil -} - func (r *fsSegment) retrieveTermsFSTWithRLock(field []byte) (*vellum.FST, error) { termsFSTOffset, exists, err := r.fieldsFST.Get(field) if err != nil { @@ -571,14 +542,3 @@ func (sr *fsSegmentReader) Close() error { sr.closed = true return nil } - -// copyBytes returns a copy of the provided bytes. We need to do this as the bytes -// backing the fsSegment are mmap-ed, and maintain their lifecycle exclusive from those -// owned by users. -// FOLLOWUP(prateek): return iterator types at all exit points of Reader, and -// then we can pool the bytes below. 
-func (r *fsSegment) copyBytes(b []byte) []byte { - copied := make([]byte, len(b)) - copy(copied, b) - return copied -} diff --git a/index/segment/fs/writer.go b/index/segment/fs/writer.go index dcd2dbe..65b0a52 100644 --- a/index/segment/fs/writer.go +++ b/index/segment/fs/writer.go @@ -21,11 +21,9 @@ package fs import ( - "bytes" "errors" "fmt" "io" - "sort" "github.com/m3db/m3ninx/doc" "github.com/m3db/m3ninx/generated/proto/fswriter" @@ -192,7 +190,9 @@ func (w *writer) WritePostingsOffsets(iow io.Writer) error { } // for each known field - for _, f := range fields { + for fields.Next() { + f := fields.Current() + // retrieve known terms for current field terms, err := w.seg.Terms(f) if err != nil { @@ -200,7 +200,8 @@ func (w *writer) WritePostingsOffsets(iow io.Writer) error { } // for each term corresponding to the current field - for _, t := range terms { + for terms.Next() { + t := terms.Current() // retrieve the postings list for this (field, term) combination pl, err := w.segReader.MatchTerm(f, t) if err != nil { @@ -225,6 +226,22 @@ func (w *writer) WritePostingsOffsets(iow io.Writer) error { // track current offset as the offset for the current field/term w.addPostingsOffset(currentOffset, f, t) } + + if err := terms.Err(); err != nil { + return err + } + + if err := terms.Close(); err != nil { + return err + } + } + + if err := fields.Err(); err != nil { + return err + } + + if err := fields.Close(); err != nil { + return err } w.postingsFileWritten = true @@ -246,7 +263,9 @@ func (w *writer) WriteFSTTerms(iow io.Writer) error { } // build a fst for each field's terms - for _, f := range fields { + for fields.Next() { + f := fields.Current() + // reset writer for this field's fst if err := w.fstWriter.Reset(iow); err != nil { return err @@ -258,11 +277,10 @@ func (w *writer) WriteFSTTerms(iow io.Writer) error { return err } - // inserts into the fst have to be lexicographically ordered - sortSliceOfByteSlices(terms) - // for each term corresponding to this field - for _, t := range terms { + for terms.Next() { + t := terms.Current() + // retieve postsings offset for the current field,term po, err := w.getPostingsOffset(f, t) if err != nil { @@ -274,6 +292,13 @@ func (w *writer) WriteFSTTerms(iow io.Writer) error { return err } } + if err := terms.Err(); err != nil { + return err + } + + if err := terms.Close(); err != nil { + return err + } // retrieve a serialized representation of the field's fst numBytesFST, err := w.fstWriter.Close() @@ -294,6 +319,14 @@ func (w *writer) WriteFSTTerms(iow io.Writer) error { w.addFSTTermsOffset(currentOffset, f) } + if err := fields.Err(); err != nil { + return err + } + + if err := fields.Close(); err != nil { + return err + } + // all good! 
w.fstTermsFileWritten = true return nil @@ -315,11 +348,9 @@ func (w *writer) WriteFSTFields(iow io.Writer) error { return err } - // inserts into the fst have to be lexicographically ordered - sortSliceOfByteSlices(fields) - // insert each field into fst - for _, f := range fields { + for fields.Next() { + f := fields.Current() // get offset for this field's term fst offset, err := w.getFSTTermsOffset(f) if err != nil { @@ -332,6 +363,14 @@ func (w *writer) WriteFSTFields(iow io.Writer) error { } } + if err := fields.Err(); err != nil { + return err + } + + if err := fields.Close(); err != nil { + return err + } + // flush the fst writer _, err = w.fstWriter.Close() return err @@ -407,12 +446,6 @@ func (w *writer) getPostingsOffset(name, value []byte) (uint64, error) { return offset, nil } -func sortSliceOfByteSlices(b [][]byte) { - sort.Slice(b, func(i, j int) bool { - return bytes.Compare(b[i], b[j]) < 0 - }) -} - func defaultV1Metadata() fswriter.Metadata { return fswriter.Metadata{ PostingsFormat: fswriter.PostingsFormat_PILOSAV1_POSTINGS_FORMAT, diff --git a/index/segment/fs/writer_reader_test.go b/index/segment/fs/writer_reader_test.go index ecaf975..8e64801 100644 --- a/index/segment/fs/writer_reader_test.go +++ b/index/segment/fs/writer_reader_test.go @@ -31,12 +31,13 @@ import ( "github.com/m3db/m3ninx/index/segment/mem" "github.com/m3db/m3ninx/index/util" "github.com/m3db/m3ninx/postings" - "github.com/m3db/m3ninx/postings/roaring" "github.com/stretchr/testify/require" ) var ( + testOptions = NewOptions() + fewTestDocuments = []doc.Document{ doc.Document{ Fields: []doc.Field{ @@ -115,11 +116,13 @@ func TestFieldsEquals(t *testing.T) { t.Run(test.name, func(t *testing.T) { memSeg, fstSeg := newTestSegments(t, test.docs) - memFields, err := memSeg.Fields() + memFieldsIter, err := memSeg.Fields() require.NoError(t, err) + memFields := toSlice(t, memFieldsIter) - fstFields, err := fstSeg.Fields() + fstFieldsIter, err := fstSeg.Fields() require.NoError(t, err) + fstFields := toSlice(t, fstFieldsIter) assertSliceOfByteSlicesEqual(t, memFields, fstFields) @@ -132,17 +135,21 @@ func TestTermEquals(t *testing.T) { t.Run(test.name, func(t *testing.T) { memSeg, fstSeg := newTestSegments(t, test.docs) - memFields, err := memSeg.Fields() + memFieldsIter, err := memSeg.Fields() require.NoError(t, err) - fstFields, err := fstSeg.Fields() + memFields := toSlice(t, memFieldsIter) + fstFieldsIter, err := fstSeg.Fields() require.NoError(t, err) + fstFields := toSlice(t, fstFieldsIter) assertTermEquals := func(fields [][]byte) { for _, f := range fields { - memTerms, err := memSeg.Terms(f) + memTermsIter, err := memSeg.Terms(f) require.NoError(t, err) - fstTerms, err := fstSeg.Terms(f) + memTerms := toSlice(t, memTermsIter) + fstTermsIter, err := fstSeg.Terms(f) require.NoError(t, err) + fstTerms := toSlice(t, fstTermsIter) assertSliceOfByteSlicesEqual(t, memTerms, fstTerms) } } @@ -162,12 +169,14 @@ func TestPostingsListEqualForMatchTerm(t *testing.T) { fstReader, err := fstSeg.Reader() require.NoError(t, err) - memFields, err := memSeg.Fields() + memFieldsIter, err := memSeg.Fields() require.NoError(t, err) + memFields := toSlice(t, memFieldsIter) for _, f := range memFields { - memTerms, err := memSeg.Terms(f) + memTermsIter, err := memSeg.Terms(f) require.NoError(t, err) + memTerms := toSlice(t, memTermsIter) for _, term := range memTerms { memPl, err := memReader.MatchTerm(f, term) @@ -186,8 +195,9 @@ func TestPostingsListContainsID(t *testing.T) { for _, test := range testDocuments { 
t.Run(test.name, func(t *testing.T) { memSeg, fstSeg := newTestSegments(t, test.docs) - memIDs, err := memSeg.Terms(doc.IDReservedFieldName) + memIDsIter, err := memSeg.Terms(doc.IDReservedFieldName) require.NoError(t, err) + memIDs := toSlice(t, memIDsIter) for _, i := range memIDs { ok, err := fstSeg.ContainsID(i) require.NoError(t, err) @@ -201,8 +211,9 @@ func TestPostingsListRegexAll(t *testing.T) { for _, test := range testDocuments { t.Run(test.name, func(t *testing.T) { memSeg, fstSeg := newTestSegments(t, test.docs) - fields, err := memSeg.Fields() + fieldsIter, err := memSeg.Fields() require.NoError(t, err) + fields := toSlice(t, fieldsIter) for _, f := range fields { reader, err := memSeg.Reader() require.NoError(t, err) @@ -228,12 +239,14 @@ func TestSegmentDocs(t *testing.T) { fstReader, err := fstSeg.Reader() require.NoError(t, err) - memFields, err := memSeg.Fields() + memFieldsIter, err := memSeg.Fields() require.NoError(t, err) + memFields := toSlice(t, memFieldsIter) for _, f := range memFields { - memTerms, err := memSeg.Terms(f) + memTermsIter, err := memSeg.Terms(f) require.NoError(t, err) + memTerms := toSlice(t, memTermsIter) for _, term := range memTerms { memPl, err := memReader.MatchTerm(f, term) @@ -343,10 +356,7 @@ func newFSTSegment(t *testing.T, s sgmt.MutableSegment) sgmt.Segment { FSTTermsData: fstTermsBuffer.Bytes(), FSTFieldsData: fstFieldsBuffer.Bytes(), } - opts := NewSegmentOpts{ - PostingsListPool: postings.NewPool(nil, roaring.NewPostingsList), - } - reader, err := NewSegment(data, opts) + reader, err := NewSegment(data, testOptions) require.NoError(t, err) return reader @@ -354,8 +364,6 @@ func newFSTSegment(t *testing.T, s sgmt.MutableSegment) sgmt.Segment { func assertSliceOfByteSlicesEqual(t *testing.T, a, b [][]byte) { require.Equal(t, len(a), len(b), fmt.Sprintf("a = [%s], b = [%s]", pprint(a), pprint(b))) - sortSliceOfByteSlices(a) - sortSliceOfByteSlices(b) require.Equal(t, a, b) } @@ -410,3 +418,13 @@ func pprintIter(pl postings.List) string { } return buf.String() } + +func toSlice(t *testing.T, iter sgmt.OrderedBytesSliceIterator) [][]byte { + elems := [][]byte{} + for iter.Next() { + elems = append(elems, iter.Current()) + } + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) + return elems +} diff --git a/index/segment/mem/bytes_slice_iterator.go b/index/segment/mem/bytes_slice_iterator.go new file mode 100644 index 0000000..7a94da3 --- /dev/null +++ b/index/segment/mem/bytes_slice_iterator.go @@ -0,0 +1,84 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package mem + +import ( + "bytes" + "sort" + + sgmt "github.com/m3db/m3ninx/index/segment" +) + +type bytesSliceIter struct { + err error + done bool + + currentIdx int + current []byte + backingSlice [][]byte + opts Options +} + +var _ sgmt.FieldsIterator = &bytesSliceIter{} +var _ sgmt.TermsIterator = &bytesSliceIter{} + +// TODO(prateek): add tests including sorted order guarantees here. +func newBytesSliceIter(slice [][]byte, opts Options) *bytesSliceIter { + sortSliceOfByteSlices(slice) + return &bytesSliceIter{ + currentIdx: -1, + backingSlice: slice, + opts: opts, + } +} + +func (b *bytesSliceIter) Next() bool { + if b.done || b.err != nil { + return false + } + b.currentIdx++ + if b.currentIdx >= len(b.backingSlice) { + b.done = true + return false + } + b.current = b.backingSlice[b.currentIdx] + return true +} + +func (b *bytesSliceIter) Current() []byte { + return b.current +} + +func (b *bytesSliceIter) Err() error { + return nil +} + +func (b *bytesSliceIter) Close() error { + b.current = nil + b.opts.BytesSliceArrayPool().Put(b.backingSlice) + return nil +} + +func sortSliceOfByteSlices(b [][]byte) { + sort.Slice(b, func(i, j int) bool { + return bytes.Compare(b[i], b[j]) < 0 + }) +} diff --git a/index/segment/mem/concurrent_postings_map.go b/index/segment/mem/concurrent_postings_map.go index 4a76516..c4ec05e 100644 --- a/index/segment/mem/concurrent_postings_map.go +++ b/index/segment/mem/concurrent_postings_map.go @@ -78,14 +78,14 @@ func (m *concurrentPostingsMap) Add(key []byte, id postings.ID) { } // Keys returns the keys known to the map. -func (m *concurrentPostingsMap) Keys() [][]byte { +func (m *concurrentPostingsMap) Keys() *bytesSliceIter { m.RLock() defer m.RUnlock() - keys := make([][]byte, 0, m.Len()) + keys := m.opts.BytesSliceArrayPool().Get() for _, entry := range m.Iter() { keys = append(keys, entry.Key()) } - return keys + return newBytesSliceIter(keys, m.opts) } // Get returns the postings.List backing `key`. 
diff --git a/index/segment/mem/concurrent_postings_map_test.go b/index/segment/mem/concurrent_postings_map_test.go index c545c5a..8aa0011 100644 --- a/index/segment/mem/concurrent_postings_map_test.go +++ b/index/segment/mem/concurrent_postings_map_test.go @@ -26,6 +26,8 @@ import ( "sort" "testing" + sgmt "github.com/m3db/m3ninx/index/segment" + "github.com/stretchr/testify/require" ) @@ -63,7 +65,7 @@ func TestConcurrentPostingsMapKeys(t *testing.T) { opts := NewOptions() pm := newConcurrentPostingsMap(opts) - keys := pm.Keys() + keys := toSlice(t, pm.Keys()) require.Empty(t, keys) var ( @@ -73,20 +75,32 @@ func TestConcurrentPostingsMapKeys(t *testing.T) { ) pm.Add(foo, 1) - keys = pm.Keys() + + keys = toSlice(t, pm.Keys()) require.Equal(t, [][]byte{foo}, sortKeys(keys)) pm.Add(bar, 2) - keys = pm.Keys() + keys = toSlice(t, pm.Keys()) require.Equal(t, [][]byte{bar, foo}, sortKeys(keys)) pm.Add(foo, 3) - keys = pm.Keys() + keys = toSlice(t, pm.Keys()) require.Equal(t, [][]byte{bar, foo}, sortKeys(keys)) pm.Add(baz, 4) - keys = pm.Keys() + keys = toSlice(t, pm.Keys()) require.Equal(t, [][]byte{bar, baz, foo}, sortKeys(keys)) + +} + +func toSlice(t *testing.T, iter sgmt.OrderedBytesSliceIterator) [][]byte { + elems := [][]byte{} + for iter.Next() { + elems = append(elems, iter.Current()) + } + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) + return elems } func sortKeys(keys [][]byte) [][]byte { diff --git a/index/segment/mem/options.go b/index/segment/mem/options.go index b9900c0..4365446 100644 --- a/index/segment/mem/options.go +++ b/index/segment/mem/options.go @@ -24,12 +24,15 @@ import ( "github.com/m3db/m3ninx/index/util" "github.com/m3db/m3ninx/postings" "github.com/m3db/m3ninx/postings/roaring" + "github.com/m3db/m3ninx/x/bytes" "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/pool" ) const ( - defaultInitialCapacity = 1024 + defaultInitialCapacity = 1024 + defaultBytesArrayPoolCapacity = 1024 ) // Options is a collection of knobs for an in-memory segment. @@ -40,6 +43,12 @@ type Options interface { // InstrumentOptions returns the instrument options. InstrumentOptions() instrument.Options + // SetBytesSliceArrayPool sets the bytes slice array pool. + SetBytesSliceArrayPool(value bytes.SliceArrayPool) Options + + // BytesSliceArrayPool returns the bytes slice array pool. + BytesSliceArrayPool() bytes.SliceArrayPool + // SetPostingsListPool sets the postings list pool. SetPostingsListPool(value postings.Pool) Options @@ -60,19 +69,27 @@ type Options interface { } type opts struct { - iopts instrument.Options - postingsPool postings.Pool - initialCapacity int - newUUIDFn util.NewUUIDFn + iopts instrument.Options + bytesSliceArrPool bytes.SliceArrayPool + postingsPool postings.Pool + initialCapacity int + newUUIDFn util.NewUUIDFn } // NewOptions returns new options. 
func NewOptions() Options { + arrPool := bytes.NewSliceArrayPool(bytes.SliceArrayPoolOpts{ + Capacity: defaultBytesArrayPoolCapacity, + Options: pool.NewObjectPoolOptions(), + }) + arrPool.Init() + return &opts{ - iopts: instrument.NewOptions(), - postingsPool: postings.NewPool(nil, roaring.NewPostingsList), - initialCapacity: defaultInitialCapacity, - newUUIDFn: util.NewUUID, + iopts: instrument.NewOptions(), + bytesSliceArrPool: arrPool, + postingsPool: postings.NewPool(nil, roaring.NewPostingsList), + initialCapacity: defaultInitialCapacity, + newUUIDFn: util.NewUUID, } } @@ -86,6 +103,16 @@ func (o *opts) InstrumentOptions() instrument.Options { return o.iopts } +func (o *opts) SetBytesSliceArrayPool(value bytes.SliceArrayPool) Options { + opts := *o + opts.bytesSliceArrPool = value + return &opts +} + +func (o *opts) BytesSliceArrayPool() bytes.SliceArrayPool { + return o.bytesSliceArrPool +} + func (o *opts) SetPostingsListPool(v postings.Pool) Options { opts := *o opts.postingsPool = v diff --git a/index/segment/mem/segment.go b/index/segment/mem/segment.go index 65f62dc..3e9b8ff 100644 --- a/index/segment/mem/segment.go +++ b/index/segment/mem/segment.go @@ -395,7 +395,7 @@ func (s *segment) Seal() (sgmt.Segment, error) { return s, nil } -func (s *segment) Fields() ([][]byte, error) { +func (s *segment) Fields() (sgmt.FieldsIterator, error) { s.state.RLock() defer s.state.RUnlock() if err := s.checkIsSealedWithRLock(); err != nil { @@ -404,7 +404,7 @@ func (s *segment) Fields() ([][]byte, error) { return s.termsDict.Fields(), nil } -func (s *segment) Terms(name []byte) ([][]byte, error) { +func (s *segment) Terms(name []byte) (sgmt.TermsIterator, error) { s.state.RLock() defer s.state.RUnlock() if err := s.checkIsSealedWithRLock(); err != nil { diff --git a/index/segment/mem/segment_test.go b/index/segment/mem/segment_test.go index 9e6ac39..c7c3ca4 100644 --- a/index/segment/mem/segment_test.go +++ b/index/segment/mem/segment_test.go @@ -741,9 +741,10 @@ func TestSegmentFields(t *testing.T) { seg, err := segment.Seal() require.NoError(t, err) - fields, err := seg.Fields() + fieldsIter, err := seg.Fields() require.NoError(t, err) + fields := toSlice(t, fieldsIter) for _, f := range fields { delete(knownsFields, string(f)) } @@ -772,8 +773,9 @@ func TestSegmentTerms(t *testing.T) { require.NoError(t, err) for field, expectedTerms := range knownsFields { - terms, err := segment.Terms([]byte(field)) + termsIter, err := segment.Terms([]byte(field)) require.NoError(t, err) + terms := toSlice(t, termsIter) for _, term := range terms { delete(expectedTerms, string(term)) } diff --git a/index/segment/mem/terms_dict.go b/index/segment/mem/terms_dict.go index d56a06c..c7cbae3 100644 --- a/index/segment/mem/terms_dict.go +++ b/index/segment/mem/terms_dict.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/m3db/m3ninx/doc" + sgmt "github.com/m3db/m3ninx/index/segment" "github.com/m3db/m3ninx/postings" ) @@ -64,17 +65,17 @@ func (d *termsDict) MatchTerm(field, term []byte) postings.List { return pl } -func (d *termsDict) Fields() [][]byte { +func (d *termsDict) Fields() sgmt.FieldsIterator { d.fields.RLock() defer d.fields.RUnlock() - fields := make([][]byte, 0, d.fields.Len()) + fields := d.opts.BytesSliceArrayPool().Get() for _, entry := range d.fields.Iter() { fields = append(fields, entry.Key()) } - return fields + return newBytesSliceIter(fields, d.opts) } -func (d *termsDict) Terms(field []byte) [][]byte { +func (d *termsDict) Terms(field []byte) sgmt.TermsIterator { d.fields.RLock() defer 
d.fields.RUnlock() values, ok := d.fields.Get(field) diff --git a/index/segment/mem/terms_dict_test.go b/index/segment/mem/terms_dict_test.go index 0626d45..dead6a9 100644 --- a/index/segment/mem/terms_dict_test.go +++ b/index/segment/mem/terms_dict_test.go @@ -105,7 +105,8 @@ func (t *termsDictionaryTestSuite) TestIterateFields() { t.termsDict.Insert(f, id) expectedFields[string(f.Name)] = struct{}{} } - fields := t.termsDict.Fields() + fieldsIter := t.termsDict.Fields() + fields := toSlice(t.T(), fieldsIter) for _, field := range fields { delete(expectedFields, string(field)) } @@ -137,7 +138,8 @@ func (t *termsDictionaryTestSuite) TestIterateTerms() { } // for each expected combination of fieldName -> []fieldValues, ensure all are present for name, expectedValues := range expectedFields { - values := t.termsDict.Terms([]byte(name)) + valuesIter := t.termsDict.Terms([]byte(name)) + values := toSlice(t.T(), valuesIter) for _, val := range values { delete(expectedValues, string(val)) } diff --git a/index/segment/mem/types.go b/index/segment/mem/types.go index 3b26db4..5a40452 100644 --- a/index/segment/mem/types.go +++ b/index/segment/mem/types.go @@ -24,6 +24,7 @@ import ( re "regexp" "github.com/m3db/m3ninx/doc" + sgmt "github.com/m3db/m3ninx/index/segment" "github.com/m3db/m3ninx/postings" ) @@ -45,10 +46,10 @@ type termsDictionary interface { MatchRegexp(field, regexp []byte, compiled *re.Regexp) postings.List // Fields returns the list of known fields. - Fields() [][]byte + Fields() sgmt.FieldsIterator // Terms returns the list of known terms values for the given field. - Terms(field []byte) [][]byte + Terms(field []byte) sgmt.TermsIterator } // ReadableSegment is an internal interface for reading from a segment. diff --git a/index/segment/segment_mock.go b/index/segment/segment_mock.go index 138ec7e..78a79a7 100644 --- a/index/segment/segment_mock.go +++ b/index/segment/segment_mock.go @@ -82,9 +82,9 @@ func (mr *MockSegmentMockRecorder) ContainsID(arg0 interface{}) *gomock.Call { } // Fields mocks base method -func (m *MockSegment) Fields() ([][]byte, error) { +func (m *MockSegment) Fields() (FieldsIterator, error) { ret := m.ctrl.Call(m, "Fields") - ret0, _ := ret[0].([][]byte) + ret0, _ := ret[0].(FieldsIterator) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -120,9 +120,9 @@ func (mr *MockSegmentMockRecorder) Size() *gomock.Call { } // Terms mocks base method -func (m *MockSegment) Terms(arg0 []byte) ([][]byte, error) { +func (m *MockSegment) Terms(arg0 []byte) (TermsIterator, error) { ret := m.ctrl.Call(m, "Terms", arg0) - ret0, _ := ret[0].([][]byte) + ret0, _ := ret[0].(TermsIterator) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -181,9 +181,9 @@ func (mr *MockMutableSegmentMockRecorder) ContainsID(arg0 interface{}) *gomock.C } // Fields mocks base method -func (m *MockMutableSegment) Fields() ([][]byte, error) { +func (m *MockMutableSegment) Fields() (FieldsIterator, error) { ret := m.ctrl.Call(m, "Fields") - ret0, _ := ret[0].([][]byte) + ret0, _ := ret[0].(FieldsIterator) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -269,9 +269,9 @@ func (mr *MockMutableSegmentMockRecorder) Size() *gomock.Call { } // Terms mocks base method -func (m *MockMutableSegment) Terms(arg0 []byte) ([][]byte, error) { +func (m *MockMutableSegment) Terms(arg0 []byte) (TermsIterator, error) { ret := m.ctrl.Call(m, "Terms", arg0) - ret0, _ := ret[0].([][]byte) + ret0, _ := ret[0].(TermsIterator) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/index/segment/types.go 
b/index/segment/types.go index acbb795..4c47304 100644 --- a/index/segment/types.go +++ b/index/segment/types.go @@ -44,16 +44,41 @@ type Segment interface { // Reader returns a point-in-time accessor to search the segment. Reader() (index.Reader, error) - // Fields returns the list of known fields. - Fields() ([][]byte, error) + // Fields returns an iterator over the list of known fields. + Fields() (FieldsIterator, error) - // Terms returns the list of known terms values for the given field. - Terms(field []byte) ([][]byte, error) + // Terms returns an iterator over the known terms values for the given field. + Terms(field []byte) (TermsIterator, error) // Close closes the segment and releases any internal resources. Close() error } +// OrderedBytesSliceIterator iterates over a set of bytes slices in lexicographical order. +type OrderedBytesSliceIterator interface { + // Next returns a bool indicating if there are any more elements + Next() bool + + // Current returns the current element. + Current() []byte + + // Err returns any errors encountered during iteration. + Err() error + + // Close releases any resources held by the iterator. + Close() error +} + +// FieldsIterator iterates over all known fields. +type FieldsIterator interface { + OrderedBytesSliceIterator +} + +// TermsIterator iterates over all known terms for the provided field. +type TermsIterator interface { + OrderedBytesSliceIterator +} + // MutableSegment is a segment which can be updated. type MutableSegment interface { Segment diff --git a/persist/reader.go b/persist/reader.go index eb5c020..ee4ef24 100644 --- a/persist/reader.go +++ b/persist/reader.go @@ -27,7 +27,7 @@ import ( ) // NewSegment returns a new fs.Segment backed by the provided fileset. -func NewSegment(fileset IndexSegmentFileSet, opts fs.NewSegmentOpts) (fs.Segment, error) { +func NewSegment(fileset IndexSegmentFileSet, opts fs.Options) (fs.Segment, error) { if t := fileset.SegmentType(); t != FSTIndexSegmentType { return nil, fmt.Errorf("unknown segment type: %s", t) } diff --git a/persist/reader_test.go b/persist/reader_test.go index c9e570f..d179a6b 100644 --- a/persist/reader_test.go +++ b/persist/reader_test.go @@ -36,7 +36,7 @@ func TestReaderValidateType(t *testing.T) { fset := NewMockIndexSegmentFileSet(ctrl) fset.EXPECT().SegmentType().Return(IndexSegmentType("random")) - _, err := NewSegment(fset, fs.NewSegmentOpts{}) + _, err := NewSegment(fset, nil) require.Error(t, err) } @@ -51,7 +51,7 @@ func TestReaderValidateDataSlices(t *testing.T) { fset.EXPECT().SegmentMetadata().Return([]byte{}) fset.EXPECT().Files().Return(nil) - _, err := NewSegment(fset, fs.NewSegmentOpts{}) + _, err := NewSegment(fset, nil) require.Error(t, err) } @@ -70,7 +70,7 @@ func TestReaderValidateByteAccess(t *testing.T) { docsDataFile.EXPECT().Bytes().Return(nil, fmt.Errorf("random")) fset.EXPECT().Files().Return([]IndexSegmentFile{docsDataFile}) - _, err := NewSegment(fset, fs.NewSegmentOpts{}) + _, err := NewSegment(fset, nil) require.Error(t, err) } @@ -88,7 +88,7 @@ func TestReaderValidateSegmentFileType(t *testing.T) { docsDataFile.EXPECT().SegmentFileType().Return(IndexSegmentFileType("rand")) fset.EXPECT().Files().Return([]IndexSegmentFile{docsDataFile}) - _, err := NewSegment(fset, fs.NewSegmentOpts{}) + _, err := NewSegment(fset, nil) require.Error(t, err) } diff --git a/x/bytes/slice_arraypool_gen.go b/x/bytes/slice_arraypool_gen.go new file mode 100644 index 0000000..42ed06a --- /dev/null +++ b/x/bytes/slice_arraypool_gen.go @@ -0,0 +1,107 @@ +// This file 
was automatically generated by genny. +// Any changes will be lost if this file is regenerated. +// see https://github.com/mauricelam/genny + +package bytes + +import ( + "github.com/m3db/m3x/pool" +) + +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// SliceArrayPool provides a pool for []byte slices. +type SliceArrayPool interface { + // Init initializes the array pool, it needs to be called + // before Get/Put use. + Init() + + // Get returns the a slice from the pool. + Get() [][]byte + + // Put returns the provided slice to the pool. + Put(elems [][]byte) +} + +type SliceFinalizeFn func([][]byte) [][]byte + +type SliceArrayPoolOpts struct { + Options pool.ObjectPoolOptions + Capacity int + MaxCapacity int + FinalizeFn SliceFinalizeFn +} + +type SliceArrPool struct { + opts SliceArrayPoolOpts + pool pool.ObjectPool +} + +func NewSliceArrayPool(opts SliceArrayPoolOpts) SliceArrayPool { + if opts.FinalizeFn == nil { + opts.FinalizeFn = defaultSliceFinalizerFn + } + p := pool.NewObjectPool(opts.Options) + return &SliceArrPool{opts, p} +} + +func (p *SliceArrPool) Init() { + p.pool.Init(func() interface{} { + return make([][]byte, 0, p.opts.Capacity) + }) +} + +func (p *SliceArrPool) Get() [][]byte { + return p.pool.Get().([][]byte) +} + +func (p *SliceArrPool) Put(arr [][]byte) { + arr = p.opts.FinalizeFn(arr) + if max := p.opts.MaxCapacity; max > 0 && cap(arr) > max { + return + } + p.pool.Put(arr) +} + +func defaultSliceFinalizerFn(elems [][]byte) [][]byte { + var empty []byte + for i := range elems { + elems[i] = empty + } + elems = elems[:0] + return elems +} + +type SliceArr [][]byte + +func (elems SliceArr) grow(n int) [][]byte { + if cap(elems) < n { + elems = make([][]byte, n) + } + elems = elems[:n] + // following compiler optimized memcpy impl + // https://github.com/golang/go/wiki/CompilerOptimizations#optimized-memclr + var empty []byte + for i := range elems { + elems[i] = empty + } + return elems +} From 84ae9c98bebdbbad9af0dceb413ad4d3a8b7995a Mon Sep 17 00:00:00 2001 From: Prateek Rungta Date: Thu, 17 May 2018 01:00:00 -0400 Subject: [PATCH 2/2] deadcode --- index/segment/fs/fst_terms_iterator.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/index/segment/fs/fst_terms_iterator.go b/index/segment/fs/fst_terms_iterator.go index da1c7eb..2d30e59 100644 --- a/index/segment/fs/fst_terms_iterator.go +++ b/index/segment/fs/fst_terms_iterator.go @@ -118,7 +118,3 @@ func (f *fstTermsIter) Close() 
error { multiErr = multiErr.Add(f.iterOpts.Close()) return multiErr.FinalError() } - -type noOpCloser struct{} - -func (n noOpCloser) Close() error { return nil }
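
Usage sketch (illustrative only, not part of the patch): the helper below shows how a caller might walk the iterator-based Fields/Terms API introduced above. The dumpSegment name, the use of fmt, and the error handling are assumptions for the example; only the Fields/Terms/Next/Current/Err/Close calls come from the sgmt.Segment and OrderedBytesSliceIterator interfaces in index/segment/types.go.

package example

import (
	"fmt"

	sgmt "github.com/m3db/m3ninx/index/segment"
)

// dumpSegment prints every (field, term) pair known to the segment using the
// new iterator API, checking Err() and closing each iterator explicitly, in
// the same style as the writer in index/segment/fs/writer.go.
func dumpSegment(seg sgmt.Segment) error {
	fields, err := seg.Fields()
	if err != nil {
		return err
	}
	for fields.Next() {
		field := fields.Current()
		terms, err := seg.Terms(field)
		if err != nil {
			fields.Close()
			return err
		}
		for terms.Next() {
			// NB: treat the bytes returned by Current() as valid only for the
			// duration of iteration; copy them if they need to be retained.
			fmt.Printf("%s=%s\n", field, terms.Current())
		}
		if err := terms.Err(); err != nil {
			terms.Close()
			fields.Close()
			return err
		}
		if err := terms.Close(); err != nil {
			fields.Close()
			return err
		}
	}
	if err := fields.Err(); err != nil {
		fields.Close()
		return err
	}
	return fields.Close()
}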