1package nomad 2 3import ( 4 "errors" 5 "fmt" 6 "net" 7 "reflect" 8 "strings" 9 "testing" 10 "time" 11 12 memdb "github.com/hashicorp/go-memdb" 13 msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" 14 vapi "github.com/hashicorp/vault/api" 15 "github.com/kr/pretty" 16 "github.com/stretchr/testify/assert" 17 "github.com/stretchr/testify/require" 18 19 "github.com/hashicorp/nomad/acl" 20 "github.com/hashicorp/nomad/command/agent/consul" 21 "github.com/hashicorp/nomad/helper" 22 "github.com/hashicorp/nomad/helper/uuid" 23 "github.com/hashicorp/nomad/nomad/mock" 24 "github.com/hashicorp/nomad/nomad/state" 25 "github.com/hashicorp/nomad/nomad/structs" 26 "github.com/hashicorp/nomad/testutil" 27) 28 29func TestClientEndpoint_Register(t *testing.T) { 30 t.Parallel() 31 require := require.New(t) 32 33 s1, cleanupS1 := TestServer(t, nil) 34 defer cleanupS1() 35 codec := rpcClient(t, s1) 36 testutil.WaitForLeader(t, s1.RPC) 37 38 // Check that we have no client connections 39 require.Empty(s1.connectedNodes()) 40 41 // Create the register request 42 node := mock.Node() 43 req := &structs.NodeRegisterRequest{ 44 Node: node, 45 WriteRequest: structs.WriteRequest{Region: "global"}, 46 } 47 48 // Fetch the response 49 var resp structs.GenericResponse 50 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp); err != nil { 51 t.Fatalf("err: %v", err) 52 } 53 if resp.Index == 0 { 54 t.Fatalf("bad index: %d", resp.Index) 55 } 56 57 // Check that we have the client connections 58 nodes := s1.connectedNodes() 59 require.Len(nodes, 1) 60 require.Contains(nodes, node.ID) 61 62 // Check for the node in the FSM 63 state := s1.fsm.State() 64 ws := memdb.NewWatchSet() 65 out, err := state.NodeByID(ws, node.ID) 66 if err != nil { 67 t.Fatalf("err: %v", err) 68 } 69 if out == nil { 70 t.Fatalf("expected node") 71 } 72 if out.CreateIndex != resp.Index { 73 t.Fatalf("index mis-match") 74 } 75 if out.ComputedClass == "" { 76 t.Fatal("ComputedClass not set") 77 } 78 79 // Close 
the connection and check that we remove the client connections 80 require.Nil(codec.Close()) 81 testutil.WaitForResult(func() (bool, error) { 82 nodes := s1.connectedNodes() 83 return len(nodes) == 0, nil 84 }, func(err error) { 85 t.Fatalf("should have no clients") 86 }) 87} 88 89// This test asserts that we only track node connections if they are not from 90// forwarded RPCs. This is essential otherwise we will think a Yamux session to 91// a Nomad server is actually the session to the node. 92func TestClientEndpoint_Register_NodeConn_Forwarded(t *testing.T) { 93 t.Parallel() 94 require := require.New(t) 95 96 s1, cleanupS1 := TestServer(t, func(c *Config) { 97 c.BootstrapExpect = 2 98 }) 99 100 defer cleanupS1() 101 s2, cleanupS2 := TestServer(t, func(c *Config) { 102 c.BootstrapExpect = 2 103 }) 104 defer cleanupS2() 105 TestJoin(t, s1, s2) 106 testutil.WaitForLeader(t, s1.RPC) 107 testutil.WaitForLeader(t, s2.RPC) 108 109 // Determine the non-leader server 110 var leader, nonLeader *Server 111 if s1.IsLeader() { 112 leader = s1 113 nonLeader = s2 114 } else { 115 leader = s2 116 nonLeader = s1 117 } 118 119 // Send the requests to the non-leader 120 codec := rpcClient(t, nonLeader) 121 122 // Check that we have no client connections 123 require.Empty(nonLeader.connectedNodes()) 124 require.Empty(leader.connectedNodes()) 125 126 // Create the register request 127 node := mock.Node() 128 req := &structs.NodeRegisterRequest{ 129 Node: node, 130 WriteRequest: structs.WriteRequest{Region: "global"}, 131 } 132 133 // Fetch the response 134 var resp structs.GenericResponse 135 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp); err != nil { 136 t.Fatalf("err: %v", err) 137 } 138 if resp.Index == 0 { 139 t.Fatalf("bad index: %d", resp.Index) 140 } 141 142 // Check that we have the client connections on the non leader 143 nodes := nonLeader.connectedNodes() 144 require.Len(nodes, 1) 145 require.Contains(nodes, node.ID) 146 147 // Check that we have 
no client connections on the leader 148 nodes = leader.connectedNodes() 149 require.Empty(nodes) 150 151 // Check for the node in the FSM 152 state := leader.State() 153 testutil.WaitForResult(func() (bool, error) { 154 out, err := state.NodeByID(nil, node.ID) 155 if err != nil { 156 return false, err 157 } 158 if out == nil { 159 return false, fmt.Errorf("expected node") 160 } 161 if out.CreateIndex != resp.Index { 162 return false, fmt.Errorf("index mis-match") 163 } 164 if out.ComputedClass == "" { 165 return false, fmt.Errorf("ComputedClass not set") 166 } 167 168 return true, nil 169 }, func(err error) { 170 t.Fatalf("err: %v", err) 171 }) 172 173 // Close the connection and check that we remove the client connections 174 require.Nil(codec.Close()) 175 testutil.WaitForResult(func() (bool, error) { 176 nodes := nonLeader.connectedNodes() 177 return len(nodes) == 0, nil 178 }, func(err error) { 179 t.Fatalf("should have no clients") 180 }) 181} 182 183func TestClientEndpoint_Register_SecretMismatch(t *testing.T) { 184 t.Parallel() 185 186 s1, cleanupS1 := TestServer(t, nil) 187 defer cleanupS1() 188 codec := rpcClient(t, s1) 189 testutil.WaitForLeader(t, s1.RPC) 190 191 // Create the register request 192 node := mock.Node() 193 req := &structs.NodeRegisterRequest{ 194 Node: node, 195 WriteRequest: structs.WriteRequest{Region: "global"}, 196 } 197 198 // Fetch the response 199 var resp structs.GenericResponse 200 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp); err != nil { 201 t.Fatalf("err: %v", err) 202 } 203 204 // Update the nodes SecretID 205 node.SecretID = uuid.Generate() 206 err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp) 207 if err == nil || !strings.Contains(err.Error(), "Not registering") { 208 t.Fatalf("Expecting error regarding mismatching secret id: %v", err) 209 } 210} 211 212// Test the deprecated single node deregistration path 213func TestClientEndpoint_DeregisterOne(t *testing.T) { 214 t.Parallel() 
215 216 s1, cleanupS1 := TestServer(t, nil) 217 defer cleanupS1() 218 codec := rpcClient(t, s1) 219 testutil.WaitForLeader(t, s1.RPC) 220 221 // Create the register request 222 node := mock.Node() 223 reg := &structs.NodeRegisterRequest{ 224 Node: node, 225 WriteRequest: structs.WriteRequest{Region: "global"}, 226 } 227 228 // Fetch the response 229 var resp structs.GenericResponse 230 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 231 t.Fatalf("err: %v", err) 232 } 233 234 // Deregister 235 dereg := &structs.NodeDeregisterRequest{ 236 NodeID: node.ID, 237 WriteRequest: structs.WriteRequest{Region: "global"}, 238 } 239 var resp2 structs.GenericResponse 240 if err := msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2); err != nil { 241 t.Fatalf("err: %v", err) 242 } 243 if resp2.Index == 0 { 244 t.Fatalf("bad index: %d", resp2.Index) 245 } 246 247 // Check for the node in the FSM 248 state := s1.fsm.State() 249 ws := memdb.NewWatchSet() 250 out, err := state.NodeByID(ws, node.ID) 251 if err != nil { 252 t.Fatalf("err: %v", err) 253 } 254 if out != nil { 255 t.Fatalf("unexpected node") 256 } 257} 258 259func TestClientEndpoint_Deregister_ACL(t *testing.T) { 260 t.Parallel() 261 262 s1, root, cleanupS1 := TestACLServer(t, nil) 263 defer cleanupS1() 264 codec := rpcClient(t, s1) 265 testutil.WaitForLeader(t, s1.RPC) 266 267 // Create the node 268 node := mock.Node() 269 node1 := mock.Node() 270 state := s1.fsm.State() 271 if err := state.UpsertNode(structs.MsgTypeTestSetup, 1, node); err != nil { 272 t.Fatalf("err: %v", err) 273 } 274 if err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node1); err != nil { 275 t.Fatalf("err: %v", err) 276 } 277 278 // Create the policy and tokens 279 validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite)) 280 invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead)) 281 282 // 
Deregister without any token and expect it to fail 283 dereg := &structs.NodeBatchDeregisterRequest{ 284 NodeIDs: []string{node.ID}, 285 WriteRequest: structs.WriteRequest{Region: "global"}, 286 } 287 var resp structs.GenericResponse 288 if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err == nil { 289 t.Fatalf("node de-register succeeded") 290 } 291 292 // Deregister with a valid token 293 dereg.AuthToken = validToken.SecretID 294 if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp); err != nil { 295 t.Fatalf("err: %v", err) 296 } 297 298 // Check for the node in the FSM 299 ws := memdb.NewWatchSet() 300 out, err := state.NodeByID(ws, node.ID) 301 if err != nil { 302 t.Fatalf("err: %v", err) 303 } 304 if out != nil { 305 t.Fatalf("unexpected node") 306 } 307 308 // Deregister with an invalid token. 309 dereg1 := &structs.NodeBatchDeregisterRequest{ 310 NodeIDs: []string{node1.ID}, 311 WriteRequest: structs.WriteRequest{Region: "global"}, 312 } 313 dereg1.AuthToken = invalidToken.SecretID 314 if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err == nil { 315 t.Fatalf("rpc should not have succeeded") 316 } 317 318 // Try with a root token 319 dereg1.AuthToken = root.SecretID 320 if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg1, &resp); err != nil { 321 t.Fatalf("err: %v", err) 322 } 323} 324 325func TestClientEndpoint_Deregister_Vault(t *testing.T) { 326 t.Parallel() 327 328 s1, cleanupS1 := TestServer(t, nil) 329 defer cleanupS1() 330 codec := rpcClient(t, s1) 331 testutil.WaitForLeader(t, s1.RPC) 332 333 // Create the register request 334 node := mock.Node() 335 reg := &structs.NodeRegisterRequest{ 336 Node: node, 337 WriteRequest: structs.WriteRequest{Region: "global"}, 338 } 339 340 // Fetch the response 341 var resp structs.GenericResponse 342 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 343 t.Fatalf("err: 
%v", err) 344 } 345 346 // Swap the servers Vault Client 347 tvc := &TestVaultClient{} 348 s1.vault = tvc 349 350 // Put some Vault accessors in the state store for that node 351 state := s1.fsm.State() 352 va1 := mock.VaultAccessor() 353 va1.NodeID = node.ID 354 va2 := mock.VaultAccessor() 355 va2.NodeID = node.ID 356 state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2}) 357 358 // Deregister 359 dereg := &structs.NodeBatchDeregisterRequest{ 360 NodeIDs: []string{node.ID}, 361 WriteRequest: structs.WriteRequest{Region: "global"}, 362 } 363 var resp2 structs.GenericResponse 364 if err := msgpackrpc.CallWithCodec(codec, "Node.BatchDeregister", dereg, &resp2); err != nil { 365 t.Fatalf("err: %v", err) 366 } 367 if resp2.Index == 0 { 368 t.Fatalf("bad index: %d", resp2.Index) 369 } 370 371 // Check for the node in the FSM 372 ws := memdb.NewWatchSet() 373 out, err := state.NodeByID(ws, node.ID) 374 if err != nil { 375 t.Fatalf("err: %v", err) 376 } 377 if out != nil { 378 t.Fatalf("unexpected node") 379 } 380 381 // Check that the endpoint revoked the tokens 382 if l := len(tvc.RevokedTokens); l != 2 { 383 t.Fatalf("Deregister revoked %d tokens; want 2", l) 384 } 385} 386 387func TestClientEndpoint_UpdateStatus(t *testing.T) { 388 t.Parallel() 389 require := require.New(t) 390 391 s1, cleanupS1 := TestServer(t, nil) 392 defer cleanupS1() 393 codec := rpcClient(t, s1) 394 testutil.WaitForLeader(t, s1.RPC) 395 396 // Check that we have no client connections 397 require.Empty(s1.connectedNodes()) 398 399 // Create the register request 400 node := mock.Node() 401 reg := &structs.NodeRegisterRequest{ 402 Node: node, 403 WriteRequest: structs.WriteRequest{Region: "global"}, 404 } 405 406 // Fetch the response 407 var resp structs.NodeUpdateResponse 408 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 409 t.Fatalf("err: %v", err) 410 } 411 412 // Check for heartbeat interval 413 ttl := resp.HeartbeatTTL 414 if ttl < 
s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 415 t.Fatalf("bad: %#v", ttl) 416 } 417 418 // Update the status 419 dereg := &structs.NodeUpdateStatusRequest{ 420 NodeID: node.ID, 421 Status: structs.NodeStatusInit, 422 WriteRequest: structs.WriteRequest{Region: "global"}, 423 } 424 var resp2 structs.NodeUpdateResponse 425 if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2); err != nil { 426 t.Fatalf("err: %v", err) 427 } 428 if resp2.Index == 0 { 429 t.Fatalf("bad index: %d", resp2.Index) 430 } 431 432 // Check for heartbeat interval 433 ttl = resp2.HeartbeatTTL 434 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 435 t.Fatalf("bad: %#v", ttl) 436 } 437 438 // Check that we have the client connections 439 nodes := s1.connectedNodes() 440 require.Len(nodes, 1) 441 require.Contains(nodes, node.ID) 442 443 // Check for the node in the FSM 444 state := s1.fsm.State() 445 ws := memdb.NewWatchSet() 446 out, err := state.NodeByID(ws, node.ID) 447 if err != nil { 448 t.Fatalf("err: %v", err) 449 } 450 if out == nil { 451 t.Fatalf("expected node") 452 } 453 if out.ModifyIndex != resp2.Index { 454 t.Fatalf("index mis-match") 455 } 456 457 // Close the connection and check that we remove the client connections 458 require.Nil(codec.Close()) 459 testutil.WaitForResult(func() (bool, error) { 460 nodes := s1.connectedNodes() 461 return len(nodes) == 0, nil 462 }, func(err error) { 463 t.Fatalf("should have no clients") 464 }) 465} 466 467func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) { 468 t.Parallel() 469 470 s1, cleanupS1 := TestServer(t, nil) 471 defer cleanupS1() 472 codec := rpcClient(t, s1) 473 testutil.WaitForLeader(t, s1.RPC) 474 475 // Create the register request 476 node := mock.Node() 477 reg := &structs.NodeRegisterRequest{ 478 Node: node, 479 WriteRequest: structs.WriteRequest{Region: "global"}, 480 } 481 482 // Fetch the response 483 var resp structs.NodeUpdateResponse 484 if err := 
msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 485 t.Fatalf("err: %v", err) 486 } 487 488 // Check for heartbeat interval 489 ttl := resp.HeartbeatTTL 490 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 491 t.Fatalf("bad: %#v", ttl) 492 } 493 494 // Swap the servers Vault Client 495 tvc := &TestVaultClient{} 496 s1.vault = tvc 497 498 // Put some Vault accessors in the state store for that node 499 state := s1.fsm.State() 500 va1 := mock.VaultAccessor() 501 va1.NodeID = node.ID 502 va2 := mock.VaultAccessor() 503 va2.NodeID = node.ID 504 state.UpsertVaultAccessor(100, []*structs.VaultAccessor{va1, va2}) 505 506 // Update the status to be down 507 dereg := &structs.NodeUpdateStatusRequest{ 508 NodeID: node.ID, 509 Status: structs.NodeStatusDown, 510 WriteRequest: structs.WriteRequest{Region: "global"}, 511 } 512 var resp2 structs.NodeUpdateResponse 513 if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2); err != nil { 514 t.Fatalf("err: %v", err) 515 } 516 if resp2.Index == 0 { 517 t.Fatalf("bad index: %d", resp2.Index) 518 } 519 520 // Check that the endpoint revoked the tokens 521 if l := len(tvc.RevokedTokens); l != 2 { 522 t.Fatalf("Deregister revoked %d tokens; want 2", l) 523 } 524} 525 526func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) { 527 t.Parallel() 528 require := require.New(t) 529 530 s1, cleanupS1 := TestServer(t, nil) 531 defer cleanupS1() 532 codec := rpcClient(t, s1) 533 testutil.WaitForLeader(t, s1.RPC) 534 535 // Check that we have no client connections 536 require.Empty(s1.connectedNodes()) 537 538 // Create the register request but make the node down 539 node := mock.Node() 540 node.Status = structs.NodeStatusDown 541 reg := &structs.NodeRegisterRequest{ 542 Node: node, 543 WriteRequest: structs.WriteRequest{Region: "global"}, 544 } 545 546 // Fetch the response 547 var resp structs.NodeUpdateResponse 548 
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) 549 550 // Update the status 551 dereg := &structs.NodeUpdateStatusRequest{ 552 NodeID: node.ID, 553 Status: structs.NodeStatusInit, 554 WriteRequest: structs.WriteRequest{Region: "global"}, 555 } 556 var resp2 structs.NodeUpdateResponse 557 require.NoError(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2)) 558 require.NotZero(resp2.Index) 559 560 // Check for heartbeat interval 561 ttl := resp2.HeartbeatTTL 562 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 563 t.Fatalf("bad: %#v", ttl) 564 } 565 566 // Check for the node in the FSM 567 state := s1.fsm.State() 568 ws := memdb.NewWatchSet() 569 out, err := state.NodeByID(ws, node.ID) 570 require.NoError(err) 571 require.NotNil(out) 572 require.EqualValues(resp2.Index, out.ModifyIndex) 573 require.Len(out.Events, 2) 574 require.Equal(NodeHeartbeatEventReregistered, out.Events[1].Message) 575} 576 577func TestClientEndpoint_Register_GetEvals(t *testing.T) { 578 t.Parallel() 579 580 s1, cleanupS1 := TestServer(t, nil) 581 defer cleanupS1() 582 codec := rpcClient(t, s1) 583 testutil.WaitForLeader(t, s1.RPC) 584 585 // Register a system job. 
586 job := mock.SystemJob() 587 state := s1.fsm.State() 588 if err := state.UpsertJob(structs.MsgTypeTestSetup, 1, job); err != nil { 589 t.Fatalf("err: %v", err) 590 } 591 592 // Create the register request going directly to ready 593 node := mock.Node() 594 node.Status = structs.NodeStatusReady 595 reg := &structs.NodeRegisterRequest{ 596 Node: node, 597 WriteRequest: structs.WriteRequest{Region: "global"}, 598 } 599 600 // Fetch the response 601 var resp structs.NodeUpdateResponse 602 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 603 t.Fatalf("err: %v", err) 604 } 605 606 // Check for heartbeat interval 607 ttl := resp.HeartbeatTTL 608 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 609 t.Fatalf("bad: %#v", ttl) 610 } 611 612 // Check for an eval caused by the system job. 613 if len(resp.EvalIDs) != 1 { 614 t.Fatalf("expected one eval; got %#v", resp.EvalIDs) 615 } 616 617 evalID := resp.EvalIDs[0] 618 ws := memdb.NewWatchSet() 619 eval, err := state.EvalByID(ws, evalID) 620 if err != nil { 621 t.Fatalf("could not get eval %v", evalID) 622 } 623 624 if eval.Type != "system" { 625 t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system") 626 } 627 628 // Check for the node in the FSM 629 out, err := state.NodeByID(ws, node.ID) 630 if err != nil { 631 t.Fatalf("err: %v", err) 632 } 633 if out == nil { 634 t.Fatalf("expected node") 635 } 636 if out.ModifyIndex != resp.Index { 637 t.Fatalf("index mis-match") 638 } 639 640 // Transition it to down and then ready 641 node.Status = structs.NodeStatusDown 642 reg = &structs.NodeRegisterRequest{ 643 Node: node, 644 WriteRequest: structs.WriteRequest{Region: "global"}, 645 } 646 647 // Fetch the response 648 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 649 t.Fatalf("err: %v", err) 650 } 651 652 if len(resp.EvalIDs) != 1 { 653 t.Fatalf("expected one eval; got %#v", resp.EvalIDs) 654 } 655 656 node.Status = 
structs.NodeStatusReady 657 reg = &structs.NodeRegisterRequest{ 658 Node: node, 659 WriteRequest: structs.WriteRequest{Region: "global"}, 660 } 661 662 // Fetch the response 663 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 664 t.Fatalf("err: %v", err) 665 } 666 667 if len(resp.EvalIDs) != 1 { 668 t.Fatalf("expected one eval; got %#v", resp.EvalIDs) 669 } 670} 671 672func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) { 673 t.Parallel() 674 675 s1, cleanupS1 := TestServer(t, nil) 676 defer cleanupS1() 677 codec := rpcClient(t, s1) 678 testutil.WaitForLeader(t, s1.RPC) 679 680 // Register a system job. 681 job := mock.SystemJob() 682 state := s1.fsm.State() 683 if err := state.UpsertJob(structs.MsgTypeTestSetup, 1, job); err != nil { 684 t.Fatalf("err: %v", err) 685 } 686 687 // Create the register request 688 node := mock.Node() 689 node.Status = structs.NodeStatusInit 690 reg := &structs.NodeRegisterRequest{ 691 Node: node, 692 WriteRequest: structs.WriteRequest{Region: "global"}, 693 } 694 695 // Fetch the response 696 var resp structs.NodeUpdateResponse 697 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 698 t.Fatalf("err: %v", err) 699 } 700 701 // Check for heartbeat interval 702 ttl := resp.HeartbeatTTL 703 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 704 t.Fatalf("bad: %#v", ttl) 705 } 706 707 // Update the status 708 update := &structs.NodeUpdateStatusRequest{ 709 NodeID: node.ID, 710 Status: structs.NodeStatusReady, 711 WriteRequest: structs.WriteRequest{Region: "global"}, 712 } 713 var resp2 structs.NodeUpdateResponse 714 if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", update, &resp2); err != nil { 715 t.Fatalf("err: %v", err) 716 } 717 if resp2.Index == 0 { 718 t.Fatalf("bad index: %d", resp2.Index) 719 } 720 721 // Check for an eval caused by the system job. 
722 if len(resp2.EvalIDs) != 1 { 723 t.Fatalf("expected one eval; got %#v", resp2.EvalIDs) 724 } 725 726 evalID := resp2.EvalIDs[0] 727 ws := memdb.NewWatchSet() 728 eval, err := state.EvalByID(ws, evalID) 729 if err != nil { 730 t.Fatalf("could not get eval %v", evalID) 731 } 732 733 if eval.Type != "system" { 734 t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system") 735 } 736 737 // Check for heartbeat interval 738 ttl = resp2.HeartbeatTTL 739 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 740 t.Fatalf("bad: %#v", ttl) 741 } 742 743 // Check for the node in the FSM 744 out, err := state.NodeByID(ws, node.ID) 745 if err != nil { 746 t.Fatalf("err: %v", err) 747 } 748 if out == nil { 749 t.Fatalf("expected node") 750 } 751 if out.ModifyIndex != resp2.Index { 752 t.Fatalf("index mis-match") 753 } 754} 755 756func TestClientEndpoint_UpdateStatus_HeartbeatOnly(t *testing.T) { 757 t.Parallel() 758 759 s1, cleanupS1 := TestServer(t, func(c *Config) { 760 c.BootstrapExpect = 3 761 }) 762 defer cleanupS1() 763 764 s2, cleanupS2 := TestServer(t, func(c *Config) { 765 c.BootstrapExpect = 3 766 }) 767 defer cleanupS2() 768 769 s3, cleanupS3 := TestServer(t, func(c *Config) { 770 c.BootstrapExpect = 3 771 }) 772 defer cleanupS3() 773 servers := []*Server{s1, s2, s3} 774 TestJoin(t, s1, s2, s3) 775 776 for _, s := range servers { 777 testutil.WaitForResult(func() (bool, error) { 778 peers, _ := s.numPeers() 779 return peers == 3, nil 780 }, func(err error) { 781 t.Fatalf("should have 3 peers") 782 }) 783 } 784 785 codec := rpcClient(t, s1) 786 testutil.WaitForLeader(t, s1.RPC) 787 788 // Create the register request 789 node := mock.Node() 790 reg := &structs.NodeRegisterRequest{ 791 Node: node, 792 WriteRequest: structs.WriteRequest{Region: "global"}, 793 } 794 795 // Fetch the response 796 var resp structs.NodeUpdateResponse 797 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 798 t.Fatalf("err: 
%v", err) 799 } 800 801 // Check for heartbeat interval 802 ttl := resp.HeartbeatTTL 803 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 804 t.Fatalf("bad: %#v", ttl) 805 } 806 807 // Check for heartbeat servers 808 serverAddrs := resp.Servers 809 if len(serverAddrs) == 0 { 810 t.Fatalf("bad: %#v", serverAddrs) 811 } 812 813 // Update the status, static state 814 dereg := &structs.NodeUpdateStatusRequest{ 815 NodeID: node.ID, 816 Status: node.Status, 817 WriteRequest: structs.WriteRequest{Region: "global"}, 818 } 819 var resp2 structs.NodeUpdateResponse 820 if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", dereg, &resp2); err != nil { 821 t.Fatalf("err: %v", err) 822 } 823 if resp2.Index != 0 { 824 t.Fatalf("bad index: %d", resp2.Index) 825 } 826 827 // Check for heartbeat interval 828 ttl = resp2.HeartbeatTTL 829 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 830 t.Fatalf("bad: %#v", ttl) 831 } 832} 833 834func TestClientEndpoint_UpdateStatus_HeartbeatOnly_Advertise(t *testing.T) { 835 t.Parallel() 836 require := require.New(t) 837 838 advAddr := "127.0.1.1:1234" 839 adv, err := net.ResolveTCPAddr("tcp", advAddr) 840 require.Nil(err) 841 842 s1, cleanupS1 := TestServer(t, func(c *Config) { 843 c.ClientRPCAdvertise = adv 844 }) 845 defer cleanupS1() 846 codec := rpcClient(t, s1) 847 testutil.WaitForLeader(t, s1.RPC) 848 849 // Create the register request 850 node := mock.Node() 851 reg := &structs.NodeRegisterRequest{ 852 Node: node, 853 WriteRequest: structs.WriteRequest{Region: "global"}, 854 } 855 856 // Fetch the response 857 var resp structs.NodeUpdateResponse 858 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 859 t.Fatalf("err: %v", err) 860 } 861 862 // Check for heartbeat interval 863 ttl := resp.HeartbeatTTL 864 if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { 865 t.Fatalf("bad: %#v", ttl) 866 } 867 868 // Check for heartbeat 
servers 869 require.Len(resp.Servers, 1) 870 require.Equal(resp.Servers[0].RPCAdvertiseAddr, advAddr) 871} 872 873// TestClientEndpoint_UpdateDrain asserts the ability to initiate drain 874// against a node and cancel that drain. It also asserts: 875// * an evaluation is created when the node becomes eligible 876// * drain metadata is properly persisted in Node.LastDrain 877func TestClientEndpoint_UpdateDrain(t *testing.T) { 878 t.Parallel() 879 require := require.New(t) 880 881 s1, cleanupS1 := TestServer(t, nil) 882 defer cleanupS1() 883 codec := rpcClient(t, s1) 884 testutil.WaitForLeader(t, s1.RPC) 885 886 // Disable drainer to prevent drain from completing during test 887 s1.nodeDrainer.SetEnabled(false, nil) 888 889 // Create the register request 890 node := mock.Node() 891 reg := &structs.NodeRegisterRequest{ 892 Node: node, 893 WriteRequest: structs.WriteRequest{Region: "global"}, 894 } 895 896 // Fetch the response 897 var resp structs.NodeUpdateResponse 898 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) 899 900 beforeUpdate := time.Now() 901 strategy := &structs.DrainStrategy{ 902 DrainSpec: structs.DrainSpec{ 903 Deadline: 10 * time.Second, 904 }, 905 } 906 907 // Update the status 908 dereg := &structs.NodeUpdateDrainRequest{ 909 NodeID: node.ID, 910 DrainStrategy: strategy, 911 Meta: map[string]string{"message": "this node is not needed"}, 912 WriteRequest: structs.WriteRequest{Region: "global"}, 913 } 914 var resp2 structs.NodeDrainUpdateResponse 915 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2)) 916 require.NotZero(resp2.Index) 917 918 // Check for the node in the FSM 919 state := s1.fsm.State() 920 ws := memdb.NewWatchSet() 921 out, err := state.NodeByID(ws, node.ID) 922 require.Nil(err) 923 require.NotNil(out.DrainStrategy) 924 require.Equal(strategy.Deadline, out.DrainStrategy.Deadline) 925 require.Len(out.Events, 2) 926 require.Equal(NodeDrainEventDrainSet, out.Events[1].Message) 927 
require.NotNil(out.LastDrain) 928 require.Equal(structs.DrainMetadata{ 929 StartedAt: out.LastDrain.UpdatedAt, 930 UpdatedAt: out.LastDrain.StartedAt, 931 Status: structs.DrainStatusDraining, 932 Meta: map[string]string{"message": "this node is not needed"}, 933 }, *out.LastDrain) 934 935 // before+deadline should be before the forced deadline 936 require.True(beforeUpdate.Add(strategy.Deadline).Before(out.DrainStrategy.ForceDeadline)) 937 938 // now+deadline should be after the forced deadline 939 require.True(time.Now().Add(strategy.Deadline).After(out.DrainStrategy.ForceDeadline)) 940 941 drainStartedAt := out.DrainStrategy.StartedAt 942 // StartedAt should be close to the time the drain started 943 require.WithinDuration(beforeUpdate, drainStartedAt, 1*time.Second) 944 945 // StartedAt shouldn't change if a new request comes while still draining 946 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2)) 947 ws = memdb.NewWatchSet() 948 out, err = state.NodeByID(ws, node.ID) 949 require.NoError(err) 950 require.True(out.DrainStrategy.StartedAt.Equal(drainStartedAt)) 951 952 // Register a system job 953 job := mock.SystemJob() 954 require.Nil(s1.State().UpsertJob(structs.MsgTypeTestSetup, 10, job)) 955 956 // Update the eligibility and expect evals 957 dereg.DrainStrategy = nil 958 dereg.MarkEligible = true 959 dereg.Meta = map[string]string{"cancelled": "yes"} 960 var resp3 structs.NodeDrainUpdateResponse 961 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp3)) 962 require.NotZero(resp3.Index) 963 require.NotZero(resp3.EvalCreateIndex) 964 require.Len(resp3.EvalIDs, 1) 965 966 // Check for updated node in the FSM 967 ws = memdb.NewWatchSet() 968 out, err = state.NodeByID(ws, node.ID) 969 require.NoError(err) 970 require.Len(out.Events, 4) 971 require.Equal(NodeDrainEventDrainDisabled, out.Events[3].Message) 972 require.NotNil(out.LastDrain) 973 require.NotNil(out.LastDrain) 974 
	// Tail of the previous drain test: the metadata of a canceled drain must
	// carry the cancel Meta, and re-sending the identical DrainStrategy must
	// not append a new node event.
	require.False(out.LastDrain.UpdatedAt.Before(out.LastDrain.StartedAt))
	require.Equal(structs.DrainMetadata{
		StartedAt: out.LastDrain.StartedAt,
		UpdatedAt: out.LastDrain.UpdatedAt,
		Status:    structs.DrainStatusCanceled,
		Meta:      map[string]string{"cancelled": "yes"},
	}, *out.LastDrain)

	// Check that calling UpdateDrain with the same DrainStrategy does not emit
	// a node event.
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp3))
	ws = memdb.NewWatchSet()
	out, err = state.NodeByID(ws, node.ID)
	require.NoError(err)
	require.Len(out.Events, 4)
}

// TestClientEndpoint_UpdatedDrainAndCompleted asserts that drain metadata
// is properly persisted in Node.LastDrain as the node drain is updated and
// completes.
func TestClientEndpoint_UpdatedDrainAndCompleted(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	state := s1.fsm.State()

	// Disable drainer for now so the drain stays in progress while we assert
	// the intermediate LastDrain states.
	s1.nodeDrainer.SetEnabled(false, nil)

	// Create the register request
	node := mock.Node()
	reg := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}

	// Fetch the response
	var resp structs.NodeUpdateResponse
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))

	strategy := &structs.DrainStrategy{
		DrainSpec: structs.DrainSpec{
			Deadline: 10 * time.Second,
		},
	}

	// Update the status
	dereg := &structs.NodeUpdateDrainRequest{
		NodeID:        node.ID,
		DrainStrategy: strategy,
		Meta: map[string]string{
			"message": "first drain",
		},
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var resp2 structs.NodeDrainUpdateResponse
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
	require.NotZero(resp2.Index)

	// Check for the node in the FSM: a fresh drain must have
	// StartedAt == UpdatedAt and status "draining".
	out, err := state.NodeByID(nil, node.ID)
	require.Nil(err)
	require.NotNil(out.DrainStrategy)
	require.NotNil(out.LastDrain)
	firstDrainUpdate := out.LastDrain.UpdatedAt
	require.Equal(structs.DrainMetadata{
		StartedAt: firstDrainUpdate,
		UpdatedAt: firstDrainUpdate,
		Status:    structs.DrainStatusDraining,
		Meta:      map[string]string{"message": "first drain"},
	}, *out.LastDrain)

	// Sleep so the second update gets a visibly later UpdatedAt timestamp.
	time.Sleep(1 * time.Second)

	// Update the drain
	dereg.DrainStrategy.DrainSpec.Deadline *= 2
	dereg.Meta["message"] = "second drain"
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))
	require.NotZero(resp2.Index)

	// StartedAt must be preserved across updates; only UpdatedAt and Meta move.
	out, err = state.NodeByID(nil, node.ID)
	require.Nil(err)
	require.NotNil(out.DrainStrategy)
	require.NotNil(out.LastDrain)
	secondDrainUpdate := out.LastDrain.UpdatedAt
	require.True(secondDrainUpdate.After(firstDrainUpdate))
	require.Equal(structs.DrainMetadata{
		StartedAt: firstDrainUpdate,
		UpdatedAt: secondDrainUpdate,
		Status:    structs.DrainStatusDraining,
		Meta:      map[string]string{"message": "second drain"},
	}, *out.LastDrain)

	time.Sleep(1 * time.Second)

	// Enable the drainer, wait for completion
	s1.nodeDrainer.SetEnabled(true, state)

	testutil.WaitForResult(func() (bool, error) {
		out, err = state.NodeByID(nil, node.ID)
		if err != nil {
			return false, err
		}
		if out == nil {
			return false, fmt.Errorf("could not find node")
		}
		return out.DrainStrategy == nil, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Once the drain completes, LastDrain flips to "complete" while keeping
	// the original StartedAt and the most recent Meta.
	require.True(out.LastDrain.UpdatedAt.After(secondDrainUpdate))
	require.Equal(structs.DrainMetadata{
		StartedAt: firstDrainUpdate,
		UpdatedAt: out.LastDrain.UpdatedAt,
		Status:    structs.DrainStatusComplete,
		Meta:      map[string]string{"message": "second drain"},
	}, *out.LastDrain)
}

// TestClientEndpoint_UpdatedDrainNoop asserts that drain metadata is properly
// persisted in Node.LastDrain when calls to Node.UpdateDrain() don't affect
// the drain status.
func TestClientEndpoint_UpdatedDrainNoop(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	state := s1.fsm.State()

	// Create the register request
	node := mock.Node()
	reg := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}

	// Fetch the response
	var resp structs.NodeUpdateResponse
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))

	// Update the status
	dereg := &structs.NodeUpdateDrainRequest{
		NodeID: node.ID,
		DrainStrategy: &structs.DrainStrategy{
			DrainSpec: structs.DrainSpec{
				Deadline: 10 * time.Second,
			},
		},
		Meta: map[string]string{
			"message": "drain",
		},
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var drainResp structs.NodeDrainUpdateResponse
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &drainResp))
	require.NotZero(drainResp.Index)

	// Wait for the (enabled) drainer to finish the drain and mark the node
	// ineligible.
	var out *structs.Node
	testutil.WaitForResult(func() (bool, error) {
		var err error
		out, err = state.NodeByID(nil, node.ID)
		if err != nil {
			return false, err
		}
		if out == nil {
			return false, fmt.Errorf("could not find node")
		}
		return out.DrainStrategy == nil && out.SchedulingEligibility == structs.NodeSchedulingIneligible, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	require.Equal(structs.DrainStatusComplete, out.LastDrain.Status)
	require.Equal(map[string]string{"message": "drain"}, out.LastDrain.Meta)
	prevDrain := out.LastDrain

	// call again with Drain Strategy nil; should be a no-op because drain is already complete
	dereg.DrainStrategy = nil
	dereg.Meta = map[string]string{
		"new_message": "is new",
	}
	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &drainResp))
	require.NotZero(drainResp.Index)

	// LastDrain must be unchanged, including the old Meta.
	out, err := state.NodeByID(nil, node.ID)
	require.Nil(err)
	require.Nil(out.DrainStrategy)
	require.NotNil(out.LastDrain)
	require.Equal(prevDrain, out.LastDrain)
}

// TestClientEndpoint_UpdateDrain_ACL asserts that Node.UpdateDrain() enforces
// node.write ACLs, and that token accessor ID is properly persisted in
// Node.LastDrain.AccessorID
func TestClientEndpoint_UpdateDrain_ACL(t *testing.T) {
	t.Parallel()

	s1, root, cleanupS1 := TestACLServer(t, nil)
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	require := require.New(t)

	// Create the node
	node := mock.Node()
	state := s1.fsm.State()

	require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode")

	// Create the policy and tokens: node.write is required for UpdateDrain,
	// so the read-only token is expected to be rejected below.
	validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite))
	invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead))

	// Update the status without a token and expect failure
	dereg := &structs.NodeUpdateDrainRequest{
		NodeID: node.ID,
		DrainStrategy: &structs.DrainStrategy{
			DrainSpec: structs.DrainSpec{
				Deadline: 10 * time.Second,
			},
		},
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	{
		var resp structs.NodeDrainUpdateResponse
		err := msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp)
		require.NotNil(err, "RPC")
		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with a valid token
	dereg.AuthToken = validToken.SecretID
	{
		var resp structs.NodeDrainUpdateResponse
		require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp), "RPC")
		out, err := state.NodeByID(nil, node.ID)
		require.NoError(err)
		require.Equal(validToken.AccessorID, out.LastDrain.AccessorID)
	}

	// Try with a invalid token
	dereg.AuthToken = invalidToken.SecretID
	{
		var resp structs.NodeDrainUpdateResponse
		err := msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp)
		require.NotNil(err, "RPC")
		require.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with a root token
	dereg.DrainStrategy.DrainSpec.Deadline = 20 * time.Second
	dereg.AuthToken = root.SecretID
	{
		var resp structs.NodeDrainUpdateResponse
		require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp), "RPC")
		out, err := state.NodeByID(nil, node.ID)
		require.NoError(err)
		require.Equal(root.AccessorID, out.LastDrain.AccessorID)
	}
}

// This test ensures that Nomad marks client state of allocations which are in
// pending/running state to lost when a node is marked as down.
1245func TestClientEndpoint_Drain_Down(t *testing.T) { 1246 t.Parallel() 1247 1248 s1, cleanupS1 := TestServer(t, nil) 1249 defer cleanupS1() 1250 codec := rpcClient(t, s1) 1251 testutil.WaitForLeader(t, s1.RPC) 1252 require := require.New(t) 1253 1254 // Register a node 1255 node := mock.Node() 1256 reg := &structs.NodeRegisterRequest{ 1257 Node: node, 1258 WriteRequest: structs.WriteRequest{Region: "global"}, 1259 } 1260 // Fetch the response 1261 var resp structs.NodeUpdateResponse 1262 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) 1263 1264 // Register a service job 1265 var jobResp structs.JobRegisterResponse 1266 job := mock.Job() 1267 job.TaskGroups[0].Count = 1 1268 jobReq := &structs.JobRegisterRequest{ 1269 Job: job, 1270 WriteRequest: structs.WriteRequest{ 1271 Region: "global", 1272 Namespace: job.Namespace, 1273 }, 1274 } 1275 require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)) 1276 1277 // Register a system job 1278 var jobResp1 structs.JobRegisterResponse 1279 job1 := mock.SystemJob() 1280 job1.TaskGroups[0].Count = 1 1281 jobReq1 := &structs.JobRegisterRequest{ 1282 Job: job1, 1283 WriteRequest: structs.WriteRequest{ 1284 Region: "global", 1285 Namespace: job1.Namespace, 1286 }, 1287 } 1288 require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq1, &jobResp1)) 1289 1290 // Wait for the scheduler to create an allocation 1291 testutil.WaitForResult(func() (bool, error) { 1292 ws := memdb.NewWatchSet() 1293 allocs, err := s1.fsm.state.AllocsByJob(ws, job.Namespace, job.ID, true) 1294 if err != nil { 1295 return false, err 1296 } 1297 allocs1, err := s1.fsm.state.AllocsByJob(ws, job1.Namespace, job1.ID, true) 1298 if err != nil { 1299 return false, err 1300 } 1301 return len(allocs) > 0 && len(allocs1) > 0, nil 1302 }, func(err error) { 1303 t.Fatalf("err: %v", err) 1304 }) 1305 1306 // Drain the node 1307 dereg := &structs.NodeUpdateDrainRequest{ 1308 NodeID: node.ID, 1309 DrainStrategy: 
&structs.DrainStrategy{ 1310 DrainSpec: structs.DrainSpec{ 1311 Deadline: -1 * time.Second, 1312 }, 1313 }, 1314 WriteRequest: structs.WriteRequest{Region: "global"}, 1315 } 1316 var resp2 structs.NodeDrainUpdateResponse 1317 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2)) 1318 1319 // Mark the node as down 1320 node.Status = structs.NodeStatusDown 1321 reg = &structs.NodeRegisterRequest{ 1322 Node: node, 1323 WriteRequest: structs.WriteRequest{Region: "global"}, 1324 } 1325 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) 1326 1327 // Ensure that the allocation has transitioned to lost 1328 testutil.WaitForResult(func() (bool, error) { 1329 ws := memdb.NewWatchSet() 1330 summary, err := s1.fsm.state.JobSummaryByID(ws, job.Namespace, job.ID) 1331 if err != nil { 1332 return false, err 1333 } 1334 expectedSummary := &structs.JobSummary{ 1335 JobID: job.ID, 1336 Namespace: job.Namespace, 1337 Summary: map[string]structs.TaskGroupSummary{ 1338 "web": { 1339 Queued: 1, 1340 Lost: 1, 1341 }, 1342 }, 1343 Children: new(structs.JobChildrenSummary), 1344 CreateIndex: jobResp.JobModifyIndex, 1345 ModifyIndex: summary.ModifyIndex, 1346 } 1347 if !reflect.DeepEqual(summary, expectedSummary) { 1348 return false, fmt.Errorf("Service: expected: %#v, actual: %#v", expectedSummary, summary) 1349 } 1350 1351 summary1, err := s1.fsm.state.JobSummaryByID(ws, job1.Namespace, job1.ID) 1352 if err != nil { 1353 return false, err 1354 } 1355 expectedSummary1 := &structs.JobSummary{ 1356 JobID: job1.ID, 1357 Namespace: job1.Namespace, 1358 Summary: map[string]structs.TaskGroupSummary{ 1359 "web": { 1360 Lost: 1, 1361 }, 1362 }, 1363 Children: new(structs.JobChildrenSummary), 1364 CreateIndex: jobResp1.JobModifyIndex, 1365 ModifyIndex: summary1.ModifyIndex, 1366 } 1367 if !reflect.DeepEqual(summary1, expectedSummary1) { 1368 return false, fmt.Errorf("System: expected: %#v, actual: %#v", expectedSummary1, summary1) 1369 } 1370 return 
true, nil 1371 }, func(err error) { 1372 t.Fatalf("err: %v", err) 1373 }) 1374} 1375 1376func TestClientEndpoint_UpdateEligibility(t *testing.T) { 1377 t.Parallel() 1378 require := require.New(t) 1379 1380 s1, cleanupS1 := TestServer(t, nil) 1381 defer cleanupS1() 1382 codec := rpcClient(t, s1) 1383 testutil.WaitForLeader(t, s1.RPC) 1384 1385 // Create the register request 1386 node := mock.Node() 1387 reg := &structs.NodeRegisterRequest{ 1388 Node: node, 1389 WriteRequest: structs.WriteRequest{Region: "global"}, 1390 } 1391 1392 // Fetch the response 1393 var resp structs.NodeUpdateResponse 1394 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) 1395 1396 // Update the eligibility 1397 elig := &structs.NodeUpdateEligibilityRequest{ 1398 NodeID: node.ID, 1399 Eligibility: structs.NodeSchedulingIneligible, 1400 WriteRequest: structs.WriteRequest{Region: "global"}, 1401 } 1402 var resp2 structs.NodeEligibilityUpdateResponse 1403 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", elig, &resp2)) 1404 require.NotZero(resp2.Index) 1405 require.Zero(resp2.EvalCreateIndex) 1406 require.Empty(resp2.EvalIDs) 1407 1408 // Check for the node in the FSM 1409 state := s1.fsm.State() 1410 out, err := state.NodeByID(nil, node.ID) 1411 require.Nil(err) 1412 require.Equal(out.SchedulingEligibility, structs.NodeSchedulingIneligible) 1413 require.Len(out.Events, 2) 1414 require.Equal(NodeEligibilityEventIneligible, out.Events[1].Message) 1415 1416 // Register a system job 1417 job := mock.SystemJob() 1418 require.Nil(s1.State().UpsertJob(structs.MsgTypeTestSetup, 10, job)) 1419 1420 // Update the eligibility and expect evals 1421 elig.Eligibility = structs.NodeSchedulingEligible 1422 var resp3 structs.NodeEligibilityUpdateResponse 1423 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", elig, &resp3)) 1424 require.NotZero(resp3.Index) 1425 require.NotZero(resp3.EvalCreateIndex) 1426 require.Len(resp3.EvalIDs, 1) 1427 1428 
out, err = state.NodeByID(nil, node.ID) 1429 require.Nil(err) 1430 require.Len(out.Events, 3) 1431 require.Equal(NodeEligibilityEventEligible, out.Events[2].Message) 1432} 1433 1434func TestClientEndpoint_UpdateEligibility_ACL(t *testing.T) { 1435 t.Parallel() 1436 1437 s1, root, cleanupS1 := TestACLServer(t, nil) 1438 defer cleanupS1() 1439 codec := rpcClient(t, s1) 1440 testutil.WaitForLeader(t, s1.RPC) 1441 require := require.New(t) 1442 1443 // Create the node 1444 node := mock.Node() 1445 state := s1.fsm.State() 1446 1447 require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode") 1448 1449 // Create the policy and tokens 1450 validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite)) 1451 invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead)) 1452 1453 // Update the status without a token and expect failure 1454 dereg := &structs.NodeUpdateEligibilityRequest{ 1455 NodeID: node.ID, 1456 Eligibility: structs.NodeSchedulingIneligible, 1457 WriteRequest: structs.WriteRequest{Region: "global"}, 1458 } 1459 { 1460 var resp structs.NodeEligibilityUpdateResponse 1461 err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp) 1462 require.NotNil(err, "RPC") 1463 require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 1464 } 1465 1466 // Try with a valid token 1467 dereg.AuthToken = validToken.SecretID 1468 { 1469 var resp structs.NodeEligibilityUpdateResponse 1470 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC") 1471 } 1472 1473 // Try with a invalid token 1474 dereg.AuthToken = invalidToken.SecretID 1475 { 1476 var resp structs.NodeEligibilityUpdateResponse 1477 err := msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp) 1478 require.NotNil(err, "RPC") 1479 require.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 1480 } 1481 1482 // Try with a root token 
1483 dereg.AuthToken = root.SecretID 1484 { 1485 var resp structs.NodeEligibilityUpdateResponse 1486 require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateEligibility", dereg, &resp), "RPC") 1487 } 1488} 1489 1490func TestClientEndpoint_GetNode(t *testing.T) { 1491 t.Parallel() 1492 1493 s1, cleanupS1 := TestServer(t, nil) 1494 defer cleanupS1() 1495 codec := rpcClient(t, s1) 1496 testutil.WaitForLeader(t, s1.RPC) 1497 1498 // Create the register request 1499 node := mock.Node() 1500 reg := &structs.NodeRegisterRequest{ 1501 Node: node, 1502 WriteRequest: structs.WriteRequest{Region: "global"}, 1503 } 1504 1505 // Fetch the response 1506 var resp structs.GenericResponse 1507 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 1508 t.Fatalf("err: %v", err) 1509 } 1510 node.CreateIndex = resp.Index 1511 node.ModifyIndex = resp.Index 1512 1513 // Lookup the node 1514 get := &structs.NodeSpecificRequest{ 1515 NodeID: node.ID, 1516 QueryOptions: structs.QueryOptions{Region: "global"}, 1517 } 1518 var resp2 structs.SingleNodeResponse 1519 if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", get, &resp2); err != nil { 1520 t.Fatalf("err: %v", err) 1521 } 1522 if resp2.Index != resp.Index { 1523 t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) 1524 } 1525 1526 if resp2.Node.ComputedClass == "" { 1527 t.Fatalf("bad ComputedClass: %#v", resp2.Node) 1528 } 1529 1530 // Update the status updated at value 1531 node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt 1532 node.SecretID = "" 1533 node.Events = resp2.Node.Events 1534 if !reflect.DeepEqual(node, resp2.Node) { 1535 t.Fatalf("bad: %#v \n %#v", node, resp2.Node) 1536 } 1537 1538 // assert that the node register event was set correctly 1539 if len(resp2.Node.Events) != 1 { 1540 t.Fatalf("Did not set node events: %#v", resp2.Node) 1541 } 1542 if resp2.Node.Events[0].Message != state.NodeRegisterEventRegistered { 1543 t.Fatalf("Did not set node register event correctly: %#v", 
resp2.Node) 1544 } 1545 1546 // Lookup non-existing node 1547 get.NodeID = "12345678-abcd-efab-cdef-123456789abc" 1548 if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", get, &resp2); err != nil { 1549 t.Fatalf("err: %v", err) 1550 } 1551 if resp2.Index != resp.Index { 1552 t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) 1553 } 1554 if resp2.Node != nil { 1555 t.Fatalf("unexpected node") 1556 } 1557} 1558 1559func TestClientEndpoint_GetNode_ACL(t *testing.T) { 1560 t.Parallel() 1561 1562 s1, root, cleanupS1 := TestACLServer(t, nil) 1563 defer cleanupS1() 1564 codec := rpcClient(t, s1) 1565 testutil.WaitForLeader(t, s1.RPC) 1566 assert := assert.New(t) 1567 1568 // Create the node 1569 node := mock.Node() 1570 state := s1.fsm.State() 1571 assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode") 1572 1573 // Create the policy and tokens 1574 validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyRead)) 1575 invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyDeny)) 1576 1577 // Lookup the node without a token and expect failure 1578 req := &structs.NodeSpecificRequest{ 1579 NodeID: node.ID, 1580 QueryOptions: structs.QueryOptions{Region: "global"}, 1581 } 1582 { 1583 var resp structs.SingleNodeResponse 1584 err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp) 1585 assert.NotNil(err, "RPC") 1586 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 1587 } 1588 1589 // Try with a valid token 1590 req.AuthToken = validToken.SecretID 1591 { 1592 var resp structs.SingleNodeResponse 1593 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp), "RPC") 1594 assert.Equal(node.ID, resp.Node.ID) 1595 } 1596 1597 // Try with a Node.SecretID 1598 req.AuthToken = node.SecretID 1599 { 1600 var resp structs.SingleNodeResponse 1601 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp), "RPC") 1602 
assert.Equal(node.ID, resp.Node.ID) 1603 } 1604 1605 // Try with a invalid token 1606 req.AuthToken = invalidToken.SecretID 1607 { 1608 var resp structs.SingleNodeResponse 1609 err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp) 1610 assert.NotNil(err, "RPC") 1611 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 1612 } 1613 1614 // Try with a root token 1615 req.AuthToken = root.SecretID 1616 { 1617 var resp structs.SingleNodeResponse 1618 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp), "RPC") 1619 assert.Equal(node.ID, resp.Node.ID) 1620 } 1621} 1622 1623func TestClientEndpoint_GetNode_Blocking(t *testing.T) { 1624 t.Parallel() 1625 1626 s1, cleanupS1 := TestServer(t, nil) 1627 defer cleanupS1() 1628 state := s1.fsm.State() 1629 codec := rpcClient(t, s1) 1630 testutil.WaitForLeader(t, s1.RPC) 1631 1632 // Create the node 1633 node1 := mock.Node() 1634 node2 := mock.Node() 1635 1636 // First create an unrelated node. 1637 time.AfterFunc(100*time.Millisecond, func() { 1638 if err := state.UpsertNode(structs.MsgTypeTestSetup, 100, node1); err != nil { 1639 t.Fatalf("err: %v", err) 1640 } 1641 }) 1642 1643 // Upsert the node we are watching later 1644 time.AfterFunc(200*time.Millisecond, func() { 1645 if err := state.UpsertNode(structs.MsgTypeTestSetup, 200, node2); err != nil { 1646 t.Fatalf("err: %v", err) 1647 } 1648 }) 1649 1650 // Lookup the node 1651 req := &structs.NodeSpecificRequest{ 1652 NodeID: node2.ID, 1653 QueryOptions: structs.QueryOptions{ 1654 Region: "global", 1655 MinQueryIndex: 150, 1656 }, 1657 } 1658 var resp structs.SingleNodeResponse 1659 start := time.Now() 1660 if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp); err != nil { 1661 t.Fatalf("err: %v", err) 1662 } 1663 1664 if elapsed := time.Since(start); elapsed < 200*time.Millisecond { 1665 t.Fatalf("should block (returned in %s) %#v", elapsed, resp) 1666 } 1667 if resp.Index != 200 { 1668 t.Fatalf("Bad index: %d %d", 
resp.Index, 200) 1669 } 1670 if resp.Node == nil || resp.Node.ID != node2.ID { 1671 t.Fatalf("bad: %#v", resp.Node) 1672 } 1673 1674 // Node update triggers watches 1675 time.AfterFunc(100*time.Millisecond, func() { 1676 nodeUpdate := mock.Node() 1677 nodeUpdate.ID = node2.ID 1678 nodeUpdate.Status = structs.NodeStatusDown 1679 if err := state.UpsertNode(structs.MsgTypeTestSetup, 300, nodeUpdate); err != nil { 1680 t.Fatalf("err: %v", err) 1681 } 1682 }) 1683 1684 req.QueryOptions.MinQueryIndex = 250 1685 var resp2 structs.SingleNodeResponse 1686 start = time.Now() 1687 if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp2); err != nil { 1688 t.Fatalf("err: %v", err) 1689 } 1690 1691 if elapsed := time.Since(start); elapsed < 100*time.Millisecond { 1692 t.Fatalf("should block (returned in %s) %#v", elapsed, resp) 1693 } 1694 if resp2.Index != 300 { 1695 t.Fatalf("Bad index: %d %d", resp2.Index, 300) 1696 } 1697 if resp2.Node == nil || resp2.Node.Status != structs.NodeStatusDown { 1698 t.Fatalf("bad: %#v", resp2.Node) 1699 } 1700 1701 // Node delete triggers watches 1702 time.AfterFunc(100*time.Millisecond, func() { 1703 if err := state.DeleteNode(structs.MsgTypeTestSetup, 400, []string{node2.ID}); err != nil { 1704 t.Fatalf("err: %v", err) 1705 } 1706 }) 1707 1708 req.QueryOptions.MinQueryIndex = 350 1709 var resp3 structs.SingleNodeResponse 1710 start = time.Now() 1711 if err := msgpackrpc.CallWithCodec(codec, "Node.GetNode", req, &resp3); err != nil { 1712 t.Fatalf("err: %v", err) 1713 } 1714 1715 if elapsed := time.Since(start); elapsed < 100*time.Millisecond { 1716 t.Fatalf("should block (returned in %s) %#v", elapsed, resp) 1717 } 1718 if resp3.Index != 400 { 1719 t.Fatalf("Bad index: %d %d", resp2.Index, 400) 1720 } 1721 if resp3.Node != nil { 1722 t.Fatalf("bad: %#v", resp3.Node) 1723 } 1724} 1725 1726func TestClientEndpoint_GetAllocs(t *testing.T) { 1727 t.Parallel() 1728 1729 s1, cleanupS1 := TestServer(t, nil) 1730 defer cleanupS1() 1731 
codec := rpcClient(t, s1) 1732 testutil.WaitForLeader(t, s1.RPC) 1733 1734 // Create the register request 1735 node := mock.Node() 1736 reg := &structs.NodeRegisterRequest{ 1737 Node: node, 1738 WriteRequest: structs.WriteRequest{Region: "global"}, 1739 } 1740 1741 // Fetch the response 1742 var resp structs.GenericResponse 1743 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 1744 t.Fatalf("err: %v", err) 1745 } 1746 node.CreateIndex = resp.Index 1747 node.ModifyIndex = resp.Index 1748 1749 // Inject fake evaluations 1750 alloc := mock.Alloc() 1751 alloc.NodeID = node.ID 1752 state := s1.fsm.State() 1753 state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) 1754 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}) 1755 if err != nil { 1756 t.Fatalf("err: %v", err) 1757 } 1758 1759 // Lookup the allocs 1760 get := &structs.NodeSpecificRequest{ 1761 NodeID: node.ID, 1762 QueryOptions: structs.QueryOptions{Region: "global"}, 1763 } 1764 var resp2 structs.NodeAllocsResponse 1765 if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", get, &resp2); err != nil { 1766 t.Fatalf("err: %v", err) 1767 } 1768 if resp2.Index != 100 { 1769 t.Fatalf("Bad index: %d %d", resp2.Index, 100) 1770 } 1771 1772 if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID { 1773 t.Fatalf("bad: %#v", resp2.Allocs) 1774 } 1775 1776 // Lookup non-existing node 1777 get.NodeID = "foobarbaz" 1778 if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", get, &resp2); err != nil { 1779 t.Fatalf("err: %v", err) 1780 } 1781 if resp2.Index != 100 { 1782 t.Fatalf("Bad index: %d %d", resp2.Index, 100) 1783 } 1784 if len(resp2.Allocs) != 0 { 1785 t.Fatalf("unexpected node") 1786 } 1787} 1788 1789func TestClientEndpoint_GetAllocs_ACL_Basic(t *testing.T) { 1790 t.Parallel() 1791 1792 s1, root, cleanupS1 := TestACLServer(t, nil) 1793 defer cleanupS1() 1794 codec := rpcClient(t, s1) 1795 testutil.WaitForLeader(t, s1.RPC) 1796 
assert := assert.New(t) 1797 1798 // Create the node 1799 allocDefaultNS := mock.Alloc() 1800 node := mock.Node() 1801 allocDefaultNS.NodeID = node.ID 1802 state := s1.fsm.State() 1803 assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode") 1804 assert.Nil(state.UpsertJobSummary(2, mock.JobSummary(allocDefaultNS.JobID)), "UpsertJobSummary") 1805 allocs := []*structs.Allocation{allocDefaultNS} 1806 assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 5, allocs), "UpsertAllocs") 1807 1808 // Create the namespace policy and tokens 1809 validDefaultToken := mock.CreatePolicyAndToken(t, state, 1001, "test-default-valid", mock.NodePolicy(acl.PolicyRead)+ 1810 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) 1811 invalidToken := mock.CreatePolicyAndToken(t, state, 1004, "test-invalid", 1812 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) 1813 1814 req := &structs.NodeSpecificRequest{ 1815 NodeID: node.ID, 1816 QueryOptions: structs.QueryOptions{ 1817 Region: "global", 1818 }, 1819 } 1820 1821 // Lookup the node without a token and expect failure 1822 { 1823 var resp structs.NodeAllocsResponse 1824 err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp) 1825 assert.NotNil(err, "RPC") 1826 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 1827 } 1828 1829 // Try with a valid token for the default namespace 1830 req.AuthToken = validDefaultToken.SecretID 1831 { 1832 var resp structs.NodeAllocsResponse 1833 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") 1834 assert.Len(resp.Allocs, 1) 1835 assert.Equal(allocDefaultNS.ID, resp.Allocs[0].ID) 1836 } 1837 1838 // Try with a invalid token 1839 req.AuthToken = invalidToken.SecretID 1840 { 1841 var resp structs.NodeAllocsResponse 1842 err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp) 1843 assert.NotNil(err, "RPC") 1844 
assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 1845 } 1846 1847 // Try with a root token 1848 req.AuthToken = root.SecretID 1849 { 1850 var resp structs.NodeAllocsResponse 1851 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") 1852 assert.Len(resp.Allocs, 1) 1853 for _, alloc := range resp.Allocs { 1854 switch alloc.ID { 1855 case allocDefaultNS.ID: 1856 // expected 1857 default: 1858 t.Errorf("unexpected alloc %q for namespace %q", alloc.ID, alloc.Namespace) 1859 } 1860 } 1861 } 1862} 1863 1864func TestClientEndpoint_GetAllocs_ACL_Namespaces(t *testing.T) { 1865 t.Parallel() 1866 s1, root, cleanupS1 := TestACLServer(t, nil) 1867 defer cleanupS1() 1868 codec := rpcClient(t, s1) 1869 testutil.WaitForLeader(t, s1.RPC) 1870 assert := assert.New(t) 1871 1872 // Create the namespaces 1873 ns1 := mock.Namespace() 1874 ns2 := mock.Namespace() 1875 ns1.Name = "altnamespace" 1876 ns2.Name = "should-only-be-displayed-for-root-ns" 1877 1878 // Create the allocs 1879 allocDefaultNS := mock.Alloc() 1880 allocAltNS := mock.Alloc() 1881 allocAltNS.Namespace = ns1.Name 1882 allocOtherNS := mock.Alloc() 1883 allocOtherNS.Namespace = ns2.Name 1884 1885 node := mock.Node() 1886 allocDefaultNS.NodeID = node.ID 1887 allocAltNS.NodeID = node.ID 1888 allocOtherNS.NodeID = node.ID 1889 state := s1.fsm.State() 1890 assert.Nil(state.UpsertNamespaces(1, []*structs.Namespace{ns1, ns2}), "UpsertNamespaces") 1891 assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 2, node), "UpsertNode") 1892 assert.Nil(state.UpsertJobSummary(3, mock.JobSummary(allocDefaultNS.JobID)), "UpsertJobSummary") 1893 assert.Nil(state.UpsertJobSummary(4, mock.JobSummary(allocAltNS.JobID)), "UpsertJobSummary") 1894 assert.Nil(state.UpsertJobSummary(5, mock.JobSummary(allocOtherNS.JobID)), "UpsertJobSummary") 1895 allocs := []*structs.Allocation{allocDefaultNS, allocAltNS, allocOtherNS} 1896 assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 6, allocs), "UpsertAllocs") 
1897 1898 // Create the namespace policy and tokens 1899 validDefaultToken := mock.CreatePolicyAndToken(t, state, 1001, "test-default-valid", mock.NodePolicy(acl.PolicyRead)+ 1900 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) 1901 validNoNSToken := mock.CreatePolicyAndToken(t, state, 1003, "test-alt-valid", mock.NodePolicy(acl.PolicyRead)) 1902 invalidToken := mock.CreatePolicyAndToken(t, state, 1004, "test-invalid", 1903 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) 1904 1905 // Lookup the node without a token and expect failure 1906 req := &structs.NodeSpecificRequest{ 1907 NodeID: node.ID, 1908 QueryOptions: structs.QueryOptions{Region: "global"}, 1909 } 1910 { 1911 var resp structs.NodeAllocsResponse 1912 err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp) 1913 assert.NotNil(err, "RPC") 1914 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 1915 } 1916 1917 // Try with a valid token for the default namespace 1918 req.AuthToken = validDefaultToken.SecretID 1919 { 1920 var resp structs.NodeAllocsResponse 1921 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") 1922 assert.Len(resp.Allocs, 1) 1923 assert.Equal(allocDefaultNS.ID, resp.Allocs[0].ID) 1924 } 1925 1926 // Try with a valid token for a namespace with no allocs on this node 1927 req.AuthToken = validNoNSToken.SecretID 1928 { 1929 var resp structs.NodeAllocsResponse 1930 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") 1931 assert.Len(resp.Allocs, 0) 1932 } 1933 1934 // Try with a invalid token 1935 req.AuthToken = invalidToken.SecretID 1936 { 1937 var resp structs.NodeAllocsResponse 1938 err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp) 1939 assert.NotNil(err, "RPC") 1940 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 1941 } 1942 1943 // Try with a root token 1944 req.AuthToken = 
root.SecretID 1945 { 1946 var resp structs.NodeAllocsResponse 1947 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp), "RPC") 1948 assert.Len(resp.Allocs, 3) 1949 for _, alloc := range resp.Allocs { 1950 switch alloc.ID { 1951 case allocDefaultNS.ID, allocAltNS.ID, allocOtherNS.ID: 1952 // expected 1953 default: 1954 t.Errorf("unexpected alloc %q for namespace %q", alloc.ID, alloc.Namespace) 1955 } 1956 } 1957 } 1958} 1959 1960func TestClientEndpoint_GetClientAllocs(t *testing.T) { 1961 t.Parallel() 1962 require := require.New(t) 1963 1964 s1, cleanupS1 := TestServer(t, nil) 1965 defer cleanupS1() 1966 codec := rpcClient(t, s1) 1967 testutil.WaitForLeader(t, s1.RPC) 1968 1969 // Check that we have no client connections 1970 require.Empty(s1.connectedNodes()) 1971 1972 // Create the register request 1973 node := mock.Node() 1974 state := s1.fsm.State() 1975 require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 98, node)) 1976 1977 // Inject fake evaluations 1978 alloc := mock.Alloc() 1979 alloc.NodeID = node.ID 1980 state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) 1981 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}) 1982 if err != nil { 1983 t.Fatalf("err: %v", err) 1984 } 1985 1986 // Lookup the allocs 1987 get := &structs.NodeSpecificRequest{ 1988 NodeID: node.ID, 1989 SecretID: node.SecretID, 1990 QueryOptions: structs.QueryOptions{Region: "global"}, 1991 } 1992 var resp2 structs.NodeClientAllocsResponse 1993 if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp2); err != nil { 1994 t.Fatalf("err: %v", err) 1995 } 1996 if resp2.Index != 100 { 1997 t.Fatalf("Bad index: %d %d", resp2.Index, 100) 1998 } 1999 2000 if len(resp2.Allocs) != 1 || resp2.Allocs[alloc.ID] != 100 { 2001 t.Fatalf("bad: %#v", resp2.Allocs) 2002 } 2003 2004 // Check that we have the client connections 2005 nodes := s1.connectedNodes() 2006 require.Len(nodes, 1) 2007 require.Contains(nodes, node.ID) 2008 
2009 // Lookup node with bad SecretID 2010 get.SecretID = "foobarbaz" 2011 var resp3 structs.NodeClientAllocsResponse 2012 err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp3) 2013 if err == nil || !strings.Contains(err.Error(), "does not match") { 2014 t.Fatalf("err: %v", err) 2015 } 2016 2017 // Lookup non-existing node 2018 get.NodeID = uuid.Generate() 2019 var resp4 structs.NodeClientAllocsResponse 2020 if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp4); err != nil { 2021 t.Fatalf("err: %v", err) 2022 } 2023 if resp4.Index != 100 { 2024 t.Fatalf("Bad index: %d %d", resp3.Index, 100) 2025 } 2026 if len(resp4.Allocs) != 0 { 2027 t.Fatalf("unexpected node %#v", resp3.Allocs) 2028 } 2029 2030 // Close the connection and check that we remove the client connections 2031 require.Nil(codec.Close()) 2032 testutil.WaitForResult(func() (bool, error) { 2033 nodes := s1.connectedNodes() 2034 return len(nodes) == 0, nil 2035 }, func(err error) { 2036 t.Fatalf("should have no clients") 2037 }) 2038} 2039 2040func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { 2041 t.Parallel() 2042 2043 s1, cleanupS1 := TestServer(t, nil) 2044 defer cleanupS1() 2045 codec := rpcClient(t, s1) 2046 testutil.WaitForLeader(t, s1.RPC) 2047 2048 // Create the register request 2049 node := mock.Node() 2050 reg := &structs.NodeRegisterRequest{ 2051 Node: node, 2052 WriteRequest: structs.WriteRequest{Region: "global"}, 2053 } 2054 2055 // Fetch the response 2056 var resp structs.GenericResponse 2057 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 2058 t.Fatalf("err: %v", err) 2059 } 2060 node.CreateIndex = resp.Index 2061 node.ModifyIndex = resp.Index 2062 2063 // Inject fake evaluations async 2064 now := time.Now().UTC().UnixNano() 2065 alloc := mock.Alloc() 2066 alloc.NodeID = node.ID 2067 alloc.ModifyTime = now 2068 state := s1.fsm.State() 2069 state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) 
2070 start := time.Now() 2071 time.AfterFunc(100*time.Millisecond, func() { 2072 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}) 2073 if err != nil { 2074 t.Fatalf("err: %v", err) 2075 } 2076 }) 2077 2078 // Lookup the allocs in a blocking query 2079 req := &structs.NodeSpecificRequest{ 2080 NodeID: node.ID, 2081 SecretID: node.SecretID, 2082 QueryOptions: structs.QueryOptions{ 2083 Region: "global", 2084 MinQueryIndex: 50, 2085 MaxQueryTime: time.Second, 2086 }, 2087 } 2088 var resp2 structs.NodeClientAllocsResponse 2089 if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2); err != nil { 2090 t.Fatalf("err: %v", err) 2091 } 2092 2093 // Should block at least 100ms 2094 if time.Since(start) < 100*time.Millisecond { 2095 t.Fatalf("too fast") 2096 } 2097 2098 if resp2.Index != 100 { 2099 t.Fatalf("Bad index: %d %d", resp2.Index, 100) 2100 } 2101 2102 if len(resp2.Allocs) != 1 || resp2.Allocs[alloc.ID] != 100 { 2103 t.Fatalf("bad: %#v", resp2.Allocs) 2104 } 2105 2106 iter, err := state.AllocsByIDPrefix(nil, structs.DefaultNamespace, alloc.ID) 2107 if err != nil { 2108 t.Fatalf("err: %v", err) 2109 } 2110 2111 getAllocs := func(iter memdb.ResultIterator) []*structs.Allocation { 2112 var allocs []*structs.Allocation 2113 for { 2114 raw := iter.Next() 2115 if raw == nil { 2116 break 2117 } 2118 allocs = append(allocs, raw.(*structs.Allocation)) 2119 } 2120 return allocs 2121 } 2122 out := getAllocs(iter) 2123 2124 if len(out) != 1 { 2125 t.Fatalf("Expected to get one allocation but got:%v", out) 2126 } 2127 2128 if out[0].ModifyTime != now { 2129 t.Fatalf("Invalid modify time %v", out[0].ModifyTime) 2130 } 2131 2132 // Alloc updates fire watches 2133 time.AfterFunc(100*time.Millisecond, func() { 2134 allocUpdate := mock.Alloc() 2135 allocUpdate.NodeID = alloc.NodeID 2136 allocUpdate.ID = alloc.ID 2137 allocUpdate.ClientStatus = structs.AllocClientStatusRunning 2138 state.UpsertJobSummary(199, 
mock.JobSummary(allocUpdate.JobID)) 2139 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{allocUpdate}) 2140 if err != nil { 2141 t.Fatalf("err: %v", err) 2142 } 2143 }) 2144 2145 req.QueryOptions.MinQueryIndex = 150 2146 var resp3 structs.NodeClientAllocsResponse 2147 if err := msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3); err != nil { 2148 t.Fatalf("err: %v", err) 2149 } 2150 2151 if time.Since(start) < 100*time.Millisecond { 2152 t.Fatalf("too fast") 2153 } 2154 if resp3.Index != 200 { 2155 t.Fatalf("Bad index: %d %d", resp3.Index, 200) 2156 } 2157 if len(resp3.Allocs) != 1 || resp3.Allocs[alloc.ID] != 200 { 2158 t.Fatalf("bad: %#v", resp3.Allocs) 2159 } 2160} 2161 2162func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { 2163 t.Parallel() 2164 assert := assert.New(t) 2165 2166 s1, cleanupS1 := TestServer(t, nil) 2167 defer cleanupS1() 2168 codec := rpcClient(t, s1) 2169 testutil.WaitForLeader(t, s1.RPC) 2170 2171 // Create the register request 2172 node := mock.Node() 2173 reg := &structs.NodeRegisterRequest{ 2174 Node: node, 2175 WriteRequest: structs.WriteRequest{Region: "global"}, 2176 } 2177 2178 // Fetch the response 2179 var resp structs.GenericResponse 2180 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) 2181 node.CreateIndex = resp.Index 2182 node.ModifyIndex = resp.Index 2183 2184 // Inject fake allocations async 2185 alloc1 := mock.Alloc() 2186 alloc1.NodeID = node.ID 2187 alloc2 := mock.Alloc() 2188 alloc2.NodeID = node.ID 2189 state := s1.fsm.State() 2190 state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) 2191 start := time.Now() 2192 time.AfterFunc(100*time.Millisecond, func() { 2193 assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc1, alloc2})) 2194 }) 2195 2196 // Lookup the allocs in a blocking query 2197 req := &structs.NodeSpecificRequest{ 2198 NodeID: node.ID, 2199 SecretID: node.SecretID, 2200 
QueryOptions: structs.QueryOptions{ 2201 Region: "global", 2202 MinQueryIndex: 50, 2203 MaxQueryTime: time.Second, 2204 }, 2205 } 2206 var resp2 structs.NodeClientAllocsResponse 2207 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp2)) 2208 2209 // Should block at least 100ms 2210 if time.Since(start) < 100*time.Millisecond { 2211 t.Fatalf("too fast") 2212 } 2213 2214 assert.EqualValues(100, resp2.Index) 2215 if assert.Len(resp2.Allocs, 2) { 2216 assert.EqualValues(100, resp2.Allocs[alloc1.ID]) 2217 } 2218 2219 // Delete an allocation 2220 time.AfterFunc(100*time.Millisecond, func() { 2221 assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID})) 2222 }) 2223 2224 req.QueryOptions.MinQueryIndex = 150 2225 var resp3 structs.NodeClientAllocsResponse 2226 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", req, &resp3)) 2227 2228 if time.Since(start) < 100*time.Millisecond { 2229 t.Fatalf("too fast") 2230 } 2231 assert.EqualValues(200, resp3.Index) 2232 if assert.Len(resp3.Allocs, 1) { 2233 assert.EqualValues(100, resp3.Allocs[alloc1.ID]) 2234 } 2235} 2236 2237// A MigrateToken should not be created if an allocation shares the same node 2238// with its previous allocation 2239func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) { 2240 t.Parallel() 2241 assert := assert.New(t) 2242 2243 s1, cleanupS1 := TestServer(t, nil) 2244 defer cleanupS1() 2245 codec := rpcClient(t, s1) 2246 testutil.WaitForLeader(t, s1.RPC) 2247 2248 // Create the register request 2249 node := mock.Node() 2250 reg := &structs.NodeRegisterRequest{ 2251 Node: node, 2252 WriteRequest: structs.WriteRequest{Region: "global"}, 2253 } 2254 2255 // Fetch the response 2256 var resp structs.GenericResponse 2257 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 2258 t.Fatalf("err: %v", err) 2259 } 2260 node.CreateIndex = resp.Index 2261 node.ModifyIndex = resp.Index 2262 2263 // Inject fake evaluations 2264 
prevAlloc := mock.Alloc() 2265 prevAlloc.NodeID = node.ID 2266 alloc := mock.Alloc() 2267 alloc.NodeID = node.ID 2268 alloc.PreviousAllocation = prevAlloc.ID 2269 alloc.DesiredStatus = structs.AllocClientStatusComplete 2270 state := s1.fsm.State() 2271 state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) 2272 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{prevAlloc, alloc}) 2273 assert.Nil(err) 2274 2275 // Lookup the allocs 2276 get := &structs.NodeSpecificRequest{ 2277 NodeID: node.ID, 2278 SecretID: node.SecretID, 2279 QueryOptions: structs.QueryOptions{Region: "global"}, 2280 } 2281 var resp2 structs.NodeClientAllocsResponse 2282 2283 err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp2) 2284 assert.Nil(err) 2285 2286 assert.Equal(uint64(100), resp2.Index) 2287 assert.Equal(2, len(resp2.Allocs)) 2288 assert.Equal(uint64(100), resp2.Allocs[alloc.ID]) 2289 assert.Equal(0, len(resp2.MigrateTokens)) 2290} 2291 2292func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { 2293 t.Parallel() 2294 2295 s1, cleanupS1 := TestServer(t, nil) 2296 defer cleanupS1() 2297 codec := rpcClient(t, s1) 2298 testutil.WaitForLeader(t, s1.RPC) 2299 2300 // Create the register request 2301 node := mock.Node() 2302 reg := &structs.NodeRegisterRequest{ 2303 Node: node, 2304 WriteRequest: structs.WriteRequest{Region: "global"}, 2305 } 2306 2307 // Fetch the response 2308 var resp structs.GenericResponse 2309 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 2310 t.Fatalf("err: %v", err) 2311 } 2312 node.CreateIndex = resp.Index 2313 node.ModifyIndex = resp.Index 2314 2315 // Inject fake evaluations async 2316 alloc := mock.Alloc() 2317 alloc.NodeID = node.ID 2318 state := s1.fsm.State() 2319 state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) 2320 start := time.Now() 2321 time.AfterFunc(100*time.Millisecond, func() { 2322 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, 
[]*structs.Allocation{alloc}) 2323 if err != nil { 2324 t.Fatalf("err: %v", err) 2325 } 2326 }) 2327 2328 // Lookup the allocs in a blocking query 2329 req := &structs.NodeSpecificRequest{ 2330 NodeID: node.ID, 2331 QueryOptions: structs.QueryOptions{ 2332 Region: "global", 2333 MinQueryIndex: 50, 2334 MaxQueryTime: time.Second, 2335 }, 2336 } 2337 var resp2 structs.NodeAllocsResponse 2338 if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp2); err != nil { 2339 t.Fatalf("err: %v", err) 2340 } 2341 2342 // Should block at least 100ms 2343 if time.Since(start) < 100*time.Millisecond { 2344 t.Fatalf("too fast") 2345 } 2346 2347 if resp2.Index != 100 { 2348 t.Fatalf("Bad index: %d %d", resp2.Index, 100) 2349 } 2350 2351 if len(resp2.Allocs) != 1 || resp2.Allocs[0].ID != alloc.ID { 2352 t.Fatalf("bad: %#v", resp2.Allocs) 2353 } 2354 2355 // Alloc updates fire watches 2356 time.AfterFunc(100*time.Millisecond, func() { 2357 allocUpdate := mock.Alloc() 2358 allocUpdate.NodeID = alloc.NodeID 2359 allocUpdate.ID = alloc.ID 2360 allocUpdate.ClientStatus = structs.AllocClientStatusRunning 2361 state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID)) 2362 err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 200, []*structs.Allocation{allocUpdate}) 2363 if err != nil { 2364 t.Fatalf("err: %v", err) 2365 } 2366 }) 2367 2368 req.QueryOptions.MinQueryIndex = 150 2369 var resp3 structs.NodeAllocsResponse 2370 if err := msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", req, &resp3); err != nil { 2371 t.Fatalf("err: %v", err) 2372 } 2373 2374 if time.Since(start) < 100*time.Millisecond { 2375 t.Fatalf("too fast") 2376 } 2377 if resp3.Index != 200 { 2378 t.Fatalf("Bad index: %d %d", resp3.Index, 200) 2379 } 2380 if len(resp3.Allocs) != 1 || resp3.Allocs[0].ClientStatus != structs.AllocClientStatusRunning { 2381 t.Fatalf("bad: %#v", resp3.Allocs[0]) 2382 } 2383} 2384 2385func TestClientEndpoint_UpdateAlloc(t *testing.T) { 2386 t.Parallel() 2387 2388 
s1, cleanupS1 := TestServer(t, func(c *Config) { 2389 // Disabling scheduling in this test so that we can 2390 // ensure that the state store doesn't accumulate more evals 2391 // than what we expect the unit test to add 2392 c.NumSchedulers = 0 2393 }) 2394 2395 defer cleanupS1() 2396 codec := rpcClient(t, s1) 2397 testutil.WaitForLeader(t, s1.RPC) 2398 require := require.New(t) 2399 2400 // Create the register request 2401 node := mock.Node() 2402 reg := &structs.NodeRegisterRequest{ 2403 Node: node, 2404 WriteRequest: structs.WriteRequest{Region: "global"}, 2405 } 2406 2407 // Fetch the response 2408 var resp structs.GenericResponse 2409 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 2410 t.Fatalf("err: %v", err) 2411 } 2412 2413 state := s1.fsm.State() 2414 // Inject mock job 2415 job := mock.Job() 2416 job.ID = "mytestjob" 2417 err := state.UpsertJob(structs.MsgTypeTestSetup, 101, job) 2418 require.Nil(err) 2419 2420 // Inject fake allocations 2421 alloc := mock.Alloc() 2422 alloc.JobID = job.ID 2423 alloc.NodeID = node.ID 2424 err = state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) 2425 require.Nil(err) 2426 alloc.TaskGroup = job.TaskGroups[0].Name 2427 2428 alloc2 := mock.Alloc() 2429 alloc2.JobID = job.ID 2430 alloc2.NodeID = node.ID 2431 err = state.UpsertJobSummary(99, mock.JobSummary(alloc2.JobID)) 2432 require.Nil(err) 2433 alloc2.TaskGroup = job.TaskGroups[0].Name 2434 2435 err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc, alloc2}) 2436 require.Nil(err) 2437 2438 // Attempt updates of more than one alloc for the same job 2439 clientAlloc1 := new(structs.Allocation) 2440 *clientAlloc1 = *alloc 2441 clientAlloc1.ClientStatus = structs.AllocClientStatusFailed 2442 2443 clientAlloc2 := new(structs.Allocation) 2444 *clientAlloc2 = *alloc2 2445 clientAlloc2.ClientStatus = structs.AllocClientStatusFailed 2446 2447 // Update the alloc 2448 update := &structs.AllocUpdateRequest{ 
2449 Alloc: []*structs.Allocation{clientAlloc1, clientAlloc2}, 2450 WriteRequest: structs.WriteRequest{Region: "global"}, 2451 } 2452 var resp2 structs.NodeAllocsResponse 2453 start := time.Now() 2454 err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2) 2455 require.Nil(err) 2456 require.NotEqual(uint64(0), resp2.Index) 2457 2458 if diff := time.Since(start); diff < batchUpdateInterval { 2459 t.Fatalf("too fast: %v", diff) 2460 } 2461 2462 // Lookup the alloc 2463 ws := memdb.NewWatchSet() 2464 out, err := state.AllocByID(ws, alloc.ID) 2465 require.Nil(err) 2466 require.Equal(structs.AllocClientStatusFailed, out.ClientStatus) 2467 require.True(out.ModifyTime > 0) 2468 2469 // Assert that exactly one eval with TriggeredBy EvalTriggerRetryFailedAlloc exists 2470 evaluations, err := state.EvalsByJob(ws, job.Namespace, job.ID) 2471 require.Nil(err) 2472 require.True(len(evaluations) != 0) 2473 foundCount := 0 2474 for _, resultEval := range evaluations { 2475 if resultEval.TriggeredBy == structs.EvalTriggerRetryFailedAlloc && resultEval.WaitUntil.IsZero() { 2476 foundCount++ 2477 } 2478 } 2479 require.Equal(1, foundCount, "Should create exactly one eval for failed allocs") 2480 2481} 2482 2483func TestClientEndpoint_BatchUpdate(t *testing.T) { 2484 t.Parallel() 2485 2486 s1, cleanupS1 := TestServer(t, nil) 2487 defer cleanupS1() 2488 codec := rpcClient(t, s1) 2489 testutil.WaitForLeader(t, s1.RPC) 2490 2491 // Create the register request 2492 node := mock.Node() 2493 reg := &structs.NodeRegisterRequest{ 2494 Node: node, 2495 WriteRequest: structs.WriteRequest{Region: "global"}, 2496 } 2497 2498 // Fetch the response 2499 var resp structs.GenericResponse 2500 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 2501 t.Fatalf("err: %v", err) 2502 } 2503 2504 // Inject fake evaluations 2505 alloc := mock.Alloc() 2506 alloc.NodeID = node.ID 2507 state := s1.fsm.State() 2508 state.UpsertJobSummary(99, 
mock.JobSummary(alloc.JobID)) 2509 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}) 2510 if err != nil { 2511 t.Fatalf("err: %v", err) 2512 } 2513 2514 // Attempt update 2515 clientAlloc := new(structs.Allocation) 2516 *clientAlloc = *alloc 2517 clientAlloc.ClientStatus = structs.AllocClientStatusFailed 2518 2519 // Call to do the batch update 2520 bf := structs.NewBatchFuture() 2521 endpoint := s1.staticEndpoints.Node 2522 endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc}, nil) 2523 if err := bf.Wait(); err != nil { 2524 t.Fatalf("err: %v", err) 2525 } 2526 if bf.Index() == 0 { 2527 t.Fatalf("Bad index: %d", bf.Index()) 2528 } 2529 2530 // Lookup the alloc 2531 ws := memdb.NewWatchSet() 2532 out, err := state.AllocByID(ws, alloc.ID) 2533 if err != nil { 2534 t.Fatalf("err: %v", err) 2535 } 2536 if out.ClientStatus != structs.AllocClientStatusFailed { 2537 t.Fatalf("Bad: %#v", out) 2538 } 2539} 2540 2541func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) { 2542 t.Parallel() 2543 2544 s1, cleanupS1 := TestServer(t, nil) 2545 defer cleanupS1() 2546 codec := rpcClient(t, s1) 2547 testutil.WaitForLeader(t, s1.RPC) 2548 2549 // Create the register request 2550 node := mock.Node() 2551 reg := &structs.NodeRegisterRequest{ 2552 Node: node, 2553 WriteRequest: structs.WriteRequest{Region: "global"}, 2554 } 2555 2556 // Fetch the response 2557 var resp structs.GenericResponse 2558 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 2559 t.Fatalf("err: %v", err) 2560 } 2561 2562 // Swap the servers Vault Client 2563 tvc := &TestVaultClient{} 2564 s1.vault = tvc 2565 2566 // Inject fake allocation and vault accessor 2567 alloc := mock.Alloc() 2568 alloc.NodeID = node.ID 2569 state := s1.fsm.State() 2570 state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) 2571 if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}); err != nil { 2572 t.Fatalf("err: %v", 
err) 2573 } 2574 2575 va := mock.VaultAccessor() 2576 va.NodeID = node.ID 2577 va.AllocID = alloc.ID 2578 if err := state.UpsertVaultAccessor(101, []*structs.VaultAccessor{va}); err != nil { 2579 t.Fatalf("err: %v", err) 2580 } 2581 2582 // Inject mock job 2583 job := mock.Job() 2584 job.ID = alloc.JobID 2585 err := state.UpsertJob(structs.MsgTypeTestSetup, 101, job) 2586 if err != nil { 2587 t.Fatalf("err: %v", err) 2588 } 2589 2590 // Attempt update 2591 clientAlloc := new(structs.Allocation) 2592 *clientAlloc = *alloc 2593 clientAlloc.ClientStatus = structs.AllocClientStatusFailed 2594 2595 // Update the alloc 2596 update := &structs.AllocUpdateRequest{ 2597 Alloc: []*structs.Allocation{clientAlloc}, 2598 WriteRequest: structs.WriteRequest{Region: "global"}, 2599 } 2600 var resp2 structs.NodeAllocsResponse 2601 start := time.Now() 2602 if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2); err != nil { 2603 t.Fatalf("err: %v", err) 2604 } 2605 if resp2.Index == 0 { 2606 t.Fatalf("Bad index: %d", resp2.Index) 2607 } 2608 if diff := time.Since(start); diff < batchUpdateInterval { 2609 t.Fatalf("too fast: %v", diff) 2610 } 2611 2612 // Lookup the alloc 2613 ws := memdb.NewWatchSet() 2614 out, err := state.AllocByID(ws, alloc.ID) 2615 if err != nil { 2616 t.Fatalf("err: %v", err) 2617 } 2618 if out.ClientStatus != structs.AllocClientStatusFailed { 2619 t.Fatalf("Bad: %#v", out) 2620 } 2621 2622 if l := len(tvc.RevokedTokens); l != 1 { 2623 t.Fatalf("Deregister revoked %d tokens; want 1", l) 2624 } 2625} 2626 2627func TestClientEndpoint_CreateNodeEvals(t *testing.T) { 2628 t.Parallel() 2629 2630 s1, cleanupS1 := TestServer(t, nil) 2631 defer cleanupS1() 2632 testutil.WaitForLeader(t, s1.RPC) 2633 2634 // Inject fake evaluations 2635 alloc := mock.Alloc() 2636 state := s1.fsm.State() 2637 state.UpsertJobSummary(1, mock.JobSummary(alloc.JobID)) 2638 if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}); err != 
nil { 2639 t.Fatalf("err: %v", err) 2640 } 2641 2642 // Inject a fake system job. 2643 job := mock.SystemJob() 2644 if err := state.UpsertJob(structs.MsgTypeTestSetup, 3, job); err != nil { 2645 t.Fatalf("err: %v", err) 2646 } 2647 2648 // Create some evaluations 2649 ids, index, err := s1.staticEndpoints.Node.createNodeEvals(alloc.NodeID, 1) 2650 if err != nil { 2651 t.Fatalf("err: %v", err) 2652 } 2653 if index == 0 { 2654 t.Fatalf("bad: %d", index) 2655 } 2656 if len(ids) != 2 { 2657 t.Fatalf("bad: %s", ids) 2658 } 2659 2660 // Lookup the evaluations 2661 ws := memdb.NewWatchSet() 2662 evalByType := make(map[string]*structs.Evaluation, 2) 2663 for _, id := range ids { 2664 eval, err := state.EvalByID(ws, id) 2665 if err != nil { 2666 t.Fatalf("err: %v", err) 2667 } 2668 if eval == nil { 2669 t.Fatalf("expected eval") 2670 } 2671 2672 if old, ok := evalByType[eval.Type]; ok { 2673 t.Fatalf("multiple evals of the same type: %v and %v", old, eval) 2674 } 2675 2676 evalByType[eval.Type] = eval 2677 } 2678 2679 if len(evalByType) != 2 { 2680 t.Fatalf("Expected a service and system job; got %#v", evalByType) 2681 } 2682 2683 // Ensure the evals are correct. 
2684 for schedType, eval := range evalByType { 2685 expPriority := alloc.Job.Priority 2686 expJobID := alloc.JobID 2687 if schedType == "system" { 2688 expPriority = job.Priority 2689 expJobID = job.ID 2690 } 2691 2692 t.Logf("checking eval: %v", pretty.Sprint(eval)) 2693 require.Equal(t, index, eval.CreateIndex) 2694 require.Equal(t, structs.EvalTriggerNodeUpdate, eval.TriggeredBy) 2695 require.Equal(t, alloc.NodeID, eval.NodeID) 2696 require.Equal(t, uint64(1), eval.NodeModifyIndex) 2697 switch eval.Status { 2698 case structs.EvalStatusPending, structs.EvalStatusComplete: 2699 // success 2700 default: 2701 t.Fatalf("expected pending or complete, found %v", eval.Status) 2702 } 2703 require.Equal(t, expPriority, eval.Priority) 2704 require.Equal(t, expJobID, eval.JobID) 2705 require.NotZero(t, eval.CreateTime) 2706 require.NotZero(t, eval.ModifyTime) 2707 } 2708} 2709 2710func TestClientEndpoint_Evaluate(t *testing.T) { 2711 t.Parallel() 2712 2713 s1, cleanupS1 := TestServer(t, func(c *Config) { 2714 c.NumSchedulers = 0 // Prevent automatic dequeue 2715 }) 2716 defer cleanupS1() 2717 codec := rpcClient(t, s1) 2718 testutil.WaitForLeader(t, s1.RPC) 2719 2720 // Inject fake evaluations 2721 alloc := mock.Alloc() 2722 node := mock.Node() 2723 node.ID = alloc.NodeID 2724 state := s1.fsm.State() 2725 err := state.UpsertNode(structs.MsgTypeTestSetup, 1, node) 2726 if err != nil { 2727 t.Fatalf("err: %v", err) 2728 } 2729 state.UpsertJobSummary(2, mock.JobSummary(alloc.JobID)) 2730 err = state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}) 2731 if err != nil { 2732 t.Fatalf("err: %v", err) 2733 } 2734 2735 // Re-evaluate 2736 req := &structs.NodeEvaluateRequest{ 2737 NodeID: alloc.NodeID, 2738 WriteRequest: structs.WriteRequest{Region: "global"}, 2739 } 2740 2741 // Fetch the response 2742 var resp structs.NodeUpdateResponse 2743 if err := msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp); err != nil { 2744 t.Fatalf("err: %v", err) 2745 
} 2746 if resp.Index == 0 { 2747 t.Fatalf("bad index: %d", resp.Index) 2748 } 2749 2750 // Create some evaluations 2751 ids := resp.EvalIDs 2752 if len(ids) != 1 { 2753 t.Fatalf("bad: %s", ids) 2754 } 2755 2756 // Lookup the evaluation 2757 ws := memdb.NewWatchSet() 2758 eval, err := state.EvalByID(ws, ids[0]) 2759 if err != nil { 2760 t.Fatalf("err: %v", err) 2761 } 2762 if eval == nil { 2763 t.Fatalf("expected eval") 2764 } 2765 if eval.CreateIndex != resp.Index { 2766 t.Fatalf("index mis-match") 2767 } 2768 2769 if eval.Priority != alloc.Job.Priority { 2770 t.Fatalf("bad: %#v", eval) 2771 } 2772 if eval.Type != alloc.Job.Type { 2773 t.Fatalf("bad: %#v", eval) 2774 } 2775 if eval.TriggeredBy != structs.EvalTriggerNodeUpdate { 2776 t.Fatalf("bad: %#v", eval) 2777 } 2778 if eval.JobID != alloc.JobID { 2779 t.Fatalf("bad: %#v", eval) 2780 } 2781 if eval.NodeID != alloc.NodeID { 2782 t.Fatalf("bad: %#v", eval) 2783 } 2784 if eval.NodeModifyIndex != 1 { 2785 t.Fatalf("bad: %#v", eval) 2786 } 2787 if eval.Status != structs.EvalStatusPending { 2788 t.Fatalf("bad: %#v", eval) 2789 } 2790 if eval.CreateTime == 0 { 2791 t.Fatalf("CreateTime is unset: %#v", eval) 2792 } 2793 if eval.ModifyTime == 0 { 2794 t.Fatalf("ModifyTime is unset: %#v", eval) 2795 } 2796} 2797 2798func TestClientEndpoint_Evaluate_ACL(t *testing.T) { 2799 t.Parallel() 2800 2801 s1, root, cleanupS1 := TestACLServer(t, nil) 2802 defer cleanupS1() 2803 codec := rpcClient(t, s1) 2804 testutil.WaitForLeader(t, s1.RPC) 2805 assert := assert.New(t) 2806 2807 // Create the node with an alloc 2808 alloc := mock.Alloc() 2809 node := mock.Node() 2810 node.ID = alloc.NodeID 2811 state := s1.fsm.State() 2812 2813 assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode") 2814 assert.Nil(state.UpsertJobSummary(2, mock.JobSummary(alloc.JobID)), "UpsertJobSummary") 2815 assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}), "UpsertAllocs") 2816 2817 // Create the 
policy and tokens 2818 validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyWrite)) 2819 invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyRead)) 2820 2821 // Re-evaluate without a token and expect failure 2822 req := &structs.NodeEvaluateRequest{ 2823 NodeID: alloc.NodeID, 2824 WriteRequest: structs.WriteRequest{Region: "global"}, 2825 } 2826 { 2827 var resp structs.NodeUpdateResponse 2828 err := msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp) 2829 assert.NotNil(err, "RPC") 2830 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 2831 } 2832 2833 // Try with a valid token 2834 req.AuthToken = validToken.SecretID 2835 { 2836 var resp structs.NodeUpdateResponse 2837 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp), "RPC") 2838 } 2839 2840 // Try with a invalid token 2841 req.AuthToken = invalidToken.SecretID 2842 { 2843 var resp structs.NodeUpdateResponse 2844 err := msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp) 2845 assert.NotNil(err, "RPC") 2846 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 2847 } 2848 2849 // Try with a root token 2850 req.AuthToken = root.SecretID 2851 { 2852 var resp structs.NodeUpdateResponse 2853 assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.Evaluate", req, &resp), "RPC") 2854 } 2855} 2856 2857func TestClientEndpoint_ListNodes(t *testing.T) { 2858 t.Parallel() 2859 2860 s1, cleanupS1 := TestServer(t, nil) 2861 defer cleanupS1() 2862 codec := rpcClient(t, s1) 2863 testutil.WaitForLeader(t, s1.RPC) 2864 2865 // Create the register request 2866 node := mock.Node() 2867 node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{ 2868 "foo": { 2869 Name: "foo", 2870 Path: "/", 2871 ReadOnly: true, 2872 }, 2873 } 2874 reg := &structs.NodeRegisterRequest{ 2875 Node: node, 2876 WriteRequest: structs.WriteRequest{Region: "global"}, 2877 } 2878 2879 // Fetch the response 2880 
var resp structs.GenericResponse 2881 if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { 2882 t.Fatalf("err: %v", err) 2883 } 2884 node.CreateIndex = resp.Index 2885 node.ModifyIndex = resp.Index 2886 2887 // Lookup the node 2888 get := &structs.NodeListRequest{ 2889 QueryOptions: structs.QueryOptions{Region: "global"}, 2890 } 2891 var resp2 structs.NodeListResponse 2892 if err := msgpackrpc.CallWithCodec(codec, "Node.List", get, &resp2); err != nil { 2893 t.Fatalf("err: %v", err) 2894 } 2895 if resp2.Index != resp.Index { 2896 t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) 2897 } 2898 2899 require.Len(t, resp2.Nodes, 1) 2900 require.Equal(t, node.ID, resp2.Nodes[0].ID) 2901 2902 // #7344 - Assert HostVolumes are included in stub 2903 require.Equal(t, node.HostVolumes, resp2.Nodes[0].HostVolumes) 2904 2905 // #9055 - Assert Resources are *not* included by default 2906 require.Nil(t, resp2.Nodes[0].NodeResources) 2907 require.Nil(t, resp2.Nodes[0].ReservedResources) 2908 2909 // Lookup the node with prefix 2910 get = &structs.NodeListRequest{ 2911 QueryOptions: structs.QueryOptions{Region: "global", Prefix: node.ID[:4]}, 2912 } 2913 var resp3 structs.NodeListResponse 2914 if err := msgpackrpc.CallWithCodec(codec, "Node.List", get, &resp3); err != nil { 2915 t.Fatalf("err: %v", err) 2916 } 2917 if resp3.Index != resp.Index { 2918 t.Fatalf("Bad index: %d %d", resp3.Index, resp2.Index) 2919 } 2920 2921 if len(resp3.Nodes) != 1 { 2922 t.Fatalf("bad: %#v", resp3.Nodes) 2923 } 2924 if resp3.Nodes[0].ID != node.ID { 2925 t.Fatalf("bad: %#v", resp3.Nodes[0]) 2926 } 2927} 2928 2929func TestClientEndpoint_ListNodes_Fields(t *testing.T) { 2930 t.Parallel() 2931 2932 s1, cleanupS1 := TestServer(t, nil) 2933 defer cleanupS1() 2934 codec := rpcClient(t, s1) 2935 testutil.WaitForLeader(t, s1.RPC) 2936 2937 // Create the register request 2938 node := mock.Node() 2939 reg := &structs.NodeRegisterRequest{ 2940 Node: node, 2941 WriteRequest: 
structs.WriteRequest{Region: "global"}, 2942 } 2943 2944 // Fetch the response 2945 var resp structs.GenericResponse 2946 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)) 2947 node.CreateIndex = resp.Index 2948 node.ModifyIndex = resp.Index 2949 2950 // Lookup the node with fields 2951 get := &structs.NodeListRequest{ 2952 QueryOptions: structs.QueryOptions{Region: "global"}, 2953 Fields: &structs.NodeStubFields{ 2954 Resources: true, 2955 }, 2956 } 2957 var resp2 structs.NodeListResponse 2958 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.List", get, &resp2)) 2959 require.Equal(t, resp.Index, resp2.Index) 2960 require.Len(t, resp2.Nodes, 1) 2961 require.Equal(t, node.ID, resp2.Nodes[0].ID) 2962 require.NotNil(t, resp2.Nodes[0].NodeResources) 2963 require.NotNil(t, resp2.Nodes[0].ReservedResources) 2964} 2965 2966func TestClientEndpoint_ListNodes_ACL(t *testing.T) { 2967 t.Parallel() 2968 2969 s1, root, cleanupS1 := TestACLServer(t, nil) 2970 defer cleanupS1() 2971 codec := rpcClient(t, s1) 2972 testutil.WaitForLeader(t, s1.RPC) 2973 assert := assert.New(t) 2974 2975 // Create the node 2976 node := mock.Node() 2977 state := s1.fsm.State() 2978 assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1, node), "UpsertNode") 2979 2980 // Create the namespace policy and tokens 2981 validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", mock.NodePolicy(acl.PolicyRead)) 2982 invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", mock.NodePolicy(acl.PolicyDeny)) 2983 2984 // Lookup the node without a token and expect failure 2985 req := &structs.NodeListRequest{ 2986 QueryOptions: structs.QueryOptions{Region: "global"}, 2987 } 2988 { 2989 var resp structs.NodeListResponse 2990 err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp) 2991 assert.NotNil(err, "RPC") 2992 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 2993 } 2994 2995 // Try with a valid token 2996 req.AuthToken = 
validToken.SecretID
	{
		var resp structs.NodeListResponse
		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp), "RPC")
		assert.Equal(node.ID, resp.Nodes[0].ID)
	}

	// Try with an invalid token
	req.AuthToken = invalidToken.SecretID
	{
		var resp structs.NodeListResponse
		err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp)
		assert.NotNil(err, "RPC")
		assert.Equal(err.Error(), structs.ErrPermissionDenied.Error())
	}

	// Try with a root token
	req.AuthToken = root.SecretID
	{
		var resp structs.NodeListResponse
		assert.Nil(msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp), "RPC")
		assert.Equal(node.ID, resp.Nodes[0].ID)
	}
}

// TestClientEndpoint_ListNodes_Blocking asserts that blocking queries on
// Node.List unblock on node upsert, drain update, status update, and delete,
// and that each response carries the index of the triggering write.
func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Disable drainer to prevent drain from completing during test
	s1.nodeDrainer.SetEnabled(false, nil)

	// Create the node
	node := mock.Node()

	// Node upsert triggers watches; delay it so we can prove the query blocked
	errCh := make(chan error, 1)
	timer := time.AfterFunc(100*time.Millisecond, func() {
		errCh <- state.UpsertNode(structs.MsgTypeTestSetup, 2, node)
	})
	defer timer.Stop()

	req := &structs.NodeListRequest{
		QueryOptions: structs.QueryOptions{
			Region:        "global",
			MinQueryIndex: 1,
		},
	}
	start := time.Now()
	var resp structs.NodeListResponse
	if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}

	if err := <-errCh; err != nil {
		t.Fatalf("error from timer: %v", err)
	}

	// The RPC must have waited at least as long as the deferred write
	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
		t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
	}
	if resp.Index != 2 {
		t.Fatalf("Bad index: %d %d", resp.Index, 2)
	}
	if len(resp.Nodes) != 1 || resp.Nodes[0].ID != node.ID {
		t.Fatalf("bad: %#v", resp.Nodes)
	}

	// Node drain updates trigger watches.
	time.AfterFunc(100*time.Millisecond, func() {
		s := &structs.DrainStrategy{
			DrainSpec: structs.DrainSpec{
				Deadline: 10 * time.Second,
			},
		}
		errCh <- state.UpdateNodeDrain(structs.MsgTypeTestSetup, 3, node.ID, s, false, 0, nil, nil, "")
	})

	req.MinQueryIndex = 2
	var resp2 structs.NodeListResponse
	start = time.Now()
	if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp2); err != nil {
		t.Fatalf("err: %v", err)
	}

	if err := <-errCh; err != nil {
		t.Fatalf("error from timer: %v", err)
	}

	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
		t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
	}
	if resp2.Index != 3 {
		t.Fatalf("Bad index: %d %d", resp2.Index, 3)
	}
	if len(resp2.Nodes) != 1 || !resp2.Nodes[0].Drain {
		t.Fatalf("bad: %#v", resp2.Nodes)
	}

	// Node status update triggers watches
	time.AfterFunc(100*time.Millisecond, func() {
		errCh <- state.UpdateNodeStatus(structs.MsgTypeTestSetup, 40, node.ID, structs.NodeStatusDown, 0, nil)
	})

	// MinQueryIndex below the write index (40) so the status write unblocks us
	req.MinQueryIndex = 38
	var resp3 structs.NodeListResponse
	start = time.Now()
	if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp3); err != nil {
		t.Fatalf("err: %v", err)
	}

	if err := <-errCh; err != nil {
		t.Fatalf("error from timer: %v", err)
	}

	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
		t.Fatalf("should block (returned in %s) %#v", elapsed, resp3)
	}
	if resp3.Index != 40 {
		t.Fatalf("Bad index: %d %d", resp3.Index, 40)
	}
	if len(resp3.Nodes) != 1 || resp3.Nodes[0].Status != structs.NodeStatusDown {
		t.Fatalf("bad: %#v", resp3.Nodes)
	}

	// Node delete triggers watches.
	time.AfterFunc(100*time.Millisecond, func() {
		errCh <- state.DeleteNode(structs.MsgTypeTestSetup, 50, []string{node.ID})
	})

	req.MinQueryIndex = 45
	var resp4 structs.NodeListResponse
	start = time.Now()
	if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp4); err != nil {
		t.Fatalf("err: %v", err)
	}

	if err := <-errCh; err != nil {
		t.Fatalf("error from timer: %v", err)
	}

	if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
		t.Fatalf("should block (returned in %s) %#v", elapsed, resp4)
	}
	if resp4.Index != 50 {
		t.Fatalf("Bad index: %d %d", resp4.Index, 50)
	}
	if len(resp4.Nodes) != 0 {
		t.Fatalf("bad: %#v", resp4.Nodes)
	}
}

// TestClientEndpoint_DeriveVaultToken_Bad exercises the request-validation
// failure modes of Node.DeriveVaultToken in order: wrong node SecretID, alloc
// not placed on the node, job that requires no Vault policies, and a terminal
// allocation.
func TestClientEndpoint_DeriveVaultToken_Bad(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Create the node
	node := mock.Node()
	if err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Create an alloc
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	tasks := []string{task.Name}
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Use a freshly generated SecretID so it cannot match the node's
	req := &structs.DeriveVaultTokenRequest{
		NodeID:   node.ID,
		SecretID: uuid.Generate(),
		AllocID:  alloc.ID,
		Tasks:    tasks,
		QueryOptions: structs.QueryOptions{
			Region: "global",
		},
	}

	var resp structs.DeriveVaultTokenResponse
	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
		t.Fatalf("bad: %v", err)
	}

	if resp.Error == nil || !strings.Contains(resp.Error.Error(), "SecretID mismatch") {
		t.Fatalf("Expected SecretID mismatch: %v", resp.Error)
	}

	// Put the correct SecretID
	req.SecretID = node.SecretID

	// Now we should get an error about the allocation not running on the node
	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
		t.Fatalf("bad: %v", err)
	}
	if resp.Error == nil || !strings.Contains(resp.Error.Error(), "not running on Node") {
		t.Fatalf("Expected not running on node error: %v", resp.Error)
	}

	// Update to be running on the node
	alloc.NodeID = node.ID
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 4, []*structs.Allocation{alloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Now we should get an error about the job not needing any Vault secrets
	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
		t.Fatalf("bad: %v", err)
	}
	if resp.Error == nil || !strings.Contains(resp.Error.Error(), "does not require") {
		t.Fatalf("Expected no policies error: %v", resp.Error)
	}

	// Update to be terminal
	alloc.DesiredStatus = structs.AllocDesiredStatusStop
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 5, []*structs.Allocation{alloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Now we should get an error about the allocation being terminal
	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
		t.Fatalf("bad: %v", err)
	}
	if resp.Error == nil || !strings.Contains(resp.Error.Error(), "terminal") {
		t.Fatalf("Expected terminal allocation error: %v", resp.Error)
	}
}

// TestClientEndpoint_DeriveVaultToken tests the happy path of deriving a
// wrapped Vault token for a task and verifies that the resulting
// VaultAccessor is persisted in the state store.
func TestClientEndpoint_DeriveVaultToken(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Enable Vault and allow unauthenticated use
	tr := true
	s1.config.VaultConfig.Enabled = &tr
	s1.config.VaultConfig.AllowUnauthenticated = &tr

	// Replace the Vault Client on the server
	tvc := &TestVaultClient{}
	s1.vault = tvc

	// Create the node
	node := mock.Node()
	if err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Create an allocation that has Vault policies required
	alloc := mock.Alloc()
	alloc.NodeID = node.ID
	task := alloc.Job.TaskGroups[0].Tasks[0]
	tasks := []string{task.Name}
	task.Vault = &structs.Vault{Policies: []string{"a", "b"}}
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Return a secret for the task
	token := uuid.Generate()
	accessor := uuid.Generate()
	ttl := 10
	secret := &vapi.Secret{
		WrapInfo: &vapi.SecretWrapInfo{
			Token:           token,
			WrappedAccessor: accessor,
			TTL:             ttl,
		},
	}
	tvc.SetCreateTokenSecret(alloc.ID, task.Name, secret)

	req := &structs.DeriveVaultTokenRequest{
		NodeID:   node.ID,
		SecretID: node.SecretID,
		AllocID:  alloc.ID,
		Tasks:    tasks,
		QueryOptions: structs.QueryOptions{
			Region: "global",
		},
	}

	var resp structs.DeriveVaultTokenResponse
	if err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp); err != nil {
		t.Fatalf("bad: %v", err)
	}
	if resp.Error != nil {
		t.Fatalf("bad: %v", resp.Error)
	}

	// Check the state store and ensure that we created a VaultAccessor
	ws := memdb.NewWatchSet()
	va, err := state.VaultAccessor(ws, accessor)
	if err != nil {
		t.Fatalf("bad: %v", err)
	}
	if va == nil {
		t.Fatalf("bad: %v", va)
	}

	if va.CreateIndex == 0 {
		t.Fatalf("bad: %v", va)
	}

	// Zero the create index so the remaining fields can be compared directly
	va.CreateIndex = 0
	expected := &structs.VaultAccessor{
		AllocID:     alloc.ID,
		Task:        task.Name,
		NodeID:      alloc.NodeID,
		Accessor:    accessor,
		CreationTTL: ttl,
	}

	if !reflect.DeepEqual(expected, va) {
		t.Fatalf("Got %#v; want %#v", va, expected)
	}
}

// TestClientEndpoint_DeriveVaultToken_VaultError asserts that a recoverable
// error from the Vault client is surfaced to the caller as recoverable.
func TestClientEndpoint_DeriveVaultToken_VaultError(t *testing.T) {
	t.Parallel()

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Enable Vault and allow unauthenticated use
	tr := true
	s1.config.VaultConfig.Enabled = &tr
	s1.config.VaultConfig.AllowUnauthenticated = &tr

	// Replace the Vault Client on the server
	tvc := &TestVaultClient{}
	s1.vault = tvc

	// Create the node
	node := mock.Node()
	if err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Create an allocation that has Vault policies required
	alloc := mock.Alloc()
	alloc.NodeID = node.ID
	task := alloc.Job.TaskGroups[0].Tasks[0]
	tasks := []string{task.Name}
	task.Vault = &structs.Vault{Policies: []string{"a", "b"}}
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Return an error when creating the token
	tvc.SetCreateTokenError(alloc.ID, task.Name,
		structs.NewRecoverableError(fmt.Errorf("recover"), true))

	req := &structs.DeriveVaultTokenRequest{
		NodeID:   node.ID,
		SecretID: node.SecretID,
		AllocID:  alloc.ID,
		Tasks:    tasks,
		QueryOptions: structs.QueryOptions{
			Region: "global",
		},
	}

	var resp structs.DeriveVaultTokenResponse
	err := msgpackrpc.CallWithCodec(codec, "Node.DeriveVaultToken", req, &resp)
	if err != nil {
		t.Fatalf("bad: %v", err)
	}
	if resp.Error == nil || !resp.Error.IsRecoverable() {
		t.Fatalf("bad: %+v", resp.Error)
	}
}

// TestClientEndpoint_taskUsesConnect covers taskUsesConnect for a Connect
// proxy task, a non-Connect task, and a nil task.
func TestClientEndpoint_taskUsesConnect(t *testing.T) {
	t.Parallel()

	try := func(t *testing.T, task *structs.Task, exp bool) {
		result := taskUsesConnect(task)
		require.Equal(t, exp, result)
	}

	t.Run("task uses connect", func(t *testing.T) {
		try(t, &structs.Task{
			// see nomad.newConnectSidecarTask for how this works
			Name: "connect-proxy-myservice",
			Kind: "connect-proxy:myservice",
		}, true)
	})

	t.Run("task does not use connect", func(t *testing.T) {
		try(t, &structs.Task{
			Name: "mytask",
			Kind: "incorrect:mytask",
		}, false)
	})

	t.Run("task does not exist", func(t *testing.T) {
		try(t, nil, false)
	})
}

// TestClientEndpoint_tasksNotUsingConnect verifies connectTasks partitions
// requested task names into Connect (proxy or native) and non-Connect sets.
func TestClientEndpoint_tasksNotUsingConnect(t *testing.T) {
	t.Parallel()

	taskGroup := &structs.TaskGroup{
		Name: "testgroup",
		Tasks: []*structs.Task{{
			Name: "connect-proxy-service1",
			Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"),
		}, {
			Name: "incorrect-task3",
			Kind: "incorrect:task3",
		}, {
			Name: "connect-proxy-service4",
			Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service4"),
		}, {
			Name: "incorrect-task5",
			Kind: "incorrect:task5",
		}, {
			Name: "task6",
			Kind: structs.NewTaskKind(structs.ConnectNativePrefix, "service6"),
		}},
	}

	requestingTasks := []string{
		"connect-proxy-service1", // yes
		"task2",                  // does not exist
		"task3",                  // no
		"connect-proxy-service4", // yes
		"task5",                  // no
		"task6",                  // yes, native
	}

	notConnect, usingConnect := connectTasks(taskGroup, requestingTasks)

	notConnectExp := []string{"task2", "task3", "task5"}
	usingConnectExp := []connectTask{
		{TaskName: "connect-proxy-service1", TaskKind: "connect-proxy:service1"},
		{TaskName: "connect-proxy-service4", TaskKind: "connect-proxy:service4"},
		{TaskName: "task6", TaskKind: "connect-native:service6"},
	}

	require.Equal(t, notConnectExp, notConnect)
	require.Equal(t, usingConnectExp, usingConnect)
}

// mutateConnectJob runs the job Connect mutating hook on job, which appends
// the sidecar proxy task; it fails the test on any warning or error.
func mutateConnectJob(t *testing.T, job *structs.Job) {
	var jch jobConnectHook
	_, warnings, err := jch.Mutate(job)
	require.Empty(t, warnings)
	require.NoError(t, err)
}

// TestClientEndpoint_DeriveSIToken tests the happy path of deriving a Consul
// Service Identity token for a Connect sidecar task and verifies the
// SITokenAccessor persisted in the state store.
func TestClientEndpoint_DeriveSIToken(t *testing.T) {
	t.Parallel()
	r := require.New(t)

	s1, cleanupS1 := TestServer(t, nil) // already sets consul mocks
	defer cleanupS1()

	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Set allow unauthenticated (no operator token required)
	s1.config.ConsulConfig.AllowUnauthenticated = helper.BoolToPtr(true)

	// Create the node
	node := mock.Node()
	err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node)
	r.NoError(err)

	// Create an alloc with a typical connect service (sidecar) defined
	alloc := mock.ConnectAlloc()
	alloc.NodeID = node.ID
	mutateConnectJob(t, alloc.Job) // appends sidecar task
	sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]

	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc})
	r.NoError(err)

	request := &structs.DeriveSITokenRequest{
		NodeID:       node.ID,
		SecretID:     node.SecretID,
		AllocID:      alloc.ID,
		Tasks:        []string{sidecarTask.Name},
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var response structs.DeriveSITokenResponse
	err = msgpackrpc.CallWithCodec(codec, "Node.DeriveSIToken", request, &response)
	r.NoError(err)
	r.Nil(response.Error)

	// Check the state store and ensure we created a Consul SI Token Accessor
	ws := memdb.NewWatchSet()
	accessors, err := state.SITokenAccessorsByNode(ws, node.ID)
	r.NoError(err)
	r.Equal(1, len(accessors))                                 // only asked for one
	r.Equal("connect-proxy-testconnect", accessors[0].TaskName) // set by the mock
	r.Equal(node.ID, accessors[0].NodeID)                       // should match
	r.Equal(alloc.ID, accessors[0].AllocID)                     // should match
	r.True(helper.IsUUID(accessors[0].AccessorID))              // should be set
	r.Greater(accessors[0].CreateIndex, uint64(3))              // more than 3rd
}

// TestClientEndpoint_DeriveSIToken_ConsulError asserts that a recoverable
// error from the Consul ACLs API is surfaced to the caller as recoverable.
func TestClientEndpoint_DeriveSIToken_ConsulError(t *testing.T) {
	t.Parallel()
	r := require.New(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Set allow unauthenticated (no operator token required)
	s1.config.ConsulConfig.AllowUnauthenticated = helper.BoolToPtr(true)

	// Create the node
	node := mock.Node()
	err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node)
	r.NoError(err)

	// Create an alloc with a typical connect service (sidecar) defined
	alloc := mock.ConnectAlloc()
	alloc.NodeID = node.ID
	mutateConnectJob(t, alloc.Job) // appends sidecar task
	sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]

	// rejigger the server to use a broken mock consul
	mockACLsAPI := consul.NewMockACLsAPI(s1.logger)
	mockACLsAPI.SetError(structs.NewRecoverableError(errors.New("consul recoverable error"), true))
	m := NewConsulACLsAPI(mockACLsAPI, s1.logger, nil)
	s1.consulACLs = m

	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 3, []*structs.Allocation{alloc})
	r.NoError(err)

	request := &structs.DeriveSITokenRequest{
		NodeID:       node.ID,
		SecretID:     node.SecretID,
		AllocID:      alloc.ID,
		Tasks:        []string{sidecarTask.Name},
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var response structs.DeriveSITokenResponse
	err = msgpackrpc.CallWithCodec(codec, "Node.DeriveSIToken", request, &response)
	r.NoError(err)
	r.NotNil(response.Error)               // error should be set
	r.True(response.Error.IsRecoverable()) // and is recoverable
}

// TestClientEndpoint_EmitEvents verifies that Node.EmitEvents appends events
// to the node in the state store.
func TestClientEndpoint_EmitEvents(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// create a node that we can register our event to
	node := mock.Node()
	err := state.UpsertNode(structs.MsgTypeTestSetup, 2, node)
	require.Nil(err)

	nodeEvent := &structs.NodeEvent{
		Message:   "Registration failed",
		Subsystem: "Server",
		Timestamp: time.Now(),
	}

	nodeEvents := map[string][]*structs.NodeEvent{node.ID: {nodeEvent}}
	req := structs.EmitNodeEventsRequest{
		NodeEvents:   nodeEvents,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}

	var resp structs.GenericResponse
	err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvents", &req, &resp)
	require.Nil(err)
	require.NotEqual(uint64(0), resp.Index)

	// Check for the node in the FSM; registration adds an initial event, so
	// with the emitted event there should be at least two.
	ws := memdb.NewWatchSet()
	out, err := state.NodeByID(ws, node.ID)
	require.Nil(err)
	require.False(len(out.Events) < 2)
}