1package consul
2
import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/types"
)
18
// Coordinate manages queries and updates for network coordinates.
//
// Incoming coordinate updates are not written to Raft immediately. They are
// buffered in updates and flushed in batches by the background goroutine
// started in NewCoordinate (see batchUpdate / batchApplyUpdates).
type Coordinate struct {
	// srv is a pointer back to the server.
	srv *Server

	// logger is this endpoint's logger (named logging.Coordinate by
	// NewCoordinate).
	logger hclog.Logger

	// updates holds pending coordinate updates for the given nodes. This is
	// keyed by node:segment so we can get a coordinate for each segment for
	// servers, and we only track the latest update per node:segment.
	// batchApplyUpdates swaps in a fresh map each time it drains this one.
	updates map[string]*structs.CoordinateUpdateRequest

	// updatesLock synchronizes access to the updates map.
	updatesLock sync.Mutex
}
34
35// NewCoordinate returns a new Coordinate endpoint.
36func NewCoordinate(srv *Server, logger hclog.Logger) *Coordinate {
37	c := &Coordinate{
38		srv:     srv,
39		logger:  logger.Named(logging.Coordinate),
40		updates: make(map[string]*structs.CoordinateUpdateRequest),
41	}
42
43	go c.batchUpdate()
44	return c
45}
46
47// batchUpdate is a long-running routine that flushes pending coordinates to the
48// Raft log in batches.
49func (c *Coordinate) batchUpdate() {
50	for {
51		select {
52		case <-time.After(c.srv.config.CoordinateUpdatePeriod):
53			if err := c.batchApplyUpdates(); err != nil {
54				c.logger.Warn("Batch update failed", "error", err)
55			}
56		case <-c.srv.shutdownCh:
57			return
58		}
59	}
60}
61
62// batchApplyUpdates applies all pending updates to the Raft log in a series of
63// batches.
64func (c *Coordinate) batchApplyUpdates() error {
65	// Grab the pending updates and release the lock so we can still handle
66	// incoming messages.
67	c.updatesLock.Lock()
68	pending := c.updates
69	c.updates = make(map[string]*structs.CoordinateUpdateRequest)
70	c.updatesLock.Unlock()
71
72	// Enforce the rate limit.
73	limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches
74	size := len(pending)
75	if size > limit {
76		c.logger.Warn("Discarded coordinate updates", "number_discarded", size-limit)
77		size = limit
78	}
79
80	// Transform the map into a slice that we can feed to the Raft log in
81	// batches.
82	i := 0
83	updates := make(structs.Coordinates, size)
84	for _, update := range pending {
85		if !(i < size) {
86			break
87		}
88
89		updates[i] = &structs.Coordinate{
90			Node:    update.Node,
91			Segment: update.Segment,
92			Coord:   update.Coord,
93		}
94		i++
95	}
96
97	// Apply the updates to the Raft log in batches.
98	for start := 0; start < size; start += c.srv.config.CoordinateUpdateBatchSize {
99		end := start + c.srv.config.CoordinateUpdateBatchSize
100		if end > size {
101			end = size
102		}
103
104		// We set the "safe to ignore" flag on this update type so old
105		// servers don't crash if they see one of these.
106		t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag
107
108		slice := updates[start:end]
109		_, err := c.srv.raftApply(t, slice)
110		if err != nil {
111			return err
112		}
113	}
114	return nil
115}
116
117// Update inserts or updates the LAN coordinate of a node.
118func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) {
119	if done, err := c.srv.ForwardRPC("Coordinate.Update", args, reply); done {
120		return err
121	}
122
123	// Older clients can send coordinates with invalid numeric values like
124	// NaN and Inf. We guard against these coming in, though newer clients
125	// should never send these.
126	if !args.Coord.IsValid() {
127		return fmt.Errorf("invalid coordinate")
128	}
129
130	// Since this is a coordinate coming from some place else we harden this
131	// and look for dimensionality problems proactively.
132	coord, err := c.srv.serfLAN.GetCoordinate()
133	if err != nil {
134		return err
135	}
136	if !coord.IsCompatibleWith(args.Coord) {
137		return fmt.Errorf("incompatible coordinate")
138	}
139
140	// Fetch the ACL token, if any, and enforce the node policy if enabled.
141	authz, err := c.srv.ResolveToken(args.Token)
142	if err != nil {
143		return err
144	}
145	if authz != nil {
146		var authzContext acl.AuthorizerContext
147		structs.DefaultEnterpriseMeta().FillAuthzContext(&authzContext)
148		if authz.NodeWrite(args.Node, &authzContext) != acl.Allow {
149			return acl.ErrPermissionDenied
150		}
151	}
152
153	// Add the coordinate to the map of pending updates.
154	key := fmt.Sprintf("%s:%s", args.Node, args.Segment)
155	c.updatesLock.Lock()
156	c.updates[key] = args
157	c.updatesLock.Unlock()
158	return nil
159}
160
161// ListDatacenters returns the list of datacenters and their respective nodes
162// and the raw coordinates of those nodes (if no coordinates are available for
163// any of the nodes, the node list may be empty). This endpoint will not return
164// information about the LAN network area.
165func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error {
166	maps, err := c.srv.router.GetDatacenterMaps()
167	if err != nil {
168		return err
169	}
170
171	var out []structs.DatacenterMap
172
173	// Strip the datacenter suffixes from all the node names.
174	for _, dcMap := range maps {
175		if dcMap.AreaID == types.AreaLAN {
176			continue
177		}
178
179		suffix := fmt.Sprintf(".%s", dcMap.Datacenter)
180		for j := range dcMap.Coordinates {
181			node := dcMap.Coordinates[j].Node
182			dcMap.Coordinates[j].Node = strings.TrimSuffix(node, suffix)
183		}
184
185		out = append(out, dcMap)
186	}
187
188	*reply = out
189	return nil
190}
191
192// ListNodes returns the list of nodes with their raw network coordinates (if no
193// coordinates are available for a node it won't appear in this list).
194func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error {
195	if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, reply); done {
196		return err
197	}
198
199	return c.srv.blockingQuery(&args.QueryOptions,
200		&reply.QueryMeta,
201		func(ws memdb.WatchSet, state *state.Store) error {
202			index, coords, err := state.Coordinates(ws)
203			if err != nil {
204				return err
205			}
206
207			reply.Index, reply.Coordinates = index, coords
208			if err := c.srv.filterACL(args.Token, reply); err != nil {
209				return err
210			}
211
212			return nil
213		})
214}
215
216// Node returns the raw coordinates for a single node.
217func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error {
218	if done, err := c.srv.ForwardRPC("Coordinate.Node", args, reply); done {
219		return err
220	}
221
222	// Fetch the ACL token, if any, and enforce the node policy if enabled.
223
224	authz, err := c.srv.ResolveToken(args.Token)
225	if err != nil {
226		return err
227	}
228	if authz != nil {
229		var authzContext acl.AuthorizerContext
230		structs.WildcardEnterpriseMeta().FillAuthzContext(&authzContext)
231		if authz.NodeRead(args.Node, &authzContext) != acl.Allow {
232			return acl.ErrPermissionDenied
233		}
234	}
235
236	return c.srv.blockingQuery(&args.QueryOptions,
237		&reply.QueryMeta,
238		func(ws memdb.WatchSet, state *state.Store) error {
239			index, nodeCoords, err := state.Coordinate(args.Node, ws)
240			if err != nil {
241				return err
242			}
243
244			var coords structs.Coordinates
245			for segment, coord := range nodeCoords {
246				coords = append(coords, &structs.Coordinate{
247					Node:    args.Node,
248					Segment: segment,
249					Coord:   coord,
250				})
251			}
252			reply.Index, reply.Coordinates = index, coords
253			return nil
254		})
255}
256