1package consul 2 3import ( 4 "fmt" 5 "strings" 6 "sync" 7 "time" 8 9 "github.com/hashicorp/go-hclog" 10 "github.com/hashicorp/go-memdb" 11 12 "github.com/hashicorp/consul/acl" 13 "github.com/hashicorp/consul/agent/consul/state" 14 "github.com/hashicorp/consul/agent/structs" 15 "github.com/hashicorp/consul/logging" 16 "github.com/hashicorp/consul/types" 17) 18 19// Coordinate manages queries and updates for network coordinates. 20type Coordinate struct { 21 // srv is a pointer back to the server. 22 srv *Server 23 24 logger hclog.Logger 25 26 // updates holds pending coordinate updates for the given nodes. This is 27 // keyed by node:segment so we can get a coordinate for each segment for 28 // servers, and we only track the latest update per node:segment. 29 updates map[string]*structs.CoordinateUpdateRequest 30 31 // updatesLock synchronizes access to the updates map. 32 updatesLock sync.Mutex 33} 34 35// NewCoordinate returns a new Coordinate endpoint. 36func NewCoordinate(srv *Server, logger hclog.Logger) *Coordinate { 37 c := &Coordinate{ 38 srv: srv, 39 logger: logger.Named(logging.Coordinate), 40 updates: make(map[string]*structs.CoordinateUpdateRequest), 41 } 42 43 go c.batchUpdate() 44 return c 45} 46 47// batchUpdate is a long-running routine that flushes pending coordinates to the 48// Raft log in batches. 49func (c *Coordinate) batchUpdate() { 50 for { 51 select { 52 case <-time.After(c.srv.config.CoordinateUpdatePeriod): 53 if err := c.batchApplyUpdates(); err != nil { 54 c.logger.Warn("Batch update failed", "error", err) 55 } 56 case <-c.srv.shutdownCh: 57 return 58 } 59 } 60} 61 62// batchApplyUpdates applies all pending updates to the Raft log in a series of 63// batches. 64func (c *Coordinate) batchApplyUpdates() error { 65 // Grab the pending updates and release the lock so we can still handle 66 // incoming messages. 67 c.updatesLock.Lock() 68 pending := c.updates 69 c.updates = make(map[string]*structs.CoordinateUpdateRequest) 70 c.updatesLock.Unlock() 71 72 // Enforce the rate limit. 73 limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches 74 size := len(pending) 75 if size > limit { 76 c.logger.Warn("Discarded coordinate updates", "number_discarded", size-limit) 77 size = limit 78 } 79 80 // Transform the map into a slice that we can feed to the Raft log in 81 // batches. 82 i := 0 83 updates := make(structs.Coordinates, size) 84 for _, update := range pending { 85 if !(i < size) { 86 break 87 } 88 89 updates[i] = &structs.Coordinate{ 90 Node: update.Node, 91 Segment: update.Segment, 92 Coord: update.Coord, 93 } 94 i++ 95 } 96 97 // Apply the updates to the Raft log in batches. 98 for start := 0; start < size; start += c.srv.config.CoordinateUpdateBatchSize { 99 end := start + c.srv.config.CoordinateUpdateBatchSize 100 if end > size { 101 end = size 102 } 103 104 // We set the "safe to ignore" flag on this update type so old 105 // servers don't crash if they see one of these. 106 t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag 107 108 slice := updates[start:end] 109 _, err := c.srv.raftApply(t, slice) 110 if err != nil { 111 return err 112 } 113 } 114 return nil 115} 116 117// Update inserts or updates the LAN coordinate of a node. 118func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) { 119 if done, err := c.srv.ForwardRPC("Coordinate.Update", args, reply); done { 120 return err 121 } 122 123 // Older clients can send coordinates with invalid numeric values like 124 // NaN and Inf. We guard against these coming in, though newer clients 125 // should never send these. 126 if !args.Coord.IsValid() { 127 return fmt.Errorf("invalid coordinate") 128 } 129 130 // Since this is a coordinate coming from some place else we harden this 131 // and look for dimensionality problems proactively. 132 coord, err := c.srv.serfLAN.GetCoordinate() 133 if err != nil { 134 return err 135 } 136 if !coord.IsCompatibleWith(args.Coord) { 137 return fmt.Errorf("incompatible coordinate") 138 } 139 140 // Fetch the ACL token, if any, and enforce the node policy if enabled. 141 authz, err := c.srv.ResolveToken(args.Token) 142 if err != nil { 143 return err 144 } 145 if authz != nil { 146 var authzContext acl.AuthorizerContext 147 structs.DefaultEnterpriseMeta().FillAuthzContext(&authzContext) 148 if authz.NodeWrite(args.Node, &authzContext) != acl.Allow { 149 return acl.ErrPermissionDenied 150 } 151 } 152 153 // Add the coordinate to the map of pending updates. 154 key := fmt.Sprintf("%s:%s", args.Node, args.Segment) 155 c.updatesLock.Lock() 156 c.updates[key] = args 157 c.updatesLock.Unlock() 158 return nil 159} 160 161// ListDatacenters returns the list of datacenters and their respective nodes 162// and the raw coordinates of those nodes (if no coordinates are available for 163// any of the nodes, the node list may be empty). This endpoint will not return 164// information about the LAN network area. 165func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error { 166 maps, err := c.srv.router.GetDatacenterMaps() 167 if err != nil { 168 return err 169 } 170 171 var out []structs.DatacenterMap 172 173 // Strip the datacenter suffixes from all the node names. 174 for _, dcMap := range maps { 175 if dcMap.AreaID == types.AreaLAN { 176 continue 177 } 178 179 suffix := fmt.Sprintf(".%s", dcMap.Datacenter) 180 for j := range dcMap.Coordinates { 181 node := dcMap.Coordinates[j].Node 182 dcMap.Coordinates[j].Node = strings.TrimSuffix(node, suffix) 183 } 184 185 out = append(out, dcMap) 186 } 187 188 *reply = out 189 return nil 190} 191 192// ListNodes returns the list of nodes with their raw network coordinates (if no 193// coordinates are available for a node it won't appear in this list). 194func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error { 195 if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, reply); done { 196 return err 197 } 198 199 return c.srv.blockingQuery(&args.QueryOptions, 200 &reply.QueryMeta, 201 func(ws memdb.WatchSet, state *state.Store) error { 202 index, coords, err := state.Coordinates(ws) 203 if err != nil { 204 return err 205 } 206 207 reply.Index, reply.Coordinates = index, coords 208 if err := c.srv.filterACL(args.Token, reply); err != nil { 209 return err 210 } 211 212 return nil 213 }) 214} 215 216// Node returns the raw coordinates for a single node. 217func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error { 218 if done, err := c.srv.ForwardRPC("Coordinate.Node", args, reply); done { 219 return err 220 } 221 222 // Fetch the ACL token, if any, and enforce the node policy if enabled. 223 224 authz, err := c.srv.ResolveToken(args.Token) 225 if err != nil { 226 return err 227 } 228 if authz != nil { 229 var authzContext acl.AuthorizerContext 230 structs.WildcardEnterpriseMeta().FillAuthzContext(&authzContext) 231 if authz.NodeRead(args.Node, &authzContext) != acl.Allow { 232 return acl.ErrPermissionDenied 233 } 234 } 235 236 return c.srv.blockingQuery(&args.QueryOptions, 237 &reply.QueryMeta, 238 func(ws memdb.WatchSet, state *state.Store) error { 239 index, nodeCoords, err := state.Coordinate(args.Node, ws) 240 if err != nil { 241 return err 242 } 243 244 var coords structs.Coordinates 245 for segment, coord := range nodeCoords { 246 coords = append(coords, &structs.Coordinate{ 247 Node: args.Node, 248 Segment: segment, 249 Coord: coord, 250 }) 251 } 252 reply.Index, reply.Coordinates = index, coords 253 return nil 254 }) 255} 256