1package local
2
3import (
4	"fmt"
5	"reflect"
6	"strconv"
7	"strings"
8	"sync"
9	"sync/atomic"
10	"time"
11
12	"github.com/armon/go-metrics"
13	"github.com/armon/go-metrics/prometheus"
14	"github.com/hashicorp/go-hclog"
15
16	"github.com/hashicorp/consul/acl"
17	"github.com/hashicorp/consul/agent/structs"
18	"github.com/hashicorp/consul/agent/token"
19	"github.com/hashicorp/consul/api"
20	"github.com/hashicorp/consul/lib"
21	"github.com/hashicorp/consul/types"
22)
23
// StateCounters lists the Prometheus counter definitions for registrations
// and deregistrations of services, checks and nodes that were blocked by an
// ACL. Each entry provides the metric name segments and the help text
// describing when the counter increments.
var StateCounters = []prometheus.CounterDefinition{
	{
		Name: []string{"acl", "blocked", "service", "registration"},
		Help: "Increments whenever a registration fails for a service (blocked by an ACL)",
	},
	{
		Name: []string{"acl", "blocked", "service", "deregistration"},
		Help: "Increments whenever a deregistration fails for a service (blocked by an ACL)",
	},
	{
		Name: []string{"acl", "blocked", "check", "registration"},
		Help: "Increments whenever a registration fails for a check (blocked by an ACL)",
	},
	{
		Name: []string{"acl", "blocked", "check", "deregistration"},
		Help: "Increments whenever a deregistration fails for a check (blocked by an ACL)",
	},
	{
		Name: []string{"acl", "blocked", "node", "registration"},
		Help: "Increments whenever a registration fails for a node (blocked by an ACL)",
	},
	{
		Name: []string{"acl", "blocked", "node", "deregistration"},
		Help: "Increments whenever a deregistration fails for a node (blocked by an ACL)",
	},
}
50
// fullSyncReadMaxStale bounds how stale the catalog reads issued during a
// full sync (updateSyncState) may be. It is passed as MaxStaleDuration on
// those stale-allowed queries.
const fullSyncReadMaxStale = 2 * time.Second
52
// Config is the configuration for the State.
//
// Fields as used by the State:
//   - AdvertiseAddr: the node's advertised address. It is not read by the
//     code shown here; presumably it is sent when the node is registered
//     (confirm in SyncChanges).
//   - CheckUpdateInterval: when > 0, output-only check changes are synced
//     after a randomized delay instead of immediately (see UpdateCheck).
//   - Datacenter: datacenter targeted by the catalog and health RPCs.
//   - DiscardCheckOutput: initial value passed to SetDiscardCheckOutput.
//   - NodeID: compared against the catalog node during a full sync.
//   - NodeName: identifies the node in catalog queries and is stamped on
//     every check added to the state.
//   - TaggedAddresses: compared against the catalog node during a full sync.
type Config struct {
	AdvertiseAddr       string
	CheckUpdateInterval time.Duration
	Datacenter          string
	DiscardCheckOutput  bool
	NodeID              types.NodeID
	NodeName            string
	TaggedAddresses     map[string]string
}
63
// ServiceState describes the state of a service record.
type ServiceState struct {
	// Service is the local copy of the service record. It is nil for the
	// placeholder entries that a full sync creates for services that only
	// exist on the servers and must be deregistered.
	Service *structs.NodeService

	// Token is the ACL token used to update or delete the service record on
	// the servers.
	Token string

	// InSync contains whether the local state of the service record
	// is in sync with the remote state on the server.
	InSync bool

	// Deleted is true when the service record has been marked as deleted
	// but has not been removed on the server yet.
	Deleted bool

	// WatchCh is closed when the service state changes. Suitable for use in a
	// memdb.WatchSet when watching agent local changes with hash-based blocking.
	// A fresh channel is allocated whenever the state is installed, and the
	// field is cleared (after closing the channel) when the service is removed.
	WatchCh chan struct{}
}
85
86// Clone returns a shallow copy of the object. The service record still points
87// to the original service record and must not be modified. The WatchCh is also
88// still pointing to the original so the clone will be update when the original
89// is.
90func (s *ServiceState) Clone() *ServiceState {
91	s2 := new(ServiceState)
92	*s2 = *s
93	return s2
94}
95
// CheckState describes the state of a health check record.
type CheckState struct {
	// Check is the local copy of the health check record.
	//
	// Must Clone() the overall CheckState before mutating this. After mutation
	// reinstall into the checks map. If Deleted is true, this field can be nil
	// (a full sync creates such placeholders for checks that only exist on the
	// servers and must be deregistered).
	Check *structs.HealthCheck

	// Token is the ACL token used to update or delete the health check
	// record on the servers.
	Token string

	// CriticalTime is the last time the health check status went
	// from non-critical to critical. When the health check is not
	// in critical state the value is the zero value.
	CriticalTime time.Time

	// DeferCheck is used to delay the sync of a health check when
	// only the output has changed. This rate limits changes which
	// do not affect the state of the node and/or service. It is non-nil
	// while a deferred sync is pending; the timer callback resets it to nil.
	DeferCheck *time.Timer

	// InSync contains whether the local state of the health check
	// record is in sync with the remote state on the server.
	InSync bool

	// Deleted is true when the health check record has been marked as
	// deleted but has not been removed on the server yet.
	Deleted bool
}
126
127// Clone returns a shallow copy of the object.
128//
129// The defer timer still points to the original value and must not be modified.
130func (c *CheckState) Clone() *CheckState {
131	c2 := new(CheckState)
132	*c2 = *c
133	if c.Check != nil {
134		c2.Check = c.Check.Clone()
135	}
136	return c2
137}
138
139// Critical returns true when the health check is in critical state.
140func (c *CheckState) Critical() bool {
141	return !c.CriticalTime.IsZero()
142}
143
144// CriticalFor returns the amount of time the service has been in critical
145// state. Its value is undefined when the service is not in critical state.
146func (c *CheckState) CriticalFor() time.Duration {
147	return time.Since(c.CriticalTime)
148}
149
// rpc is the subset of the Consul server or client agent that State delegates
// to (see State.Delegate).
type rpc interface {
	// RPC invokes method with args, filling reply with the response (as used
	// by updateSyncState for catalog and health queries).
	RPC(method string, args interface{}, reply interface{}) error
	// ResolveTokenToIdentity resolves an ACL secret ID to its identity.
	ResolveTokenToIdentity(secretID string) (structs.ACLIdentity, error)
}
154
// State holds the agent's local view of its node's services, health checks
// and node metadata. It is used to perform anti-entropy against the catalog
// representation on the servers. The embedded RWMutex guards the fields below
// unless noted otherwise.
type State struct {
	sync.RWMutex

	// Delegate the RPC interface to the consul server or agent.
	//
	// It is set after both the state and the consul server/agent have
	// been created.
	Delegate rpc

	// TriggerSyncChanges is used to notify the state syncer that a
	// partial sync should be performed.
	//
	// It is set after both the state and the state syncer have been
	// created.
	TriggerSyncChanges func()

	logger hclog.Logger

	// config is the agent config.
	config Config

	// nodeInfoInSync tracks whether the servers have our current top-level
	// node information.
	nodeInfoInSync bool

	// services tracks the local services, including entries pending removal
	// (Deleted) that still need to be deregistered on the servers.
	services map[structs.ServiceID]*ServiceState

	// checks tracks the local checks, including entries pending removal.
	// checkAliases maps a service ID to the alias checks watching it, keyed by
	// check ID, each with the channel used to notify it (see AddAliasCheck).
	checks       map[structs.CheckID]*CheckState
	checkAliases map[structs.ServiceID]map[structs.CheckID]chan<- struct{}

	// metadata tracks the node metadata fields
	metadata map[string]string

	// discardCheckOutput stores whether health check output is dropped before
	// it is stored locally (and therefore before it is synced). It is an
	// atomic.Value holding a bool so it can be changed at runtime without the
	// lock.
	discardCheckOutput atomic.Value // bool

	// tokens contains the ACL tokens; the agent token is used for the
	// full-sync catalog reads.
	tokens *token.Store

	// notifyHandlers is a map of registered channel listeners that are sent
	// messages whenever state changes occur. For now these events only include
	// service registration and deregistration since that is all that is needed
	// but the same mechanism could be used for other state changes. Any
	// future notifications should re-use this mechanism.
	notifyHandlers map[chan<- struct{}]struct{}
}
207
208// NewState creates a new local state for the agent.
209func NewState(c Config, logger hclog.Logger, tokens *token.Store) *State {
210	l := &State{
211		config:         c,
212		logger:         logger,
213		services:       make(map[structs.ServiceID]*ServiceState),
214		checks:         make(map[structs.CheckID]*CheckState),
215		checkAliases:   make(map[structs.ServiceID]map[structs.CheckID]chan<- struct{}),
216		metadata:       make(map[string]string),
217		tokens:         tokens,
218		notifyHandlers: make(map[chan<- struct{}]struct{}),
219	}
220	l.SetDiscardCheckOutput(c.DiscardCheckOutput)
221	return l
222}
223
224// SetDiscardCheckOutput configures whether the check output
225// is discarded. This can be changed at runtime.
226func (l *State) SetDiscardCheckOutput(b bool) {
227	l.discardCheckOutput.Store(b)
228}
229
230// ServiceToken returns the ACL token associated with the service. If the service is
231// not found, or does not have a token, the empty string is returned.
232func (l *State) ServiceToken(id structs.ServiceID) string {
233	l.RLock()
234	defer l.RUnlock()
235	if s := l.services[id]; s != nil {
236		return s.Token
237	}
238	return ""
239}
240
241// aclTokenForServiceSync returns an ACL token associated with a service. If there is
242// no ACL token associated with the service, fallback is used to return a value.
243// This method is not synchronized and the lock must already be held.
244func (l *State) aclTokenForServiceSync(id structs.ServiceID, fallback func() string) string {
245	if s := l.services[id]; s != nil && s.Token != "" {
246		return s.Token
247	}
248	return fallback()
249}
250
251// AddService is used to add a service entry to the local state.
252// This entry is persistent and the agent will make a best effort to
253// ensure it is registered
254func (l *State) AddService(service *structs.NodeService, token string) error {
255	l.Lock()
256	defer l.Unlock()
257	return l.addServiceLocked(service, token)
258}
259
260func (l *State) addServiceLocked(service *structs.NodeService, token string) error {
261	if service == nil {
262		return fmt.Errorf("no service")
263	}
264
265	// use the service name as id if the id was omitted
266	if service.ID == "" {
267		service.ID = service.Service
268	}
269
270	l.setServiceStateLocked(&ServiceState{
271		Service: service,
272		Token:   token,
273	})
274	return nil
275}
276
277// AddServiceWithChecks adds a service and its check tp the local state atomically
278func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*structs.HealthCheck, token string) error {
279	l.Lock()
280	defer l.Unlock()
281
282	if err := l.addServiceLocked(service, token); err != nil {
283		return err
284	}
285
286	for _, check := range checks {
287		if err := l.addCheckLocked(check, token); err != nil {
288			return err
289		}
290	}
291	return nil
292}
293
294// RemoveService is used to remove a service entry from the local state.
295// The agent will make a best effort to ensure it is deregistered.
296func (l *State) RemoveService(id structs.ServiceID) error {
297	l.Lock()
298	defer l.Unlock()
299	return l.removeServiceLocked(id)
300}
301
302// RemoveServiceWithChecks removes a service and its check from the local state atomically
303func (l *State) RemoveServiceWithChecks(serviceID structs.ServiceID, checkIDs []structs.CheckID) error {
304	l.Lock()
305	defer l.Unlock()
306
307	if err := l.removeServiceLocked(serviceID); err != nil {
308		return err
309	}
310
311	for _, id := range checkIDs {
312		if err := l.removeCheckLocked(id); err != nil {
313			return err
314		}
315	}
316
317	return nil
318}
319
320func (l *State) removeServiceLocked(id structs.ServiceID) error {
321	s := l.services[id]
322	if s == nil || s.Deleted {
323		return fmt.Errorf("Service %q does not exist", id)
324	}
325
326	// To remove the service on the server we need the token.
327	// Therefore, we mark the service as deleted and keep the
328	// entry around until it is actually removed.
329	s.InSync = false
330	s.Deleted = true
331	if s.WatchCh != nil {
332		close(s.WatchCh)
333		s.WatchCh = nil
334	}
335
336	l.notifyIfAliased(id)
337	l.TriggerSyncChanges()
338	l.broadcastUpdateLocked()
339
340	return nil
341}
342
343// Service returns the locally registered service that the
344// agent is aware of and are being kept in sync with the server
345func (l *State) Service(id structs.ServiceID) *structs.NodeService {
346	l.RLock()
347	defer l.RUnlock()
348
349	s := l.services[id]
350	if s == nil || s.Deleted {
351		return nil
352	}
353	return s.Service
354}
355
356// Services returns the locally registered services that the
357// agent is aware of and are being kept in sync with the server
358func (l *State) Services(entMeta *structs.EnterpriseMeta) map[structs.ServiceID]*structs.NodeService {
359	l.RLock()
360	defer l.RUnlock()
361
362	m := make(map[structs.ServiceID]*structs.NodeService)
363	for id, s := range l.services {
364		if s.Deleted {
365			continue
366		}
367
368		if !entMeta.Matches(&id.EnterpriseMeta) {
369			continue
370		}
371		m[id] = s.Service
372	}
373	return m
374}
375
376// ServiceState returns a shallow copy of the current service state record. The
377// service record still points to the original service record and must not be
378// modified. The WatchCh for the copy returned will also be closed when the
379// actual service state is changed.
380func (l *State) ServiceState(id structs.ServiceID) *ServiceState {
381	l.RLock()
382	defer l.RUnlock()
383
384	s := l.services[id]
385	if s == nil || s.Deleted {
386		return nil
387	}
388	return s.Clone()
389}
390
391// SetServiceState is used to overwrite a raw service state with the given
392// state. This method is safe to be called concurrently but should only be used
393// during testing. You should most likely call AddService instead.
394func (l *State) SetServiceState(s *ServiceState) {
395	l.Lock()
396	defer l.Unlock()
397
398	l.setServiceStateLocked(s)
399}
400
401func (l *State) setServiceStateLocked(s *ServiceState) {
402	key := s.Service.CompoundServiceID()
403	old, hasOld := l.services[key]
404	if hasOld {
405		s.InSync = s.Service.IsSame(old.Service)
406	}
407	l.services[key] = s
408
409	s.WatchCh = make(chan struct{}, 1)
410	if hasOld && old.WatchCh != nil {
411		close(old.WatchCh)
412	}
413	if !hasOld {
414		// The status of an alias check is updated if the alias service is added/removed
415		// Only try notify alias checks if service didn't already exist (!hasOld)
416		l.notifyIfAliased(key)
417	}
418
419	l.TriggerSyncChanges()
420	l.broadcastUpdateLocked()
421}
422
423// ServiceStates returns a shallow copy of all service state records.
424// The service record still points to the original service record and
425// must not be modified.
426func (l *State) ServiceStates(entMeta *structs.EnterpriseMeta) map[structs.ServiceID]*ServiceState {
427	l.RLock()
428	defer l.RUnlock()
429
430	m := make(map[structs.ServiceID]*ServiceState)
431	for id, s := range l.services {
432		if s.Deleted {
433			continue
434		}
435		if !entMeta.Matches(&id.EnterpriseMeta) {
436			continue
437		}
438		m[id] = s.Clone()
439	}
440	return m
441}
442
443// CheckToken returns the ACL token associated with the check. If the check is
444// not found, or does not have a token, the empty string is returned.
445func (l *State) CheckToken(id structs.CheckID) string {
446	l.RLock()
447	defer l.RUnlock()
448	if c := l.checks[id]; c != nil {
449		return c.Token
450	}
451	return ""
452}
453
454// aclTokenForCheckSync returns an ACL token associated with a check. If there is
455// no ACL token associated with the check, the callback is used to return a value.
456// This method is not synchronized and the lock must already be held.
457func (l *State) aclTokenForCheckSync(id structs.CheckID, fallback func() string) string {
458	if c := l.checks[id]; c != nil && c.Token != "" {
459		return c.Token
460	}
461	return fallback()
462}
463
464// AddCheck is used to add a health check to the local state.
465// This entry is persistent and the agent will make a best effort to
466// ensure it is registered
467func (l *State) AddCheck(check *structs.HealthCheck, token string) error {
468	l.Lock()
469	defer l.Unlock()
470
471	return l.addCheckLocked(check, token)
472}
473
474func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
475	if check == nil {
476		return fmt.Errorf("no check")
477	}
478
479	// clone the check since we will be modifying it.
480	check = check.Clone()
481
482	if l.discardCheckOutput.Load().(bool) {
483		check.Output = ""
484	}
485
486	// if there is a serviceID associated with the check, make sure it exists before adding it
487	// NOTE - This logic may be moved to be handled within the Agent's Addcheck method after a refactor
488	if _, ok := l.services[check.CompoundServiceID()]; check.ServiceID != "" && !ok {
489		return fmt.Errorf("Check %q refers to non-existent service %q", check.CheckID, check.ServiceID)
490	}
491
492	// hard-set the node name
493	check.Node = l.config.NodeName
494
495	l.setCheckStateLocked(&CheckState{
496		Check: check,
497		Token: token,
498	})
499	return nil
500}
501
502// AddAliasCheck creates an alias check. When any check for the srcServiceID is
503// changed, checkID will reflect that using the same semantics as
504// checks.CheckAlias.
505//
506// This is a local optimization so that the Alias check doesn't need to use
507// blocking queries against the remote server for check updates for local
508// services.
509func (l *State) AddAliasCheck(checkID structs.CheckID, srcServiceID structs.ServiceID, notifyCh chan<- struct{}) error {
510	l.Lock()
511	defer l.Unlock()
512
513	m, ok := l.checkAliases[srcServiceID]
514	if !ok {
515		m = make(map[structs.CheckID]chan<- struct{})
516		l.checkAliases[srcServiceID] = m
517	}
518	m[checkID] = notifyCh
519
520	return nil
521}
522
523// ServiceExists return true if the given service does exists
524func (l *State) ServiceExists(serviceID structs.ServiceID) bool {
525	serviceID.EnterpriseMeta.Normalize()
526
527	l.Lock()
528	defer l.Unlock()
529	return l.services[serviceID] != nil
530}
531
532// RemoveAliasCheck removes the mapping for the alias check.
533func (l *State) RemoveAliasCheck(checkID structs.CheckID, srcServiceID structs.ServiceID) {
534	l.Lock()
535	defer l.Unlock()
536
537	if m, ok := l.checkAliases[srcServiceID]; ok {
538		delete(m, checkID)
539		if len(m) == 0 {
540			delete(l.checkAliases, srcServiceID)
541		}
542	}
543}
544
545// RemoveCheck is used to remove a health check from the local state.
546// The agent will make a best effort to ensure it is deregistered
547// todo(fs): RemoveService returns an error for a non-existent service. RemoveCheck should as well.
548// todo(fs): Check code that calls this to handle the error.
549func (l *State) RemoveCheck(id structs.CheckID) error {
550	l.Lock()
551	defer l.Unlock()
552	return l.removeCheckLocked(id)
553}
554
555func (l *State) removeCheckLocked(id structs.CheckID) error {
556	c := l.checks[id]
557	if c == nil || c.Deleted {
558		return fmt.Errorf("Check %q does not exist", id)
559	}
560
561	// If this is a check for an aliased service, then notify the waiters.
562	l.notifyIfAliased(c.Check.CompoundServiceID())
563
564	// To remove the check on the server we need the token.
565	// Therefore, we mark the service as deleted and keep the
566	// entry around until it is actually removed.
567	c.InSync = false
568	c.Deleted = true
569	l.TriggerSyncChanges()
570
571	return nil
572}
573
// UpdateCheck records a new status and output for the health check with the
// given id. Unknown checks and checks pending removal are ignored.
//
// The output is cleared first when output discarding is enabled. CriticalTime
// is updated in place for every call, even otherwise idempotent ones. It is
// set when the check first turns critical and reset when it leaves the
// critical state.
//
// When CheckUpdateInterval > 0 and the status is unchanged, the new output is
// stored locally and the sync is deferred. A one-shot timer fires after half
// the interval plus lib.RandomStagger(interval), rate-limiting output-only
// churn. Any other change notifies alias checks, marks the check out of sync
// and triggers a partial sync immediately.
//
// Changes to the Check record are made on a clone, which a deferred function
// installs into the checks map before the lock is released.
func (l *State) UpdateCheck(id structs.CheckID, status, output string) {
	l.Lock()
	defer l.Unlock()

	c := l.checks[id]
	if c == nil || c.Deleted {
		return
	}

	if l.discardCheckOutput.Load().(bool) {
		output = ""
	}

	// Update the critical time tracking (this doesn't cause a server update
	// so we can always keep this up to date).
	if status == api.HealthCritical {
		if !c.Critical() {
			c.CriticalTime = time.Now()
		}
	} else {
		c.CriticalTime = time.Time{}
	}

	// Do nothing if update is idempotent
	if c.Check.Status == status && c.Check.Output == output {
		return
	}

	// Ensure we only mutate a copy of the check state and put the finalized
	// version into the checks map when complete.
	//
	// Note that we are relying upon the earlier deferred mutex unlock to
	// happen AFTER this defer. As per the Go spec this is true, but leaving
	// this note here for the future in case of any refactorings which may not
	// notice this relationship.
	c = c.Clone()
	defer func(c *CheckState) {
		l.checks[id] = c
	}(c)

	// Defer a sync if the output has changed. This is an optimization around
	// frequent updates of output. Instead, we update the output internally,
	// and periodically do a write-back to the servers. If there is a status
	// change we do the write immediately.
	if l.config.CheckUpdateInterval > 0 && c.Check.Status == status {
		c.Check.Output = output
		// Arm at most one timer per check; while one is pending, further
		// output changes are only stored locally.
		if c.DeferCheck == nil {
			d := l.config.CheckUpdateInterval
			intv := time.Duration(uint64(d)/2) + lib.RandomStagger(d)
			c.DeferCheck = time.AfterFunc(intv, func() {
				l.Lock()
				defer l.Unlock()

				// Re-read the current entry: it may have been replaced or
				// marked deleted since the timer was armed.
				c := l.checks[id]
				if c == nil {
					return
				}
				// Allow the next output-only change to arm a new timer.
				c.DeferCheck = nil
				if c.Deleted {
					return
				}
				c.InSync = false
				l.TriggerSyncChanges()
			})
		}
		return
	}

	// If this is a check for an aliased service, then notify the waiters.
	l.notifyIfAliased(c.Check.CompoundServiceID())

	// Update status and mark out of sync
	c.Check.Status = status
	c.Check.Output = output
	c.InSync = false
	l.TriggerSyncChanges()
}
652
653// Check returns the locally registered check that the
654// agent is aware of and are being kept in sync with the server
655func (l *State) Check(id structs.CheckID) *structs.HealthCheck {
656	l.RLock()
657	defer l.RUnlock()
658
659	c := l.checks[id]
660	if c == nil || c.Deleted {
661		return nil
662	}
663	return c.Check
664}
665
666// Checks returns the locally registered checks that the
667// agent is aware of and are being kept in sync with the server
668func (l *State) Checks(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*structs.HealthCheck {
669	m := make(map[structs.CheckID]*structs.HealthCheck)
670	for id, c := range l.CheckStates(entMeta) {
671		m[id] = c.Check
672	}
673	return m
674}
675
676func (l *State) ChecksForService(serviceID structs.ServiceID, includeNodeChecks bool) map[structs.CheckID]*structs.HealthCheck {
677	m := make(map[structs.CheckID]*structs.HealthCheck)
678
679	l.RLock()
680	defer l.RUnlock()
681
682	for id, c := range l.checks {
683		if c.Deleted {
684			continue
685		}
686
687		if c.Check.ServiceID != "" {
688			sid := c.Check.CompoundServiceID()
689			if !serviceID.Matches(sid) {
690				continue
691			}
692		} else if !includeNodeChecks {
693			continue
694		}
695
696		m[id] = c.Check.Clone()
697	}
698	return m
699}
700
701// CheckState returns a shallow copy of the current health check state record.
702//
703// The defer timer still points to the original value and must not be modified.
704func (l *State) CheckState(id structs.CheckID) *CheckState {
705	l.RLock()
706	defer l.RUnlock()
707
708	c := l.checks[id]
709	if c == nil || c.Deleted {
710		return nil
711	}
712	return c.Clone()
713}
714
715// SetCheckState is used to overwrite a raw check state with the given
716// state. This method is safe to be called concurrently but should only be used
717// during testing. You should most likely call AddCheck instead.
718func (l *State) SetCheckState(c *CheckState) {
719	l.Lock()
720	defer l.Unlock()
721
722	l.setCheckStateLocked(c)
723}
724
725func (l *State) setCheckStateLocked(c *CheckState) {
726	id := c.Check.CompoundCheckID()
727	existing := l.checks[id]
728	if existing != nil {
729		c.InSync = c.Check.IsSame(existing.Check)
730	}
731
732	l.checks[id] = c
733
734	// If this is a check for an aliased service, then notify the waiters.
735	l.notifyIfAliased(c.Check.CompoundServiceID())
736
737	l.TriggerSyncChanges()
738}
739
740// CheckStates returns a shallow copy of all health check state records.
741// The map contains a shallow copy of the current check states.
742//
743// The defer timers still point to the original values and must not be modified.
744func (l *State) CheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState {
745	l.RLock()
746	defer l.RUnlock()
747
748	m := make(map[structs.CheckID]*CheckState)
749	for id, c := range l.checks {
750		if c.Deleted {
751			continue
752		}
753		if !entMeta.Matches(&id.EnterpriseMeta) {
754			continue
755		}
756		m[id] = c.Clone()
757	}
758	return m
759}
760
761// CriticalCheckStates returns the locally registered checks that the
762// agent is aware of and are being kept in sync with the server.
763// The map contains a shallow copy of the current check states.
764//
765// The defer timers still point to the original values and must not be modified.
766func (l *State) CriticalCheckStates(entMeta *structs.EnterpriseMeta) map[structs.CheckID]*CheckState {
767	l.RLock()
768	defer l.RUnlock()
769
770	m := make(map[structs.CheckID]*CheckState)
771	for id, c := range l.checks {
772		if c.Deleted || !c.Critical() {
773			continue
774		}
775		if !entMeta.Matches(&id.EnterpriseMeta) {
776			continue
777		}
778		m[id] = c.Clone()
779	}
780	return m
781}
782
783// broadcastUpdateLocked assumes l is locked and delivers an update to all
784// registered watchers.
785func (l *State) broadcastUpdateLocked() {
786	for ch := range l.notifyHandlers {
787		// Do not block
788		select {
789		case ch <- struct{}{}:
790		default:
791		}
792	}
793}
794
795// Notify will register a channel to receive messages when the local state
796// changes. Only service add/remove are supported for now. See notes on
797// l.notifyHandlers for more details.
798//
799// This will not block on channel send so ensure the channel has a buffer. Note
800// that any buffer size is generally fine since actual data is not sent over the
801// channel, so a dropped send due to a full buffer does not result in any loss
802// of data. The fact that a buffer already contains a notification means that
803// the receiver will still be notified that changes occurred.
804func (l *State) Notify(ch chan<- struct{}) {
805	l.Lock()
806	defer l.Unlock()
807	l.notifyHandlers[ch] = struct{}{}
808}
809
810// StopNotify will deregister a channel receiving state change notifications.
811// Pair this with all calls to Notify to clean up state.
812func (l *State) StopNotify(ch chan<- struct{}) {
813	l.Lock()
814	defer l.Unlock()
815	delete(l.notifyHandlers, ch)
816}
817
818// Metadata returns the local node metadata fields that the
819// agent is aware of and are being kept in sync with the server
820func (l *State) Metadata() map[string]string {
821	l.RLock()
822	defer l.RUnlock()
823
824	m := make(map[string]string)
825	for k, v := range l.metadata {
826		m[k] = v
827	}
828	return m
829}
830
831// LoadMetadata loads node metadata fields from the agent config and
832// updates them on the local agent.
833func (l *State) LoadMetadata(data map[string]string) error {
834	l.Lock()
835	defer l.Unlock()
836
837	for k, v := range data {
838		l.metadata[k] = v
839	}
840	l.TriggerSyncChanges()
841	return nil
842}
843
844// UnloadMetadata resets the local metadata state
845func (l *State) UnloadMetadata() {
846	l.Lock()
847	defer l.Unlock()
848	l.metadata = make(map[string]string)
849}
850
851// Stats is used to get various debugging state from the sub-systems
852func (l *State) Stats() map[string]string {
853	l.RLock()
854	defer l.RUnlock()
855
856	services := 0
857	for _, s := range l.services {
858		if s.Deleted {
859			continue
860		}
861		services++
862	}
863
864	checks := 0
865	for _, c := range l.checks {
866		if c.Deleted {
867			continue
868		}
869		checks++
870	}
871
872	return map[string]string{
873		"services": strconv.Itoa(services),
874		"checks":   strconv.Itoa(checks),
875	}
876}
877
878// updateSyncState queries the server for all the services and checks in the catalog
879// registered to this node, and updates the local entries as InSync or Deleted.
880func (l *State) updateSyncState() error {
881	// Get all checks and services from the master
882	req := structs.NodeSpecificRequest{
883		Datacenter: l.config.Datacenter,
884		Node:       l.config.NodeName,
885		QueryOptions: structs.QueryOptions{
886			Token:            l.tokens.AgentToken(),
887			AllowStale:       true,
888			MaxStaleDuration: fullSyncReadMaxStale,
889		},
890		EnterpriseMeta: *structs.WildcardEnterpriseMeta(),
891	}
892
893	var out1 structs.IndexedNodeServiceList
894	remoteServices := make(map[structs.ServiceID]*structs.NodeService)
895	var svcNode *structs.Node
896
897	if err := l.Delegate.RPC("Catalog.NodeServiceList", &req, &out1); err == nil {
898		for _, svc := range out1.NodeServices.Services {
899			remoteServices[svc.CompoundServiceID()] = svc
900		}
901
902		svcNode = out1.NodeServices.Node
903	} else if errMsg := err.Error(); strings.Contains(errMsg, "rpc: can't find method") {
904		// fallback to the old RPC
905		var out1 structs.IndexedNodeServices
906		if err := l.Delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil {
907			return err
908		}
909
910		if out1.NodeServices != nil {
911			for _, svc := range out1.NodeServices.Services {
912				remoteServices[svc.CompoundServiceID()] = svc
913			}
914
915			svcNode = out1.NodeServices.Node
916		}
917	} else {
918		return err
919	}
920
921	var out2 structs.IndexedHealthChecks
922	if err := l.Delegate.RPC("Health.NodeChecks", &req, &out2); err != nil {
923		return err
924	}
925
926	remoteChecks := make(map[structs.CheckID]*structs.HealthCheck, len(out2.HealthChecks))
927	for _, rc := range out2.HealthChecks {
928		remoteChecks[rc.CompoundCheckID()] = rc
929	}
930
931	// Traverse all checks, services and the node info to determine
932	// which entries need to be updated on or removed from the server
933
934	l.Lock()
935	defer l.Unlock()
936
937	// Check if node info needs syncing
938	if svcNode == nil || svcNode.ID != l.config.NodeID ||
939		!reflect.DeepEqual(svcNode.TaggedAddresses, l.config.TaggedAddresses) ||
940		!reflect.DeepEqual(svcNode.Meta, l.metadata) {
941		l.nodeInfoInSync = false
942	}
943	// Check which services need syncing
944
945	// Look for local services that do not exist remotely and mark them for
946	// syncing so that they will be pushed to the server later
947	for id, s := range l.services {
948		if remoteServices[id] == nil {
949			s.InSync = false
950		}
951	}
952
953	// Traverse the list of services from the server.
954	// Remote services which do not exist locally have been deregistered.
955	// Otherwise, check whether the two definitions are still in sync.
956	for id, rs := range remoteServices {
957		ls := l.services[id]
958		if ls == nil {
959			// The consul service is managed automatically and does
960			// not need to be deregistered
961			if id == structs.ConsulCompoundServiceID {
962				continue
963			}
964
965			// Mark a remote service that does not exist locally as deleted so
966			// that it will be removed on the server later.
967			l.services[id] = &ServiceState{Deleted: true}
968			continue
969		}
970
971		// If the service is already scheduled for removal skip it
972		if ls.Deleted {
973			continue
974		}
975
976		// If our definition is different, we need to update it. Make a
977		// copy so that we don't retain a pointer to any actual state
978		// store info for in-memory RPCs.
979		if ls.Service.EnableTagOverride {
980			ls.Service.Tags = make([]string, len(rs.Tags))
981			copy(ls.Service.Tags, rs.Tags)
982		}
983		ls.InSync = ls.Service.IsSame(rs)
984	}
985
986	// Check which checks need syncing
987
988	// Look for local checks that do not exist remotely and mark them for
989	// syncing so that they will be pushed to the server later
990	for id, c := range l.checks {
991		if remoteChecks[id] == nil {
992			c.InSync = false
993		}
994	}
995
996	// Traverse the list of checks from the server.
997	// Remote checks which do not exist locally have been deregistered.
998	// Otherwise, check whether the two definitions are still in sync.
999	for id, rc := range remoteChecks {
1000		lc := l.checks[id]
1001
1002		if lc == nil {
1003			// The Serf check is created automatically and does not
1004			// need to be deregistered.
1005			if id == structs.SerfCompoundCheckID {
1006				l.logger.Debug("Skipping remote check since it is managed automatically", "check", structs.SerfCheckID)
1007				continue
1008			}
1009
1010			// Mark a remote check that does not exist locally as deleted so
1011			// that it will be removed on the server later.
1012			l.checks[id] = &CheckState{Deleted: true}
1013			continue
1014		}
1015
1016		// If the check is already scheduled for removal skip it.
1017		if lc.Deleted {
1018			continue
1019		}
1020
1021		// If our definition is different, we need to update it
1022		if l.config.CheckUpdateInterval == 0 {
1023			lc.InSync = lc.Check.IsSame(rc)
1024			continue
1025		}
1026
1027		// Copy the existing check before potentially modifying
1028		// it before the compare operation.
1029		lcCopy := lc.Check.Clone()
1030
1031		// Copy the server's check before modifying, otherwise
1032		// in-memory RPCs will have side effects.
1033		rcCopy := rc.Clone()
1034
1035		// If there's a defer timer active then we've got a
1036		// potentially spammy check so we don't sync the output
1037		// during this sweep since the timer will mark the check
1038		// out of sync for us. Otherwise, it is safe to sync the
1039		// output now. This is especially important for checks
1040		// that don't change state after they are created, in
1041		// which case we'd never see their output synced back ever.
1042		if lc.DeferCheck != nil {
1043			lcCopy.Output = ""
1044			rcCopy.Output = ""
1045		}
1046		lc.InSync = lcCopy.IsSame(rcCopy)
1047	}
1048	return nil
1049}
1050
1051// SyncFull determines the delta between the local and remote state
1052// and synchronizes the changes.
1053func (l *State) SyncFull() error {
1054	// note that we do not acquire the lock here since the methods
1055	// we are calling will do that themselves.
1056	//
1057	// Also note that we don't hold the lock for the entire operation
1058	// but release it between the two calls. This is not an issue since
1059	// the algorithm is best-effort to achieve eventual consistency.
1060	// SyncChanges will sync whatever updateSyncState() has determined
1061	// needs updating.
1062
1063	if err := l.updateSyncState(); err != nil {
1064		return err
1065	}
1066	return l.SyncChanges()
1067}
1068
1069// SyncChanges pushes checks, services and node info data which has been
1070// marked out of sync or deleted to the server.
1071func (l *State) SyncChanges() error {
1072	l.Lock()
1073	defer l.Unlock()
1074
1075	// Sync the node level info if we need to.
1076	if l.nodeInfoInSync {
1077		l.logger.Debug("Node info in sync")
1078	} else {
1079		if err := l.syncNodeInfo(); err != nil {
1080			return err
1081		}
1082	}
1083
1084	// We will do node-level info syncing at the end, since it will get
1085	// updated by a service or check sync anyway, given how the register
1086	// API works.
1087
1088	// Sync the services
1089	// (logging happens in the helper methods)
1090	for id, s := range l.services {
1091		var err error
1092		switch {
1093		case s.Deleted:
1094			err = l.deleteService(id)
1095		case !s.InSync:
1096			err = l.syncService(id)
1097		default:
1098			l.logger.Debug("Service in sync", "service", id.String())
1099		}
1100		if err != nil {
1101			return err
1102		}
1103	}
1104
1105	// Sync the checks
1106	// (logging happens in the helper methods)
1107	for id, c := range l.checks {
1108		var err error
1109		switch {
1110		case c.Deleted:
1111			err = l.deleteCheck(id)
1112		case !c.InSync:
1113			if c.DeferCheck != nil {
1114				c.DeferCheck.Stop()
1115				c.DeferCheck = nil
1116			}
1117			err = l.syncCheck(id)
1118		default:
1119			l.logger.Debug("Check in sync", "check", id.String())
1120		}
1121		if err != nil {
1122			return err
1123		}
1124	}
1125	return nil
1126}
1127
1128// deleteService is used to delete a service from the server
1129func (l *State) deleteService(key structs.ServiceID) error {
1130	if key.ID == "" {
1131		return fmt.Errorf("ServiceID missing")
1132	}
1133
1134	st := l.aclTokenForServiceSync(key, l.tokens.AgentToken)
1135	req := structs.DeregisterRequest{
1136		Datacenter:     l.config.Datacenter,
1137		Node:           l.config.NodeName,
1138		ServiceID:      key.ID,
1139		EnterpriseMeta: key.EnterpriseMeta,
1140		WriteRequest:   structs.WriteRequest{Token: st},
1141	}
1142	var out struct{}
1143	err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
1144	switch {
1145	case err == nil || strings.Contains(err.Error(), "Unknown service"):
1146		delete(l.services, key)
1147		// service deregister also deletes associated checks
1148		for _, c := range l.checks {
1149			if c.Deleted && c.Check != nil {
1150				sid := c.Check.CompoundServiceID()
1151				if sid.Matches(key) {
1152					l.pruneCheck(c.Check.CompoundCheckID())
1153				}
1154			}
1155		}
1156		l.logger.Info("Deregistered service", "service", key.ID)
1157		return nil
1158
1159	case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err):
1160		// todo(fs): mark the service to be in sync to prevent excessive retrying before next full sync
1161		// todo(fs): some backoff strategy might be a better solution
1162		l.services[key].InSync = true
1163		accessorID := l.aclAccessorID(st)
1164		l.logger.Warn("Service deregistration blocked by ACLs", "service", key.String(), "accessorID", accessorID)
1165		metrics.IncrCounter([]string{"acl", "blocked", "service", "deregistration"}, 1)
1166		return nil
1167
1168	default:
1169		l.logger.Warn("Deregistering service failed.",
1170			"service", key.String(),
1171			"error", err,
1172		)
1173		return err
1174	}
1175}
1176
1177// deleteCheck is used to delete a check from the server
1178func (l *State) deleteCheck(key structs.CheckID) error {
1179	if key.ID == "" {
1180		return fmt.Errorf("CheckID missing")
1181	}
1182
1183	ct := l.aclTokenForCheckSync(key, l.tokens.AgentToken)
1184	req := structs.DeregisterRequest{
1185		Datacenter:     l.config.Datacenter,
1186		Node:           l.config.NodeName,
1187		CheckID:        key.ID,
1188		EnterpriseMeta: key.EnterpriseMeta,
1189		WriteRequest:   structs.WriteRequest{Token: ct},
1190	}
1191	var out struct{}
1192	err := l.Delegate.RPC("Catalog.Deregister", &req, &out)
1193	switch {
1194	case err == nil || strings.Contains(err.Error(), "Unknown check"):
1195		l.pruneCheck(key)
1196		l.logger.Info("Deregistered check", "check", key.String())
1197		return nil
1198
1199	case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err):
1200		// todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync
1201		// todo(fs): some backoff strategy might be a better solution
1202		l.checks[key].InSync = true
1203		accessorID := l.aclAccessorID(ct)
1204		l.logger.Warn("Check deregistration blocked by ACLs", "check", key.String(), "accessorID", accessorID)
1205		metrics.IncrCounter([]string{"acl", "blocked", "check", "deregistration"}, 1)
1206		return nil
1207
1208	default:
1209		l.logger.Warn("Deregistering check failed.",
1210			"check", key.String(),
1211			"error", err,
1212		)
1213		return err
1214	}
1215}
1216
1217func (l *State) pruneCheck(id structs.CheckID) {
1218	c := l.checks[id]
1219	if c != nil && c.DeferCheck != nil {
1220		c.DeferCheck.Stop()
1221	}
1222	delete(l.checks, id)
1223}
1224
1225// syncService is used to sync a service to the server
1226func (l *State) syncService(key structs.ServiceID) error {
1227	st := l.aclTokenForServiceSync(key, l.tokens.UserToken)
1228
1229	// If the service has associated checks that are out of sync,
1230	// piggyback them on the service sync so they are part of the
1231	// same transaction and are registered atomically. We only let
1232	// checks ride on service registrations with the same token,
1233	// otherwise we need to register them separately so they don't
1234	// pick up privileges from the service token.
1235	var checks structs.HealthChecks
1236	for checkKey, c := range l.checks {
1237		if c.Deleted || c.InSync {
1238			continue
1239		}
1240		if !key.Matches(c.Check.CompoundServiceID()) {
1241			continue
1242		}
1243		if st != l.aclTokenForCheckSync(checkKey, l.tokens.UserToken) {
1244			continue
1245		}
1246		checks = append(checks, c.Check)
1247	}
1248
1249	req := structs.RegisterRequest{
1250		Datacenter:      l.config.Datacenter,
1251		ID:              l.config.NodeID,
1252		Node:            l.config.NodeName,
1253		Address:         l.config.AdvertiseAddr,
1254		TaggedAddresses: l.config.TaggedAddresses,
1255		NodeMeta:        l.metadata,
1256		Service:         l.services[key].Service,
1257		EnterpriseMeta:  key.EnterpriseMeta,
1258		WriteRequest:    structs.WriteRequest{Token: st},
1259		SkipNodeUpdate:  l.nodeInfoInSync,
1260	}
1261
1262	// Backwards-compatibility for Consul < 0.5
1263	if len(checks) == 1 {
1264		req.Check = checks[0]
1265	} else {
1266		req.Checks = checks
1267	}
1268
1269	var out struct{}
1270	err := l.Delegate.RPC("Catalog.Register", &req, &out)
1271	switch {
1272	case err == nil:
1273		l.services[key].InSync = true
1274		// Given how the register API works, this info is also updated
1275		// every time we sync a service.
1276		l.nodeInfoInSync = true
1277		for _, check := range checks {
1278			checkKey := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta)
1279			l.checks[checkKey].InSync = true
1280		}
1281		l.logger.Info("Synced service", "service", key.String())
1282		return nil
1283
1284	case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err):
1285		// todo(fs): mark the service and the checks to be in sync to prevent excessive retrying before next full sync
1286		// todo(fs): some backoff strategy might be a better solution
1287		l.services[key].InSync = true
1288		for _, check := range checks {
1289			checkKey := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta)
1290			l.checks[checkKey].InSync = true
1291		}
1292		accessorID := l.aclAccessorID(st)
1293		l.logger.Warn("Service registration blocked by ACLs", "service", key.String(), "accessorID", accessorID)
1294		metrics.IncrCounter([]string{"acl", "blocked", "service", "registration"}, 1)
1295		return nil
1296
1297	default:
1298		l.logger.Warn("Syncing service failed.",
1299			"service", key.String(),
1300			"error", err,
1301		)
1302		return err
1303	}
1304}
1305
1306// syncCheck is used to sync a check to the server
1307func (l *State) syncCheck(key structs.CheckID) error {
1308	c := l.checks[key]
1309	ct := l.aclTokenForCheckSync(key, l.tokens.UserToken)
1310	req := structs.RegisterRequest{
1311		Datacenter:      l.config.Datacenter,
1312		ID:              l.config.NodeID,
1313		Node:            l.config.NodeName,
1314		Address:         l.config.AdvertiseAddr,
1315		TaggedAddresses: l.config.TaggedAddresses,
1316		NodeMeta:        l.metadata,
1317		Check:           c.Check,
1318		EnterpriseMeta:  c.Check.EnterpriseMeta,
1319		WriteRequest:    structs.WriteRequest{Token: ct},
1320		SkipNodeUpdate:  l.nodeInfoInSync,
1321	}
1322
1323	serviceKey := structs.NewServiceID(c.Check.ServiceID, &key.EnterpriseMeta)
1324
1325	// Pull in the associated service if any
1326	s := l.services[serviceKey]
1327	if s != nil && !s.Deleted {
1328		req.Service = s.Service
1329	}
1330
1331	var out struct{}
1332	err := l.Delegate.RPC("Catalog.Register", &req, &out)
1333	switch {
1334	case err == nil:
1335		l.checks[key].InSync = true
1336		// Given how the register API works, this info is also updated
1337		// every time we sync a check.
1338		l.nodeInfoInSync = true
1339		l.logger.Info("Synced check", "check", key.String())
1340		return nil
1341
1342	case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err):
1343		// todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync
1344		// todo(fs): some backoff strategy might be a better solution
1345		l.checks[key].InSync = true
1346		accessorID := l.aclAccessorID(ct)
1347		l.logger.Warn("Check registration blocked by ACLs", "check", key.String(), "accessorID", accessorID)
1348		metrics.IncrCounter([]string{"acl", "blocked", "check", "registration"}, 1)
1349		return nil
1350
1351	default:
1352		l.logger.Warn("Syncing check failed.",
1353			"check", key.String(),
1354			"error", err,
1355		)
1356		return err
1357	}
1358}
1359
1360func (l *State) syncNodeInfo() error {
1361	at := l.tokens.AgentToken()
1362	req := structs.RegisterRequest{
1363		Datacenter:      l.config.Datacenter,
1364		ID:              l.config.NodeID,
1365		Node:            l.config.NodeName,
1366		Address:         l.config.AdvertiseAddr,
1367		TaggedAddresses: l.config.TaggedAddresses,
1368		NodeMeta:        l.metadata,
1369		WriteRequest:    structs.WriteRequest{Token: at},
1370	}
1371	var out struct{}
1372	err := l.Delegate.RPC("Catalog.Register", &req, &out)
1373	switch {
1374	case err == nil:
1375		l.nodeInfoInSync = true
1376		l.logger.Info("Synced node info")
1377		return nil
1378
1379	case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err):
1380		// todo(fs): mark the node info to be in sync to prevent excessive retrying before next full sync
1381		// todo(fs): some backoff strategy might be a better solution
1382		l.nodeInfoInSync = true
1383		accessorID := l.aclAccessorID(at)
1384		l.logger.Warn("Node info update blocked by ACLs", "node", l.config.NodeID, "accessorID", accessorID)
1385		metrics.IncrCounter([]string{"acl", "blocked", "node", "registration"}, 1)
1386		return nil
1387
1388	default:
1389		l.logger.Warn("Syncing node info failed.", "error", err)
1390		return err
1391	}
1392}
1393
1394// notifyIfAliased will notify waiters of changes to an aliased service
1395func (l *State) notifyIfAliased(serviceID structs.ServiceID) {
1396	if aliases, ok := l.checkAliases[serviceID]; ok && len(aliases) > 0 {
1397		for _, notifyCh := range aliases {
1398			// Do not block. All notify channels should be buffered to at
1399			// least 1 in which case not-blocking does not result in loss
1400			// of data because a failed send means a notification is
1401			// already queued. This must be called with the lock held.
1402			select {
1403			case notifyCh <- struct{}{}:
1404			default:
1405			}
1406		}
1407	}
1408}
1409
1410// aclAccessorID is used to convert an ACLToken's secretID to its accessorID for non-
1411// critical purposes, such as logging. Therefore we interpret all errors as empty-string
1412// so we can safely log it without handling non-critical errors at the usage site.
1413func (l *State) aclAccessorID(secretID string) string {
1414	ident, err := l.Delegate.ResolveTokenToIdentity(secretID)
1415	if acl.IsErrNotFound(err) {
1416		return ""
1417	}
1418	if err != nil {
1419		l.logger.Debug("non-critical error resolving acl token accessor for logging", "error", err)
1420		return ""
1421	}
1422	if ident == nil {
1423		return ""
1424	}
1425	return ident.ID()
1426}
1427