1package consul
2
import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
)
16
17// Coordinate manages queries and updates for network coordinates.
18type Coordinate struct {
19	// srv is a pointer back to the server.
20	srv *Server
21
22	logger hclog.Logger
23
24	// updates holds pending coordinate updates for the given nodes. This is
25	// keyed by node:segment so we can get a coordinate for each segment for
26	// servers, and we only track the latest update per node:segment.
27	updates map[string]*structs.CoordinateUpdateRequest
28
29	// updatesLock synchronizes access to the updates map.
30	updatesLock sync.Mutex
31}
32
33// NewCoordinate returns a new Coordinate endpoint.
34func NewCoordinate(srv *Server, logger hclog.Logger) *Coordinate {
35	c := &Coordinate{
36		srv:     srv,
37		logger:  logger.Named(logging.Coordinate),
38		updates: make(map[string]*structs.CoordinateUpdateRequest),
39	}
40
41	go c.batchUpdate()
42	return c
43}
44
45// batchUpdate is a long-running routine that flushes pending coordinates to the
46// Raft log in batches.
47func (c *Coordinate) batchUpdate() {
48	for {
49		select {
50		case <-time.After(c.srv.config.CoordinateUpdatePeriod):
51			if err := c.batchApplyUpdates(); err != nil {
52				c.logger.Warn("Batch update failed", "error", err)
53			}
54		case <-c.srv.shutdownCh:
55			return
56		}
57	}
58}
59
60// batchApplyUpdates applies all pending updates to the Raft log in a series of
61// batches.
62func (c *Coordinate) batchApplyUpdates() error {
63	// Grab the pending updates and release the lock so we can still handle
64	// incoming messages.
65	c.updatesLock.Lock()
66	pending := c.updates
67	c.updates = make(map[string]*structs.CoordinateUpdateRequest)
68	c.updatesLock.Unlock()
69
70	// Enforce the rate limit.
71	limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches
72	size := len(pending)
73	if size > limit {
74		c.logger.Warn("Discarded coordinate updates", "number_discarded", size-limit)
75		size = limit
76	}
77
78	// Transform the map into a slice that we can feed to the Raft log in
79	// batches.
80	i := 0
81	updates := make(structs.Coordinates, size)
82	for _, update := range pending {
83		if !(i < size) {
84			break
85		}
86
87		updates[i] = &structs.Coordinate{
88			Node:    update.Node,
89			Segment: update.Segment,
90			Coord:   update.Coord,
91		}
92		i++
93	}
94
95	// Apply the updates to the Raft log in batches.
96	for start := 0; start < size; start += c.srv.config.CoordinateUpdateBatchSize {
97		end := start + c.srv.config.CoordinateUpdateBatchSize
98		if end > size {
99			end = size
100		}
101
102		// We set the "safe to ignore" flag on this update type so old
103		// servers don't crash if they see one of these.
104		t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag
105
106		slice := updates[start:end]
107		resp, err := c.srv.raftApply(t, slice)
108		if err != nil {
109			return err
110		}
111		if respErr, ok := resp.(error); ok {
112			return respErr
113		}
114	}
115	return nil
116}
117
118// Update inserts or updates the LAN coordinate of a node.
119func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) {
120	if done, err := c.srv.ForwardRPC("Coordinate.Update", args, args, reply); done {
121		return err
122	}
123
124	// Older clients can send coordinates with invalid numeric values like
125	// NaN and Inf. We guard against these coming in, though newer clients
126	// should never send these.
127	if !args.Coord.IsValid() {
128		return fmt.Errorf("invalid coordinate")
129	}
130
131	// Since this is a coordinate coming from some place else we harden this
132	// and look for dimensionality problems proactively.
133	coord, err := c.srv.serfLAN.GetCoordinate()
134	if err != nil {
135		return err
136	}
137	if !coord.IsCompatibleWith(args.Coord) {
138		return fmt.Errorf("incompatible coordinate")
139	}
140
141	// Fetch the ACL token, if any, and enforce the node policy if enabled.
142	authz, err := c.srv.ResolveToken(args.Token)
143	if err != nil {
144		return err
145	}
146	if authz != nil {
147		var authzContext acl.AuthorizerContext
148		structs.DefaultEnterpriseMeta().FillAuthzContext(&authzContext)
149		if authz.NodeWrite(args.Node, &authzContext) != acl.Allow {
150			return acl.ErrPermissionDenied
151		}
152	}
153
154	// Add the coordinate to the map of pending updates.
155	key := fmt.Sprintf("%s:%s", args.Node, args.Segment)
156	c.updatesLock.Lock()
157	c.updates[key] = args
158	c.updatesLock.Unlock()
159	return nil
160}
161
162// ListDatacenters returns the list of datacenters and their respective nodes
163// and the raw coordinates of those nodes (if no coordinates are available for
164// any of the nodes, the node list may be empty).
165func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error {
166	maps, err := c.srv.router.GetDatacenterMaps()
167	if err != nil {
168		return err
169	}
170
171	// Strip the datacenter suffixes from all the node names.
172	for i := range maps {
173		suffix := fmt.Sprintf(".%s", maps[i].Datacenter)
174		for j := range maps[i].Coordinates {
175			node := maps[i].Coordinates[j].Node
176			maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix)
177		}
178	}
179
180	*reply = maps
181	return nil
182}
183
184// ListNodes returns the list of nodes with their raw network coordinates (if no
185// coordinates are available for a node it won't appear in this list).
186func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error {
187	if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, args, reply); done {
188		return err
189	}
190
191	return c.srv.blockingQuery(&args.QueryOptions,
192		&reply.QueryMeta,
193		func(ws memdb.WatchSet, state *state.Store) error {
194			index, coords, err := state.Coordinates(ws)
195			if err != nil {
196				return err
197			}
198
199			reply.Index, reply.Coordinates = index, coords
200			if err := c.srv.filterACL(args.Token, reply); err != nil {
201				return err
202			}
203
204			return nil
205		})
206}
207
208// Node returns the raw coordinates for a single node.
209func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error {
210	if done, err := c.srv.ForwardRPC("Coordinate.Node", args, args, reply); done {
211		return err
212	}
213
214	// Fetch the ACL token, if any, and enforce the node policy if enabled.
215
216	authz, err := c.srv.ResolveToken(args.Token)
217	if err != nil {
218		return err
219	}
220	if authz != nil {
221		var authzContext acl.AuthorizerContext
222		structs.WildcardEnterpriseMeta().FillAuthzContext(&authzContext)
223		if authz.NodeRead(args.Node, &authzContext) != acl.Allow {
224			return acl.ErrPermissionDenied
225		}
226	}
227
228	return c.srv.blockingQuery(&args.QueryOptions,
229		&reply.QueryMeta,
230		func(ws memdb.WatchSet, state *state.Store) error {
231			index, nodeCoords, err := state.Coordinate(args.Node, ws)
232			if err != nil {
233				return err
234			}
235
236			var coords structs.Coordinates
237			for segment, coord := range nodeCoords {
238				coords = append(coords, &structs.Coordinate{
239					Node:    args.Node,
240					Segment: segment,
241					Coord:   coord,
242				})
243			}
244			reply.Index, reply.Coordinates = index, coords
245			return nil
246		})
247}
248