// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"context"
	"errors"
	"fmt"
	"hash/crc32"
	"math"
	"sync"
	"time"

	"go.etcd.io/etcd/etcdserver/cindex"
	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc/backend"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/pkg/schedule"
	"go.etcd.io/etcd/pkg/traceutil"

	"go.uber.org/zap"
)

var (
	// keyBucketName and metaBucketName are the backend bucket names for
	// revisioned key-value data and store metadata, respectively.
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	// Keys inside the meta bucket: the applied consistent index and the
	// scheduled/finished compaction revisions.
	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	// ErrCompacted is returned when an operation references a revision
	// that has already been compacted away.
	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	// ErrFutureRev is returned when an operation references a revision
	// larger than the store's current revision.
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	// ErrCanceled is returned when a watcher is canceled.
	ErrCanceled = errors.New("mvcc: watcher is canceled")
)

const (
	// markedRevBytesLen is the byte length of marked revision.
	// The first `revBytesLen` bytes represents a normal revision. The last
	// one byte is the mark.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'
)

var restoreChunkKeys = 10000 // non-const for testing
var defaultCompactBatchLimit = 1000

// StoreConfig carries tunables for a store. A zero CompactionBatchLimit is
// replaced with defaultCompactBatchLimit in NewStore.
type StoreConfig struct {
	CompactionBatchLimit int
}

// store implements the MVCC key-value store on top of a backend and an
// in-memory tree index.
type store struct {
	ReadView
	WriteView

	cfg StoreConfig

	// mu read locks for txns and write locks for non-txn store changes.
	mu sync.RWMutex

	// ci persists the consistent index alongside store data.
	ci cindex.ConsistentIndexer

	// b is the durable backend; kvindex maps keys to revisions in memory.
	b       backend.Backend
	kvindex index

	// le attaches/detaches leases as keys are restored or deleted.
	le lease.Lessor

	// revMuLock protects currentRev and compactMainRev.
	// Locked at end of write txn and released after write txn unlock lock.
	// Locked before locking read txn and released after locking.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// fifoSched serializes background jobs such as compactions.
	fifoSched schedule.Scheduler

	// stopc is closed when the store is closed or restored.
	stopc chan struct{}

	lg *zap.Logger
}

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
	if lg == nil {
		lg = zap.NewNop()
	}
	if cfg.CompactionBatchLimit == 0 {
		cfg.CompactionBatchLimit = defaultCompactBatchLimit
	}
	s := &store{
		cfg:     cfg,
		b:       b,
		ci:      ci,
		kvindex: newTreeIndex(lg),

		le: le,

		currentRev:     1,
		compactMainRev: -1,

		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),

		lg: lg,
	}
	s.ReadView = &readView{s}
	s.WriteView = &writeView{s}
	if s.le != nil {
		// lease expiry deletes ranges through a fresh write txn on this store.
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
	}

	// ensure the key and meta buckets exist before any reads or writes.
	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.restore(); err != nil {
		// TODO: return the error instead of panic here?
		panic("failed to recover store from backend")
	}

	return s
}

// compactBarrier closes ch once it runs with a live context so waiters of a
// compaction can proceed. If the context is nil or already canceled, it
// re-queues itself on the FIFO scheduler (unless the store is stopping) so
// that ch is still eventually closed.
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		s.mu.Lock()
		select {
		case <-s.stopc:
		default:
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
		}
		s.mu.Unlock()
		return
	}
	close(ch)
}

// Hash commits any pending backend writes and returns the hash of the whole
// backend together with the store's current revision.
func (s *store) Hash() (hash uint32, revision int64, err error) {
	// TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly
	start := time.Now()

	s.b.ForceCommit()
	h, err := s.b.Hash(DefaultIgnores)

	hashSec.Observe(time.Since(start).Seconds())
	return h, s.currentRev, err
}

// HashByRev computes a CRC32 (Castagnoli) hash over all key-bucket entries at
// revisions in (compactRev, rev], skipping revisions scheduled for deletion by
// an in-flight compaction. rev == 0 means the current revision.
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
	start := time.Now()

	s.mu.RLock()
	s.revMu.RLock()
	compactRev, currentRev = s.compactMainRev, s.currentRev
	s.revMu.RUnlock()

	if rev > 0 && rev <= compactRev {
		s.mu.RUnlock()
		return 0, 0, compactRev, ErrCompacted
	} else if rev > 0 && rev > currentRev {
		s.mu.RUnlock()
		return 0, currentRev, 0, ErrFutureRev
	}

	if rev == 0 {
		rev = currentRev
	}
	keep := s.kvindex.Keep(rev)

	tx := s.b.ReadTx()
	tx.RLock()
	defer tx.RUnlock()
	s.mu.RUnlock()

	upper := revision{main: rev + 1}
	lower := revision{main: compactRev + 1}
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	h.Write(keyBucketName)
	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
		kr := bytesToRev(k)
		if !upper.GreaterThan(kr) {
			return nil
		}
		// skip revisions that are scheduled for deletion
		// due to compacting; don't skip if there isn't one.
		if lower.GreaterThan(kr) && len(keep) > 0 {
			if _, ok := keep[kr]; !ok {
				return nil
			}
		}
		h.Write(k)
		h.Write(v)
		return nil
	})
	hash = h.Sum32()

	hashRevSec.Observe(time.Since(start).Seconds())
	return hash, currentRev, compactRev, err
}

// updateCompactRev validates rev against the current and last-compacted
// revisions, records it as the new compactMainRev, and durably persists it as
// the scheduled compaction revision. On ErrCompacted the returned channel is
// closed once prior compactions complete (via compactBarrier).
func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
	s.revMu.Lock()
	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		s.revMu.Unlock()
		return ch, ErrCompacted
	}
	if rev > s.currentRev {
		s.revMu.Unlock()
		return nil, ErrFutureRev
	}

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that desired compaction is persisted
	s.b.ForceCommit()

	s.revMu.Unlock()

	return nil, nil
}

// compact schedules the actual compaction work (index compaction followed by
// backend key deletion) on the FIFO scheduler and returns a channel that is
// closed when the compaction finishes.
func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	ch := make(chan struct{})
	var j = func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		start := time.Now()
		keep := s.kvindex.Compact(rev)
		indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)
	trace.Step("schedule compaction")
	return ch, nil
}

// compactLockfree runs a compaction without taking s.mu; used during restore,
// where the caller already holds the lock.
func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
	ch, err := s.updateCompactRev(rev)
	if err != nil {
		return ch, err
	}

	return s.compact(traceutil.TODO(), rev)
}

// Compact updates the compact revision under s.mu and then schedules the
// compaction work asynchronously.
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	s.mu.Lock()

	ch, err := s.updateCompactRev(rev)
	trace.Step("check and update compact revision")
	if err != nil {
		s.mu.Unlock()
		return ch, err
	}
	s.mu.Unlock()

	return s.compact(trace, rev)
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}

// Commit saves the consistent index and forces the backend to commit its
// pending writes.
func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	tx := s.b.BatchTx()
	tx.Lock()
	s.saveIndex(tx)
	tx.Unlock()
	s.b.ForceCommit()
}

// Restore replaces the store's backend with b and rebuilds all in-memory
// state (index, revisions, scheduler) from it. Outstanding background jobs
// are stopped first by closing stopc and stopping the scheduler.
func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	s.b = b
	s.kvindex = newTreeIndex(s.lg)
	s.currentRev = 1
	s.compactMainRev = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})
	s.ci.SetBatchTx(b.BatchTx())
	s.ci.SetConsistentIndex(0)

	return s.restore()
}

// restore rebuilds the in-memory index and revision state by scanning the
// key bucket in chunks, re-attaching leases, and resuming any compaction that
// was scheduled but not finished. Caller must hold s.mu.
func (s *store) restore() error {
	s.setupMetricsReporter()

	// scan the entire main-revision range of the key bucket.
	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()

	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main

		s.lg.Info(
			"restored last compact revision",
			zap.String("meta-bucket-name", string(metaBucketName)),
			zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
			zap.Int64("restored-compact-revision", s.compactMainRev),
		)
	}
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
	}

	// index keys concurrently as they're loaded in from tx
	keysGauge.Set(0)
	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		// rkvc blocks if the total pending keys exceeds the restore
		// chunk size to keep keys from consuming too much memory.
		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
		if len(keys) < restoreChunkKeys {
			// partial set implies final set
			break
		}
		// next set begins after where this one ended
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	close(rkvc)
	s.currentRev = <-revc

	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
	// the correct revision should be set to compaction revision in the case, not the largest revision
	// we have seen.
	if s.currentRev < s.compactMainRev {
		s.currentRev = s.compactMainRev
	}
	if scheduledCompact <= s.compactMainRev {
		scheduledCompact = 0
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			tx.Unlock()
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			s.lg.Error(
				"failed to attach a lease",
				zap.String("lease-id", fmt.Sprintf("%016x", lid)),
				zap.Error(err),
			)
		}
	}

	tx.Unlock()

	// resume a compaction that was persisted as scheduled but never finished.
	if scheduledCompact != 0 {
		if _, err := s.compactLockfree(scheduledCompact); err != nil {
			s.lg.Warn("compaction encountered error", zap.Error(err))
		}

		s.lg.Info(
			"resume scheduled compaction",
			zap.String("meta-bucket-name", string(metaBucketName)),
			zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
			zap.Int64("scheduled-compact-revision", scheduledCompact),
		)
	}

	return nil
}

// revKeyValue pairs a raw revision key with its decoded KeyValue and the
// string form of the user key (used as a cache/map key).
type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}

// restoreIntoIndex starts a goroutine that consumes revKeyValues from the
// returned channel and inserts them into idx. After the channel is closed,
// the largest main revision observed (at least 1) is sent on the second
// returned channel.
func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		// restore the tree index from streaming the unordered index.
		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
		for rkv := range rkvc {
			ki, ok := kiCache[rkv.kstr]
			// purge kiCache if many keys but still missing in the cache
			if !ok && len(kiCache) >= restoreChunkKeys {
				i := 10
				for k := range kiCache {
					delete(kiCache, k)
					if i--; i == 0 {
						break
					}
				}
			}
			// cache miss, fetch from tree index if there
			if !ok {
				ki = &keyIndex{key: rkv.kv.Key}
				if idxKey := idx.KeyIndex(ki); idxKey != nil {
					kiCache[rkv.kstr], ki = idxKey, idxKey
					ok = true
				}
			}
			rev := bytesToRev(rkv.key)
			currentRev = rev.main
			if ok {
				if isTombstone(rkv.key) {
					if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
						lg.Warn("tombstone encountered error", zap.Error(err))
					}
					continue
				}
				ki.put(lg, rev.main, rev.sub)
			} else if !isTombstone(rkv.key) {
				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
				idx.Insert(ki)
				kiCache[rkv.kstr] = ki
			}
		}
	}()
	return rkvc, revc
}

// restoreChunk decodes one chunk of raw keys/values, updates keyToLease for
// leased (or tombstoned) keys, and streams each entry to the index goroutine.
func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
	for i, key := range keys {
		rkv := revKeyValue{key: key}
		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
			lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
		}
		rkv.kstr = string(rkv.kv.Key)
		if isTombstone(key) {
			delete(keyToLease, rkv.kstr)
		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
			keyToLease[rkv.kstr] = lid
		} else {
			delete(keyToLease, rkv.kstr)
		}
		kvc <- rkv
	}
}

// Close stops background scheduling; it does not close the backend.
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}

// saveIndex persists the consistent index into tx, if an indexer is set.
// Caller must hold the batch tx lock.
func (s *store) saveIndex(tx backend.BatchTx) {
	if s.ci != nil {
		s.ci.UnsafeSave(tx)
	}
}

// ConsistentIndex returns the last applied consistent index, or 0 when no
// indexer is configured.
func (s *store) ConsistentIndex() uint64 {
	if s.ci != nil {
		return s.ci.ConsistentIndex()
	}
	return 0
}

// setupMetricsReporter installs closures that report backend sizes and the
// store's current/compact revisions to the package metrics gauges.
func (s *store) setupMetricsReporter() {
	b := s.b
	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()
	reportDbTotalSizeInBytesDebugMu.Lock()
	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesDebugMu.Unlock()
	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()
	reportDbOpenReadTxNMu.Lock()
	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
	reportDbOpenReadTxNMu.Unlock()
	reportCurrentRevMu.Lock()
	reportCurrentRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.currentRev)
	}
	reportCurrentRevMu.Unlock()
	reportCompactRevMu.Lock()
	reportCompactRev = func() float64 {
		s.revMu.RLock()
		defer s.revMu.RUnlock()
		return float64(s.compactMainRev)
	}
	reportCompactRevMu.Unlock()
}

// appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
	if len(b) != revBytesLen {
		lg.Panic(
			"cannot append tombstone mark to non-normal revision bytes",
			zap.Int("expected-revision-bytes-size", revBytesLen),
			zap.Int("given-revision-bytes-size", len(b)),
		)
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the revision bytes is a tombstone.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}