1package consul
2
3import (
4	"fmt"
5	"sort"
6	"time"
7
8	"github.com/armon/go-metrics"
9	"github.com/hashicorp/consul/agent/structs"
10	"github.com/hashicorp/consul/lib"
11)
12
13// aclIterator simplifies the algorithm below by providing a basic iterator that
14// moves through a list of ACLs and returns nil when it's exhausted. It also has
15// methods for pre-sorting the ACLs being iterated over by ID, which should
16// already be true, but since this is crucial for correctness and we are taking
17// input from other servers, we sort to make sure.
18type aclIterator struct {
19	acls structs.ACLs
20
21	// index is the current position of the iterator.
22	index int
23}
24
25// newACLIterator returns a new ACL iterator.
26func newACLIterator(acls structs.ACLs) *aclIterator {
27	return &aclIterator{acls: acls}
28}
29
30// See sort.Interface.
31func (a *aclIterator) Len() int {
32	return len(a.acls)
33}
34
35// See sort.Interface.
36func (a *aclIterator) Swap(i, j int) {
37	a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
38}
39
40// See sort.Interface.
41func (a *aclIterator) Less(i, j int) bool {
42	return a.acls[i].ID < a.acls[j].ID
43}
44
45// Front returns the item at index position, or nil if the list is exhausted.
46func (a *aclIterator) Front() *structs.ACL {
47	if a.index < len(a.acls) {
48		return a.acls[a.index]
49	}
50	return nil
51}
52
53// Next advances the iterator to the next index.
54func (a *aclIterator) Next() {
55	a.index++
56}
57
58// reconcileACLs takes the local and remote ACL state, and produces a list of
59// changes required in order to bring the local ACLs into sync with the remote
60// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded
61// up to that remote index and it will make this process more efficient by only
62// comparing ACL entries modified after that index. Setting this to 0 will force
63// a full compare of all existing ACLs.
64func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
65	// Since sorting the lists is crucial for correctness, we are depending
66	// on data coming from other servers potentially running a different,
67	// version of Consul, and sorted-ness is kind of a subtle property of
68	// the state store indexing, it's prudent to make sure things are sorted
69	// before we begin.
70	localIter, remoteIter := newACLIterator(local), newACLIterator(remote)
71	sort.Sort(localIter)
72	sort.Sort(remoteIter)
73
74	// Run through both lists and reconcile them.
75	var changes structs.ACLRequests
76	for localIter.Front() != nil || remoteIter.Front() != nil {
77		// If the local list is exhausted, then process this as a remote
78		// add. We know from the loop condition that there's something
79		// in the remote list.
80		if localIter.Front() == nil {
81			changes = append(changes, &structs.ACLRequest{
82				Op:  structs.ACLSet,
83				ACL: *(remoteIter.Front()),
84			})
85			remoteIter.Next()
86			continue
87		}
88
89		// If the remote list is exhausted, then process this as a local
90		// delete. We know from the loop condition that there's something
91		// in the local list.
92		if remoteIter.Front() == nil {
93			changes = append(changes, &structs.ACLRequest{
94				Op:  structs.ACLDelete,
95				ACL: *(localIter.Front()),
96			})
97			localIter.Next()
98			continue
99		}
100
101		// At this point we know there's something at the front of each
102		// list we need to resolve.
103
104		// If the remote list has something local doesn't, we add it.
105		if localIter.Front().ID > remoteIter.Front().ID {
106			changes = append(changes, &structs.ACLRequest{
107				Op:  structs.ACLSet,
108				ACL: *(remoteIter.Front()),
109			})
110			remoteIter.Next()
111			continue
112		}
113
114		// If local has something remote doesn't, we delete it.
115		if localIter.Front().ID < remoteIter.Front().ID {
116			changes = append(changes, &structs.ACLRequest{
117				Op:  structs.ACLDelete,
118				ACL: *(localIter.Front()),
119			})
120			localIter.Next()
121			continue
122		}
123
124		// Local and remote have an ACL with the same ID, so we might
125		// need to compare them.
126		l, r := localIter.Front(), remoteIter.Front()
127		if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
128			changes = append(changes, &structs.ACLRequest{
129				Op:  structs.ACLSet,
130				ACL: *r,
131			})
132		}
133		localIter.Next()
134		remoteIter.Next()
135	}
136	return changes
137}
138
139// FetchLocalACLs returns the ACLs in the local state store.
140func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
141	_, local, err := s.fsm.State().ACLList(nil)
142	if err != nil {
143		return nil, err
144	}
145	return local, nil
146}
147
148// FetchRemoteACLs is used to get the remote set of ACLs from the ACL
149// datacenter. The lastIndex parameter is a hint about which remote index we
150// have replicated to, so this is expected to block until something changes.
151func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
152	defer metrics.MeasureSince([]string{"consul", "leader", "fetchRemoteACLs"}, time.Now())
153	defer metrics.MeasureSince([]string{"leader", "fetchRemoteACLs"}, time.Now())
154
155	args := structs.DCSpecificRequest{
156		Datacenter: s.config.ACLDatacenter,
157		QueryOptions: structs.QueryOptions{
158			Token:         s.tokens.ACLReplicationToken(),
159			MinQueryIndex: lastRemoteIndex,
160			AllowStale:    true,
161		},
162	}
163	var remote structs.IndexedACLs
164	if err := s.RPC("ACL.List", &args, &remote); err != nil {
165		return nil, err
166	}
167	return &remote, nil
168}
169
170// UpdateLocalACLs is given a list of changes to apply in order to bring the
171// local ACLs in-line with the remote ACLs from the ACL datacenter.
172func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
173	defer metrics.MeasureSince([]string{"consul", "leader", "updateLocalACLs"}, time.Now())
174	defer metrics.MeasureSince([]string{"leader", "updateLocalACLs"}, time.Now())
175
176	minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
177	for _, change := range changes {
178		// Note that we are using the single ACL interface here and not
179		// performing all this inside a single transaction. This is OK
180		// for two reasons. First, there's nothing else other than this
181		// replication routine that alters the local ACLs, so there's
182		// nothing to contend with locally. Second, if an apply fails
183		// in the middle (most likely due to losing leadership), the
184		// next replication pass will clean up and check everything
185		// again.
186		var reply string
187		start := time.Now()
188		if err := aclApplyInternal(s, change, &reply); err != nil {
189			return err
190		}
191
192		// Do a smooth rate limit to wait out the min time allowed for
193		// each op. If this op took longer than the min, then the sleep
194		// time will be negative and we will just move on.
195		elapsed := time.Since(start)
196		time.Sleep(minTimePerOp - elapsed)
197	}
198	return nil
199}
200
201// replicateACLs is a runs one pass of the algorithm for replicating ACLs from
202// a remote ACL datacenter to local state. If there's any error, this will return
203// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync
204// next time.
205func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) {
206	remote, err := s.fetchRemoteACLs(lastRemoteIndex)
207	if err != nil {
208		return 0, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
209	}
210
211	// This will be pretty common because we will be blocking for a long time
212	// and may have lost leadership, so lets control the message here instead
213	// of returning deeper error messages from from Raft.
214	if !s.IsLeader() {
215		return 0, fmt.Errorf("no longer cluster leader")
216	}
217
218	// Measure everything after the remote query, which can block for long
219	// periods of time. This metric is a good measure of how expensive the
220	// replication process is.
221	defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now())
222	defer metrics.MeasureSince([]string{"leader", "replicateACLs"}, time.Now())
223
224	local, err := s.fetchLocalACLs()
225	if err != nil {
226		return 0, fmt.Errorf("failed to retrieve local ACLs: %v", err)
227	}
228
229	// If the remote index ever goes backwards, it's a good indication that
230	// the remote side was rebuilt and we should do a full sync since we
231	// can't make any assumptions about what's going on.
232	if remote.QueryMeta.Index < lastRemoteIndex {
233		s.logger.Printf("[WARN] consul: ACL replication remote index moved backwards (%d to %d), forcing a full ACL sync", lastRemoteIndex, remote.QueryMeta.Index)
234		lastRemoteIndex = 0
235	}
236
237	// Calculate the changes required to bring the state into sync and then
238	// apply them.
239	changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex)
240	if err := s.updateLocalACLs(changes); err != nil {
241		return 0, fmt.Errorf("failed to sync ACL changes: %v", err)
242	}
243
244	// Return the index we got back from the remote side, since we've synced
245	// up with the remote state as of that index.
246	return remote.QueryMeta.Index, nil
247}
248
249// IsACLReplicationEnabled returns true if ACL replication is enabled.
250func (s *Server) IsACLReplicationEnabled() bool {
251	authDC := s.config.ACLDatacenter
252	return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
253		s.config.EnableACLReplication
254}
255
256// updateACLReplicationStatus safely updates the ACL replication status.
257func (s *Server) updateACLReplicationStatus(status structs.ACLReplicationStatus) {
258	// Fixup the times to shed some useless precision to ease formattting,
259	// and always report UTC.
260	status.LastError = status.LastError.Round(time.Second).UTC()
261	status.LastSuccess = status.LastSuccess.Round(time.Second).UTC()
262
263	// Set the shared state.
264	s.aclReplicationStatusLock.Lock()
265	s.aclReplicationStatus = status
266	s.aclReplicationStatusLock.Unlock()
267}
268
269// runACLReplication is a long-running goroutine that will attempt to replicate
270// ACLs while the server is the leader, until the shutdown channel closes.
271func (s *Server) runACLReplication() {
272	var status structs.ACLReplicationStatus
273	status.Enabled = true
274	status.SourceDatacenter = s.config.ACLDatacenter
275	s.updateACLReplicationStatus(status)
276
277	// Show that it's not running on the way out.
278	defer func() {
279		status.Running = false
280		s.updateACLReplicationStatus(status)
281	}()
282
283	// Give each server's replicator a random initial phase for good
284	// measure.
285	select {
286	case <-s.shutdownCh:
287		return
288
289	case <-time.After(lib.RandomStagger(s.config.ACLReplicationInterval)):
290	}
291
292	// We are fairly conservative with the lastRemoteIndex so that after a
293	// leadership change or an error we re-sync everything (we also don't
294	// want to block the first time after one of these events so we can
295	// show a successful sync in the status endpoint).
296	var lastRemoteIndex uint64
297	replicate := func() {
298		if !status.Running {
299			lastRemoteIndex = 0 // Re-sync everything.
300			status.Running = true
301			s.updateACLReplicationStatus(status)
302			s.logger.Printf("[INFO] consul: ACL replication started")
303		}
304
305		index, err := s.replicateACLs(lastRemoteIndex)
306		if err != nil {
307			lastRemoteIndex = 0 // Re-sync everything.
308			status.LastError = time.Now()
309			s.updateACLReplicationStatus(status)
310			s.logger.Printf("[WARN] consul: ACL replication error (will retry if still leader): %v", err)
311		} else {
312			lastRemoteIndex = index
313			status.ReplicatedIndex = index
314			status.LastSuccess = time.Now()
315			s.updateACLReplicationStatus(status)
316			s.logger.Printf("[DEBUG] consul: ACL replication completed through remote index %d", index)
317		}
318	}
319	pause := func() {
320		if status.Running {
321			lastRemoteIndex = 0 // Re-sync everything.
322			status.Running = false
323			s.updateACLReplicationStatus(status)
324			s.logger.Printf("[INFO] consul: ACL replication stopped (no longer leader)")
325		}
326	}
327
328	// This will slowly poll to see if replication should be active. Once it
329	// is and we've caught up, the replicate() call will begin to block and
330	// only wake up when the query timer expires or there are new ACLs to
331	// replicate. We've chosen this design so that the ACLReplicationInterval
332	// is the lower bound for how quickly we will replicate, no matter how
333	// much ACL churn is happening on the remote side.
334	//
335	// The blocking query inside replicate() respects the shutdown channel,
336	// so we won't get stuck in here as things are torn down.
337	for {
338		select {
339		case <-s.shutdownCh:
340			return
341
342		case <-time.After(s.config.ACLReplicationInterval):
343			if s.IsLeader() {
344				replicate()
345			} else {
346				pause()
347			}
348		}
349	}
350}
351