1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package mvcc 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "hash/crc32" 22 "math" 23 "sync" 24 "time" 25 26 "go.etcd.io/etcd/api/v3/mvccpb" 27 "go.etcd.io/etcd/pkg/v3/schedule" 28 "go.etcd.io/etcd/pkg/v3/traceutil" 29 "go.etcd.io/etcd/server/v3/etcdserver/cindex" 30 "go.etcd.io/etcd/server/v3/lease" 31 "go.etcd.io/etcd/server/v3/mvcc/backend" 32 33 "go.uber.org/zap" 34) 35 36var ( 37 keyBucketName = []byte("key") 38 metaBucketName = []byte("meta") 39 40 consistentIndexKeyName = []byte("consistent_index") 41 scheduledCompactKeyName = []byte("scheduledCompactRev") 42 finishedCompactKeyName = []byte("finishedCompactRev") 43 44 ErrCompacted = errors.New("mvcc: required revision has been compacted") 45 ErrFutureRev = errors.New("mvcc: required revision is a future revision") 46 ErrCanceled = errors.New("mvcc: watcher is canceled") 47) 48 49const ( 50 // markedRevBytesLen is the byte length of marked revision. 51 // The first `revBytesLen` bytes represents a normal revision. The last 52 // one byte is the mark. 53 markedRevBytesLen = revBytesLen + 1 54 markBytePosition = markedRevBytesLen - 1 55 markTombstone byte = 't' 56) 57 58var restoreChunkKeys = 10000 // non-const for testing 59var defaultCompactBatchLimit = 1000 60 61type StoreConfig struct { 62 CompactionBatchLimit int 63} 64 65type store struct { 66 ReadView 67 WriteView 68 69 cfg StoreConfig 70 71 // mu read locks for txns and write locks for non-txn store changes. 72 mu sync.RWMutex 73 74 ci cindex.ConsistentIndexer 75 76 b backend.Backend 77 kvindex index 78 79 le lease.Lessor 80 81 // revMuLock protects currentRev and compactMainRev. 82 // Locked at end of write txn and released after write txn unlock lock. 83 // Locked before locking read txn and released after locking. 84 revMu sync.RWMutex 85 // currentRev is the revision of the last completed transaction. 86 currentRev int64 87 // compactMainRev is the main revision of the last compaction. 88 compactMainRev int64 89 90 fifoSched schedule.Scheduler 91 92 stopc chan struct{} 93 94 lg *zap.Logger 95} 96 97// NewStore returns a new store. It is useful to create a store inside 98// mvcc pkg. It should only be used for testing externally. 99func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store { 100 if lg == nil { 101 lg = zap.NewNop() 102 } 103 if cfg.CompactionBatchLimit == 0 { 104 cfg.CompactionBatchLimit = defaultCompactBatchLimit 105 } 106 s := &store{ 107 cfg: cfg, 108 b: b, 109 ci: ci, 110 kvindex: newTreeIndex(lg), 111 112 le: le, 113 114 currentRev: 1, 115 compactMainRev: -1, 116 117 fifoSched: schedule.NewFIFOScheduler(), 118 119 stopc: make(chan struct{}), 120 121 lg: lg, 122 } 123 s.ReadView = &readView{s} 124 s.WriteView = &writeView{s} 125 if s.le != nil { 126 s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) 127 } 128 129 tx := s.b.BatchTx() 130 tx.Lock() 131 tx.UnsafeCreateBucket(keyBucketName) 132 tx.UnsafeCreateBucket(metaBucketName) 133 tx.Unlock() 134 s.b.ForceCommit() 135 136 s.mu.Lock() 137 defer s.mu.Unlock() 138 if err := s.restore(); err != nil { 139 // TODO: return the error instead of panic here? 140 panic("failed to recover store from backend") 141 } 142 143 return s 144} 145 146func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { 147 if ctx == nil || ctx.Err() != nil { 148 select { 149 case <-s.stopc: 150 default: 151 // fix deadlock in mvcc,for more information, please refer to pr 11817. 152 // s.stopc is only updated in restore operation, which is called by apply 153 // snapshot call, compaction and apply snapshot requests are serialized by 154 // raft, and do not happen at the same time. 155 s.mu.Lock() 156 f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } 157 s.fifoSched.Schedule(f) 158 s.mu.Unlock() 159 } 160 return 161 } 162 close(ch) 163} 164 165func (s *store) Hash() (hash uint32, revision int64, err error) { 166 // TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly 167 start := time.Now() 168 169 s.b.ForceCommit() 170 h, err := s.b.Hash(DefaultIgnores) 171 172 hashSec.Observe(time.Since(start).Seconds()) 173 return h, s.currentRev, err 174} 175 176func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { 177 start := time.Now() 178 179 s.mu.RLock() 180 s.revMu.RLock() 181 compactRev, currentRev = s.compactMainRev, s.currentRev 182 s.revMu.RUnlock() 183 184 if rev > 0 && rev <= compactRev { 185 s.mu.RUnlock() 186 return 0, 0, compactRev, ErrCompacted 187 } else if rev > 0 && rev > currentRev { 188 s.mu.RUnlock() 189 return 0, currentRev, 0, ErrFutureRev 190 } 191 192 if rev == 0 { 193 rev = currentRev 194 } 195 keep := s.kvindex.Keep(rev) 196 197 tx := s.b.ReadTx() 198 tx.RLock() 199 defer tx.RUnlock() 200 s.mu.RUnlock() 201 202 upper := revision{main: rev + 1} 203 lower := revision{main: compactRev + 1} 204 h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) 205 206 h.Write(keyBucketName) 207 err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error { 208 kr := bytesToRev(k) 209 if !upper.GreaterThan(kr) { 210 return nil 211 } 212 // skip revisions that are scheduled for deletion 213 // due to compacting; don't skip if there isn't one. 214 if lower.GreaterThan(kr) && len(keep) > 0 { 215 if _, ok := keep[kr]; !ok { 216 return nil 217 } 218 } 219 h.Write(k) 220 h.Write(v) 221 return nil 222 }) 223 hash = h.Sum32() 224 225 hashRevSec.Observe(time.Since(start).Seconds()) 226 return hash, currentRev, compactRev, err 227} 228 229func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { 230 s.revMu.Lock() 231 if rev <= s.compactMainRev { 232 ch := make(chan struct{}) 233 f := func(ctx context.Context) { s.compactBarrier(ctx, ch) } 234 s.fifoSched.Schedule(f) 235 s.revMu.Unlock() 236 return ch, ErrCompacted 237 } 238 if rev > s.currentRev { 239 s.revMu.Unlock() 240 return nil, ErrFutureRev 241 } 242 243 s.compactMainRev = rev 244 245 rbytes := newRevBytes() 246 revToBytes(revision{main: rev}, rbytes) 247 248 tx := s.b.BatchTx() 249 tx.Lock() 250 tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes) 251 tx.Unlock() 252 // ensure that desired compaction is persisted 253 s.b.ForceCommit() 254 255 s.revMu.Unlock() 256 257 return nil, nil 258} 259 260func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { 261 ch := make(chan struct{}) 262 var j = func(ctx context.Context) { 263 if ctx.Err() != nil { 264 s.compactBarrier(ctx, ch) 265 return 266 } 267 start := time.Now() 268 keep := s.kvindex.Compact(rev) 269 indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) 270 if !s.scheduleCompaction(rev, keep) { 271 s.compactBarrier(nil, ch) 272 return 273 } 274 close(ch) 275 } 276 277 s.fifoSched.Schedule(j) 278 trace.Step("schedule compaction") 279 return ch, nil 280} 281 282func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) { 283 ch, err := s.updateCompactRev(rev) 284 if err != nil { 285 return ch, err 286 } 287 288 return s.compact(traceutil.TODO(), rev) 289} 290 291func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { 292 s.mu.Lock() 293 294 ch, err := s.updateCompactRev(rev) 295 trace.Step("check and update compact revision") 296 if err != nil { 297 s.mu.Unlock() 298 return ch, err 299 } 300 s.mu.Unlock() 301 302 return s.compact(trace, rev) 303} 304 305// DefaultIgnores is a map of keys to ignore in hash checking. 306var DefaultIgnores map[backend.IgnoreKey]struct{} 307 308func init() { 309 DefaultIgnores = map[backend.IgnoreKey]struct{}{ 310 // consistent index might be changed due to v2 internal sync, which 311 // is not controllable by the user. 312 {Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {}, 313 } 314} 315 316func (s *store) Commit() { 317 s.mu.Lock() 318 defer s.mu.Unlock() 319 320 tx := s.b.BatchTx() 321 tx.Lock() 322 s.saveIndex(tx) 323 tx.Unlock() 324 s.b.ForceCommit() 325} 326 327func (s *store) Restore(b backend.Backend) error { 328 s.mu.Lock() 329 defer s.mu.Unlock() 330 331 close(s.stopc) 332 s.fifoSched.Stop() 333 334 s.b = b 335 s.kvindex = newTreeIndex(s.lg) 336 337 { 338 // During restore the metrics might report 'special' values 339 s.revMu.Lock() 340 s.currentRev = 1 341 s.compactMainRev = -1 342 s.revMu.Unlock() 343 } 344 345 s.fifoSched = schedule.NewFIFOScheduler() 346 s.stopc = make(chan struct{}) 347 s.ci.SetBatchTx(b.BatchTx()) 348 s.ci.SetConsistentIndex(0) 349 350 return s.restore() 351} 352 353func (s *store) restore() error { 354 s.setupMetricsReporter() 355 356 min, max := newRevBytes(), newRevBytes() 357 revToBytes(revision{main: 1}, min) 358 revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) 359 360 keyToLease := make(map[string]lease.LeaseID) 361 362 // restore index 363 tx := s.b.BatchTx() 364 tx.Lock() 365 366 _, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0) 367 if len(finishedCompactBytes) != 0 { 368 s.revMu.Lock() 369 s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main 370 371 s.lg.Info( 372 "restored last compact revision", 373 zap.String("meta-bucket-name", string(metaBucketName)), 374 zap.String("meta-bucket-name-key", string(finishedCompactKeyName)), 375 zap.Int64("restored-compact-revision", s.compactMainRev), 376 ) 377 s.revMu.Unlock() 378 } 379 _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) 380 scheduledCompact := int64(0) 381 if len(scheduledCompactBytes) != 0 { 382 scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main 383 } 384 385 // index keys concurrently as they're loaded in from tx 386 keysGauge.Set(0) 387 rkvc, revc := restoreIntoIndex(s.lg, s.kvindex) 388 for { 389 keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys)) 390 if len(keys) == 0 { 391 break 392 } 393 // rkvc blocks if the total pending keys exceeds the restore 394 // chunk size to keep keys from consuming too much memory. 395 restoreChunk(s.lg, rkvc, keys, vals, keyToLease) 396 if len(keys) < restoreChunkKeys { 397 // partial set implies final set 398 break 399 } 400 // next set begins after where this one ended 401 newMin := bytesToRev(keys[len(keys)-1][:revBytesLen]) 402 newMin.sub++ 403 revToBytes(newMin, min) 404 } 405 close(rkvc) 406 407 { 408 s.revMu.Lock() 409 s.currentRev = <-revc 410 411 // keys in the range [compacted revision -N, compaction] might all be deleted due to compaction. 412 // the correct revision should be set to compaction revision in the case, not the largest revision 413 // we have seen. 414 if s.currentRev < s.compactMainRev { 415 s.currentRev = s.compactMainRev 416 } 417 s.revMu.Unlock() 418 } 419 420 if scheduledCompact <= s.compactMainRev { 421 scheduledCompact = 0 422 } 423 424 for key, lid := range keyToLease { 425 if s.le == nil { 426 tx.Unlock() 427 panic("no lessor to attach lease") 428 } 429 err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) 430 if err != nil { 431 s.lg.Error( 432 "failed to attach a lease", 433 zap.String("lease-id", fmt.Sprintf("%016x", lid)), 434 zap.Error(err), 435 ) 436 } 437 } 438 439 tx.Unlock() 440 441 if scheduledCompact != 0 { 442 if _, err := s.compactLockfree(scheduledCompact); err != nil { 443 s.lg.Warn("compaction encountered error", zap.Error(err)) 444 } 445 446 s.lg.Info( 447 "resume scheduled compaction", 448 zap.String("meta-bucket-name", string(metaBucketName)), 449 zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)), 450 zap.Int64("scheduled-compact-revision", scheduledCompact), 451 ) 452 } 453 454 return nil 455} 456 457type revKeyValue struct { 458 key []byte 459 kv mvccpb.KeyValue 460 kstr string 461} 462 463func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) { 464 rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1) 465 go func() { 466 currentRev := int64(1) 467 defer func() { revc <- currentRev }() 468 // restore the tree index from streaming the unordered index. 469 kiCache := make(map[string]*keyIndex, restoreChunkKeys) 470 for rkv := range rkvc { 471 ki, ok := kiCache[rkv.kstr] 472 // purge kiCache if many keys but still missing in the cache 473 if !ok && len(kiCache) >= restoreChunkKeys { 474 i := 10 475 for k := range kiCache { 476 delete(kiCache, k) 477 if i--; i == 0 { 478 break 479 } 480 } 481 } 482 // cache miss, fetch from tree index if there 483 if !ok { 484 ki = &keyIndex{key: rkv.kv.Key} 485 if idxKey := idx.KeyIndex(ki); idxKey != nil { 486 kiCache[rkv.kstr], ki = idxKey, idxKey 487 ok = true 488 } 489 } 490 rev := bytesToRev(rkv.key) 491 currentRev = rev.main 492 if ok { 493 if isTombstone(rkv.key) { 494 if err := ki.tombstone(lg, rev.main, rev.sub); err != nil { 495 lg.Warn("tombstone encountered error", zap.Error(err)) 496 } 497 continue 498 } 499 ki.put(lg, rev.main, rev.sub) 500 } else if !isTombstone(rkv.key) { 501 ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version) 502 idx.Insert(ki) 503 kiCache[rkv.kstr] = ki 504 } 505 } 506 }() 507 return rkvc, revc 508} 509 510func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) { 511 for i, key := range keys { 512 rkv := revKeyValue{key: key} 513 if err := rkv.kv.Unmarshal(vals[i]); err != nil { 514 lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) 515 } 516 rkv.kstr = string(rkv.kv.Key) 517 if isTombstone(key) { 518 delete(keyToLease, rkv.kstr) 519 } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease { 520 keyToLease[rkv.kstr] = lid 521 } else { 522 delete(keyToLease, rkv.kstr) 523 } 524 kvc <- rkv 525 } 526} 527 528func (s *store) Close() error { 529 close(s.stopc) 530 s.fifoSched.Stop() 531 return nil 532} 533 534func (s *store) saveIndex(tx backend.BatchTx) { 535 if s.ci != nil { 536 s.ci.UnsafeSave(tx) 537 } 538} 539 540func (s *store) ConsistentIndex() uint64 { 541 if s.ci != nil { 542 return s.ci.ConsistentIndex() 543 } 544 return 0 545} 546 547func (s *store) setupMetricsReporter() { 548 b := s.b 549 reportDbTotalSizeInBytesMu.Lock() 550 reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) } 551 reportDbTotalSizeInBytesMu.Unlock() 552 reportDbTotalSizeInBytesDebugMu.Lock() 553 reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) } 554 reportDbTotalSizeInBytesDebugMu.Unlock() 555 reportDbTotalSizeInUseInBytesMu.Lock() 556 reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) } 557 reportDbTotalSizeInUseInBytesMu.Unlock() 558 reportDbOpenReadTxNMu.Lock() 559 reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) } 560 reportDbOpenReadTxNMu.Unlock() 561 reportCurrentRevMu.Lock() 562 reportCurrentRev = func() float64 { 563 s.revMu.RLock() 564 defer s.revMu.RUnlock() 565 return float64(s.currentRev) 566 } 567 reportCurrentRevMu.Unlock() 568 reportCompactRevMu.Lock() 569 reportCompactRev = func() float64 { 570 s.revMu.RLock() 571 defer s.revMu.RUnlock() 572 return float64(s.compactMainRev) 573 } 574 reportCompactRevMu.Unlock() 575} 576 577// appendMarkTombstone appends tombstone mark to normal revision bytes. 578func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { 579 if len(b) != revBytesLen { 580 lg.Panic( 581 "cannot append tombstone mark to non-normal revision bytes", 582 zap.Int("expected-revision-bytes-size", revBytesLen), 583 zap.Int("given-revision-bytes-size", len(b)), 584 ) 585 } 586 return append(b, markTombstone) 587} 588 589// isTombstone checks whether the revision bytes is a tombstone. 590func isTombstone(b []byte) bool { 591 return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone 592} 593