1package consul 2 3import ( 4 "fmt" 5 "strings" 6 "sync" 7 "time" 8 9 "github.com/hashicorp/consul/acl" 10 "github.com/hashicorp/consul/agent/consul/state" 11 "github.com/hashicorp/consul/agent/structs" 12 "github.com/hashicorp/go-memdb" 13) 14 15// Coordinate manages queries and updates for network coordinates. 16type Coordinate struct { 17 // srv is a pointer back to the server. 18 srv *Server 19 20 // updates holds pending coordinate updates for the given nodes. This is 21 // keyed by node:segment so we can get a coordinate for each segment for 22 // servers, and we only track the latest update per node:segment. 23 updates map[string]*structs.CoordinateUpdateRequest 24 25 // updatesLock synchronizes access to the updates map. 26 updatesLock sync.Mutex 27} 28 29// NewCoordinate returns a new Coordinate endpoint. 30func NewCoordinate(srv *Server) *Coordinate { 31 c := &Coordinate{ 32 srv: srv, 33 updates: make(map[string]*structs.CoordinateUpdateRequest), 34 } 35 36 go c.batchUpdate() 37 return c 38} 39 40// batchUpdate is a long-running routine that flushes pending coordinates to the 41// Raft log in batches. 42func (c *Coordinate) batchUpdate() { 43 for { 44 select { 45 case <-time.After(c.srv.config.CoordinateUpdatePeriod): 46 if err := c.batchApplyUpdates(); err != nil { 47 c.srv.logger.Printf("[WARN] consul.coordinate: Batch update failed: %v", err) 48 } 49 case <-c.srv.shutdownCh: 50 return 51 } 52 } 53} 54 55// batchApplyUpdates applies all pending updates to the Raft log in a series of 56// batches. 57func (c *Coordinate) batchApplyUpdates() error { 58 // Grab the pending updates and release the lock so we can still handle 59 // incoming messages. 60 c.updatesLock.Lock() 61 pending := c.updates 62 c.updates = make(map[string]*structs.CoordinateUpdateRequest) 63 c.updatesLock.Unlock() 64 65 // Enforce the rate limit. 66 limit := c.srv.config.CoordinateUpdateBatchSize * c.srv.config.CoordinateUpdateMaxBatches 67 size := len(pending) 68 if size > limit { 69 c.srv.logger.Printf("[WARN] consul.coordinate: Discarded %d coordinate updates", size-limit) 70 size = limit 71 } 72 73 // Transform the map into a slice that we can feed to the Raft log in 74 // batches. 75 i := 0 76 updates := make(structs.Coordinates, size) 77 for _, update := range pending { 78 if !(i < size) { 79 break 80 } 81 82 updates[i] = &structs.Coordinate{ 83 Node: update.Node, 84 Segment: update.Segment, 85 Coord: update.Coord, 86 } 87 i++ 88 } 89 90 // Apply the updates to the Raft log in batches. 91 for start := 0; start < size; start += c.srv.config.CoordinateUpdateBatchSize { 92 end := start + c.srv.config.CoordinateUpdateBatchSize 93 if end > size { 94 end = size 95 } 96 97 // We set the "safe to ignore" flag on this update type so old 98 // servers don't crash if they see one of these. 99 t := structs.CoordinateBatchUpdateType | structs.IgnoreUnknownTypeFlag 100 101 slice := updates[start:end] 102 resp, err := c.srv.raftApply(t, slice) 103 if err != nil { 104 return err 105 } 106 if respErr, ok := resp.(error); ok { 107 return respErr 108 } 109 } 110 return nil 111} 112 113// Update inserts or updates the LAN coordinate of a node. 114func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) { 115 if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { 116 return err 117 } 118 119 // Older clients can send coordinates with invalid numeric values like 120 // NaN and Inf. We guard against these coming in, though newer clients 121 // should never send these. 122 if !args.Coord.IsValid() { 123 return fmt.Errorf("invalid coordinate") 124 } 125 126 // Since this is a coordinate coming from some place else we harden this 127 // and look for dimensionality problems proactively. 128 coord, err := c.srv.serfLAN.GetCoordinate() 129 if err != nil { 130 return err 131 } 132 if !coord.IsCompatibleWith(args.Coord) { 133 return fmt.Errorf("incompatible coordinate") 134 } 135 136 // Fetch the ACL token, if any, and enforce the node policy if enabled. 137 rule, err := c.srv.resolveToken(args.Token) 138 if err != nil { 139 return err 140 } 141 if rule != nil && c.srv.config.ACLEnforceVersion8 { 142 // We don't enforce the sentinel policy here, since at this time 143 // sentinel only applies to creating or updating node or service 144 // info, not updating coordinates. 145 if !rule.NodeWrite(args.Node, nil) { 146 return acl.ErrPermissionDenied 147 } 148 } 149 150 // Add the coordinate to the map of pending updates. 151 key := fmt.Sprintf("%s:%s", args.Node, args.Segment) 152 c.updatesLock.Lock() 153 c.updates[key] = args 154 c.updatesLock.Unlock() 155 return nil 156} 157 158// ListDatacenters returns the list of datacenters and their respective nodes 159// and the raw coordinates of those nodes (if no coordinates are available for 160// any of the nodes, the node list may be empty). 161func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.DatacenterMap) error { 162 maps, err := c.srv.router.GetDatacenterMaps() 163 if err != nil { 164 return err 165 } 166 167 // Strip the datacenter suffixes from all the node names. 168 for i := range maps { 169 suffix := fmt.Sprintf(".%s", maps[i].Datacenter) 170 for j := range maps[i].Coordinates { 171 node := maps[i].Coordinates[j].Node 172 maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix) 173 } 174 } 175 176 *reply = maps 177 return nil 178} 179 180// ListNodes returns the list of nodes with their raw network coordinates (if no 181// coordinates are available for a node it won't appear in this list). 182func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error { 183 if done, err := c.srv.forward("Coordinate.ListNodes", args, args, reply); done { 184 return err 185 } 186 187 return c.srv.blockingQuery(&args.QueryOptions, 188 &reply.QueryMeta, 189 func(ws memdb.WatchSet, state *state.Store) error { 190 index, coords, err := state.Coordinates(ws) 191 if err != nil { 192 return err 193 } 194 195 reply.Index, reply.Coordinates = index, coords 196 if err := c.srv.filterACL(args.Token, reply); err != nil { 197 return err 198 } 199 200 return nil 201 }) 202} 203