// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"encoding/binary"
	"errors"
	"math"
	"math/rand"
	"sync"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"github.com/coreos/etcd/pkg/schedule"
	"github.com/coreos/pkg/capnslog"
	"golang.org/x/net/context"
)

var (
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision. The last
	// byte is the mark.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'

	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	ErrTxnIDMismatch = errors.New("mvcc: txn id mismatch")
	ErrCompacted     = errors.New("mvcc: required revision has been compacted")
	ErrFutureRev     = errors.New("mvcc: required revision is a future revision")
	ErrCanceled      = errors.New("mvcc: watcher is canceled")

	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)

// ConsistentIndexGetter is an interface that wraps the ConsistentIndex method.
// A consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
	// ConsistentIndex returns the consistent index of the currently executing entry.
	ConsistentIndex() uint64
}

type store struct {
	mu sync.Mutex // guards the following

	ig ConsistentIndexGetter

	b       backend.Backend
	kvindex index

	le lease.Lessor

	currentRev revision
	// the main revision of the last compaction
	compactMainRev int64

	tx        backend.BatchTx
	txnID     int64 // tracks the current txnID to verify txn operations
	txnModify bool

	// bytesBuf8 is a byte slice of length 8
	// to avoid a repetitive allocation in saveIndex.
	bytesBuf8 []byte

	changes   []mvccpb.KeyValue
	fifoSched schedule.Scheduler

	stopc chan struct{}
}

// NewStore returns a new store. It is useful for creating a store inside
// the mvcc package; externally it should be used only for testing.
func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
	s := &store{
		b:       b,
		ig:      ig,
		kvindex: newTreeIndex(),

		le: le,

		currentRev:     revision{main: 1},
		compactMainRev: -1,

		bytesBuf8: make([]byte, 8),
		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),
	}

	if s.le != nil {
		s.le.SetRangeDeleter(s)
	}

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	if err := s.restore(); err != nil {
		// TODO: return the error instead of panicking here?
		panic("failed to recover store from backend")
	}

	return s
}

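// Rev returns the current revision of the store.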
func (s *store) Rev() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	return s.currentRev.main
}

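// FirstRev returns the main revision of the last compaction, or -1 if the
// store has never been compacted. Revisions below it can no longer be read.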
func (s *store) FirstRev() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	return s.compactMainRev
}

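// Put writes the given key-value pair in its own txn, attaching it to lease,
// and returns the resulting revision.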
func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
	id := s.TxnBegin()
	s.put(key, value, lease)
	s.txnEnd(id)

	putCounter.Inc()

	return s.currentRev.main
}

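// Range reads the keys in [key, end) at the revision given in ro, in its own
// txn, and returns the matching key-value pairs along with the revision the
// range was performed at.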
func (s *store) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
	id := s.TxnBegin()
	kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
	s.txnEnd(id)

	rangeCounter.Inc()

	r = &RangeResult{
		KVs:   kvs,
		Count: count,
		Rev:   rev,
	}

	return r, err
}

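// DeleteRange deletes the keys in [key, end) in its own txn and returns the
// number of keys deleted along with the resulting revision.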
func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
	id := s.TxnBegin()
	n = s.deleteRange(key, end)
	s.txnEnd(id)

	deleteCounter.Inc()

	return n, s.currentRev.main
}

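// TxnBegin starts a txn: it locks the store and its backend batch tx, and
// returns a random txn ID that later txn operations must present.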
func (s *store) TxnBegin() int64 {
	s.mu.Lock()
	s.currentRev.sub = 0
	s.tx = s.b.BatchTx()
	s.tx.Lock()

	s.txnID = rand.Int63()
	return s.txnID
}

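// TxnEnd ends the txn with the given ID and increments the txnCounter. It
// returns ErrTxnIDMismatch if txnID does not match the in-progress txn.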
func (s *store) TxnEnd(txnID int64) error {
	err := s.txnEnd(txnID)
	if err != nil {
		return err
	}

	txnCounter.Inc()
	return nil
}

// txnEnd is used to unlock an internal txn. It does
// not increment the txnCounter.
func (s *store) txnEnd(txnID int64) error {
	if txnID != s.txnID {
		return ErrTxnIDMismatch
	}

	// only save the consistent index if the txn modified the mvcc state.
	// a read-only txn might execute concurrently with a write txn; it
	// should not persist its index to the backend.
	if s.txnModify {
		s.saveIndex()
	}
	s.txnModify = false

	s.tx.Unlock()
	if s.currentRev.sub != 0 {
		s.currentRev.main++
	}
	s.currentRev.sub = 0

	s.mu.Unlock()
	return nil
}

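// TxnRange performs a Range within the txn identified by txnID.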
func (s *store) TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
	if txnID != s.txnID {
		return nil, ErrTxnIDMismatch
	}

	kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)

	r = &RangeResult{
		KVs:   kvs,
		Count: count,
		Rev:   rev,
	}
	return r, err
}

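// TxnPut performs a Put within the txn identified by txnID and returns the
// revision at which the change will be committed.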
func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
	if txnID != s.txnID {
		return 0, ErrTxnIDMismatch
	}

	s.put(key, value, lease)
	return s.currentRev.main + 1, nil
}

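// TxnDeleteRange performs a DeleteRange within the txn identified by txnID.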
func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
	if txnID != s.txnID {
		return 0, 0, ErrTxnIDMismatch
	}

	n = s.deleteRange(key, end)
	if n != 0 || s.currentRev.sub != 0 {
		rev = s.currentRev.main + 1
	} else {
		rev = s.currentRev.main
	}
	return n, rev, nil
}

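// compactBarrier closes ch once the FIFO scheduler runs it with a live
// context. If ctx is nil or canceled, it reschedules itself so that ch is
// still closed eventually, unless the store is stopping.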
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		s.mu.Lock()
		select {
		case <-s.stopc:
		default:
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
		}
		s.mu.Unlock()
		return
	}
	close(ch)
}

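// Compact schedules a compaction of the key index and backend storage up to
// rev. The returned channel is closed once the compaction finishes. It
// returns ErrCompacted if rev was already compacted and ErrFutureRev if rev
// is larger than the current revision.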
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		return ch, ErrCompacted
	}
	if rev > s.currentRev.main {
		return nil, ErrFutureRev
	}

	start := time.Now()

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that desired compaction is persisted
	s.b.ForceCommit()

	keep := s.kvindex.Compact(rev)
	ch := make(chan struct{})
	j := func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)

	indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
	return ch, nil
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}

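// Hash forces a commit and computes the hash of the backend contents,
// skipping the keys in DefaultIgnores. It returns the hash together with the
// current revision.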
func (s *store) Hash() (uint32, int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	start := time.Now()

	s.b.ForceCommit()

	h, err := s.b.Hash(DefaultIgnores)

	hashDurations.Observe(time.Since(start).Seconds())
	rev := s.currentRev.main
	return h, rev, err
}

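// Commit persists the consistent index, if one is available, and forces the
// backend to commit its pending writes.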
func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.tx = s.b.BatchTx()
	s.tx.Lock()
	s.saveIndex()
	s.tx.Unlock()
	s.b.ForceCommit()
}

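// Restore stops the store's background jobs, replaces its backend with b,
// and rebuilds the in-memory state from the new backend.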
func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	s.b = b
	s.kvindex = newTreeIndex()
	s.currentRev = revision{main: 1}
	s.compactMainRev = -1
	s.tx = b.BatchTx()
	s.txnID = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})

	return s.restore()
}

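// restore rebuilds the key index and lease attachments by replaying all
// revisions found in the backend, and resumes any compaction that was
// scheduled but not finished.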
func (s *store) restore() error {
	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(s.b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()
	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(s.b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// use an unordered map to hold the temp index data to speed up
	// the initial key index recovery.
	// we will convert this unordered map into the tree index later.
	unordered := make(map[string]*keyIndex, 100000)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()
	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
		plog.Printf("restore compact to %d", s.compactMainRev)
	}

	// TODO: limit N to reduce max memory usage
	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
	for i, key := range keys {
		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(vals[i]); err != nil {
			plog.Fatalf("cannot unmarshal event: %v", err)
		}

		rev := bytesToRev(key[:revBytesLen])

		// restore index
		switch {
		case isTombstone(key):
			if ki, ok := unordered[string(kv.Key)]; ok {
				ki.tombstone(rev.main, rev.sub)
			}
			delete(keyToLease, string(kv.Key))

		default:
			ki, ok := unordered[string(kv.Key)]
			if ok {
				ki.put(rev.main, rev.sub)
			} else {
				ki = &keyIndex{key: kv.Key}
				ki.restore(revision{main: kv.CreateRevision}, rev, kv.Version)
				unordered[string(kv.Key)] = ki
			}

			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
				keyToLease[string(kv.Key)] = lid
			} else {
				delete(keyToLease, string(kv.Key))
			}
		}

		// update revision
		s.currentRev = rev
	}

	// restore the tree index from the unordered index.
	for _, v := range unordered {
		s.kvindex.Insert(v)
	}

	// keys written in the range (compacted revision - N, compaction revision]
	// might all have been deleted due to compaction. in that case, the correct
	// current revision is the compaction revision, not the largest revision
	// we have seen.
	if s.currentRev.main < s.compactMainRev {
		s.currentRev.main = s.compactMainRev
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			plog.Errorf("unexpected Attach error: %v", err)
		}
	}

	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
		if scheduledCompact <= s.compactMainRev {
			scheduledCompact = 0
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		s.Compact(scheduledCompact)
		plog.Printf("resume scheduled compaction at %d", scheduledCompact)
	}

	return nil
}

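// Close stops the store's background jobs. It does not close the backend.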
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}

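// Equal reports whether the two stores agree on current revision, compaction
// revision, and key index.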
func (a *store) Equal(b *store) bool {
	if a.currentRev != b.currentRev {
		return false
	}
	if a.compactMainRev != b.compactMainRev {
		return false
	}
	return a.kvindex.Equal(b.kvindex)
}

// rangeKeys carries the Keys suffix because range is a keyword in Go.
func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool) (kvs []mvccpb.KeyValue, count int, curRev int64, err error) {
	curRev = s.currentRev.main
	if s.currentRev.sub > 0 {
		curRev++
	}

	if rangeRev > curRev {
		return nil, -1, s.currentRev.main, ErrFutureRev
	}
	var rev int64
	if rangeRev <= 0 {
		rev = curRev
	} else {
		rev = rangeRev
	}
	if rev < s.compactMainRev {
		return nil, -1, 0, ErrCompacted
	}

	_, revpairs := s.kvindex.Range(key, end, rev)
	if len(revpairs) == 0 {
		return nil, 0, curRev, nil
	}
	if countOnly {
		return nil, len(revpairs), curRev, nil
	}

	for _, revpair := range revpairs {
		start, end := revBytesRange(revpair)

		_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
		if len(vs) != 1 {
			plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
		}

		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(vs[0]); err != nil {
			plog.Fatalf("cannot unmarshal event: %v", err)
		}
		kvs = append(kvs, kv)
		if limit > 0 && len(kvs) >= int(limit) {
			break
		}
	}
	return kvs, len(revpairs), curRev, nil
}

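// put persists the key-value pair as the next revision, updates the key
// index, and moves any lease attachment from the old lease to the given one.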
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
	s.txnModify = true

	rev := s.currentRev.main + 1
	c := rev
	oldLease := lease.NoLease

	// if the key existed before, use its previous create revision
	// and fetch its previous leaseID
	_, created, ver, err := s.kvindex.Get(key, rev)
	if err == nil {
		c = created.main
		if s.le != nil {
			oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
		}
	}

	ibytes := newRevBytes()
	revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)

	ver++
	kv := mvccpb.KeyValue{
		Key:            key,
		Value:          value,
		CreateRevision: c,
		ModRevision:    rev,
		Version:        ver,
		Lease:          int64(leaseID),
	}

	d, err := kv.Marshal()
	if err != nil {
		plog.Fatalf("cannot marshal event: %v", err)
	}

	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
	s.changes = append(s.changes, kv)
	s.currentRev.sub++

	if oldLease != lease.NoLease {
		if s.le == nil {
			panic("no lessor to detach lease")
		}

		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
		if err != nil {
			plog.Errorf("unexpected error from lease detach: %v", err)
		}
	}

	if leaseID != lease.NoLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}

		err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
		if err != nil {
			panic("unexpected error from lease Attach")
		}
	}
}

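// deleteRange tombstones all keys currently visible in [key, end) and
// returns the number of keys deleted.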
func (s *store) deleteRange(key, end []byte) int64 {
	s.txnModify = true

	rrev := s.currentRev.main
	if s.currentRev.sub > 0 {
		rrev++
	}
	keys, revs := s.kvindex.Range(key, end, rrev)

	if len(keys) == 0 {
		return 0
	}

	for i, key := range keys {
		s.delete(key, revs[i])
	}
	return int64(len(keys))
}

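// delete writes a tombstone for key at the next main revision, updates the
// key index, and detaches the key from its lease, if any.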
func (s *store) delete(key []byte, rev revision) {
	mainrev := s.currentRev.main + 1

	ibytes := newRevBytes()
	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
	ibytes = appendMarkTombstone(ibytes)

	kv := mvccpb.KeyValue{
		Key: key,
	}

	d, err := kv.Marshal()
	if err != nil {
		plog.Fatalf("cannot marshal event: %v", err)
	}

	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
	if err != nil {
		plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
	}
	s.changes = append(s.changes, kv)
	s.currentRev.sub++

	item := lease.LeaseItem{Key: string(key)}
	leaseID := s.le.GetLease(item)

	if leaseID != lease.NoLease {
		err = s.le.Detach(leaseID, []lease.LeaseItem{item})
		if err != nil {
			plog.Errorf("cannot detach %v", err)
		}
	}
}

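// getChanges returns the key-value changes made since the last call and
// resets the change buffer.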
func (s *store) getChanges() []mvccpb.KeyValue {
	changes := s.changes
	s.changes = make([]mvccpb.KeyValue, 0, 4)
	return changes
}

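// saveIndex writes the current consistent index into the meta bucket; it is
// a no-op if the store has no ConsistentIndexGetter.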
func (s *store) saveIndex() {
	if s.ig == nil {
		return
	}
	tx := s.tx
	bs := s.bytesBuf8
	binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
	// put the index into the underlying backend
	// tx has been locked in TxnBegin, so there is no need to lock it again
	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}

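// ConsistentIndex returns the consistent index stored in the meta bucket,
// or 0 if none has been saved yet.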
func (s *store) ConsistentIndex() uint64 {
	// TODO: cache index in a uint64 field?
	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
	if len(vs) == 0 {
		return 0
	}
	return binary.BigEndian.Uint64(vs[0])
}

// appendMarkTombstone appends a tombstone mark to normal revision bytes.
func appendMarkTombstone(b []byte) []byte {
	if len(b) != revBytesLen {
		plog.Panicf("cannot append mark to non normal revision bytes")
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the revision bytes represent a tombstone.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}

// revBytesRange returns the byte range [start, end) that covers exactly
// the given revision.
func revBytesRange(rev revision) (start, end []byte) {
	start = newRevBytes()
	revToBytes(rev, start)

	end = newRevBytes()
	endRev := revision{main: rev.main, sub: rev.sub + 1}
	revToBytes(endRev, end)

	return start, end
}