1package client 2 3import ( 4 "encoding/json" 5 "fmt" 6 "io" 7 "net" 8 "runtime" 9 "strings" 10 "testing" 11 "time" 12 13 "github.com/hashicorp/nomad/acl" 14 "github.com/hashicorp/nomad/client/config" 15 cstructs "github.com/hashicorp/nomad/client/structs" 16 "github.com/hashicorp/nomad/helper/pluginutils/catalog" 17 "github.com/hashicorp/nomad/helper/uuid" 18 "github.com/hashicorp/nomad/nomad" 19 "github.com/hashicorp/nomad/nomad/mock" 20 "github.com/hashicorp/nomad/nomad/structs" 21 nstructs "github.com/hashicorp/nomad/nomad/structs" 22 nconfig "github.com/hashicorp/nomad/nomad/structs/config" 23 "github.com/hashicorp/nomad/plugins/drivers" 24 "github.com/hashicorp/nomad/testutil" 25 "github.com/stretchr/testify/require" 26 "github.com/ugorji/go/codec" 27 "golang.org/x/sys/unix" 28) 29 30func TestAllocations_Restart(t *testing.T) { 31 t.Parallel() 32 require := require.New(t) 33 client, cleanup := TestClient(t, nil) 34 defer cleanup() 35 36 a := mock.Alloc() 37 a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 38 a.Job.TaskGroups[0].RestartPolicy = &nstructs.RestartPolicy{ 39 Attempts: 0, 40 Mode: nstructs.RestartPolicyModeFail, 41 } 42 a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 43 "run_for": "10ms", 44 } 45 require.Nil(client.addAlloc(a, "")) 46 47 // Try with bad alloc 48 req := &nstructs.AllocRestartRequest{} 49 var resp nstructs.GenericResponse 50 err := client.ClientRPC("Allocations.Restart", &req, &resp) 51 require.Error(err) 52 53 // Try with good alloc 54 req.AllocID = a.ID 55 56 testutil.WaitForResult(func() (bool, error) { 57 var resp2 nstructs.GenericResponse 58 err := client.ClientRPC("Allocations.Restart", &req, &resp2) 59 if err != nil && strings.Contains(err.Error(), "not running") { 60 return false, err 61 } 62 63 return true, nil 64 }, func(err error) { 65 t.Fatalf("err: %v", err) 66 }) 67} 68 69func TestAllocations_Restart_ACL(t *testing.T) { 70 t.Parallel() 71 require := require.New(t) 72 73 server, addr, root, cleanupS := testACLServer(t, nil) 74 defer cleanupS() 75 76 client, cleanupC := TestClient(t, func(c *config.Config) { 77 c.Servers = []string{addr} 78 c.ACLEnabled = true 79 }) 80 defer cleanupC() 81 82 job := mock.BatchJob() 83 job.TaskGroups[0].Count = 1 84 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 85 "run_for": "20s", 86 } 87 88 // Wait for client to be running job 89 alloc := testutil.WaitForRunningWithToken(t, server.RPC, job, root.SecretID)[0] 90 91 // Try request without a token and expect failure 92 { 93 req := &nstructs.AllocRestartRequest{} 94 req.AllocID = alloc.ID 95 var resp nstructs.GenericResponse 96 err := client.ClientRPC("Allocations.Restart", &req, &resp) 97 require.NotNil(err) 98 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 99 } 100 101 // Try request with an invalid token and expect failure 102 { 103 token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{})) 104 req := &nstructs.AllocRestartRequest{} 105 req.AllocID = alloc.ID 106 req.AuthToken = token.SecretID 107 108 var resp nstructs.GenericResponse 109 err := client.ClientRPC("Allocations.Restart", &req, &resp) 110 111 require.NotNil(err) 112 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 113 } 114 115 // Try request with a valid token 116 { 117 policyHCL := mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle}) 118 token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", policyHCL) 119 require.NotNil(token) 120 req := &nstructs.AllocRestartRequest{} 121 req.AllocID = alloc.ID 122 req.AuthToken = token.SecretID 123 req.Namespace = nstructs.DefaultNamespace 124 var resp nstructs.GenericResponse 125 err := client.ClientRPC("Allocations.Restart", &req, &resp) 126 require.NoError(err) 127 //require.True(nstructs.IsErrUnknownAllocation(err), "Expected unknown alloc, found: %v", err) 128 } 129 130 // Try request with a management token 131 { 132 req := &nstructs.AllocRestartRequest{} 133 req.AllocID = alloc.ID 134 req.AuthToken = root.SecretID 135 var resp nstructs.GenericResponse 136 err := client.ClientRPC("Allocations.Restart", &req, &resp) 137 // Depending on how quickly the alloc restarts there may be no 138 // error *or* a task not running error; either is fine. 139 if err != nil { 140 require.Contains(err.Error(), "Task not running", err) 141 } 142 } 143} 144 145func TestAllocations_GarbageCollectAll(t *testing.T) { 146 t.Parallel() 147 require := require.New(t) 148 client, cleanup := TestClient(t, nil) 149 defer cleanup() 150 151 req := &nstructs.NodeSpecificRequest{} 152 var resp nstructs.GenericResponse 153 require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)) 154} 155 156func TestAllocations_GarbageCollectAll_ACL(t *testing.T) { 157 t.Parallel() 158 require := require.New(t) 159 160 server, addr, root, cleanupS := testACLServer(t, nil) 161 defer cleanupS() 162 163 client, cleanupC := TestClient(t, func(c *config.Config) { 164 c.Servers = []string{addr} 165 c.ACLEnabled = true 166 }) 167 defer cleanupC() 168 169 // Try request without a token and expect failure 170 { 171 req := &nstructs.NodeSpecificRequest{} 172 var resp nstructs.GenericResponse 173 err := client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp) 174 require.NotNil(err) 175 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 176 } 177 178 // Try request with an invalid token and expect failure 179 { 180 token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) 181 req := &nstructs.NodeSpecificRequest{} 182 req.AuthToken = token.SecretID 183 184 var resp nstructs.GenericResponse 185 err := client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp) 186 187 require.NotNil(err) 188 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 189 } 190 191 // Try request with a valid token 192 { 193 token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", mock.NodePolicy(acl.PolicyWrite)) 194 req := &nstructs.NodeSpecificRequest{} 195 req.AuthToken = token.SecretID 196 var resp nstructs.GenericResponse 197 require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)) 198 } 199 200 // Try request with a management token 201 { 202 req := &nstructs.NodeSpecificRequest{} 203 req.AuthToken = root.SecretID 204 var resp nstructs.GenericResponse 205 require.Nil(client.ClientRPC("Allocations.GarbageCollectAll", &req, &resp)) 206 } 207} 208 209func TestAllocations_GarbageCollect(t *testing.T) { 210 t.Parallel() 211 require := require.New(t) 212 client, cleanup := TestClient(t, func(c *config.Config) { 213 c.GCDiskUsageThreshold = 100.0 214 }) 215 defer cleanup() 216 217 a := mock.Alloc() 218 a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" 219 a.Job.TaskGroups[0].RestartPolicy = &nstructs.RestartPolicy{ 220 Attempts: 0, 221 Mode: nstructs.RestartPolicyModeFail, 222 } 223 a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 224 "run_for": "10ms", 225 } 226 require.Nil(client.addAlloc(a, "")) 227 228 // Try with bad alloc 229 req := &nstructs.AllocSpecificRequest{} 230 var resp nstructs.GenericResponse 231 err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) 232 require.NotNil(err) 233 234 // Try with good alloc 235 req.AllocID = a.ID 236 testutil.WaitForResult(func() (bool, error) { 237 // Check if has been removed first 238 if ar, ok := client.allocs[a.ID]; !ok || ar.IsDestroyed() { 239 return true, nil 240 } 241 242 var resp2 nstructs.GenericResponse 243 err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp2) 244 return err == nil, err 245 }, func(err error) { 246 t.Fatalf("err: %v", err) 247 }) 248} 249 250func TestAllocations_GarbageCollect_ACL(t *testing.T) { 251 t.Parallel() 252 require := require.New(t) 253 254 server, addr, root, cleanupS := testACLServer(t, nil) 255 defer cleanupS() 256 257 client, cleanupC := TestClient(t, func(c *config.Config) { 258 c.Servers = []string{addr} 259 c.ACLEnabled = true 260 }) 261 defer cleanupC() 262 263 job := mock.BatchJob() 264 job.TaskGroups[0].Count = 1 265 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 266 "run_for": "20s", 267 } 268 269 // Wait for client to be running job 270 alloc := testutil.WaitForRunningWithToken(t, server.RPC, job, root.SecretID)[0] 271 272 // Try request without a token and expect failure 273 { 274 req := &nstructs.AllocSpecificRequest{} 275 req.AllocID = alloc.ID 276 var resp nstructs.GenericResponse 277 err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) 278 require.NotNil(err) 279 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 280 } 281 282 // Try request with an invalid token and expect failure 283 { 284 token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) 285 req := &nstructs.AllocSpecificRequest{} 286 req.AllocID = alloc.ID 287 req.AuthToken = token.SecretID 288 289 var resp nstructs.GenericResponse 290 err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) 291 292 require.NotNil(err) 293 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 294 } 295 296 // Try request with a valid token 297 { 298 token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid", 299 mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob})) 300 req := &nstructs.AllocSpecificRequest{} 301 req.AllocID = alloc.ID 302 req.AuthToken = token.SecretID 303 req.Namespace = nstructs.DefaultNamespace 304 305 var resp nstructs.GenericResponse 306 err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) 307 require.True(nstructs.IsErrUnknownAllocation(err)) 308 } 309 310 // Try request with a management token 311 { 312 req := &nstructs.AllocSpecificRequest{} 313 req.AuthToken = root.SecretID 314 315 var resp nstructs.GenericResponse 316 err := client.ClientRPC("Allocations.GarbageCollect", &req, &resp) 317 require.True(nstructs.IsErrUnknownAllocation(err)) 318 } 319} 320 321func TestAllocations_Signal(t *testing.T) { 322 t.Parallel() 323 324 client, cleanup := TestClient(t, nil) 325 defer cleanup() 326 327 a := mock.Alloc() 328 require.Nil(t, client.addAlloc(a, "")) 329 330 // Try with bad alloc 331 req := &nstructs.AllocSignalRequest{} 332 var resp nstructs.GenericResponse 333 err := client.ClientRPC("Allocations.Signal", &req, &resp) 334 require.NotNil(t, err) 335 require.True(t, nstructs.IsErrUnknownAllocation(err)) 336 337 // Try with good alloc 338 req.AllocID = a.ID 339 340 var resp2 nstructs.GenericResponse 341 err = client.ClientRPC("Allocations.Signal", &req, &resp2) 342 343 require.Error(t, err, "Expected error, got: %s, resp: %#+v", err, resp2) 344 require.Contains(t, err.Error(), "Failed to signal task: web, err: Task not running") 345} 346 347func TestAllocations_Signal_ACL(t *testing.T) { 348 t.Parallel() 349 require := require.New(t) 350 351 server, addr, root, cleanupS := testACLServer(t, nil) 352 defer cleanupS() 353 354 client, cleanupC := TestClient(t, func(c *config.Config) { 355 c.Servers = []string{addr} 356 c.ACLEnabled = true 357 }) 358 defer cleanupC() 359 360 job := mock.BatchJob() 361 job.TaskGroups[0].Count = 1 362 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 363 "run_for": "20s", 364 } 365 366 // Wait for client to be running job 367 alloc := testutil.WaitForRunningWithToken(t, server.RPC, job, root.SecretID)[0] 368 369 // Try request without a token and expect failure 370 { 371 req := &nstructs.AllocSignalRequest{} 372 req.AllocID = alloc.ID 373 var resp nstructs.GenericResponse 374 err := client.ClientRPC("Allocations.Signal", &req, &resp) 375 require.NotNil(err) 376 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 377 } 378 379 // Try request with an invalid token and expect failure 380 { 381 token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) 382 req := &nstructs.AllocSignalRequest{} 383 req.AllocID = alloc.ID 384 req.AuthToken = token.SecretID 385 386 var resp nstructs.GenericResponse 387 err := client.ClientRPC("Allocations.Signal", &req, &resp) 388 389 require.NotNil(err) 390 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 391 } 392 393 // Try request with a valid token 394 { 395 token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid", 396 mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle})) 397 req := &nstructs.AllocSignalRequest{} 398 req.AllocID = alloc.ID 399 req.AuthToken = token.SecretID 400 req.Namespace = nstructs.DefaultNamespace 401 402 var resp nstructs.GenericResponse 403 err := client.ClientRPC("Allocations.Signal", &req, &resp) 404 require.NoError(err) 405 } 406 407 // Try request with a management token 408 { 409 req := &nstructs.AllocSignalRequest{} 410 req.AllocID = alloc.ID 411 req.AuthToken = root.SecretID 412 413 var resp nstructs.GenericResponse 414 err := client.ClientRPC("Allocations.Signal", &req, &resp) 415 require.NoError(err) 416 } 417} 418 419func TestAllocations_Stats(t *testing.T) { 420 t.Parallel() 421 require := require.New(t) 422 client, cleanup := TestClient(t, nil) 423 defer cleanup() 424 425 a := mock.Alloc() 426 require.Nil(client.addAlloc(a, "")) 427 428 // Try with bad alloc 429 req := &cstructs.AllocStatsRequest{} 430 var resp cstructs.AllocStatsResponse 431 err := client.ClientRPC("Allocations.Stats", &req, &resp) 432 require.NotNil(err) 433 434 // Try with good alloc 435 req.AllocID = a.ID 436 testutil.WaitForResult(func() (bool, error) { 437 var resp2 cstructs.AllocStatsResponse 438 err := client.ClientRPC("Allocations.Stats", &req, &resp2) 439 if err != nil { 440 return false, err 441 } 442 if resp2.Stats == nil { 443 return false, fmt.Errorf("invalid stats object") 444 } 445 446 return true, nil 447 }, func(err error) { 448 t.Fatalf("err: %v", err) 449 }) 450} 451 452func TestAllocations_Stats_ACL(t *testing.T) { 453 t.Parallel() 454 require := require.New(t) 455 456 server, addr, root, cleanupS := testACLServer(t, nil) 457 defer cleanupS() 458 459 client, cleanupC := TestClient(t, func(c *config.Config) { 460 c.Servers = []string{addr} 461 c.ACLEnabled = true 462 }) 463 defer cleanupC() 464 465 job := mock.BatchJob() 466 job.TaskGroups[0].Count = 1 467 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 468 "run_for": "20s", 469 } 470 471 // Wait for client to be running job 472 alloc := testutil.WaitForRunningWithToken(t, server.RPC, job, root.SecretID)[0] 473 474 // Try request without a token and expect failure 475 { 476 req := &cstructs.AllocStatsRequest{} 477 req.AllocID = alloc.ID 478 var resp cstructs.AllocStatsResponse 479 err := client.ClientRPC("Allocations.Stats", &req, &resp) 480 require.NotNil(err) 481 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 482 } 483 484 // Try request with an invalid token and expect failure 485 { 486 token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny)) 487 req := &cstructs.AllocStatsRequest{} 488 req.AllocID = alloc.ID 489 req.AuthToken = token.SecretID 490 491 var resp cstructs.AllocStatsResponse 492 err := client.ClientRPC("Allocations.Stats", &req, &resp) 493 494 require.NotNil(err) 495 require.EqualError(err, nstructs.ErrPermissionDenied.Error()) 496 } 497 498 // Try request with a valid token 499 { 500 token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid", 501 mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) 502 req := &cstructs.AllocStatsRequest{} 503 req.AllocID = alloc.ID 504 req.AuthToken = token.SecretID 505 req.Namespace = nstructs.DefaultNamespace 506 507 var resp cstructs.AllocStatsResponse 508 err := client.ClientRPC("Allocations.Stats", &req, &resp) 509 require.NoError(err) 510 } 511 512 // Try request with a management token 513 { 514 req := &cstructs.AllocStatsRequest{} 515 req.AllocID = alloc.ID 516 req.AuthToken = root.SecretID 517 518 var resp cstructs.AllocStatsResponse 519 err := client.ClientRPC("Allocations.Stats", &req, &resp) 520 require.NoError(err) 521 } 522} 523 524func TestAlloc_ExecStreaming(t *testing.T) { 525 t.Parallel() 526 require := require.New(t) 527 528 // Start a server and client 529 s, cleanupS := nomad.TestServer(t, nil) 530 defer cleanupS() 531 testutil.WaitForLeader(t, s.RPC) 532 533 c, cleanupC := TestClient(t, func(c *config.Config) { 534 c.Servers = []string{s.GetConfig().RPCAddr.String()} 535 }) 536 defer cleanupC() 537 538 expectedStdout := "Hello from the other side\n" 539 expectedStderr := "Hello from the other side\n" 540 job := mock.BatchJob() 541 job.TaskGroups[0].Count = 1 542 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 543 "run_for": "20s", 544 "exec_command": map[string]interface{}{ 545 "run_for": "1ms", 546 "stdout_string": expectedStdout, 547 "stderr_string": expectedStderr, 548 "exit_code": 3, 549 }, 550 } 551 552 // Wait for client to be running job 553 testutil.WaitForRunning(t, s.RPC, job) 554 555 // Get the allocation ID 556 args := nstructs.AllocListRequest{} 557 args.Region = "global" 558 resp := nstructs.AllocListResponse{} 559 require.NoError(s.RPC("Alloc.List", &args, &resp)) 560 require.Len(resp.Allocations, 1) 561 allocID := resp.Allocations[0].ID 562 563 // Make the request 564 req := &cstructs.AllocExecRequest{ 565 AllocID: allocID, 566 Task: job.TaskGroups[0].Tasks[0].Name, 567 Tty: true, 568 Cmd: []string{"placeholder command"}, 569 QueryOptions: nstructs.QueryOptions{Region: "global"}, 570 } 571 572 // Get the handler 573 handler, err := c.StreamingRpcHandler("Allocations.Exec") 574 require.Nil(err) 575 576 // Create a pipe 577 p1, p2 := net.Pipe() 578 defer p1.Close() 579 defer p2.Close() 580 581 errCh := make(chan error) 582 frames := make(chan *drivers.ExecTaskStreamingResponseMsg) 583 584 // Start the handler 585 go handler(p2) 586 go decodeFrames(t, p1, frames, errCh) 587 588 // Send the request 589 encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle) 590 require.Nil(encoder.Encode(req)) 591 592 timeout := time.After(3 * time.Second) 593 594 exitCode := -1 595 receivedStdout := "" 596 receivedStderr := "" 597 598OUTER: 599 for { 600 select { 601 case <-timeout: 602 // time out report 603 require.Equal(expectedStdout, receivedStderr, "didn't receive expected stdout") 604 require.Equal(expectedStderr, receivedStderr, "didn't receive expected stderr") 605 require.Equal(3, exitCode, "failed to get exit code") 606 require.FailNow("timed out") 607 case err := <-errCh: 608 require.NoError(err) 609 case f := <-frames: 610 switch { 611 case f.Stdout != nil && len(f.Stdout.Data) != 0: 612 receivedStdout += string(f.Stdout.Data) 613 case f.Stderr != nil && len(f.Stderr.Data) != 0: 614 receivedStderr += string(f.Stderr.Data) 615 case f.Exited && f.Result != nil: 616 exitCode = int(f.Result.ExitCode) 617 default: 618 t.Logf("received unrelevant frame: %v", f) 619 } 620 621 if expectedStdout == receivedStdout && expectedStderr == receivedStderr && exitCode == 3 { 622 break OUTER 623 } 624 } 625 } 626} 627 628func TestAlloc_ExecStreaming_NoAllocation(t *testing.T) { 629 t.Parallel() 630 require := require.New(t) 631 632 // Start a server and client 633 s, cleanupS := nomad.TestServer(t, nil) 634 defer cleanupS() 635 testutil.WaitForLeader(t, s.RPC) 636 637 c, cleanupC := TestClient(t, func(c *config.Config) { 638 c.Servers = []string{s.GetConfig().RPCAddr.String()} 639 }) 640 defer cleanupC() 641 642 // Make the request 643 req := &cstructs.AllocExecRequest{ 644 AllocID: uuid.Generate(), 645 Task: "testtask", 646 Tty: true, 647 Cmd: []string{"placeholder command"}, 648 QueryOptions: nstructs.QueryOptions{Region: "global"}, 649 } 650 651 // Get the handler 652 handler, err := c.StreamingRpcHandler("Allocations.Exec") 653 require.Nil(err) 654 655 // Create a pipe 656 p1, p2 := net.Pipe() 657 defer p1.Close() 658 defer p2.Close() 659 660 errCh := make(chan error) 661 frames := make(chan *drivers.ExecTaskStreamingResponseMsg) 662 663 // Start the handler 664 go handler(p2) 665 go decodeFrames(t, p1, frames, errCh) 666 667 // Send the request 668 encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle) 669 require.Nil(encoder.Encode(req)) 670 671 timeout := time.After(3 * time.Second) 672 673 select { 674 case <-timeout: 675 require.FailNow("timed out") 676 case err := <-errCh: 677 require.True(nstructs.IsErrUnknownAllocation(err), "expected no allocation error but found: %v", err) 678 case f := <-frames: 679 require.Fail("received unexpected frame", "frame: %#v", f) 680 } 681} 682 683func TestAlloc_ExecStreaming_DisableRemoteExec(t *testing.T) { 684 t.Parallel() 685 require := require.New(t) 686 687 // Start a server and client 688 s, cleanupS := nomad.TestServer(t, nil) 689 defer cleanupS() 690 testutil.WaitForLeader(t, s.RPC) 691 692 c, cleanupC := TestClient(t, func(c *config.Config) { 693 c.Servers = []string{s.GetConfig().RPCAddr.String()} 694 c.DisableRemoteExec = true 695 }) 696 defer cleanupC() 697 698 // Make the request 699 req := &cstructs.AllocExecRequest{ 700 AllocID: uuid.Generate(), 701 Task: "testtask", 702 Tty: true, 703 Cmd: []string{"placeholder command"}, 704 QueryOptions: nstructs.QueryOptions{Region: "global"}, 705 } 706 707 // Get the handler 708 handler, err := c.StreamingRpcHandler("Allocations.Exec") 709 require.Nil(err) 710 711 // Create a pipe 712 p1, p2 := net.Pipe() 713 defer p1.Close() 714 defer p2.Close() 715 716 errCh := make(chan error) 717 frames := make(chan *drivers.ExecTaskStreamingResponseMsg) 718 719 // Start the handler 720 go handler(p2) 721 go decodeFrames(t, p1, frames, errCh) 722 723 // Send the request 724 encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle) 725 require.Nil(encoder.Encode(req)) 726 727 timeout := time.After(3 * time.Second) 728 729 select { 730 case <-timeout: 731 require.FailNow("timed out") 732 case err := <-errCh: 733 require.True(nstructs.IsErrPermissionDenied(err), "expected permission denied error but found: %v", err) 734 case f := <-frames: 735 require.Fail("received unexpected frame", "frame: %#v", f) 736 } 737} 738 739func TestAlloc_ExecStreaming_ACL_Basic(t *testing.T) { 740 t.Parallel() 741 742 // Start a server and client 743 s, root, cleanupS := nomad.TestACLServer(t, nil) 744 defer cleanupS() 745 testutil.WaitForLeader(t, s.RPC) 746 747 client, cleanupC := TestClient(t, func(c *config.Config) { 748 c.ACLEnabled = true 749 c.Servers = []string{s.GetConfig().RPCAddr.String()} 750 }) 751 defer cleanupC() 752 753 // Create a bad token 754 policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny}) 755 tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) 756 757 policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", 758 []string{acl.NamespaceCapabilityAllocExec, acl.NamespaceCapabilityReadFS}) 759 tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood) 760 761 job := mock.BatchJob() 762 job.TaskGroups[0].Count = 1 763 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 764 "run_for": "20s", 765 } 766 767 // Wait for client to be running job 768 alloc := testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID)[0] 769 770 cases := []struct { 771 Name string 772 Token string 773 ExpectedError string 774 }{ 775 { 776 Name: "bad token", 777 Token: tokenBad.SecretID, 778 ExpectedError: structs.ErrPermissionDenied.Error(), 779 }, 780 { 781 Name: "good token", 782 Token: tokenGood.SecretID, 783 ExpectedError: "task not found", 784 }, 785 { 786 Name: "root token", 787 Token: root.SecretID, 788 ExpectedError: "task not found", 789 }, 790 } 791 792 for _, c := range cases { 793 t.Run(c.Name, func(t *testing.T) { 794 795 // Make the request 796 req := &cstructs.AllocExecRequest{ 797 AllocID: alloc.ID, 798 Task: "testtask", 799 Tty: true, 800 Cmd: []string{"placeholder command"}, 801 QueryOptions: nstructs.QueryOptions{ 802 Region: "global", 803 AuthToken: c.Token, 804 Namespace: nstructs.DefaultNamespace, 805 }, 806 } 807 808 // Get the handler 809 handler, err := client.StreamingRpcHandler("Allocations.Exec") 810 require.Nil(t, err) 811 812 // Create a pipe 813 p1, p2 := net.Pipe() 814 defer p1.Close() 815 defer p2.Close() 816 817 errCh := make(chan error) 818 frames := make(chan *drivers.ExecTaskStreamingResponseMsg) 819 820 // Start the handler 821 go handler(p2) 822 go decodeFrames(t, p1, frames, errCh) 823 824 // Send the request 825 encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle) 826 require.Nil(t, encoder.Encode(req)) 827 828 select { 829 case <-time.After(3 * time.Second): 830 require.FailNow(t, "timed out") 831 case err := <-errCh: 832 require.Contains(t, err.Error(), c.ExpectedError) 833 case f := <-frames: 834 require.Fail(t, "received unexpected frame", "frame: %#v", f) 835 } 836 }) 837 } 838} 839 840// TestAlloc_ExecStreaming_ACL_WithIsolation_Image asserts that token only needs 841// alloc-exec acl policy when image isolation is used 842func TestAlloc_ExecStreaming_ACL_WithIsolation_Image(t *testing.T) { 843 t.Parallel() 844 isolation := drivers.FSIsolationImage 845 846 // Start a server and client 847 s, root, cleanupS := nomad.TestACLServer(t, nil) 848 defer cleanupS() 849 testutil.WaitForLeader(t, s.RPC) 850 851 client, cleanupC := TestClient(t, func(c *config.Config) { 852 c.ACLEnabled = true 853 c.Servers = []string{s.GetConfig().RPCAddr.String()} 854 855 pluginConfig := []*nconfig.PluginConfig{ 856 { 857 Name: "mock_driver", 858 Config: map[string]interface{}{ 859 "fs_isolation": string(isolation), 860 }, 861 }, 862 } 863 864 c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", map[string]string{}, pluginConfig) 865 }) 866 defer cleanupC() 867 868 // Create a bad token 869 policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny}) 870 tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) 871 872 policyAllocExec := mock.NamespacePolicy(structs.DefaultNamespace, "", 873 []string{acl.NamespaceCapabilityAllocExec}) 874 tokenAllocExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyAllocExec) 875 876 policyAllocNodeExec := mock.NamespacePolicy(structs.DefaultNamespace, "", 877 []string{acl.NamespaceCapabilityAllocExec, acl.NamespaceCapabilityAllocNodeExec}) 878 tokenAllocNodeExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyAllocNodeExec) 879 880 job := mock.BatchJob() 881 job.TaskGroups[0].Count = 1 882 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 883 "run_for": "20s", 884 "exec_command": map[string]interface{}{ 885 "run_for": "1ms", 886 "stdout_string": "some output", 887 }, 888 } 889 890 // Wait for client to be running job 891 testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID) 892 893 // Get the allocation ID 894 args := nstructs.AllocListRequest{} 895 args.Region = "global" 896 args.AuthToken = root.SecretID 897 args.Namespace = nstructs.DefaultNamespace 898 resp := nstructs.AllocListResponse{} 899 require.NoError(t, s.RPC("Alloc.List", &args, &resp)) 900 require.Len(t, resp.Allocations, 1) 901 allocID := resp.Allocations[0].ID 902 903 cases := []struct { 904 Name string 905 Token string 906 ExpectedError string 907 }{ 908 { 909 Name: "bad token", 910 Token: tokenBad.SecretID, 911 ExpectedError: structs.ErrPermissionDenied.Error(), 912 }, 913 { 914 Name: "alloc-exec token", 915 Token: tokenAllocExec.SecretID, 916 ExpectedError: "", 917 }, 918 { 919 Name: "alloc-node-exec token", 920 Token: tokenAllocNodeExec.SecretID, 921 ExpectedError: "", 922 }, 923 { 924 Name: "root token", 925 Token: root.SecretID, 926 ExpectedError: "", 927 }, 928 } 929 930 for _, c := range cases { 931 t.Run(c.Name, func(t *testing.T) { 932 933 // Make the request 934 req := &cstructs.AllocExecRequest{ 935 AllocID: allocID, 936 Task: job.TaskGroups[0].Tasks[0].Name, 937 Tty: true, 938 Cmd: []string{"placeholder command"}, 939 QueryOptions: nstructs.QueryOptions{ 940 Region: "global", 941 AuthToken: c.Token, 942 Namespace: nstructs.DefaultNamespace, 943 }, 944 } 945 946 // Get the handler 947 handler, err := client.StreamingRpcHandler("Allocations.Exec") 948 require.Nil(t, err) 949 950 // Create a pipe 951 p1, p2 := net.Pipe() 952 defer p1.Close() 953 defer p2.Close() 954 955 errCh := make(chan error) 956 frames := make(chan *drivers.ExecTaskStreamingResponseMsg) 957 958 // Start the handler 959 go handler(p2) 960 go decodeFrames(t, p1, frames, errCh) 961 962 // Send the request 963 encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle) 964 require.Nil(t, encoder.Encode(req)) 965 966 select { 967 case <-time.After(3 * time.Second): 968 case err := <-errCh: 969 if c.ExpectedError == "" { 970 require.NoError(t, err) 971 } else { 972 require.Contains(t, err.Error(), c.ExpectedError) 973 } 974 case f := <-frames: 975 // we are good if we don't expect an error 976 if c.ExpectedError != "" { 977 require.Fail(t, "unexpected frame", "frame: %#v", f) 978 } 979 } 980 }) 981 } 982} 983 984// TestAlloc_ExecStreaming_ACL_WithIsolation_Chroot asserts that token only needs 985// alloc-exec acl policy when chroot isolation is used 986func TestAlloc_ExecStreaming_ACL_WithIsolation_Chroot(t *testing.T) { 987 t.Parallel() 988 989 if runtime.GOOS != "linux" || unix.Geteuid() != 0 { 990 t.Skip("chroot isolation requires linux root") 991 } 992 993 isolation := drivers.FSIsolationChroot 994 995 // Start a server and client 996 s, root, cleanupS := nomad.TestACLServer(t, nil) 997 defer cleanupS() 998 testutil.WaitForLeader(t, s.RPC) 999 1000 client, cleanup := TestClient(t, func(c *config.Config) { 1001 c.ACLEnabled = true 1002 c.Servers = []string{s.GetConfig().RPCAddr.String()} 1003 1004 pluginConfig := []*nconfig.PluginConfig{ 1005 { 1006 Name: "mock_driver", 1007 Config: map[string]interface{}{ 1008 "fs_isolation": string(isolation), 1009 }, 1010 }, 1011 } 1012 1013 c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", map[string]string{}, pluginConfig) 1014 }) 1015 defer cleanup() 1016 1017 // Create a bad token 1018 policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny}) 1019 tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) 1020 1021 policyAllocExec := mock.NamespacePolicy(structs.DefaultNamespace, "", 1022 []string{acl.NamespaceCapabilityAllocExec}) 1023 tokenAllocExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "alloc-exec", policyAllocExec) 1024 1025 policyAllocNodeExec := mock.NamespacePolicy(structs.DefaultNamespace, "", 1026 []string{acl.NamespaceCapabilityAllocExec, acl.NamespaceCapabilityAllocNodeExec}) 1027 tokenAllocNodeExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "alloc-node-exec", policyAllocNodeExec) 1028 1029 job := mock.BatchJob() 1030 job.TaskGroups[0].Count = 1 1031 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 1032 "run_for": "20s", 1033 "exec_command": map[string]interface{}{ 1034 "run_for": "1ms", 1035 "stdout_string": "some output", 1036 }, 1037 } 1038 1039 // Wait for client to be running job 1040 testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID) 1041 1042 // Get the allocation ID 1043 args := nstructs.AllocListRequest{} 1044 args.Region = "global" 1045 args.AuthToken = root.SecretID 1046 args.Namespace = nstructs.DefaultNamespace 1047 resp := nstructs.AllocListResponse{} 1048 require.NoError(t, s.RPC("Alloc.List", &args, &resp)) 1049 require.Len(t, resp.Allocations, 1) 1050 allocID := resp.Allocations[0].ID 1051 1052 cases := []struct { 1053 Name string 1054 Token string 1055 ExpectedError string 1056 }{ 1057 { 1058 Name: "bad token", 1059 Token: tokenBad.SecretID, 1060 ExpectedError: structs.ErrPermissionDenied.Error(), 1061 }, 1062 { 1063 Name: "alloc-exec token", 1064 Token: tokenAllocExec.SecretID, 1065 ExpectedError: "", 1066 }, 1067 { 1068 Name: "alloc-node-exec token", 1069 Token: tokenAllocNodeExec.SecretID, 1070 ExpectedError: "", 1071 }, 1072 { 1073 Name: "root token", 1074 Token: root.SecretID, 1075 ExpectedError: "", 1076 }, 1077 } 1078 1079 for _, c := range cases { 1080 t.Run(c.Name, func(t *testing.T) { 1081 1082 // Make the request 1083 req := &cstructs.AllocExecRequest{ 1084 AllocID: allocID, 1085 Task: job.TaskGroups[0].Tasks[0].Name, 1086 Tty: true, 1087 Cmd: []string{"placeholder command"}, 1088 QueryOptions: nstructs.QueryOptions{ 1089 Region: "global", 1090 AuthToken: c.Token, 1091 Namespace: nstructs.DefaultNamespace, 1092 }, 1093 } 1094 1095 // Get the handler 1096 handler, err := client.StreamingRpcHandler("Allocations.Exec") 1097 require.Nil(t, err) 1098 1099 // Create a pipe 1100 p1, p2 := net.Pipe() 1101 defer p1.Close() 1102 defer p2.Close() 1103 1104 errCh := make(chan error) 1105 frames := make(chan *drivers.ExecTaskStreamingResponseMsg) 1106 1107 // Start the handler 1108 go handler(p2) 1109 go decodeFrames(t, p1, frames, errCh) 1110 1111 // Send the request 1112 encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle) 1113 require.Nil(t, encoder.Encode(req)) 1114 1115 select { 1116 case <-time.After(3 * time.Second): 1117 case err := <-errCh: 1118 if c.ExpectedError == "" { 1119 require.NoError(t, err) 1120 } else { 1121 require.Contains(t, err.Error(), c.ExpectedError) 1122 } 1123 case f := <-frames: 1124 // we are good if we don't expect an error 1125 if c.ExpectedError != "" { 1126 require.Fail(t, "unexpected frame", "frame: %#v", f) 1127 } 1128 } 1129 }) 1130 } 1131} 1132 1133// TestAlloc_ExecStreaming_ACL_WithIsolation_None asserts that token needs 1134// alloc-node-exec acl policy as well when no isolation is used 1135func TestAlloc_ExecStreaming_ACL_WithIsolation_None(t *testing.T) { 1136 t.Parallel() 1137 isolation := drivers.FSIsolationNone 1138 1139 // Start a server and client 1140 s, root, cleanupS := nomad.TestACLServer(t, nil) 1141 defer cleanupS() 1142 testutil.WaitForLeader(t, s.RPC) 1143 1144 client, cleanup := TestClient(t, func(c *config.Config) { 1145 c.ACLEnabled = true 1146 c.Servers = []string{s.GetConfig().RPCAddr.String()} 1147 1148 pluginConfig := []*nconfig.PluginConfig{ 1149 { 1150 Name: "mock_driver", 1151 Config: map[string]interface{}{ 1152 "fs_isolation": string(isolation), 1153 }, 1154 }, 1155 } 1156 1157 c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", map[string]string{}, pluginConfig) 1158 }) 1159 defer cleanup() 1160 1161 // Create a bad token 1162 policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityDeny}) 1163 tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) 1164 1165 policyAllocExec := mock.NamespacePolicy(structs.DefaultNamespace, "", 1166 []string{acl.NamespaceCapabilityAllocExec}) 1167 tokenAllocExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "alloc-exec", policyAllocExec) 1168 1169 policyAllocNodeExec := mock.NamespacePolicy(structs.DefaultNamespace, "", 1170 []string{acl.NamespaceCapabilityAllocExec, acl.NamespaceCapabilityAllocNodeExec}) 1171 tokenAllocNodeExec := mock.CreatePolicyAndToken(t, s.State(), 1009, "alloc-node-exec", policyAllocNodeExec) 1172 1173 job := mock.BatchJob() 1174 job.TaskGroups[0].Count = 1 1175 job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ 1176 "run_for": "20s", 1177 "exec_command": map[string]interface{}{ 1178 "run_for": "1ms", 1179 "stdout_string": "some output", 1180 }, 1181 } 1182 1183 // Wait for client to be running job 1184 testutil.WaitForRunningWithToken(t, s.RPC, job, root.SecretID) 1185 1186 // Get the allocation ID 1187 args := nstructs.AllocListRequest{} 1188 args.Region = "global" 1189 args.AuthToken = root.SecretID 1190 args.Namespace = nstructs.DefaultNamespace 1191 resp := nstructs.AllocListResponse{} 1192 require.NoError(t, s.RPC("Alloc.List", &args, &resp)) 1193 require.Len(t, resp.Allocations, 1) 1194 allocID := resp.Allocations[0].ID 1195 1196 cases := []struct { 1197 Name string 1198 Token string 1199 ExpectedError string 1200 }{ 1201 { 1202 Name: "bad token", 1203 Token: tokenBad.SecretID, 1204 ExpectedError: structs.ErrPermissionDenied.Error(), 1205 }, 1206 { 1207 Name: "alloc-exec token", 1208 Token: tokenAllocExec.SecretID, 1209 ExpectedError: structs.ErrPermissionDenied.Error(), 1210 }, 1211 { 1212 Name: "alloc-node-exec token", 1213 Token: tokenAllocNodeExec.SecretID, 1214 ExpectedError: "", 1215 }, 1216 { 1217 Name: "root token", 1218 Token: root.SecretID, 1219 ExpectedError: "", 1220 }, 1221 } 1222 1223 for _, c := range cases { 1224 t.Run(c.Name, func(t *testing.T) { 1225 1226 // Make the request 1227 req := &cstructs.AllocExecRequest{ 1228 AllocID: allocID, 1229 Task: job.TaskGroups[0].Tasks[0].Name, 1230 Tty: true, 1231 Cmd: []string{"placeholder command"}, 1232 QueryOptions: nstructs.QueryOptions{ 1233 Region: "global", 1234 AuthToken: c.Token, 1235 Namespace: nstructs.DefaultNamespace, 1236 }, 1237 } 1238 1239 // Get the handler 1240 handler, err := client.StreamingRpcHandler("Allocations.Exec") 1241 require.Nil(t, err) 1242 1243 // Create a pipe 1244 p1, p2 := net.Pipe() 1245 defer p1.Close() 1246 defer p2.Close() 1247 1248 errCh := make(chan error) 1249 frames := make(chan *drivers.ExecTaskStreamingResponseMsg) 1250 1251 // Start the handler 1252 go handler(p2) 1253 go decodeFrames(t, p1, frames, errCh) 1254 1255 // Send the request 1256 encoder := codec.NewEncoder(p1, nstructs.MsgpackHandle) 1257 require.Nil(t, encoder.Encode(req)) 1258 1259 select { 1260 case <-time.After(3 * time.Second): 1261 case err := <-errCh: 1262 if c.ExpectedError == "" { 1263 require.NoError(t, err) 1264 } else { 1265 require.Contains(t, err.Error(), c.ExpectedError) 1266 } 1267 case f := <-frames: 1268 // we are good if we don't expect an error 1269 if c.ExpectedError != "" { 1270 require.Fail(t, "unexpected frame", "frame: %#v", f) 1271 } 1272 } 1273 }) 1274 } 1275} 1276 1277func decodeFrames(t *testing.T, p1 net.Conn, frames chan<- *drivers.ExecTaskStreamingResponseMsg, errCh chan<- error) { 1278 // Start the decoder 1279 decoder := codec.NewDecoder(p1, nstructs.MsgpackHandle) 1280 1281 for { 1282 var msg cstructs.StreamErrWrapper 1283 if err := decoder.Decode(&msg); err != nil { 1284 if err == io.EOF || strings.Contains(err.Error(), "closed") { 1285 return 1286 } 1287 t.Logf("received error decoding: %#v", err) 1288 1289 errCh <- fmt.Errorf("error decoding: %v", err) 1290 return 1291 } 1292 1293 if msg.Error != nil { 1294 errCh <- msg.Error 1295 continue 1296 } 1297 1298 var frame drivers.ExecTaskStreamingResponseMsg 1299 if err := json.Unmarshal(msg.Payload, &frame); err != nil { 1300 errCh <- err 1301 return 1302 } 1303 t.Logf("received message: %#v", msg) 1304 frames <- &frame 1305 } 1306} 1307