package consul

import (
	"context"
	"fmt"
	"sort"
	"time"

	metrics "github.com/armon/go-metrics"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/go-hclog"
)

// aclIterator simplifies the algorithm below by providing a basic iterator that
// moves through a list of ACLs and returns nil when it's exhausted. It also has
// methods for pre-sorting the ACLs being iterated over by ID, which should
// already be true, but since this is crucial for correctness and we are taking
// input from other servers, we sort to make sure.
type aclIterator struct {
	acls structs.ACLs

	// index is the current position of the iterator.
	index int
}

// newACLIterator returns a new ACL iterator.
func newACLIterator(acls structs.ACLs) *aclIterator {
	return &aclIterator{acls: acls}
}

// See sort.Interface.
func (a *aclIterator) Len() int {
	return len(a.acls)
}

// See sort.Interface.
func (a *aclIterator) Swap(i, j int) {
	a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
}

// See sort.Interface.
func (a *aclIterator) Less(i, j int) bool {
	return a.acls[i].ID < a.acls[j].ID
}

// Front returns the item at the current index position, or nil if the list is
// exhausted.
func (a *aclIterator) Front() *structs.ACL {
	if a.index < len(a.acls) {
		return a.acls[a.index]
	}
	return nil
}

// Next advances the iterator to the next index.
func (a *aclIterator) Next() {
	a.index++
}

// reconcileLegacyACLs takes the local and remote ACL state, and produces a list
// of changes required in order to bring the local ACLs into sync with the
// remote ACLs. You can supply lastRemoteIndex as a hint that replication has
// succeeded up to that remote index and it will make this process more
// efficient by only comparing ACL entries modified after that index. Setting
// this to 0 will force a full compare of all existing ACLs.
func reconcileLegacyACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
	// Since sorting the lists is crucial for correctness, we are depending
	// on data coming from other servers potentially running a different
	// version of Consul, and sortedness is a subtle property of the state
	// store indexing, it's prudent to make sure things are sorted before
	// we begin.
	localIter, remoteIter := newACLIterator(local), newACLIterator(remote)
	sort.Sort(localIter)
	sort.Sort(remoteIter)

	// Run through both lists and reconcile them.
	var changes structs.ACLRequests
	for localIter.Front() != nil || remoteIter.Front() != nil {
		// If the local list is exhausted, then process this as a remote
		// add. We know from the loop condition that there's something
		// in the remote list.
		if localIter.Front() == nil {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *(remoteIter.Front()),
			})
			remoteIter.Next()
			continue
		}

		// If the remote list is exhausted, then process this as a local
		// delete. We know from the loop condition that there's something
		// in the local list.
		if remoteIter.Front() == nil {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLDelete,
				ACL: *(localIter.Front()),
			})
			localIter.Next()
			continue
		}

		// At this point we know there's something at the front of each
		// list we need to resolve.

		// If the remote list has something local doesn't, we add it.
		if localIter.Front().ID > remoteIter.Front().ID {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *(remoteIter.Front()),
			})
			remoteIter.Next()
			continue
		}

		// If local has something remote doesn't, we delete it.
		if localIter.Front().ID < remoteIter.Front().ID {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLDelete,
				ACL: *(localIter.Front()),
			})
			localIter.Next()
			continue
		}

		// Local and remote have an ACL with the same ID, so we might
		// need to compare them.
		l, r := localIter.Front(), remoteIter.Front()
		if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
			changes = append(changes, &structs.ACLRequest{
				Op:  structs.ACLSet,
				ACL: *r,
			})
		}
		localIter.Next()
		remoteIter.Next()
	}
	return changes
}

// fetchLocalLegacyACLs returns the legacy ACLs in the local state store,
// skipping any tokens that have already expired.
func (s *Server) fetchLocalLegacyACLs() (structs.ACLs, error) {
	_, local, err := s.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil)
	if err != nil {
		return nil, err
	}

	now := time.Now()

	var acls structs.ACLs
	for _, token := range local {
		if token.IsExpired(now) {
			continue
		}
		if acl, err := token.Convert(); err == nil && acl != nil {
			acls = append(acls, acl)
		}
	}

	return acls, nil
}

// fetchRemoteLegacyACLs is used to get the remote set of ACLs from the ACL
// datacenter. The lastRemoteIndex parameter is a hint about which remote index
// we have replicated to, so this is expected to block until something changes.
func (s *Server) fetchRemoteLegacyACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
	defer metrics.MeasureSince([]string{"leader", "fetchRemoteACLs"}, time.Now())

	args := structs.DCSpecificRequest{
		Datacenter: s.config.ACLDatacenter,
		QueryOptions: structs.QueryOptions{
			Token:         s.tokens.ReplicationToken(),
			MinQueryIndex: lastRemoteIndex,
			AllowStale:    true,
		},
	}
	var remote structs.IndexedACLs
	if err := s.RPC("ACL.List", &args, &remote); err != nil {
		return nil, err
	}
	return &remote, nil
}

// updateLocalLegacyACLs is given a list of changes to apply in order to bring
// the local ACLs in line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalLegacyACLs(changes structs.ACLRequests, ctx context.Context) (bool, error) {
	defer metrics.MeasureSince([]string{"leader", "updateLocalACLs"}, time.Now())

	minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
	for _, change := range changes {
		// Note that we are using the single ACL interface here and not
		// performing all this inside a single transaction. This is OK
		// for two reasons. First, there's nothing else other than this
		// replication routine that alters the local ACLs, so there's
		// nothing to contend with locally. Second, if an apply fails
		// in the middle (most likely due to losing leadership), the
		// next replication pass will clean up and check everything
		// again.
		var reply string
		start := time.Now()
		if err := aclApplyInternal(s, change, &reply); err != nil {
			return false, err
		}

		// Do a smooth rate limit to wait out the min time allowed for
		// each op. If this op took longer than the min, then the sleep
		// time will be negative and we will just move on.
		elapsed := time.Since(start)
		select {
		case <-ctx.Done():
			return true, nil
		case <-time.After(minTimePerOp - elapsed):
			// do nothing
		}
	}
	return false, nil
}

// replicateLegacyACLs runs one pass of the algorithm for replicating legacy
// ACLs from a remote ACL datacenter to local state. If there's any error, this
// will return 0 for the lastRemoteIndex, which will cause us to immediately do
// a full sync next time.
func (s *Server) replicateLegacyACLs(ctx context.Context, logger hclog.Logger, lastRemoteIndex uint64) (uint64, bool, error) {
	remote, err := s.fetchRemoteLegacyACLs(lastRemoteIndex)
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
	}

	// Need to check if we should be stopping. This will be common as the
	// fetching process is a blocking RPC which could have been hanging
	// around for a long time, and during that time leadership could have
	// been lost.
	select {
	case <-ctx.Done():
		return 0, true, nil
	default:
		// do nothing
	}

	// Measure everything after the remote query, which can block for long
	// periods of time. This metric is a good measure of how expensive the
	// replication process is.
	defer metrics.MeasureSince([]string{"leader", "replicateACLs"}, time.Now())

	local, err := s.fetchLocalLegacyACLs()
	if err != nil {
		return 0, false, fmt.Errorf("failed to retrieve local ACLs: %v", err)
	}

	// If the remote index ever goes backwards, it's a good indication that
	// the remote side was rebuilt and we should do a full sync since we
	// can't make any assumptions about what's going on.
	if remote.QueryMeta.Index < lastRemoteIndex {
		logger.Warn(
			"Legacy ACL replication remote index moved backwards, forcing a full ACL sync",
			"from", lastRemoteIndex,
			"to", remote.QueryMeta.Index,
		)
		lastRemoteIndex = 0
	}

	// Calculate the changes required to bring the state into sync and then
	// apply them.
	changes := reconcileLegacyACLs(local, remote.ACLs, lastRemoteIndex)
	exit, err := s.updateLocalLegacyACLs(changes, ctx)
	if exit {
		return 0, true, nil
	}

	if err != nil {
		return 0, false, fmt.Errorf("failed to sync ACL changes: %v", err)
	}

	// Return the index we got back from the remote side, since we've synced
	// up with the remote state as of that index.
	return remote.QueryMeta.Index, false, nil
}
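
// runLegacyACLReplicationSketch is a minimal sketch of how replicateLegacyACLs
// could be driven from a leader-scoped loop, showing how its three return
// values are meant to be consumed: a zero index (or an error) forces a full
// compare on the next pass, and exit signals that the context was cancelled.
// The function name and the 30-second retry backoff are illustrative
// assumptions only, not part of the replication routine itself.
func (s *Server) runLegacyACLReplicationSketch(ctx context.Context, logger hclog.Logger) {
	var lastRemoteIndex uint64
	for {
		index, exit, err := s.replicateLegacyACLs(ctx, logger, lastRemoteIndex)
		if exit {
			// Leadership was lost or shutdown was requested mid-pass.
			return
		}
		if err != nil {
			// Force a full compare on the next pass and back off
			// briefly before retrying (interval is an assumption).
			lastRemoteIndex = 0
			logger.Warn("Legacy ACL replication error, will retry", "error", err)
			select {
			case <-ctx.Done():
				return
			case <-time.After(30 * time.Second):
			}
			continue
		}

		// Remember how far we've replicated so the next pass only
		// compares entries modified after this index.
		lastRemoteIndex = index
	}
}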