// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"sync"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

const (
	// chanBufLen is the length of the buffered chan
	// for sending out watched events.
	// It sizes the response channel created by NewWatchStream.
	// TODO: find a good buf value. 1024 is just a random one that
	// seems to be reasonable.
	chanBufLen = 1024

	// maxWatchersPerSync is the number of watchers to sync in a single batch.
	// syncWatchers passes it to the unsynced group's choose method.
	maxWatchersPerSync = 512
)

// watchable is the set of operations a watch stream needs from the store
// that backs it. watchableStore implements it.
type watchable interface {
	// watch registers a watcher on key (or the range [key, end)) starting at
	// startRev that delivers responses on ch, and returns the watcher together
	// with the function that cancels it.
	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
	// progress asks for a progress notification to be sent to w.
	progress(w *watcher)
	// rev returns the current revision of the underlying store.
	rev() int64
}

// watchableStore wraps a store and notifies watchers about the changes
// applied through it.
type watchableStore struct {
	// mu is held by the methods of watchableStore while they read or modify
	// victims, unsynced and synced.
	mu sync.Mutex

	*store

	// victims are watcher batches that were blocked on the watch channel
	victims []watcherBatch
	// victimc is signaled (without blocking) whenever a batch is appended to
	// victims, waking up syncVictimsLoop.
	victimc chan struct{}

	// contains all unsynced watchers that need to sync with events that have happened
	unsynced watcherGroup

	// contains all synced watchers that are in sync with the progress of the store.
	// The key of the map is the key that the watcher watches on.
	synced watcherGroup

	// stopc is closed by Close to stop the background sync loops.
	stopc chan struct{}
	// wg tracks the two background sync goroutines so Close can wait for them.
	wg sync.WaitGroup
}

// cancelFunc updates unsynced and synced maps when running
// cancel operations.
65type cancelFunc func() 66 67func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV { 68 return newWatchableStore(b, le, ig) 69} 70 71func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore { 72 s := &watchableStore{ 73 store: NewStore(b, le, ig), 74 victimc: make(chan struct{}, 1), 75 unsynced: newWatcherGroup(), 76 synced: newWatcherGroup(), 77 stopc: make(chan struct{}), 78 } 79 if s.le != nil { 80 // use this store as the deleter so revokes trigger watch events 81 s.le.SetRangeDeleter(s) 82 } 83 s.wg.Add(2) 84 go s.syncWatchersLoop() 85 go s.syncVictimsLoop() 86 return s 87} 88 89func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) { 90 s.mu.Lock() 91 defer s.mu.Unlock() 92 93 rev = s.store.Put(key, value, lease) 94 changes := s.store.getChanges() 95 if len(changes) != 1 { 96 plog.Panicf("unexpected len(changes) != 1 after put") 97 } 98 99 ev := mvccpb.Event{ 100 Type: mvccpb.PUT, 101 Kv: &changes[0], 102 } 103 s.notify(rev, []mvccpb.Event{ev}) 104 return rev 105} 106 107func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { 108 s.mu.Lock() 109 defer s.mu.Unlock() 110 111 n, rev = s.store.DeleteRange(key, end) 112 changes := s.store.getChanges() 113 114 if len(changes) != int(n) { 115 plog.Panicf("unexpected len(changes) != n after deleteRange") 116 } 117 118 if n == 0 { 119 return n, rev 120 } 121 122 evs := make([]mvccpb.Event, n) 123 for i := range changes { 124 evs[i] = mvccpb.Event{ 125 Type: mvccpb.DELETE, 126 Kv: &changes[i]} 127 evs[i].Kv.ModRevision = rev 128 } 129 s.notify(rev, evs) 130 return n, rev 131} 132 133func (s *watchableStore) TxnBegin() int64 { 134 s.mu.Lock() 135 return s.store.TxnBegin() 136} 137 138func (s *watchableStore) TxnEnd(txnID int64) error { 139 err := s.store.TxnEnd(txnID) 140 if err != nil { 141 return err 142 } 143 144 changes := s.getChanges() 145 if len(changes) == 0 { 146 s.mu.Unlock() 147 return 
nil 148 } 149 150 rev := s.store.Rev() 151 evs := make([]mvccpb.Event, len(changes)) 152 for i, change := range changes { 153 switch change.CreateRevision { 154 case 0: 155 evs[i] = mvccpb.Event{ 156 Type: mvccpb.DELETE, 157 Kv: &changes[i]} 158 evs[i].Kv.ModRevision = rev 159 default: 160 evs[i] = mvccpb.Event{ 161 Type: mvccpb.PUT, 162 Kv: &changes[i]} 163 } 164 } 165 166 s.notify(rev, evs) 167 s.mu.Unlock() 168 169 return nil 170} 171 172func (s *watchableStore) Close() error { 173 close(s.stopc) 174 s.wg.Wait() 175 return s.store.Close() 176} 177 178func (s *watchableStore) NewWatchStream() WatchStream { 179 watchStreamGauge.Inc() 180 return &watchStream{ 181 watchable: s, 182 ch: make(chan WatchResponse, chanBufLen), 183 cancels: make(map[WatchID]cancelFunc), 184 watchers: make(map[WatchID]*watcher), 185 } 186} 187 188func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) { 189 s.mu.Lock() 190 defer s.mu.Unlock() 191 192 wa := &watcher{ 193 key: key, 194 end: end, 195 minRev: startRev, 196 id: id, 197 ch: ch, 198 fcs: fcs, 199 } 200 201 s.store.mu.Lock() 202 synced := startRev > s.store.currentRev.main || startRev == 0 203 if synced { 204 wa.minRev = s.store.currentRev.main + 1 205 if startRev > wa.minRev { 206 wa.minRev = startRev 207 } 208 } 209 s.store.mu.Unlock() 210 if synced { 211 s.synced.add(wa) 212 } else { 213 slowWatcherGauge.Inc() 214 s.unsynced.add(wa) 215 } 216 watcherGauge.Inc() 217 218 return wa, func() { s.cancelWatcher(wa) } 219} 220 221// cancelWatcher removes references of the watcher from the watchableStore 222func (s *watchableStore) cancelWatcher(wa *watcher) { 223 for { 224 s.mu.Lock() 225 226 if s.unsynced.delete(wa) { 227 slowWatcherGauge.Dec() 228 break 229 } else if s.synced.delete(wa) { 230 break 231 } else if wa.compacted { 232 break 233 } 234 235 if !wa.victim { 236 panic("watcher not victim but not in watch groups") 237 } 238 239 var 
victimBatch watcherBatch 240 for _, wb := range s.victims { 241 if wb[wa] != nil { 242 victimBatch = wb 243 break 244 } 245 } 246 if victimBatch != nil { 247 slowWatcherGauge.Dec() 248 delete(victimBatch, wa) 249 break 250 } 251 252 // victim being processed so not accessible; retry 253 s.mu.Unlock() 254 time.Sleep(time.Millisecond) 255 } 256 257 watcherGauge.Dec() 258 s.mu.Unlock() 259} 260 261func (s *watchableStore) Restore(b backend.Backend) error { 262 s.mu.Lock() 263 defer s.mu.Unlock() 264 err := s.store.Restore(b) 265 if err != nil { 266 return err 267 } 268 269 for wa := range s.synced.watchers { 270 wa.restore = true 271 s.unsynced.add(wa) 272 } 273 s.synced = newWatcherGroup() 274 return nil 275} 276 277// syncWatchersLoop syncs the watcher in the unsynced map every 100ms. 278func (s *watchableStore) syncWatchersLoop() { 279 defer s.wg.Done() 280 281 for { 282 s.mu.Lock() 283 st := time.Now() 284 lastUnsyncedWatchers := s.unsynced.size() 285 s.syncWatchers() 286 unsyncedWatchers := s.unsynced.size() 287 s.mu.Unlock() 288 syncDuration := time.Since(st) 289 290 waitDuration := 100 * time.Millisecond 291 // more work pending? 
292 if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers { 293 // be fair to other store operations by yielding time taken 294 waitDuration = syncDuration 295 } 296 297 select { 298 case <-time.After(waitDuration): 299 case <-s.stopc: 300 return 301 } 302 } 303} 304 305// syncVictimsLoop tries to write precomputed watcher responses to 306// watchers that had a blocked watcher channel 307func (s *watchableStore) syncVictimsLoop() { 308 defer s.wg.Done() 309 310 for { 311 for s.moveVictims() != 0 { 312 // try to update all victim watchers 313 } 314 s.mu.Lock() 315 isEmpty := len(s.victims) == 0 316 s.mu.Unlock() 317 318 var tickc <-chan time.Time 319 if !isEmpty { 320 tickc = time.After(10 * time.Millisecond) 321 } 322 323 select { 324 case <-tickc: 325 case <-s.victimc: 326 case <-s.stopc: 327 return 328 } 329 } 330} 331 332// moveVictims tries to update watches with already pending event data 333func (s *watchableStore) moveVictims() (moved int) { 334 s.mu.Lock() 335 victims := s.victims 336 s.victims = nil 337 s.mu.Unlock() 338 339 var newVictim watcherBatch 340 for _, wb := range victims { 341 // try to send responses again 342 for w, eb := range wb { 343 // watcher has observed the store up to, but not including, w.minRev 344 rev := w.minRev - 1 345 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { 346 pendingEventsGauge.Add(float64(len(eb.evs))) 347 } else { 348 if newVictim == nil { 349 newVictim = make(watcherBatch) 350 } 351 newVictim[w] = eb 352 continue 353 } 354 moved++ 355 } 356 357 // assign completed victim watchers to unsync/sync 358 s.mu.Lock() 359 s.store.mu.Lock() 360 curRev := s.store.currentRev.main 361 for w, eb := range wb { 362 if newVictim != nil && newVictim[w] != nil { 363 // couldn't send watch response; stays victim 364 continue 365 } 366 w.victim = false 367 if eb.moreRev != 0 { 368 w.minRev = eb.moreRev 369 } 370 if w.minRev <= curRev { 371 s.unsynced.add(w) 372 } else { 373 slowWatcherGauge.Dec() 
374 s.synced.add(w) 375 } 376 } 377 s.store.mu.Unlock() 378 s.mu.Unlock() 379 } 380 381 if len(newVictim) > 0 { 382 s.mu.Lock() 383 s.victims = append(s.victims, newVictim) 384 s.mu.Unlock() 385 } 386 387 return moved 388} 389 390// syncWatchers syncs unsynced watchers by: 391// 1. choose a set of watchers from the unsynced watcher group 392// 2. iterate over the set to get the minimum revision and remove compacted watchers 393// 3. use minimum revision to get all key-value pairs and send those events to watchers 394// 4. remove synced watchers in set from unsynced group and move to synced group 395func (s *watchableStore) syncWatchers() { 396 if s.unsynced.size() == 0 { 397 return 398 } 399 400 s.store.mu.Lock() 401 defer s.store.mu.Unlock() 402 403 // in order to find key-value pairs from unsynced watchers, we need to 404 // find min revision index, and these revisions can be used to 405 // query the backend store of key-value pairs 406 curRev := s.store.currentRev.main 407 compactionRev := s.store.compactMainRev 408 wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) 409 minBytes, maxBytes := newRevBytes(), newRevBytes() 410 revToBytes(revision{main: minRev}, minBytes) 411 revToBytes(revision{main: curRev + 1}, maxBytes) 412 413 // UnsafeRange returns keys and values. And in boltdb, keys are revisions. 414 // values are actual key-value pairs in backend. 
415 tx := s.store.b.BatchTx() 416 tx.Lock() 417 revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) 418 evs := kvsToEvents(wg, revs, vs) 419 tx.Unlock() 420 421 var victims watcherBatch 422 wb := newWatcherBatch(wg, evs) 423 for w := range wg.watchers { 424 w.minRev = curRev + 1 425 426 eb, ok := wb[w] 427 if !ok { 428 // bring un-notified watcher to synced 429 s.synced.add(w) 430 s.unsynced.delete(w) 431 continue 432 } 433 434 if eb.moreRev != 0 { 435 w.minRev = eb.moreRev 436 } 437 438 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) { 439 pendingEventsGauge.Add(float64(len(eb.evs))) 440 } else { 441 if victims == nil { 442 victims = make(watcherBatch) 443 } 444 w.victim = true 445 } 446 447 if w.victim { 448 victims[w] = eb 449 } else { 450 if eb.moreRev != 0 { 451 // stay unsynced; more to read 452 continue 453 } 454 s.synced.add(w) 455 } 456 s.unsynced.delete(w) 457 } 458 s.addVictim(victims) 459 460 vsz := 0 461 for _, v := range s.victims { 462 vsz += len(v) 463 } 464 slowWatcherGauge.Set(float64(s.unsynced.size() + vsz)) 465} 466 467// kvsToEvents gets all events for the watchers from all key-value pairs 468func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { 469 for i, v := range vals { 470 var kv mvccpb.KeyValue 471 if err := kv.Unmarshal(v); err != nil { 472 plog.Panicf("cannot unmarshal event: %v", err) 473 } 474 475 if !wg.contains(string(kv.Key)) { 476 continue 477 } 478 479 ty := mvccpb.PUT 480 if isTombstone(revs[i]) { 481 ty = mvccpb.DELETE 482 // patch in mod revision so watchers won't skip 483 kv.ModRevision = bytesToRev(revs[i]).main 484 } 485 evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty}) 486 } 487 return evs 488} 489 490// notify notifies the fact that given event at the given rev just happened to 491// watchers that watch on the key of the event. 
492func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { 493 var victim watcherBatch 494 for w, eb := range newWatcherBatch(&s.synced, evs) { 495 if eb.revs != 1 { 496 plog.Panicf("unexpected multiple revisions in notification") 497 } 498 499 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { 500 pendingEventsGauge.Add(float64(len(eb.evs))) 501 } else { 502 // move slow watcher to victims 503 w.minRev = rev + 1 504 if victim == nil { 505 victim = make(watcherBatch) 506 } 507 w.victim = true 508 victim[w] = eb 509 s.synced.delete(w) 510 slowWatcherGauge.Inc() 511 } 512 } 513 s.addVictim(victim) 514} 515 516func (s *watchableStore) addVictim(victim watcherBatch) { 517 if victim == nil { 518 return 519 } 520 s.victims = append(s.victims, victim) 521 select { 522 case s.victimc <- struct{}{}: 523 default: 524 } 525} 526 527func (s *watchableStore) rev() int64 { return s.store.Rev() } 528 529func (s *watchableStore) progress(w *watcher) { 530 s.mu.Lock() 531 defer s.mu.Unlock() 532 533 if _, ok := s.synced.watchers[w]; ok { 534 w.send(WatchResponse{WatchID: w.id, Revision: s.rev()}) 535 // If the ch is full, this watcher is receiving events. 536 // We do not need to send progress at all. 537 } 538} 539 540type watcher struct { 541 // the watcher key 542 key []byte 543 // end indicates the end of the range to watch. 544 // If end is set, the watcher is on a range. 
545 end []byte 546 547 // victim is set when ch is blocked and undergoing victim processing 548 victim bool 549 550 // compacted is set when the watcher is removed because of compaction 551 compacted bool 552 553 // restore is true when the watcher is being restored from leader snapshot 554 // which means that this watcher has just been moved from "synced" to "unsynced" 555 // watcher group, possibly with a future revision when it was first added 556 // to the synced watcher 557 // "unsynced" watcher revision must always be <= current revision, 558 // except when the watcher were to be moved from "synced" watcher group 559 restore bool 560 561 // minRev is the minimum revision update the watcher will accept 562 minRev int64 563 id WatchID 564 565 fcs []FilterFunc 566 // a chan to send out the watch response. 567 // The chan might be shared with other watchers. 568 ch chan<- WatchResponse 569} 570 571func (w *watcher) send(wr WatchResponse) bool { 572 progressEvent := len(wr.Events) == 0 573 574 if len(w.fcs) != 0 { 575 ne := make([]mvccpb.Event, 0, len(wr.Events)) 576 for i := range wr.Events { 577 filtered := false 578 for _, filter := range w.fcs { 579 if filter(wr.Events[i]) { 580 filtered = true 581 break 582 } 583 } 584 if !filtered { 585 ne = append(ne, wr.Events[i]) 586 } 587 } 588 wr.Events = ne 589 } 590 591 // if all events are filtered out, we should send nothing. 592 if !progressEvent && len(wr.Events) == 0 { 593 return true 594 } 595 select { 596 case w.ch <- wr: 597 return true 598 default: 599 return false 600 } 601} 602