1package consul 2 3import ( 4 "fmt" 5 "math" 6 "math/rand" 7 "net/rpc" 8 "os" 9 "strings" 10 "testing" 11 "time" 12 13 "github.com/hashicorp/consul/acl" 14 "github.com/hashicorp/consul/agent/structs" 15 "github.com/hashicorp/consul/lib" 16 "github.com/hashicorp/consul/testrpc" 17 "github.com/hashicorp/consul/testutil/retry" 18 "github.com/hashicorp/net-rpc-msgpackrpc" 19 "github.com/hashicorp/serf/coordinate" 20 "github.com/pascaldekloe/goe/verify" 21) 22 23// generateRandomCoordinate creates a random coordinate. This mucks with the 24// underlying structure directly, so it's not really useful for any particular 25// position in the network, but it's a good payload to send through to make 26// sure things come out the other side or get stored correctly. 27func generateRandomCoordinate() *coordinate.Coordinate { 28 config := coordinate.DefaultConfig() 29 coord := coordinate.NewCoordinate(config) 30 for i := range coord.Vec { 31 coord.Vec[i] = rand.NormFloat64() 32 } 33 coord.Error = rand.NormFloat64() 34 coord.Adjustment = rand.NormFloat64() 35 return coord 36} 37 38func TestCoordinate_Update(t *testing.T) { 39 t.Parallel() 40 dir1, s1 := testServerWithConfig(t, func(c *Config) { 41 c.CoordinateUpdatePeriod = 500 * time.Millisecond 42 c.CoordinateUpdateBatchSize = 5 43 c.CoordinateUpdateMaxBatches = 2 44 }) 45 defer os.RemoveAll(dir1) 46 defer s1.Shutdown() 47 48 codec := rpcClient(t, s1) 49 defer codec.Close() 50 testrpc.WaitForTestAgent(t, s1.RPC, "dc1") 51 52 // Register some nodes. 53 nodes := []string{"node1", "node2"} 54 if err := registerNodes(nodes, codec); err != nil { 55 t.Fatal(err) 56 } 57 58 // Send an update for the first node. 59 arg1 := structs.CoordinateUpdateRequest{ 60 Datacenter: "dc1", 61 Node: "node1", 62 Coord: generateRandomCoordinate(), 63 } 64 var out struct{} 65 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { 66 t.Fatalf("err: %v", err) 67 } 68 69 // Send an update for the second node. 70 arg2 := structs.CoordinateUpdateRequest{ 71 Datacenter: "dc1", 72 Node: "node2", 73 Coord: generateRandomCoordinate(), 74 } 75 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { 76 t.Fatalf("err: %v", err) 77 } 78 79 // Make sure the updates did not yet apply because the update period 80 // hasn't expired. 81 state := s1.fsm.State() 82 _, c, err := state.Coordinate("node1", nil) 83 if err != nil { 84 t.Fatalf("err: %v", err) 85 } 86 verify.Values(t, "", c, lib.CoordinateSet{}) 87 88 _, c, err = state.Coordinate("node2", nil) 89 if err != nil { 90 t.Fatalf("err: %v", err) 91 } 92 verify.Values(t, "", c, lib.CoordinateSet{}) 93 94 // Send another update for the second node. It should take precedence 95 // since there will be two updates in the same batch. 96 arg2.Coord = generateRandomCoordinate() 97 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { 98 t.Fatalf("err: %v", err) 99 } 100 101 // Wait a while and the updates should get picked up. 102 time.Sleep(3 * s1.config.CoordinateUpdatePeriod) 103 _, c, err = state.Coordinate("node1", nil) 104 if err != nil { 105 t.Fatalf("err: %v", err) 106 } 107 expected := lib.CoordinateSet{ 108 "": arg1.Coord, 109 } 110 verify.Values(t, "", c, expected) 111 112 _, c, err = state.Coordinate("node2", nil) 113 if err != nil { 114 t.Fatalf("err: %v", err) 115 } 116 expected = lib.CoordinateSet{ 117 "": arg2.Coord, 118 } 119 verify.Values(t, "", c, expected) 120 121 // Register a bunch of additional nodes. 122 spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1 123 for i := 0; i < spamLen; i++ { 124 req := structs.RegisterRequest{ 125 Datacenter: "dc1", 126 Node: fmt.Sprintf("bogusnode%d", i), 127 Address: "127.0.0.1", 128 } 129 var reply struct{} 130 if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { 131 t.Fatalf("err: %v", err) 132 } 133 } 134 135 // Now spam some coordinate updates and make sure it starts throwing 136 // them away if they exceed the batch allowance. Note we have to make 137 // unique names since these are held in map by node name. 138 for i := 0; i < spamLen; i++ { 139 arg1.Node = fmt.Sprintf("bogusnode%d", i) 140 arg1.Coord = generateRandomCoordinate() 141 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { 142 t.Fatalf("err: %v", err) 143 } 144 } 145 146 // Wait a little while for the batch routine to run, then make sure 147 // exactly one of the updates got dropped (we won't know which one). 148 time.Sleep(3 * s1.config.CoordinateUpdatePeriod) 149 numDropped := 0 150 for i := 0; i < spamLen; i++ { 151 _, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil) 152 if err != nil { 153 t.Fatalf("err: %v", err) 154 } 155 if len(c) == 0 { 156 numDropped++ 157 } 158 } 159 if numDropped != 1 { 160 t.Fatalf("wrong number of coordinates dropped, %d != 1", numDropped) 161 } 162 163 // Send a coordinate with a NaN to make sure that we don't absorb that 164 // into the database. 165 arg2.Coord.Vec[0] = math.NaN() 166 err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out) 167 if err == nil || !strings.Contains(err.Error(), "invalid coordinate") { 168 t.Fatalf("should have failed with an error, got %v", err) 169 } 170 171 // Finally, send a coordinate with the wrong dimensionality to make sure 172 // there are no panics, and that it gets rejected. 173 arg2.Coord.Vec = make([]float64, 2*len(arg2.Coord.Vec)) 174 err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out) 175 if err == nil || !strings.Contains(err.Error(), "incompatible coordinate") { 176 t.Fatalf("should have failed with an error, got %v", err) 177 } 178} 179 180func TestCoordinate_Update_ACLDeny(t *testing.T) { 181 t.Parallel() 182 dir1, s1 := testServerWithConfig(t, func(c *Config) { 183 c.ACLDatacenter = "dc1" 184 c.ACLMasterToken = "root" 185 c.ACLDefaultPolicy = "deny" 186 c.ACLEnforceVersion8 = false 187 }) 188 defer os.RemoveAll(dir1) 189 defer s1.Shutdown() 190 codec := rpcClient(t, s1) 191 defer codec.Close() 192 193 testrpc.WaitForLeader(t, s1.RPC, "dc1") 194 195 // Register some nodes. 196 nodes := []string{"node1", "node2"} 197 if err := registerNodes(nodes, codec); err != nil { 198 t.Fatal(err) 199 } 200 201 // Send an update for the first node. This should go through since we 202 // don't have version 8 ACLs enforced yet. 203 req := structs.CoordinateUpdateRequest{ 204 Datacenter: "dc1", 205 Node: "node1", 206 Coord: generateRandomCoordinate(), 207 } 208 var out struct{} 209 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { 210 t.Fatalf("err: %v", err) 211 } 212 213 // Now turn on version 8 enforcement and try again. 214 s1.config.ACLEnforceVersion8 = true 215 err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out) 216 if !acl.IsErrPermissionDenied(err) { 217 t.Fatalf("err: %v", err) 218 } 219 220 // Create an ACL that can write to the node. 221 arg := structs.ACLRequest{ 222 Datacenter: "dc1", 223 Op: structs.ACLSet, 224 ACL: structs.ACL{ 225 Name: "User token", 226 Type: structs.ACLTypeClient, 227 Rules: ` 228node "node1" { 229 policy = "write" 230} 231`, 232 }, 233 WriteRequest: structs.WriteRequest{Token: "root"}, 234 } 235 var id string 236 if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { 237 t.Fatalf("err: %v", err) 238 } 239 240 // With the token, it should now go through. 241 req.Token = id 242 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { 243 t.Fatalf("err: %v", err) 244 } 245 246 // But it should be blocked for the other node. 247 req.Node = "node2" 248 err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out) 249 if !acl.IsErrPermissionDenied(err) { 250 t.Fatalf("err: %v", err) 251 } 252} 253 254func TestCoordinate_ListDatacenters(t *testing.T) { 255 t.Parallel() 256 dir1, s1 := testServer(t) 257 defer os.RemoveAll(dir1) 258 defer s1.Shutdown() 259 codec := rpcClient(t, s1) 260 defer codec.Close() 261 262 testrpc.WaitForLeader(t, s1.RPC, "dc1") 263 264 // It's super hard to force the Serfs into a known configuration of 265 // coordinates, so the best we can do is make sure our own DC shows 266 // up in the list with the proper coordinates. The guts of the algorithm 267 // are extensively tested in rtt_test.go using a mock database. 268 var out []structs.DatacenterMap 269 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListDatacenters", struct{}{}, &out); err != nil { 270 t.Fatalf("err: %v", err) 271 } 272 if len(out) != 1 || 273 out[0].Datacenter != "dc1" || 274 len(out[0].Coordinates) != 1 || 275 out[0].Coordinates[0].Node != s1.config.NodeName { 276 t.Fatalf("bad: %v", out) 277 } 278 c, err := s1.serfWAN.GetCoordinate() 279 if err != nil { 280 t.Fatalf("bad: %v", err) 281 } 282 verify.Values(t, "", c, out[0].Coordinates[0].Coord) 283} 284 285func TestCoordinate_ListNodes(t *testing.T) { 286 t.Parallel() 287 dir1, s1 := testServer(t) 288 defer os.RemoveAll(dir1) 289 defer s1.Shutdown() 290 291 codec := rpcClient(t, s1) 292 defer codec.Close() 293 testrpc.WaitForLeader(t, s1.RPC, "dc1") 294 295 // Register some nodes. 296 nodes := []string{"foo", "bar", "baz"} 297 if err := registerNodes(nodes, codec); err != nil { 298 t.Fatal(err) 299 } 300 301 // Send coordinate updates for a few nodes. 302 arg1 := structs.CoordinateUpdateRequest{ 303 Datacenter: "dc1", 304 Node: "foo", 305 Coord: generateRandomCoordinate(), 306 } 307 var out struct{} 308 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { 309 t.Fatalf("err: %v", err) 310 } 311 312 arg2 := structs.CoordinateUpdateRequest{ 313 Datacenter: "dc1", 314 Node: "bar", 315 Coord: generateRandomCoordinate(), 316 } 317 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { 318 t.Fatalf("err: %v", err) 319 } 320 321 arg3 := structs.CoordinateUpdateRequest{ 322 Datacenter: "dc1", 323 Node: "baz", 324 Coord: generateRandomCoordinate(), 325 } 326 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil { 327 t.Fatalf("err: %v", err) 328 } 329 // Now query back for all the nodes. 330 retry.Run(t, func(r *retry.R) { 331 arg := structs.DCSpecificRequest{ 332 Datacenter: "dc1", 333 } 334 resp := structs.IndexedCoordinates{} 335 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { 336 r.Fatalf("err: %v", err) 337 } 338 if len(resp.Coordinates) != 3 || 339 resp.Coordinates[0].Node != "bar" || 340 resp.Coordinates[1].Node != "baz" || 341 resp.Coordinates[2].Node != "foo" { 342 r.Fatalf("bad: %v", resp.Coordinates) 343 } 344 verify.Values(t, "", resp.Coordinates[0].Coord, arg2.Coord) // bar 345 verify.Values(t, "", resp.Coordinates[1].Coord, arg3.Coord) // baz 346 verify.Values(t, "", resp.Coordinates[2].Coord, arg1.Coord) // foo 347 }) 348} 349 350func TestCoordinate_ListNodes_ACLFilter(t *testing.T) { 351 t.Parallel() 352 dir1, s1 := testServerWithConfig(t, func(c *Config) { 353 c.ACLDatacenter = "dc1" 354 c.ACLMasterToken = "root" 355 c.ACLDefaultPolicy = "deny" 356 c.ACLEnforceVersion8 = false 357 }) 358 defer os.RemoveAll(dir1) 359 defer s1.Shutdown() 360 codec := rpcClient(t, s1) 361 defer codec.Close() 362 363 testrpc.WaitForLeader(t, s1.RPC, "dc1") 364 365 // Register some nodes. 366 nodes := []string{"foo", "bar", "baz"} 367 for _, node := range nodes { 368 req := structs.RegisterRequest{ 369 Datacenter: "dc1", 370 Node: node, 371 Address: "127.0.0.1", 372 WriteRequest: structs.WriteRequest{ 373 Token: "root", 374 }, 375 } 376 var reply struct{} 377 if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { 378 t.Fatalf("err: %v", err) 379 } 380 } 381 382 // Send coordinate updates for a few nodes. 383 arg1 := structs.CoordinateUpdateRequest{ 384 Datacenter: "dc1", 385 Node: "foo", 386 Coord: generateRandomCoordinate(), 387 WriteRequest: structs.WriteRequest{ 388 Token: "root", 389 }, 390 } 391 var out struct{} 392 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { 393 t.Fatalf("err: %v", err) 394 } 395 396 arg2 := structs.CoordinateUpdateRequest{ 397 Datacenter: "dc1", 398 Node: "bar", 399 Coord: generateRandomCoordinate(), 400 WriteRequest: structs.WriteRequest{ 401 Token: "root", 402 }, 403 } 404 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { 405 t.Fatalf("err: %v", err) 406 } 407 408 arg3 := structs.CoordinateUpdateRequest{ 409 Datacenter: "dc1", 410 Node: "baz", 411 Coord: generateRandomCoordinate(), 412 WriteRequest: structs.WriteRequest{ 413 Token: "root", 414 }, 415 } 416 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil { 417 t.Fatalf("err: %v", err) 418 } 419 // Wait for all the coordinate updates to apply. Since we aren't 420 // enforcing version 8 ACLs, this should also allow us to read 421 // everything back without a token. 422 retry.Run(t, func(r *retry.R) { 423 arg := structs.DCSpecificRequest{ 424 Datacenter: "dc1", 425 } 426 resp := structs.IndexedCoordinates{} 427 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { 428 r.Fatalf("err: %v", err) 429 } 430 if got, want := len(resp.Coordinates), 3; got != want { 431 r.Fatalf("got %d coordinates want %d", got, want) 432 } 433 }) 434 435 // Now that we've waited for the batch processing to ingest the 436 // coordinates we can do the rest of the requests without the loop. We 437 // will start by turning on version 8 ACL support which should block 438 // everything. 439 s1.config.ACLEnforceVersion8 = true 440 arg := structs.DCSpecificRequest{ 441 Datacenter: "dc1", 442 } 443 resp := structs.IndexedCoordinates{} 444 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { 445 t.Fatalf("err: %v", err) 446 } 447 if len(resp.Coordinates) != 0 { 448 t.Fatalf("bad: %#v", resp.Coordinates) 449 } 450 451 // Create an ACL that can read one of the nodes. 452 var id string 453 { 454 req := structs.ACLRequest{ 455 Datacenter: "dc1", 456 Op: structs.ACLSet, 457 ACL: structs.ACL{ 458 Name: "User token", 459 Type: structs.ACLTypeClient, 460 Rules: ` 461node "foo" { 462 policy = "read" 463} 464`, 465 }, 466 WriteRequest: structs.WriteRequest{Token: "root"}, 467 } 468 if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &id); err != nil { 469 t.Fatalf("err: %v", err) 470 } 471 } 472 473 // With the token, it should now go through. 474 arg.Token = id 475 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { 476 t.Fatalf("err: %v", err) 477 } 478 if len(resp.Coordinates) != 1 || resp.Coordinates[0].Node != "foo" { 479 t.Fatalf("bad: %#v", resp.Coordinates) 480 } 481} 482 483func TestCoordinate_Node(t *testing.T) { 484 t.Parallel() 485 dir1, s1 := testServer(t) 486 defer os.RemoveAll(dir1) 487 defer s1.Shutdown() 488 489 codec := rpcClient(t, s1) 490 defer codec.Close() 491 testrpc.WaitForTestAgent(t, s1.RPC, "dc1") 492 493 // Register some nodes. 494 nodes := []string{"foo", "bar"} 495 if err := registerNodes(nodes, codec); err != nil { 496 t.Fatal(err) 497 } 498 499 // Send coordinate updates for each node. 500 arg1 := structs.CoordinateUpdateRequest{ 501 Datacenter: "dc1", 502 Node: "foo", 503 Coord: generateRandomCoordinate(), 504 } 505 var out struct{} 506 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { 507 t.Fatalf("err: %v", err) 508 } 509 510 arg2 := structs.CoordinateUpdateRequest{ 511 Datacenter: "dc1", 512 Node: "bar", 513 Coord: generateRandomCoordinate(), 514 } 515 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { 516 t.Fatalf("err: %v", err) 517 } 518 519 // Now query back for a specific node (make sure we only get coordinates for foo). 520 retry.Run(t, func(r *retry.R) { 521 arg := structs.NodeSpecificRequest{ 522 Node: "foo", 523 Datacenter: "dc1", 524 } 525 resp := structs.IndexedCoordinates{} 526 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil { 527 r.Fatalf("err: %v", err) 528 } 529 if len(resp.Coordinates) != 1 || 530 resp.Coordinates[0].Node != "foo" { 531 r.Fatalf("bad: %v", resp.Coordinates) 532 } 533 verify.Values(t, "", resp.Coordinates[0].Coord, arg1.Coord) // foo 534 }) 535} 536 537func TestCoordinate_Node_ACLDeny(t *testing.T) { 538 t.Parallel() 539 dir1, s1 := testServerWithConfig(t, func(c *Config) { 540 c.ACLDatacenter = "dc1" 541 c.ACLMasterToken = "root" 542 c.ACLDefaultPolicy = "deny" 543 c.ACLEnforceVersion8 = false 544 }) 545 defer os.RemoveAll(dir1) 546 defer s1.Shutdown() 547 codec := rpcClient(t, s1) 548 defer codec.Close() 549 550 testrpc.WaitForLeader(t, s1.RPC, "dc1") 551 552 // Register some nodes. 553 nodes := []string{"node1", "node2"} 554 if err := registerNodes(nodes, codec); err != nil { 555 t.Fatal(err) 556 } 557 558 coord := generateRandomCoordinate() 559 req := structs.CoordinateUpdateRequest{ 560 Datacenter: "dc1", 561 Node: "node1", 562 Coord: coord, 563 } 564 var out struct{} 565 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { 566 t.Fatalf("err: %v", err) 567 } 568 569 // Try a read for the first node. This should go through since we 570 // don't have version 8 ACLs enforced yet. 571 arg := structs.NodeSpecificRequest{ 572 Node: "node1", 573 Datacenter: "dc1", 574 } 575 resp := structs.IndexedCoordinates{} 576 retry.Run(t, func(r *retry.R) { 577 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil { 578 r.Fatalf("err: %v", err) 579 } 580 if len(resp.Coordinates) != 1 || 581 resp.Coordinates[0].Node != "node1" { 582 r.Fatalf("bad: %v", resp.Coordinates) 583 } 584 verify.Values(t, "", resp.Coordinates[0].Coord, coord) 585 }) 586 587 // Now turn on version 8 enforcement and try again. 588 s1.config.ACLEnforceVersion8 = true 589 err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp) 590 if !acl.IsErrPermissionDenied(err) { 591 t.Fatalf("err: %v", err) 592 } 593 594 // Create an ACL that can read from the node. 595 aclReq := structs.ACLRequest{ 596 Datacenter: "dc1", 597 Op: structs.ACLSet, 598 ACL: structs.ACL{ 599 Name: "User token", 600 Type: structs.ACLTypeClient, 601 Rules: ` 602node "node1" { 603 policy = "read" 604} 605`, 606 }, 607 WriteRequest: structs.WriteRequest{Token: "root"}, 608 } 609 var id string 610 if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &aclReq, &id); err != nil { 611 t.Fatalf("err: %v", err) 612 } 613 614 // With the token, it should now go through. 615 arg.Token = id 616 if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil { 617 t.Fatalf("err: %v", err) 618 } 619 620 // But it should be blocked for the other node. 621 arg.Node = "node2" 622 err = msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp) 623 if !acl.IsErrPermissionDenied(err) { 624 t.Fatalf("err: %v", err) 625 } 626} 627 628func registerNodes(nodes []string, codec rpc.ClientCodec) error { 629 for _, node := range nodes { 630 req := structs.RegisterRequest{ 631 Datacenter: "dc1", 632 Node: node, 633 Address: "127.0.0.1", 634 } 635 var reply struct{} 636 if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { 637 return err 638 } 639 } 640 641 return nil 642} 643