1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package v2store 16 17import ( 18 "encoding/json" 19 "fmt" 20 "path" 21 "strconv" 22 "strings" 23 "sync" 24 "time" 25 26 "go.etcd.io/etcd/etcdserver/api/v2error" 27 "go.etcd.io/etcd/pkg/types" 28 29 "github.com/jonboulle/clockwork" 30) 31 32// The default version to set when the store is first initialized. 33const defaultVersion = 2 34 35var minExpireTime time.Time 36 37func init() { 38 minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") 39} 40 41type Store interface { 42 Version() int 43 Index() uint64 44 45 Get(nodePath string, recursive, sorted bool) (*Event, error) 46 Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) 47 Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) 48 Create(nodePath string, dir bool, value string, unique bool, 49 expireOpts TTLOptionSet) (*Event, error) 50 CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, 51 value string, expireOpts TTLOptionSet) (*Event, error) 52 Delete(nodePath string, dir, recursive bool) (*Event, error) 53 CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) 54 55 Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error) 56 57 Save() ([]byte, error) 58 Recovery(state []byte) error 59 60 Clone() Store 61 SaveNoCopy() ([]byte, error) 62 63 JsonStats() []byte 64 DeleteExpiredKeys(cutoff time.Time) 65 66 HasTTLKeys() bool 67} 68 69type TTLOptionSet struct { 70 ExpireTime time.Time 71 Refresh bool 72} 73 74type store struct { 75 Root *node 76 WatcherHub *watcherHub 77 CurrentIndex uint64 78 Stats *Stats 79 CurrentVersion int 80 ttlKeyHeap *ttlKeyHeap // need to recovery manually 81 worldLock sync.RWMutex // stop the world lock 82 clock clockwork.Clock 83 readonlySet types.Set 84} 85 86// New creates a store where the given namespaces will be created as initial directories. 87func New(namespaces ...string) Store { 88 s := newStore(namespaces...) 89 s.clock = clockwork.NewRealClock() 90 return s 91} 92 93func newStore(namespaces ...string) *store { 94 s := new(store) 95 s.CurrentVersion = defaultVersion 96 s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent) 97 for _, namespace := range namespaces { 98 s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent)) 99 } 100 s.Stats = newStats() 101 s.WatcherHub = newWatchHub(1000) 102 s.ttlKeyHeap = newTtlKeyHeap() 103 s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...) 104 return s 105} 106 107// Version retrieves current version of the store. 108func (s *store) Version() int { 109 return s.CurrentVersion 110} 111 112// Index retrieves the current index of the store. 113func (s *store) Index() uint64 { 114 s.worldLock.RLock() 115 defer s.worldLock.RUnlock() 116 return s.CurrentIndex 117} 118 119// Get returns a get event. 120// If recursive is true, it will return all the content under the node path. 121// If sorted is true, it will sort the content by keys. 122func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) { 123 var err *v2error.Error 124 125 s.worldLock.RLock() 126 defer s.worldLock.RUnlock() 127 128 defer func() { 129 if err == nil { 130 s.Stats.Inc(GetSuccess) 131 if recursive { 132 reportReadSuccess(GetRecursive) 133 } else { 134 reportReadSuccess(Get) 135 } 136 return 137 } 138 139 s.Stats.Inc(GetFail) 140 if recursive { 141 reportReadFailure(GetRecursive) 142 } else { 143 reportReadFailure(Get) 144 } 145 }() 146 147 n, err := s.internalGet(nodePath) 148 if err != nil { 149 return nil, err 150 } 151 152 e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex) 153 e.EtcdIndex = s.CurrentIndex 154 e.Node.loadInternalNode(n, recursive, sorted, s.clock) 155 156 return e, nil 157} 158 159// Create creates the node at nodePath. Create will help to create intermediate directories with no ttl. 160// If the node has already existed, create will fail. 161// If any node on the path is a file, create will fail. 162func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) { 163 var err *v2error.Error 164 165 s.worldLock.Lock() 166 defer s.worldLock.Unlock() 167 168 defer func() { 169 if err == nil { 170 s.Stats.Inc(CreateSuccess) 171 reportWriteSuccess(Create) 172 return 173 } 174 175 s.Stats.Inc(CreateFail) 176 reportWriteFailure(Create) 177 }() 178 179 e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create) 180 if err != nil { 181 return nil, err 182 } 183 184 e.EtcdIndex = s.CurrentIndex 185 s.WatcherHub.notify(e) 186 187 return e, nil 188} 189 190// Set creates or replace the node at nodePath. 191func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) { 192 var err *v2error.Error 193 194 s.worldLock.Lock() 195 defer s.worldLock.Unlock() 196 197 defer func() { 198 if err == nil { 199 s.Stats.Inc(SetSuccess) 200 reportWriteSuccess(Set) 201 return 202 } 203 204 s.Stats.Inc(SetFail) 205 reportWriteFailure(Set) 206 }() 207 208 // Get prevNode value 209 n, getErr := s.internalGet(nodePath) 210 if getErr != nil && getErr.ErrorCode != v2error.EcodeKeyNotFound { 211 err = getErr 212 return nil, err 213 } 214 215 if expireOpts.Refresh { 216 if getErr != nil { 217 err = getErr 218 return nil, err 219 } 220 value = n.Value 221 } 222 223 // Set new value 224 e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set) 225 if err != nil { 226 return nil, err 227 } 228 e.EtcdIndex = s.CurrentIndex 229 230 // Put prevNode into event 231 if getErr == nil { 232 prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex) 233 prev.Node.loadInternalNode(n, false, false, s.clock) 234 e.PrevNode = prev.Node 235 } 236 237 if !expireOpts.Refresh { 238 s.WatcherHub.notify(e) 239 } else { 240 e.SetRefresh() 241 s.WatcherHub.add(e) 242 } 243 244 return e, nil 245} 246 247// returns user-readable cause of failed comparison 248func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string { 249 switch which { 250 case CompareIndexNotMatch: 251 return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex) 252 case CompareValueNotMatch: 253 return fmt.Sprintf("[%v != %v]", prevValue, n.Value) 254 default: 255 return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex) 256 } 257} 258 259func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64, 260 value string, expireOpts TTLOptionSet) (*Event, error) { 261 262 var err *v2error.Error 263 264 s.worldLock.Lock() 265 defer s.worldLock.Unlock() 266 267 defer func() { 268 if err == nil { 269 s.Stats.Inc(CompareAndSwapSuccess) 270 reportWriteSuccess(CompareAndSwap) 271 return 272 } 273 274 s.Stats.Inc(CompareAndSwapFail) 275 reportWriteFailure(CompareAndSwap) 276 }() 277 278 nodePath = path.Clean(path.Join("/", nodePath)) 279 // we do not allow the user to change "/" 280 if s.readonlySet.Contains(nodePath) { 281 return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex) 282 } 283 284 n, err := s.internalGet(nodePath) 285 if err != nil { 286 return nil, err 287 } 288 if n.IsDir() { // can only compare and swap file 289 err = v2error.NewError(v2error.EcodeNotFile, nodePath, s.CurrentIndex) 290 return nil, err 291 } 292 293 // If both of the prevValue and prevIndex are given, we will test both of them. 294 // Command will be executed, only if both of the tests are successful. 295 if ok, which := n.Compare(prevValue, prevIndex); !ok { 296 cause := getCompareFailCause(n, which, prevValue, prevIndex) 297 err = v2error.NewError(v2error.EcodeTestFailed, cause, s.CurrentIndex) 298 return nil, err 299 } 300 301 if expireOpts.Refresh { 302 value = n.Value 303 } 304 305 // update etcd index 306 s.CurrentIndex++ 307 308 e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex) 309 e.EtcdIndex = s.CurrentIndex 310 e.PrevNode = n.Repr(false, false, s.clock) 311 eNode := e.Node 312 313 // if test succeed, write the value 314 n.Write(value, s.CurrentIndex) 315 n.UpdateTTL(expireOpts.ExpireTime) 316 317 // copy the value for safety 318 valueCopy := value 319 eNode.Value = &valueCopy 320 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock) 321 322 if !expireOpts.Refresh { 323 s.WatcherHub.notify(e) 324 } else { 325 e.SetRefresh() 326 s.WatcherHub.add(e) 327 } 328 329 return e, nil 330} 331 332// Delete deletes the node at the given path. 333// If the node is a directory, recursive must be true to delete it. 334func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) { 335 var err *v2error.Error 336 337 s.worldLock.Lock() 338 defer s.worldLock.Unlock() 339 340 defer func() { 341 if err == nil { 342 s.Stats.Inc(DeleteSuccess) 343 reportWriteSuccess(Delete) 344 return 345 } 346 347 s.Stats.Inc(DeleteFail) 348 reportWriteFailure(Delete) 349 }() 350 351 nodePath = path.Clean(path.Join("/", nodePath)) 352 // we do not allow the user to change "/" 353 if s.readonlySet.Contains(nodePath) { 354 return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex) 355 } 356 357 // recursive implies dir 358 if recursive { 359 dir = true 360 } 361 362 n, err := s.internalGet(nodePath) 363 if err != nil { // if the node does not exist, return error 364 return nil, err 365 } 366 367 nextIndex := s.CurrentIndex + 1 368 e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex) 369 e.EtcdIndex = nextIndex 370 e.PrevNode = n.Repr(false, false, s.clock) 371 eNode := e.Node 372 373 if n.IsDir() { 374 eNode.Dir = true 375 } 376 377 callback := func(path string) { // notify function 378 // notify the watchers with deleted set true 379 s.WatcherHub.notifyWatchers(e, path, true) 380 } 381 382 err = n.Remove(dir, recursive, callback) 383 if err != nil { 384 return nil, err 385 } 386 387 // update etcd index 388 s.CurrentIndex++ 389 390 s.WatcherHub.notify(e) 391 392 return e, nil 393} 394 395func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) { 396 var err *v2error.Error 397 398 s.worldLock.Lock() 399 defer s.worldLock.Unlock() 400 401 defer func() { 402 if err == nil { 403 s.Stats.Inc(CompareAndDeleteSuccess) 404 reportWriteSuccess(CompareAndDelete) 405 return 406 } 407 408 s.Stats.Inc(CompareAndDeleteFail) 409 reportWriteFailure(CompareAndDelete) 410 }() 411 412 nodePath = path.Clean(path.Join("/", nodePath)) 413 414 n, err := s.internalGet(nodePath) 415 if err != nil { // if the node does not exist, return error 416 return nil, err 417 } 418 if n.IsDir() { // can only compare and delete file 419 return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, s.CurrentIndex) 420 } 421 422 // If both of the prevValue and prevIndex are given, we will test both of them. 423 // Command will be executed, only if both of the tests are successful. 424 if ok, which := n.Compare(prevValue, prevIndex); !ok { 425 cause := getCompareFailCause(n, which, prevValue, prevIndex) 426 return nil, v2error.NewError(v2error.EcodeTestFailed, cause, s.CurrentIndex) 427 } 428 429 // update etcd index 430 s.CurrentIndex++ 431 432 e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex) 433 e.EtcdIndex = s.CurrentIndex 434 e.PrevNode = n.Repr(false, false, s.clock) 435 436 callback := func(path string) { // notify function 437 // notify the watchers with deleted set true 438 s.WatcherHub.notifyWatchers(e, path, true) 439 } 440 441 err = n.Remove(false, false, callback) 442 if err != nil { 443 return nil, err 444 } 445 446 s.WatcherHub.notify(e) 447 448 return e, nil 449} 450 451func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) { 452 s.worldLock.RLock() 453 defer s.worldLock.RUnlock() 454 455 key = path.Clean(path.Join("/", key)) 456 if sinceIndex == 0 { 457 sinceIndex = s.CurrentIndex + 1 458 } 459 // WatcherHub does not know about the current index, so we need to pass it in 460 w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex) 461 if err != nil { 462 return nil, err 463 } 464 465 return w, nil 466} 467 468// walk walks all the nodePath and apply the walkFunc on each directory 469func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *v2error.Error)) (*node, *v2error.Error) { 470 components := strings.Split(nodePath, "/") 471 472 curr := s.Root 473 var err *v2error.Error 474 475 for i := 1; i < len(components); i++ { 476 if len(components[i]) == 0 { // ignore empty string 477 return curr, nil 478 } 479 480 curr, err = walkFunc(curr, components[i]) 481 if err != nil { 482 return nil, err 483 } 484 } 485 486 return curr, nil 487} 488 489// Update updates the value/ttl of the node. 490// If the node is a file, the value and the ttl can be updated. 491// If the node is a directory, only the ttl can be updated. 492func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) { 493 var err *v2error.Error 494 495 s.worldLock.Lock() 496 defer s.worldLock.Unlock() 497 498 defer func() { 499 if err == nil { 500 s.Stats.Inc(UpdateSuccess) 501 reportWriteSuccess(Update) 502 return 503 } 504 505 s.Stats.Inc(UpdateFail) 506 reportWriteFailure(Update) 507 }() 508 509 nodePath = path.Clean(path.Join("/", nodePath)) 510 // we do not allow the user to change "/" 511 if s.readonlySet.Contains(nodePath) { 512 return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex) 513 } 514 515 currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1 516 517 n, err := s.internalGet(nodePath) 518 if err != nil { // if the node does not exist, return error 519 return nil, err 520 } 521 if n.IsDir() && len(newValue) != 0 { 522 // if the node is a directory, we cannot update value to non-empty 523 return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex) 524 } 525 526 if expireOpts.Refresh { 527 newValue = n.Value 528 } 529 530 e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex) 531 e.EtcdIndex = nextIndex 532 e.PrevNode = n.Repr(false, false, s.clock) 533 eNode := e.Node 534 535 n.Write(newValue, nextIndex) 536 537 if n.IsDir() { 538 eNode.Dir = true 539 } else { 540 // copy the value for safety 541 newValueCopy := newValue 542 eNode.Value = &newValueCopy 543 } 544 545 // update ttl 546 n.UpdateTTL(expireOpts.ExpireTime) 547 548 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock) 549 550 if !expireOpts.Refresh { 551 s.WatcherHub.notify(e) 552 } else { 553 e.SetRefresh() 554 s.WatcherHub.add(e) 555 } 556 557 s.CurrentIndex = nextIndex 558 559 return e, nil 560} 561 562func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool, 563 expireTime time.Time, action string) (*Event, *v2error.Error) { 564 565 currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1 566 567 if unique { // append unique item under the node path 568 nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10)) 569 } 570 571 nodePath = path.Clean(path.Join("/", nodePath)) 572 573 // we do not allow the user to change "/" 574 if s.readonlySet.Contains(nodePath) { 575 return nil, v2error.NewError(v2error.EcodeRootROnly, "/", currIndex) 576 } 577 578 // Assume expire times that are way in the past are 579 // This can occur when the time is serialized to JS 580 if expireTime.Before(minExpireTime) { 581 expireTime = Permanent 582 } 583 584 dirName, nodeName := path.Split(nodePath) 585 586 // walk through the nodePath, create dirs and get the last directory node 587 d, err := s.walk(dirName, s.checkDir) 588 589 if err != nil { 590 s.Stats.Inc(SetFail) 591 reportWriteFailure(action) 592 err.Index = currIndex 593 return nil, err 594 } 595 596 e := newEvent(action, nodePath, nextIndex, nextIndex) 597 eNode := e.Node 598 599 n, _ := d.GetChild(nodeName) 600 601 // force will try to replace an existing file 602 if n != nil { 603 if replace { 604 if n.IsDir() { 605 return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex) 606 } 607 e.PrevNode = n.Repr(false, false, s.clock) 608 609 n.Remove(false, false, nil) 610 } else { 611 return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, currIndex) 612 } 613 } 614 615 if !dir { // create file 616 // copy the value for safety 617 valueCopy := value 618 eNode.Value = &valueCopy 619 620 n = newKV(s, nodePath, value, nextIndex, d, expireTime) 621 622 } else { // create directory 623 eNode.Dir = true 624 625 n = newDir(s, nodePath, nextIndex, d, expireTime) 626 } 627 628 // we are sure d is a directory and does not have the children with name n.Name 629 d.Add(n) 630 631 // node with TTL 632 if !n.IsPermanent() { 633 s.ttlKeyHeap.push(n) 634 635 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock) 636 } 637 638 s.CurrentIndex = nextIndex 639 640 return e, nil 641} 642 643// InternalGet gets the node of the given nodePath. 644func (s *store) internalGet(nodePath string) (*node, *v2error.Error) { 645 nodePath = path.Clean(path.Join("/", nodePath)) 646 647 walkFunc := func(parent *node, name string) (*node, *v2error.Error) { 648 649 if !parent.IsDir() { 650 err := v2error.NewError(v2error.EcodeNotDir, parent.Path, s.CurrentIndex) 651 return nil, err 652 } 653 654 child, ok := parent.Children[name] 655 if ok { 656 return child, nil 657 } 658 659 return nil, v2error.NewError(v2error.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex) 660 } 661 662 f, err := s.walk(nodePath, walkFunc) 663 664 if err != nil { 665 return nil, err 666 } 667 return f, nil 668} 669 670// DeleteExpiredKeys will delete all expired keys 671func (s *store) DeleteExpiredKeys(cutoff time.Time) { 672 s.worldLock.Lock() 673 defer s.worldLock.Unlock() 674 675 for { 676 node := s.ttlKeyHeap.top() 677 if node == nil || node.ExpireTime.After(cutoff) { 678 break 679 } 680 681 s.CurrentIndex++ 682 e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex) 683 e.EtcdIndex = s.CurrentIndex 684 e.PrevNode = node.Repr(false, false, s.clock) 685 if node.IsDir() { 686 e.Node.Dir = true 687 } 688 689 callback := func(path string) { // notify function 690 // notify the watchers with deleted set true 691 s.WatcherHub.notifyWatchers(e, path, true) 692 } 693 694 s.ttlKeyHeap.pop() 695 node.Remove(true, true, callback) 696 697 reportExpiredKey() 698 s.Stats.Inc(ExpireCount) 699 700 s.WatcherHub.notify(e) 701 } 702 703} 704 705// checkDir will check whether the component is a directory under parent node. 706// If it is a directory, this function will return the pointer to that node. 707// If it does not exist, this function will create a new directory and return the pointer to that node. 708// If it is a file, this function will return error. 709func (s *store) checkDir(parent *node, dirName string) (*node, *v2error.Error) { 710 node, ok := parent.Children[dirName] 711 712 if ok { 713 if node.IsDir() { 714 return node, nil 715 } 716 717 return nil, v2error.NewError(v2error.EcodeNotDir, node.Path, s.CurrentIndex) 718 } 719 720 n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent) 721 722 parent.Children[dirName] = n 723 724 return n, nil 725} 726 727// Save saves the static state of the store system. 728// It will not be able to save the state of watchers. 729// It will not save the parent field of the node. Or there will 730// be cyclic dependencies issue for the json package. 731func (s *store) Save() ([]byte, error) { 732 b, err := json.Marshal(s.Clone()) 733 if err != nil { 734 return nil, err 735 } 736 737 return b, nil 738} 739 740func (s *store) SaveNoCopy() ([]byte, error) { 741 b, err := json.Marshal(s) 742 if err != nil { 743 return nil, err 744 } 745 746 return b, nil 747} 748 749func (s *store) Clone() Store { 750 s.worldLock.Lock() 751 752 clonedStore := newStore() 753 clonedStore.CurrentIndex = s.CurrentIndex 754 clonedStore.Root = s.Root.Clone() 755 clonedStore.WatcherHub = s.WatcherHub.clone() 756 clonedStore.Stats = s.Stats.clone() 757 clonedStore.CurrentVersion = s.CurrentVersion 758 759 s.worldLock.Unlock() 760 return clonedStore 761} 762 763// Recovery recovers the store system from a static state 764// It needs to recover the parent field of the nodes. 765// It needs to delete the expired nodes since the saved time and also 766// needs to create monitoring go routines. 767func (s *store) Recovery(state []byte) error { 768 s.worldLock.Lock() 769 defer s.worldLock.Unlock() 770 err := json.Unmarshal(state, s) 771 772 if err != nil { 773 return err 774 } 775 776 s.ttlKeyHeap = newTtlKeyHeap() 777 778 s.Root.recoverAndclean() 779 return nil 780} 781 782func (s *store) JsonStats() []byte { 783 s.Stats.Watchers = uint64(s.WatcherHub.count) 784 return s.Stats.toJson() 785} 786 787func (s *store) HasTTLKeys() bool { 788 s.worldLock.RLock() 789 defer s.worldLock.RUnlock() 790 return s.ttlKeyHeap.Len() != 0 791} 792