1package local 2 3import ( 4 "fmt" 5 "reflect" 6 "strconv" 7 "strings" 8 "sync" 9 "sync/atomic" 10 "time" 11 12 "github.com/armon/go-metrics" 13 "github.com/armon/go-metrics/prometheus" 14 "github.com/hashicorp/go-hclog" 15 16 "github.com/hashicorp/consul/acl" 17 "github.com/hashicorp/consul/agent/structs" 18 "github.com/hashicorp/consul/agent/token" 19 "github.com/hashicorp/consul/api" 20 "github.com/hashicorp/consul/lib" 21 "github.com/hashicorp/consul/types" 22) 23 24var StateCounters = []prometheus.CounterDefinition{ 25 { 26 Name: []string{"acl", "blocked", "service", "registration"}, 27 Help: "Increments whenever a registration fails for a service (blocked by an ACL)", 28 }, 29 { 30 Name: []string{"acl", "blocked", "service", "deregistration"}, 31 Help: "Increments whenever a deregistration fails for a service (blocked by an ACL)", 32 }, 33 { 34 Name: []string{"acl", "blocked", "check", "registration"}, 35 Help: "Increments whenever a registration fails for a check (blocked by an ACL)", 36 }, 37 { 38 Name: []string{"acl", "blocked", "check", "deregistration"}, 39 Help: "Increments whenever a deregistration fails for a check (blocked by an ACL)", 40 }, 41 { 42 Name: []string{"acl", "blocked", "node", "registration"}, 43 Help: "Increments whenever a registration fails for a node (blocked by an ACL)", 44 }, 45 { 46 Name: []string{"acl", "blocked", "node", "deregistration"}, 47 Help: "Increments whenever a deregistration fails for a node (blocked by an ACL)", 48 }, 49} 50 51const fullSyncReadMaxStale = 2 * time.Second 52 53// Config is the configuration for the State. 54type Config struct { 55 AdvertiseAddr string 56 CheckUpdateInterval time.Duration 57 Datacenter string 58 DiscardCheckOutput bool 59 NodeID types.NodeID 60 NodeName string 61 TaggedAddresses map[string]string 62} 63 64// ServiceState describes the state of a service record. 65type ServiceState struct { 66 // Service is the local copy of the service record. 
67 Service *structs.NodeService 68 69 // Token is the ACL to update or delete the service record on the 70 // server. 71 Token string 72 73 // InSync contains whether the local state of the service record 74 // is in sync with the remote state on the server. 75 InSync bool 76 77 // Deleted is true when the service record has been marked as deleted 78 // but has not been removed on the server yet. 79 Deleted bool 80 81 // WatchCh is closed when the service state changes. Suitable for use in a 82 // memdb.WatchSet when watching agent local changes with hash-based blocking. 83 WatchCh chan struct{} 84} 85 86// Clone returns a shallow copy of the object. The service record still points 87// to the original service record and must not be modified. The WatchCh is also 88// still pointing to the original so the clone will be update when the original 89// is. 90func (s *ServiceState) Clone() *ServiceState { 91 s2 := new(ServiceState) 92 *s2 = *s 93 return s2 94} 95 96// CheckState describes the state of a health check record. 97type CheckState struct { 98 // Check is the local copy of the health check record. 99 // 100 // Must Clone() the overall CheckState before mutating this. After mutation 101 // reinstall into the checks map. If Deleted is true, this field can be nil. 102 Check *structs.HealthCheck 103 104 // Token is the ACL record to update or delete the health check 105 // record on the server. 106 Token string 107 108 // CriticalTime is the last time the health check status went 109 // from non-critical to critical. When the health check is not 110 // in critical state the value is the zero value. 111 CriticalTime time.Time 112 113 // DeferCheck is used to delay the sync of a health check when 114 // only the output has changed. This rate limits changes which 115 // do not affect the state of the node and/or service. 
116 DeferCheck *time.Timer 117 118 // InSync contains whether the local state of the health check 119 // record is in sync with the remote state on the server. 120 InSync bool 121 122 // Deleted is true when the health check record has been marked as 123 // deleted but has not been removed on the server yet. 124 Deleted bool 125} 126 127// Clone returns a shallow copy of the object. 128// 129// The defer timer still points to the original value and must not be modified. 130func (c *CheckState) Clone() *CheckState { 131 c2 := new(CheckState) 132 *c2 = *c 133 if c.Check != nil { 134 c2.Check = c.Check.Clone() 135 } 136 return c2 137} 138 139// Critical returns true when the health check is in critical state. 140func (c *CheckState) Critical() bool { 141 return !c.CriticalTime.IsZero() 142} 143 144// CriticalFor returns the amount of time the service has been in critical 145// state. Its value is undefined when the service is not in critical state. 146func (c *CheckState) CriticalFor() time.Duration { 147 return time.Since(c.CriticalTime) 148} 149 150type rpc interface { 151 RPC(method string, args interface{}, reply interface{}) error 152 ResolveTokenToIdentity(secretID string) (structs.ACLIdentity, error) 153} 154 155// State is used to represent the node's services, 156// and checks. We use it to perform anti-entropy with the 157// catalog representation 158type State struct { 159 sync.RWMutex 160 161 // Delegate the RPC interface to the consul server or agent. 162 // 163 // It is set after both the state and the consul server/agent have 164 // been created. 165 Delegate rpc 166 167 // TriggerSyncChanges is used to notify the state syncer that a 168 // partial sync should be performed. 169 // 170 // It is set after both the state and the state syncer have been 171 // created. 
172 TriggerSyncChanges func() 173 174 logger hclog.Logger 175 176 // Config is the agent config 177 config Config 178 179 // nodeInfoInSync tracks whether the server has our correct top-level 180 // node information in sync 181 nodeInfoInSync bool 182 183 // Services tracks the local services 184 services map[structs.ServiceID]*ServiceState 185 186 // Checks tracks the local checks. checkAliases are aliased checks. 187 checks map[structs.CheckID]*CheckState 188 checkAliases map[structs.ServiceID]map[structs.CheckID]chan<- struct{} 189 190 // metadata tracks the node metadata fields 191 metadata map[string]string 192 193 // discardCheckOutput stores whether the output of health checks 194 // is stored in the raft log. 195 discardCheckOutput atomic.Value // bool 196 197 // tokens contains the ACL tokens 198 tokens *token.Store 199 200 // notifyHandlers is a map of registered channel listeners that are sent 201 // messages whenever state changes occur. For now these events only include 202 // service registration and deregistration since that is all that is needed 203 // but the same mechanism could be used for other state changes. Any 204 // future notifications should re-use this mechanism. 205 notifyHandlers map[chan<- struct{}]struct{} 206} 207 208// NewState creates a new local state for the agent. 209func NewState(c Config, logger hclog.Logger, tokens *token.Store) *State { 210 l := &State{ 211 config: c, 212 logger: logger, 213 services: make(map[structs.ServiceID]*ServiceState), 214 checks: make(map[structs.CheckID]*CheckState), 215 checkAliases: make(map[structs.ServiceID]map[structs.CheckID]chan<- struct{}), 216 metadata: make(map[string]string), 217 tokens: tokens, 218 notifyHandlers: make(map[chan<- struct{}]struct{}), 219 } 220 l.SetDiscardCheckOutput(c.DiscardCheckOutput) 221 return l 222} 223 224// SetDiscardCheckOutput configures whether the check output 225// is discarded. This can be changed at runtime. 
226func (l *State) SetDiscardCheckOutput(b bool) { 227 l.discardCheckOutput.Store(b) 228} 229 230// ServiceToken returns the ACL token associated with the service. If the service is 231// not found, or does not have a token, the empty string is returned. 232func (l *State) ServiceToken(id structs.ServiceID) string { 233 l.RLock() 234 defer l.RUnlock() 235 if s := l.services[id]; s != nil { 236 return s.Token 237 } 238 return "" 239} 240 241// aclTokenForServiceSync returns an ACL token associated with a service. If there is 242// no ACL token associated with the service, fallback is used to return a value. 243// This method is not synchronized and the lock must already be held. 244func (l *State) aclTokenForServiceSync(id structs.ServiceID, fallback func() string) string { 245 if s := l.services[id]; s != nil && s.Token != "" { 246 return s.Token 247 } 248 return fallback() 249} 250 251// AddService is used to add a service entry to the local state. 252// This entry is persistent and the agent will make a best effort to 253// ensure it is registered 254func (l *State) AddService(service *structs.NodeService, token string) error { 255 l.Lock() 256 defer l.Unlock() 257 return l.addServiceLocked(service, token) 258} 259 260func (l *State) addServiceLocked(service *structs.NodeService, token string) error { 261 if service == nil { 262 return fmt.Errorf("no service") 263 } 264 265 // use the service name as id if the id was omitted 266 if service.ID == "" { 267 service.ID = service.Service 268 } 269 270 l.setServiceStateLocked(&ServiceState{ 271 Service: service, 272 Token: token, 273 }) 274 return nil 275} 276 277// AddServiceWithChecks adds a service and its check tp the local state atomically 278func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*structs.HealthCheck, token string) error { 279 l.Lock() 280 defer l.Unlock() 281 282 if err := l.addServiceLocked(service, token); err != nil { 283 return err 284 } 285 286 for _, check := range 
checks { 287 if err := l.addCheckLocked(check, token); err != nil { 288 return err 289 } 290 } 291 return nil 292} 293 294// RemoveService is used to remove a service entry from the local state. 295// The agent will make a best effort to ensure it is deregistered. 296func (l *State) RemoveService(id structs.ServiceID) error { 297 l.Lock() 298 defer l.Unlock() 299 return l.removeServiceLocked(id) 300} 301 302// RemoveServiceWithChecks removes a service and its check from the local state atomically 303func (l *State) RemoveServiceWithChecks(serviceID structs.ServiceID, checkIDs []structs.CheckID) error { 304 l.Lock() 305 defer l.Unlock() 306 307 if err := l.removeServiceLocked(serviceID); err != nil { 308 return err 309 } 310 311 for _, id := range checkIDs { 312 if err := l.removeCheckLocked(id); err != nil { 313 return err 314 } 315 } 316 317 return nil 318} 319 320func (l *State) removeServiceLocked(id structs.ServiceID) error { 321 s := l.services[id] 322 if s == nil || s.Deleted { 323 return fmt.Errorf("Service %q does not exist", id) 324 } 325 326 // To remove the service on the server we need the token. 327 // Therefore, we mark the service as deleted and keep the 328 // entry around until it is actually removed. 
329 s.InSync = false 330 s.Deleted = true 331 if s.WatchCh != nil { 332 close(s.WatchCh) 333 s.WatchCh = nil 334 } 335 336 l.notifyIfAliased(id) 337 l.TriggerSyncChanges() 338 l.broadcastUpdateLocked() 339 340 return nil 341} 342 343// Service returns the locally registered service that the 344// agent is aware of and are being kept in sync with the server 345func (l *State) Service(id structs.ServiceID) *structs.NodeService { 346 l.RLock() 347 defer l.RUnlock() 348 349 s := l.services[id] 350 if s == nil || s.Deleted { 351 return nil 352 } 353 return s.Service 354} 355 356// Services returns the locally registered services that the 357// agent is aware of and are being kept in sync with the server 358func (l *State) Services(entMeta *structs.EnterpriseMeta) map[structs.ServiceID]*structs.NodeService { 359 l.RLock() 360 defer l.RUnlock() 361 362 m := make(map[structs.ServiceID]*structs.NodeService) 363 for id, s := range l.services { 364 if s.Deleted { 365 continue 366 } 367 368 if !entMeta.Matches(&id.EnterpriseMeta) { 369 continue 370 } 371 m[id] = s.Service 372 } 373 return m 374} 375 376// ServiceState returns a shallow copy of the current service state record. The 377// service record still points to the original service record and must not be 378// modified. The WatchCh for the copy returned will also be closed when the 379// actual service state is changed. 380func (l *State) ServiceState(id structs.ServiceID) *ServiceState { 381 l.RLock() 382 defer l.RUnlock() 383 384 s := l.services[id] 385 if s == nil || s.Deleted { 386 return nil 387 } 388 return s.Clone() 389} 390 391// SetServiceState is used to overwrite a raw service state with the given 392// state. This method is safe to be called concurrently but should only be used 393// during testing. You should most likely call AddService instead. 
394func (l *State) SetServiceState(s *ServiceState) { 395 l.Lock() 396 defer l.Unlock() 397 398 l.setServiceStateLocked(s) 399} 400 401func (l *State) setServiceStateLocked(s *ServiceState) { 402 key := s.Service.CompoundServiceID() 403 old, hasOld := l.services[key] 404 if hasOld { 405 s.InSync = s.Service.IsSame(old.Service) 406 } 407 l.services[key] = s 408 409 s.WatchCh = make(chan struct{}, 1) 410 if hasOld && old.WatchCh != nil { 411 close(old.WatchCh) 412 } 413 if !hasOld { 414 // The status of an alias check is updated if the alias service is added/removed 415 // Only try notify alias checks if service didn't already exist (!hasOld) 416 l.notifyIfAliased(key) 417 } 418 419 l.TriggerSyncChanges() 420 l.broadcastUpdateLocked() 421} 422 423// ServiceStates returns a shallow copy of all service state records. 424// The service record still points to the original service record and 425// must not be modified. 426func (l *State) ServiceStates(entMeta *structs.EnterpriseMeta) map[structs.ServiceID]*ServiceState { 427 l.RLock() 428 defer l.RUnlock() 429 430 m := make(map[structs.ServiceID]*ServiceState) 431 for id, s := range l.services { 432 if s.Deleted { 433 continue 434 } 435 if !entMeta.Matches(&id.EnterpriseMeta) { 436 continue 437 } 438 m[id] = s.Clone() 439 } 440 return m 441} 442 443// CheckToken returns the ACL token associated with the check. If the check is 444// not found, or does not have a token, the empty string is returned. 445func (l *State) CheckToken(id structs.CheckID) string { 446 l.RLock() 447 defer l.RUnlock() 448 if c := l.checks[id]; c != nil { 449 return c.Token 450 } 451 return "" 452} 453 454// aclTokenForCheckSync returns an ACL token associated with a check. If there is 455// no ACL token associated with the check, the callback is used to return a value. 456// This method is not synchronized and the lock must already be held. 
457func (l *State) aclTokenForCheckSync(id structs.CheckID, fallback func() string) string { 458 if c := l.checks[id]; c != nil && c.Token != "" { 459 return c.Token 460 } 461 return fallback() 462} 463 464// AddCheck is used to add a health check to the local state. 465// This entry is persistent and the agent will make a best effort to 466// ensure it is registered 467func (l *State) AddCheck(check *structs.HealthCheck, token string) error { 468 l.Lock() 469 defer l.Unlock() 470 471 return l.addCheckLocked(check, token) 472} 473 474func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error { 475 if check == nil { 476 return fmt.Errorf("no check") 477 } 478 479 // clone the check since we will be modifying it. 480 check = check.Clone() 481 482 if l.discardCheckOutput.Load().(bool) { 483 check.Output = "" 484 } 485 486 // if there is a serviceID associated with the check, make sure it exists before adding it 487 // NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor 488 if _, ok := l.services[check.CompoundServiceID()]; check.ServiceID != "" && !ok { 489 return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID) 490 } 491 492 // hard-set the node name 493 check.Node = l.config.NodeName 494 495 l.setCheckStateLocked(&CheckState{ 496 Check: check, 497 Token: token, 498 }) 499 return nil 500} 501 502// AddAliasCheck creates an alias check. When any check for the srcServiceID is 503// changed, checkID will reflect that using the same semantics as 504// checks.CheckAlias. 505// 506// This is a local optimization so that the Alias check doesn't need to use 507// blocking queries against the remote server for check updates for local 508// services. 
509func (l *State) AddAliasCheck(checkID structs.CheckID, srcServiceID structs.ServiceID, notifyCh chan<- struct{}) error { 510 l.Lock() 511 defer l.Unlock() 512 513 m, ok := l.checkAliases[srcServiceID] 514 if !ok { 515 m = make(map[structs.CheckID]chan<- struct{}) 516 l.checkAliases[srcServiceID] = m 517 } 518 m[checkID] = notifyCh 519 520 return nil 521} 522 523// ServiceExists return true if the given service does exists 524func (l *State) ServiceExists(serviceID structs.ServiceID) bool { 525 serviceID.EnterpriseMeta.Normalize() 526 527 l.Lock() 528 defer l.Unlock() 529 return l.services[serviceID] != nil 530} 531 532// RemoveAliasCheck removes the mapping for the alias check. 533func (l *State) RemoveAliasCheck(checkID structs.CheckID, srcServiceID structs.ServiceID) { 534 l.Lock() 535 defer l.Unlock() 536 537 if m, ok := l.checkAliases[srcServiceID]; ok { 538 delete(m, checkID) 539 if len(m) == 0 { 540 delete(l.checkAliases, srcServiceID) 541 } 542 } 543} 544 545// RemoveCheck is used to remove a health check from the local state. 546// The agent will make a best effort to ensure it is deregistered 547// todo(fs): RemoveService returns an error for a non-existent service. RemoveCheck should as well. 548// todo(fs): Check code that calls this to handle the error. 549func (l *State) RemoveCheck(id structs.CheckID) error { 550 l.Lock() 551 defer l.Unlock() 552 return l.removeCheckLocked(id) 553} 554 555func (l *State) removeCheckLocked(id structs.CheckID) error { 556 c := l.checks[id] 557 if c == nil || c.Deleted { 558 return fmt.Errorf("Check %q does not exist", id) 559 } 560 561 // If this is a check for an aliased service, then notify the waiters. 562 l.notifyIfAliased(c.Check.CompoundServiceID()) 563 564 // To remove the check on the server we need the token. 565 // Therefore, we mark the service as deleted and keep the 566 // entry around until it is actually removed. 
567 c.InSync = false 568 c.Deleted = true 569 l.TriggerSyncChanges() 570 571 return nil 572} 573 574// UpdateCheck is used to update the status of a check 575func (l *State) UpdateCheck(id structs.CheckID, status, output string) { 576 l.Lock() 577 defer l.Unlock() 578 579 c := l.checks[id] 580 if c == nil || c.Deleted { 581 return 582 } 583 584 if l.discardCheckOutput.Load().(bool) { 585 output = "" 586 } 587 588 // Update the critical time tracking (this doesn't cause a server updates 589 // so we can always keep this up to date). 590 if status == api.HealthCritical { 591 if !c.Critical() { 592 c.CriticalTime = time.Now() 593 } 594 } else { 595 c.CriticalTime = time.Time{} 596 } 597 598 // Do nothing if update is idempotent 599 if c.Check.Status == status && c.Check.Output == output { 600 return 601 } 602 603 // Ensure we only mutate a copy of the check state and put the finalized 604 // version into the checks map when complete. 605 // 606 // Note that we are relying upon the earlier deferred mutex unlock to 607 // happen AFTER this defer. As per the Go spec this is true, but leaving 608 // this note here for the future in case of any refactorings which may not 609 // notice this relationship. 610 c = c.Clone() 611 defer func(c *CheckState) { 612 l.checks[id] = c 613 }(c) 614 615 // Defer a sync if the output has changed. This is an optimization around 616 // frequent updates of output. Instead, we update the output internally, 617 // and periodically do a write-back to the servers. If there is a status 618 // change we do the write immediately. 
619 if l.config.CheckUpdateInterval > 0 && c.Check.Status == status { 620 c.Check.Output = output 621 if c.DeferCheck == nil { 622 d := l.config.CheckUpdateInterval 623 intv := time.Duration(uint64(d)/2) + lib.RandomStagger(d) 624 c.DeferCheck = time.AfterFunc(intv, func() { 625 l.Lock() 626 defer l.Unlock() 627 628 c := l.checks[id] 629 if c == nil { 630 return 631 } 632 c.DeferCheck = nil 633 if c.Deleted { 634 return 635 } 636 c.InSync = false 637 l.TriggerSyncChanges() 638 }) 639 } 640 return 641 } 642 643 // If this is a check for an aliased service, then notify the waiters. 644 l.notifyIfAliased(c.Check.CompoundServiceID()) 645 646 // Update status and mark out of sync 647 c.Check.Status = status 648 c.Check.Output = output 649 c.InSync = false 650 l.TriggerSyncChanges() 651} 652 653// Check returns the locally registered check that the 654// agent is aware of and are being kept in sync with the server 655func (l *State) Check(id structs.CheckID) *structs.HealthCheck { 656 l.RLock() 657 defer l.RUnlock() 658 659 c := l.checks[id] 660 if c == nil || c.Deleted { 661 return nil 662 } 663 return c.Check 664} 665 666// Checks returns the locally registered checks that the 667// agent is aware of and are being kept in sync with the server 668func (l *State) Checks(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*structs.HealthCheck { 669 m := make(map[structs.CheckID]*structs.HealthCheck) 670 for id, c := range l.CheckStates(entMeta) { 671 m[id] = c.Check 672 } 673 return m 674} 675 676func (l *State) ChecksForService(serviceID structs.ServiceID, includeNodeChecks bool) map[structs.CheckID]*structs.HealthCheck { 677 m := make(map[structs.CheckID]*structs.HealthCheck) 678 679 l.RLock() 680 defer l.RUnlock() 681 682 for id, c := range l.checks { 683 if c.Deleted { 684 continue 685 } 686 687 if c.Check.ServiceID != "" { 688 sid := c.Check.CompoundServiceID() 689 if !serviceID.Matches(sid) { 690 continue 691 } 692 } else if !includeNodeChecks { 693 continue 694 } 
695 696 m[id] = c.Check.Clone() 697 } 698 return m 699} 700 701// CheckState returns a shallow copy of the current health check state record. 702// 703// The defer timer still points to the original value and must not be modified. 704func (l *State) CheckState(id structs.CheckID) *CheckState { 705 l.RLock() 706 defer l.RUnlock() 707 708 c := l.checks[id] 709 if c == nil || c.Deleted { 710 return nil 711 } 712 return c.Clone() 713} 714 715// SetCheckState is used to overwrite a raw check state with the given 716// state. This method is safe to be called concurrently but should only be used 717// during testing. You should most likely call AddCheck instead. 718func (l *State) SetCheckState(c *CheckState) { 719 l.Lock() 720 defer l.Unlock() 721 722 l.setCheckStateLocked(c) 723} 724 725func (l *State) setCheckStateLocked(c *CheckState) { 726 id := c.Check.CompoundCheckID() 727 existing := l.checks[id] 728 if existing != nil { 729 c.InSync = c.Check.IsSame(existing.Check) 730 } 731 732 l.checks[id] = c 733 734 // If this is a check for an aliased service, then notify the waiters. 735 l.notifyIfAliased(c.Check.CompoundServiceID()) 736 737 l.TriggerSyncChanges() 738} 739 740// CheckStates returns a shallow copy of all health check state records. 741// The map contains a shallow copy of the current check states. 742// 743// The defer timers still point to the original values and must not be modified. 744func (l *State) CheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState { 745 l.RLock() 746 defer l.RUnlock() 747 748 m := make(map[structs.CheckID]*CheckState) 749 for id, c := range l.checks { 750 if c.Deleted { 751 continue 752 } 753 if !entMeta.Matches(&id.EnterpriseMeta) { 754 continue 755 } 756 m[id] = c.Clone() 757 } 758 return m 759} 760 761// CriticalCheckStates returns the locally registered checks that the 762// agent is aware of and are being kept in sync with the server. 763// The map contains a shallow copy of the current check states. 
764// 765// The defer timers still point to the original values and must not be modified. 766func (l *State) CriticalCheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState { 767 l.RLock() 768 defer l.RUnlock() 769 770 m := make(map[structs.CheckID]*CheckState) 771 for id, c := range l.checks { 772 if c.Deleted || !c.Critical() { 773 continue 774 } 775 if !entMeta.Matches(&id.EnterpriseMeta) { 776 continue 777 } 778 m[id] = c.Clone() 779 } 780 return m 781} 782 783// broadcastUpdateLocked assumes l is locked and delivers an update to all 784// registered watchers. 785func (l *State) broadcastUpdateLocked() { 786 for ch := range l.notifyHandlers { 787 // Do not block 788 select { 789 case ch <- struct{}{}: 790 default: 791 } 792 } 793} 794 795// Notify will register a channel to receive messages when the local state 796// changes. Only service add/remove are supported for now. See notes on 797// l.notifyHandlers for more details. 798// 799// This will not block on channel send so ensure the channel has a buffer. Note 800// that any buffer size is generally fine since actual data is not sent over the 801// channel, so a dropped send due to a full buffer does not result in any loss 802// of data. The fact that a buffer already contains a notification means that 803// the receiver will still be notified that changes occurred. 804func (l *State) Notify(ch chan<- struct{}) { 805 l.Lock() 806 defer l.Unlock() 807 l.notifyHandlers[ch] = struct{}{} 808} 809 810// StopNotify will deregister a channel receiving state change notifications. 811// Pair this with all calls to Notify to clean up state. 
812func (l *State) StopNotify(ch chan<- struct{}) { 813 l.Lock() 814 defer l.Unlock() 815 delete(l.notifyHandlers, ch) 816} 817 818// Metadata returns the local node metadata fields that the 819// agent is aware of and are being kept in sync with the server 820func (l *State) Metadata() map[string]string { 821 l.RLock() 822 defer l.RUnlock() 823 824 m := make(map[string]string) 825 for k, v := range l.metadata { 826 m[k] = v 827 } 828 return m 829} 830 831// LoadMetadata loads node metadata fields from the agent config and 832// updates them on the local agent. 833func (l *State) LoadMetadata(data map[string]string) error { 834 l.Lock() 835 defer l.Unlock() 836 837 for k, v := range data { 838 l.metadata[k] = v 839 } 840 l.TriggerSyncChanges() 841 return nil 842} 843 844// UnloadMetadata resets the local metadata state 845func (l *State) UnloadMetadata() { 846 l.Lock() 847 defer l.Unlock() 848 l.metadata = make(map[string]string) 849} 850 851// Stats is used to get various debugging state from the sub-systems 852func (l *State) Stats() map[string]string { 853 l.RLock() 854 defer l.RUnlock() 855 856 services := 0 857 for _, s := range l.services { 858 if s.Deleted { 859 continue 860 } 861 services++ 862 } 863 864 checks := 0 865 for _, c := range l.checks { 866 if c.Deleted { 867 continue 868 } 869 checks++ 870 } 871 872 return map[string]string{ 873 "services": strconv.Itoa(services), 874 "checks": strconv.Itoa(checks), 875 } 876} 877 878// updateSyncState queries the server for all the services and checks in the catalog 879// registered to this node, and updates the local entries as InSync or Deleted. 
func (l *State) updateSyncState() error {
	// Fetch all services and checks registered for this node from the
	// server. Stale reads are allowed, bounded by fullSyncReadMaxStale.
	req := structs.NodeSpecificRequest{
		Datacenter: l.config.Datacenter,
		Node:       l.config.NodeName,
		QueryOptions: structs.QueryOptions{
			Token:            l.tokens.AgentToken(),
			AllowStale:       true,
			MaxStaleDuration: fullSyncReadMaxStale,
		},
		EnterpriseMeta: *structs.WildcardEnterpriseMeta(),
	}

	var out1 structs.IndexedNodeServiceList
	remoteServices := make(map[structs.ServiceID]*structs.NodeService)
	var svcNode *structs.Node

	if err := l.Delegate.RPC("Catalog.NodeServiceList", &req, &out1); err == nil {
		for _, svc := range out1.NodeServices.Services {
			remoteServices[svc.CompoundServiceID()] = svc
		}

		svcNode = out1.NodeServices.Node
	} else if errMsg := err.Error(); strings.Contains(errMsg, "rpc: can't find method") {
		// The server does not know the new RPC: fall back to the old one.
		// Note that out1 is deliberately shadowed with the old reply type.
		var out1 structs.IndexedNodeServices
		if err := l.Delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
			return err
		}

		if out1.NodeServices != nil {
			for _, svc := range out1.NodeServices.Services {
				remoteServices[svc.CompoundServiceID()] = svc
			}

			svcNode = out1.NodeServices.Node
		}
	} else {
		return err
	}

	var out2 structs.IndexedHealthChecks
	if err := l.Delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
		return err
	}

	remoteChecks := make(map[structs.CheckID]*structs.HealthCheck, len(out2.HealthChecks))
	for _, rc := range out2.HealthChecks {
		remoteChecks[rc.CompoundCheckID()] = rc
	}

	// Traverse all checks, services and the node info to determine
	// which entries need to be updated on or removed from the server.
	// The lock is only taken from here on, after the RPCs have completed.

	l.Lock()
	defer l.Unlock()

	// Check if node info needs syncing
	if svcNode == nil || svcNode.ID != l.config.NodeID ||
		!reflect.DeepEqual(svcNode.TaggedAddresses, l.config.TaggedAddresses) ||
		!reflect.DeepEqual(svcNode.Meta, l.metadata) {
		l.nodeInfoInSync = false
	}
	// Check which services need syncing

	// Look for local services that do not exist remotely and mark them for
	// syncing so that they will be pushed to the server later
	for id, s := range l.services {
		if remoteServices[id] == nil {
			s.InSync = false
		}
	}

	// Traverse the list of services from the server.
	// Remote services which do not exist locally have been deregistered.
	// Otherwise, check whether the two definitions are still in sync.
	for id, rs := range remoteServices {
		ls := l.services[id]
		if ls == nil {
			// The consul service is managed automatically and does
			// not need to be deregistered
			if id == structs.ConsulCompoundServiceID {
				continue
			}

			// Mark a remote service that does not exist locally as deleted so
			// that it will be removed on the server later. The placeholder
			// entry carries no service record.
			l.services[id] = &ServiceState{Deleted: true}
			continue
		}

		// If the service is already scheduled for removal skip it
		if ls.Deleted {
			continue
		}

		// If our definition is different, we need to update it. Make a
		// copy so that we don't retain a pointer to any actual state
		// store info for in-memory RPCs.
		if ls.Service.EnableTagOverride {
			ls.Service.Tags = make([]string, len(rs.Tags))
			copy(ls.Service.Tags, rs.Tags)
		}
		ls.InSync = ls.Service.IsSame(rs)
	}

	// Check which checks need syncing

	// Look for local checks that do not exist remotely and mark them for
	// syncing so that they will be pushed to the server later
	for id, c := range l.checks {
		if remoteChecks[id] == nil {
			c.InSync = false
		}
	}

	// Traverse the list of checks from the server.
	// Remote checks which do not exist locally have been deregistered.
	// Otherwise, check whether the two definitions are still in sync.
	for id, rc := range remoteChecks {
		lc := l.checks[id]

		if lc == nil {
			// The Serf check is created automatically and does not
			// need to be deregistered.
			if id == structs.SerfCompoundCheckID {
				l.logger.Debug("Skipping remote check since it is managed automatically", "check", structs.SerfCheckID)
				continue
			}

			// Mark a remote check that does not exist locally as deleted so
			// that it will be removed on the server later. The placeholder
			// entry carries no health check record.
			l.checks[id] = &CheckState{Deleted: true}
			continue
		}

		// If the check is already scheduled for removal skip it.
		if lc.Deleted {
			continue
		}

		// If our definition is different, we need to update it. Without a
		// check update interval there is no output deferral, so the records
		// are compared as they are.
		if l.config.CheckUpdateInterval == 0 {
			lc.InSync = lc.Check.IsSame(rc)
			continue
		}

		// Copy the existing check before potentially modifying
		// it before the compare operation.
		lcCopy := lc.Check.Clone()

		// Copy the server's check before modifying, otherwise
		// in-memory RPCs will have side effects.
		rcCopy := rc.Clone()

		// If there's a defer timer active then we've got a
		// potentially spammy check so we don't sync the output
		// during this sweep since the timer will mark the check
		// out of sync for us. Otherwise, it is safe to sync the
		// output now. This is especially important for checks
		// that don't change state after they are created, in
		// which case we'd never see their output synced back ever.
		if lc.DeferCheck != nil {
			lcCopy.Output = ""
			rcCopy.Output = ""
		}
		lc.InSync = lcCopy.IsSame(rcCopy)
	}
	return nil
}

// SyncFull determines the delta between the local and remote state
// and synchronizes the changes. It returns the first error reported by
// either step.
func (l *State) SyncFull() error {
	// note that we do not acquire the lock here since the methods
	// we are calling will do that themselves.
	//
	// Also note that we don't hold the lock for the entire operation
	// but release it between the two calls. This is not an issue since
	// the algorithm is best-effort to achieve eventual consistency.
	// SyncChanges will sync whatever updateSyncState() has determined
	// needs updating.

	if err := l.updateSyncState(); err != nil {
		return err
	}
	return l.SyncChanges()
}

// SyncChanges pushes checks, services and node info data which has been
// marked out of sync or deleted to the server. It holds the lock for the
// whole operation and stops at the first error.
func (l *State) SyncChanges() error {
	l.Lock()
	defer l.Unlock()

	// Sync the node level info if we need to.
	if l.nodeInfoInSync {
		l.logger.Debug("Node info in sync")
	} else {
		if err := l.syncNodeInfo(); err != nil {
			return err
		}
	}

	// NOTE(review): this comment used to say that node-level info syncing
	// happens at the end; in this function it happens first (above). The
	// stated rationale was that node info gets updated by a service or
	// check sync anyway, given how the register API works - confirm whether
	// the ordering matters.

	// Sync the services
	// (logging happens in the helper methods)
	for id, s := range l.services {
		var err error
		switch {
		case s.Deleted:
			err = l.deleteService(id)
		case !s.InSync:
			err = l.syncService(id)
		default:
			l.logger.Debug("Service in sync", "service", id.String())
		}
		if err != nil {
			return err
		}
	}

	// Sync the checks
	// (logging happens in the helper methods)
	for id, c := range l.checks {
		var err error
		switch {
		case c.Deleted:
			err = l.deleteCheck(id)
		case !c.InSync:
			// The check is pushed right now, so a pending deferred
			// output sync is no longer needed.
			if c.DeferCheck != nil {
				c.DeferCheck.Stop()
				c.DeferCheck = nil
			}
			err = l.syncCheck(id)
		default:
			l.logger.Debug("Check in sync", "check", id.String())
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// deleteService is used to delete a service from the server
func (l *State) deleteService(key structs.ServiceID) error {
	if key.ID == "" {
		return fmt.Errorf("ServiceID missing")
	}

	st := l.aclTokenForServiceSync(key, l.tokens.AgentToken)
	req := structs.DeregisterRequest{
		Datacenter:     l.config.Datacenter,
		Node:           l.config.NodeName,
		ServiceID:      key.ID,
		EnterpriseMeta: key.EnterpriseMeta,
		WriteRequest:   structs.WriteRequest{Token: st},
	}
	var out struct{}
	err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
	switch {
	case err == nil || strings.Contains(err.Error(), "Unknown service"):
		delete(l.services, key)
		// service deregister also deletes associated checks
		for _, c := range l.checks {
			if c.Deleted && c.Check != nil {
				sid := c.Check.CompoundServiceID()
				if sid.Matches(key) {
					l.pruneCheck(c.Check.CompoundCheckID())
				}
			}
		}
		l.logger.Info("Deregistered service", "service", key.ID)
		return nil

	case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err):
		//
todo(fs): mark the service to be in sync to prevent excessive retrying before next full sync 1161 // todo(fs): some backoff strategy might be a better solution 1162 l.services[key].InSync = true 1163 accessorID := l.aclAccessorID(st) 1164 l.logger.Warn("Service deregistration blocked by ACLs", "service", key.String(), "accessorID", accessorID) 1165 metrics.IncrCounter([]string{"acl", "blocked", "service", "deregistration"}, 1) 1166 return nil 1167 1168 default: 1169 l.logger.Warn("Deregistering service failed.", 1170 "service", key.String(), 1171 "error", err, 1172 ) 1173 return err 1174 } 1175} 1176 1177// deleteCheck is used to delete a check from the server 1178func (l *State) deleteCheck(key structs.CheckID) error { 1179 if key.ID == "" { 1180 return fmt.Errorf("CheckID missing") 1181 } 1182 1183 ct := l.aclTokenForCheckSync(key, l.tokens.AgentToken) 1184 req := structs.DeregisterRequest{ 1185 Datacenter: l.config.Datacenter, 1186 Node: l.config.NodeName, 1187 CheckID: key.ID, 1188 EnterpriseMeta: key.EnterpriseMeta, 1189 WriteRequest: structs.WriteRequest{Token: ct}, 1190 } 1191 var out struct{} 1192 err := l.Delegate.RPC("Catalog.Deregister", &req, &out) 1193 switch { 1194 case err == nil || strings.Contains(err.Error(), "Unknown check"): 1195 l.pruneCheck(key) 1196 l.logger.Info("Deregistered check", "check", key.String()) 1197 return nil 1198 1199 case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err): 1200 // todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync 1201 // todo(fs): some backoff strategy might be a better solution 1202 l.checks[key].InSync = true 1203 accessorID := l.aclAccessorID(ct) 1204 l.logger.Warn("Check deregistration blocked by ACLs", "check", key.String(), "accessorID", accessorID) 1205 metrics.IncrCounter([]string{"acl", "blocked", "check", "deregistration"}, 1) 1206 return nil 1207 1208 default: 1209 l.logger.Warn("Deregistering check failed.", 1210 "check", key.String(), 1211 "error", err, 
1212 ) 1213 return err 1214 } 1215} 1216 1217func (l *State) pruneCheck(id structs.CheckID) { 1218 c := l.checks[id] 1219 if c != nil && c.DeferCheck != nil { 1220 c.DeferCheck.Stop() 1221 } 1222 delete(l.checks, id) 1223} 1224 1225// syncService is used to sync a service to the server 1226func (l *State) syncService(key structs.ServiceID) error { 1227 st := l.aclTokenForServiceSync(key, l.tokens.UserToken) 1228 1229 // If the service has associated checks that are out of sync, 1230 // piggyback them on the service sync so they are part of the 1231 // same transaction and are registered atomically. We only let 1232 // checks ride on service registrations with the same token, 1233 // otherwise we need to register them separately so they don't 1234 // pick up privileges from the service token. 1235 var checks structs.HealthChecks 1236 for checkKey, c := range l.checks { 1237 if c.Deleted || c.InSync { 1238 continue 1239 } 1240 if !key.Matches(c.Check.CompoundServiceID()) { 1241 continue 1242 } 1243 if st != l.aclTokenForCheckSync(checkKey, l.tokens.UserToken) { 1244 continue 1245 } 1246 checks = append(checks, c.Check) 1247 } 1248 1249 req := structs.RegisterRequest{ 1250 Datacenter: l.config.Datacenter, 1251 ID: l.config.NodeID, 1252 Node: l.config.NodeName, 1253 Address: l.config.AdvertiseAddr, 1254 TaggedAddresses: l.config.TaggedAddresses, 1255 NodeMeta: l.metadata, 1256 Service: l.services[key].Service, 1257 EnterpriseMeta: key.EnterpriseMeta, 1258 WriteRequest: structs.WriteRequest{Token: st}, 1259 SkipNodeUpdate: l.nodeInfoInSync, 1260 } 1261 1262 // Backwards-compatibility for Consul < 0.5 1263 if len(checks) == 1 { 1264 req.Check = checks[0] 1265 } else { 1266 req.Checks = checks 1267 } 1268 1269 var out struct{} 1270 err := l.Delegate.RPC("Catalog.Register", &req, &out) 1271 switch { 1272 case err == nil: 1273 l.services[key].InSync = true 1274 // Given how the register API works, this info is also updated 1275 // every time we sync a service. 
1276 l.nodeInfoInSync = true 1277 for _, check := range checks { 1278 checkKey := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta) 1279 l.checks[checkKey].InSync = true 1280 } 1281 l.logger.Info("Synced service", "service", key.String()) 1282 return nil 1283 1284 case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err): 1285 // todo(fs): mark the service and the checks to be in sync to prevent excessive retrying before next full sync 1286 // todo(fs): some backoff strategy might be a better solution 1287 l.services[key].InSync = true 1288 for _, check := range checks { 1289 checkKey := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta) 1290 l.checks[checkKey].InSync = true 1291 } 1292 accessorID := l.aclAccessorID(st) 1293 l.logger.Warn("Service registration blocked by ACLs", "service", key.String(), "accessorID", accessorID) 1294 metrics.IncrCounter([]string{"acl", "blocked", "service", "registration"}, 1) 1295 return nil 1296 1297 default: 1298 l.logger.Warn("Syncing service failed.", 1299 "service", key.String(), 1300 "error", err, 1301 ) 1302 return err 1303 } 1304} 1305 1306// syncCheck is used to sync a check to the server 1307func (l *State) syncCheck(key structs.CheckID) error { 1308 c := l.checks[key] 1309 ct := l.aclTokenForCheckSync(key, l.tokens.UserToken) 1310 req := structs.RegisterRequest{ 1311 Datacenter: l.config.Datacenter, 1312 ID: l.config.NodeID, 1313 Node: l.config.NodeName, 1314 Address: l.config.AdvertiseAddr, 1315 TaggedAddresses: l.config.TaggedAddresses, 1316 NodeMeta: l.metadata, 1317 Check: c.Check, 1318 EnterpriseMeta: c.Check.EnterpriseMeta, 1319 WriteRequest: structs.WriteRequest{Token: ct}, 1320 SkipNodeUpdate: l.nodeInfoInSync, 1321 } 1322 1323 serviceKey := structs.NewServiceID(c.Check.ServiceID, &key.EnterpriseMeta) 1324 1325 // Pull in the associated service if any 1326 s := l.services[serviceKey] 1327 if s != nil && !s.Deleted { 1328 req.Service = s.Service 1329 } 1330 1331 var out struct{} 1332 err := 
l.Delegate.RPC("Catalog.Register", &req, &out) 1333 switch { 1334 case err == nil: 1335 l.checks[key].InSync = true 1336 // Given how the register API works, this info is also updated 1337 // every time we sync a check. 1338 l.nodeInfoInSync = true 1339 l.logger.Info("Synced check", "check", key.String()) 1340 return nil 1341 1342 case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err): 1343 // todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync 1344 // todo(fs): some backoff strategy might be a better solution 1345 l.checks[key].InSync = true 1346 accessorID := l.aclAccessorID(ct) 1347 l.logger.Warn("Check registration blocked by ACLs", "check", key.String(), "accessorID", accessorID) 1348 metrics.IncrCounter([]string{"acl", "blocked", "check", "registration"}, 1) 1349 return nil 1350 1351 default: 1352 l.logger.Warn("Syncing check failed.", 1353 "check", key.String(), 1354 "error", err, 1355 ) 1356 return err 1357 } 1358} 1359 1360func (l *State) syncNodeInfo() error { 1361 at := l.tokens.AgentToken() 1362 req := structs.RegisterRequest{ 1363 Datacenter: l.config.Datacenter, 1364 ID: l.config.NodeID, 1365 Node: l.config.NodeName, 1366 Address: l.config.AdvertiseAddr, 1367 TaggedAddresses: l.config.TaggedAddresses, 1368 NodeMeta: l.metadata, 1369 WriteRequest: structs.WriteRequest{Token: at}, 1370 } 1371 var out struct{} 1372 err := l.Delegate.RPC("Catalog.Register", &req, &out) 1373 switch { 1374 case err == nil: 1375 l.nodeInfoInSync = true 1376 l.logger.Info("Synced node info") 1377 return nil 1378 1379 case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err): 1380 // todo(fs): mark the node info to be in sync to prevent excessive retrying before next full sync 1381 // todo(fs): some backoff strategy might be a better solution 1382 l.nodeInfoInSync = true 1383 accessorID := l.aclAccessorID(at) 1384 l.logger.Warn("Node info update blocked by ACLs", "node", l.config.NodeID, "accessorID", accessorID) 1385 
metrics.IncrCounter([]string{"acl", "blocked", "node", "registration"}, 1) 1386 return nil 1387 1388 default: 1389 l.logger.Warn("Syncing node info failed.", "error", err) 1390 return err 1391 } 1392} 1393 1394// notifyIfAliased will notify waiters of changes to an aliased service 1395func (l *State) notifyIfAliased(serviceID structs.ServiceID) { 1396 if aliases, ok := l.checkAliases[serviceID]; ok && len(aliases) > 0 { 1397 for _, notifyCh := range aliases { 1398 // Do not block. All notify channels should be buffered to at 1399 // least 1 in which case not-blocking does not result in loss 1400 // of data because a failed send means a notification is 1401 // already queued. This must be called with the lock held. 1402 select { 1403 case notifyCh <- struct{}{}: 1404 default: 1405 } 1406 } 1407 } 1408} 1409 1410// aclAccessorID is used to convert an ACLToken's secretID to its accessorID for non- 1411// critical purposes, such as logging. Therefore we interpret all errors as empty-string 1412// so we can safely log it without handling non-critical errors at the usage site. 1413func (l *State) aclAccessorID(secretID string) string { 1414 ident, err := l.Delegate.ResolveTokenToIdentity(secretID) 1415 if acl.IsErrNotFound(err) { 1416 return "" 1417 } 1418 if err != nil { 1419 l.logger.Debug("non-critical error resolving acl token accessor for logging", "error", err) 1420 return "" 1421 } 1422 if ident == nil { 1423 return "" 1424 } 1425 return ident.ID() 1426} 1427