// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"encoding/binary"
	"errors"
	"math"
	"math/rand"
	"sync"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"github.com/coreos/etcd/pkg/schedule"
	"github.com/coreos/pkg/capnslog"
	"golang.org/x/net/context"
)

var (
	// backend bucket names: "key" holds revision-keyed KeyValue records,
	// "meta" holds store bookkeeping (compaction markers, consistent index).
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	// markedRevBytesLen is the byte length of marked revision.
	// The first `revBytesLen` bytes represents a normal revision. The last
	// one byte is the mark.
	markedRevBytesLen    = revBytesLen + 1
	markBytePosition     = markedRevBytesLen - 1
	markTombstone   byte = 't'

	// keys inside the meta bucket.
	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	// ErrTxnIDMismatch is returned when a txn operation carries an id that
	// does not match the currently open txn.
	ErrTxnIDMismatch = errors.New("mvcc: txn id mismatch")
	// ErrCompacted is returned when the requested revision is older than
	// the last compaction.
	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	// ErrFutureRev is returned when the requested revision is newer than
	// the store's current revision.
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	// ErrCanceled is returned when a watcher is canceled.
	ErrCanceled = errors.New("mvcc: watcher is canceled")

	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)

// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
	// ConsistentIndex returns the consistent index of current executing entry.
	ConsistentIndex() uint64
}

// store is an MVCC key-value store backed by a backend.Backend for
// persistence and an in-memory tree index (kvindex) for revision lookup.
type store struct {
	mu sync.Mutex // guards the following

	// ig supplies the consistent index persisted with every modifying txn;
	// may be nil, in which case saveIndex is a no-op.
	ig ConsistentIndexGetter

	b       backend.Backend
	kvindex index

	// le may be nil; paths that require a lessor panic explicitly when it is.
	le lease.Lessor

	currentRev revision
	// the main revision of the last compaction
	compactMainRev int64

	tx    backend.BatchTx
	txnID int64 // tracks the current txnID to verify txn operations
	// txnModify records whether the current txn wrote anything; txnEnd
	// only persists the consistent index for modifying txns.
	txnModify bool

	// bytesBuf8 is a byte slice of length 8
	// to avoid a repetitive allocation in saveIndex.
	bytesBuf8 []byte

	changes   []mvccpb.KeyValue
	fifoSched schedule.Scheduler

	stopc chan struct{}
}

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
	s := &store{
		b:       b,
		ig:      ig,
		kvindex: newTreeIndex(),

		le: le,

		currentRev:     revision{main: 1},
		compactMainRev: -1,

		bytesBuf8: make([]byte, 8, 8),
		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),
	}

	if s.le != nil {
		s.le.SetRangeDeleter(s)
	}

	// make sure both buckets exist before restore reads them.
	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	if err := s.restore(); err != nil {
		// TODO: return the error instead of panic here?
		panic("failed to recover store from backend")
	}

	return s
}

// Rev returns the current main revision of the store.
func (s *store) Rev() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	return s.currentRev.main
}

// FirstRev returns the main revision of the last compaction, i.e. the
// oldest revision still available.
func (s *store) FirstRev() int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	return s.compactMainRev
}

// Put writes key/value in its own single-operation txn and returns the
// resulting main revision.
func (s *store) Put(key, value []byte, lease lease.LeaseID) int64 {
	id := s.TxnBegin()
	s.put(key, value, lease)
	s.txnEnd(id)

	putCounter.Inc()

	return int64(s.currentRev.main)
}

// Range reads keys in [key, end) in its own single-operation txn.
func (s *store) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
	id := s.TxnBegin()
	kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)
	s.txnEnd(id)

	rangeCounter.Inc()

	r = &RangeResult{
		KVs:   kvs,
		Count: count,
		Rev:   rev,
	}

	return r, err
}

// DeleteRange deletes keys in [key, end) in its own single-operation txn,
// returning the number of deleted keys and the resulting main revision.
func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
	id := s.TxnBegin()
	n = s.deleteRange(key, end)
	s.txnEnd(id)

	deleteCounter.Inc()

	return n, int64(s.currentRev.main)
}

// TxnBegin opens a txn: it holds s.mu and the backend batch-tx lock until
// the matching TxnEnd/txnEnd, and returns a random id used to verify that
// subsequent Txn* calls belong to this txn.
func (s *store) TxnBegin() int64 {
	s.mu.Lock()
	s.currentRev.sub = 0
	s.tx = s.b.BatchTx()
	s.tx.Lock()

	s.txnID = rand.Int63()
	return s.txnID
}

// TxnEnd closes the txn opened by TxnBegin and bumps the txn counter.
func (s *store) TxnEnd(txnID int64) error {
	err := s.txnEnd(txnID)
	if err != nil {
		return err
	}

	txnCounter.Inc()
	return nil
}

// txnEnd is used for unlocking an internal txn. It does
// not increase the txnCounter.
func (s *store) txnEnd(txnID int64) error {
	if txnID != s.txnID {
		return ErrTxnIDMismatch
	}

	// only update index if the txn modifies the mvcc state.
	// read only txn might execute with one write txn concurrently,
	// it should not write its index to mvcc.
	if s.txnModify {
		s.saveIndex()
	}
	s.txnModify = false

	s.tx.Unlock()
	// a non-zero sub revision means the txn wrote at least once; advance
	// the main revision so the writes become visible at a new revision.
	if s.currentRev.sub != 0 {
		s.currentRev.main += 1
	}
	s.currentRev.sub = 0

	s.mu.Unlock()
	return nil
}

// TxnRange reads keys in [key, end) within the txn identified by txnID.
func (s *store) TxnRange(txnID int64, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
	if txnID != s.txnID {
		return nil, ErrTxnIDMismatch
	}

	kvs, count, rev, err := s.rangeKeys(key, end, ro.Limit, ro.Rev, ro.Count)

	r = &RangeResult{
		KVs:   kvs,
		Count: count,
		Rev:   rev,
	}
	return r, err
}

// TxnPut writes key/value within the txn identified by txnID and returns
// the revision at which the write will become visible (main+1).
func (s *store) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
	if txnID != s.txnID {
		return 0, ErrTxnIDMismatch
	}

	s.put(key, value, lease)
	return int64(s.currentRev.main + 1), nil
}

// TxnDeleteRange deletes keys in [key, end) within the txn identified by
// txnID. The returned rev is main+1 only if this txn has modified state.
func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
	if txnID != s.txnID {
		return 0, 0, ErrTxnIDMismatch
	}

	n = s.deleteRange(key, end)
	if n != 0 || s.currentRev.sub != 0 {
		rev = int64(s.currentRev.main + 1)
	} else {
		rev = int64(s.currentRev.main)
	}
	return n, rev, nil
}

// compactBarrier closes ch once ctx is usable; on a canceled/nil ctx it
// re-schedules itself on the FIFO scheduler unless the store is stopping.
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		s.mu.Lock()
		select {
		case <-s.stopc:
		default:
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
		}
		s.mu.Unlock()
		return
	}
	close(ch)
}

// Compact schedules a compaction at rev. The returned channel is closed
// once the compaction work has completed (or is resumed after restart —
// the scheduled revision is persisted before the work is queued).
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		return ch, ErrCompacted
	}
	if rev > s.currentRev.main {
		return nil, ErrFutureRev
	}

	start := time.Now()

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that desired compaction is persisted
	s.b.ForceCommit()

	keep := s.kvindex.Compact(rev)
	ch := make(chan struct{})
	var j = func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)

	indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
	return ch, nil
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}

// Hash force-commits pending writes and returns the backend hash (ignoring
// DefaultIgnores keys) together with the current main revision.
func (s *store) Hash() (uint32, int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	start := time.Now()

	s.b.ForceCommit()

	h, err := s.b.Hash(DefaultIgnores)

	hashDurations.Observe(time.Since(start).Seconds())
	rev := s.currentRev.main
	return h, rev, err
}

// Commit persists the consistent index and force-commits the backend.
func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.tx = s.b.BatchTx()
	s.tx.Lock()
	s.saveIndex()
	s.tx.Unlock()
	s.b.ForceCommit()
}

// Restore replaces the backend and rebuilds all in-memory state from it.
// The previous stop channel and scheduler are torn down and recreated.
func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	s.b = b
	s.kvindex = newTreeIndex()
	s.currentRev = revision{main: 1}
	s.compactMainRev = -1
	s.tx = b.BatchTx()
	s.txnID = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})

	return s.restore()
}

// restore rebuilds the in-memory index and lease attachments by scanning
// the key bucket, then resumes any compaction that was scheduled but not
// finished before shutdown.
func (s *store) restore() error {
	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(s.b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()
	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(s.b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// use an unordered map to hold the temp index data to speed up
	// the initial key index recovery.
	// we will convert this unordered map into the tree index later.
	unordered := make(map[string]*keyIndex, 100000)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()
	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
		plog.Printf("restore compact to %d", s.compactMainRev)
	}

	// TODO: limit N to reduce max memory usage
	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
	for i, key := range keys {
		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(vals[i]); err != nil {
			plog.Fatalf("cannot unmarshal event: %v", err)
		}

		rev := bytesToRev(key[:revBytesLen])

		// restore index
		switch {
		case isTombstone(key):
			if ki, ok := unordered[string(kv.Key)]; ok {
				ki.tombstone(rev.main, rev.sub)
			}
			delete(keyToLease, string(kv.Key))

		default:
			ki, ok := unordered[string(kv.Key)]
			if ok {
				ki.put(rev.main, rev.sub)
			} else {
				ki = &keyIndex{key: kv.Key}
				ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
				unordered[string(kv.Key)] = ki
			}

			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
				keyToLease[string(kv.Key)] = lid
			} else {
				delete(keyToLease, string(kv.Key))
			}
		}

		// update revision
		s.currentRev = rev
	}

	// restore the tree index from the unordered index.
	for _, v := range unordered {
		s.kvindex.Insert(v)
	}

	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
	// the correct revision should be set to compaction revision in the case, not the largest revision
	// we have seen.
	if s.currentRev.main < s.compactMainRev {
		s.currentRev.main = s.compactMainRev
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			plog.Errorf("unexpected Attach error: %v", err)
		}
	}

	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
		if scheduledCompact <= s.compactMainRev {
			scheduledCompact = 0
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		// NOTE(review): Compact acquires s.mu, and restore is reached both
		// without s.mu held (NewStore) and with it held (Restore). Go mutexes
		// are not reentrant, so the Restore path would deadlock here when a
		// scheduled compaction is pending — verify/fix upstream.
		s.Compact(scheduledCompact)
		plog.Printf("resume scheduled compaction at %d", scheduledCompact)
	}

	return nil
}

// Close stops the store: signals stopc and stops the FIFO scheduler.
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}

// Equal reports whether two stores have the same revision state and index.
func (a *store) Equal(b *store) bool {
	if a.currentRev != b.currentRev {
		return false
	}
	if a.compactMainRev != b.compactMainRev {
		return false
	}
	return a.kvindex.Equal(b.kvindex)
}

// range is a keyword in Go, add Keys suffix.
// rangeKeys reads up to limit key-value pairs in [key, end) at revision
// rangeRev (or at the current revision when rangeRev <= 0). It must be
// called with the txn lock held (it reads through s.tx). When countOnly is
// set, only the match count is computed and no values are fetched.
func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64, countOnly bool) (kvs []mvccpb.KeyValue, count int, curRev int64, err error) {
	curRev = int64(s.currentRev.main)
	// writes earlier in this txn become visible at main+1.
	if s.currentRev.sub > 0 {
		curRev += 1
	}

	if rangeRev > curRev {
		return nil, -1, s.currentRev.main, ErrFutureRev
	}
	var rev int64
	if rangeRev <= 0 {
		rev = curRev
	} else {
		rev = rangeRev
	}
	if rev < s.compactMainRev {
		return nil, -1, 0, ErrCompacted
	}

	_, revpairs := s.kvindex.Range(key, end, int64(rev))
	if len(revpairs) == 0 {
		return nil, 0, curRev, nil
	}
	if countOnly {
		return nil, len(revpairs), curRev, nil
	}

	for _, revpair := range revpairs {
		start, end := revBytesRange(revpair)

		// the index said this revision exists, so exactly one backend
		// record must match; anything else is corruption.
		_, vs := s.tx.UnsafeRange(keyBucketName, start, end, 0)
		if len(vs) != 1 {
			plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
		}

		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(vs[0]); err != nil {
			plog.Fatalf("cannot unmarshal event: %v", err)
		}
		kvs = append(kvs, kv)
		if limit > 0 && len(kvs) >= int(limit) {
			break
		}
	}
	return kvs, len(revpairs), curRev, nil
}

// put writes a single key-value record at revision {main+1, sub} into the
// backend and index, and moves any lease attachment from the old lease to
// leaseID. Must be called with the txn lock held.
func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
	s.txnModify = true

	rev := s.currentRev.main + 1
	c := rev
	oldLease := lease.NoLease

	// if the key exists before, use its previous created and
	// get its previous leaseID
	_, created, ver, err := s.kvindex.Get(key, rev)
	if err == nil {
		c = created.main
		// NOTE(review): s.le is dereferenced here without a nil check,
		// unlike the Detach/Attach paths below which panic explicitly —
		// confirm a nil-lessor store never overwrites an existing key.
		oldLease = s.le.GetLease(lease.LeaseItem{Key: string(key)})
	}

	ibytes := newRevBytes()
	revToBytes(revision{main: rev, sub: s.currentRev.sub}, ibytes)

	ver = ver + 1
	kv := mvccpb.KeyValue{
		Key:            key,
		Value:          value,
		CreateRevision: c,
		ModRevision:    rev,
		Version:        ver,
		Lease:          int64(leaseID),
	}

	d, err := kv.Marshal()
	if err != nil {
		plog.Fatalf("cannot marshal event: %v", err)
	}

	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
	s.changes = append(s.changes, kv)
	s.currentRev.sub += 1

	if oldLease != lease.NoLease {
		if s.le == nil {
			panic("no lessor to detach lease")
		}

		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
		if err != nil {
			plog.Errorf("unexpected error from lease detach: %v", err)
		}
	}

	if leaseID != lease.NoLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}

		err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
		if err != nil {
			panic("unexpected error from lease Attach")
		}
	}
}

// deleteRange tombstones every key currently visible in [key, end) and
// returns how many keys were deleted. Must be called with the txn lock held.
func (s *store) deleteRange(key, end []byte) int64 {
	s.txnModify = true

	rrev := s.currentRev.main
	// include writes made earlier in this txn (visible at main+1).
	if s.currentRev.sub > 0 {
		rrev += 1
	}
	keys, revs := s.kvindex.Range(key, end, rrev)

	if len(keys) == 0 {
		return 0
	}

	for i, key := range keys {
		s.delete(key, revs[i])
	}
	return int64(len(keys))
}

// delete writes a tombstone record for key at revision {main+1, sub} and
// detaches the key from its lease, if any.
func (s *store) delete(key []byte, rev revision) {
	mainrev := s.currentRev.main + 1

	ibytes := newRevBytes()
	revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
	ibytes = appendMarkTombstone(ibytes)

	// a tombstone record carries only the key, no value.
	kv := mvccpb.KeyValue{
		Key: key,
	}

	d, err := kv.Marshal()
	if err != nil {
		plog.Fatalf("cannot marshal event: %v", err)
	}

	s.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
	err = s.kvindex.Tombstone(key, revision{main: mainrev, sub: s.currentRev.sub})
	if err != nil {
		plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
	}
	s.changes = append(s.changes, kv)
	s.currentRev.sub += 1

	item := lease.LeaseItem{Key: string(key)}
	// NOTE(review): unconditional s.le dereference — panics on a nil-lessor
	// store; other paths in this file guard s.le == nil explicitly. Confirm
	// callers always construct the store with a lessor.
	leaseID := s.le.GetLease(item)

	if leaseID != lease.NoLease {
		err = s.le.Detach(leaseID, []lease.LeaseItem{item})
		if err != nil {
			plog.Errorf("cannot detach %v", err)
		}
	}
}

// getChanges returns the KeyValue changes accumulated since the last call
// and resets the change buffer.
func (s *store) getChanges() []mvccpb.KeyValue {
	changes := s.changes
	s.changes = make([]mvccpb.KeyValue, 0, 4)
	return changes
}

// saveIndex persists the consistent index from s.ig into the meta bucket;
// it is a no-op when no ConsistentIndexGetter was configured.
func (s *store) saveIndex() {
	if s.ig == nil {
		return
	}
	tx := s.tx
	bs := s.bytesBuf8
	binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
	// put the index into the underlying backend
	// tx has been locked in TxnBegin, so there is no need to lock it again
	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}

// ConsistentIndex reads the persisted consistent index from the backend,
// returning 0 when none has been saved yet.
func (s *store) ConsistentIndex() uint64 {
	// TODO: cache index in a uint64 field?
	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
	if len(vs) == 0 {
		return 0
	}
	return binary.BigEndian.Uint64(vs[0])
}

// appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(b []byte) []byte {
	if len(b) != revBytesLen {
		plog.Panicf("cannot append mark to non normal revision bytes")
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the revision bytes is a tombstone.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}

// revBytesRange returns the range of revision bytes at
// the given revision.
func revBytesRange(rev revision) (start, end []byte) {
	start = newRevBytes()
	revToBytes(rev, start)

	end = newRevBytes()
	endRev := revision{main: rev.main, sub: rev.sub + 1}
	revToBytes(endRev, end)

	return start, end
}