-
Notifications
You must be signed in to change notification settings - Fork 178
/
fastcache.go
433 lines (383 loc) · 11.5 KB
/
fastcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
// Package fastcache implements fast in-memory cache.
//
// The package has been extracted from https://victoriametrics.com/
package fastcache
import (
"fmt"
"sync"
"sync/atomic"
xxhash "github.com/cespare/xxhash/v2"
)
const bucketsCount = 512
const chunkSize = 64 * 1024
const bucketSizeBits = 40
const genSizeBits = 64 - bucketSizeBits
const maxGen = 1<<genSizeBits - 1
const maxBucketSize uint64 = 1 << bucketSizeBits
// Stats represents cache stats.
//
// Use Cache.UpdateStats for obtaining fresh stats from the cache.
type Stats struct {
// GetCalls is the number of Get calls.
GetCalls uint64
// SetCalls is the number of Set calls.
SetCalls uint64
// Misses is the number of cache misses.
Misses uint64
// Collisions is the number of cache collisions.
//
// Usually the number of collisions must be close to zero.
// High number of collisions suggest something wrong with cache.
Collisions uint64
// Corruptions is the number of detected corruptions of the cache.
//
// Corruptions may occur when corrupted cache is loaded from file.
Corruptions uint64
// EntriesCount is the current number of entries in the cache.
EntriesCount uint64
// BytesSize is the current size of the cache in bytes.
BytesSize uint64
// MaxBytesSize is the maximum allowed size of the cache in bytes (aka capacity).
MaxBytesSize uint64
// BigStats contains stats for GetBig/SetBig methods.
BigStats
}
// Reset resets s, so it may be re-used again in Cache.UpdateStats.
func (s *Stats) Reset() {
*s = Stats{}
}
// BigStats contains stats for GetBig/SetBig methods.
type BigStats struct {
// GetBigCalls is the number of GetBig calls.
GetBigCalls uint64
// SetBigCalls is the number of SetBig calls.
SetBigCalls uint64
// TooBigKeyErrors is the number of calls to SetBig with too big key.
TooBigKeyErrors uint64
// InvalidMetavalueErrors is the number of calls to GetBig resulting
// to invalid metavalue.
InvalidMetavalueErrors uint64
// InvalidValueLenErrors is the number of calls to GetBig resulting
// to a chunk with invalid length.
InvalidValueLenErrors uint64
// InvalidValueHashErrors is the number of calls to GetBig resulting
// to a chunk with invalid hash value.
InvalidValueHashErrors uint64
}
func (bs *BigStats) reset() {
atomic.StoreUint64(&bs.GetBigCalls, 0)
atomic.StoreUint64(&bs.SetBigCalls, 0)
atomic.StoreUint64(&bs.TooBigKeyErrors, 0)
atomic.StoreUint64(&bs.InvalidMetavalueErrors, 0)
atomic.StoreUint64(&bs.InvalidValueLenErrors, 0)
atomic.StoreUint64(&bs.InvalidValueHashErrors, 0)
}
// Cache is a fast thread-safe inmemory cache optimized for big number
// of entries.
//
// It has much lower impact on GC comparing to a simple `map[string][]byte`.
//
// Use New or LoadFromFile* for creating new cache instance.
// Concurrent goroutines may call any Cache methods on the same cache instance.
//
// Call Reset when the cache is no longer needed. This reclaims the allocated
// memory.
type Cache struct {
buckets [bucketsCount]bucket
bigStats BigStats
}
// New returns new cache with the given maxBytes capacity in bytes.
//
// maxBytes must be smaller than the available RAM size for the app,
// since the cache holds data in memory.
//
// If maxBytes is less than 32MB, then the minimum cache capacity is 32MB.
func New(maxBytes int) *Cache {
if maxBytes <= 0 {
panic(fmt.Errorf("maxBytes must be greater than 0; got %d", maxBytes))
}
var c Cache
maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount)
for i := range c.buckets[:] {
c.buckets[i].Init(maxBucketBytes)
}
return &c
}
// Set stores (k, v) in the cache.
//
// Get must be used for reading the stored entry.
//
// The stored entry may be evicted at any time either due to cache
// overflow or due to unlikely hash collision.
// Pass higher maxBytes value to New if the added items disappear
// frequently.
//
// (k, v) entries with summary size exceeding 64KB aren't stored in the cache.
// SetBig can be used for storing entries exceeding 64KB.
//
// k and v contents may be modified after returning from Set.
func (c *Cache) Set(k, v []byte) {
h := xxhash.Sum64(k)
idx := h % bucketsCount
c.buckets[idx].Set(k, v, h)
}
// Get appends value by the key k to dst and returns the result.
//
// Get allocates new byte slice for the returned value if dst is nil.
//
// Get returns only values stored in c via Set.
//
// k contents may be modified after returning from Get.
func (c *Cache) Get(dst, k []byte) []byte {
h := xxhash.Sum64(k)
idx := h % bucketsCount
dst, _ = c.buckets[idx].Get(dst, k, h, true)
return dst
}
// HasGet works identically to Get, but also returns whether the given key
// exists in the cache. This method makes it possible to differentiate between a
// stored nil/empty value versus and non-existing value.
func (c *Cache) HasGet(dst, k []byte) ([]byte, bool) {
h := xxhash.Sum64(k)
idx := h % bucketsCount
return c.buckets[idx].Get(dst, k, h, true)
}
// Has returns true if entry for the given key k exists in the cache.
func (c *Cache) Has(k []byte) bool {
h := xxhash.Sum64(k)
idx := h % bucketsCount
_, ok := c.buckets[idx].Get(nil, k, h, false)
return ok
}
// Del deletes value for the given k from the cache.
//
// k contents may be modified after returning from Del.
func (c *Cache) Del(k []byte) {
h := xxhash.Sum64(k)
idx := h % bucketsCount
c.buckets[idx].Del(h)
}
// Reset removes all the items from the cache.
func (c *Cache) Reset() {
for i := range c.buckets[:] {
c.buckets[i].Reset()
}
c.bigStats.reset()
}
// UpdateStats adds cache stats to s.
//
// Call s.Reset before calling UpdateStats if s is re-used.
func (c *Cache) UpdateStats(s *Stats) {
for i := range c.buckets[:] {
c.buckets[i].UpdateStats(s)
}
s.GetBigCalls += atomic.LoadUint64(&c.bigStats.GetBigCalls)
s.SetBigCalls += atomic.LoadUint64(&c.bigStats.SetBigCalls)
s.TooBigKeyErrors += atomic.LoadUint64(&c.bigStats.TooBigKeyErrors)
s.InvalidMetavalueErrors += atomic.LoadUint64(&c.bigStats.InvalidMetavalueErrors)
s.InvalidValueLenErrors += atomic.LoadUint64(&c.bigStats.InvalidValueLenErrors)
s.InvalidValueHashErrors += atomic.LoadUint64(&c.bigStats.InvalidValueHashErrors)
}
type bucket struct {
mu sync.RWMutex
// chunks is a ring buffer with encoded (k, v) pairs.
// It consists of 64KB chunks.
chunks [][]byte
// m maps hash(k) to idx of (k, v) pair in chunks.
m map[uint64]uint64
// idx points to chunks for writing the next (k, v) pair.
idx uint64
// gen is the generation of chunks.
gen uint64
getCalls uint64
setCalls uint64
misses uint64
collisions uint64
corruptions uint64
}
func (b *bucket) Init(maxBytes uint64) {
if maxBytes == 0 {
panic(fmt.Errorf("maxBytes cannot be zero"))
}
if maxBytes >= maxBucketSize {
panic(fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize))
}
maxChunks := (maxBytes + chunkSize - 1) / chunkSize
b.chunks = make([][]byte, maxChunks)
b.m = make(map[uint64]uint64)
b.Reset()
}
func (b *bucket) Reset() {
b.mu.Lock()
chunks := b.chunks
for i := range chunks {
putChunk(chunks[i])
chunks[i] = nil
}
b.m = make(map[uint64]uint64)
b.idx = 0
b.gen = 1
atomic.StoreUint64(&b.getCalls, 0)
atomic.StoreUint64(&b.setCalls, 0)
atomic.StoreUint64(&b.misses, 0)
atomic.StoreUint64(&b.collisions, 0)
atomic.StoreUint64(&b.corruptions, 0)
b.mu.Unlock()
}
func (b *bucket) cleanLocked() {
bGen := b.gen & ((1 << genSizeBits) - 1)
bIdx := b.idx
bm := b.m
newItems := 0
for _, v := range bm {
gen := v >> bucketSizeBits
idx := v & ((1 << bucketSizeBits) - 1)
if (gen+1 == bGen || gen == maxGen && bGen == 1) && idx >= bIdx || gen == bGen && idx < bIdx {
newItems++
}
}
if newItems < len(bm) {
// Re-create b.m with valid items, which weren't expired yet instead of deleting expired items from b.m.
// This should reduce memory fragmentation and the number Go objects behind b.m.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5379
bmNew := make(map[uint64]uint64, newItems)
for k, v := range bm {
gen := v >> bucketSizeBits
idx := v & ((1 << bucketSizeBits) - 1)
if (gen+1 == bGen || gen == maxGen && bGen == 1) && idx >= bIdx || gen == bGen && idx < bIdx {
bmNew[k] = v
}
}
b.m = bmNew
}
}
func (b *bucket) UpdateStats(s *Stats) {
s.GetCalls += atomic.LoadUint64(&b.getCalls)
s.SetCalls += atomic.LoadUint64(&b.setCalls)
s.Misses += atomic.LoadUint64(&b.misses)
s.Collisions += atomic.LoadUint64(&b.collisions)
s.Corruptions += atomic.LoadUint64(&b.corruptions)
b.mu.RLock()
s.EntriesCount += uint64(len(b.m))
bytesSize := uint64(0)
for _, chunk := range b.chunks {
bytesSize += uint64(cap(chunk))
}
s.BytesSize += bytesSize
s.MaxBytesSize += uint64(len(b.chunks)) * chunkSize
b.mu.RUnlock()
}
func (b *bucket) Set(k, v []byte, h uint64) {
atomic.AddUint64(&b.setCalls, 1)
if len(k) >= (1<<16) || len(v) >= (1<<16) {
// Too big key or value - its length cannot be encoded
// with 2 bytes (see below). Skip the entry.
return
}
var kvLenBuf [4]byte
kvLenBuf[0] = byte(uint16(len(k)) >> 8)
kvLenBuf[1] = byte(len(k))
kvLenBuf[2] = byte(uint16(len(v)) >> 8)
kvLenBuf[3] = byte(len(v))
kvLen := uint64(len(kvLenBuf) + len(k) + len(v))
if kvLen >= chunkSize {
// Do not store too big keys and values, since they do not
// fit a chunk.
return
}
chunks := b.chunks
needClean := false
b.mu.Lock()
idx := b.idx
idxNew := idx + kvLen
chunkIdx := idx / chunkSize
chunkIdxNew := idxNew / chunkSize
if chunkIdxNew > chunkIdx {
if chunkIdxNew >= uint64(len(chunks)) {
idx = 0
idxNew = kvLen
chunkIdx = 0
b.gen++
if b.gen&((1<<genSizeBits)-1) == 0 {
b.gen++
}
needClean = true
} else {
idx = chunkIdxNew * chunkSize
idxNew = idx + kvLen
chunkIdx = chunkIdxNew
}
chunks[chunkIdx] = chunks[chunkIdx][:0]
}
chunk := chunks[chunkIdx]
if chunk == nil {
chunk = getChunk()
chunk = chunk[:0]
}
chunk = append(chunk, kvLenBuf[:]...)
chunk = append(chunk, k...)
chunk = append(chunk, v...)
chunks[chunkIdx] = chunk
b.m[h] = idx | (b.gen << bucketSizeBits)
b.idx = idxNew
if needClean {
b.cleanLocked()
}
b.mu.Unlock()
}
func (b *bucket) Get(dst, k []byte, h uint64, returnDst bool) ([]byte, bool) {
atomic.AddUint64(&b.getCalls, 1)
found := false
chunks := b.chunks
b.mu.RLock()
v := b.m[h]
bGen := b.gen & ((1 << genSizeBits) - 1)
if v > 0 {
gen := v >> bucketSizeBits
idx := v & ((1 << bucketSizeBits) - 1)
if gen == bGen && idx < b.idx || gen+1 == bGen && idx >= b.idx || gen == maxGen && bGen == 1 && idx >= b.idx {
chunkIdx := idx / chunkSize
if chunkIdx >= uint64(len(chunks)) {
// Corrupted data during the load from file. Just skip it.
atomic.AddUint64(&b.corruptions, 1)
goto end
}
chunk := chunks[chunkIdx]
idx %= chunkSize
if idx+4 >= chunkSize {
// Corrupted data during the load from file. Just skip it.
atomic.AddUint64(&b.corruptions, 1)
goto end
}
kvLenBuf := chunk[idx : idx+4]
keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1])
valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3])
idx += 4
if idx+keyLen+valLen >= chunkSize {
// Corrupted data during the load from file. Just skip it.
atomic.AddUint64(&b.corruptions, 1)
goto end
}
if string(k) == string(chunk[idx:idx+keyLen]) {
idx += keyLen
if returnDst {
dst = append(dst, chunk[idx:idx+valLen]...)
}
found = true
} else {
atomic.AddUint64(&b.collisions, 1)
}
}
}
end:
b.mu.RUnlock()
if !found {
atomic.AddUint64(&b.misses, 1)
}
return dst, found
}
func (b *bucket) Del(h uint64) {
b.mu.Lock()
delete(b.m, h)
b.mu.Unlock()
}