// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc/backend"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/pkg/schedule"
	"go.etcd.io/etcd/pkg/traceutil"

	"github.com/coreos/pkg/capnslog"
	"go.uber.org/zap"
)

var (
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	ErrCanceled  = errors.New("mvcc: watcher is canceled")
	ErrClosed    = errors.New("mvcc: closed")

	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc")
)

const (
	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision. The last
	// byte is the mark.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000

// ConsistentIndexGetter is an interface that wraps the ConsistentIndex method.
// A consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
	// ConsistentIndex returns the consistent index of the currently executing entry.
	ConsistentIndex() uint64
}

type StoreConfig struct {
	CompactionBatchLimit int
}

type store struct {
	ReadView
	WriteView

	// consistentIndex caches the "consistent_index" key's value. It is
	// accessed through atomics, so it must be 64-bit aligned.
	consistentIndex uint64

	cfg StoreConfig

	// mu is read-locked for txns and write-locked for non-txn store changes.
	mu sync.RWMutex

	ig ConsistentIndexGetter

	b       backend.Backend
	kvindex index

	le lease.Lessor

	// revMu protects currentRev and compactMainRev.
	// Locked at the end of a write txn and released after the write txn is unlocked.
	// Locked before locking a read txn and released after locking.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// bytesBuf8 is a byte slice of length 8
	// to avoid a repetitive allocation in saveIndex.
	bytesBuf8 []byte

	fifoSched schedule.Scheduler

	stopc chan struct{}

	lg *zap.Logger
}

// NewStore returns a new store. It is useful for creating a store inside the
// mvcc package; externally it should only be used for testing.
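//
// A minimal construction sketch ("be" and "lessor" are hypothetical,
// caller-supplied backend.Backend and lease.Lessor values, not names defined
// in this file; a nil ConsistentIndexGetter is tolerated, see saveIndex):
//
//	s := NewStore(zap.NewExample(), be, lessor, nil, StoreConfig{})
//	defer s.Close()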
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
	if cfg.CompactionBatchLimit == 0 {
		cfg.CompactionBatchLimit = defaultCompactBatchLimit
	}
	s := &store{
		cfg:     cfg,
		b:       b,
		ig:      ig,
		kvindex: newTreeIndex(lg),

		le: le,

		currentRev:     1,
		compactMainRev: -1,

		bytesBuf8: make([]byte, 8),
		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),

		lg: lg,
	}
	s.ReadView = &readView{s}
	s.WriteView = &writeView{s}
	if s.le != nil {
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
	}

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.restore(); err != nil {
		// TODO: return the error instead of panicking here?
		panic("failed to recover store from backend")
	}

	return s
}

func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		select {
		case <-s.stopc:
		default:
			// Fix a deadlock in mvcc; for more information, see PR 11817.
			// s.stopc is only updated in the restore operation, which is called
			// by the apply-snapshot call. Compaction and apply-snapshot requests
			// are serialized by raft and do not happen at the same time.
			s.mu.Lock()
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
			s.mu.Unlock()
		}
		return
	}
	close(ch)
}

func (s *store) Hash() (hash uint32, revision int64, err error) {
	start := time.Now()

	s.b.ForceCommit()
	h, err := s.b.Hash(DefaultIgnores)

	hashSec.Observe(time.Since(start).Seconds())
	return h, s.currentRev, err
}

func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
	start := time.Now()

	s.mu.RLock()
	s.revMu.RLock()
	compactRev, currentRev = s.compactMainRev, s.currentRev
	s.revMu.RUnlock()

	if rev > 0 && rev <= compactRev {
		s.mu.RUnlock()
		return 0, 0, compactRev, ErrCompacted
	} else if rev > 0 && rev > currentRev {
		s.mu.RUnlock()
		return 0, currentRev, 0, ErrFutureRev
	}

	if rev == 0 {
		rev = currentRev
	}
	keep := s.kvindex.Keep(rev)

	tx := s.b.ReadTx()
	tx.RLock()
	defer tx.RUnlock()
	s.mu.RUnlock()

	upper := revision{main: rev + 1}
	lower := revision{main: compactRev + 1}
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	h.Write(keyBucketName)
	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
		kr := bytesToRev(k)
		if !upper.GreaterThan(kr) {
			return nil
		}
		// skip revisions that are scheduled for deletion due to compaction;
		// don't skip if no compaction is pending.
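		// ("keep" was computed by s.kvindex.Keep(rev) above: the set of
		// revisions a compaction at the hash revision would retain. A key
		// revision at or below the last finished compaction contributes to
		// the hash only if it appears in that set.)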
		if lower.GreaterThan(kr) && len(keep) > 0 {
			if _, ok := keep[kr]; !ok {
				return nil
			}
		}
		h.Write(k)
		h.Write(v)
		return nil
	})
	hash = h.Sum32()

	hashRevSec.Observe(time.Since(start).Seconds())
	return hash, currentRev, compactRev, err
}

func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
	s.revMu.Lock()
	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		s.revMu.Unlock()
		return ch, ErrCompacted
	}
	if rev > s.currentRev {
		s.revMu.Unlock()
		return nil, ErrFutureRev
	}

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that the desired compaction is persisted
	s.b.ForceCommit()

	s.revMu.Unlock()

	return nil, nil
}

func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	ch := make(chan struct{})
	j := func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		start := time.Now()
		keep := s.kvindex.Compact(rev)
		indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)
	trace.Step("schedule compaction")
	return ch, nil
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
	ch, err := s.updateCompactRev(rev)
	if err != nil {
		return ch, err
	}

	return s.compact(traceutil.TODO(), rev)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	s.mu.Lock()

	ch, err := s.updateCompactRev(rev)
	trace.Step("check and update compact revision")
	if err != nil {
		s.mu.Unlock()
		return ch, err
	}
	s.mu.Unlock()

	return s.compact(trace, rev)
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// The consistent index might be changed due to v2 internal sync,
		// which is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}

func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	tx := s.b.BatchTx()
	tx.Lock()
	s.saveIndex(tx)
	tx.Unlock()
	s.b.ForceCommit()
}

func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	atomic.StoreUint64(&s.consistentIndex, 0)
	s.b = b
	s.kvindex = newTreeIndex(s.lg)
	s.currentRev = 1
	s.compactMainRev = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})

	return s.restore()
}

func (s *store) restore() error {
	s.setupMetricsReporter()

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()

	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main

		if s.lg != nil {
			s.lg.Info(
				"restored last compact revision",
				zap.String("meta-bucket-name", string(metaBucketName)),
				zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
				zap.Int64("restored-compact-revision", s.compactMainRev),
			)
		} else {
			plog.Printf("restore compact to %d", s.compactMainRev)
		}
	}
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
	}

	// index keys concurrently as they're loaded in from tx
	keysGauge.Set(0)
	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		// rkvc blocks if the total pending keys exceed the restore
		// chunk size, to keep keys from consuming too much memory.
		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
		if len(keys) < restoreChunkKeys {
			// a partial set implies the final set
			break
		}
		// the next set begins after where this one ended
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	close(rkvc)
	s.currentRev = <-revc

	// Keys in the range [compacted revision - N, compaction revision] might all
	// have been deleted due to compaction. In that case the correct current
	// revision is the compaction revision, not the largest revision seen.
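	// (e.g. if every key was deleted before a compaction at revision 10, the
	// key bucket may be empty afterwards; the scan above would then leave
	// currentRev at 1 while compactMainRev is 10.)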
	if s.currentRev < s.compactMainRev {
		s.currentRev = s.compactMainRev
	}
	if scheduledCompact <= s.compactMainRev {
		scheduledCompact = 0
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			if s.lg != nil {
				s.lg.Warn(
					"failed to attach a lease",
					zap.String("lease-id", fmt.Sprintf("%016x", lid)),
					zap.Error(err),
				)
			} else {
				plog.Errorf("unexpected Attach error: %v", err)
			}
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		s.compactLockfree(scheduledCompact)

		if s.lg != nil {
			s.lg.Info(
				"resume scheduled compaction",
				zap.String("meta-bucket-name", string(metaBucketName)),
				zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
				zap.Int64("scheduled-compact-revision", scheduledCompact),
			)
		} else {
			plog.Printf("resume scheduled compaction at %d", scheduledCompact)
		}
	}

	return nil
}

type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}

func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		// restore the tree index from the streamed, unordered index entries.
		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
		for rkv := range rkvc {
			ki, ok := kiCache[rkv.kstr]
			// purge part of kiCache if it has many keys but lookups still miss
			if !ok && len(kiCache) >= restoreChunkKeys {
				i := 10
				for k := range kiCache {
					delete(kiCache, k)
					if i--; i == 0 {
						break
					}
				}
			}
			// on a cache miss, fetch from the tree index if the key is there
			if !ok {
				ki = &keyIndex{key: rkv.kv.Key}
				if idxKey := idx.KeyIndex(ki); idxKey != nil {
					kiCache[rkv.kstr], ki = idxKey, idxKey
					ok = true
				}
			}
			rev := bytesToRev(rkv.key)
			currentRev = rev.main
			if ok {
				if isTombstone(rkv.key) {
					ki.tombstone(lg, rev.main, rev.sub)
					continue
				}
				ki.put(lg, rev.main, rev.sub)
			} else if !isTombstone(rkv.key) {
				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
				idx.Insert(ki)
				kiCache[rkv.kstr] = ki
			}
		}
	}()
	return rkvc, revc
}

func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
	for i, key := range keys {
		rkv := revKeyValue{key: key}
		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
			if lg != nil {
				lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
			} else {
				plog.Fatalf("cannot unmarshal event: %v", err)
			}
		}
		rkv.kstr = string(rkv.kv.Key)
		if isTombstone(key) {
			delete(keyToLease, rkv.kstr)
		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
			keyToLease[rkv.kstr] = lid
		} else {
			delete(keyToLease, rkv.kstr)
		}
		kvc <- rkv
	}
}

func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}

func (s *store) saveIndex(tx backend.BatchTx) {
	if s.ig == nil {
		return
	}
	bs := s.bytesBuf8
	ci := s.ig.ConsistentIndex()
	binary.BigEndian.PutUint64(bs, ci)
	// put the index into the underlying backend
	// tx has been locked in TxnBegin, so there is no need to lock it again
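	// (the atomic store below mirrors the value persisted here, so that
	// ConsistentIndex can usually answer from the cache without a backend read)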
	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
	atomic.StoreUint64(&s.consistentIndex, ci)
}

func (s *store) ConsistentIndex() uint64 {
	if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
		return ci
	}
	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
	if len(vs) == 0 {
		return 0
	}
	v := binary.BigEndian.Uint64(vs[0])
	atomic.StoreUint64(&s.consistentIndex, v)
	return v
}

func (s *store) setupMetricsReporter() {
	b := s.b
	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()
	reportDbTotalSizeInBytesDebugMu.Lock()
	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesDebugMu.Unlock()
	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()
	reportDbOpenReadTxNMu.Lock()
	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
	reportDbOpenReadTxNMu.Unlock()
	reportCurrentRevMu.Lock()
	reportCurrentRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.currentRev)
	}
	reportCurrentRevMu.Unlock()
	reportCompactRevMu.Lock()
	reportCompactRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.compactMainRev)
	}
	reportCompactRevMu.Unlock()
}

// appendMarkTombstone appends a tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
	if len(b) != revBytesLen {
		if lg != nil {
			lg.Panic(
				"cannot append tombstone mark to non-normal revision bytes",
				zap.Int("expected-revision-bytes-size", revBytesLen),
				zap.Int("given-revision-bytes-size", len(b)),
			)
		} else {
			plog.Panicf("cannot append mark to non-normal revision bytes")
		}
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the given revision bytes are a tombstone.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}
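
// Illustrative usage of the compaction flow (a sketch, not part of the
// package API; "s" is a *store created with NewStore as sketched above).
// Compact returns a channel that is closed once the scheduled compaction
// finishes, and HashByRev hashes the keyspace at a revision above the
// compacted revision:
//
//	if ch, err := s.Compact(traceutil.TODO(), 100); err == nil {
//		<-ch // wait for the scheduled compaction to complete
//	}
//	hash, currentRev, compactRev, err := s.HashByRev(0) // rev 0 means the current revision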