1package consul 2 3import ( 4 "fmt" 5 "strings" 6 "sync" 7 "time" 8 9 "github.com/hashicorp/consul/acl" 10 "github.com/hashicorp/consul/agent/consul/state" 11 "github.com/hashicorp/consul/agent/structs" 12 "github.com/hashicorp/consul/logging" 13 "github.com/hashicorp/go-hclog" 14 "github.com/hashicorp/go-memdb" 15) 16 17// Coordinate manages queries and updates for network coordinates. 18type Coordinate struct { 19 // srv is a pointer back to the server. 20 srv *Server 21 22 logger hclog.Logger 23 24 // updates holds pending coordinate updates for the given nodes. This is 25 // keyed by node:segment so we can get a coordinate for each segment for 26 // servers, and we only track the latest update per node:segment. 27 updates map[string]*structs.CoordinateUpdateRequest 28 29 // updatesLock synchronizes access to the updates map. 30 updatesLock sync.Mutex 31} 32 33// NewCoordinate returns a new Coordinate endpoint. 34func NewCoordinate(srv *Server, logger hclog.Logger) *Coordinate { 35 c := &Coordinate{ 36 srv: srv, 37 logger: logger.Named(logging.Coordinate), 38 updates: make(map[string]*structs.CoordinateUpdateRequest), 39 } 40 41 go c.batchUpdate() 42 return c 43} 44 45// batchUpdate is a long-running routine that flushes pending coordinates to the 46// Raft log in batches. 47func (c *Coordinate) batchUpdate() { 48 for { 49 select { 50 case <-time.After(c.srv.config.CoordinateUpdatePeriod): 51 if err := c.batchApplyUpdates(); err != nil { 52 c.logger.Warn("Batch update failed", "error", err) 53 } 54 case <-c.srv.shutdownCh: 55 return 56 } 57 } 58} 59 60// batchApplyUpdates applies all pending updates to the Raft log in a series of 61// batches. 62func (c *Coordinate) batchApplyUpdates() error { 63 // Grab the pending updates and release the lock so we can still handle 64 // incoming messages. 65 c.updatesLock.Lock() 66 pending := c.updates 67 c.updates = make(map[string]*structs.CoordinateUpdateRequest) 68 c.updatesLock.Unlock() 69 70 // Enforce the rate limit. 71 limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches 72 size := len(pending) 73 if size > limit { 74 c.logger.Warn("Discarded coordinate updates", "number_discarded", size-limit) 75 size = limit 76 } 77 78 // Transform the map into a slice that we can feed to the Raft log in 79 // batches. 80 i := 0 81 updates := make(structs.Coordinates, size) 82 for _, update := range pending { 83 if !(i < size) { 84 break 85 } 86 87 updates[i] = &structs.Coordinate{ 88 Node: update.Node, 89 Segment: update.Segment, 90 Coord: update.Coord, 91 } 92 i++ 93 } 94 95 // Apply the updates to the Raft log in batches. 96 for start := 0; start < size; start += c.srv.config.CoordinateUpdateBatchSize { 97 end := start + c.srv.config.CoordinateUpdateBatchSize 98 if end > size { 99 end = size 100 } 101 102 // We set the "safe to ignore" flag on this update type so old 103 // servers don't crash if they see one of these. 104 t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag 105 106 slice := updates[start:end] 107 resp, err := c.srv.raftApply(t, slice) 108 if err != nil { 109 return err 110 } 111 if respErr, ok := resp.(error); ok { 112 return respErr 113 } 114 } 115 return nil 116} 117 118// Update inserts or updates the LAN coordinate of a node. 119func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) { 120 if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { 121 return err 122 } 123 124 // Older clients can send coordinates with invalid numeric values like 125 // NaN and Inf. We guard against these coming in, though newer clients 126 // should never send these. 127 if !args.Coord.IsValid() { 128 return fmt.Errorf("invalid coordinate") 129 } 130 131 // Since this is a coordinate coming from some place else we harden this 132 // and look for dimensionality problems proactively. 133 coord, err := c.srv.serfLAN.GetCoordinate() 134 if err != nil { 135 return err 136 } 137 if !coord.IsCompatibleWith(args.Coord) { 138 return fmt.Errorf("incompatible coordinate") 139 } 140 141 // Fetch the ACL token, if any, and enforce the node policy if enabled. 142 authz, err := c.srv.ResolveToken(args.Token) 143 if err != nil { 144 return err 145 } 146 if authz != nil && c.srv.config.ACLEnforceVersion8 { 147 var authzContext acl.AuthorizerContext 148 structs.DefaultEnterpriseMeta().FillAuthzContext(&authzContext) 149 if authz.NodeWrite(args.Node, &authzContext) != acl.Allow { 150 return acl.ErrPermissionDenied 151 } 152 } 153 154 // Add the coordinate to the map of pending updates. 155 key := fmt.Sprintf("%s:%s", args.Node, args.Segment) 156 c.updatesLock.Lock() 157 c.updates[key] = args 158 c.updatesLock.Unlock() 159 return nil 160} 161 162// ListDatacenters returns the list of datacenters and their respective nodes 163// and the raw coordinates of those nodes (if no coordinates are available for 164// any of the nodes, the node list may be empty). 165func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error { 166 maps, err := c.srv.router.GetDatacenterMaps() 167 if err != nil { 168 return err 169 } 170 171 // Strip the datacenter suffixes from all the node names. 172 for i := range maps { 173 suffix := fmt.Sprintf(".%s", maps[i].Datacenter) 174 for j := range maps[i].Coordinates { 175 node := maps[i].Coordinates[j].Node 176 maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix) 177 } 178 } 179 180 *reply = maps 181 return nil 182} 183 184// ListNodes returns the list of nodes with their raw network coordinates (if no 185// coordinates are available for a node it won't appear in this list). 186func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error { 187 if done, err := c.srv.forward("Coordinate.ListNodes", args, args, reply); done { 188 return err 189 } 190 191 return c.srv.blockingQuery(&args.QueryOptions, 192 &reply.QueryMeta, 193 func(ws memdb.WatchSet, state *state.Store) error { 194 index, coords, err := state.Coordinates(ws) 195 if err != nil { 196 return err 197 } 198 199 reply.Index, reply.Coordinates = index, coords 200 if err := c.srv.filterACL(args.Token, reply); err != nil { 201 return err 202 } 203 204 return nil 205 }) 206} 207 208// Node returns the raw coordinates for a single node. 209func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error { 210 if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done { 211 return err 212 } 213 214 // Fetch the ACL token, if any, and enforce the node policy if enabled. 215 216 authz, err := c.srv.ResolveToken(args.Token) 217 if err != nil { 218 return err 219 } 220 if authz != nil && c.srv.config.ACLEnforceVersion8 { 221 var authzContext acl.AuthorizerContext 222 structs.WildcardEnterpriseMeta().FillAuthzContext(&authzContext) 223 if authz.NodeRead(args.Node, &authzContext) != acl.Allow { 224 return acl.ErrPermissionDenied 225 } 226 } 227 228 return c.srv.blockingQuery(&args.QueryOptions, 229 &reply.QueryMeta, 230 func(ws memdb.WatchSet, state *state.Store) error { 231 index, nodeCoords, err := state.Coordinate(args.Node, ws) 232 if err != nil { 233 return err 234 } 235 236 var coords structs.Coordinates 237 for segment, coord := range nodeCoords { 238 coords = append(coords, &structs.Coordinate{ 239 Node: args.Node, 240 Segment: segment, 241 Coord: coord, 242 }) 243 } 244 reply.Index, reply.Coordinates = index, coords 245 return nil 246 }) 247} 248