1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"hash/crc32"
22	"math"
23	"sync"
24	"time"
25
26	"go.etcd.io/etcd/etcdserver/cindex"
27	"go.etcd.io/etcd/lease"
28	"go.etcd.io/etcd/mvcc/backend"
29	"go.etcd.io/etcd/mvcc/mvccpb"
30	"go.etcd.io/etcd/pkg/schedule"
31	"go.etcd.io/etcd/pkg/traceutil"
32
33	"go.uber.org/zap"
34)
35
var (
	// keyBucketName is the backend bucket that holds the revisioned
	// key-value records; metaBucketName holds store metadata such as the
	// compaction markers below.
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	// Keys stored in the meta bucket. scheduledCompactKeyName is written
	// when a compaction is requested; restore compares it against the
	// value under finishedCompactKeyName to decide whether a compaction
	// has to be resumed.
	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	// ErrCompacted is returned when the requested revision is less than
	// or equal to the last compacted revision.
	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	// ErrFutureRev is returned when the requested revision is greater
	// than the current revision of the store.
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	// ErrCanceled reports a canceled watcher (per its message); it is not
	// used in this file.
	ErrCanceled  = errors.New("mvcc: watcher is canceled")
)
48
const (
	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision. The last
	// byte is the mark.
	markedRevBytesLen      = revBytesLen + 1
	// markBytePosition is the index of the mark byte within a marked
	// revision, i.e. its last byte.
	markBytePosition       = markedRevBytesLen - 1
	// markTombstone is the mark value that flags a revision as a tombstone.
	markTombstone     byte = 't'
)
57
// restoreChunkKeys is the maximum number of keys read from the backend per
// range call during restore; it also sizes the channel and the key index
// cache used while rebuilding the index.
var restoreChunkKeys = 10000 // non-const for testing

// defaultCompactBatchLimit is the value NewStore substitutes when
// StoreConfig.CompactionBatchLimit is left at zero.
var defaultCompactBatchLimit = 1000
60
// StoreConfig holds the tunable settings of a store.
type StoreConfig struct {
	// CompactionBatchLimit is the compaction batch limit. A zero value is
	// replaced with defaultCompactBatchLimit by NewStore.
	// NOTE(review): presumably the maximum number of keys removed per
	// compaction batch; the consumer is not in this file, confirm there.
	CompactionBatchLimit int
}
64
// store is the MVCC key-value store. It pairs a persistent backend with an
// in-memory key index and tracks the current and last compacted revisions.
type store struct {
	// ReadView and WriteView are wired to this store by NewStore.
	ReadView
	WriteView

	// cfg is the configuration the store was created with, after defaults
	// have been applied.
	cfg StoreConfig

	// mu read locks for txns and write locks for non-txn store changes.
	mu sync.RWMutex

	// ci persists the consistent index; it may be nil (see saveIndex and
	// ConsistentIndex).
	ci cindex.ConsistentIndexer

	// b is the persistent backend; kvindex is the in-memory key index that
	// restore rebuilds from it.
	b       backend.Backend
	kvindex index

	// le is the lessor keys are attached to during restore; it may be nil.
	le lease.Lessor

	// revMu protects currentRev and compactMainRev.
	// Locked at end of write txn and released after write txn unlock lock.
	// Locked before locking read txn and released after locking.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// fifoSched runs compaction jobs and compaction barriers.
	fifoSched schedule.Scheduler

	// stopc is closed by Close and by Restore (which then replaces it).
	stopc chan struct{}

	// lg is the logger; NewStore substitutes a no-op logger when given nil.
	lg *zap.Logger
}
96
97// NewStore returns a new store. It is useful to create a store inside
98// mvcc pkg. It should only be used for testing externally.
99func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
100	if lg == nil {
101		lg = zap.NewNop()
102	}
103	if cfg.CompactionBatchLimit == 0 {
104		cfg.CompactionBatchLimit = defaultCompactBatchLimit
105	}
106	s := &store{
107		cfg:     cfg,
108		b:       b,
109		ci:      ci,
110		kvindex: newTreeIndex(lg),
111
112		le: le,
113
114		currentRev:     1,
115		compactMainRev: -1,
116
117		fifoSched: schedule.NewFIFOScheduler(),
118
119		stopc: make(chan struct{}),
120
121		lg: lg,
122	}
123	s.ReadView = &readView{s}
124	s.WriteView = &writeView{s}
125	if s.le != nil {
126		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
127	}
128
129	tx := s.b.BatchTx()
130	tx.Lock()
131	tx.UnsafeCreateBucket(keyBucketName)
132	tx.UnsafeCreateBucket(metaBucketName)
133	tx.Unlock()
134	s.b.ForceCommit()
135
136	s.mu.Lock()
137	defer s.mu.Unlock()
138	if err := s.restore(); err != nil {
139		// TODO: return the error instead of panic here?
140		panic("failed to recover store from backend")
141	}
142
143	return s
144}
145
146func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
147	if ctx == nil || ctx.Err() != nil {
148		s.mu.Lock()
149		select {
150		case <-s.stopc:
151		default:
152			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
153			s.fifoSched.Schedule(f)
154		}
155		s.mu.Unlock()
156		return
157	}
158	close(ch)
159}
160
161func (s *store) Hash() (hash uint32, revision int64, err error) {
162	// TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly
163	start := time.Now()
164
165	s.b.ForceCommit()
166	h, err := s.b.Hash(DefaultIgnores)
167
168	hashSec.Observe(time.Since(start).Seconds())
169	return h, s.currentRev, err
170}
171
172func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
173	start := time.Now()
174
175	s.mu.RLock()
176	s.revMu.RLock()
177	compactRev, currentRev = s.compactMainRev, s.currentRev
178	s.revMu.RUnlock()
179
180	if rev > 0 && rev <= compactRev {
181		s.mu.RUnlock()
182		return 0, 0, compactRev, ErrCompacted
183	} else if rev > 0 && rev > currentRev {
184		s.mu.RUnlock()
185		return 0, currentRev, 0, ErrFutureRev
186	}
187
188	if rev == 0 {
189		rev = currentRev
190	}
191	keep := s.kvindex.Keep(rev)
192
193	tx := s.b.ReadTx()
194	tx.RLock()
195	defer tx.RUnlock()
196	s.mu.RUnlock()
197
198	upper := revision{main: rev + 1}
199	lower := revision{main: compactRev + 1}
200	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
201
202	h.Write(keyBucketName)
203	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
204		kr := bytesToRev(k)
205		if !upper.GreaterThan(kr) {
206			return nil
207		}
208		// skip revisions that are scheduled for deletion
209		// due to compacting; don't skip if there isn't one.
210		if lower.GreaterThan(kr) && len(keep) > 0 {
211			if _, ok := keep[kr]; !ok {
212				return nil
213			}
214		}
215		h.Write(k)
216		h.Write(v)
217		return nil
218	})
219	hash = h.Sum32()
220
221	hashRevSec.Observe(time.Since(start).Seconds())
222	return hash, currentRev, compactRev, err
223}
224
225func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
226	s.revMu.Lock()
227	if rev <= s.compactMainRev {
228		ch := make(chan struct{})
229		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
230		s.fifoSched.Schedule(f)
231		s.revMu.Unlock()
232		return ch, ErrCompacted
233	}
234	if rev > s.currentRev {
235		s.revMu.Unlock()
236		return nil, ErrFutureRev
237	}
238
239	s.compactMainRev = rev
240
241	rbytes := newRevBytes()
242	revToBytes(revision{main: rev}, rbytes)
243
244	tx := s.b.BatchTx()
245	tx.Lock()
246	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
247	tx.Unlock()
248	// ensure that desired compaction is persisted
249	s.b.ForceCommit()
250
251	s.revMu.Unlock()
252
253	return nil, nil
254}
255
256func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
257	ch := make(chan struct{})
258	var j = func(ctx context.Context) {
259		if ctx.Err() != nil {
260			s.compactBarrier(ctx, ch)
261			return
262		}
263		start := time.Now()
264		keep := s.kvindex.Compact(rev)
265		indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
266		if !s.scheduleCompaction(rev, keep) {
267			s.compactBarrier(nil, ch)
268			return
269		}
270		close(ch)
271	}
272
273	s.fifoSched.Schedule(j)
274	trace.Step("schedule compaction")
275	return ch, nil
276}
277
278func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
279	ch, err := s.updateCompactRev(rev)
280	if err != nil {
281		return ch, err
282	}
283
284	return s.compact(traceutil.TODO(), rev)
285}
286
287func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
288	s.mu.Lock()
289
290	ch, err := s.updateCompactRev(rev)
291	trace.Step("check and update compact revision")
292	if err != nil {
293		s.mu.Unlock()
294		return ch, err
295	}
296	s.mu.Unlock()
297
298	return s.compact(trace, rev)
299}
300
301// DefaultIgnores is a map of keys to ignore in hash checking.
302var DefaultIgnores map[backend.IgnoreKey]struct{}
303
304func init() {
305	DefaultIgnores = map[backend.IgnoreKey]struct{}{
306		// consistent index might be changed due to v2 internal sync, which
307		// is not controllable by the user.
308		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
309	}
310}
311
312func (s *store) Commit() {
313	s.mu.Lock()
314	defer s.mu.Unlock()
315
316	tx := s.b.BatchTx()
317	tx.Lock()
318	s.saveIndex(tx)
319	tx.Unlock()
320	s.b.ForceCommit()
321}
322
323func (s *store) Restore(b backend.Backend) error {
324	s.mu.Lock()
325	defer s.mu.Unlock()
326
327	close(s.stopc)
328	s.fifoSched.Stop()
329
330	s.b = b
331	s.kvindex = newTreeIndex(s.lg)
332	s.currentRev = 1
333	s.compactMainRev = -1
334	s.fifoSched = schedule.NewFIFOScheduler()
335	s.stopc = make(chan struct{})
336	s.ci.SetBatchTx(b.BatchTx())
337	s.ci.SetConsistentIndex(0)
338
339	return s.restore()
340}
341
342func (s *store) restore() error {
343	s.setupMetricsReporter()
344
345	min, max := newRevBytes(), newRevBytes()
346	revToBytes(revision{main: 1}, min)
347	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
348
349	keyToLease := make(map[string]lease.LeaseID)
350
351	// restore index
352	tx := s.b.BatchTx()
353	tx.Lock()
354
355	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
356	if len(finishedCompactBytes) != 0 {
357		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
358
359		s.lg.Info(
360			"restored last compact revision",
361			zap.String("meta-bucket-name", string(metaBucketName)),
362			zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
363			zap.Int64("restored-compact-revision", s.compactMainRev),
364		)
365	}
366	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
367	scheduledCompact := int64(0)
368	if len(scheduledCompactBytes) != 0 {
369		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
370	}
371
372	// index keys concurrently as they're loaded in from tx
373	keysGauge.Set(0)
374	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
375	for {
376		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
377		if len(keys) == 0 {
378			break
379		}
380		// rkvc blocks if the total pending keys exceeds the restore
381		// chunk size to keep keys from consuming too much memory.
382		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
383		if len(keys) < restoreChunkKeys {
384			// partial set implies final set
385			break
386		}
387		// next set begins after where this one ended
388		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
389		newMin.sub++
390		revToBytes(newMin, min)
391	}
392	close(rkvc)
393	s.currentRev = <-revc
394
395	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
396	// the correct revision should be set to compaction revision in the case, not the largest revision
397	// we have seen.
398	if s.currentRev < s.compactMainRev {
399		s.currentRev = s.compactMainRev
400	}
401	if scheduledCompact <= s.compactMainRev {
402		scheduledCompact = 0
403	}
404
405	for key, lid := range keyToLease {
406		if s.le == nil {
407			tx.Unlock()
408			panic("no lessor to attach lease")
409		}
410		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
411		if err != nil {
412			s.lg.Error(
413				"failed to attach a lease",
414				zap.String("lease-id", fmt.Sprintf("%016x", lid)),
415				zap.Error(err),
416			)
417		}
418	}
419
420	tx.Unlock()
421
422	if scheduledCompact != 0 {
423		if _, err := s.compactLockfree(scheduledCompact); err != nil {
424			s.lg.Warn("compaction encountered error", zap.Error(err))
425		}
426
427		s.lg.Info(
428			"resume scheduled compaction",
429			zap.String("meta-bucket-name", string(metaBucketName)),
430			zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
431			zap.Int64("scheduled-compact-revision", scheduledCompact),
432		)
433	}
434
435	return nil
436}
437
// revKeyValue is one record read from the key bucket during restore, handed
// from restoreChunk to the indexing goroutine of restoreIntoIndex.
type revKeyValue struct {
	// key is the raw backend key: the revision bytes, optionally followed
	// by the tombstone mark.
	key  []byte
	// kv is the key-value unmarshaled from the record's value.
	kv   mvccpb.KeyValue
	// kstr is kv.Key as a string, used as a map key.
	kstr string
}
443
444func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
445	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
446	go func() {
447		currentRev := int64(1)
448		defer func() { revc <- currentRev }()
449		// restore the tree index from streaming the unordered index.
450		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
451		for rkv := range rkvc {
452			ki, ok := kiCache[rkv.kstr]
453			// purge kiCache if many keys but still missing in the cache
454			if !ok && len(kiCache) >= restoreChunkKeys {
455				i := 10
456				for k := range kiCache {
457					delete(kiCache, k)
458					if i--; i == 0 {
459						break
460					}
461				}
462			}
463			// cache miss, fetch from tree index if there
464			if !ok {
465				ki = &keyIndex{key: rkv.kv.Key}
466				if idxKey := idx.KeyIndex(ki); idxKey != nil {
467					kiCache[rkv.kstr], ki = idxKey, idxKey
468					ok = true
469				}
470			}
471			rev := bytesToRev(rkv.key)
472			currentRev = rev.main
473			if ok {
474				if isTombstone(rkv.key) {
475					if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
476						lg.Warn("tombstone encountered error", zap.Error(err))
477					}
478					continue
479				}
480				ki.put(lg, rev.main, rev.sub)
481			} else if !isTombstone(rkv.key) {
482				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
483				idx.Insert(ki)
484				kiCache[rkv.kstr] = ki
485			}
486		}
487	}()
488	return rkvc, revc
489}
490
491func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
492	for i, key := range keys {
493		rkv := revKeyValue{key: key}
494		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
495			lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
496		}
497		rkv.kstr = string(rkv.kv.Key)
498		if isTombstone(key) {
499			delete(keyToLease, rkv.kstr)
500		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
501			keyToLease[rkv.kstr] = lid
502		} else {
503			delete(keyToLease, rkv.kstr)
504		}
505		kvc <- rkv
506	}
507}
508
// Close signals shutdown by closing stopc and stops the scheduler. It always
// returns nil. Calling it twice panics, because stopc would be closed twice.
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}
514
515func (s *store) saveIndex(tx backend.BatchTx) {
516	if s.ci != nil {
517		s.ci.UnsafeSave(tx)
518	}
519}
520
521func (s *store) ConsistentIndex() uint64 {
522	if s.ci != nil {
523		return s.ci.ConsistentIndex()
524	}
525	return 0
526}
527
528func (s *store) setupMetricsReporter() {
529	b := s.b
530	reportDbTotalSizeInBytesMu.Lock()
531	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
532	reportDbTotalSizeInBytesMu.Unlock()
533	reportDbTotalSizeInBytesDebugMu.Lock()
534	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
535	reportDbTotalSizeInBytesDebugMu.Unlock()
536	reportDbTotalSizeInUseInBytesMu.Lock()
537	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
538	reportDbTotalSizeInUseInBytesMu.Unlock()
539	reportDbOpenReadTxNMu.Lock()
540	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
541	reportDbOpenReadTxNMu.Unlock()
542	reportCurrentRevMu.Lock()
543	reportCurrentRev = func() float64 {
544		s.revMu.RLock()
545		defer s.revMu.RUnlock()
546		return float64(s.currentRev)
547	}
548	reportCurrentRevMu.Unlock()
549	reportCompactRevMu.Lock()
550	reportCompactRev = func() float64 {
551		s.revMu.RLock()
552		defer s.revMu.RUnlock()
553		return float64(s.compactMainRev)
554	}
555	reportCompactRevMu.Unlock()
556}
557
558// appendMarkTombstone appends tombstone mark to normal revision bytes.
559func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
560	if len(b) != revBytesLen {
561		lg.Panic(
562			"cannot append tombstone mark to non-normal revision bytes",
563			zap.Int("expected-revision-bytes-size", revBytesLen),
564			zap.Int("given-revision-bytes-size", len(b)),
565		)
566	}
567	return append(b, markTombstone)
568}
569
570// isTombstone checks whether the revision bytes is a tombstone.
571func isTombstone(b []byte) bool {
572	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
573}
574