1package consul 2 3import ( 4 "os" 5 "testing" 6 "time" 7 8 "github.com/hashicorp/consul/agent/structs" 9 "github.com/hashicorp/consul/agent/token" 10 "github.com/hashicorp/consul/api" 11 "github.com/hashicorp/consul/sdk/testutil/retry" 12 "github.com/hashicorp/consul/testrpc" 13 "github.com/stretchr/testify/require" 14) 15 16func TestLeader_FederationStateAntiEntropy_BlockingQuery(t *testing.T) { 17 t.Parallel() 18 19 dir1, s1 := testServerWithConfig(t, func(c *Config) { 20 c.PrimaryDatacenter = "dc1" 21 }) 22 defer os.RemoveAll(dir1) 23 defer s1.Shutdown() 24 testrpc.WaitForLeader(t, s1.RPC, "dc1") 25 26 dir2, s2 := testServerWithConfig(t, func(c *Config) { 27 c.Datacenter = "dc2" 28 c.PrimaryDatacenter = "dc1" 29 c.FederationStateReplicationRate = 100 30 c.FederationStateReplicationBurst = 100 31 c.FederationStateReplicationApplyLimit = 1000000 32 c.DisableFederationStateAntiEntropy = true 33 }) 34 testrpc.WaitForLeader(t, s2.RPC, "dc2") 35 defer os.RemoveAll(dir2) 36 defer s2.Shutdown() 37 38 // Try to join. 39 joinWAN(t, s2, s1) 40 testrpc.WaitForLeader(t, s1.RPC, "dc1") 41 testrpc.WaitForLeader(t, s1.RPC, "dc2") 42 43 checkSame := func(t *testing.T, expectN, expectGatewaysInDC2 int) { 44 t.Helper() 45 retry.Run(t, func(r *retry.R) { 46 _, remote, err := s1.fsm.State().FederationStateList(nil) 47 require.NoError(r, err) 48 require.Len(r, remote, expectN) 49 50 _, local, err := s2.fsm.State().FederationStateList(nil) 51 require.NoError(r, err) 52 require.Len(r, local, expectN) 53 54 var fs2 *structs.FederationState 55 for _, fs := range local { 56 if fs.Datacenter == "dc2" { 57 fs2 = fs 58 break 59 } 60 } 61 if expectGatewaysInDC2 < 0 { 62 require.Nil(r, fs2) 63 } else { 64 require.NotNil(r, fs2) 65 require.Len(r, fs2.MeshGateways, expectGatewaysInDC2) 66 } 67 }) 68 } 69 70 gatewayCSN1 := newTestMeshGatewayNode( 71 "dc2", "gateway1", "1.2.3.4", 443, map[string]string{structs.MetaWANFederationKey: "1"}, api.HealthPassing, 72 ) 73 gatewayCSN2 := newTestMeshGatewayNode( 74 "dc2", "gateway2", "4.3.2.1", 443, map[string]string{structs.MetaWANFederationKey: "1"}, api.HealthPassing, 75 ) 76 77 // populate with some stuff 78 makeFedState := func(t *testing.T, dc string, csn ...structs.CheckServiceNode) { 79 t.Helper() 80 arg := structs.FederationStateRequest{ 81 Datacenter: "dc1", 82 Op: structs.FederationStateUpsert, 83 State: &structs.FederationState{ 84 Datacenter: dc, 85 MeshGateways: csn, 86 UpdatedAt: time.Now().UTC(), 87 }, 88 } 89 90 out := false 91 require.NoError(t, s1.RPC("FederationState.Apply", &arg, &out)) 92 } 93 94 makeGateways := func(t *testing.T, csn structs.CheckServiceNode) { 95 t.Helper() 96 const dc = "dc2" 97 98 arg := structs.RegisterRequest{ 99 Datacenter: csn.Node.Datacenter, 100 Node: csn.Node.Node, 101 Address: csn.Node.Address, 102 Service: csn.Service, 103 Checks: csn.Checks, 104 } 105 var out struct{} 106 require.NoError(t, s2.RPC("Catalog.Register", &arg, &out)) 107 } 108 109 type result struct { 110 idx uint64 111 prev, curr *structs.FederationState 112 err error 113 } 114 115 blockAgain := func(last uint64) <-chan result { 116 ch := make(chan result, 1) 117 go func() { 118 var res result 119 res.idx, res.prev, res.curr, res.err = s2.fetchFederationStateAntiEntropyDetails(&structs.QueryOptions{ 120 MinQueryIndex: last, 121 RequireConsistent: true, 122 }) 123 ch <- res 124 }() 125 return ch 126 } 127 128 // wait for the primary to do one round of AE and replicate it 129 checkSame(t, 1, -1) 130 131 // // wait for change to be reflected as well 132 // makeFedState(t, "dc2") 133 // checkSame(t, 1) 134 135 // Do the initial fetch (len0 local gateways, upstream has nil fedstate) 136 res0 := <-blockAgain(0) 137 require.NoError(t, res0.err) 138 139 ch := blockAgain(res0.idx) 140 141 // bump the local mesh gateways; should unblock query 142 makeGateways(t, gatewayCSN1) 143 144 res1 := <-ch 145 require.NoError(t, res1.err) 146 require.NotEqual(t, res1.idx, res0.idx) 147 require.Nil(t, res1.prev) 148 require.Len(t, res1.curr.MeshGateways, 1) 149 150 checkSame(t, 1, -1) // no fed state update yet 151 152 ch = blockAgain(res1.idx) 153 154 // do manual AE 155 makeFedState(t, "dc2", gatewayCSN1) 156 157 res2 := <-ch 158 require.NoError(t, res2.err) 159 require.NotEqual(t, res2.idx, res1.idx) 160 require.Len(t, res2.prev.MeshGateways, 1) 161 require.Len(t, res2.curr.MeshGateways, 1) 162 163 checkSame(t, 2, 1) 164 165 ch = blockAgain(res2.idx) 166 167 // add another local mesh gateway 168 makeGateways(t, gatewayCSN2) 169 170 res3 := <-ch 171 require.NoError(t, res3.err) 172 require.NotEqual(t, res3.idx, res2.idx) 173 require.Len(t, res3.prev.MeshGateways, 1) 174 require.Len(t, res3.curr.MeshGateways, 2) 175 176 checkSame(t, 2, 1) 177 178 ch = blockAgain(res3.idx) 179 180 // do manual AE 181 makeFedState(t, "dc2", gatewayCSN1, gatewayCSN2) 182 183 res4 := <-ch 184 require.NoError(t, res4.err) 185 require.NotEqual(t, res4.idx, res3.idx) 186 require.Len(t, res4.prev.MeshGateways, 2) 187 require.Len(t, res4.curr.MeshGateways, 2) 188 189 checkSame(t, 2, 2) 190} 191 192func TestLeader_FederationStateAntiEntropyPruning(t *testing.T) { 193 t.Parallel() 194 195 dir1, s1 := testServerWithConfig(t, func(c *Config) { 196 c.PrimaryDatacenter = "dc1" 197 }) 198 defer os.RemoveAll(dir1) 199 defer s1.Shutdown() 200 testrpc.WaitForLeader(t, s1.RPC, "dc1") 201 client := rpcClient(t, s1) 202 defer client.Close() 203 204 dir2, s2 := testServerWithConfig(t, func(c *Config) { 205 c.Datacenter = "dc2" 206 c.PrimaryDatacenter = "dc1" 207 }) 208 testrpc.WaitForLeader(t, s2.RPC, "dc2") 209 defer os.RemoveAll(dir2) 210 defer s2.Shutdown() 211 212 // Try to join. 213 joinWAN(t, s2, s1) 214 testrpc.WaitForLeader(t, s1.RPC, "dc1") 215 testrpc.WaitForLeader(t, s1.RPC, "dc2") 216 217 checkSame := func(r *retry.R) error { 218 _, remote, err := s1.fsm.State().FederationStateList(nil) 219 require.NoError(r, err) 220 _, local, err := s2.fsm.State().FederationStateList(nil) 221 require.NoError(r, err) 222 223 require.Len(r, remote, 2) 224 require.Len(r, local, 2) 225 for i, _ := range remote { 226 // zero out the raft data for future comparisons 227 remote[i].RaftIndex = structs.RaftIndex{} 228 local[i].RaftIndex = structs.RaftIndex{} 229 require.Equal(r, remote[i], local[i]) 230 } 231 return nil 232 } 233 234 // Wait for the replica to converge. 235 retry.Run(t, func(r *retry.R) { 236 checkSame(r) 237 }) 238 239 // Now leave and shutdown dc2. 240 require.NoError(t, s2.Leave()) 241 require.NoError(t, s2.Shutdown()) 242 243 // Wait until we know the router is updated. 244 retry.Run(t, func(r *retry.R) { 245 dcs := s1.router.GetDatacenters() 246 require.Len(r, dcs, 1) 247 require.Equal(r, "dc1", dcs[0]) 248 }) 249 250 // Since the background routine is going to run every hour, it likely is 251 // not going to run during this test, so it's safe to directly invoke the 252 // core method. 253 require.NoError(t, s1.pruneStaleFederationStates()) 254 255 // Wait for dc2 to drop out. 256 retry.Run(t, func(r *retry.R) { 257 _, mine, err := s1.fsm.State().FederationStateList(nil) 258 require.NoError(r, err) 259 260 require.Len(r, mine, 1) 261 require.Equal(r, "dc1", mine[0].Datacenter) 262 }) 263} 264 265func TestLeader_FederationStateAntiEntropyPruning_ACLDeny(t *testing.T) { 266 t.Parallel() 267 268 dir1, s1 := testServerWithConfig(t, func(c *Config) { 269 c.PrimaryDatacenter = "dc1" 270 c.ACLDatacenter = "dc1" 271 c.ACLsEnabled = true 272 c.ACLMasterToken = "root" 273 c.ACLDefaultPolicy = "deny" 274 }) 275 defer os.RemoveAll(dir1) 276 defer s1.Shutdown() 277 testrpc.WaitForLeader(t, s1.RPC, "dc1") 278 client := rpcClient(t, s1) 279 defer client.Close() 280 281 dir2, s2 := testServerWithConfig(t, func(c *Config) { 282 c.Datacenter = "dc2" 283 c.PrimaryDatacenter = "dc1" 284 c.ACLDatacenter = "dc1" 285 c.ACLsEnabled = true 286 c.ACLMasterToken = "root" 287 c.ACLDefaultPolicy = "deny" 288 }) 289 testrpc.WaitForLeader(t, s2.RPC, "dc2") 290 defer os.RemoveAll(dir2) 291 defer s2.Shutdown() 292 293 // Try to join. 294 joinWAN(t, s2, s1) 295 testrpc.WaitForLeader(t, s1.RPC, "dc1") 296 testrpc.WaitForLeader(t, s1.RPC, "dc2") 297 298 // Create the ACL token. 299 opWriteToken, err := upsertTestTokenWithPolicyRules(client, "root", "dc1", `operator = "write"`) 300 require.NoError(t, err) 301 302 require.True(t, s1.tokens.UpdateReplicationToken(opWriteToken.SecretID, token.TokenSourceAPI)) 303 require.True(t, s2.tokens.UpdateReplicationToken(opWriteToken.SecretID, token.TokenSourceAPI)) 304 305 checkSame := func(r *retry.R) error { 306 _, remote, err := s1.fsm.State().FederationStateList(nil) 307 require.NoError(r, err) 308 _, local, err := s2.fsm.State().FederationStateList(nil) 309 require.NoError(r, err) 310 311 require.Len(r, remote, 2) 312 require.Len(r, local, 2) 313 for i, _ := range remote { 314 // zero out the raft data for future comparisons 315 remote[i].RaftIndex = structs.RaftIndex{} 316 local[i].RaftIndex = structs.RaftIndex{} 317 require.Equal(r, remote[i], local[i]) 318 } 319 return nil 320 } 321 322 // Wait for the replica to converge. 323 retry.Run(t, func(r *retry.R) { 324 checkSame(r) 325 }) 326 327 // Now leave and shutdown dc2. 328 require.NoError(t, s2.Leave()) 329 require.NoError(t, s2.Shutdown()) 330 331 // Wait until we know the router is updated. 332 retry.Run(t, func(r *retry.R) { 333 dcs := s1.router.GetDatacenters() 334 require.Len(r, dcs, 1) 335 require.Equal(r, "dc1", dcs[0]) 336 }) 337 338 // Since the background routine is going to run every hour, it likely is 339 // not going to run during this test, so it's safe to directly invoke the 340 // core method. 341 require.NoError(t, s1.pruneStaleFederationStates()) 342 343 // Wait for dc2 to drop out. 344 retry.Run(t, func(r *retry.R) { 345 _, mine, err := s1.fsm.State().FederationStateList(nil) 346 require.NoError(r, err) 347 348 require.Len(r, mine, 1) 349 require.Equal(r, "dc1", mine[0].Datacenter) 350 }) 351} 352