package consul

import (
	"fmt"
	"sort"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/lib"
)

// aclIterator simplifies the algorithm below by providing a basic iterator that
// moves through a list of ACLs and returns nil when it's exhausted. It also has
// methods for pre-sorting the ACLs being iterated over by ID, which should
// already be true, but since this is crucial for correctness and we are taking
// input from other servers, we sort to make sure.
type aclIterator struct {
	acls structs.ACLs

	// index is the current position of the iterator.
	index int
}

// newACLIterator returns a new ACL iterator.
func newACLIterator(acls structs.ACLs) *aclIterator {
	return &aclIterator{acls: acls}
}

// See sort.Interface.
func (a *aclIterator) Len() int {
	return len(a.acls)
}

// See sort.Interface.
func (a *aclIterator) Swap(i, j int) {
	a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
}

// See sort.Interface.
func (a *aclIterator) Less(i, j int) bool {
	return a.acls[i].ID < a.acls[j].ID
}

// Front returns the item at the current index position, or nil if the list is
// exhausted.
func (a *aclIterator) Front() *structs.ACL {
	if a.index < len(a.acls) {
		return a.acls[a.index]
	}
	return nil
}

// Next advances the iterator to the next index.
func (a *aclIterator) Next() {
	a.index++
}

// reconcileACLs takes the local and remote ACL state, and produces a list of
// changes required in order to bring the local ACLs into sync with the remote
// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded
// up to that remote index and it will make this process more efficient by only
// comparing ACL entries modified after that index. Setting this to 0 will force
// a full compare of all existing ACLs.
func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
	// Sorting is crucial for the correctness of the merge below, and since
	// we are taking input from other servers potentially running a
	// different version of Consul, and sorted-ness is a subtle property of
	// the state store indexing, it's prudent to make sure things are sorted
	// before we begin.
	localIter, remoteIter := newACLIterator(local), newACLIterator(remote)
	sort.Sort(localIter)
	sort.Sort(remoteIter)

	// Run through both lists and reconcile them.
	var changes structs.ACLRequests
	for localIter.Front() != nil || remoteIter.Front() != nil {
		// If the local list is exhausted, then process this as a remote
		// add. We know from the loop condition that there's something
		// in the remote list.
		if localIter.Front() == nil {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *(remoteIter.Front()),
			})
			remoteIter.Next()
			continue
		}

		// If the remote list is exhausted, then process this as a local
		// delete. We know from the loop condition that there's something
		// in the local list.
		if remoteIter.Front() == nil {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLDelete,
				ACL: *(localIter.Front()),
			})
			localIter.Next()
			continue
		}

		// At this point we know there's something at the front of each
		// list we need to resolve.

		// If the remote list has something local doesn't, we add it.
		if localIter.Front().ID > remoteIter.Front().ID {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *(remoteIter.Front()),
			})
			remoteIter.Next()
			continue
		}

		// If local has something remote doesn't, we delete it.
		if localIter.Front().ID < remoteIter.Front().ID {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLDelete,
				ACL: *(localIter.Front()),
			})
			localIter.Next()
			continue
		}

		// Local and remote have an ACL with the same ID, so we might
		// need to compare them.
		l, r := localIter.Front(), remoteIter.Front()
		if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *r,
			})
		}
		localIter.Next()
		remoteIter.Next()
	}
	return changes
}
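
// As a rough sketch of how the merge above plays out (the IDs here are
// hypothetical placeholders, not real tokens):
//
//	local := structs.ACLs{&structs.ACL{ID: "a"}, &structs.ACL{ID: "b"}}
//	remote := structs.ACLs{&structs.ACL{ID: "b"}, &structs.ACL{ID: "c"}}
//	changes := reconcileACLs(local, remote, 0)
//	// changes holds an ACLDelete for "a" (present only locally), an ACLSet
//	// for "c" (present only remotely), and an ACLSet for "b" only if the
//	// remote entry's ModifyIndex is past the lastRemoteIndex hint and
//	// IsSame reports a difference from the local copy.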

// fetchLocalACLs returns the ACLs in the local state store.
func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
	_, local, err := s.fsm.State().ACLList(nil)
	if err != nil {
		return nil, err
	}
	return local, nil
}

// fetchRemoteACLs is used to get the remote set of ACLs from the ACL
// datacenter. The lastRemoteIndex parameter is a hint about which remote index
// we have replicated to, so this is expected to block until something changes.
func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
	defer metrics.MeasureSince([]string{"consul", "leader", "fetchRemoteACLs"}, time.Now())
	defer metrics.MeasureSince([]string{"leader", "fetchRemoteACLs"}, time.Now())

	args := structs.DCSpecificRequest{
		Datacenter: s.config.ACLDatacenter,
		QueryOptions: structs.QueryOptions{
			Token:         s.tokens.ACLReplicationToken(),
			MinQueryIndex: lastRemoteIndex,
			AllowStale:    true,
		},
	}
	var remote structs.IndexedACLs
	if err := s.RPC("ACL.List", &args, &remote); err != nil {
		return nil, err
	}
	return &remote, nil
}

// updateLocalACLs is given a list of changes to apply in order to bring the
// local ACLs in line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
	defer metrics.MeasureSince([]string{"consul", "leader", "updateLocalACLs"}, time.Now())
	defer metrics.MeasureSince([]string{"leader", "updateLocalACLs"}, time.Now())

	minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
	for _, change := range changes {
		// Note that we are using the single ACL interface here and not
		// performing all this inside a single transaction. This is OK
		// for two reasons. First, there's nothing other than this
		// replication routine that alters the local ACLs, so there's
		// nothing to contend with locally. Second, if an apply fails
		// in the middle (most likely due to losing leadership), the
		// next replication pass will clean up and check everything
		// again.
		var reply string
		start := time.Now()
		if err := aclApplyInternal(s, change, &reply); err != nil {
			return err
		}

		// Do a smooth rate limit to wait out the min time allowed for
		// each op. If this op took longer than the min, then the sleep
		// time will be negative and we will just move on.
		elapsed := time.Since(start)
		time.Sleep(minTimePerOp - elapsed)
	}
	return nil
}
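
// To make the rate limit above concrete, here's a worked example assuming a
// hypothetical ACLReplicationApplyLimit of 100 ops/second:
//
//	minTimePerOp := time.Second / time.Duration(100) // 10ms budget per op
//	// An apply that finishes in 3ms sleeps the remaining 7ms; one that
//	// takes 15ms yields a negative duration, and time.Sleep treats a
//	// negative duration as a no-op, so we move straight on.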

// replicateACLs runs one pass of the algorithm for replicating ACLs from
// a remote ACL datacenter to local state. If there's any error, this will return
// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync
// next time.
func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) {
	remote, err := s.fetchRemoteACLs(lastRemoteIndex)
	if err != nil {
		return 0, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
	}

	// This will be pretty common because we will be blocking for a long time
	// and may have lost leadership, so let's control the message here instead
	// of returning deeper error messages from Raft.
	if !s.IsLeader() {
		return 0, fmt.Errorf("no longer cluster leader")
	}

	// Measure everything after the remote query, which can block for long
	// periods of time. This metric is a good measure of how expensive the
	// replication process is.
	defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now())
	defer metrics.MeasureSince([]string{"leader", "replicateACLs"}, time.Now())

	local, err := s.fetchLocalACLs()
	if err != nil {
		return 0, fmt.Errorf("failed to retrieve local ACLs: %v", err)
	}

	// If the remote index ever goes backwards, it's a good indication that
	// the remote side was rebuilt and we should do a full sync since we
	// can't make any assumptions about what's going on.
	if remote.QueryMeta.Index < lastRemoteIndex {
		s.logger.Printf("[WARN] consul: ACL replication remote index moved backwards (%d to %d), forcing a full ACL sync", lastRemoteIndex, remote.QueryMeta.Index)
		lastRemoteIndex = 0
	}

	// Calculate the changes required to bring the state into sync and then
	// apply them.
	changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex)
	if err := s.updateLocalACLs(changes); err != nil {
		return 0, fmt.Errorf("failed to sync ACL changes: %v", err)
	}

	// Return the index we got back from the remote side, since we've synced
	// up with the remote state as of that index.
	return remote.QueryMeta.Index, nil
}

// IsACLReplicationEnabled returns true if ACL replication is enabled.
func (s *Server) IsACLReplicationEnabled() bool {
	authDC := s.config.ACLDatacenter
	return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
		s.config.EnableACLReplication
}

// updateACLReplicationStatus safely updates the ACL replication status.
func (s *Server) updateACLReplicationStatus(status structs.ACLReplicationStatus) {
	// Fix up the times to shed some useless precision to ease formatting,
	// and always report UTC.
	status.LastError = status.LastError.Round(time.Second).UTC()
	status.LastSuccess = status.LastSuccess.Round(time.Second).UTC()

	// Set the shared state.
	s.aclReplicationStatusLock.Lock()
	s.aclReplicationStatus = status
	s.aclReplicationStatusLock.Unlock()
}
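
// Readers of the shared status are expected to take the same lock. A minimal
// sketch of a hypothetical getter (the real read path lives with the status
// endpoint, so this is illustrative only):
//
//	func (s *Server) getACLReplicationStatus() structs.ACLReplicationStatus {
//		s.aclReplicationStatusLock.RLock()
//		defer s.aclReplicationStatusLock.RUnlock()
//		return s.aclReplicationStatus
//	}
//
// This assumes aclReplicationStatusLock is a sync.RWMutex; if it's a plain
// sync.Mutex, use Lock/Unlock instead.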

// runACLReplication is a long-running goroutine that will attempt to replicate
// ACLs while the server is the leader, until the shutdown channel closes.
func (s *Server) runACLReplication() {
	var status structs.ACLReplicationStatus
	status.Enabled = true
	status.SourceDatacenter = s.config.ACLDatacenter
	s.updateACLReplicationStatus(status)

	// Show that it's not running on the way out.
	defer func() {
		status.Running = false
		s.updateACLReplicationStatus(status)
	}()

	// Give each server's replicator a random initial phase for good
	// measure.
	select {
	case <-s.shutdownCh:
		return

	case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)):
	}

	// We are fairly conservative with the lastRemoteIndex so that after a
	// leadership change or an error we re-sync everything (we also don't
	// want to block the first time after one of these events so we can
	// show a successful sync in the status endpoint).
	var lastRemoteIndex uint64
	replicate := func() {
		if !status.Running {
			lastRemoteIndex = 0 // Re-sync everything.
			status.Running = true
			s.updateACLReplicationStatus(status)
			s.logger.Printf("[INFO] consul: ACL replication started")
		}

		index, err := s.replicateACLs(lastRemoteIndex)
		if err != nil {
			lastRemoteIndex = 0 // Re-sync everything.
			status.LastError = time.Now()
			s.updateACLReplicationStatus(status)
			s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err)
		} else {
			lastRemoteIndex = index
			status.ReplicatedIndex = index
			status.LastSuccess = time.Now()
			s.updateACLReplicationStatus(status)
			s.logger.Printf("[DEBUG] consul: ACL replication completed through remote index %d", index)
		}
	}
	pause := func() {
		if status.Running {
			lastRemoteIndex = 0 // Re-sync everything.
			status.Running = false
			s.updateACLReplicationStatus(status)
			s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)")
		}
	}

	// This will slowly poll to see if replication should be active. Once it
	// is and we've caught up, the replicate() call will begin to block and
	// only wake up when the query timer expires or there are new ACLs to
	// replicate. We've chosen this design so that the ACLReplicationInterval
	// is the lower bound for how quickly we will replicate, no matter how
	// much ACL churn is happening on the remote side.
	//
	// The blocking query inside replicate() respects the shutdown channel,
	// so we won't get stuck in here as things are torn down.
	for {
		select {
		case <-s.shutdownCh:
			return

		case <-time.After(s.config.ACLReplicationInterval):
			if s.IsLeader() {
				replicate()
			} else {
				pause()
			}
		}
	}
}
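
// runACLReplication is meant to be started once as the server comes up. A
// hedged sketch of the wiring (the actual call site lives in the server
// setup code, not in this file):
//
//	if s.IsACLReplicationEnabled() {
//		go s.runACLReplication()
//	}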