1package client 2 3import ( 4 "fmt" 5 "io/ioutil" 6 "net" 7 "os" 8 "path/filepath" 9 "runtime" 10 "sort" 11 "testing" 12 "time" 13 14 memdb "github.com/hashicorp/go-memdb" 15 trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" 16 "github.com/hashicorp/nomad/client/config" 17 consulApi "github.com/hashicorp/nomad/client/consul" 18 "github.com/hashicorp/nomad/client/fingerprint" 19 "github.com/hashicorp/nomad/client/state" 20 "github.com/hashicorp/nomad/command/agent/consul" 21 "github.com/hashicorp/nomad/helper/pluginutils/catalog" 22 "github.com/hashicorp/nomad/helper/pluginutils/singleton" 23 "github.com/hashicorp/nomad/helper/testlog" 24 "github.com/hashicorp/nomad/helper/uuid" 25 "github.com/hashicorp/nomad/nomad" 26 "github.com/hashicorp/nomad/nomad/mock" 27 "github.com/hashicorp/nomad/nomad/structs" 28 nconfig "github.com/hashicorp/nomad/nomad/structs/config" 29 "github.com/hashicorp/nomad/plugins/device" 30 psstructs "github.com/hashicorp/nomad/plugins/shared/structs" 31 "github.com/hashicorp/nomad/testutil" 32 "github.com/stretchr/testify/assert" 33 34 cstate "github.com/hashicorp/nomad/client/state" 35 "github.com/stretchr/testify/require" 36) 37 38func testACLServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, *structs.ACLToken, func()) { 39 server, token, cleanup := nomad.TestACLServer(t, cb) 40 return server, server.GetConfig().RPCAddr.String(), token, cleanup 41} 42 43func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, func()) { 44 server, cleanup := nomad.TestServer(t, cb) 45 return server, server.GetConfig().RPCAddr.String(), cleanup 46} 47 48func TestClient_StartStop(t *testing.T) { 49 t.Parallel() 50 client, cleanup := TestClient(t, nil) 51 defer cleanup() 52 if err := client.Shutdown(); err != nil { 53 t.Fatalf("err: %v", err) 54 } 55} 56 57// Certain labels for metrics are dependant on client initial setup. This tests 58// that the client has properly initialized before we assign values to labels 59func TestClient_BaseLabels(t *testing.T) { 60 t.Parallel() 61 assert := assert.New(t) 62 63 client, cleanup := TestClient(t, nil) 64 if err := client.Shutdown(); err != nil { 65 t.Fatalf("err: %v", err) 66 } 67 defer cleanup() 68 69 // directly invoke this function, as otherwise this will fail on a CI build 70 // due to a race condition 71 client.emitStats() 72 73 baseLabels := client.baseLabels 74 assert.NotEqual(0, len(baseLabels)) 75 76 nodeID := client.Node().ID 77 for _, e := range baseLabels { 78 if e.Name == "node_id" { 79 assert.Equal(nodeID, e.Value) 80 } 81 } 82} 83 84func TestClient_RPC(t *testing.T) { 85 t.Parallel() 86 87 _, addr, cleanupS1 := testServer(t, nil) 88 defer cleanupS1() 89 90 c1, cleanupC1 := TestClient(t, func(c *config.Config) { 91 c.Servers = []string{addr} 92 }) 93 defer cleanupC1() 94 95 // RPC should succeed 96 testutil.WaitForResult(func() (bool, error) { 97 var out struct{} 98 err := c1.RPC("Status.Ping", struct{}{}, &out) 99 return err == nil, err 100 }, func(err error) { 101 t.Fatalf("err: %v", err) 102 }) 103} 104 105func TestClient_RPC_FireRetryWatchers(t *testing.T) { 106 t.Parallel() 107 108 _, addr, cleanupS1 := testServer(t, nil) 109 defer cleanupS1() 110 111 c1, cleanupC1 := TestClient(t, func(c *config.Config) { 112 c.Servers = []string{addr} 113 }) 114 defer cleanupC1() 115 116 watcher := c1.rpcRetryWatcher() 117 118 // RPC should succeed 119 testutil.WaitForResult(func() (bool, error) { 120 var out struct{} 121 err := c1.RPC("Status.Ping", struct{}{}, &out) 122 return err == nil, err 123 }, func(err error) { 124 t.Fatalf("err: %v", err) 125 }) 126 127 select { 128 case <-watcher: 129 default: 130 t.Fatal("watcher should be fired") 131 } 132} 133 134func TestClient_RPC_Passthrough(t *testing.T) { 135 t.Parallel() 136 137 s1, _, cleanupS1 := testServer(t, nil) 138 defer cleanupS1() 139 140 c1, cleanupC1 := TestClient(t, func(c *config.Config) { 141 c.RPCHandler = s1 142 }) 143 defer cleanupC1() 144 145 // RPC should succeed 146 testutil.WaitForResult(func() (bool, error) { 147 var out struct{} 148 err := c1.RPC("Status.Ping", struct{}{}, &out) 149 return err == nil, err 150 }, func(err error) { 151 t.Fatalf("err: %v", err) 152 }) 153} 154 155func TestClient_Fingerprint(t *testing.T) { 156 t.Parallel() 157 158 c, cleanup := TestClient(t, nil) 159 defer cleanup() 160 161 // Ensure we are fingerprinting 162 testutil.WaitForResult(func() (bool, error) { 163 node := c.Node() 164 if _, ok := node.Attributes["kernel.name"]; !ok { 165 return false, fmt.Errorf("Expected value for kernel.name") 166 } 167 if _, ok := node.Attributes["cpu.arch"]; !ok { 168 return false, fmt.Errorf("Expected value for cpu.arch") 169 } 170 return true, nil 171 }, func(err error) { 172 t.Fatalf("err: %v", err) 173 }) 174} 175 176// TestClient_Fingerprint_Periodic asserts that driver node attributes are 177// periodically fingerprinted. 178func TestClient_Fingerprint_Periodic(t *testing.T) { 179 t.Parallel() 180 181 c1, cleanup := TestClient(t, func(c *config.Config) { 182 confs := []*nconfig.PluginConfig{ 183 { 184 Name: "mock_driver", 185 Config: map[string]interface{}{ 186 "shutdown_periodic_after": true, 187 "shutdown_periodic_duration": time.Second, 188 }, 189 }, 190 } 191 c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", nil, confs) 192 }) 193 defer cleanup() 194 195 node := c1.config.Node 196 { 197 // Ensure the mock driver is registered on the client 198 testutil.WaitForResult(func() (bool, error) { 199 c1.configLock.Lock() 200 defer c1.configLock.Unlock() 201 202 // assert that the driver is set on the node attributes 203 mockDriverInfoAttr := node.Attributes["driver.mock_driver"] 204 if mockDriverInfoAttr == "" { 205 return false, fmt.Errorf("mock driver is empty when it should be set on the node attributes") 206 } 207 208 mockDriverInfo := node.Drivers["mock_driver"] 209 210 // assert that the Driver information for the node is also set correctly 211 if mockDriverInfo == nil { 212 return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers") 213 } 214 if !mockDriverInfo.Detected { 215 return false, fmt.Errorf("mock driver should be set as detected") 216 } 217 if !mockDriverInfo.Healthy { 218 return false, fmt.Errorf("mock driver should be set as healthy") 219 } 220 if mockDriverInfo.HealthDescription == "" { 221 return false, fmt.Errorf("mock driver description should not be empty") 222 } 223 return true, nil 224 }, func(err error) { 225 t.Fatalf("err: %v", err) 226 }) 227 } 228 229 { 230 testutil.WaitForResult(func() (bool, error) { 231 c1.configLock.Lock() 232 defer c1.configLock.Unlock() 233 mockDriverInfo := node.Drivers["mock_driver"] 234 // assert that the Driver information for the node is also set correctly 235 if mockDriverInfo == nil { 236 return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers") 237 } 238 if mockDriverInfo.Detected { 239 return false, fmt.Errorf("mock driver should not be set as detected") 240 } 241 if mockDriverInfo.Healthy { 242 return false, fmt.Errorf("mock driver should not be set as healthy") 243 } 244 if mockDriverInfo.HealthDescription == "" { 245 return false, fmt.Errorf("mock driver description should not be empty") 246 } 247 return true, nil 248 }, func(err error) { 249 t.Fatalf("err: %v", err) 250 }) 251 } 252} 253 254// TestClient_MixedTLS asserts that when a server is running with TLS enabled 255// it will reject any RPC connections from clients that lack TLS. See #2525 256func TestClient_MixedTLS(t *testing.T) { 257 t.Parallel() 258 const ( 259 cafile = "../helper/tlsutil/testdata/ca.pem" 260 foocert = "../helper/tlsutil/testdata/nomad-foo.pem" 261 fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" 262 ) 263 s1, addr, cleanupS1 := testServer(t, func(c *nomad.Config) { 264 c.TLSConfig = &nconfig.TLSConfig{ 265 EnableHTTP: true, 266 EnableRPC: true, 267 VerifyServerHostname: true, 268 CAFile: cafile, 269 CertFile: foocert, 270 KeyFile: fookey, 271 } 272 }) 273 defer cleanupS1() 274 testutil.WaitForLeader(t, s1.RPC) 275 276 c1, cleanup := TestClient(t, func(c *config.Config) { 277 c.Servers = []string{addr} 278 }) 279 defer cleanup() 280 281 req := structs.NodeSpecificRequest{ 282 NodeID: c1.Node().ID, 283 QueryOptions: structs.QueryOptions{Region: "global"}, 284 } 285 var out structs.SingleNodeResponse 286 testutil.AssertUntil(100*time.Millisecond, 287 func() (bool, error) { 288 err := c1.RPC("Node.GetNode", &req, &out) 289 if err == nil { 290 return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out) 291 } 292 return true, nil 293 }, 294 func(err error) { 295 t.Fatalf(err.Error()) 296 }, 297 ) 298} 299 300// TestClient_BadTLS asserts that when a client and server are running with TLS 301// enabled -- but their certificates are signed by different CAs -- they're 302// unable to communicate. 303func TestClient_BadTLS(t *testing.T) { 304 t.Parallel() 305 306 const ( 307 cafile = "../helper/tlsutil/testdata/ca.pem" 308 foocert = "../helper/tlsutil/testdata/nomad-foo.pem" 309 fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" 310 badca = "../helper/tlsutil/testdata/ca-bad.pem" 311 badcert = "../helper/tlsutil/testdata/nomad-bad.pem" 312 badkey = "../helper/tlsutil/testdata/nomad-bad-key.pem" 313 ) 314 s1, addr, cleanupS1 := testServer(t, func(c *nomad.Config) { 315 c.TLSConfig = &nconfig.TLSConfig{ 316 EnableHTTP: true, 317 EnableRPC: true, 318 VerifyServerHostname: true, 319 CAFile: cafile, 320 CertFile: foocert, 321 KeyFile: fookey, 322 } 323 }) 324 defer cleanupS1() 325 testutil.WaitForLeader(t, s1.RPC) 326 327 c1, cleanupC1 := TestClient(t, func(c *config.Config) { 328 c.Servers = []string{addr} 329 c.TLSConfig = &nconfig.TLSConfig{ 330 EnableHTTP: true, 331 EnableRPC: true, 332 VerifyServerHostname: true, 333 CAFile: badca, 334 CertFile: badcert, 335 KeyFile: badkey, 336 } 337 }) 338 defer cleanupC1() 339 340 req := structs.NodeSpecificRequest{ 341 NodeID: c1.Node().ID, 342 QueryOptions: structs.QueryOptions{Region: "global"}, 343 } 344 var out structs.SingleNodeResponse 345 testutil.AssertUntil(100*time.Millisecond, 346 func() (bool, error) { 347 err := c1.RPC("Node.GetNode", &req, &out) 348 if err == nil { 349 return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out) 350 } 351 return true, nil 352 }, 353 func(err error) { 354 t.Fatalf(err.Error()) 355 }, 356 ) 357} 358 359func TestClient_Register(t *testing.T) { 360 t.Parallel() 361 362 s1, _, cleanupS1 := testServer(t, nil) 363 defer cleanupS1() 364 testutil.WaitForLeader(t, s1.RPC) 365 366 c1, cleanupC1 := TestClient(t, func(c *config.Config) { 367 c.RPCHandler = s1 368 }) 369 defer cleanupC1() 370 371 req := structs.NodeSpecificRequest{ 372 NodeID: c1.Node().ID, 373 QueryOptions: structs.QueryOptions{Region: "global"}, 374 } 375 var out structs.SingleNodeResponse 376 377 // Register should succeed 378 testutil.WaitForResult(func() (bool, error) { 379 err := s1.RPC("Node.GetNode", &req, &out) 380 if err != nil { 381 return false, err 382 } 383 if out.Node == nil { 384 return false, fmt.Errorf("missing reg") 385 } 386 return out.Node.ID == req.NodeID, nil 387 }, func(err error) { 388 t.Fatalf("err: %v", err) 389 }) 390} 391 392func TestClient_Heartbeat(t *testing.T) { 393 t.Parallel() 394 395 s1, _, cleanupS1 := testServer(t, func(c *nomad.Config) { 396 c.MinHeartbeatTTL = 50 * time.Millisecond 397 }) 398 defer cleanupS1() 399 testutil.WaitForLeader(t, s1.RPC) 400 401 c1, cleanupC1 := TestClient(t, func(c *config.Config) { 402 c.RPCHandler = s1 403 }) 404 defer cleanupC1() 405 406 req := structs.NodeSpecificRequest{ 407 NodeID: c1.Node().ID, 408 QueryOptions: structs.QueryOptions{Region: "global"}, 409 } 410 var out structs.SingleNodeResponse 411 412 // Register should succeed 413 testutil.WaitForResult(func() (bool, error) { 414 err := s1.RPC("Node.GetNode", &req, &out) 415 if err != nil { 416 return false, err 417 } 418 if out.Node == nil { 419 return false, fmt.Errorf("missing reg") 420 } 421 return out.Node.Status == structs.NodeStatusReady, nil 422 }, func(err error) { 423 t.Fatalf("err: %v", err) 424 }) 425} 426 427// TestClient_UpdateAllocStatus that once running allocations send updates to 428// the server. 429func TestClient_UpdateAllocStatus(t *testing.T) { 430 t.Parallel() 431 432 s1, _, cleanupS1 := testServer(t, nil) 433 defer cleanupS1() 434 435 _, cleanup := TestClient(t, func(c *config.Config) { 436 c.RPCHandler = s1 437 }) 438 defer cleanup() 439 440 job := mock.Job() 441 // allow running job on any node including self client, that may not be a Linux box 442 job.Constraints = nil 443 job.TaskGroups[0].Count = 1 444 task := job.TaskGroups[0].Tasks[0] 445 task.Driver = "mock_driver" 446 task.Config = map[string]interface{}{ 447 "run_for": "10s", 448 } 449 task.Services = nil 450 451 // WaitForRunning polls the server until the ClientStatus is running 452 testutil.WaitForRunning(t, s1.RPC, job) 453} 454 455func TestClient_WatchAllocs(t *testing.T) { 456 t.Parallel() 457 458 s1, _, cleanupS1 := testServer(t, nil) 459 defer cleanupS1() 460 testutil.WaitForLeader(t, s1.RPC) 461 462 c1, cleanup := TestClient(t, func(c *config.Config) { 463 c.RPCHandler = s1 464 }) 465 defer cleanup() 466 467 // Wait until the node is ready 468 waitTilNodeReady(c1, t) 469 470 // Create mock allocations 471 job := mock.Job() 472 job.TaskGroups[0].Count = 3 473 job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 474 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 475 "run_for": "10s", 476 } 477 alloc1 := mock.Alloc() 478 alloc1.JobID = job.ID 479 alloc1.Job = job 480 alloc1.NodeID = c1.Node().ID 481 alloc2 := mock.Alloc() 482 alloc2.NodeID = c1.Node().ID 483 alloc2.JobID = job.ID 484 alloc2.Job = job 485 486 state := s1.State() 487 if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job); err != nil { 488 t.Fatal(err) 489 } 490 if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil { 491 t.Fatal(err) 492 } 493 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1, alloc2}) 494 if err != nil { 495 t.Fatalf("err: %v", err) 496 } 497 498 // Both allocations should get registered 499 testutil.WaitForResult(func() (bool, error) { 500 c1.allocLock.RLock() 501 num := len(c1.allocs) 502 c1.allocLock.RUnlock() 503 return num == 2, nil 504 }, func(err error) { 505 t.Fatalf("err: %v", err) 506 }) 507 508 // Delete one allocation 509 if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil { 510 t.Fatalf("err: %v", err) 511 } 512 513 // Update the other allocation. Have to make a copy because the allocs are 514 // shared in memory in the test and the modify index would be updated in the 515 // alloc runner. 516 alloc2_2 := alloc2.Copy() 517 alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop 518 if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 104, []*structs.Allocation{alloc2_2}); err != nil { 519 t.Fatalf("err upserting stopped alloc: %v", err) 520 } 521 522 // One allocation should get GC'd and removed 523 testutil.WaitForResult(func() (bool, error) { 524 c1.allocLock.RLock() 525 num := len(c1.allocs) 526 c1.allocLock.RUnlock() 527 return num == 1, nil 528 }, func(err error) { 529 t.Fatalf("err: %v", err) 530 }) 531 532 // One allocations should get updated 533 testutil.WaitForResult(func() (bool, error) { 534 c1.allocLock.RLock() 535 ar := c1.allocs[alloc2.ID] 536 c1.allocLock.RUnlock() 537 return ar.Alloc().DesiredStatus == structs.AllocDesiredStatusStop, nil 538 }, func(err error) { 539 t.Fatalf("err: %v", err) 540 }) 541} 542 543func waitTilNodeReady(client *Client, t *testing.T) { 544 testutil.WaitForResult(func() (bool, error) { 545 n := client.Node() 546 if n.Status != structs.NodeStatusReady { 547 return false, fmt.Errorf("node not registered") 548 } 549 return true, nil 550 }, func(err error) { 551 t.Fatalf("err: %v", err) 552 }) 553} 554 555func TestClient_SaveRestoreState(t *testing.T) { 556 t.Parallel() 557 558 s1, _, cleanupS1 := testServer(t, nil) 559 defer cleanupS1() 560 testutil.WaitForLeader(t, s1.RPC) 561 562 c1, cleanupC1 := TestClient(t, func(c *config.Config) { 563 c.DevMode = false 564 c.RPCHandler = s1 565 }) 566 defer cleanupC1() 567 568 // Wait until the node is ready 569 waitTilNodeReady(c1, t) 570 571 // Create mock allocations 572 job := mock.Job() 573 alloc1 := mock.Alloc() 574 alloc1.NodeID = c1.Node().ID 575 alloc1.Job = job 576 alloc1.JobID = job.ID 577 alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 578 alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 579 "run_for": "10s", 580 } 581 alloc1.ClientStatus = structs.AllocClientStatusRunning 582 583 state := s1.State() 584 if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job); err != nil { 585 t.Fatal(err) 586 } 587 if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil { 588 t.Fatal(err) 589 } 590 if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1}); err != nil { 591 t.Fatalf("err: %v", err) 592 } 593 594 // Allocations should get registered 595 testutil.WaitForResult(func() (bool, error) { 596 c1.allocLock.RLock() 597 ar := c1.allocs[alloc1.ID] 598 c1.allocLock.RUnlock() 599 if ar == nil { 600 return false, fmt.Errorf("nil alloc runner") 601 } 602 if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning { 603 return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning) 604 } 605 return true, nil 606 }, func(err error) { 607 t.Fatalf("err: %v", err) 608 }) 609 610 // Shutdown the client, saves state 611 if err := c1.Shutdown(); err != nil { 612 t.Fatalf("err: %v", err) 613 } 614 615 // Create a new client 616 logger := testlog.HCLogger(t) 617 c1.config.Logger = logger 618 consulCatalog := consul.NewMockCatalog(logger) 619 mockService := consulApi.NewMockConsulServiceClient(t, logger) 620 621 // ensure we use non-shutdown driver instances 622 c1.config.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", c1.config.Options, nil) 623 c1.config.PluginSingletonLoader = singleton.NewSingletonLoader(logger, c1.config.PluginLoader) 624 625 c2, err := NewClient(c1.config, consulCatalog, nil, mockService, nil) 626 if err != nil { 627 t.Fatalf("err: %v", err) 628 } 629 defer c2.Shutdown() 630 631 // Ensure the allocation is running 632 testutil.WaitForResult(func() (bool, error) { 633 c2.allocLock.RLock() 634 ar := c2.allocs[alloc1.ID] 635 c2.allocLock.RUnlock() 636 status := ar.Alloc().ClientStatus 637 alive := status == structs.AllocClientStatusRunning || status == structs.AllocClientStatusPending 638 if !alive { 639 return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc()) 640 } 641 return true, nil 642 }, func(err error) { 643 t.Fatalf("err: %v", err) 644 }) 645 646 // Destroy all the allocations 647 for _, ar := range c2.getAllocRunners() { 648 ar.Destroy() 649 } 650 651 for _, ar := range c2.getAllocRunners() { 652 <-ar.DestroyCh() 653 } 654} 655 656func TestClient_AddAllocError(t *testing.T) { 657 t.Parallel() 658 require := require.New(t) 659 660 s1, _, cleanupS1 := testServer(t, nil) 661 defer cleanupS1() 662 testutil.WaitForLeader(t, s1.RPC) 663 664 c1, cleanupC1 := TestClient(t, func(c *config.Config) { 665 c.DevMode = false 666 c.RPCHandler = s1 667 }) 668 defer cleanupC1() 669 670 // Wait until the node is ready 671 waitTilNodeReady(c1, t) 672 673 // Create mock allocation with invalid task group name 674 job := mock.Job() 675 alloc1 := mock.Alloc() 676 alloc1.NodeID = c1.Node().ID 677 alloc1.Job = job 678 alloc1.JobID = job.ID 679 alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 680 alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 681 "run_for": "10s", 682 } 683 alloc1.ClientStatus = structs.AllocClientStatusPending 684 685 // Set these two fields to nil to cause alloc runner creation to fail 686 alloc1.AllocatedResources = nil 687 alloc1.TaskResources = nil 688 689 state := s1.State() 690 err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job) 691 require.Nil(err) 692 693 err = state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)) 694 require.Nil(err) 695 696 err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1}) 697 require.Nil(err) 698 699 // Push this alloc update to the client 700 allocUpdates := &allocUpdates{ 701 pulled: map[string]*structs.Allocation{ 702 alloc1.ID: alloc1, 703 }, 704 } 705 c1.runAllocs(allocUpdates) 706 707 // Ensure the allocation has been marked as invalid and failed on the server 708 testutil.WaitForResult(func() (bool, error) { 709 c1.allocLock.RLock() 710 ar := c1.allocs[alloc1.ID] 711 _, isInvalid := c1.invalidAllocs[alloc1.ID] 712 c1.allocLock.RUnlock() 713 if ar != nil { 714 return false, fmt.Errorf("expected nil alloc runner") 715 } 716 if !isInvalid { 717 return false, fmt.Errorf("expected alloc to be marked as invalid") 718 } 719 alloc, err := s1.State().AllocByID(nil, alloc1.ID) 720 require.Nil(err) 721 failed := alloc.ClientStatus == structs.AllocClientStatusFailed 722 if !failed { 723 return false, fmt.Errorf("Expected failed client status, but got %v", alloc.ClientStatus) 724 } 725 return true, nil 726 }, func(err error) { 727 require.NoError(err) 728 }) 729 730} 731 732func TestClient_Init(t *testing.T) { 733 t.Parallel() 734 dir, err := ioutil.TempDir("", "nomad") 735 if err != nil { 736 t.Fatalf("err: %s", err) 737 } 738 defer os.RemoveAll(dir) 739 allocDir := filepath.Join(dir, "alloc") 740 741 client := &Client{ 742 config: &config.Config{ 743 AllocDir: allocDir, 744 StateDBFactory: cstate.GetStateDBFactory(true), 745 }, 746 logger: testlog.HCLogger(t), 747 } 748 749 if err := client.init(); err != nil { 750 t.Fatalf("err: %s", err) 751 } 752 753 if _, err := os.Stat(allocDir); err != nil { 754 t.Fatalf("err: %s", err) 755 } 756} 757 758func TestClient_BlockedAllocations(t *testing.T) { 759 t.Parallel() 760 761 s1, _, cleanupS1 := testServer(t, nil) 762 defer cleanupS1() 763 testutil.WaitForLeader(t, s1.RPC) 764 765 c1, cleanup := TestClient(t, func(c *config.Config) { 766 c.RPCHandler = s1 767 }) 768 defer cleanup() 769 770 // Wait for the node to be ready 771 state := s1.State() 772 testutil.WaitForResult(func() (bool, error) { 773 ws := memdb.NewWatchSet() 774 out, err := state.NodeByID(ws, c1.Node().ID) 775 if err != nil { 776 return false, err 777 } 778 if out == nil || out.Status != structs.NodeStatusReady { 779 return false, fmt.Errorf("bad node: %#v", out) 780 } 781 return true, nil 782 }, func(err error) { 783 t.Fatalf("err: %v", err) 784 }) 785 786 // Add an allocation 787 alloc := mock.Alloc() 788 alloc.NodeID = c1.Node().ID 789 alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 790 alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 791 "kill_after": "1s", 792 "run_for": "100s", 793 "exit_code": 0, 794 "exit_signal": 0, 795 } 796 797 state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) 798 state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc}) 799 800 // Wait until the client downloads and starts the allocation 801 testutil.WaitForResult(func() (bool, error) { 802 ws := memdb.NewWatchSet() 803 out, err := state.AllocByID(ws, alloc.ID) 804 if err != nil { 805 return false, err 806 } 807 if out == nil || out.ClientStatus != structs.AllocClientStatusRunning { 808 return false, fmt.Errorf("bad alloc: %#v", out) 809 } 810 return true, nil 811 }, func(err error) { 812 t.Fatalf("err: %v", err) 813 }) 814 815 // Add a new chained alloc 816 alloc2 := alloc.Copy() 817 alloc2.ID = uuid.Generate() 818 alloc2.Job = alloc.Job 819 alloc2.JobID = alloc.JobID 820 alloc2.PreviousAllocation = alloc.ID 821 if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{alloc2}); err != nil { 822 t.Fatalf("err: %v", err) 823 } 824 825 // Ensure that the chained allocation is being tracked as blocked 826 testutil.WaitForResult(func() (bool, error) { 827 ar := c1.getAllocRunners()[alloc2.ID] 828 if ar == nil { 829 return false, fmt.Errorf("alloc 2's alloc runner does not exist") 830 } 831 if !ar.IsWaiting() { 832 return false, fmt.Errorf("alloc 2 is not blocked") 833 } 834 return true, nil 835 }, func(err error) { 836 t.Fatalf("err: %v", err) 837 }) 838 839 // Change the desired state of the parent alloc to stop 840 alloc1 := alloc.Copy() 841 alloc1.DesiredStatus = structs.AllocDesiredStatusStop 842 if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 300, []*structs.Allocation{alloc1}); err != nil { 843 t.Fatalf("err: %v", err) 844 } 845 846 // Ensure that there are no blocked allocations 847 testutil.WaitForResult(func() (bool, error) { 848 for id, ar := range c1.getAllocRunners() { 849 if ar.IsWaiting() { 850 return false, fmt.Errorf("%q still blocked", id) 851 } 852 if ar.IsMigrating() { 853 return false, fmt.Errorf("%q still migrating", id) 854 } 855 } 856 return true, nil 857 }, func(err error) { 858 t.Fatalf("err: %v", err) 859 }) 860 861 // Destroy all the allocations 862 for _, ar := range c1.getAllocRunners() { 863 ar.Destroy() 864 } 865 866 for _, ar := range c1.getAllocRunners() { 867 <-ar.DestroyCh() 868 } 869} 870 871func TestClient_ValidateMigrateToken_ValidToken(t *testing.T) { 872 t.Parallel() 873 assert := assert.New(t) 874 875 c, cleanup := TestClient(t, func(c *config.Config) { 876 c.ACLEnabled = true 877 }) 878 defer cleanup() 879 880 alloc := mock.Alloc() 881 validToken, err := structs.GenerateMigrateToken(alloc.ID, c.secretNodeID()) 882 assert.Nil(err) 883 884 assert.Equal(c.ValidateMigrateToken(alloc.ID, validToken), true) 885} 886 887func TestClient_ValidateMigrateToken_InvalidToken(t *testing.T) { 888 t.Parallel() 889 assert := assert.New(t) 890 891 c, cleanup := TestClient(t, func(c *config.Config) { 892 c.ACLEnabled = true 893 }) 894 defer cleanup() 895 896 assert.Equal(c.ValidateMigrateToken("", ""), false) 897 898 alloc := mock.Alloc() 899 assert.Equal(c.ValidateMigrateToken(alloc.ID, alloc.ID), false) 900 assert.Equal(c.ValidateMigrateToken(alloc.ID, ""), false) 901} 902 903func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) { 904 t.Parallel() 905 assert := assert.New(t) 906 907 c, cleanup := TestClient(t, func(c *config.Config) {}) 908 defer cleanup() 909 910 assert.Equal(c.ValidateMigrateToken("", ""), true) 911} 912 913func TestClient_ReloadTLS_UpgradePlaintextToTLS(t *testing.T) { 914 t.Parallel() 915 assert := assert.New(t) 916 917 s1, addr, cleanupS1 := testServer(t, func(c *nomad.Config) { 918 c.Region = "global" 919 }) 920 defer cleanupS1() 921 testutil.WaitForLeader(t, s1.RPC) 922 923 const ( 924 cafile = "../helper/tlsutil/testdata/ca.pem" 925 foocert = "../helper/tlsutil/testdata/nomad-foo.pem" 926 fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" 927 ) 928 929 c1, cleanup := TestClient(t, func(c *config.Config) { 930 c.Servers = []string{addr} 931 }) 932 defer cleanup() 933 934 // Registering a node over plaintext should succeed 935 { 936 req := structs.NodeSpecificRequest{ 937 NodeID: c1.Node().ID, 938 QueryOptions: structs.QueryOptions{Region: "global"}, 939 } 940 941 testutil.WaitForResult(func() (bool, error) { 942 var out structs.SingleNodeResponse 943 err := c1.RPC("Node.GetNode", &req, &out) 944 if err != nil { 945 return false, fmt.Errorf("client RPC failed when it should have succeeded:\n%+v", err) 946 } 947 return true, nil 948 }, 949 func(err error) { 950 t.Fatalf(err.Error()) 951 }, 952 ) 953 } 954 955 newConfig := &nconfig.TLSConfig{ 956 EnableHTTP: true, 957 EnableRPC: true, 958 VerifyServerHostname: true, 959 CAFile: cafile, 960 CertFile: foocert, 961 KeyFile: fookey, 962 } 963 964 err := c1.reloadTLSConnections(newConfig) 965 assert.Nil(err) 966 967 // Registering a node over plaintext should fail after the node has upgraded 968 // to TLS 969 { 970 req := structs.NodeSpecificRequest{ 971 NodeID: c1.Node().ID, 972 QueryOptions: structs.QueryOptions{Region: "global"}, 973 } 974 testutil.WaitForResult(func() (bool, error) { 975 var out structs.SingleNodeResponse 976 err := c1.RPC("Node.GetNode", &req, &out) 977 if err == nil { 978 return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", err) 979 } 980 return true, nil 981 }, 982 func(err error) { 983 t.Fatalf(err.Error()) 984 }, 985 ) 986 } 987} 988 989func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) { 990 t.Parallel() 991 assert := assert.New(t) 992 993 s1, addr, cleanupS1 := testServer(t, func(c *nomad.Config) { 994 c.Region = "global" 995 }) 996 defer cleanupS1() 997 testutil.WaitForLeader(t, s1.RPC) 998 999 const ( 1000 cafile = "../helper/tlsutil/testdata/ca.pem" 1001 foocert = "../helper/tlsutil/testdata/nomad-foo.pem" 1002 fookey = "../helper/tlsutil/testdata/nomad-foo-key.pem" 1003 ) 1004 1005 c1, cleanup := TestClient(t, func(c *config.Config) { 1006 c.Servers = []string{addr} 1007 c.TLSConfig = &nconfig.TLSConfig{ 1008 EnableHTTP: true, 1009 EnableRPC: true, 1010 VerifyServerHostname: true, 1011 CAFile: cafile, 1012 CertFile: foocert, 1013 KeyFile: fookey, 1014 } 1015 }) 1016 defer cleanup() 1017 1018 // assert that when one node is running in encrypted mode, a RPC request to a 1019 // node running in plaintext mode should fail 1020 { 1021 req := structs.NodeSpecificRequest{ 1022 NodeID: c1.Node().ID, 1023 QueryOptions: structs.QueryOptions{Region: "global"}, 1024 } 1025 testutil.WaitForResult(func() (bool, error) { 1026 var out structs.SingleNodeResponse 1027 err := c1.RPC("Node.GetNode", &req, &out) 1028 if err == nil { 1029 return false, fmt.Errorf("client RPC succeeded when it should have failed :\n%+v", err) 1030 } 1031 return true, nil 1032 }, func(err error) { 1033 t.Fatalf(err.Error()) 1034 }, 1035 ) 1036 } 1037 1038 newConfig := &nconfig.TLSConfig{} 1039 1040 err := c1.reloadTLSConnections(newConfig) 1041 assert.Nil(err) 1042 1043 // assert that when both nodes are in plaintext mode, a RPC request should 1044 // succeed 1045 { 1046 req := structs.NodeSpecificRequest{ 1047 NodeID: c1.Node().ID, 1048 QueryOptions: structs.QueryOptions{Region: "global"}, 1049 } 1050 testutil.WaitForResult(func() (bool, error) { 1051 var out structs.SingleNodeResponse 1052 err := c1.RPC("Node.GetNode", &req, &out) 1053 if err != nil { 1054 return false, fmt.Errorf("client RPC failed when it should have succeeded:\n%+v", err) 1055 } 1056 return true, nil 1057 }, func(err error) { 1058 t.Fatalf(err.Error()) 1059 }, 1060 ) 1061 } 1062} 1063 1064// TestClient_ServerList tests client methods that interact with the internal 1065// nomad server list. 1066func TestClient_ServerList(t *testing.T) { 1067 t.Parallel() 1068 client, cleanup := TestClient(t, func(c *config.Config) {}) 1069 defer cleanup() 1070 1071 if s := client.GetServers(); len(s) != 0 { 1072 t.Fatalf("expected server lit to be empty but found: %+q", s) 1073 } 1074 if _, err := client.SetServers(nil); err != noServersErr { 1075 t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err) 1076 } 1077 if _, err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil { 1078 t.Fatalf("expected setting a bad server to return an error") 1079 } 1080 if _, err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil { 1081 t.Fatalf("expected setting at least one good server to succeed but received: %v", err) 1082 } 1083 s := client.GetServers() 1084 if len(s) != 0 { 1085 t.Fatalf("expected 2 servers but received: %+q", s) 1086 } 1087} 1088 1089func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) { 1090 t.Parallel() 1091 client, cleanup := TestClient(t, func(c *config.Config) {}) 1092 defer cleanup() 1093 1094 client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ 1095 NodeResources: &structs.NodeResources{ 1096 Cpu: structs.NodeCpuResources{CpuShares: 123}, 1097 }, 1098 }) 1099 1100 client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ 1101 NodeResources: &structs.NodeResources{ 1102 Memory: structs.NodeMemoryResources{MemoryMB: 1024}, 1103 }, 1104 }) 1105 1106 client.updateNodeFromDevices([]*structs.NodeDeviceResource{ 1107 { 1108 Vendor: "vendor", 1109 Type: "type", 1110 }, 1111 }) 1112 1113 // initial check 1114 expectedResources := &structs.NodeResources{ 1115 // computed through test client initialization 1116 Networks: client.configCopy.Node.NodeResources.Networks, 1117 NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks, 1118 Disk: client.configCopy.Node.NodeResources.Disk, 1119 1120 // injected 1121 Cpu: structs.NodeCpuResources{ 1122 CpuShares: 123, 1123 ReservableCpuCores: client.configCopy.Node.NodeResources.Cpu.ReservableCpuCores, 1124 TotalCpuCores: client.configCopy.Node.NodeResources.Cpu.TotalCpuCores, 1125 }, 1126 Memory: structs.NodeMemoryResources{MemoryMB: 1024}, 1127 Devices: []*structs.NodeDeviceResource{ 1128 { 1129 Vendor: "vendor", 1130 Type: "type", 1131 }, 1132 }, 1133 } 1134 1135 assert.EqualValues(t, expectedResources, client.configCopy.Node.NodeResources) 1136 1137 // overrides of values 1138 1139 client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ 1140 NodeResources: &structs.NodeResources{ 1141 Memory: structs.NodeMemoryResources{MemoryMB: 2048}, 1142 }, 1143 }) 1144 1145 client.updateNodeFromDevices([]*structs.NodeDeviceResource{ 1146 { 1147 Vendor: "vendor", 1148 Type: "type", 1149 }, 1150 { 1151 Vendor: "vendor2", 1152 Type: "type2", 1153 }, 1154 }) 1155 1156 expectedResources2 := &structs.NodeResources{ 1157 // computed through test client initialization 1158 Networks: client.configCopy.Node.NodeResources.Networks, 1159 NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks, 1160 Disk: client.configCopy.Node.NodeResources.Disk, 1161 1162 // injected 1163 Cpu: structs.NodeCpuResources{ 1164 CpuShares: 123, 1165 ReservableCpuCores: client.configCopy.Node.NodeResources.Cpu.ReservableCpuCores, 1166 TotalCpuCores: client.configCopy.Node.NodeResources.Cpu.TotalCpuCores, 1167 }, 1168 Memory: structs.NodeMemoryResources{MemoryMB: 2048}, 1169 Devices: []*structs.NodeDeviceResource{ 1170 { 1171 Vendor: "vendor", 1172 Type: "type", 1173 }, 1174 { 1175 Vendor: "vendor2", 1176 Type: "type2", 1177 }, 1178 }, 1179 } 1180 1181 assert.EqualValues(t, expectedResources2, client.configCopy.Node.NodeResources) 1182 1183} 1184 1185// TestClient_UpdateNodeFromFingerprintKeepsConfig asserts manually configured 1186// network interfaces take precedence over fingerprinted ones. 1187func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) { 1188 t.Parallel() 1189 if runtime.GOOS != "linux" { 1190 t.Skip("assertions assume linux platform") 1191 } 1192 1193 // Client without network configured updates to match fingerprint 1194 client, cleanup := TestClient(t, nil) 1195 defer cleanup() 1196 1197 client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ 1198 NodeResources: &structs.NodeResources{ 1199 Cpu: structs.NodeCpuResources{CpuShares: 123}, 1200 Networks: []*structs.NetworkResource{{Mode: "host", Device: "any-interface"}}, 1201 }, 1202 Resources: &structs.Resources{ 1203 CPU: 80, 1204 }, 1205 }) 1206 idx := len(client.config.Node.NodeResources.Networks) - 1 1207 require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares) 1208 require.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[idx].Device) 1209 require.Equal(t, 80, client.config.Node.Resources.CPU) 1210 1211 // lookup an interface. client.Node starts with a hardcoded value, eth0, 1212 // and is only updated async through fingerprinter. 1213 // Let's just lookup network device; anyone will do for this test 1214 interfaces, err := net.Interfaces() 1215 require.NoError(t, err) 1216 require.NotEmpty(t, interfaces) 1217 dev := interfaces[0].Name 1218 1219 // Client with network interface configured keeps the config 1220 // setting on update 1221 name := "TestClient_UpdateNodeFromFingerprintKeepsConfig2" 1222 client, cleanup = TestClient(t, func(c *config.Config) { 1223 c.NetworkInterface = dev 1224 c.Node.Name = name 1225 c.Options["fingerprint.denylist"] = "network" 1226 // Node is already a mock.Node, with a device 1227 c.Node.NodeResources.Networks[0].Device = dev 1228 }) 1229 defer cleanup() 1230 client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ 1231 NodeResources: &structs.NodeResources{ 1232 Cpu: structs.NodeCpuResources{CpuShares: 123}, 1233 Networks: []*structs.NetworkResource{ 1234 {Mode: "host", Device: "any-interface", MBits: 20}, 1235 }, 1236 }, 1237 }) 1238 require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares) 1239 // only the configured device is kept 1240 require.Equal(t, 2, len(client.config.Node.NodeResources.Networks)) 1241 require.Equal(t, dev, client.config.Node.NodeResources.Networks[0].Device) 1242 require.Equal(t, "bridge", client.config.Node.NodeResources.Networks[1].Mode) 1243 1244 // Network speed is applied to all NetworkResources 1245 client.config.NetworkInterface = "" 1246 client.config.NetworkSpeed = 100 1247 client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ 1248 NodeResources: &structs.NodeResources{ 1249 Cpu: structs.NodeCpuResources{CpuShares: 123}, 1250 Networks: []*structs.NetworkResource{ 1251 {Mode: "host", Device: "any-interface", MBits: 20}, 1252 }, 1253 }, 1254 Resources: &structs.Resources{ 1255 CPU: 80, 1256 }, 1257 }) 1258 assert.Equal(t, 3, len(client.config.Node.NodeResources.Networks)) 1259 assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[2].Device) 1260 assert.Equal(t, 100, client.config.Node.NodeResources.Networks[2].MBits) 1261 assert.Equal(t, 0, client.config.Node.NodeResources.Networks[1].MBits) 1262} 1263 1264// Support multiple IP addresses (ipv4 vs. 6, e.g.) on the configured network interface 1265func Test_UpdateNodeFromFingerprintMultiIP(t *testing.T) { 1266 t.Parallel() 1267 1268 var dev string 1269 switch runtime.GOOS { 1270 case "linux": 1271 dev = "lo" 1272 case "darwin": 1273 dev = "lo0" 1274 } 1275 1276 // Client without network configured updates to match fingerprint 1277 client, cleanup := TestClient(t, func(c *config.Config) { 1278 c.NetworkInterface = dev 1279 c.Options["fingerprint.denylist"] = "network,cni,bridge" 1280 c.Node.Resources.Networks = c.Node.NodeResources.Networks 1281 }) 1282 defer cleanup() 1283 1284 client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ 1285 NodeResources: &structs.NodeResources{ 1286 Cpu: structs.NodeCpuResources{CpuShares: 123}, 1287 Networks: []*structs.NetworkResource{ 1288 {Device: dev, IP: "127.0.0.1"}, 1289 {Device: dev, IP: "::1"}, 1290 }, 1291 }, 1292 }) 1293 1294 nets := structs.Networks{ 1295 mock.Node().NodeResources.Networks[0], 1296 {Device: dev, IP: "127.0.0.1"}, 1297 {Device: dev, IP: "::1"}, 1298 } 1299 1300 require.Equal(t, nets, client.config.Node.NodeResources.Networks) 1301} 1302 1303func TestClient_computeAllocatedDeviceStats(t *testing.T) { 1304 logger := testlog.HCLogger(t) 1305 c := &Client{logger: logger} 1306 1307 newDeviceStats := func(strValue string) *device.DeviceStats { 1308 return &device.DeviceStats{ 1309 Summary: &psstructs.StatValue{ 1310 StringVal: &strValue, 1311 }, 1312 } 1313 } 1314 1315 allocatedDevices := []*structs.AllocatedDeviceResource{ 1316 { 1317 Vendor: "vendor", 1318 Type: "type", 1319 Name: "name", 1320 DeviceIDs: []string{"d2", "d3", "notfoundid"}, 1321 }, 1322 { 1323 Vendor: "vendor2", 1324 Type: "type2", 1325 Name: "name2", 1326 DeviceIDs: []string{"a2"}, 1327 }, 1328 { 1329 Vendor: "vendor_notfound", 1330 Type: "type_notfound", 1331 Name: "name_notfound", 1332 DeviceIDs: []string{"d3"}, 1333 }, 1334 } 1335 1336 hostDeviceGroupStats := []*device.DeviceGroupStats{ 1337 { 1338 Vendor: "vendor", 1339 Type: "type", 1340 Name: "name", 1341 InstanceStats: map[string]*device.DeviceStats{ 1342 "unallocated": newDeviceStats("unallocated"), 1343 "d2": newDeviceStats("d2"), 1344 "d3": newDeviceStats("d3"), 1345 }, 1346 }, 1347 { 1348 Vendor: "vendor2", 1349 Type: "type2", 1350 Name: "name2", 1351 InstanceStats: map[string]*device.DeviceStats{ 1352 "a2": newDeviceStats("a2"), 1353 }, 1354 }, 1355 { 1356 Vendor: "vendor_unused", 1357 Type: "type_unused", 1358 Name: "name_unused", 1359 InstanceStats: map[string]*device.DeviceStats{ 1360 "unallocated_unused": newDeviceStats("unallocated_unused"), 1361 }, 1362 }, 1363 } 1364 1365 // test some edge conditions 1366 assert.Empty(t, c.computeAllocatedDeviceGroupStats(nil, nil)) 1367 assert.Empty(t, c.computeAllocatedDeviceGroupStats(nil, hostDeviceGroupStats)) 1368 assert.Empty(t, c.computeAllocatedDeviceGroupStats(allocatedDevices, nil)) 1369 1370 // actual test 1371 result := c.computeAllocatedDeviceGroupStats(allocatedDevices, hostDeviceGroupStats) 1372 sort.Slice(result, func(i, j int) bool { 1373 return result[i].Vendor < result[j].Vendor 1374 }) 1375 1376 expected := []*device.DeviceGroupStats{ 1377 { 1378 Vendor: "vendor", 1379 Type: "type", 1380 Name: "name", 1381 InstanceStats: map[string]*device.DeviceStats{ 1382 "d2": newDeviceStats("d2"), 1383 "d3": newDeviceStats("d3"), 1384 }, 1385 }, 1386 { 1387 Vendor: "vendor2", 1388 Type: "type2", 1389 Name: "name2", 1390 InstanceStats: map[string]*device.DeviceStats{ 1391 "a2": newDeviceStats("a2"), 1392 }, 1393 }, 1394 } 1395 1396 assert.EqualValues(t, expected, result) 1397} 1398 1399func TestClient_getAllocatedResources(t *testing.T) { 1400 t.Parallel() 1401 require := require.New(t) 1402 client, cleanup := TestClient(t, nil) 1403 defer cleanup() 1404 1405 allocStops := mock.BatchAlloc() 1406 allocStops.Job.TaskGroups[0].Count = 1 1407 allocStops.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 1408 allocStops.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 1409 "run_for": "1ms", 1410 "exit_code": "0", 1411 } 1412 allocStops.Job.TaskGroups[0].RestartPolicy.Attempts = 0 1413 allocStops.AllocatedResources.Shared.DiskMB = 64 1414 allocStops.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 64} 1415 allocStops.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 64} 1416 require.Nil(client.addAlloc(allocStops, "")) 1417 1418 allocFails := mock.BatchAlloc() 1419 allocFails.Job.TaskGroups[0].Count = 1 1420 allocFails.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 1421 allocFails.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 1422 "run_for": "1ms", 1423 "exit_code": "1", 1424 } 1425 allocFails.Job.TaskGroups[0].RestartPolicy.Attempts = 0 1426 allocFails.AllocatedResources.Shared.DiskMB = 128 1427 allocFails.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 128} 1428 allocFails.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 128} 1429 require.Nil(client.addAlloc(allocFails, "")) 1430 1431 allocRuns := mock.Alloc() 1432 allocRuns.Job.TaskGroups[0].Count = 1 1433 allocRuns.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 1434 allocRuns.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 1435 "run_for": "3s", 1436 } 1437 allocRuns.AllocatedResources.Shared.DiskMB = 256 1438 allocRuns.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 256} 1439 allocRuns.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 256} 1440 require.Nil(client.addAlloc(allocRuns, "")) 1441 1442 allocPends := mock.Alloc() 1443 allocPends.Job.TaskGroups[0].Count = 1 1444 allocPends.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 1445 allocPends.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 1446 "run_for": "5s", 1447 "start_block_for": "10s", 1448 } 1449 allocPends.AllocatedResources.Shared.DiskMB = 512 1450 allocPends.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 512} 1451 allocPends.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 512} 1452 require.Nil(client.addAlloc(allocPends, "")) 1453 1454 // wait for allocStops to stop running and for allocRuns to be pending/running 1455 testutil.WaitForResult(func() (bool, error) { 1456 as, err := client.GetAllocState(allocPends.ID) 1457 if err != nil { 1458 return false, err 1459 } else if as.ClientStatus != structs.AllocClientStatusPending { 1460 return false, fmt.Errorf("allocPends not yet pending: %#v", as) 1461 } 1462 1463 as, err = client.GetAllocState(allocRuns.ID) 1464 if as.ClientStatus != structs.AllocClientStatusRunning { 1465 return false, fmt.Errorf("allocRuns not yet running: %#v", as) 1466 } else if err != nil { 1467 return false, err 1468 } 1469 1470 as, err = client.GetAllocState(allocStops.ID) 1471 if err != nil { 1472 return false, err 1473 } else if as.ClientStatus != structs.AllocClientStatusComplete { 1474 return false, fmt.Errorf("allocStops not yet complete: %#v", as) 1475 } 1476 1477 as, err = client.GetAllocState(allocFails.ID) 1478 if err != nil { 1479 return false, err 1480 } else if as.ClientStatus != structs.AllocClientStatusFailed { 1481 return false, fmt.Errorf("allocFails not yet failed: %#v", as) 1482 } 1483 1484 return true, nil 1485 }, func(err error) { 1486 require.NoError(err) 1487 }) 1488 1489 result := client.getAllocatedResources(client.config.Node) 1490 1491 // Ignore comparing networks for now 1492 result.Flattened.Networks = nil 1493 1494 expected := structs.ComparableResources{ 1495 Flattened: structs.AllocatedTaskResources{ 1496 Cpu: structs.AllocatedCpuResources{ 1497 CpuShares: 768, 1498 ReservedCores: []uint16{}, 1499 }, 1500 Memory: structs.AllocatedMemoryResources{ 1501 MemoryMB: 768, 1502 MemoryMaxMB: 768, 1503 }, 1504 Networks: nil, 1505 }, 1506 Shared: structs.AllocatedSharedResources{ 1507 DiskMB: 768, 1508 }, 1509 } 1510 1511 assert.EqualValues(t, expected, *result) 1512} 1513 1514func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) { 1515 t.Parallel() 1516 client, cleanup := TestClient(t, nil) 1517 defer cleanup() 1518 1519 // initial update 1520 { 1521 info := &structs.DriverInfo{ 1522 Detected: true, 1523 Healthy: false, 1524 HealthDescription: "not healthy at start", 1525 Attributes: map[string]string{ 1526 "node.mock.testattr1": "val1", 1527 }, 1528 } 1529 client.updateNodeFromDriver("mock", info) 1530 n := client.config.Node 1531 1532 updatedInfo := *n.Drivers["mock"] 1533 // compare without update time 1534 updatedInfo.UpdateTime = info.UpdateTime 1535 assert.EqualValues(t, updatedInfo, *info) 1536 1537 // check node attributes 1538 assert.Equal(t, "val1", n.Attributes["node.mock.testattr1"]) 1539 } 1540 1541 // initial update 1542 { 1543 info := &structs.DriverInfo{ 1544 Detected: true, 1545 Healthy: true, 1546 HealthDescription: "healthy", 1547 Attributes: map[string]string{ 1548 "node.mock.testattr1": "val2", 1549 }, 1550 } 1551 client.updateNodeFromDriver("mock", info) 1552 n := client.Node() 1553 1554 updatedInfo := *n.Drivers["mock"] 1555 // compare without update time 1556 updatedInfo.UpdateTime = info.UpdateTime 1557 assert.EqualValues(t, updatedInfo, *info) 1558 1559 // check node attributes are updated 1560 assert.Equal(t, "val2", n.Attributes["node.mock.testattr1"]) 1561 1562 // update once more with the same info, updateTime shouldn't change 1563 client.updateNodeFromDriver("mock", info) 1564 un := client.Node() 1565 assert.EqualValues(t, n, un) 1566 } 1567 1568 // update once more to unhealthy because why not 1569 { 1570 info := &structs.DriverInfo{ 1571 Detected: true, 1572 Healthy: false, 1573 HealthDescription: "lost track", 1574 Attributes: map[string]string{ 1575 "node.mock.testattr1": "", 1576 }, 1577 } 1578 client.updateNodeFromDriver("mock", info) 1579 n := client.Node() 1580 1581 updatedInfo := *n.Drivers["mock"] 1582 // compare without update time 1583 updatedInfo.UpdateTime = info.UpdateTime 1584 assert.EqualValues(t, updatedInfo, *info) 1585 1586 // check node attributes are updated 1587 assert.Equal(t, "", n.Attributes["node.mock.testattr1"]) 1588 1589 // update once more with the same info, updateTime shouldn't change 1590 client.updateNodeFromDriver("mock", info) 1591 un := client.Node() 1592 assert.EqualValues(t, n, un) 1593 } 1594} 1595 1596// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported 1597func TestClient_hasLocalState(t *testing.T) { 1598 t.Parallel() 1599 1600 c, cleanup := TestClient(t, nil) 1601 defer cleanup() 1602 1603 c.stateDB = state.NewMemDB(c.logger) 1604 1605 t.Run("plain alloc", func(t *testing.T) { 1606 alloc := mock.BatchAlloc() 1607 c.stateDB.PutAllocation(alloc) 1608 1609 require.False(t, c.hasLocalState(alloc)) 1610 }) 1611 1612 t.Run("alloc with a task with local state", func(t *testing.T) { 1613 alloc := mock.BatchAlloc() 1614 taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name 1615 ls := &trstate.LocalState{} 1616 1617 c.stateDB.PutAllocation(alloc) 1618 c.stateDB.PutTaskRunnerLocalState(alloc.ID, taskName, ls) 1619 1620 require.True(t, c.hasLocalState(alloc)) 1621 }) 1622 1623 t.Run("alloc with a task with task state", func(t *testing.T) { 1624 alloc := mock.BatchAlloc() 1625 taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name 1626 ts := &structs.TaskState{ 1627 State: structs.TaskStateRunning, 1628 } 1629 1630 c.stateDB.PutAllocation(alloc) 1631 c.stateDB.PutTaskState(alloc.ID, taskName, ts) 1632 1633 require.True(t, c.hasLocalState(alloc)) 1634 }) 1635} 1636 1637func Test_verifiedTasks(t *testing.T) { 1638 t.Parallel() 1639 logger := testlog.HCLogger(t) 1640 1641 // produce a result and check against expected tasks and/or error output 1642 try := func(t *testing.T, a *structs.Allocation, tasks, expTasks []string, expErr string) { 1643 result, err := verifiedTasks(logger, a, tasks) 1644 if expErr != "" { 1645 require.EqualError(t, err, expErr) 1646 } else { 1647 require.NoError(t, err) 1648 require.Equal(t, expTasks, result) 1649 } 1650 } 1651 1652 // create an alloc with TaskGroup=g1, tasks configured given g1Tasks 1653 alloc := func(g1Tasks []string) *structs.Allocation { 1654 var tasks []*structs.Task 1655 for _, taskName := range g1Tasks { 1656 tasks = append(tasks, &structs.Task{Name: taskName}) 1657 } 1658 1659 return &structs.Allocation{ 1660 Job: &structs.Job{ 1661 TaskGroups: []*structs.TaskGroup{ 1662 {Name: "g0", Tasks: []*structs.Task{{Name: "g0t1"}}}, 1663 {Name: "g1", Tasks: tasks}, 1664 }, 1665 }, 1666 TaskGroup: "g1", 1667 } 1668 } 1669 1670 t.Run("nil alloc", func(t *testing.T) { 1671 tasks := []string{"g1t1"} 1672 try(t, nil, tasks, nil, "nil allocation") 1673 }) 1674 1675 t.Run("missing task names", func(t *testing.T) { 1676 var tasks []string 1677 tgTasks := []string{"g1t1"} 1678 try(t, alloc(tgTasks), tasks, nil, "missing task names") 1679 }) 1680 1681 t.Run("missing group", func(t *testing.T) { 1682 tasks := []string{"g1t1"} 1683 a := alloc(tasks) 1684 a.TaskGroup = "other" 1685 try(t, a, tasks, nil, "group name in allocation is not present in job") 1686 }) 1687 1688 t.Run("nonexistent task", func(t *testing.T) { 1689 tasks := []string{"missing"} 1690 try(t, alloc([]string{"task1"}), tasks, nil, `task "missing" not found in allocation`) 1691 }) 1692 1693 t.Run("matching task", func(t *testing.T) { 1694 tasks := []string{"g1t1"} 1695 try(t, alloc(tasks), tasks, tasks, "") 1696 }) 1697 1698 t.Run("matching task subset", func(t *testing.T) { 1699 tasks := []string{"g1t1", "g1t3"} 1700 tgTasks := []string{"g1t1", "g1t2", "g1t3"} 1701 try(t, alloc(tgTasks), tasks, tasks, "") 1702 }) 1703} 1704