1package consul
2
3import (
4	"context"
5	"fmt"
6	"sort"
7	"time"
8
9	metrics "github.com/armon/go-metrics"
10	"github.com/hashicorp/consul/agent/structs"
11	"github.com/hashicorp/go-hclog"
12)
13
// aclIterator simplifies the algorithm below by providing a basic iterator that
// moves through a list of ACLs and returns nil when it's exhausted. It also has
// methods for pre-sorting the ACLs being iterated over by ID, which should
// already be true, but since this is crucial for correctness and we are taking
// input from other servers, we sort to make sure.
type aclIterator struct {
	// acls is the underlying list being iterated over; it is sorted by ID
	// (via sort.Sort on this iterator) before iteration begins.
	acls structs.ACLs

	// index is the current position of the iterator.
	index int
}
25
26// newACLIterator returns a new ACL iterator.
27func newACLIterator(acls structs.ACLs) *aclIterator {
28	return &aclIterator{acls: acls}
29}
30
31// See sort.Interface.
32func (a *aclIterator) Len() int {
33	return len(a.acls)
34}
35
36// See sort.Interface.
37func (a *aclIterator) Swap(i, j int) {
38	a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
39}
40
41// See sort.Interface.
42func (a *aclIterator) Less(i, j int) bool {
43	return a.acls[i].ID < a.acls[j].ID
44}
45
46// Front returns the item at index position, or nil if the list is exhausted.
47func (a *aclIterator) Front() *structs.ACL {
48	if a.index < len(a.acls) {
49		return a.acls[a.index]
50	}
51	return nil
52}
53
54// Next advances the iterator to the next index.
55func (a *aclIterator) Next() {
56	a.index++
57}
58
59// reconcileACLs takes the local and remote ACL state, and produces a list of
60// changes required in order to bring the local ACLs into sync with the remote
61// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded
62// up to that remote index and it will make this process more efficient by only
63// comparing ACL entries modified after that index. Setting this to 0 will force
64// a full compare of all existing ACLs.
65func reconcileLegacyACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
66	// Since sorting the lists is crucial for correctness, we are depending
67	// on data coming from other servers potentially running a different,
68	// version of Consul, and sorted-ness is kind of a subtle property of
69	// the state store indexing, it's prudent to make sure things are sorted
70	// before we begin.
71	localIter, remoteIter := newACLIterator(local), newACLIterator(remote)
72	sort.Sort(localIter)
73	sort.Sort(remoteIter)
74
75	// Run through both lists and reconcile them.
76	var changes structs.ACLRequests
77	for localIter.Front() != nil || remoteIter.Front() != nil {
78		// If the local list is exhausted, then process this as a remote
79		// add. We know from the loop condition that there's something
80		// in the remote list.
81		if localIter.Front() == nil {
82			changes = append(changes, &structs.ACLRequest{
83				Op:  structs.ACLSet,
84				ACL: *(remoteIter.Front()),
85			})
86			remoteIter.Next()
87			continue
88		}
89
90		// If the remote list is exhausted, then process this as a local
91		// delete. We know from the loop condition that there's something
92		// in the local list.
93		if remoteIter.Front() == nil {
94			changes = append(changes, &structs.ACLRequest{
95				Op:  structs.ACLDelete,
96				ACL: *(localIter.Front()),
97			})
98			localIter.Next()
99			continue
100		}
101
102		// At this point we know there's something at the front of each
103		// list we need to resolve.
104
105		// If the remote list has something local doesn't, we add it.
106		if localIter.Front().ID > remoteIter.Front().ID {
107			changes = append(changes, &structs.ACLRequest{
108				Op:  structs.ACLSet,
109				ACL: *(remoteIter.Front()),
110			})
111			remoteIter.Next()
112			continue
113		}
114
115		// If local has something remote doesn't, we delete it.
116		if localIter.Front().ID < remoteIter.Front().ID {
117			changes = append(changes, &structs.ACLRequest{
118				Op:  structs.ACLDelete,
119				ACL: *(localIter.Front()),
120			})
121			localIter.Next()
122			continue
123		}
124
125		// Local and remote have an ACL with the same ID, so we might
126		// need to compare them.
127		l, r := localIter.Front(), remoteIter.Front()
128		if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
129			changes = append(changes, &structs.ACLRequest{
130				Op:  structs.ACLSet,
131				ACL: *r,
132			})
133		}
134		localIter.Next()
135		remoteIter.Next()
136	}
137	return changes
138}
139
140// FetchLocalACLs returns the ACLs in the local state store.
141func (s *Server) fetchLocalLegacyACLs() (structs.ACLs, error) {
142	_, local, err := s.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil)
143	if err != nil {
144		return nil, err
145	}
146
147	now := time.Now()
148
149	var acls structs.ACLs
150	for _, token := range local {
151		if token.IsExpired(now) {
152			continue
153		}
154		if acl, err := token.Convert(); err == nil && acl != nil {
155			acls = append(acls, acl)
156		}
157	}
158
159	return acls, nil
160}
161
162// FetchRemoteACLs is used to get the remote set of ACLs from the ACL
163// datacenter. The lastIndex parameter is a hint about which remote index we
164// have replicated to, so this is expected to block until something changes.
165func (s *Server) fetchRemoteLegacyACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
166	defer metrics.MeasureSince([]string{"leader", "fetchRemoteACLs"}, time.Now())
167
168	args := structs.DCSpecificRequest{
169		Datacenter: s.config.ACLDatacenter,
170		QueryOptions: structs.QueryOptions{
171			Token:         s.tokens.ReplicationToken(),
172			MinQueryIndex: lastRemoteIndex,
173			AllowStale:    true,
174		},
175	}
176	var remote structs.IndexedACLs
177	if err := s.RPC("ACL.List", &args, &remote); err != nil {
178		return nil, err
179	}
180	return &remote, nil
181}
182
183// UpdateLocalACLs is given a list of changes to apply in order to bring the
184// local ACLs in-line with the remote ACLs from the ACL datacenter.
185func (s *Server) updateLocalLegacyACLs(changes structs.ACLRequests, ctx context.Context) (bool, error) {
186	defer metrics.MeasureSince([]string{"leader", "updateLocalACLs"}, time.Now())
187
188	minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
189	for _, change := range changes {
190		// Note that we are using the single ACL interface here and not
191		// performing all this inside a single transaction. This is OK
192		// for two reasons. First, there's nothing else other than this
193		// replication routine that alters the local ACLs, so there's
194		// nothing to contend with locally. Second, if an apply fails
195		// in the middle (most likely due to losing leadership), the
196		// next replication pass will clean up and check everything
197		// again.
198		var reply string
199		start := time.Now()
200		if err := aclApplyInternal(s, change, &reply); err != nil {
201			return false, err
202		}
203
204		// Do a smooth rate limit to wait out the min time allowed for
205		// each op. If this op took longer than the min, then the sleep
206		// time will be negative and we will just move on.
207		elapsed := time.Since(start)
208		select {
209		case <-ctx.Done():
210			return true, nil
211		case <-time.After(minTimePerOp - elapsed):
212			// do nothing
213		}
214	}
215	return false, nil
216}
217
218// replicateACLs is a runs one pass of the algorithm for replicating ACLs from
219// a remote ACL datacenter to local state. If there's any error, this will return
220// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync
221// next time.
222func (s *Server) replicateLegacyACLs(ctx context.Context, logger hclog.Logger, lastRemoteIndex uint64) (uint64, bool, error) {
223	remote, err := s.fetchRemoteLegacyACLs(lastRemoteIndex)
224	if err != nil {
225		return 0, false, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
226	}
227
228	// Need to check if we should be stopping. This will be common as the fetching process is a blocking
229	// RPC which could have been hanging around for a long time and during that time leadership could
230	// have been lost.
231	select {
232	case <-ctx.Done():
233		return 0, true, nil
234	default:
235		// do nothing
236	}
237
238	// Measure everything after the remote query, which can block for long
239	// periods of time. This metric is a good measure of how expensive the
240	// replication process is.
241	defer metrics.MeasureSince([]string{"leader", "replicateACLs"}, time.Now())
242
243	local, err := s.fetchLocalLegacyACLs()
244	if err != nil {
245		return 0, false, fmt.Errorf("failed to retrieve local ACLs: %v", err)
246	}
247
248	// If the remote index ever goes backwards, it's a good indication that
249	// the remote side was rebuilt and we should do a full sync since we
250	// can't make any assumptions about what's going on.
251	if remote.QueryMeta.Index < lastRemoteIndex {
252		logger.Warn(
253			"Legacy ACL replication remote index moved backwards, forcing a full ACL sync",
254			"from", lastRemoteIndex,
255			"to", remote.QueryMeta.Index,
256		)
257		lastRemoteIndex = 0
258	}
259
260	// Calculate the changes required to bring the state into sync and then
261	// apply them.
262	changes := reconcileLegacyACLs(local, remote.ACLs, lastRemoteIndex)
263	exit, err := s.updateLocalLegacyACLs(changes, ctx)
264	if exit {
265		return 0, true, nil
266	}
267
268	if err != nil {
269		return 0, false, fmt.Errorf("failed to sync ACL changes: %v", err)
270	}
271
272	// Return the index we got back from the remote side, since we've synced
273	// up with the remote state as of that index.
274	return remote.QueryMeta.Index, false, nil
275}
276