1package consul
2
3import (
4	"os"
5	"testing"
6	"time"
7
8	"github.com/hashicorp/consul/agent/structs"
9	"github.com/hashicorp/consul/agent/token"
10	"github.com/hashicorp/consul/api"
11	"github.com/hashicorp/consul/sdk/testutil/retry"
12	"github.com/hashicorp/consul/testrpc"
13	"github.com/stretchr/testify/require"
14)
15
// TestLeader_FederationStateAntiEntropy_BlockingQuery exercises
// fetchFederationStateAntiEntropyDetails as a blocking query: it verifies
// that the query unblocks both when the local (dc2) mesh gateway catalog
// changes and when the upstream federation state is upserted, and that the
// prev/curr snapshots it returns reflect those changes.
func TestLeader_FederationStateAntiEntropy_BlockingQuery(t *testing.T) {
	t.Parallel()

	// Primary datacenter.
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.PrimaryDatacenter = "dc1"
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	testrpc.WaitForLeader(t, s1.RPC, "dc1")

	// Secondary datacenter with replication limits effectively removed and
	// the periodic anti-entropy routine disabled, so the test drives AE
	// manually via makeFedState below.
	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc2"
		c.PrimaryDatacenter = "dc1"
		c.FederationStateReplicationRate = 100
		c.FederationStateReplicationBurst = 100
		c.FederationStateReplicationApplyLimit = 1000000
		c.DisableFederationStateAntiEntropy = true
	})
	// NOTE(review): these defers are registered after WaitForLeader; if the
	// wait fatals, dir2/s2 are leaked. Consider registering cleanup first.
	testrpc.WaitForLeader(t, s2.RPC, "dc2")
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	// Try to join.
	joinWAN(t, s2, s1)
	testrpc.WaitForLeader(t, s1.RPC, "dc1")
	testrpc.WaitForLeader(t, s1.RPC, "dc2")

	// checkSame retries until both servers report expectN federation states,
	// and until dc2's own federation state has expectGatewaysInDC2 mesh
	// gateways (a negative value asserts no dc2 state exists at all).
	checkSame := func(t *testing.T, expectN, expectGatewaysInDC2 int) {
		t.Helper()
		retry.Run(t, func(r *retry.R) {
			_, remote, err := s1.fsm.State().FederationStateList(nil)
			require.NoError(r, err)
			require.Len(r, remote, expectN)

			_, local, err := s2.fsm.State().FederationStateList(nil)
			require.NoError(r, err)
			require.Len(r, local, expectN)

			var fs2 *structs.FederationState
			for _, fs := range local {
				if fs.Datacenter == "dc2" {
					fs2 = fs
					break
				}
			}
			if expectGatewaysInDC2 < 0 {
				require.Nil(r, fs2)
			} else {
				require.NotNil(r, fs2)
				require.Len(r, fs2.MeshGateways, expectGatewaysInDC2)
			}
		})
	}

	// Two WAN-federation-tagged mesh gateways to register in dc2's catalog.
	gatewayCSN1 := newTestMeshGatewayNode(
		"dc2", "gateway1", "1.2.3.4", 443, map[string]string{structs.MetaWANFederationKey: "1"}, api.HealthPassing,
	)
	gatewayCSN2 := newTestMeshGatewayNode(
		"dc2", "gateway2", "4.3.2.1", 443, map[string]string{structs.MetaWANFederationKey: "1"}, api.HealthPassing,
	)

	// makeFedState simulates one manual anti-entropy round: it upserts a
	// federation state for dc into the primary via FederationState.Apply.
	makeFedState := func(t *testing.T, dc string, csn ...structs.CheckServiceNode) {
		t.Helper()
		arg := structs.FederationStateRequest{
			Datacenter: "dc1",
			Op:         structs.FederationStateUpsert,
			State: &structs.FederationState{
				Datacenter:   dc,
				MeshGateways: csn,
				UpdatedAt:    time.Now().UTC(),
			},
		}

		out := false
		require.NoError(t, s1.RPC("FederationState.Apply", &arg, &out))
	}

	// makeGateways registers one mesh gateway node/service into dc2's
	// catalog, which should unblock the anti-entropy blocking query.
	makeGateways := func(t *testing.T, csn structs.CheckServiceNode) {
		t.Helper()
		const dc = "dc2"

		arg := structs.RegisterRequest{
			Datacenter: csn.Node.Datacenter,
			Node:       csn.Node.Node,
			Address:    csn.Node.Address,
			Service:    csn.Service,
			Checks:     csn.Checks,
		}
		var out struct{}
		require.NoError(t, s2.RPC("Catalog.Register", &arg, &out))
	}

	// result carries everything fetchFederationStateAntiEntropyDetails
	// returns, so it can be passed through a channel.
	type result struct {
		idx        uint64
		prev, curr *structs.FederationState
		err        error
	}

	// blockAgain starts a blocking fetch past index `last` in a goroutine
	// and returns the channel its single result will arrive on.
	blockAgain := func(last uint64) <-chan result {
		ch := make(chan result, 1)
		go func() {
			var res result
			res.idx, res.prev, res.curr, res.err = s2.fetchFederationStateAntiEntropyDetails(&structs.QueryOptions{
				MinQueryIndex:     last,
				RequireConsistent: true,
			})
			ch <- res
		}()
		return ch
	}

	// wait for the primary to do one round of AE and replicate it
	checkSame(t, 1, -1)

	// Do the initial fetch (len0 local gateways, upstream has nil fedstate)
	res0 := <-blockAgain(0)
	require.NoError(t, res0.err)

	ch := blockAgain(res0.idx)

	// bump the local mesh gateways; should unblock query
	makeGateways(t, gatewayCSN1)

	res1 := <-ch
	require.NoError(t, res1.err)
	require.NotEqual(t, res1.idx, res0.idx)
	require.Nil(t, res1.prev)
	require.Len(t, res1.curr.MeshGateways, 1)

	checkSame(t, 1, -1) // no fed state update yet

	ch = blockAgain(res1.idx)

	// do manual AE
	makeFedState(t, "dc2", gatewayCSN1)

	res2 := <-ch
	require.NoError(t, res2.err)
	require.NotEqual(t, res2.idx, res1.idx)
	require.Len(t, res2.prev.MeshGateways, 1)
	require.Len(t, res2.curr.MeshGateways, 1)

	checkSame(t, 2, 1)

	ch = blockAgain(res2.idx)

	// add another local mesh gateway
	makeGateways(t, gatewayCSN2)

	res3 := <-ch
	require.NoError(t, res3.err)
	require.NotEqual(t, res3.idx, res2.idx)
	require.Len(t, res3.prev.MeshGateways, 1)
	require.Len(t, res3.curr.MeshGateways, 2)

	checkSame(t, 2, 1)

	ch = blockAgain(res3.idx)

	// do manual AE
	makeFedState(t, "dc2", gatewayCSN1, gatewayCSN2)

	res4 := <-ch
	require.NoError(t, res4.err)
	require.NotEqual(t, res4.idx, res3.idx)
	require.Len(t, res4.prev.MeshGateways, 2)
	require.Len(t, res4.curr.MeshGateways, 2)

	checkSame(t, 2, 2)
}
191
192func TestLeader_FederationStateAntiEntropyPruning(t *testing.T) {
193	t.Parallel()
194
195	dir1, s1 := testServerWithConfig(t, func(c *Config) {
196		c.PrimaryDatacenter = "dc1"
197	})
198	defer os.RemoveAll(dir1)
199	defer s1.Shutdown()
200	testrpc.WaitForLeader(t, s1.RPC, "dc1")
201	client := rpcClient(t, s1)
202	defer client.Close()
203
204	dir2, s2 := testServerWithConfig(t, func(c *Config) {
205		c.Datacenter = "dc2"
206		c.PrimaryDatacenter = "dc1"
207	})
208	testrpc.WaitForLeader(t, s2.RPC, "dc2")
209	defer os.RemoveAll(dir2)
210	defer s2.Shutdown()
211
212	// Try to join.
213	joinWAN(t, s2, s1)
214	testrpc.WaitForLeader(t, s1.RPC, "dc1")
215	testrpc.WaitForLeader(t, s1.RPC, "dc2")
216
217	checkSame := func(r *retry.R) error {
218		_, remote, err := s1.fsm.State().FederationStateList(nil)
219		require.NoError(r, err)
220		_, local, err := s2.fsm.State().FederationStateList(nil)
221		require.NoError(r, err)
222
223		require.Len(r, remote, 2)
224		require.Len(r, local, 2)
225		for i, _ := range remote {
226			// zero out the raft data for future comparisons
227			remote[i].RaftIndex = structs.RaftIndex{}
228			local[i].RaftIndex = structs.RaftIndex{}
229			require.Equal(r, remote[i], local[i])
230		}
231		return nil
232	}
233
234	// Wait for the replica to converge.
235	retry.Run(t, func(r *retry.R) {
236		checkSame(r)
237	})
238
239	// Now leave and shutdown dc2.
240	require.NoError(t, s2.Leave())
241	require.NoError(t, s2.Shutdown())
242
243	// Wait until we know the router is updated.
244	retry.Run(t, func(r *retry.R) {
245		dcs := s1.router.GetDatacenters()
246		require.Len(r, dcs, 1)
247		require.Equal(r, "dc1", dcs[0])
248	})
249
250	// Since the background routine is going to run every hour, it likely is
251	// not going to run during this test, so it's safe to directly invoke the
252	// core method.
253	require.NoError(t, s1.pruneStaleFederationStates())
254
255	// Wait for dc2 to drop out.
256	retry.Run(t, func(r *retry.R) {
257		_, mine, err := s1.fsm.State().FederationStateList(nil)
258		require.NoError(r, err)
259
260		require.Len(r, mine, 1)
261		require.Equal(r, "dc1", mine[0].Datacenter)
262	})
263}
264
265func TestLeader_FederationStateAntiEntropyPruning_ACLDeny(t *testing.T) {
266	t.Parallel()
267
268	dir1, s1 := testServerWithConfig(t, func(c *Config) {
269		c.PrimaryDatacenter = "dc1"
270		c.ACLDatacenter = "dc1"
271		c.ACLsEnabled = true
272		c.ACLMasterToken = "root"
273		c.ACLDefaultPolicy = "deny"
274	})
275	defer os.RemoveAll(dir1)
276	defer s1.Shutdown()
277	testrpc.WaitForLeader(t, s1.RPC, "dc1")
278	client := rpcClient(t, s1)
279	defer client.Close()
280
281	dir2, s2 := testServerWithConfig(t, func(c *Config) {
282		c.Datacenter = "dc2"
283		c.PrimaryDatacenter = "dc1"
284		c.ACLDatacenter = "dc1"
285		c.ACLsEnabled = true
286		c.ACLMasterToken = "root"
287		c.ACLDefaultPolicy = "deny"
288	})
289	testrpc.WaitForLeader(t, s2.RPC, "dc2")
290	defer os.RemoveAll(dir2)
291	defer s2.Shutdown()
292
293	// Try to join.
294	joinWAN(t, s2, s1)
295	testrpc.WaitForLeader(t, s1.RPC, "dc1")
296	testrpc.WaitForLeader(t, s1.RPC, "dc2")
297
298	// Create the ACL token.
299	opWriteToken, err := upsertTestTokenWithPolicyRules(client, "root", "dc1", `operator = "write"`)
300	require.NoError(t, err)
301
302	require.True(t, s1.tokens.UpdateReplicationToken(opWriteToken.SecretID, token.TokenSourceAPI))
303	require.True(t, s2.tokens.UpdateReplicationToken(opWriteToken.SecretID, token.TokenSourceAPI))
304
305	checkSame := func(r *retry.R) error {
306		_, remote, err := s1.fsm.State().FederationStateList(nil)
307		require.NoError(r, err)
308		_, local, err := s2.fsm.State().FederationStateList(nil)
309		require.NoError(r, err)
310
311		require.Len(r, remote, 2)
312		require.Len(r, local, 2)
313		for i, _ := range remote {
314			// zero out the raft data for future comparisons
315			remote[i].RaftIndex = structs.RaftIndex{}
316			local[i].RaftIndex = structs.RaftIndex{}
317			require.Equal(r, remote[i], local[i])
318		}
319		return nil
320	}
321
322	// Wait for the replica to converge.
323	retry.Run(t, func(r *retry.R) {
324		checkSame(r)
325	})
326
327	// Now leave and shutdown dc2.
328	require.NoError(t, s2.Leave())
329	require.NoError(t, s2.Shutdown())
330
331	// Wait until we know the router is updated.
332	retry.Run(t, func(r *retry.R) {
333		dcs := s1.router.GetDatacenters()
334		require.Len(r, dcs, 1)
335		require.Equal(r, "dc1", dcs[0])
336	})
337
338	// Since the background routine is going to run every hour, it likely is
339	// not going to run during this test, so it's safe to directly invoke the
340	// core method.
341	require.NoError(t, s1.pruneStaleFederationStates())
342
343	// Wait for dc2 to drop out.
344	retry.Run(t, func(r *retry.R) {
345		_, mine, err := s1.fsm.State().FederationStateList(nil)
346		require.NoError(r, err)
347
348		require.Len(r, mine, 1)
349		require.Equal(r, "dc1", mine[0].Datacenter)
350	})
351}
352