// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc/backend"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/pkg/schedule"
	"go.etcd.io/etcd/pkg/traceutil"

	"github.com/coreos/pkg/capnslog"
	"go.uber.org/zap"
)

var (
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	ErrCanceled  = errors.New("mvcc: watcher is canceled")
	ErrClosed    = errors.New("mvcc: closed")

	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc")
)

const (
	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision. The last
	// byte is the mark.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000

// ConsistentIndexGetter is an interface that wraps the ConsistentIndex method.
// The consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
	// ConsistentIndex returns the consistent index of the currently executing entry.
	ConsistentIndex() uint64
}

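// StoreConfig holds per-store configuration. A zero CompactionBatchLimit
// falls back to defaultCompactBatchLimit in NewStore.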
type StoreConfig struct {
	CompactionBatchLimit int
}

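// store keeps persisted key-values in a backend.Backend and revision metadata
// in an in-memory kvindex, and exposes read and write transactions through its
// embedded ReadView and WriteView.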
type store struct {
	ReadView
	WriteView

	// consistentIndex caches the value of the "consistent_index" key. It is
	// accessed through atomics, so it must be 64-bit aligned.
	consistentIndex uint64

	cfg StoreConfig

	// mu is read-locked for txns and write-locked for non-txn store changes.
	mu sync.RWMutex

	ig ConsistentIndexGetter

	b       backend.Backend
	kvindex index

	le lease.Lessor

	// revMu protects currentRev and compactMainRev.
	// It is locked at the end of a write txn and released after the write
	// txn is unlocked. It is locked before locking a read txn and released
	// after the read txn is locked.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// bytesBuf8 is a byte slice of length 8
	// to avoid a repetitive allocation in saveIndex.
	bytesBuf8 []byte

	fifoSched schedule.Scheduler

	stopc chan struct{}

	lg *zap.Logger
}

// NewStore returns a new store. It is useful for creating a store inside the
// mvcc package. Externally, it should only be used for testing.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
	if cfg.CompactionBatchLimit == 0 {
		cfg.CompactionBatchLimit = defaultCompactBatchLimit
	}
	s := &store{
		cfg:     cfg,
		b:       b,
		ig:      ig,
		kvindex: newTreeIndex(lg),

		le: le,

		currentRev:     1,
		compactMainRev: -1,

		bytesBuf8: make([]byte, 8),
		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),

		lg: lg,
	}
	s.ReadView = &readView{s}
	s.WriteView = &writeView{s}
	if s.le != nil {
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
	}

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.restore(); err != nil {
		// TODO: return the error instead of panic here?
		panic("failed to recover store from backend")
	}

	return s
}

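// compactBarrier closes ch to signal that the guarded compaction has run its
// course. If ctx is nil or already canceled, it re-schedules itself on the
// FIFO scheduler instead, unless the store is stopping.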
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		select {
		case <-s.stopc:
		default:
			// Fix deadlock in mvcc; for more information, please refer to PR 11817.
			// s.stopc is only updated in the restore operation, which is called by the
			// apply-snapshot call. Compaction and apply-snapshot requests are serialized
			// by raft, so they do not happen at the same time.
			s.mu.Lock()
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
			s.mu.Unlock()
		}
		return
	}
	close(ch)
}

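// Hash forces a commit of the backend and returns a hash over its contents
// (excluding DefaultIgnores) together with the current revision.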
func (s *store) Hash() (hash uint32, revision int64, err error) {
	start := time.Now()

	s.b.ForceCommit()
	h, err := s.b.Hash(DefaultIgnores)

	hashSec.Observe(time.Since(start).Seconds())
	return h, s.currentRev, err
}

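// HashByRev computes a hash of the key bucket up to the given revision,
// skipping keys already scheduled for deletion by an ongoing compaction.
// A rev of 0 hashes up to the current revision. It returns ErrCompacted if
// rev has already been compacted and ErrFutureRev if rev is larger than the
// current revision.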
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
	start := time.Now()

	s.mu.RLock()
	s.revMu.RLock()
	compactRev, currentRev = s.compactMainRev, s.currentRev
	s.revMu.RUnlock()

	if rev > 0 && rev <= compactRev {
		s.mu.RUnlock()
		return 0, 0, compactRev, ErrCompacted
	} else if rev > 0 && rev > currentRev {
		s.mu.RUnlock()
		return 0, currentRev, 0, ErrFutureRev
	}

	if rev == 0 {
		rev = currentRev
	}
	keep := s.kvindex.Keep(rev)

	tx := s.b.ReadTx()
	tx.RLock()
	defer tx.RUnlock()
	s.mu.RUnlock()

	upper := revision{main: rev + 1}
	lower := revision{main: compactRev + 1}
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	h.Write(keyBucketName)
	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
		kr := bytesToRev(k)
		if !upper.GreaterThan(kr) {
			return nil
		}
		// skip revisions that are scheduled for deletion due to compaction;
		// don't skip anything if there is no keep set.
		if lower.GreaterThan(kr) && len(keep) > 0 {
			if _, ok := keep[kr]; !ok {
				return nil
			}
		}
		h.Write(k)
		h.Write(v)
		return nil
	})
	hash = h.Sum32()

	hashRevSec.Observe(time.Since(start).Seconds())
	return hash, currentRev, compactRev, err
}

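// updateCompactRev validates rev as a compaction revision, records it as the
// new compactMainRev, and persists it under scheduledCompactKeyName. If rev
// has already been compacted, it returns a barrier channel along with
// ErrCompacted; if rev is in the future, it returns ErrFutureRev.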
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
	s.revMu.Lock()
	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		s.revMu.Unlock()
		return ch, ErrCompacted
	}
	if rev > s.currentRev {
		s.revMu.Unlock()
		return nil, ErrFutureRev
	}

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that desired compaction is persisted
	s.b.ForceCommit()

	s.revMu.Unlock()

	return nil, nil
}

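// compact schedules compaction of the in-memory index and the backend up to
// rev on the FIFO scheduler. The returned channel is closed once the
// compaction has finished.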
func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	ch := make(chan struct{})
	var j = func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		start := time.Now()
		keep := s.kvindex.Compact(rev)
		indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)
	trace.Step("schedule compaction")
	return ch, nil
}

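// compactLockfree updates the compaction revision and schedules a compaction
// without taking s.mu; restore uses it while the store mutex is already held.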
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
	ch, err := s.updateCompactRev(rev)
	if err != nil {
		return ch, err
	}

	return s.compact(traceutil.TODO(), rev)
}

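// Compact compacts the key-value store up to rev. The returned channel is
// closed when the compaction finishes; it returns ErrCompacted if rev was
// already compacted and ErrFutureRev if rev is larger than the current
// revision.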
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	s.mu.Lock()

	ch, err := s.updateCompactRev(rev)
	trace.Step("check and update compact revision")
	if err != nil {
		s.mu.Unlock()
		return ch, err
	}
	s.mu.Unlock()

	return s.compact(trace, rev)
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}

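// Commit persists the cached consistent index and forces the backend to
// commit its pending writes.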
func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	tx := s.b.BatchTx()
	tx.Lock()
	s.saveIndex(tx)
	tx.Unlock()
	s.b.ForceCommit()
}

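// Restore swaps in the backend b and rebuilds the store's in-memory state
// from it, stopping the old FIFO scheduler and replacing the stop channel in
// the process.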
func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	atomic.StoreUint64(&s.consistentIndex, 0)
	s.b = b
	s.kvindex = newTreeIndex(s.lg)
	s.currentRev = 1
	s.compactMainRev = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})

	return s.restore()
}

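// restore rebuilds the in-memory index and revision state from the backend:
// it reads the finished and scheduled compaction revisions, streams the key
// bucket in chunks into the tree index, reattaches leases, and resumes any
// compaction that was scheduled but not finished.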
func (s *store) restore() error {
	s.setupMetricsReporter()

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()

	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main

		if s.lg != nil {
			s.lg.Info(
				"restored last compact revision",
				zap.String("meta-bucket-name", string(metaBucketName)),
				zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
				zap.Int64("restored-compact-revision", s.compactMainRev),
			)
		} else {
			plog.Printf("restore compact to %d", s.compactMainRev)
		}
	}
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
	}

	// index keys concurrently as they're loaded in from tx
	keysGauge.Set(0)
	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		// rkvc blocks if the number of pending keys exceeds the restore
		// chunk size, to keep keys from consuming too much memory.
		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
		if len(keys) < restoreChunkKeys {
			// partial set implies final set
			break
		}
		// next set begins after where this one ended
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	close(rkvc)
	s.currentRev = <-revc

	// Keys in the range [compacted revision - N, compaction revision] might all
	// have been deleted due to compaction. In that case the current revision
	// should be set to the compaction revision, not the largest revision seen.
	if s.currentRev < s.compactMainRev {
		s.currentRev = s.compactMainRev
	}
	if scheduledCompact <= s.compactMainRev {
		scheduledCompact = 0
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			if s.lg != nil {
				s.lg.Warn(
					"failed to attach a lease",
					zap.String("lease-id", fmt.Sprintf("%016x", lid)),
					zap.Error(err),
				)
			} else {
				plog.Errorf("unexpected Attach error: %v", err)
			}
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		s.compactLockfree(scheduledCompact)

		if s.lg != nil {
			s.lg.Info(
				"resume scheduled compaction",
				zap.String("meta-bucket-name", string(metaBucketName)),
				zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
				zap.Int64("scheduled-compact-revision", scheduledCompact),
			)
		} else {
			plog.Printf("resume scheduled compaction at %d", scheduledCompact)
		}
	}

	return nil
}

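// revKeyValue pairs a raw revision key from the backend with its decoded
// KeyValue and the key as a string, for feeding into restoreIntoIndex.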
type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}

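// restoreIntoIndex starts a goroutine that consumes revKeyValues from the
// returned channel and applies them to idx. Once the channel is closed, the
// goroutine sends the main revision of the last entry it received on the
// returned int64 channel.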
func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		// restore the tree index from the streamed entries, which are unordered by key.
		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
		for rkv := range rkvc {
			ki, ok := kiCache[rkv.kstr]
			// purge part of kiCache when it is large but the key is still missing from the cache
			if !ok && len(kiCache) >= restoreChunkKeys {
				i := 10
				for k := range kiCache {
					delete(kiCache, k)
					if i--; i == 0 {
						break
					}
				}
			}
			// cache miss: fetch from the tree index if the key is already there
			if !ok {
				ki = &keyIndex{key: rkv.kv.Key}
				if idxKey := idx.KeyIndex(ki); idxKey != nil {
					kiCache[rkv.kstr], ki = idxKey, idxKey
					ok = true
				}
			}
			rev := bytesToRev(rkv.key)
			currentRev = rev.main
			if ok {
				if isTombstone(rkv.key) {
					ki.tombstone(lg, rev.main, rev.sub)
					continue
				}
				ki.put(lg, rev.main, rev.sub)
			} else if !isTombstone(rkv.key) {
				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
				idx.Insert(ki)
				kiCache[rkv.kstr] = ki
			}
		}
	}()
	return rkvc, revc
}

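// restoreChunk unmarshals a chunk of backend keys and values into
// revKeyValues, tracks which keys currently hold a lease in keyToLease, and
// sends each entry to kvc for indexing.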
func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
	for i, key := range keys {
		rkv := revKeyValue{key: key}
		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
			if lg != nil {
				lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
			} else {
				plog.Fatalf("cannot unmarshal event: %v", err)
			}
		}
		rkv.kstr = string(rkv.kv.Key)
		if isTombstone(key) {
			delete(keyToLease, rkv.kstr)
		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
			keyToLease[rkv.kstr] = lid
		} else {
			delete(keyToLease, rkv.kstr)
		}
		kvc <- rkv
	}
}

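// Close signals the store to stop and shuts down its FIFO scheduler. It does
// not close the backend.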
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}

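// saveIndex writes the current consistent index from the ConsistentIndexGetter
// into the meta bucket and updates the in-memory cache. It is a no-op when no
// getter is configured.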
func (s *store) saveIndex(tx backend.BatchTx) {
	if s.ig == nil {
		return
	}
	bs := s.bytesBuf8
	ci := s.ig.ConsistentIndex()
	binary.BigEndian.PutUint64(bs, ci)
	// put the index into the underlying backend
	// tx has been locked in TxnBegin, so there is no need to lock it again
	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
	atomic.StoreUint64(&s.consistentIndex, ci)
}

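// ConsistentIndex returns the cached consistent index, falling back to reading
// it from the meta bucket (and caching the result) when the cache is empty.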
func (s *store) ConsistentIndex() uint64 {
	if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
		return ci
	}
	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
	if len(vs) == 0 {
		return 0
	}
	v := binary.BigEndian.Uint64(vs[0])
	atomic.StoreUint64(&s.consistentIndex, v)
	return v
}

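// setupMetricsReporter wires the package-level metrics closures to report this
// store's backend sizes, open read-txn count, current revision, and compact
// revision.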
func (s *store) setupMetricsReporter() {
	b := s.b
	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()
	reportDbTotalSizeInBytesDebugMu.Lock()
	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesDebugMu.Unlock()
	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()
	reportDbOpenReadTxNMu.Lock()
	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
	reportDbOpenReadTxNMu.Unlock()
	reportCurrentRevMu.Lock()
	reportCurrentRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.currentRev)
	}
	reportCurrentRevMu.Unlock()
	reportCompactRevMu.Lock()
	reportCompactRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.compactMainRev)
	}
	reportCompactRevMu.Unlock()
}

// appendMarkTombstone appends a tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
	if len(b) != revBytesLen {
		if lg != nil {
			lg.Panic(
				"cannot append tombstone mark to non-normal revision bytes",
				zap.Int("expected-revision-bytes-size", revBytesLen),
				zap.Int("given-revision-bytes-size", len(b)),
			)
		} else {
			plog.Panicf("cannot append mark to non-normal revision bytes")
		}
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the given revision bytes carry a tombstone mark.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}