Skip to content
This repository has been archived by the owner on Oct 16, 2018. It is now read-only.

Commit

Permalink
Support partial updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerome Froelich committed Apr 24, 2018
1 parent 09f4917 commit af23ed8
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .excludemetalint
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ vendor/
generated/
_mock.go
.pb.go
index/segment/mem/idsgen/map_gen.go
index/segment/mem/idsgen/map_gen.go
122 changes: 122 additions & 0 deletions index/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package index

import (
"bytes"
"fmt"

"github.com/m3db/m3ninx/doc"
)

// Batch represents a batch of documents that should be inserted into the index.
type Batch struct {
// Docs holds the documents that make up the batch.
Docs []doc.Document

// If AllowPartialUpdates is true the index will continue to index documents in the batch
// even if it encounters an error attempting to index a previous document in the batch.
// If false, on the other hand, then any errors encountered indexing a document will cause
// the entire batch to fail and none of the documents in the batch will be indexed.
AllowPartialUpdates bool
}

// BatchOption is an option for a Batch. NewBatch applies the options it is given
// in order, feeding each one the Batch produced by the previous option.
type BatchOption interface {
// apply takes a Batch by value and returns the Batch with this option applied.
apply(Batch) Batch
}

// batchOptionFunc lets an ordinary function of the form func(Batch) Batch be used
// wherever a BatchOption is expected.
type batchOptionFunc func(Batch) Batch

// apply satisfies BatchOption by calling the wrapped function on the given Batch
// and handing back whatever it returns.
func (fn batchOptionFunc) apply(batch Batch) Batch {
	return fn(batch)
}

// AllowPartialUpdates returns a BatchOption which sets AllowPartialUpdates on a Batch,
// permitting an index to keep indexing the remaining documents in the batch even if
// it encountered an error inserting a prior document.
func AllowPartialUpdates() BatchOption {
	enable := func(batch Batch) Batch {
		// batch is a copy, so the caller's Batch is only affected through the return value.
		batch.AllowPartialUpdates = true
		return batch
	}
	return batchOptionFunc(enable)
}

// NewBatch returns a Batch wrapping docs. The given options are applied one after
// another, in the order supplied, to the Batch before it is returned. With no
// options the Batch has AllowPartialUpdates left at its zero value (false).
func NewBatch(docs []doc.Document, opts ...BatchOption) Batch {
	batch := Batch{Docs: docs}
	for i := range opts {
		batch = opts[i].apply(batch)
	}
	return batch
}

// BatchPartialError indicates an error was encountered inserting some documents in a batch.
// It accumulates one error per failed document together with that document's index.
// It is not safe for concurrent use.
type BatchPartialError struct {
	// errs and idxs are parallel slices: errs[i] is the error recorded for the
	// document at position idxs[i]. Add is the only method that grows them and it
	// always appends to both.
	errs []error
	idxs []int
}

// NewBatchPartialError returns a new BatchPartialError with no recorded errors.
func NewBatchPartialError() *BatchPartialError {
	return &BatchPartialError{
		errs: []error{},
		idxs: []int{},
	}
}

// Error implements the error interface. It produces one line per recorded error,
// in the order the errors were added, with lines separated by a newline and no
// trailing newline. An empty BatchPartialError yields the empty string.
func (e *BatchPartialError) Error() string {
	var msg bytes.Buffer
	for pos, err := range e.errs {
		// Emit the separator before every entry except the first so that the
		// result never ends with a newline.
		if pos > 0 {
			msg.WriteString("\n")
		}
		fmt.Fprintf(&msg, "failed to insert document at index %v in batch: %v", e.idxs[pos], err)
	}
	return msg.String()
}

// Add records err as the failure for the document at position idx.
// Any nil errors are ignored.
func (e *BatchPartialError) Add(err error, idx int) {
	if err != nil {
		e.errs = append(e.errs, err)
		e.idxs = append(e.idxs, idx)
	}
}

// Indices returns the indices of the documents in the batch which were not indexed,
// in the order they were added. The returned slice is the one held by e, not a copy.
func (e *BatchPartialError) Indices() []int {
	return e.idxs
}

// IsEmpty returns a bool indicating whether e has no recorded errors.
func (e *BatchPartialError) IsEmpty() bool {
	return len(e.errs) == 0
}

// IsBatchPartialError returns a bool indicating whether the dynamic type of err is
// *BatchPartialError. It returns false for a nil error.
func IsBatchPartialError(err error) bool {
	switch err.(type) {
	case *BatchPartialError:
		return true
	default:
		return false
	}
}
70 changes: 70 additions & 0 deletions index/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package index

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
)

// TestBatchAllowPartialUpdates checks that a Batch built by NewBatch has
// AllowPartialUpdates unset by default and set when the AllowPartialUpdates
// option is supplied.
func TestBatchAllowPartialUpdates(t *testing.T) {
	type testCase struct {
		name     string
		batch    Batch
		expected bool
	}

	cases := []testCase{
		{name: "off", batch: NewBatch(nil), expected: false},
		{name: "on", batch: NewBatch(nil, AllowPartialUpdates()), expected: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.expected, tc.batch.AllowPartialUpdates)
		})
	}
}

// TestBatchPartialError checks that a BatchPartialError starts out empty, reports
// the indices passed to Add in order, and is recognised by IsBatchPartialError
// while an ordinary error is not.
func TestBatchPartialError(t *testing.T) {
	expectedIdxs := []int{3, 7, 13}

	batchErr := NewBatchPartialError()
	require.True(t, batchErr.IsEmpty())

	for i := range expectedIdxs {
		batchErr.Add(errors.New("error"), expectedIdxs[i])
	}
	require.False(t, batchErr.IsEmpty())
	require.Equal(t, expectedIdxs, batchErr.Indices())

	require.True(t, IsBatchPartialError(batchErr))
	require.False(t, IsBatchPartialError(errors.New("error")))
}
52 changes: 32 additions & 20 deletions index/segment/mem/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func (s *segment) Insert(d doc.Document) ([]byte, error) {
{
s.writer.Lock()

ds := []doc.Document{d}
if err := s.prepareDocsWithLocks(ds); err != nil {
b := index.NewBatch([]doc.Document{d})
if err := s.prepareDocsWithLocks(b); err != nil {
return nil, err
}

// Update the document in case we generated a UUID for it.
d = ds[0]
d = b.Docs[0]

s.insertDocWithLocks(d)
s.readerID.Inc()
Expand All @@ -109,48 +109,58 @@ func (s *segment) Insert(d doc.Document) ([]byte, error) {
return d.ID, nil
}

func (s *segment) InsertBatch(ds []doc.Document) error {
func (s *segment) InsertBatch(b index.Batch) error {
s.state.RLock()
defer s.state.RUnlock()
if s.state.closed {
return sgmt.ErrClosed
}

var err error
{
s.writer.Lock()

if err := s.prepareDocsWithLocks(ds); err != nil {
err = s.prepareDocsWithLocks(b)
if err != nil && !index.IsBatchPartialError(err) {
return err
}

for _, d := range ds {
for _, d := range b.Docs {
s.insertDocWithLocks(d)
}
s.readerID.Add(uint32(len(ds)))
s.readerID.Add(uint32(len(b.Docs)))

s.writer.Unlock()
}

return nil
return err
}

// prepareDocsWithLocks ensures the given documents can be inserted into the index. It
// must be called with the state and writer locks.
func (s *segment) prepareDocsWithLocks(ds []doc.Document) error {
func (s *segment) prepareDocsWithLocks(b index.Batch) error {
s.writer.idSet.Reset()
for i := 0; i < len(ds); {
d := ds[i]
err := d.Validate()
if err != nil {
return err

batchErr := index.NewBatchPartialError()
for i := 0; i < len(b.Docs); i++ {
d := b.Docs[i]
if err := d.Validate(); err != nil {
if !b.AllowPartialUpdates {
return err
}
batchErr.Add(err, i)
continue
}

if d.HasID() {
if s.termsDict.ContainsTerm(doc.IDReservedFieldName, d.ID) {
// The segment already contains this document so we can remove it from those
// we need to index.
ds[i], ds[len(ds)] = ds[len(ds)], ds[i]
ds = ds[:len(ds)-1]
b.Docs[i], b.Docs[len(b.Docs)] = b.Docs[len(b.Docs)], b.Docs[i]
b.Docs = b.Docs[:len(b.Docs)-1]

// Decrement the loop variable since we just removed this document.
i--
continue
}

Expand All @@ -164,18 +174,20 @@ func (s *segment) prepareDocsWithLocks(ds []doc.Document) error {
}
d.ID = id

// Update the document since we added an ID.
ds[i] = d
// Update the document in the batch since we added an ID to it.
b.Docs[i] = d
}

s.writer.idSet.SetUnsafe(d.ID, struct{}{}, idsgen.SetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
i++
}

return nil
if batchErr.IsEmpty() {
return nil
}
return batchErr
}

// insertDocWithLocks inserts a document into the index. It must be called with the
Expand Down
Loading

0 comments on commit af23ed8

Please sign in to comment.