1package consul 2 3import ( 4 "fmt" 5 "os" 6 "strconv" 7 "testing" 8 "time" 9 10 "github.com/hashicorp/consul/acl" 11 "github.com/hashicorp/consul/agent/structs" 12 tokenStore "github.com/hashicorp/consul/agent/token" 13 "github.com/hashicorp/consul/sdk/testutil/retry" 14 "github.com/hashicorp/consul/testrpc" 15 "github.com/stretchr/testify/require" 16) 17 18func TestACLReplication_diffACLPolicies(t *testing.T) { 19 diffACLPolicies := func(local structs.ACLPolicies, remote structs.ACLPolicyListStubs, lastRemoteIndex uint64) ([]string, []string) { 20 tr := &aclPolicyReplicator{local: local, remote: remote} 21 res := diffACLType(tr, lastRemoteIndex) 22 return res.LocalDeletes, res.LocalUpserts 23 } 24 local := structs.ACLPolicies{ 25 &structs.ACLPolicy{ 26 ID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 27 Name: "policy1", 28 Description: "policy1 - already in sync", 29 Rules: `acl = "read"`, 30 Syntax: acl.SyntaxCurrent, 31 Datacenters: nil, 32 Hash: []byte{1, 2, 3, 4}, 33 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}, 34 }, 35 &structs.ACLPolicy{ 36 ID: "8ea41efb-8519-4091-bc91-c42da0cda9ae", 37 Name: "policy2", 38 Description: "policy2 - updated but not changed", 39 Rules: `acl = "read"`, 40 Syntax: acl.SyntaxCurrent, 41 Datacenters: nil, 42 Hash: []byte{1, 2, 3, 4}, 43 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25}, 44 }, 45 &structs.ACLPolicy{ 46 ID: "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 47 Name: "policy3", 48 Description: "policy3 - updated and changed", 49 Rules: `acl = "read"`, 50 Syntax: acl.SyntaxCurrent, 51 Datacenters: nil, 52 Hash: []byte{1, 2, 3, 4}, 53 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25}, 54 }, 55 &structs.ACLPolicy{ 56 ID: "e9d33298-6490-4466-99cb-ba93af64fa76", 57 Name: "policy4", 58 Description: "policy4 - needs deleting", 59 Rules: `acl = "read"`, 60 Syntax: acl.SyntaxCurrent, 61 Datacenters: nil, 62 Hash: []byte{1, 2, 3, 4}, 63 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25}, 64 }, 65 } 66 67 remote := structs.ACLPolicyListStubs{ 68 &structs.ACLPolicyListStub{ 69 ID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 70 Name: "policy1", 71 Description: "policy1 - already in sync", 72 Datacenters: nil, 73 Hash: []byte{1, 2, 3, 4}, 74 CreateIndex: 1, 75 ModifyIndex: 2, 76 }, 77 &structs.ACLPolicyListStub{ 78 ID: "8ea41efb-8519-4091-bc91-c42da0cda9ae", 79 Name: "policy2", 80 Description: "policy2 - updated but not changed", 81 Datacenters: nil, 82 Hash: []byte{1, 2, 3, 4}, 83 CreateIndex: 1, 84 ModifyIndex: 50, 85 }, 86 &structs.ACLPolicyListStub{ 87 ID: "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 88 Name: "policy3", 89 Description: "policy3 - updated and changed", 90 Datacenters: nil, 91 Hash: []byte{5, 6, 7, 8}, 92 CreateIndex: 1, 93 ModifyIndex: 50, 94 }, 95 &structs.ACLPolicyListStub{ 96 ID: "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926", 97 Name: "policy5", 98 Description: "policy5 - needs adding", 99 Datacenters: nil, 100 Hash: []byte{1, 2, 3, 4}, 101 CreateIndex: 1, 102 ModifyIndex: 50, 103 }, 104 } 105 106 // Do the full diff. This full exercises the main body of the loop 107 deletions, updates := diffACLPolicies(local, remote, 28) 108 require.Len(t, updates, 2) 109 require.ElementsMatch(t, updates, []string{ 110 "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926", 111 "539f1cb6-40aa-464f-ae66-a900d26bc1b2"}) 112 113 require.Len(t, deletions, 1) 114 require.Equal(t, "e9d33298-6490-4466-99cb-ba93af64fa76", deletions[0]) 115 116 deletions, updates = diffACLPolicies(local, nil, 28) 117 require.Len(t, updates, 0) 118 require.Len(t, deletions, 4) 119 require.ElementsMatch(t, deletions, []string{ 120 "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 121 "8ea41efb-8519-4091-bc91-c42da0cda9ae", 122 "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 123 "e9d33298-6490-4466-99cb-ba93af64fa76"}) 124 125 deletions, updates = diffACLPolicies(nil, remote, 28) 126 require.Len(t, deletions, 0) 127 require.Len(t, updates, 4) 128 require.ElementsMatch(t, updates, []string{ 129 "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 130 "8ea41efb-8519-4091-bc91-c42da0cda9ae", 131 "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 132 "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"}) 133} 134 135func TestACLReplication_diffACLTokens(t *testing.T) { 136 diffACLTokens := func( 137 local structs.ACLTokens, 138 remote structs.ACLTokenListStubs, 139 lastRemoteIndex uint64, 140 ) itemDiffResults { 141 tr := &aclTokenReplicator{local: local, remote: remote} 142 return diffACLType(tr, lastRemoteIndex) 143 } 144 145 local := structs.ACLTokens{ 146 // When a just-upgraded (1.3->1.4+) secondary DC is replicating from an 147 // upgraded primary DC (1.4+), the local state for tokens predating the 148 // upgrade will lack AccessorIDs. 149 // 150 // The primary DC will lazily perform the update to assign AccessorIDs, 151 // and that new update will come across the wire locally as a new 152 // insert. 153 // 154 // We simulate that scenario here with 'token0' having no AccessorID in 155 // the secondary (local) DC and having an AccessorID assigned in the 156 // payload retrieved from the primary (remote) DC. 157 &structs.ACLToken{ 158 AccessorID: "", 159 SecretID: "5128289f-c22c-4d32-936e-7662443f1a55", 160 Description: "token0 - old and not yet upgraded", 161 Hash: []byte{1, 2, 3, 4}, 162 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 3}, 163 }, 164 &structs.ACLToken{ 165 AccessorID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 166 SecretID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 167 Description: "token1 - already in sync", 168 Hash: []byte{1, 2, 3, 4}, 169 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 2}, 170 }, 171 &structs.ACLToken{ 172 AccessorID: "8ea41efb-8519-4091-bc91-c42da0cda9ae", 173 SecretID: "8ea41efb-8519-4091-bc91-c42da0cda9ae", 174 Description: "token2 - updated but not changed", 175 Hash: []byte{1, 2, 3, 4}, 176 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25}, 177 }, 178 &structs.ACLToken{ 179 AccessorID: "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 180 SecretID: "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 181 Description: "token3 - updated and changed", 182 Hash: []byte{1, 2, 3, 4}, 183 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25}, 184 }, 185 &structs.ACLToken{ 186 AccessorID: "e9d33298-6490-4466-99cb-ba93af64fa76", 187 SecretID: "e9d33298-6490-4466-99cb-ba93af64fa76", 188 Description: "token4 - needs deleting", 189 Hash: []byte{1, 2, 3, 4}, 190 RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 25}, 191 }, 192 } 193 194 remote := structs.ACLTokenListStubs{ 195 &structs.ACLTokenListStub{ 196 AccessorID: "72fac6a3-a014-41c8-9cb2-8d9a5e935f3d", 197 //SecretID: "5128289f-c22c-4d32-936e-7662443f1a55", (formerly) 198 Description: "token0 - old and not yet upgraded locally", 199 Hash: []byte{1, 2, 3, 4}, 200 CreateIndex: 1, 201 ModifyIndex: 3, 202 }, 203 &structs.ACLTokenListStub{ 204 AccessorID: "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 205 Description: "token1 - already in sync", 206 Hash: []byte{1, 2, 3, 4}, 207 CreateIndex: 1, 208 ModifyIndex: 2, 209 }, 210 &structs.ACLTokenListStub{ 211 AccessorID: "8ea41efb-8519-4091-bc91-c42da0cda9ae", 212 Description: "token2 - updated but not changed", 213 Hash: []byte{1, 2, 3, 4}, 214 CreateIndex: 1, 215 ModifyIndex: 50, 216 }, 217 &structs.ACLTokenListStub{ 218 AccessorID: "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 219 Description: "token3 - updated and changed", 220 Hash: []byte{5, 6, 7, 8}, 221 CreateIndex: 1, 222 ModifyIndex: 50, 223 }, 224 &structs.ACLTokenListStub{ 225 AccessorID: "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926", 226 Description: "token5 - needs adding", 227 Hash: []byte{1, 2, 3, 4}, 228 CreateIndex: 1, 229 ModifyIndex: 50, 230 }, 231 // When a 1.4+ secondary DC is replicating from a 1.4+ primary DC, 232 // tokens created using the legacy APIs will not initially have 233 // AccessorIDs assigned. That assignment is lazy (but in quick 234 // succession). 235 // 236 // The secondary (local) will see these in the api response as a stub 237 // with "" as the AccessorID. 238 // 239 // We simulate that here to verify that the secondary does the right 240 // thing by skipping them until it sees them with nonempty AccessorIDs. 241 &structs.ACLTokenListStub{ 242 AccessorID: "", 243 Description: "token6 - pending async AccessorID assignment", 244 Hash: []byte{1, 2, 3, 4}, 245 CreateIndex: 51, 246 ModifyIndex: 51, 247 }, 248 } 249 250 // Do the full diff. This full exercises the main body of the loop 251 t.Run("full-diff", func(t *testing.T) { 252 res := diffACLTokens(local, remote, 28) 253 require.Equal(t, 1, res.LocalSkipped) 254 require.Equal(t, 1, res.RemoteSkipped) 255 require.Len(t, res.LocalUpserts, 3) 256 require.ElementsMatch(t, res.LocalUpserts, []string{ 257 "72fac6a3-a014-41c8-9cb2-8d9a5e935f3d", 258 "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926", 259 "539f1cb6-40aa-464f-ae66-a900d26bc1b2"}) 260 261 require.Len(t, res.LocalDeletes, 1) 262 require.Equal(t, "e9d33298-6490-4466-99cb-ba93af64fa76", res.LocalDeletes[0]) 263 }) 264 265 t.Run("only-local", func(t *testing.T) { 266 res := diffACLTokens(local, nil, 28) 267 require.Equal(t, 1, res.LocalSkipped) 268 require.Equal(t, 0, res.RemoteSkipped) 269 require.Len(t, res.LocalUpserts, 0) 270 require.Len(t, res.LocalDeletes, 4) 271 require.ElementsMatch(t, res.LocalDeletes, []string{ 272 "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 273 "8ea41efb-8519-4091-bc91-c42da0cda9ae", 274 "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 275 "e9d33298-6490-4466-99cb-ba93af64fa76"}) 276 }) 277 278 t.Run("only-remote", func(t *testing.T) { 279 res := diffACLTokens(nil, remote, 28) 280 require.Equal(t, 0, res.LocalSkipped) 281 require.Equal(t, 1, res.RemoteSkipped) 282 require.Len(t, res.LocalDeletes, 0) 283 require.Len(t, res.LocalUpserts, 5) 284 require.ElementsMatch(t, res.LocalUpserts, []string{ 285 "72fac6a3-a014-41c8-9cb2-8d9a5e935f3d", 286 "44ef9aec-7654-4401-901b-4d4a8b3c80fc", 287 "8ea41efb-8519-4091-bc91-c42da0cda9ae", 288 "539f1cb6-40aa-464f-ae66-a900d26bc1b2", 289 "c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"}) 290 }) 291} 292 293func TestACLReplication_Tokens(t *testing.T) { 294 t.Parallel() 295 dir1, s1 := testServerWithConfig(t, func(c *Config) { 296 c.ACLDatacenter = "dc1" 297 c.ACLsEnabled = true 298 c.ACLMasterToken = "root" 299 }) 300 defer os.RemoveAll(dir1) 301 defer s1.Shutdown() 302 testrpc.WaitForLeader(t, s1.RPC, "dc1") 303 client := rpcClient(t, s1) 304 defer client.Close() 305 306 dir2, s2 := testServerWithConfig(t, func(c *Config) { 307 c.Datacenter = "dc2" 308 c.ACLDatacenter = "dc1" 309 c.ACLsEnabled = true 310 c.ACLTokenReplication = true 311 c.ACLReplicationRate = 100 312 c.ACLReplicationBurst = 100 313 c.ACLReplicationApplyLimit = 1000000 314 }) 315 s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) 316 testrpc.WaitForLeader(t, s2.RPC, "dc2") 317 defer os.RemoveAll(dir2) 318 defer s2.Shutdown() 319 320 // Try to join. 321 joinWAN(t, s2, s1) 322 testrpc.WaitForLeader(t, s1.RPC, "dc1") 323 testrpc.WaitForLeader(t, s1.RPC, "dc2") 324 325 // Wait for legacy acls to be disabled so we are clear that 326 // legacy replication isn't meddling. 327 waitForNewACLs(t, s1) 328 waitForNewACLs(t, s2) 329 waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) 330 331 // Create a bunch of new tokens and policies 332 var tokens structs.ACLTokens 333 for i := 0; i < 50; i++ { 334 arg := structs.ACLTokenSetRequest{ 335 Datacenter: "dc1", 336 ACLToken: structs.ACLToken{ 337 Description: fmt.Sprintf("token-%d", i), 338 Policies: []structs.ACLTokenPolicyLink{ 339 structs.ACLTokenPolicyLink{ 340 ID: structs.ACLPolicyGlobalManagementID, 341 }, 342 }, 343 Local: false, 344 }, 345 WriteRequest: structs.WriteRequest{Token: "root"}, 346 } 347 var token structs.ACLToken 348 require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token)) 349 tokens = append(tokens, &token) 350 } 351 352 checkSame := func(t *retry.R) { 353 // only account for global tokens - local tokens shouldn't be replicated 354 index, remote, err := s1.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil) 355 require.NoError(t, err) 356 _, local, err := s2.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil) 357 require.NoError(t, err) 358 359 require.Len(t, local, len(remote)) 360 for i, token := range remote { 361 require.Equal(t, token.Hash, local[i].Hash) 362 } 363 364 s2.aclReplicationStatusLock.RLock() 365 status := s2.aclReplicationStatus 366 s2.aclReplicationStatusLock.RUnlock() 367 368 require.True(t, status.Enabled) 369 require.True(t, status.Running) 370 require.Equal(t, status.ReplicationType, structs.ACLReplicateTokens) 371 require.Equal(t, status.ReplicatedTokenIndex, index) 372 require.Equal(t, status.SourceDatacenter, "dc1") 373 } 374 // Wait for the replica to converge. 375 retry.Run(t, func(r *retry.R) { 376 checkSame(r) 377 }) 378 379 // Wait for s2 global-management policy 380 retry.Run(t, func(r *retry.R) { 381 _, policy, err := s2.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, nil) 382 require.NoError(r, err) 383 require.NotNil(r, policy) 384 }) 385 386 // add some local tokens to the secondary DC 387 // these shouldn't be deleted by replication 388 for i := 0; i < 50; i++ { 389 arg := structs.ACLTokenSetRequest{ 390 Datacenter: "dc2", 391 ACLToken: structs.ACLToken{ 392 Description: fmt.Sprintf("token-%d", i), 393 Policies: []structs.ACLTokenPolicyLink{ 394 structs.ACLTokenPolicyLink{ 395 ID: structs.ACLPolicyGlobalManagementID, 396 }, 397 }, 398 Local: true, 399 }, 400 WriteRequest: structs.WriteRequest{Token: "root"}, 401 } 402 var token structs.ACLToken 403 require.NoError(t, s2.RPC("ACL.TokenSet", &arg, &token)) 404 } 405 406 // add some local tokens to the primary DC 407 // these shouldn't be replicated to the secondary DC 408 for i := 0; i < 50; i++ { 409 arg := structs.ACLTokenSetRequest{ 410 Datacenter: "dc1", 411 ACLToken: structs.ACLToken{ 412 Description: fmt.Sprintf("token-%d", i), 413 Policies: []structs.ACLTokenPolicyLink{ 414 structs.ACLTokenPolicyLink{ 415 ID: structs.ACLPolicyGlobalManagementID, 416 }, 417 }, 418 Local: true, 419 }, 420 WriteRequest: structs.WriteRequest{Token: "root"}, 421 } 422 var token structs.ACLToken 423 require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token)) 424 } 425 426 // Update those other tokens 427 for i := 0; i < 50; i++ { 428 arg := structs.ACLTokenSetRequest{ 429 Datacenter: "dc1", 430 ACLToken: structs.ACLToken{ 431 AccessorID: tokens[i].AccessorID, 432 SecretID: tokens[i].SecretID, 433 Description: fmt.Sprintf("token-%d-modified", i), 434 Policies: []structs.ACLTokenPolicyLink{ 435 structs.ACLTokenPolicyLink{ 436 ID: structs.ACLPolicyGlobalManagementID, 437 }, 438 }, 439 Local: false, 440 }, 441 WriteRequest: structs.WriteRequest{Token: "root"}, 442 } 443 var token structs.ACLToken 444 require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token)) 445 } 446 447 // Wait for the replica to converge. 448 // this time it also verifies the local tokens from the primary were not replicated. 449 retry.Run(t, func(r *retry.R) { 450 checkSame(r) 451 }) 452 453 // verify dc2 local tokens didn't get blown away 454 _, local, err := s2.fsm.State().ACLTokenList(nil, true, false, "", "", "", nil, nil) 455 require.NoError(t, err) 456 require.Len(t, local, 50) 457 458 for _, token := range tokens { 459 arg := structs.ACLTokenDeleteRequest{ 460 Datacenter: "dc1", 461 TokenID: token.AccessorID, 462 WriteRequest: structs.WriteRequest{Token: "root"}, 463 } 464 465 var dontCare string 466 require.NoError(t, s1.RPC("ACL.TokenDelete", &arg, &dontCare)) 467 } 468 469 // Wait for the replica to converge. 470 retry.Run(t, func(r *retry.R) { 471 checkSame(r) 472 }) 473} 474 475func TestACLReplication_Policies(t *testing.T) { 476 t.Parallel() 477 dir1, s1 := testServerWithConfig(t, func(c *Config) { 478 c.ACLDatacenter = "dc1" 479 c.ACLsEnabled = true 480 c.ACLMasterToken = "root" 481 }) 482 defer os.RemoveAll(dir1) 483 defer s1.Shutdown() 484 testrpc.WaitForLeader(t, s1.RPC, "dc1") 485 client := rpcClient(t, s1) 486 defer client.Close() 487 488 dir2, s2 := testServerWithConfig(t, func(c *Config) { 489 c.Datacenter = "dc2" 490 c.ACLDatacenter = "dc1" 491 c.ACLsEnabled = true 492 c.ACLTokenReplication = false 493 c.ACLReplicationRate = 100 494 c.ACLReplicationBurst = 100 495 c.ACLReplicationApplyLimit = 1000000 496 }) 497 s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) 498 testrpc.WaitForLeader(t, s2.RPC, "dc2") 499 defer os.RemoveAll(dir2) 500 defer s2.Shutdown() 501 502 // Try to join. 503 joinWAN(t, s2, s1) 504 testrpc.WaitForLeader(t, s1.RPC, "dc1") 505 testrpc.WaitForLeader(t, s1.RPC, "dc2") 506 507 // Wait for legacy acls to be disabled so we are clear that 508 // legacy replication isn't meddling. 509 waitForNewACLs(t, s1) 510 waitForNewACLs(t, s2) 511 waitForNewACLReplication(t, s2, structs.ACLReplicatePolicies, 1, 0, 0) 512 513 // Create a bunch of new policies 514 var policies structs.ACLPolicies 515 for i := 0; i < 50; i++ { 516 arg := structs.ACLPolicySetRequest{ 517 Datacenter: "dc1", 518 Policy: structs.ACLPolicy{ 519 Name: fmt.Sprintf("token-%d", i), 520 Description: fmt.Sprintf("token-%d", i), 521 Rules: fmt.Sprintf(`service "app-%d" { policy = "read" }`, i), 522 }, 523 WriteRequest: structs.WriteRequest{Token: "root"}, 524 } 525 var policy structs.ACLPolicy 526 require.NoError(t, s1.RPC("ACL.PolicySet", &arg, &policy)) 527 policies = append(policies, &policy) 528 } 529 530 checkSame := func(t *retry.R) { 531 // only account for global tokens - local tokens shouldn't be replicated 532 index, remote, err := s1.fsm.State().ACLPolicyList(nil, nil) 533 require.NoError(t, err) 534 _, local, err := s2.fsm.State().ACLPolicyList(nil, nil) 535 require.NoError(t, err) 536 537 require.Len(t, local, len(remote)) 538 for i, policy := range remote { 539 require.Equal(t, policy.Hash, local[i].Hash) 540 } 541 542 s2.aclReplicationStatusLock.RLock() 543 status := s2.aclReplicationStatus 544 s2.aclReplicationStatusLock.RUnlock() 545 546 require.True(t, status.Enabled) 547 require.True(t, status.Running) 548 require.Equal(t, status.ReplicationType, structs.ACLReplicatePolicies) 549 require.Equal(t, status.ReplicatedIndex, index) 550 require.Equal(t, status.SourceDatacenter, "dc1") 551 } 552 // Wait for the replica to converge. 553 retry.Run(t, func(r *retry.R) { 554 checkSame(r) 555 }) 556 557 // Update those policies 558 for i := 0; i < 50; i++ { 559 arg := structs.ACLPolicySetRequest{ 560 Datacenter: "dc1", 561 Policy: structs.ACLPolicy{ 562 ID: policies[i].ID, 563 Name: fmt.Sprintf("token-%d-modified", i), 564 Description: fmt.Sprintf("token-%d-modified", i), 565 Rules: policies[i].Rules, 566 }, 567 WriteRequest: structs.WriteRequest{Token: "root"}, 568 } 569 var policy structs.ACLPolicy 570 require.NoError(t, s1.RPC("ACL.PolicySet", &arg, &policy)) 571 } 572 573 // Wait for the replica to converge. 574 // this time it also verifies the local tokens from the primary were not replicated. 575 retry.Run(t, func(r *retry.R) { 576 checkSame(r) 577 }) 578 579 for _, policy := range policies { 580 arg := structs.ACLPolicyDeleteRequest{ 581 Datacenter: "dc1", 582 PolicyID: policy.ID, 583 WriteRequest: structs.WriteRequest{Token: "root"}, 584 } 585 586 var dontCare string 587 require.NoError(t, s1.RPC("ACL.PolicyDelete", &arg, &dontCare)) 588 } 589 590 // Wait for the replica to converge. 591 retry.Run(t, func(r *retry.R) { 592 checkSame(r) 593 }) 594} 595 596func TestACLReplication_TokensRedacted(t *testing.T) { 597 t.Parallel() 598 dir1, s1 := testServerWithConfig(t, func(c *Config) { 599 c.ACLDatacenter = "dc1" 600 c.ACLsEnabled = true 601 c.ACLMasterToken = "root" 602 }) 603 defer os.RemoveAll(dir1) 604 defer s1.Shutdown() 605 testrpc.WaitForLeader(t, s1.RPC, "dc1") 606 client := rpcClient(t, s1) 607 defer client.Close() 608 609 // Create the ACL Write Policy 610 policyArg := structs.ACLPolicySetRequest{ 611 Datacenter: "dc1", 612 Policy: structs.ACLPolicy{ 613 Name: "token-replication-redacted", 614 Description: "token-replication-redacted", 615 Rules: `acl = "write"`, 616 }, 617 WriteRequest: structs.WriteRequest{Token: "root"}, 618 } 619 var policy structs.ACLPolicy 620 require.NoError(t, s1.RPC("ACL.PolicySet", &policyArg, &policy)) 621 622 // Create the dc2 replication token 623 tokenArg := structs.ACLTokenSetRequest{ 624 Datacenter: "dc1", 625 ACLToken: structs.ACLToken{ 626 Description: "dc2-replication", 627 Policies: []structs.ACLTokenPolicyLink{ 628 structs.ACLTokenPolicyLink{ 629 ID: policy.ID, 630 }, 631 }, 632 Local: false, 633 }, 634 WriteRequest: structs.WriteRequest{Token: "root"}, 635 } 636 637 var token structs.ACLToken 638 require.NoError(t, s1.RPC("ACL.TokenSet", &tokenArg, &token)) 639 640 dir2, s2 := testServerWithConfig(t, func(c *Config) { 641 c.Datacenter = "dc2" 642 c.ACLDatacenter = "dc1" 643 c.ACLsEnabled = true 644 c.ACLTokenReplication = true 645 c.ACLReplicationRate = 100 646 c.ACLReplicationBurst = 100 647 c.ACLReplicationApplyLimit = 1000000 648 }) 649 s2.tokens.UpdateReplicationToken(token.SecretID, tokenStore.TokenSourceConfig) 650 testrpc.WaitForLeader(t, s2.RPC, "dc2") 651 defer os.RemoveAll(dir2) 652 defer s2.Shutdown() 653 654 // Try to join. 655 joinWAN(t, s2, s1) 656 testrpc.WaitForLeader(t, s2.RPC, "dc2") 657 testrpc.WaitForLeader(t, s2.RPC, "dc1") 658 waitForNewACLs(t, s2) 659 660 // ensures replication is working ok 661 retry.Run(t, func(r *retry.R) { 662 var tokenResp structs.ACLTokenResponse 663 req := structs.ACLTokenGetRequest{ 664 Datacenter: "dc2", 665 TokenID: "root", 666 TokenIDType: structs.ACLTokenSecret, 667 QueryOptions: structs.QueryOptions{Token: "root"}, 668 } 669 err := s2.RPC("ACL.TokenRead", &req, &tokenResp) 670 require.NoError(r, err) 671 require.NotNil(r, tokenResp.Token) 672 require.Equal(r, "root", tokenResp.Token.SecretID) 673 674 var status structs.ACLReplicationStatus 675 statusReq := structs.DCSpecificRequest{ 676 Datacenter: "dc2", 677 } 678 require.NoError(r, s2.RPC("ACL.ReplicationStatus", &statusReq, &status)) 679 // ensures that tokens are not being synced 680 require.True(r, status.ReplicatedTokenIndex > 0, "ReplicatedTokenIndex not greater than 0") 681 682 }) 683 684 // modify the replication policy to change to only granting read privileges 685 policyArg = structs.ACLPolicySetRequest{ 686 Datacenter: "dc1", 687 Policy: structs.ACLPolicy{ 688 ID: policy.ID, 689 Name: "token-replication-redacted", 690 Description: "token-replication-redacted", 691 Rules: `acl = "read"`, 692 }, 693 WriteRequest: structs.WriteRequest{Token: "root"}, 694 } 695 require.NoError(t, s1.RPC("ACL.PolicySet", &policyArg, &policy)) 696 697 // Create the another token so that replication will attempt to read it. 698 tokenArg = structs.ACLTokenSetRequest{ 699 Datacenter: "dc1", 700 ACLToken: structs.ACLToken{ 701 Description: "management", 702 Policies: []structs.ACLTokenPolicyLink{ 703 structs.ACLTokenPolicyLink{ 704 ID: structs.ACLPolicyGlobalManagementID, 705 }, 706 }, 707 Local: false, 708 }, 709 WriteRequest: structs.WriteRequest{Token: "root"}, 710 } 711 var token2 structs.ACLToken 712 713 // record the time right before we are touching the token 714 minErrorTime := time.Now() 715 require.NoError(t, s1.RPC("ACL.TokenSet", &tokenArg, &token2)) 716 717 retry.Run(t, func(r *retry.R) { 718 var tokenResp structs.ACLTokenResponse 719 req := structs.ACLTokenGetRequest{ 720 Datacenter: "dc2", 721 TokenID: redactedToken, 722 TokenIDType: structs.ACLTokenSecret, 723 QueryOptions: structs.QueryOptions{Token: redactedToken}, 724 } 725 err := s2.RPC("ACL.TokenRead", &req, &tokenResp) 726 // its not an error for the secret to not be found. 727 require.NoError(r, err) 728 require.Nil(r, tokenResp.Token) 729 730 var status structs.ACLReplicationStatus 731 statusReq := structs.DCSpecificRequest{ 732 Datacenter: "dc2", 733 } 734 require.NoError(r, s2.RPC("ACL.ReplicationStatus", &statusReq, &status)) 735 // ensures that tokens are not being synced 736 require.True(r, status.ReplicatedTokenIndex < token2.CreateIndex, "ReplicatedTokenIndex is not less than the token2s create index") 737 // ensures that token replication is erroring 738 require.True(r, status.LastError.After(minErrorTime), "Replication LastError not after the minErrorTime") 739 }) 740} 741 742func TestACLReplication_AllTypes(t *testing.T) { 743 t.Parallel() 744 dir1, s1 := testServerWithConfig(t, func(c *Config) { 745 c.ACLDatacenter = "dc1" 746 c.ACLsEnabled = true 747 c.ACLMasterToken = "root" 748 }) 749 defer os.RemoveAll(dir1) 750 defer s1.Shutdown() 751 testrpc.WaitForLeader(t, s1.RPC, "dc1") 752 client := rpcClient(t, s1) 753 defer client.Close() 754 755 dir2, s2 := testServerWithConfig(t, func(c *Config) { 756 c.Datacenter = "dc2" 757 c.ACLDatacenter = "dc1" 758 c.ACLsEnabled = true 759 c.ACLTokenReplication = true 760 c.ACLReplicationRate = 100 761 c.ACLReplicationBurst = 25 762 c.ACLReplicationApplyLimit = 1000000 763 }) 764 s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig) 765 testrpc.WaitForLeader(t, s2.RPC, "dc2") 766 defer os.RemoveAll(dir2) 767 defer s2.Shutdown() 768 769 // Try to join. 770 joinWAN(t, s2, s1) 771 testrpc.WaitForLeader(t, s1.RPC, "dc1") 772 testrpc.WaitForLeader(t, s1.RPC, "dc2") 773 774 // Wait for legacy acls to be disabled so we are clear that 775 // legacy replication isn't meddling. 776 waitForNewACLs(t, s1) 777 waitForNewACLs(t, s2) 778 waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0) 779 780 const ( 781 numItems = 50 782 numItemsThatAreLocal = 10 783 ) 784 785 // Create some data. 786 policyIDs, roleIDs, tokenIDs := createACLTestData(t, s1, "b1", numItems, numItemsThatAreLocal) 787 788 checkSameTokens := func(t *retry.R) { 789 // only account for global tokens - local tokens shouldn't be replicated 790 index, remote, err := s1.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil) 791 require.NoError(t, err) 792 // Query for all of them, so that we can prove that no globals snuck in. 793 _, local, err := s2.fsm.State().ACLTokenList(nil, true, true, "", "", "", nil, nil) 794 require.NoError(t, err) 795 796 require.Len(t, remote, len(local)) 797 for i, token := range remote { 798 require.Equal(t, token.Hash, local[i].Hash) 799 } 800 801 s2.aclReplicationStatusLock.RLock() 802 status := s2.aclReplicationStatus 803 s2.aclReplicationStatusLock.RUnlock() 804 805 require.True(t, status.Enabled) 806 require.True(t, status.Running) 807 require.Equal(t, status.ReplicationType, structs.ACLReplicateTokens) 808 require.Equal(t, status.ReplicatedTokenIndex, index) 809 require.Equal(t, status.SourceDatacenter, "dc1") 810 } 811 checkSamePolicies := func(t *retry.R) { 812 index, remote, err := s1.fsm.State().ACLPolicyList(nil, nil) 813 require.NoError(t, err) 814 _, local, err := s2.fsm.State().ACLPolicyList(nil, nil) 815 require.NoError(t, err) 816 817 require.Len(t, remote, len(local)) 818 for i, policy := range remote { 819 require.Equal(t, policy.Hash, local[i].Hash) 820 } 821 822 s2.aclReplicationStatusLock.RLock() 823 status := s2.aclReplicationStatus 824 s2.aclReplicationStatusLock.RUnlock() 825 826 require.True(t, status.Enabled) 827 require.True(t, status.Running) 828 require.Equal(t, status.ReplicationType, structs.ACLReplicateTokens) 829 require.Equal(t, status.ReplicatedIndex, index) 830 require.Equal(t, status.SourceDatacenter, "dc1") 831 } 832 checkSameRoles := func(t *retry.R) { 833 index, remote, err := s1.fsm.State().ACLRoleList(nil, "", nil) 834 require.NoError(t, err) 835 _, local, err := s2.fsm.State().ACLRoleList(nil, "", nil) 836 require.NoError(t, err) 837 838 require.Len(t, remote, len(local)) 839 for i, role := range remote { 840 require.Equal(t, role.Hash, local[i].Hash) 841 } 842 843 s2.aclReplicationStatusLock.RLock() 844 status := s2.aclReplicationStatus 845 s2.aclReplicationStatusLock.RUnlock() 846 847 require.True(t, status.Enabled) 848 require.True(t, status.Running) 849 require.Equal(t, status.ReplicationType, structs.ACLReplicateTokens) 850 require.Equal(t, status.ReplicatedRoleIndex, index) 851 require.Equal(t, status.SourceDatacenter, "dc1") 852 } 853 checkSame := func(t *retry.R) { 854 checkSameTokens(t) 855 checkSamePolicies(t) 856 checkSameRoles(t) 857 } 858 // Wait for the replica to converge. 859 retry.Run(t, func(r *retry.R) { 860 checkSame(r) 861 }) 862 863 // Create additional data to replicate. 864 _, _, _ = createACLTestData(t, s1, "b2", numItems, numItemsThatAreLocal) 865 866 // Wait for the replica to converge. 867 retry.Run(t, func(r *retry.R) { 868 checkSame(r) 869 }) 870 871 // Delete one piece of each type of data from batch 1. 872 const itemToDelete = numItems - 1 873 { 874 id := tokenIDs[itemToDelete] 875 876 arg := structs.ACLTokenDeleteRequest{ 877 Datacenter: "dc1", 878 TokenID: id, 879 WriteRequest: structs.WriteRequest{Token: "root"}, 880 } 881 var dontCare string 882 if err := s1.RPC("ACL.TokenDelete", &arg, &dontCare); err != nil { 883 t.Fatalf("err: %v", err) 884 } 885 } 886 { 887 id := roleIDs[itemToDelete] 888 889 arg := structs.ACLRoleDeleteRequest{ 890 Datacenter: "dc1", 891 RoleID: id, 892 WriteRequest: structs.WriteRequest{Token: "root"}, 893 } 894 var dontCare string 895 if err := s1.RPC("ACL.RoleDelete", &arg, &dontCare); err != nil { 896 t.Fatalf("err: %v", err) 897 } 898 } 899 { 900 id := policyIDs[itemToDelete] 901 902 arg := structs.ACLPolicyDeleteRequest{ 903 Datacenter: "dc1", 904 PolicyID: id, 905 WriteRequest: structs.WriteRequest{Token: "root"}, 906 } 907 var dontCare string 908 if err := s1.RPC("ACL.PolicyDelete", &arg, &dontCare); err != nil { 909 t.Fatalf("err: %v", err) 910 } 911 } 912 // Wait for the replica to converge. 913 retry.Run(t, func(r *retry.R) { 914 checkSame(r) 915 }) 916} 917 918func createACLTestData(t *testing.T, srv *Server, namePrefix string, numObjects, numItemsThatAreLocal int) (policyIDs, roleIDs, tokenIDs []string) { 919 require.True(t, numItemsThatAreLocal <= numObjects, 0, "numItemsThatAreLocal <= numObjects") 920 921 // Create some policies. 922 for i := 0; i < numObjects; i++ { 923 str := strconv.Itoa(i) 924 arg := structs.ACLPolicySetRequest{ 925 Datacenter: "dc1", 926 Policy: structs.ACLPolicy{ 927 Name: namePrefix + "-policy-" + str, 928 Description: namePrefix + "-policy " + str, 929 Rules: testACLPolicyNew, 930 }, 931 WriteRequest: structs.WriteRequest{Token: "root"}, 932 } 933 var out structs.ACLPolicy 934 if err := srv.RPC("ACL.PolicySet", &arg, &out); err != nil { 935 t.Fatalf("err: %v", err) 936 } 937 policyIDs = append(policyIDs, out.ID) 938 } 939 940 // Create some roles. 941 for i := 0; i < numObjects; i++ { 942 str := strconv.Itoa(i) 943 arg := structs.ACLRoleSetRequest{ 944 Datacenter: "dc1", 945 Role: structs.ACLRole{ 946 Name: namePrefix + "-role-" + str, 947 Description: namePrefix + "-role " + str, 948 Policies: []structs.ACLRolePolicyLink{ 949 {ID: policyIDs[i]}, 950 }, 951 }, 952 WriteRequest: structs.WriteRequest{Token: "root"}, 953 } 954 var out structs.ACLRole 955 if err := srv.RPC("ACL.RoleSet", &arg, &out); err != nil { 956 t.Fatalf("err: %v", err) 957 } 958 roleIDs = append(roleIDs, out.ID) 959 } 960 961 // Create a bunch of new tokens. 962 for i := 0; i < numObjects; i++ { 963 str := strconv.Itoa(i) 964 arg := structs.ACLTokenSetRequest{ 965 Datacenter: "dc1", 966 ACLToken: structs.ACLToken{ 967 Description: namePrefix + "-token " + str, 968 Policies: []structs.ACLTokenPolicyLink{ 969 {ID: policyIDs[i]}, 970 }, 971 Roles: []structs.ACLTokenRoleLink{ 972 {ID: roleIDs[i]}, 973 }, 974 Local: (i < numItemsThatAreLocal), 975 }, 976 WriteRequest: structs.WriteRequest{Token: "root"}, 977 } 978 var out structs.ACLToken 979 if err := srv.RPC("ACL.TokenSet", &arg, &out); err != nil { 980 t.Fatalf("err: %v", err) 981 } 982 tokenIDs = append(tokenIDs, out.AccessorID) 983 } 984 985 return policyIDs, roleIDs, tokenIDs 986} 987