1package consul
2
3import (
4	"fmt"
5	"os"
6	"strconv"
7	"testing"
8	"time"
9
10	"github.com/hashicorp/consul/acl"
11	"github.com/hashicorp/consul/agent/structs"
12	tokenStore "github.com/hashicorp/consul/agent/token"
13	"github.com/hashicorp/consul/sdk/testutil/retry"
14	"github.com/hashicorp/consul/testrpc"
15	"github.com/stretchr/testify/require"
16)
17
18func TestACLReplication_diffACLPolicies(t *testing.T) {
19	diffACLPolicies := func(local structs.ACLPolicies, remote structs.ACLPolicyListStubs, lastRemoteIndex uint64) ([]string, []string) {
20		tr := &aclPolicyReplicator{local: local, remote: remote}
21		res := diffACLType(tr, lastRemoteIndex)
22		return res.LocalDeletes, res.LocalUpserts
23	}
24	local := structs.ACLPolicies{
25		&structs.ACLPolicy{
26			ID:          "44ef9aec-7654-4401-901b-4d4a8b3c80fc",
27			Name:        "policy1",
28			Description: "policy1 - already in sync",
29			Rules:       `acl = "read"`,
30			Syntax:      acl.SyntaxCurrent,
31			Datacenters: nil,
32			Hash:        []byte{1, 2, 3, 4},
33			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
34		},
35		&structs.ACLPolicy{
36			ID:          "8ea41efb-8519-4091-bc91-c42da0cda9ae",
37			Name:        "policy2",
38			Description: "policy2 - updated but not changed",
39			Rules:       `acl = "read"`,
40			Syntax:      acl.SyntaxCurrent,
41			Datacenters: nil,
42			Hash:        []byte{1, 2, 3, 4},
43			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25},
44		},
45		&structs.ACLPolicy{
46			ID:          "539f1cb6-40aa-464f-ae66-a900d26bc1b2",
47			Name:        "policy3",
48			Description: "policy3 - updated and changed",
49			Rules:       `acl = "read"`,
50			Syntax:      acl.SyntaxCurrent,
51			Datacenters: nil,
52			Hash:        []byte{1, 2, 3, 4},
53			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25},
54		},
55		&structs.ACLPolicy{
56			ID:          "e9d33298-6490-4466-99cb-ba93af64fa76",
57			Name:        "policy4",
58			Description: "policy4 - needs deleting",
59			Rules:       `acl = "read"`,
60			Syntax:      acl.SyntaxCurrent,
61			Datacenters: nil,
62			Hash:        []byte{1, 2, 3, 4},
63			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25},
64		},
65	}
66
67	remote := structs.ACLPolicyListStubs{
68		&structs.ACLPolicyListStub{
69			ID:          "44ef9aec-7654-4401-901b-4d4a8b3c80fc",
70			Name:        "policy1",
71			Description: "policy1 - already in sync",
72			Datacenters: nil,
73			Hash:        []byte{1, 2, 3, 4},
74			CreateIndex: 1,
75			ModifyIndex: 2,
76		},
77		&structs.ACLPolicyListStub{
78			ID:          "8ea41efb-8519-4091-bc91-c42da0cda9ae",
79			Name:        "policy2",
80			Description: "policy2 - updated but not changed",
81			Datacenters: nil,
82			Hash:        []byte{1, 2, 3, 4},
83			CreateIndex: 1,
84			ModifyIndex: 50,
85		},
86		&structs.ACLPolicyListStub{
87			ID:          "539f1cb6-40aa-464f-ae66-a900d26bc1b2",
88			Name:        "policy3",
89			Description: "policy3 - updated and changed",
90			Datacenters: nil,
91			Hash:        []byte{5, 6, 7, 8},
92			CreateIndex: 1,
93			ModifyIndex: 50,
94		},
95		&structs.ACLPolicyListStub{
96			ID:          "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926",
97			Name:        "policy5",
98			Description: "policy5 - needs adding",
99			Datacenters: nil,
100			Hash:        []byte{1, 2, 3, 4},
101			CreateIndex: 1,
102			ModifyIndex: 50,
103		},
104	}
105
106	// Do the full diff. This full exercises the main body of the loop
107	deletions, updates := diffACLPolicies(local, remote, 28)
108	require.Len(t, updates, 2)
109	require.ElementsMatch(t, updates, []string{
110		"c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926",
111		"539f1cb6-40aa-464f-ae66-a900d26bc1b2"})
112
113	require.Len(t, deletions, 1)
114	require.Equal(t, "e9d33298-6490-4466-99cb-ba93af64fa76", deletions[0])
115
116	deletions, updates = diffACLPolicies(local, nil, 28)
117	require.Len(t, updates, 0)
118	require.Len(t, deletions, 4)
119	require.ElementsMatch(t, deletions, []string{
120		"44ef9aec-7654-4401-901b-4d4a8b3c80fc",
121		"8ea41efb-8519-4091-bc91-c42da0cda9ae",
122		"539f1cb6-40aa-464f-ae66-a900d26bc1b2",
123		"e9d33298-6490-4466-99cb-ba93af64fa76"})
124
125	deletions, updates = diffACLPolicies(nil, remote, 28)
126	require.Len(t, deletions, 0)
127	require.Len(t, updates, 4)
128	require.ElementsMatch(t, updates, []string{
129		"44ef9aec-7654-4401-901b-4d4a8b3c80fc",
130		"8ea41efb-8519-4091-bc91-c42da0cda9ae",
131		"539f1cb6-40aa-464f-ae66-a900d26bc1b2",
132		"c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"})
133}
134
135func TestACLReplication_diffACLTokens(t *testing.T) {
136	diffACLTokens := func(
137		local structs.ACLTokens,
138		remote structs.ACLTokenListStubs,
139		lastRemoteIndex uint64,
140	) itemDiffResults {
141		tr := &aclTokenReplicator{local: local, remote: remote}
142		return diffACLType(tr, lastRemoteIndex)
143	}
144
145	local := structs.ACLTokens{
146		// When a just-upgraded (1.3->1.4+) secondary DC is replicating from an
147		// upgraded primary DC (1.4+), the local state for tokens predating the
148		// upgrade will lack AccessorIDs.
149		//
150		// The primary DC will lazily perform the update to assign AccessorIDs,
151		// and that new update will come across the wire locally as a new
152		// insert.
153		//
154		// We simulate that scenario here with 'token0' having no AccessorID in
155		// the secondary (local) DC and having an AccessorID assigned in the
156		// payload retrieved from the primary (remote) DC.
157		&structs.ACLToken{
158			AccessorID:  "",
159			SecretID:    "5128289f-c22c-4d32-936e-7662443f1a55",
160			Description: "token0 - old and not yet upgraded",
161			Hash:        []byte{1, 2, 3, 4},
162			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 3},
163		},
164		&structs.ACLToken{
165			AccessorID:  "44ef9aec-7654-4401-901b-4d4a8b3c80fc",
166			SecretID:    "44ef9aec-7654-4401-901b-4d4a8b3c80fc",
167			Description: "token1 - already in sync",
168			Hash:        []byte{1, 2, 3, 4},
169			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2},
170		},
171		&structs.ACLToken{
172			AccessorID:  "8ea41efb-8519-4091-bc91-c42da0cda9ae",
173			SecretID:    "8ea41efb-8519-4091-bc91-c42da0cda9ae",
174			Description: "token2 - updated but not changed",
175			Hash:        []byte{1, 2, 3, 4},
176			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25},
177		},
178		&structs.ACLToken{
179			AccessorID:  "539f1cb6-40aa-464f-ae66-a900d26bc1b2",
180			SecretID:    "539f1cb6-40aa-464f-ae66-a900d26bc1b2",
181			Description: "token3 - updated and changed",
182			Hash:        []byte{1, 2, 3, 4},
183			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25},
184		},
185		&structs.ACLToken{
186			AccessorID:  "e9d33298-6490-4466-99cb-ba93af64fa76",
187			SecretID:    "e9d33298-6490-4466-99cb-ba93af64fa76",
188			Description: "token4 - needs deleting",
189			Hash:        []byte{1, 2, 3, 4},
190			RaftIndex:   structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25},
191		},
192	}
193
194	remote := structs.ACLTokenListStubs{
195		&structs.ACLTokenListStub{
196			AccessorID: "72fac6a3-a014-41c8-9cb2-8d9a5e935f3d",
197			//SecretID:    "5128289f-c22c-4d32-936e-7662443f1a55", (formerly)
198			Description: "token0 - old and not yet upgraded locally",
199			Hash:        []byte{1, 2, 3, 4},
200			CreateIndex: 1,
201			ModifyIndex: 3,
202		},
203		&structs.ACLTokenListStub{
204			AccessorID:  "44ef9aec-7654-4401-901b-4d4a8b3c80fc",
205			Description: "token1 - already in sync",
206			Hash:        []byte{1, 2, 3, 4},
207			CreateIndex: 1,
208			ModifyIndex: 2,
209		},
210		&structs.ACLTokenListStub{
211			AccessorID:  "8ea41efb-8519-4091-bc91-c42da0cda9ae",
212			Description: "token2 - updated but not changed",
213			Hash:        []byte{1, 2, 3, 4},
214			CreateIndex: 1,
215			ModifyIndex: 50,
216		},
217		&structs.ACLTokenListStub{
218			AccessorID:  "539f1cb6-40aa-464f-ae66-a900d26bc1b2",
219			Description: "token3 - updated and changed",
220			Hash:        []byte{5, 6, 7, 8},
221			CreateIndex: 1,
222			ModifyIndex: 50,
223		},
224		&structs.ACLTokenListStub{
225			AccessorID:  "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926",
226			Description: "token5 - needs adding",
227			Hash:        []byte{1, 2, 3, 4},
228			CreateIndex: 1,
229			ModifyIndex: 50,
230		},
231		// When a 1.4+ secondary DC is replicating from a 1.4+ primary DC,
232		// tokens created using the legacy APIs will not initially have
233		// AccessorIDs assigned. That assignment is lazy (but in quick
234		// succession).
235		//
236		// The secondary (local) will see these in the api response as a stub
237		// with "" as the AccessorID.
238		//
239		// We simulate that here to verify that the secondary does the right
240		// thing by skipping them until it sees them with nonempty AccessorIDs.
241		&structs.ACLTokenListStub{
242			AccessorID:  "",
243			Description: "token6 - pending async AccessorID assignment",
244			Hash:        []byte{1, 2, 3, 4},
245			CreateIndex: 51,
246			ModifyIndex: 51,
247		},
248	}
249
250	// Do the full diff. This full exercises the main body of the loop
251	t.Run("full-diff", func(t *testing.T) {
252		res := diffACLTokens(local, remote, 28)
253		require.Equal(t, 1, res.LocalSkipped)
254		require.Equal(t, 1, res.RemoteSkipped)
255		require.Len(t, res.LocalUpserts, 3)
256		require.ElementsMatch(t, res.LocalUpserts, []string{
257			"72fac6a3-a014-41c8-9cb2-8d9a5e935f3d",
258			"c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926",
259			"539f1cb6-40aa-464f-ae66-a900d26bc1b2"})
260
261		require.Len(t, res.LocalDeletes, 1)
262		require.Equal(t, "e9d33298-6490-4466-99cb-ba93af64fa76", res.LocalDeletes[0])
263	})
264
265	t.Run("only-local", func(t *testing.T) {
266		res := diffACLTokens(local, nil, 28)
267		require.Equal(t, 1, res.LocalSkipped)
268		require.Equal(t, 0, res.RemoteSkipped)
269		require.Len(t, res.LocalUpserts, 0)
270		require.Len(t, res.LocalDeletes, 4)
271		require.ElementsMatch(t, res.LocalDeletes, []string{
272			"44ef9aec-7654-4401-901b-4d4a8b3c80fc",
273			"8ea41efb-8519-4091-bc91-c42da0cda9ae",
274			"539f1cb6-40aa-464f-ae66-a900d26bc1b2",
275			"e9d33298-6490-4466-99cb-ba93af64fa76"})
276	})
277
278	t.Run("only-remote", func(t *testing.T) {
279		res := diffACLTokens(nil, remote, 28)
280		require.Equal(t, 0, res.LocalSkipped)
281		require.Equal(t, 1, res.RemoteSkipped)
282		require.Len(t, res.LocalDeletes, 0)
283		require.Len(t, res.LocalUpserts, 5)
284		require.ElementsMatch(t, res.LocalUpserts, []string{
285			"72fac6a3-a014-41c8-9cb2-8d9a5e935f3d",
286			"44ef9aec-7654-4401-901b-4d4a8b3c80fc",
287			"8ea41efb-8519-4091-bc91-c42da0cda9ae",
288			"539f1cb6-40aa-464f-ae66-a900d26bc1b2",
289			"c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"})
290	})
291}
292
293func TestACLReplication_Tokens(t *testing.T) {
294	t.Parallel()
295	dir1, s1 := testServerWithConfig(t, func(c *Config) {
296		c.ACLDatacenter = "dc1"
297		c.ACLsEnabled = true
298		c.ACLMasterToken = "root"
299	})
300	defer os.RemoveAll(dir1)
301	defer s1.Shutdown()
302	testrpc.WaitForLeader(t, s1.RPC, "dc1")
303	client := rpcClient(t, s1)
304	defer client.Close()
305
306	dir2, s2 := testServerWithConfig(t, func(c *Config) {
307		c.Datacenter = "dc2"
308		c.ACLDatacenter = "dc1"
309		c.ACLsEnabled = true
310		c.ACLTokenReplication = true
311		c.ACLReplicationRate = 100
312		c.ACLReplicationBurst = 100
313		c.ACLReplicationApplyLimit = 1000000
314	})
315	s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
316	testrpc.WaitForLeader(t, s2.RPC, "dc2")
317	defer os.RemoveAll(dir2)
318	defer s2.Shutdown()
319
320	// Try to join.
321	joinWAN(t, s2, s1)
322	testrpc.WaitForLeader(t, s1.RPC, "dc1")
323	testrpc.WaitForLeader(t, s1.RPC, "dc2")
324
325	// Wait for legacy acls to be disabled so we are clear that
326	// legacy replication isn't meddling.
327	waitForNewACLs(t, s1)
328	waitForNewACLs(t, s2)
329	waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
330
331	// Create a bunch of new tokens and policies
332	var tokens structs.ACLTokens
333	for i := 0; i < 50; i++ {
334		arg := structs.ACLTokenSetRequest{
335			Datacenter: "dc1",
336			ACLToken: structs.ACLToken{
337				Description: fmt.Sprintf("token-%d", i),
338				Policies: []structs.ACLTokenPolicyLink{
339					structs.ACLTokenPolicyLink{
340						ID: structs.ACLPolicyGlobalManagementID,
341					},
342				},
343				Local: false,
344			},
345			WriteRequest: structs.WriteRequest{Token: "root"},
346		}
347		var token structs.ACLToken
348		require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
349		tokens = append(tokens, &token)
350	}
351
352	checkSame := func(t *retry.R) {
353		// only account for global tokens - local tokens shouldn't be replicated
354		index, remote, err := s1.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil)
355		require.NoError(t, err)
356		_, local, err := s2.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil)
357		require.NoError(t, err)
358
359		require.Len(t, local, len(remote))
360		for i, token := range remote {
361			require.Equal(t, token.Hash, local[i].Hash)
362		}
363
364		s2.aclReplicationStatusLock.RLock()
365		status := s2.aclReplicationStatus
366		s2.aclReplicationStatusLock.RUnlock()
367
368		require.True(t, status.Enabled)
369		require.True(t, status.Running)
370		require.Equal(t, status.ReplicationType, structs.ACLReplicateTokens)
371		require.Equal(t, status.ReplicatedTokenIndex, index)
372		require.Equal(t, status.SourceDatacenter, "dc1")
373	}
374	// Wait for the replica to converge.
375	retry.Run(t, func(r *retry.R) {
376		checkSame(r)
377	})
378
379	// Wait for s2 global-management policy
380	retry.Run(t, func(r *retry.R) {
381		_, policy, err := s2.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, nil)
382		require.NoError(r, err)
383		require.NotNil(r, policy)
384	})
385
386	// add some local tokens to the secondary DC
387	// these shouldn't be deleted by replication
388	for i := 0; i < 50; i++ {
389		arg := structs.ACLTokenSetRequest{
390			Datacenter: "dc2",
391			ACLToken: structs.ACLToken{
392				Description: fmt.Sprintf("token-%d", i),
393				Policies: []structs.ACLTokenPolicyLink{
394					structs.ACLTokenPolicyLink{
395						ID: structs.ACLPolicyGlobalManagementID,
396					},
397				},
398				Local: true,
399			},
400			WriteRequest: structs.WriteRequest{Token: "root"},
401		}
402		var token structs.ACLToken
403		require.NoError(t, s2.RPC("ACL.TokenSet", &arg, &token))
404	}
405
406	// add some local tokens to the primary DC
407	// these shouldn't be replicated to the secondary DC
408	for i := 0; i < 50; i++ {
409		arg := structs.ACLTokenSetRequest{
410			Datacenter: "dc1",
411			ACLToken: structs.ACLToken{
412				Description: fmt.Sprintf("token-%d", i),
413				Policies: []structs.ACLTokenPolicyLink{
414					structs.ACLTokenPolicyLink{
415						ID: structs.ACLPolicyGlobalManagementID,
416					},
417				},
418				Local: true,
419			},
420			WriteRequest: structs.WriteRequest{Token: "root"},
421		}
422		var token structs.ACLToken
423		require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
424	}
425
426	// Update those other tokens
427	for i := 0; i < 50; i++ {
428		arg := structs.ACLTokenSetRequest{
429			Datacenter: "dc1",
430			ACLToken: structs.ACLToken{
431				AccessorID:  tokens[i].AccessorID,
432				SecretID:    tokens[i].SecretID,
433				Description: fmt.Sprintf("token-%d-modified", i),
434				Policies: []structs.ACLTokenPolicyLink{
435					structs.ACLTokenPolicyLink{
436						ID: structs.ACLPolicyGlobalManagementID,
437					},
438				},
439				Local: false,
440			},
441			WriteRequest: structs.WriteRequest{Token: "root"},
442		}
443		var token structs.ACLToken
444		require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
445	}
446
447	// Wait for the replica to converge.
448	// this time it also verifies the local tokens from the primary were not replicated.
449	retry.Run(t, func(r *retry.R) {
450		checkSame(r)
451	})
452
453	// verify dc2 local tokens didn't get blown away
454	_, local, err := s2.fsm.State().ACLTokenList(nil, true, false, "", "", "", nil, nil)
455	require.NoError(t, err)
456	require.Len(t, local, 50)
457
458	for _, token := range tokens {
459		arg := structs.ACLTokenDeleteRequest{
460			Datacenter:   "dc1",
461			TokenID:      token.AccessorID,
462			WriteRequest: structs.WriteRequest{Token: "root"},
463		}
464
465		var dontCare string
466		require.NoError(t, s1.RPC("ACL.TokenDelete", &arg, &dontCare))
467	}
468
469	// Wait for the replica to converge.
470	retry.Run(t, func(r *retry.R) {
471		checkSame(r)
472	})
473}
474
475func TestACLReplication_Policies(t *testing.T) {
476	t.Parallel()
477	dir1, s1 := testServerWithConfig(t, func(c *Config) {
478		c.ACLDatacenter = "dc1"
479		c.ACLsEnabled = true
480		c.ACLMasterToken = "root"
481	})
482	defer os.RemoveAll(dir1)
483	defer s1.Shutdown()
484	testrpc.WaitForLeader(t, s1.RPC, "dc1")
485	client := rpcClient(t, s1)
486	defer client.Close()
487
488	dir2, s2 := testServerWithConfig(t, func(c *Config) {
489		c.Datacenter = "dc2"
490		c.ACLDatacenter = "dc1"
491		c.ACLsEnabled = true
492		c.ACLTokenReplication = false
493		c.ACLReplicationRate = 100
494		c.ACLReplicationBurst = 100
495		c.ACLReplicationApplyLimit = 1000000
496	})
497	s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
498	testrpc.WaitForLeader(t, s2.RPC, "dc2")
499	defer os.RemoveAll(dir2)
500	defer s2.Shutdown()
501
502	// Try to join.
503	joinWAN(t, s2, s1)
504	testrpc.WaitForLeader(t, s1.RPC, "dc1")
505	testrpc.WaitForLeader(t, s1.RPC, "dc2")
506
507	// Wait for legacy acls to be disabled so we are clear that
508	// legacy replication isn't meddling.
509	waitForNewACLs(t, s1)
510	waitForNewACLs(t, s2)
511	waitForNewACLReplication(t, s2, structs.ACLReplicatePolicies, 1, 0, 0)
512
513	// Create a bunch of new policies
514	var policies structs.ACLPolicies
515	for i := 0; i < 50; i++ {
516		arg := structs.ACLPolicySetRequest{
517			Datacenter: "dc1",
518			Policy: structs.ACLPolicy{
519				Name:        fmt.Sprintf("token-%d", i),
520				Description: fmt.Sprintf("token-%d", i),
521				Rules:       fmt.Sprintf(`service "app-%d" { policy = "read" }`, i),
522			},
523			WriteRequest: structs.WriteRequest{Token: "root"},
524		}
525		var policy structs.ACLPolicy
526		require.NoError(t, s1.RPC("ACL.PolicySet", &arg, &policy))
527		policies = append(policies, &policy)
528	}
529
530	checkSame := func(t *retry.R) {
531		// only account for global tokens - local tokens shouldn't be replicated
532		index, remote, err := s1.fsm.State().ACLPolicyList(nil, nil)
533		require.NoError(t, err)
534		_, local, err := s2.fsm.State().ACLPolicyList(nil, nil)
535		require.NoError(t, err)
536
537		require.Len(t, local, len(remote))
538		for i, policy := range remote {
539			require.Equal(t, policy.Hash, local[i].Hash)
540		}
541
542		s2.aclReplicationStatusLock.RLock()
543		status := s2.aclReplicationStatus
544		s2.aclReplicationStatusLock.RUnlock()
545
546		require.True(t, status.Enabled)
547		require.True(t, status.Running)
548		require.Equal(t, status.ReplicationType, structs.ACLReplicatePolicies)
549		require.Equal(t, status.ReplicatedIndex, index)
550		require.Equal(t, status.SourceDatacenter, "dc1")
551	}
552	// Wait for the replica to converge.
553	retry.Run(t, func(r *retry.R) {
554		checkSame(r)
555	})
556
557	// Update those policies
558	for i := 0; i < 50; i++ {
559		arg := structs.ACLPolicySetRequest{
560			Datacenter: "dc1",
561			Policy: structs.ACLPolicy{
562				ID:          policies[i].ID,
563				Name:        fmt.Sprintf("token-%d-modified", i),
564				Description: fmt.Sprintf("token-%d-modified", i),
565				Rules:       policies[i].Rules,
566			},
567			WriteRequest: structs.WriteRequest{Token: "root"},
568		}
569		var policy structs.ACLPolicy
570		require.NoError(t, s1.RPC("ACL.PolicySet", &arg, &policy))
571	}
572
573	// Wait for the replica to converge.
574	// this time it also verifies the local tokens from the primary were not replicated.
575	retry.Run(t, func(r *retry.R) {
576		checkSame(r)
577	})
578
579	for _, policy := range policies {
580		arg := structs.ACLPolicyDeleteRequest{
581			Datacenter:   "dc1",
582			PolicyID:     policy.ID,
583			WriteRequest: structs.WriteRequest{Token: "root"},
584		}
585
586		var dontCare string
587		require.NoError(t, s1.RPC("ACL.PolicyDelete", &arg, &dontCare))
588	}
589
590	// Wait for the replica to converge.
591	retry.Run(t, func(r *retry.R) {
592		checkSame(r)
593	})
594}
595
596func TestACLReplication_TokensRedacted(t *testing.T) {
597	t.Parallel()
598	dir1, s1 := testServerWithConfig(t, func(c *Config) {
599		c.ACLDatacenter = "dc1"
600		c.ACLsEnabled = true
601		c.ACLMasterToken = "root"
602	})
603	defer os.RemoveAll(dir1)
604	defer s1.Shutdown()
605	testrpc.WaitForLeader(t, s1.RPC, "dc1")
606	client := rpcClient(t, s1)
607	defer client.Close()
608
609	// Create the ACL Write Policy
610	policyArg := structs.ACLPolicySetRequest{
611		Datacenter: "dc1",
612		Policy: structs.ACLPolicy{
613			Name:        "token-replication-redacted",
614			Description: "token-replication-redacted",
615			Rules:       `acl = "write"`,
616		},
617		WriteRequest: structs.WriteRequest{Token: "root"},
618	}
619	var policy structs.ACLPolicy
620	require.NoError(t, s1.RPC("ACL.PolicySet", &policyArg, &policy))
621
622	// Create the dc2 replication token
623	tokenArg := structs.ACLTokenSetRequest{
624		Datacenter: "dc1",
625		ACLToken: structs.ACLToken{
626			Description: "dc2-replication",
627			Policies: []structs.ACLTokenPolicyLink{
628				structs.ACLTokenPolicyLink{
629					ID: policy.ID,
630				},
631			},
632			Local: false,
633		},
634		WriteRequest: structs.WriteRequest{Token: "root"},
635	}
636
637	var token structs.ACLToken
638	require.NoError(t, s1.RPC("ACL.TokenSet", &tokenArg, &token))
639
640	dir2, s2 := testServerWithConfig(t, func(c *Config) {
641		c.Datacenter = "dc2"
642		c.ACLDatacenter = "dc1"
643		c.ACLsEnabled = true
644		c.ACLTokenReplication = true
645		c.ACLReplicationRate = 100
646		c.ACLReplicationBurst = 100
647		c.ACLReplicationApplyLimit = 1000000
648	})
649	s2.tokens.UpdateReplicationToken(token.SecretID, tokenStore.TokenSourceConfig)
650	testrpc.WaitForLeader(t, s2.RPC, "dc2")
651	defer os.RemoveAll(dir2)
652	defer s2.Shutdown()
653
654	// Try to join.
655	joinWAN(t, s2, s1)
656	testrpc.WaitForLeader(t, s2.RPC, "dc2")
657	testrpc.WaitForLeader(t, s2.RPC, "dc1")
658	waitForNewACLs(t, s2)
659
660	// ensures replication is working ok
661	retry.Run(t, func(r *retry.R) {
662		var tokenResp structs.ACLTokenResponse
663		req := structs.ACLTokenGetRequest{
664			Datacenter:   "dc2",
665			TokenID:      "root",
666			TokenIDType:  structs.ACLTokenSecret,
667			QueryOptions: structs.QueryOptions{Token: "root"},
668		}
669		err := s2.RPC("ACL.TokenRead", &req, &tokenResp)
670		require.NoError(r, err)
671		require.NotNil(r, tokenResp.Token)
672		require.Equal(r, "root", tokenResp.Token.SecretID)
673
674		var status structs.ACLReplicationStatus
675		statusReq := structs.DCSpecificRequest{
676			Datacenter: "dc2",
677		}
678		require.NoError(r, s2.RPC("ACL.ReplicationStatus", &statusReq, &status))
679		// ensures that tokens are not being synced
680		require.True(r, status.ReplicatedTokenIndex > 0, "ReplicatedTokenIndex not greater than 0")
681
682	})
683
684	// modify the replication policy to change to only granting read privileges
685	policyArg = structs.ACLPolicySetRequest{
686		Datacenter: "dc1",
687		Policy: structs.ACLPolicy{
688			ID:          policy.ID,
689			Name:        "token-replication-redacted",
690			Description: "token-replication-redacted",
691			Rules:       `acl = "read"`,
692		},
693		WriteRequest: structs.WriteRequest{Token: "root"},
694	}
695	require.NoError(t, s1.RPC("ACL.PolicySet", &policyArg, &policy))
696
697	// Create the another token so that replication will attempt to read it.
698	tokenArg = structs.ACLTokenSetRequest{
699		Datacenter: "dc1",
700		ACLToken: structs.ACLToken{
701			Description: "management",
702			Policies: []structs.ACLTokenPolicyLink{
703				structs.ACLTokenPolicyLink{
704					ID: structs.ACLPolicyGlobalManagementID,
705				},
706			},
707			Local: false,
708		},
709		WriteRequest: structs.WriteRequest{Token: "root"},
710	}
711	var token2 structs.ACLToken
712
713	// record the time right before we are touching the token
714	minErrorTime := time.Now()
715	require.NoError(t, s1.RPC("ACL.TokenSet", &tokenArg, &token2))
716
717	retry.Run(t, func(r *retry.R) {
718		var tokenResp structs.ACLTokenResponse
719		req := structs.ACLTokenGetRequest{
720			Datacenter:   "dc2",
721			TokenID:      redactedToken,
722			TokenIDType:  structs.ACLTokenSecret,
723			QueryOptions: structs.QueryOptions{Token: redactedToken},
724		}
725		err := s2.RPC("ACL.TokenRead", &req, &tokenResp)
726		// its not an error for the secret to not be found.
727		require.NoError(r, err)
728		require.Nil(r, tokenResp.Token)
729
730		var status structs.ACLReplicationStatus
731		statusReq := structs.DCSpecificRequest{
732			Datacenter: "dc2",
733		}
734		require.NoError(r, s2.RPC("ACL.ReplicationStatus", &statusReq, &status))
735		// ensures that tokens are not being synced
736		require.True(r, status.ReplicatedTokenIndex < token2.CreateIndex, "ReplicatedTokenIndex is not less than the token2s create index")
737		// ensures that token replication is erroring
738		require.True(r, status.LastError.After(minErrorTime), "Replication LastError not after the minErrorTime")
739	})
740}
741
742func TestACLReplication_AllTypes(t *testing.T) {
743	t.Parallel()
744	dir1, s1 := testServerWithConfig(t, func(c *Config) {
745		c.ACLDatacenter = "dc1"
746		c.ACLsEnabled = true
747		c.ACLMasterToken = "root"
748	})
749	defer os.RemoveAll(dir1)
750	defer s1.Shutdown()
751	testrpc.WaitForLeader(t, s1.RPC, "dc1")
752	client := rpcClient(t, s1)
753	defer client.Close()
754
755	dir2, s2 := testServerWithConfig(t, func(c *Config) {
756		c.Datacenter = "dc2"
757		c.ACLDatacenter = "dc1"
758		c.ACLsEnabled = true
759		c.ACLTokenReplication = true
760		c.ACLReplicationRate = 100
761		c.ACLReplicationBurst = 25
762		c.ACLReplicationApplyLimit = 1000000
763	})
764	s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
765	testrpc.WaitForLeader(t, s2.RPC, "dc2")
766	defer os.RemoveAll(dir2)
767	defer s2.Shutdown()
768
769	// Try to join.
770	joinWAN(t, s2, s1)
771	testrpc.WaitForLeader(t, s1.RPC, "dc1")
772	testrpc.WaitForLeader(t, s1.RPC, "dc2")
773
774	// Wait for legacy acls to be disabled so we are clear that
775	// legacy replication isn't meddling.
776	waitForNewACLs(t, s1)
777	waitForNewACLs(t, s2)
778	waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
779
780	const (
781		numItems             = 50
782		numItemsThatAreLocal = 10
783	)
784
785	// Create some data.
786	policyIDs, roleIDs, tokenIDs := createACLTestData(t, s1, "b1", numItems, numItemsThatAreLocal)
787
788	checkSameTokens := func(t *retry.R) {
789		// only account for global tokens - local tokens shouldn't be replicated
790		index, remote, err := s1.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil)
791		require.NoError(t, err)
792		// Query for all of them, so that we can prove that no globals snuck in.
793		_, local, err := s2.fsm.State().ACLTokenList(nil, true, true, "", "", "", nil, nil)
794		require.NoError(t, err)
795
796		require.Len(t, remote, len(local))
797		for i, token := range remote {
798			require.Equal(t, token.Hash, local[i].Hash)
799		}
800
801		s2.aclReplicationStatusLock.RLock()
802		status := s2.aclReplicationStatus
803		s2.aclReplicationStatusLock.RUnlock()
804
805		require.True(t, status.Enabled)
806		require.True(t, status.Running)
807		require.Equal(t, status.ReplicationType, structs.ACLReplicateTokens)
808		require.Equal(t, status.ReplicatedTokenIndex, index)
809		require.Equal(t, status.SourceDatacenter, "dc1")
810	}
811	checkSamePolicies := func(t *retry.R) {
812		index, remote, err := s1.fsm.State().ACLPolicyList(nil, nil)
813		require.NoError(t, err)
814		_, local, err := s2.fsm.State().ACLPolicyList(nil, nil)
815		require.NoError(t, err)
816
817		require.Len(t, remote, len(local))
818		for i, policy := range remote {
819			require.Equal(t, policy.Hash, local[i].Hash)
820		}
821
822		s2.aclReplicationStatusLock.RLock()
823		status := s2.aclReplicationStatus
824		s2.aclReplicationStatusLock.RUnlock()
825
826		require.True(t, status.Enabled)
827		require.True(t, status.Running)
828		require.Equal(t, status.ReplicationType, structs.ACLReplicateTokens)
829		require.Equal(t, status.ReplicatedIndex, index)
830		require.Equal(t, status.SourceDatacenter, "dc1")
831	}
832	checkSameRoles := func(t *retry.R) {
833		index, remote, err := s1.fsm.State().ACLRoleList(nil, "", nil)
834		require.NoError(t, err)
835		_, local, err := s2.fsm.State().ACLRoleList(nil, "", nil)
836		require.NoError(t, err)
837
838		require.Len(t, remote, len(local))
839		for i, role := range remote {
840			require.Equal(t, role.Hash, local[i].Hash)
841		}
842
843		s2.aclReplicationStatusLock.RLock()
844		status := s2.aclReplicationStatus
845		s2.aclReplicationStatusLock.RUnlock()
846
847		require.True(t, status.Enabled)
848		require.True(t, status.Running)
849		require.Equal(t, status.ReplicationType, structs.ACLReplicateTokens)
850		require.Equal(t, status.ReplicatedRoleIndex, index)
851		require.Equal(t, status.SourceDatacenter, "dc1")
852	}
853	checkSame := func(t *retry.R) {
854		checkSameTokens(t)
855		checkSamePolicies(t)
856		checkSameRoles(t)
857	}
858	// Wait for the replica to converge.
859	retry.Run(t, func(r *retry.R) {
860		checkSame(r)
861	})
862
863	// Create additional data to replicate.
864	_, _, _ = createACLTestData(t, s1, "b2", numItems, numItemsThatAreLocal)
865
866	// Wait for the replica to converge.
867	retry.Run(t, func(r *retry.R) {
868		checkSame(r)
869	})
870
871	// Delete one piece of each type of data from batch 1.
872	const itemToDelete = numItems - 1
873	{
874		id := tokenIDs[itemToDelete]
875
876		arg := structs.ACLTokenDeleteRequest{
877			Datacenter:   "dc1",
878			TokenID:      id,
879			WriteRequest: structs.WriteRequest{Token: "root"},
880		}
881		var dontCare string
882		if err := s1.RPC("ACL.TokenDelete", &arg, &dontCare); err != nil {
883			t.Fatalf("err: %v", err)
884		}
885	}
886	{
887		id := roleIDs[itemToDelete]
888
889		arg := structs.ACLRoleDeleteRequest{
890			Datacenter:   "dc1",
891			RoleID:       id,
892			WriteRequest: structs.WriteRequest{Token: "root"},
893		}
894		var dontCare string
895		if err := s1.RPC("ACL.RoleDelete", &arg, &dontCare); err != nil {
896			t.Fatalf("err: %v", err)
897		}
898	}
899	{
900		id := policyIDs[itemToDelete]
901
902		arg := structs.ACLPolicyDeleteRequest{
903			Datacenter:   "dc1",
904			PolicyID:     id,
905			WriteRequest: structs.WriteRequest{Token: "root"},
906		}
907		var dontCare string
908		if err := s1.RPC("ACL.PolicyDelete", &arg, &dontCare); err != nil {
909			t.Fatalf("err: %v", err)
910		}
911	}
912	// Wait for the replica to converge.
913	retry.Run(t, func(r *retry.R) {
914		checkSame(r)
915	})
916}
917
918func createACLTestData(t *testing.T, srv *Server, namePrefix string, numObjects, numItemsThatAreLocal int) (policyIDs, roleIDs, tokenIDs []string) {
919	require.True(t, numItemsThatAreLocal <= numObjects, 0, "numItemsThatAreLocal <= numObjects")
920
921	// Create some policies.
922	for i := 0; i < numObjects; i++ {
923		str := strconv.Itoa(i)
924		arg := structs.ACLPolicySetRequest{
925			Datacenter: "dc1",
926			Policy: structs.ACLPolicy{
927				Name:        namePrefix + "-policy-" + str,
928				Description: namePrefix + "-policy " + str,
929				Rules:       testACLPolicyNew,
930			},
931			WriteRequest: structs.WriteRequest{Token: "root"},
932		}
933		var out structs.ACLPolicy
934		if err := srv.RPC("ACL.PolicySet", &arg, &out); err != nil {
935			t.Fatalf("err: %v", err)
936		}
937		policyIDs = append(policyIDs, out.ID)
938	}
939
940	// Create some roles.
941	for i := 0; i < numObjects; i++ {
942		str := strconv.Itoa(i)
943		arg := structs.ACLRoleSetRequest{
944			Datacenter: "dc1",
945			Role: structs.ACLRole{
946				Name:        namePrefix + "-role-" + str,
947				Description: namePrefix + "-role " + str,
948				Policies: []structs.ACLRolePolicyLink{
949					{ID: policyIDs[i]},
950				},
951			},
952			WriteRequest: structs.WriteRequest{Token: "root"},
953		}
954		var out structs.ACLRole
955		if err := srv.RPC("ACL.RoleSet", &arg, &out); err != nil {
956			t.Fatalf("err: %v", err)
957		}
958		roleIDs = append(roleIDs, out.ID)
959	}
960
961	// Create a bunch of new tokens.
962	for i := 0; i < numObjects; i++ {
963		str := strconv.Itoa(i)
964		arg := structs.ACLTokenSetRequest{
965			Datacenter: "dc1",
966			ACLToken: structs.ACLToken{
967				Description: namePrefix + "-token " + str,
968				Policies: []structs.ACLTokenPolicyLink{
969					{ID: policyIDs[i]},
970				},
971				Roles: []structs.ACLTokenRoleLink{
972					{ID: roleIDs[i]},
973				},
974				Local: (i < numItemsThatAreLocal),
975			},
976			WriteRequest: structs.WriteRequest{Token: "root"},
977		}
978		var out structs.ACLToken
979		if err := srv.RPC("ACL.TokenSet", &arg, &out); err != nil {
980			t.Fatalf("err: %v", err)
981		}
982		tokenIDs = append(tokenIDs, out.AccessorID)
983	}
984
985	return policyIDs, roleIDs, tokenIDs
986}
987