1// Copyright 2015 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package storage
16
17import (
18	"errors"
19	"log"
20	"math"
21	"math/rand"
22	"sync"
23	"time"
24
25	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
26	"github.com/coreos/etcd/lease"
27	"github.com/coreos/etcd/pkg/schedule"
28	"github.com/coreos/etcd/storage/backend"
29	"github.com/coreos/etcd/storage/storagepb"
30)
31
var (
	// keyBucketName and metaBucketName name the two backend buckets that
	// NewStore creates: the first holds the revision-keyed key-value records,
	// the second holds the compaction bookkeeping keys declared below.
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision. The last
	// byte is the mark. markBytePosition is the index of that mark byte and
	// markTombstone is the mark value that flags a tombstone.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'

	// scheduledCompactKeyName is the meta-bucket key under which Compact
	// persists the requested compaction revision. finishedCompactKeyName is
	// the meta-bucket key that restore reads to recover the last compacted
	// revision.
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	// ErrTxnIDMismatch is returned by the Txn* methods when the supplied txn
	// id is not the id of the current txn.
	ErrTxnIDMismatch = errors.New("storage: txn id mismatch")
	// ErrCompacted is returned when the requested revision is at or below
	// the last compacted main revision.
	ErrCompacted     = errors.New("storage: required revision has been compacted")
	// ErrFutureRev is returned when the requested revision is greater than
	// the store's current revision.
	ErrFutureRev     = errors.New("storage: required revision is a future revision")
	// ErrCanceled reports a canceled watcher. It is not used in this file;
	// presumably the watcher code in this package returns it.
	ErrCanceled      = errors.New("storage: watcher is canceled")
)
51
// store is a revisioned key-value store. It persists key-value records in a
// backend.Backend keyed by revision, keeps a key-to-revision index alongside
// it, and optionally ties keys to leases through a lease.Lessor.
type store struct {
	mu sync.Mutex // guards the following

	// b is the backend that holds the key and meta buckets.
	b backend.Backend
	// kvindex maps keys to their revisions; it is rebuilt by restore.
	kvindex index

	// le is the lessor used to attach and detach keys; it may be nil.
	le lease.Lessor

	// currentRev is the store's current revision. The sub part counts the
	// changes made inside the txn that is in progress.
	currentRev revision
	// the main revision of the last compaction
	compactMainRev int64

	// tx is the backend batch tx of the txn in progress; set by TxnBegin.
	tx    backend.BatchTx
	txnID int64 // tracks the current txnID to verify txn operations

	// changes accumulates the key-values written by put and delete until
	// getChanges hands them off.
	changes []storagepb.KeyValue
	// fifoSched runs the compaction jobs scheduled by Compact.
	fifoSched schedule.Scheduler

	// stopc is closed by Close and Restore.
	stopc chan struct{}
}
72
73// NewStore returns a new store. It is useful to create a store inside
74// storage pkg. It should only be used for testing externally.
75func NewStore(b backend.Backend, le lease.Lessor) *store {
76	s := &store{
77		b:       b,
78		kvindex: newTreeIndex(),
79
80		le: le,
81
82		currentRev:     revision{main: 1},
83		compactMainRev: -1,
84
85		fifoSched: schedule.NewFIFOScheduler(),
86
87		stopc: make(chan struct{}),
88	}
89
90	if s.le != nil {
91		s.le.SetRangeDeleter(s)
92	}
93
94	tx := s.b.BatchTx()
95	tx.Lock()
96	tx.UnsafeCreateBucket(keyBucketName)
97	tx.UnsafeCreateBucket(metaBucketName)
98	tx.Unlock()
99	s.b.ForceCommit()
100
101	if err := s.restore(); err != nil {
102		// TODO: return the error instead of panic here?
103		panic("failed to recover store from backend")
104	}
105
106	return s
107}
108
109func (s *store) Rev() int64 {
110	s.mu.Lock()
111	defer s.mu.Unlock()
112
113	return s.currentRev.main
114}
115
116func (s *store) FirstRev() int64 {
117	s.mu.Lock()
118	defer s.mu.Unlock()
119
120	return s.compactMainRev
121}
122
123func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
124	id := s.TxnBegin()
125	s.put(key, value, lease)
126	s.txnEnd(id)
127
128	putCounter.Inc()
129
130	return int64(s.currentRev.main)
131}
132
133func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
134	id := s.TxnBegin()
135	kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev)
136	s.txnEnd(id)
137
138	rangeCounter.Inc()
139
140	return kvs, rev, err
141}
142
143func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
144	id := s.TxnBegin()
145	n = s.deleteRange(key, end)
146	s.txnEnd(id)
147
148	deleteCounter.Inc()
149
150	return n, int64(s.currentRev.main)
151}
152
153func (s *store) TxnBegin() int64 {
154	s.mu.Lock()
155	s.currentRev.sub = 0
156	s.tx = s.b.BatchTx()
157	s.tx.Lock()
158
159	s.txnID = rand.Int63()
160	return s.txnID
161}
162
163func (s *store) TxnEnd(txnID int64) error {
164	err := s.txnEnd(txnID)
165	if err != nil {
166		return err
167	}
168
169	txnCounter.Inc()
170	return nil
171}
172
173// txnEnd is used for unlocking an internal txn. It does
174// not increase the txnCounter.
175func (s *store) txnEnd(txnID int64) error {
176	if txnID != s.txnID {
177		return ErrTxnIDMismatch
178	}
179
180	s.tx.Unlock()
181	if s.currentRev.sub != 0 {
182		s.currentRev.main += 1
183	}
184	s.currentRev.sub = 0
185
186	dbTotalSize.Set(float64(s.b.Size()))
187	s.mu.Unlock()
188	return nil
189}
190
191func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
192	if txnID != s.txnID {
193		return nil, 0, ErrTxnIDMismatch
194	}
195	return s.rangeKeys(key, end, limit, rangeRev)
196}
197
198func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
199	if txnID != s.txnID {
200		return 0, ErrTxnIDMismatch
201	}
202
203	s.put(key, value, lease)
204	return int64(s.currentRev.main + 1), nil
205}
206
207func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
208	if txnID != s.txnID {
209		return 0, 0, ErrTxnIDMismatch
210	}
211
212	n = s.deleteRange(key, end)
213	if n != 0 || s.currentRev.sub != 0 {
214		rev = int64(s.currentRev.main + 1)
215	} else {
216		rev = int64(s.currentRev.main)
217	}
218	return n, rev, nil
219}
220
221func (s *store) Compact(rev int64) error {
222	s.mu.Lock()
223	defer s.mu.Unlock()
224	if rev <= s.compactMainRev {
225		return ErrCompacted
226	}
227	if rev > s.currentRev.main {
228		return ErrFutureRev
229	}
230
231	start := time.Now()
232
233	s.compactMainRev = rev
234
235	rbytes := newRevBytes()
236	revToBytes(revision{main: rev}, rbytes)
237
238	tx := s.b.BatchTx()
239	tx.Lock()
240	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
241	tx.Unlock()
242	// ensure that desired compaction is persisted
243	s.b.ForceCommit()
244
245	keep := s.kvindex.Compact(rev)
246
247	var j = func(ctx context.Context) {
248		select {
249		case <-ctx.Done():
250			return
251		default:
252		}
253		s.scheduleCompaction(rev, keep)
254	}
255
256	s.fifoSched.Schedule(j)
257
258	indexCompactionPauseDurations.Observe(float64(time.Now().Sub(start) / time.Millisecond))
259	return nil
260}
261
262func (s *store) Hash() (uint32, error) {
263	s.b.ForceCommit()
264	return s.b.Hash()
265}
266
267func (s *store) Commit() { s.b.ForceCommit() }
268
269func (s *store) Restore(b backend.Backend) error {
270	s.mu.Lock()
271	defer s.mu.Unlock()
272
273	close(s.stopc)
274	s.fifoSched.Stop()
275
276	s.b = b
277	s.kvindex = newTreeIndex()
278	s.currentRev = revision{main: 1}
279	s.compactMainRev = -1
280	s.tx = b.BatchTx()
281	s.txnID = -1
282	s.fifoSched = schedule.NewFIFOScheduler()
283	s.stopc = make(chan struct{})
284
285	return s.restore()
286}
287
288func (s *store) restore() error {
289	min, max := newRevBytes(), newRevBytes()
290	revToBytes(revision{main: 1}, min)
291	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
292
293	// restore index
294	tx := s.b.BatchTx()
295	tx.Lock()
296	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
297	if len(finishedCompactBytes) != 0 {
298		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
299		log.Printf("storage: restore compact to %d", s.compactMainRev)
300	}
301
302	// TODO: limit N to reduce max memory usage
303	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
304	for i, key := range keys {
305		var kv storagepb.KeyValue
306		if err := kv.Unmarshal(vals[i]); err != nil {
307			log.Fatalf("storage: cannot unmarshal event: %v", err)
308		}
309
310		rev := bytesToRev(key[:revBytesLen])
311
312		// restore index
313		switch {
314		case isTombstone(key):
315			s.kvindex.Tombstone(kv.Key, rev)
316			if lease.LeaseID(kv.Lease) != lease.NoLease {
317				err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
318				if err != nil && err != lease.ErrLeaseNotFound {
319					log.Fatalf("storage: unexpected Detach error %v", err)
320				}
321			}
322		default:
323			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
324			if lease.LeaseID(kv.Lease) != lease.NoLease {
325				if s.le == nil {
326					panic("no lessor to attach lease")
327				}
328				err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
329				// We are walking through the kv history here. It is possible that we attached a key to
330				// the lease and the lease was revoked later.
331				// Thus attaching an old version of key to a none existing lease is possible here, and
332				// we should just ignore the error.
333				if err != nil && err != lease.ErrLeaseNotFound {
334					panic("unexpected Attach error")
335				}
336			}
337		}
338
339		// update revision
340		s.currentRev = rev
341	}
342
343	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
344	if len(scheduledCompactBytes) != 0 {
345		scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main
346		if scheduledCompact > s.compactMainRev {
347			log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
348			go s.Compact(scheduledCompact)
349		}
350	}
351
352	tx.Unlock()
353
354	return nil
355}
356
// Close closes the stop channel and stops the FIFO scheduler. It always
// returns nil.
// NOTE(review): nothing here guards against a second call, which would close
// stopc again and panic; confirm callers close the store only once.
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}
362
363func (a *store) Equal(b *store) bool {
364	if a.currentRev != b.currentRev {
365		return false
366	}
367	if a.compactMainRev != b.compactMainRev {
368		return false
369	}
370	return a.kvindex.Equal(b.kvindex)
371}
372
373// range is a keyword in Go, add Keys suffix.
374func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, curRev int64, err error) {
375	curRev = int64(s.currentRev.main)
376	if s.currentRev.sub > 0 {
377		curRev += 1
378	}
379
380	if rangeRev > curRev {
381		return nil, s.currentRev.main, ErrFutureRev
382	}
383	var rev int64
384	if rangeRev <= 0 {
385		rev = curRev
386	} else {
387		rev = rangeRev
388	}
389	if rev <= s.compactMainRev {
390		return nil, 0, ErrCompacted
391	}
392
393	_, revpairs := s.kvindex.Range(key, end, int64(rev))
394	if len(revpairs) == 0 {
395		return nil, curRev, nil
396	}
397
398	for _, revpair := range revpairs {
399		start, end := revBytesRange(revpair)
400
401		_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
402		if len(vs) != 1 {
403			log.Fatalf("storage: range cannot find rev (%d,%d)", revpair.main, revpair.sub)
404		}
405
406		var kv storagepb.KeyValue
407		if err := kv.Unmarshal(vs[0]); err != nil {
408			log.Fatalf("storage: cannot unmarshal event: %v", err)
409		}
410		kvs = append(kvs, kv)
411		if limit > 0 && len(kvs) >= int(limit) {
412			break
413		}
414	}
415	return kvs, curRev, nil
416}
417
418func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
419	rev := s.currentRev.main + 1
420	c := rev
421	oldLease := lease.NoLease
422
423	// if the key exists before, use its previous created and
424	// get its previous leaseID
425	grev, created, ver, err := s.kvindex.Get(key, rev)
426	if err == nil {
427		c = created.main
428		ibytes := newRevBytes()
429		revToBytes(grev, ibytes)
430		_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
431		var kv storagepb.KeyValue
432		if err = kv.Unmarshal(vs[0]); err != nil {
433			log.Fatalf("storage: cannot unmarshal value: %v", err)
434		}
435		oldLease = lease.LeaseID(kv.Lease)
436	}
437
438	ibytes := newRevBytes()
439	revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)
440
441	ver = ver + 1
442	kv := storagepb.KeyValue{
443		Key:            key,
444		Value:          value,
445		CreateRevision: c,
446		ModRevision:    rev,
447		Version:        ver,
448		Lease:          int64(leaseID),
449	}
450
451	d, err := kv.Marshal()
452	if err != nil {
453		log.Fatalf("storage: cannot marshal event: %v", err)
454	}
455
456	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
457	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
458	s.changes = append(s.changes, kv)
459	s.currentRev.sub += 1
460
461	if oldLease != lease.NoLease {
462		if s.le == nil {
463			panic("no lessor to detach lease")
464		}
465
466		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
467		if err != nil {
468			panic("unexpected error from lease detach")
469		}
470	}
471
472	if leaseID != lease.NoLease {
473		if s.le == nil {
474			panic("no lessor to attach lease")
475		}
476
477		err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
478		if err != nil {
479			panic("unexpected error from lease Attach")
480		}
481	}
482}
483
484func (s *store) deleteRange(key, end []byte) int64 {
485	rrev := s.currentRev.main
486	if s.currentRev.sub > 0 {
487		rrev += 1
488	}
489	keys, revs := s.kvindex.Range(key, end, rrev)
490
491	if len(keys) == 0 {
492		return 0
493	}
494
495	for i, key := range keys {
496		s.delete(key, revs[i])
497	}
498	return int64(len(keys))
499}
500
501func (s *store) delete(key []byte, rev revision) {
502	mainrev := s.currentRev.main + 1
503
504	ibytes := newRevBytes()
505	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
506	ibytes = appendMarkTombstone(ibytes)
507
508	kv := storagepb.KeyValue{
509		Key: key,
510	}
511
512	d, err := kv.Marshal()
513	if err != nil {
514		log.Fatalf("storage: cannot marshal event: %v", err)
515	}
516
517	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
518	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
519	if err != nil {
520		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
521	}
522	s.changes = append(s.changes, kv)
523	s.currentRev.sub += 1
524
525	ibytes = newRevBytes()
526	revToBytes(rev, ibytes)
527	_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
528
529	kv.Reset()
530	if err = kv.Unmarshal(vs[0]); err != nil {
531		log.Fatalf("storage: cannot unmarshal value: %v", err)
532	}
533
534	if lease.LeaseID(kv.Lease) != lease.NoLease {
535		err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
536		if err != nil {
537			log.Fatalf("storage: cannot detach %v", err)
538		}
539	}
540}
541
542func (s *store) getChanges() []storagepb.KeyValue {
543	changes := s.changes
544	s.changes = make([]storagepb.KeyValue, 0, 128)
545	return changes
546}
547
548// appendMarkTombstone appends tombstone mark to normal revision bytes.
549func appendMarkTombstone(b []byte) []byte {
550	if len(b) != revBytesLen {
551		log.Panicf("cannot append mark to non normal revision bytes")
552	}
553	return append(b, markTombstone)
554}
555
556// isTombstone checks whether the revision bytes is a tombstone.
557func isTombstone(b []byte) bool {
558	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
559}
560
561// revBytesRange returns the range of revision bytes at
562// the given revision.
563func revBytesRange(rev revision) (start, end []byte) {
564	start = newRevBytes()
565	revToBytes(rev, start)
566
567	end = newRevBytes()
568	endRev := revision{main: rev.main, sub: rev.sub + 1}
569	revToBytes(endRev, end)
570
571	return start, end
572}
573