1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"hash/crc32"
22	"math"
23	"sync"
24	"time"
25
26	"go.etcd.io/etcd/api/v3/mvccpb"
27	"go.etcd.io/etcd/pkg/v3/schedule"
28	"go.etcd.io/etcd/pkg/v3/traceutil"
29	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
30	"go.etcd.io/etcd/server/v3/lease"
31	"go.etcd.io/etcd/server/v3/mvcc/backend"
32
33	"go.uber.org/zap"
34)
35
var (
	// keyBucketName and metaBucketName are the names of the two backend
	// buckets NewStore creates. This file reads revision-keyed records from
	// the key bucket and compaction bookkeeping from the meta bucket.
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	// Keys used inside the meta bucket. consistentIndexKeyName is excluded
	// from hashing via DefaultIgnores; the two compaction keys hold the
	// revision of the most recently scheduled and the most recently finished
	// compaction and are consulted when the store is restored.
	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	// Sentinel errors:
	//   - ErrCompacted: the requested revision is at or below the store's
	//     compacted revision.
	//   - ErrFutureRev: the requested revision is greater than the store's
	//     current revision.
	//   - ErrCanceled: per its message, a watcher has been canceled (not
	//     produced in this file).
	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	ErrCanceled  = errors.New("mvcc: watcher is canceled")
)
48
const (
	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision. The last
	// byte is the mark.
	//
	// markBytePosition is the index of that trailing mark byte, and
	// markTombstone is the mark value isTombstone looks for at that index.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'
)
57
// restoreChunkKeys is the number of keys read from the backend per range
// request while restoring; it also sizes the restore channel and the
// key-index cache used during restore.
var restoreChunkKeys = 10000 // non-const for testing

// defaultCompactBatchLimit is the value NewStore substitutes when
// StoreConfig.CompactionBatchLimit is zero.
var defaultCompactBatchLimit = 1000
60
// StoreConfig holds the tunable options of a store.
type StoreConfig struct {
	// CompactionBatchLimit is the compaction batch size. A zero value is
	// replaced by defaultCompactBatchLimit in NewStore.
	// NOTE(review): presumably the maximum number of keys handled per
	// compaction batch; the consumer is outside this file section.
	CompactionBatchLimit int
}
64
// store is the mvcc key-value store. It combines a persistent backend with
// an in-memory key index and tracks the current and compacted revisions.
type store struct {
	// ReadView and WriteView are set by NewStore to views backed by this
	// store.
	ReadView
	WriteView

	cfg StoreConfig

	// mu read locks for txns and write locks for non-txn store changes.
	mu sync.RWMutex

	// ci is the consistent indexer. saveIndex and ConsistentIndex treat a nil
	// ci as "no indexer configured".
	ci cindex.ConsistentIndexer

	// b is the persistent backend and kvindex the in-memory index over it.
	// Both are replaced by Restore.
	b       backend.Backend
	kvindex index

	// le is the lessor that keys are attached to during restore; it may be
	// nil when no key carries a lease.
	le lease.Lessor

	// revMu protects currentRev and compactMainRev.
	// Locked at end of write txn and released after write txn unlock lock.
	// Locked before locking read txn and released after locking.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// fifoSched runs compaction jobs and compaction barriers. It is stopped
	// by Close and stopped and replaced by Restore.
	fifoSched schedule.Scheduler

	// stopc is closed by Close and by Restore (which then installs a fresh
	// channel); compactBarrier checks it before rescheduling itself.
	stopc chan struct{}

	lg *zap.Logger
}
96
97// NewStore returns a new store. It is useful to create a store inside
98// mvcc pkg. It should only be used for testing externally.
99func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
100	if lg == nil {
101		lg = zap.NewNop()
102	}
103	if cfg.CompactionBatchLimit == 0 {
104		cfg.CompactionBatchLimit = defaultCompactBatchLimit
105	}
106	s := &store{
107		cfg:     cfg,
108		b:       b,
109		ci:      ci,
110		kvindex: newTreeIndex(lg),
111
112		le: le,
113
114		currentRev:     1,
115		compactMainRev: -1,
116
117		fifoSched: schedule.NewFIFOScheduler(),
118
119		stopc: make(chan struct{}),
120
121		lg: lg,
122	}
123	s.ReadView = &readView{s}
124	s.WriteView = &writeView{s}
125	if s.le != nil {
126		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
127	}
128
129	tx := s.b.BatchTx()
130	tx.Lock()
131	tx.UnsafeCreateBucket(keyBucketName)
132	tx.UnsafeCreateBucket(metaBucketName)
133	tx.Unlock()
134	s.b.ForceCommit()
135
136	s.mu.Lock()
137	defer s.mu.Unlock()
138	if err := s.restore(); err != nil {
139		// TODO: return the error instead of panic here?
140		panic("failed to recover store from backend")
141	}
142
143	return s
144}
145
146func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
147	if ctx == nil || ctx.Err() != nil {
148		select {
149		case <-s.stopc:
150		default:
151			// fix deadlock in mvcc,for more information, please refer to pr 11817.
152			// s.stopc is only updated in restore operation, which is called by apply
153			// snapshot call, compaction and apply snapshot requests are serialized by
154			// raft, and do not happen at the same time.
155			s.mu.Lock()
156			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
157			s.fifoSched.Schedule(f)
158			s.mu.Unlock()
159		}
160		return
161	}
162	close(ch)
163}
164
165func (s *store) Hash() (hash uint32, revision int64, err error) {
166	// TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly
167	start := time.Now()
168
169	s.b.ForceCommit()
170	h, err := s.b.Hash(DefaultIgnores)
171
172	hashSec.Observe(time.Since(start).Seconds())
173	return h, s.currentRev, err
174}
175
176func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
177	start := time.Now()
178
179	s.mu.RLock()
180	s.revMu.RLock()
181	compactRev, currentRev = s.compactMainRev, s.currentRev
182	s.revMu.RUnlock()
183
184	if rev > 0 && rev <= compactRev {
185		s.mu.RUnlock()
186		return 0, 0, compactRev, ErrCompacted
187	} else if rev > 0 && rev > currentRev {
188		s.mu.RUnlock()
189		return 0, currentRev, 0, ErrFutureRev
190	}
191
192	if rev == 0 {
193		rev = currentRev
194	}
195	keep := s.kvindex.Keep(rev)
196
197	tx := s.b.ReadTx()
198	tx.RLock()
199	defer tx.RUnlock()
200	s.mu.RUnlock()
201
202	upper := revision{main: rev + 1}
203	lower := revision{main: compactRev + 1}
204	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
205
206	h.Write(keyBucketName)
207	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
208		kr := bytesToRev(k)
209		if !upper.GreaterThan(kr) {
210			return nil
211		}
212		// skip revisions that are scheduled for deletion
213		// due to compacting; don't skip if there isn't one.
214		if lower.GreaterThan(kr) && len(keep) > 0 {
215			if _, ok := keep[kr]; !ok {
216				return nil
217			}
218		}
219		h.Write(k)
220		h.Write(v)
221		return nil
222	})
223	hash = h.Sum32()
224
225	hashRevSec.Observe(time.Since(start).Seconds())
226	return hash, currentRev, compactRev, err
227}
228
229func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
230	s.revMu.Lock()
231	if rev <= s.compactMainRev {
232		ch := make(chan struct{})
233		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
234		s.fifoSched.Schedule(f)
235		s.revMu.Unlock()
236		return ch, ErrCompacted
237	}
238	if rev > s.currentRev {
239		s.revMu.Unlock()
240		return nil, ErrFutureRev
241	}
242
243	s.compactMainRev = rev
244
245	rbytes := newRevBytes()
246	revToBytes(revision{main: rev}, rbytes)
247
248	tx := s.b.BatchTx()
249	tx.Lock()
250	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
251	tx.Unlock()
252	// ensure that desired compaction is persisted
253	s.b.ForceCommit()
254
255	s.revMu.Unlock()
256
257	return nil, nil
258}
259
260func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
261	ch := make(chan struct{})
262	var j = func(ctx context.Context) {
263		if ctx.Err() != nil {
264			s.compactBarrier(ctx, ch)
265			return
266		}
267		start := time.Now()
268		keep := s.kvindex.Compact(rev)
269		indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
270		if !s.scheduleCompaction(rev, keep) {
271			s.compactBarrier(nil, ch)
272			return
273		}
274		close(ch)
275	}
276
277	s.fifoSched.Schedule(j)
278	trace.Step("schedule compaction")
279	return ch, nil
280}
281
282func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
283	ch, err := s.updateCompactRev(rev)
284	if err != nil {
285		return ch, err
286	}
287
288	return s.compact(traceutil.TODO(), rev)
289}
290
291func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
292	s.mu.Lock()
293
294	ch, err := s.updateCompactRev(rev)
295	trace.Step("check and update compact revision")
296	if err != nil {
297		s.mu.Unlock()
298		return ch, err
299	}
300	s.mu.Unlock()
301
302	return s.compact(trace, rev)
303}
304
// DefaultIgnores is a map of keys to ignore in hash checking.
// It is populated in init and passed to the backend by Hash.
var DefaultIgnores map[backend.IgnoreKey]struct{}

// init fills DefaultIgnores with the meta-bucket consistent index key.
func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}
315
316func (s *store) Commit() {
317	s.mu.Lock()
318	defer s.mu.Unlock()
319
320	tx := s.b.BatchTx()
321	tx.Lock()
322	s.saveIndex(tx)
323	tx.Unlock()
324	s.b.ForceCommit()
325}
326
327func (s *store) Restore(b backend.Backend) error {
328	s.mu.Lock()
329	defer s.mu.Unlock()
330
331	close(s.stopc)
332	s.fifoSched.Stop()
333
334	s.b = b
335	s.kvindex = newTreeIndex(s.lg)
336
337	{
338		// During restore the metrics might report 'special' values
339		s.revMu.Lock()
340		s.currentRev = 1
341		s.compactMainRev = -1
342		s.revMu.Unlock()
343	}
344
345	s.fifoSched = schedule.NewFIFOScheduler()
346	s.stopc = make(chan struct{})
347	s.ci.SetBatchTx(b.BatchTx())
348	s.ci.SetConsistentIndex(0)
349
350	return s.restore()
351}
352
// restore rebuilds the store's in-memory state from the backend s.b: the
// compacted revision, the key index, the current revision and the
// key-to-lease attachments. If a compaction was scheduled but not recorded
// as finished, it is resumed at the end.
//
// Both callers in this file (NewStore and Restore) invoke it with s.mu held.
// It panics if a key references a lease while no lessor is configured, and
// otherwise always returns nil.
func (s *store) restore() error {
	s.setupMetricsReporter()

	// min and max bound the range scan over the key bucket; min is advanced
	// after every chunk.
	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	// keyToLease collects, per key, the lease of its latest record seen.
	keyToLease := make(map[string]lease.LeaseID)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()

	// Load the revision of the last finished compaction, if one was recorded.
	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.revMu.Lock()
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main

		s.lg.Info(
			"restored last compact revision",
			zap.String("meta-bucket-name", string(metaBucketName)),
			zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
			zap.Int64("restored-compact-revision", s.compactMainRev),
		)
		s.revMu.Unlock()
	}
	// Load the revision of the last scheduled compaction; 0 means none.
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
	}

	// index keys concurrently as they're loaded in from tx
	keysGauge.Set(0)
	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		// rkvc blocks if the total pending keys exceeds the restore
		// chunk size to keep keys from consuming too much memory.
		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
		if len(keys) < restoreChunkKeys {
			// partial set implies final set
			break
		}
		// next set begins after where this one ended
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	// Closing rkvc lets the indexing goroutine finish and report the last
	// revision it saw on revc.
	close(rkvc)

	{
		s.revMu.Lock()
		s.currentRev = <-revc

		// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
		// the correct revision should be set to compaction revision in the case, not the largest revision
		// we have seen.
		if s.currentRev < s.compactMainRev {
			s.currentRev = s.compactMainRev
		}
		s.revMu.Unlock()
	}

	// A scheduled compaction at or below the finished one needs no resuming.
	if scheduledCompact <= s.compactMainRev {
		scheduledCompact = 0
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			// Release the batch tx before panicking.
			tx.Unlock()
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			// A failed attachment is logged and restore continues.
			s.lg.Error(
				"failed to attach a lease",
				zap.String("lease-id", fmt.Sprintf("%016x", lid)),
				zap.Error(err),
			)
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		// compactLockfree is used because the callers of restore already
		// hold s.mu.
		if _, err := s.compactLockfree(scheduledCompact); err != nil {
			s.lg.Warn("compaction encountered error", zap.Error(err))
		}

		s.lg.Info(
			"resume scheduled compaction",
			zap.String("meta-bucket-name", string(metaBucketName)),
			zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
			zap.Int64("scheduled-compact-revision", scheduledCompact),
		)
	}

	return nil
}
456
// revKeyValue is one record read from the key bucket during restore, handed
// from restoreChunk to the indexing goroutine started by restoreIntoIndex.
//
// key is the backend key (revision bytes, optionally carrying the tombstone
// mark), kv is the value unmarshaled from the backend, and kstr is kv.Key
// converted to a string once so it can be used as a map key.
type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}
462
463func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
464	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
465	go func() {
466		currentRev := int64(1)
467		defer func() { revc <- currentRev }()
468		// restore the tree index from streaming the unordered index.
469		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
470		for rkv := range rkvc {
471			ki, ok := kiCache[rkv.kstr]
472			// purge kiCache if many keys but still missing in the cache
473			if !ok && len(kiCache) >= restoreChunkKeys {
474				i := 10
475				for k := range kiCache {
476					delete(kiCache, k)
477					if i--; i == 0 {
478						break
479					}
480				}
481			}
482			// cache miss, fetch from tree index if there
483			if !ok {
484				ki = &keyIndex{key: rkv.kv.Key}
485				if idxKey := idx.KeyIndex(ki); idxKey != nil {
486					kiCache[rkv.kstr], ki = idxKey, idxKey
487					ok = true
488				}
489			}
490			rev := bytesToRev(rkv.key)
491			currentRev = rev.main
492			if ok {
493				if isTombstone(rkv.key) {
494					if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
495						lg.Warn("tombstone encountered error", zap.Error(err))
496					}
497					continue
498				}
499				ki.put(lg, rev.main, rev.sub)
500			} else if !isTombstone(rkv.key) {
501				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
502				idx.Insert(ki)
503				kiCache[rkv.kstr] = ki
504			}
505		}
506	}()
507	return rkvc, revc
508}
509
510func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
511	for i, key := range keys {
512		rkv := revKeyValue{key: key}
513		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
514			lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
515		}
516		rkv.kstr = string(rkv.kv.Key)
517		if isTombstone(key) {
518			delete(keyToLease, rkv.kstr)
519		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
520			keyToLease[rkv.kstr] = lid
521		} else {
522			delete(keyToLease, rkv.kstr)
523		}
524		kvc <- rkv
525	}
526}
527
// Close signals shutdown by closing stopc and then stops the FIFO scheduler.
// It always returns nil.
func (s *store) Close() error {
	// Close stopc first so a compaction barrier that runs while the scheduler
	// is stopping sees the store as stopped and does not reschedule itself.
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}
533
534func (s *store) saveIndex(tx backend.BatchTx) {
535	if s.ci != nil {
536		s.ci.UnsafeSave(tx)
537	}
538}
539
540func (s *store) ConsistentIndex() uint64 {
541	if s.ci != nil {
542		return s.ci.ConsistentIndex()
543	}
544	return 0
545}
546
547func (s *store) setupMetricsReporter() {
548	b := s.b
549	reportDbTotalSizeInBytesMu.Lock()
550	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
551	reportDbTotalSizeInBytesMu.Unlock()
552	reportDbTotalSizeInBytesDebugMu.Lock()
553	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
554	reportDbTotalSizeInBytesDebugMu.Unlock()
555	reportDbTotalSizeInUseInBytesMu.Lock()
556	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
557	reportDbTotalSizeInUseInBytesMu.Unlock()
558	reportDbOpenReadTxNMu.Lock()
559	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
560	reportDbOpenReadTxNMu.Unlock()
561	reportCurrentRevMu.Lock()
562	reportCurrentRev = func() float64 {
563		s.revMu.RLock()
564		defer s.revMu.RUnlock()
565		return float64(s.currentRev)
566	}
567	reportCurrentRevMu.Unlock()
568	reportCompactRevMu.Lock()
569	reportCompactRev = func() float64 {
570		s.revMu.RLock()
571		defer s.revMu.RUnlock()
572		return float64(s.compactMainRev)
573	}
574	reportCompactRevMu.Unlock()
575}
576
577// appendMarkTombstone appends tombstone mark to normal revision bytes.
578func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
579	if len(b) != revBytesLen {
580		lg.Panic(
581			"cannot append tombstone mark to non-normal revision bytes",
582			zap.Int("expected-revision-bytes-size", revBytesLen),
583			zap.Int("given-revision-bytes-size", len(b)),
584		)
585	}
586	return append(b, markTombstone)
587}
588
589// isTombstone checks whether the revision bytes is a tombstone.
590func isTombstone(b []byte) bool {
591	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
592}
593