From 55c2e2c2db9e6bd51de4077a4ae9e745c79db1db Mon Sep 17 00:00:00 2001 From: Jerome Froelich Date: Tue, 24 Apr 2018 15:58:08 -0400 Subject: [PATCH] Support partial updates --- .excludemetalint | 2 +- index/batch.go | 122 ++++++++++++++++++ index/batch_test.go | 70 ++++++++++ index/segment/mem/segment.go | 65 ++++++---- index/segment/mem/segment_test.go | 204 ++++++++++++++++++++++++++---- index/segment/types.go | 10 +- index/types.go | 19 ++- 7 files changed, 431 insertions(+), 61 deletions(-) create mode 100644 index/batch.go create mode 100644 index/batch_test.go diff --git a/.excludemetalint b/.excludemetalint index 8448c57..fafe738 100644 --- a/.excludemetalint +++ b/.excludemetalint @@ -2,4 +2,4 @@ vendor/ generated/ _mock.go .pb.go -index/segment/mem/idsgen/map_gen.go \ No newline at end of file +index/segment/mem/idsgen/map_gen.go diff --git a/index/batch.go b/index/batch.go new file mode 100644 index 0000000..f4b1de7 --- /dev/null +++ b/index/batch.go @@ -0,0 +1,122 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package index + +import ( + "bytes" + "fmt" + + "github.com/m3db/m3ninx/doc" +) + +// Batch represents a batch of documents that should be inserted into the index. +type Batch struct { + Docs []doc.Document + + // If AllowPartialUpdates is true the index will continue to index documents in the batch + // even if it encounters an error attempting to index a previous document in the batch. + // If false, on the other hand, then any errors encountered indexing a document will cause + // the entire batch to fail and none of the documents in the batch will be indexed. + AllowPartialUpdates bool +} + +// BatchOption is an option for a Batch. +type BatchOption interface { + apply(Batch) Batch +} + +// batchOptionFunc is an adaptor to allow the use of functions as BatchOptions. +type batchOptionFunc func(Batch) Batch + +func (f batchOptionFunc) apply(b Batch) Batch { + return f(b) +} + +// AllowPartialUpdates permits an index to continue indexing documents in a batch even if +// it encountered an error inserting a prior document. +func AllowPartialUpdates() BatchOption { + return batchOptionFunc(func(b Batch) Batch { + b.AllowPartialUpdates = true + return b + }) +} + +// NewBatch returns a Batch of documents. +func NewBatch(docs []doc.Document, opts ...BatchOption) Batch { + b := Batch{Docs: docs} + + for _, opt := range opts { + b = opt.apply(b) + } + + return b +} + +// BatchPartialError indicates an error was encountered inserting some documents in a batch. +// It is not safe for concurrent use. +type BatchPartialError struct { + errs []error + idxs []int +} + +// NewBatchPartialError returns a new BatchPartialError.
+func NewBatchPartialError() *BatchPartialError { + return &BatchPartialError{ + errs: make([]error, 0), + idxs: make([]int, 0), + } +} + +func (e *BatchPartialError) Error() string { + var b bytes.Buffer + for i := range e.errs { + b.WriteString(fmt.Sprintf("failed to insert document at index %v in batch: %v", e.idxs[i], e.errs[i])) + if i != len(e.errs)-1 { + b.WriteString("\n") + } + } + return b.String() +} + +// Add adds an error to e. Any nil errors are ignored. +func (e *BatchPartialError) Add(err error, idx int) { + if err == nil { + return + } + e.errs = append(e.errs, err) + e.idxs = append(e.idxs, idx) +} + +// Indices returns the indices of the documents in the batch which were not indexed. +func (e *BatchPartialError) Indices() []int { + return e.idxs +} + +// IsEmpty returns a bool indicating whether e is empty or not. +func (e *BatchPartialError) IsEmpty() bool { + return len(e.errs) == 0 +} + +// IsBatchPartialError returns a bool indicating whether err is a BatchPartialError or not. +func IsBatchPartialError(err error) bool { + _, ok := err.(*BatchPartialError) + return ok +} diff --git a/index/batch_test.go b/index/batch_test.go new file mode 100644 index 0000000..edfd0d7 --- /dev/null +++ b/index/batch_test.go @@ -0,0 +1,70 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package index + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBatchAllowPartialUpdates(t *testing.T) { + tests := []struct { + name string + batch Batch + expected bool + }{ + { + name: "off", + batch: NewBatch(nil), + expected: false, + }, + { + name: "on", + batch: NewBatch(nil, AllowPartialUpdates()), + expected: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require.Equal(t, test.expected, test.batch.AllowPartialUpdates) + }) + } +} + +func TestBatchPartialError(t *testing.T) { + var ( + idxs = []int{3, 7, 13} + err = NewBatchPartialError() + ) + require.True(t, err.IsEmpty()) + + for _, idx := range idxs { + err.Add(errors.New("error"), idx) + } + require.False(t, err.IsEmpty()) + require.Equal(t, idxs, err.Indices()) + + require.True(t, IsBatchPartialError(err)) + require.False(t, IsBatchPartialError(errors.New("error"))) +} diff --git a/index/segment/mem/segment.go b/index/segment/mem/segment.go index fcfe122..4513e64 100644 --- a/index/segment/mem/segment.go +++ b/index/segment/mem/segment.go @@ -92,13 +92,13 @@ func (s *segment) Insert(d doc.Document) ([]byte, error) { { s.writer.Lock() - ds := []doc.Document{d} - if err := s.prepareDocsWithLocks(ds); err != nil { + b := index.NewBatch([]doc.Document{d}) + if err := s.prepareDocsWithLocks(b); err != nil { return nil, err } // Update the document in case we generated a UUID for it. 
- d = ds[0] + d = b.Docs[0] s.insertDocWithLocks(d) s.readerID.Inc() @@ -109,73 +109,95 @@ func (s *segment) Insert(d doc.Document) ([]byte, error) { return d.ID, nil } -func (s *segment) InsertBatch(ds []doc.Document) error { +func (s *segment) InsertBatch(b index.Batch) error { s.state.RLock() defer s.state.RUnlock() if s.state.closed { return sgmt.ErrClosed } + var err error { s.writer.Lock() - if err := s.prepareDocsWithLocks(ds); err != nil { + err = s.prepareDocsWithLocks(b) + if err != nil && !index.IsBatchPartialError(err) { + s.writer.Unlock() return err } - for _, d := range ds { + for _, d := range b.Docs { s.insertDocWithLocks(d) } - s.readerID.Add(uint32(len(ds))) + s.readerID.Add(uint32(len(b.Docs))) s.writer.Unlock() } - return nil + return err } // prepareDocsWithLocks ensures the given documents can be inserted into the index. It // must be called with the state and writer locks. -func (s *segment) prepareDocsWithLocks(ds []doc.Document) error { +func (s *segment) prepareDocsWithLocks(b index.Batch) error { s.writer.idSet.Reset() - for i := 0; i < len(ds); { - d := ds[i] - err := d.Validate() - if err != nil { - return err + + batchErr := index.NewBatchPartialError() + for i := 0; i < len(b.Docs); i++ { + d := b.Docs[i] + if err := d.Validate(); err != nil { + if !b.AllowPartialUpdates { + return err + } + batchErr.Add(err, i) + continue } if d.HasID() { if s.termsDict.ContainsTerm(doc.IDReservedFieldName, d.ID) { // The segment already contains this document so we can remove it from those // we need to index. - ds[i], ds[len(ds)] = ds[len(ds)], ds[i] - ds = ds[:len(ds)-1] + b.Docs[i], b.Docs[len(b.Docs)-1] = b.Docs[len(b.Docs)-1], b.Docs[i] + b.Docs = b.Docs[:len(b.Docs)-1] + + // Decrement the loop variable since we just removed this document.
+ i-- continue } if _, ok := s.writer.idSet.Get(d.ID); ok { - return errDuplicateID + if !b.AllowPartialUpdates { + return errDuplicateID + } + batchErr.Add(errDuplicateID, i) + continue } } else { id, err := s.newUUIDFn() if err != nil { - return err + if !b.AllowPartialUpdates { + return err + } + batchErr.Add(err, i) + continue } + d.ID = id - // Update the document since we added an ID. - ds[i] = d + // Update the document in the batch since we added an ID to it. + b.Docs[i] = d } s.writer.idSet.SetUnsafe(d.ID, struct{}{}, idsgen.SetUnsafeOptions{ NoCopyKey: true, NoFinalizeKey: true, }) - i++ } - return nil + if batchErr.IsEmpty() { + return nil + } + return batchErr } // insertDocWithLocks inserts a document into the index. It must be called with the diff --git a/index/segment/mem/segment_test.go b/index/segment/mem/segment_test.go index 18251a7..5fb5cff 100644 --- a/index/segment/mem/segment_test.go +++ b/index/segment/mem/segment_test.go @@ -146,40 +146,101 @@ func TestSegmentInsertDuplicateID(t *testing.T) { require.NoError(t, segment.Close()) } -func TestSegmentBatch(t *testing.T) { +func TestSegmentInsertBatch(t *testing.T) { tests := []struct { name string - input []doc.Document + input index.Batch }{ { name: "valid batch", - input: []doc.Document{ - doc.Document{ - Fields: []doc.Field{ - doc.Field{ - Name: []byte("fruit"), - Value: []byte("apple"), + input: index.NewBatch( + []doc.Document{ + doc.Document{ + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("apple"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("red"), + }, }, - doc.Field{ - Name: []byte("color"), - Value: []byte("red"), + }, + doc.Document{ + ID: []byte("831992"), + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("banana"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("yellow"), + }, }, }, }, - doc.Document{ - ID: []byte("831992"), - Fields: []doc.Field{ - doc.Field{ - Name: []byte("fruit"), - Value: 
[]byte("banana"), + ), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + segment, err := NewSegment(0, NewOptions()) + require.NoError(t, err) + + err = segment.InsertBatch(test.input) + require.NoError(t, err) + + r, err := segment.Reader() + require.NoError(t, err) + + for _, doc := range test.input.Docs { + testDocument(t, doc, r) + } + + require.NoError(t, r.Close()) + require.NoError(t, segment.Close()) + }) + } +} + +func TestSegmentInsertBatchError(t *testing.T) { + tests := []struct { + name string + input index.Batch + }{ + { + name: "invalid document", + input: index.NewBatch( + []doc.Document{ + doc.Document{ + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("apple"), + }, + doc.Field{ + Name: []byte("color\xff"), + Value: []byte("red"), + }, }, - doc.Field{ - Name: []byte("color"), - Value: []byte("yellow"), + }, + doc.Document{ + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("banana"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("yellow"), + }, }, }, }, - }, + ), }, } @@ -189,12 +250,111 @@ func TestSegmentBatch(t *testing.T) { require.NoError(t, err) err = segment.InsertBatch(test.input) + require.Error(t, err) + require.False(t, index.IsBatchPartialError(err)) + }) + } +} + +func TestSegmentInsertBatchPartialError(t *testing.T) { + tests := []struct { + name string + input index.Batch + }{ + { + name: "invalid document", + input: index.NewBatch( + []doc.Document{ + doc.Document{ + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("apple"), + }, + doc.Field{ + Name: []byte("color\xff"), + Value: []byte("red"), + }, + }, + }, + doc.Document{ + + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("banana"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("yellow"), + }, + }, + }, + }, + index.AllowPartialUpdates(), + ), + }, + { + name: "duplicate ID", + input: index.NewBatch( + 
[]doc.Document{ + doc.Document{ + ID: []byte("831992"), + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("apple"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("red"), + }, + }, + }, + doc.Document{ + ID: []byte("831992"), + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("banana"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("yellow"), + }, + }, + }, + }, + index.AllowPartialUpdates(), + ), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + segment, err := NewSegment(0, NewOptions()) require.NoError(t, err) + err = segment.InsertBatch(test.input) + require.Error(t, err) + require.True(t, index.IsBatchPartialError(err)) + + batchErr := err.(*index.BatchPartialError) + idxs := batchErr.Indices() + failedDocs := make(map[int]struct{}, len(idxs)) + for _, idx := range idxs { + failedDocs[idx] = struct{}{} + } + r, err := segment.Reader() require.NoError(t, err) - for _, doc := range test.input { + for i, doc := range test.input.Docs { + _, ok := failedDocs[i] + if ok { + // Don't test documents which were not indexed. + continue + } testDocument(t, doc, r) } diff --git a/index/segment/types.go b/index/segment/types.go index 0d32ddf..4d97939 100644 --- a/index/segment/types.go +++ b/index/segment/types.go @@ -23,7 +23,6 @@ package segment import ( "errors" - "github.com/m3db/m3ninx/doc" "github.com/m3db/m3ninx/index" ) @@ -45,12 +44,5 @@ type Segment interface { // MutableSegment is a segment which can be updated. type MutableSegment interface { Segment - - // Insert inserts the given document into the segment and returns its ID. The - // document is guaranteed to be searchable once the Insert method returns. - Insert(d doc.Document) ([]byte, error) - - // InsertBatch inserts a batch of documents into the segment. The documents are - // guaranteed to be searchable all at once when the Batch method returns. 
- InsertBatch(ds []doc.Document) error + index.Writer } diff --git a/index/types.go b/index/types.go index cf052a1..f781156 100644 --- a/index/types.go +++ b/index/types.go @@ -31,13 +31,7 @@ import ( // Index is a collection of searchable documents. type Index interface { - // Insert inserts the given document into the index and returns its ID. The document - // is guaranteed to be searchable once the Insert method returns. - Insert(d doc.Document) ([]byte, error) - - // InsertBatch inserts a batch of metrics into the index. The documents are guaranteed - // to be searchable all at once when the Batch method returns. - InsertBatch(d []doc.Document) error + Writer // Readers returns a set of readers representing a point-in-time snapshot of the index. Readers() (Readers, error) @@ -46,6 +40,17 @@ type Index interface { Close() error } +// Writer is used to insert documents into an index. +type Writer interface { + // Insert inserts the given document into the index and returns its ID. The document + // is guaranteed to be searchable once the Insert method returns. + Insert(d doc.Document) ([]byte, error) + + // InsertBatch inserts a batch of documents into the index. The documents are guaranteed + // to be searchable all at once when the InsertBatch method returns. + InsertBatch(b Batch) error +} + // Reader provides a point-in-time accessor to the documents in an index. type Reader interface { // MatchTerm returns a postings list over all documents which match the given term.