1package nomad 2 3import ( 4 "reflect" 5 "testing" 6 "time" 7 8 msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" 9 "github.com/stretchr/testify/assert" 10 "github.com/stretchr/testify/require" 11 12 "github.com/hashicorp/nomad/acl" 13 "github.com/hashicorp/nomad/helper" 14 "github.com/hashicorp/nomad/helper/uuid" 15 "github.com/hashicorp/nomad/nomad/mock" 16 "github.com/hashicorp/nomad/nomad/structs" 17 "github.com/hashicorp/nomad/testutil" 18) 19 20func TestAllocEndpoint_List(t *testing.T) { 21 t.Parallel() 22 23 s1, cleanupS1 := TestServer(t, nil) 24 defer cleanupS1() 25 26 codec := rpcClient(t, s1) 27 testutil.WaitForLeader(t, s1.RPC) 28 29 // Create the register request 30 alloc := mock.Alloc() 31 summary := mock.JobSummary(alloc.JobID) 32 state := s1.fsm.State() 33 34 if err := state.UpsertJobSummary(999, summary); err != nil { 35 t.Fatalf("err: %v", err) 36 } 37 if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}); err != nil { 38 t.Fatalf("err: %v", err) 39 } 40 41 // Lookup the allocations 42 get := &structs.AllocListRequest{ 43 QueryOptions: structs.QueryOptions{ 44 Region: "global", 45 Namespace: structs.DefaultNamespace, 46 }, 47 } 48 var resp structs.AllocListResponse 49 if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp); err != nil { 50 t.Fatalf("err: %v", err) 51 } 52 if resp.Index != 1000 { 53 t.Fatalf("Bad index: %d %d", resp.Index, 1000) 54 } 55 56 if len(resp.Allocations) != 1 { 57 t.Fatalf("bad: %#v", resp.Allocations) 58 } 59 if resp.Allocations[0].ID != alloc.ID { 60 t.Fatalf("bad: %#v", resp.Allocations[0]) 61 } 62 63 // Lookup the allocations by prefix 64 get = &structs.AllocListRequest{ 65 QueryOptions: structs.QueryOptions{ 66 Region: "global", 67 Namespace: structs.DefaultNamespace, 68 Prefix: alloc.ID[:4], 69 }, 70 } 71 72 var resp2 structs.AllocListResponse 73 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp2)) 74 require.Equal(t, uint64(1000), resp2.Index) 75 require.Len(t, resp2.Allocations, 1) 76 require.Equal(t, alloc.ID, resp2.Allocations[0].ID) 77} 78 79func TestAllocEndpoint_List_Fields(t *testing.T) { 80 t.Parallel() 81 82 s1, cleanupS1 := TestServer(t, nil) 83 defer cleanupS1() 84 85 codec := rpcClient(t, s1) 86 testutil.WaitForLeader(t, s1.RPC) 87 88 // Create a running alloc 89 alloc := mock.Alloc() 90 alloc.ClientStatus = structs.AllocClientStatusRunning 91 alloc.TaskStates = map[string]*structs.TaskState{ 92 "web": { 93 State: structs.TaskStateRunning, 94 StartedAt: time.Now(), 95 }, 96 } 97 summary := mock.JobSummary(alloc.JobID) 98 state := s1.fsm.State() 99 100 require.NoError(t, state.UpsertJobSummary(999, summary)) 101 require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})) 102 103 cases := []struct { 104 Name string 105 Fields *structs.AllocStubFields 106 Assert func(t *testing.T, allocs []*structs.AllocListStub) 107 }{ 108 { 109 Name: "None", 110 Fields: nil, 111 Assert: func(t *testing.T, allocs []*structs.AllocListStub) { 112 require.Nil(t, allocs[0].AllocatedResources) 113 require.Len(t, allocs[0].TaskStates, 1) 114 }, 115 }, 116 { 117 Name: "Default", 118 Fields: structs.NewAllocStubFields(), 119 Assert: func(t *testing.T, allocs []*structs.AllocListStub) { 120 require.Nil(t, allocs[0].AllocatedResources) 121 require.Len(t, allocs[0].TaskStates, 1) 122 }, 123 }, 124 { 125 Name: "Resources", 126 Fields: &structs.AllocStubFields{ 127 Resources: true, 128 TaskStates: false, 129 }, 130 Assert: func(t *testing.T, allocs []*structs.AllocListStub) { 131 require.NotNil(t, allocs[0].AllocatedResources) 132 require.Len(t, allocs[0].TaskStates, 0) 133 }, 134 }, 135 { 136 Name: "NoTaskStates", 137 Fields: &structs.AllocStubFields{ 138 Resources: false, 139 TaskStates: false, 140 }, 141 Assert: func(t *testing.T, allocs []*structs.AllocListStub) { 142 require.Nil(t, allocs[0].AllocatedResources) 143 require.Len(t, allocs[0].TaskStates, 0) 144 }, 145 }, 146 { 147 Name: "Both", 148 Fields: &structs.AllocStubFields{ 149 Resources: true, 150 TaskStates: true, 151 }, 152 Assert: func(t *testing.T, allocs []*structs.AllocListStub) { 153 require.NotNil(t, allocs[0].AllocatedResources) 154 require.Len(t, allocs[0].TaskStates, 1) 155 }, 156 }, 157 } 158 159 for i := range cases { 160 tc := cases[i] 161 t.Run(tc.Name, func(t *testing.T) { 162 get := &structs.AllocListRequest{ 163 QueryOptions: structs.QueryOptions{ 164 Region: "global", 165 Namespace: structs.DefaultNamespace, 166 }, 167 Fields: tc.Fields, 168 } 169 var resp structs.AllocListResponse 170 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)) 171 require.Equal(t, uint64(1000), resp.Index) 172 require.Len(t, resp.Allocations, 1) 173 require.Equal(t, alloc.ID, resp.Allocations[0].ID) 174 tc.Assert(t, resp.Allocations) 175 }) 176 } 177 178} 179 180func TestAllocEndpoint_List_ACL(t *testing.T) { 181 t.Parallel() 182 183 s1, root, cleanupS1 := TestACLServer(t, nil) 184 defer cleanupS1() 185 codec := rpcClient(t, s1) 186 testutil.WaitForLeader(t, s1.RPC) 187 assert := assert.New(t) 188 189 // Create the alloc 190 alloc := mock.Alloc() 191 allocs := []*structs.Allocation{alloc} 192 summary := mock.JobSummary(alloc.JobID) 193 state := s1.fsm.State() 194 195 assert.Nil(state.UpsertJobSummary(999, summary), "UpsertJobSummary") 196 assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, allocs), "UpsertAllocs") 197 198 stubAllocs := []*structs.AllocListStub{alloc.Stub(nil)} 199 stubAllocs[0].CreateIndex = 1000 200 stubAllocs[0].ModifyIndex = 1000 201 202 // Create the namespace policy and tokens 203 validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", 204 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) 205 invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", 206 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs})) 207 208 // Lookup the allocs without a token and expect failure 209 get := &structs.AllocListRequest{ 210 QueryOptions: structs.QueryOptions{ 211 Region: "global", 212 Namespace: structs.DefaultNamespace, 213 }, 214 } 215 var resp structs.AllocListResponse 216 assert.NotNil(msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp), "RPC") 217 218 // Try with a valid token 219 get.AuthToken = validToken.SecretID 220 assert.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp), "RPC") 221 assert.EqualValues(resp.Index, 1000, "resp.Index") 222 assert.Equal(stubAllocs, resp.Allocations, "Returned alloc list not equal") 223 224 // Try with a invalid token 225 get.AuthToken = invalidToken.SecretID 226 err := msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp) 227 assert.NotNil(err, "RPC") 228 assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) 229 230 // Try with a root token 231 get.AuthToken = root.SecretID 232 assert.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp), "RPC") 233 assert.EqualValues(resp.Index, 1000, "resp.Index") 234 assert.Equal(stubAllocs, resp.Allocations, "Returned alloc list not equal") 235} 236 237func TestAllocEndpoint_List_Blocking(t *testing.T) { 238 t.Parallel() 239 240 s1, cleanupS1 := TestServer(t, nil) 241 defer cleanupS1() 242 state := s1.fsm.State() 243 codec := rpcClient(t, s1) 244 testutil.WaitForLeader(t, s1.RPC) 245 246 // Create the alloc 247 alloc := mock.Alloc() 248 249 summary := mock.JobSummary(alloc.JobID) 250 if err := state.UpsertJobSummary(1, summary); err != nil { 251 t.Fatalf("err: %v", err) 252 } 253 // Upsert alloc triggers watches 254 time.AfterFunc(100*time.Millisecond, func() { 255 if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 2, []*structs.Allocation{alloc}); err != nil { 256 t.Fatalf("err: %v", err) 257 } 258 }) 259 260 req := &structs.AllocListRequest{ 261 QueryOptions: structs.QueryOptions{ 262 Region: "global", 263 Namespace: structs.DefaultNamespace, 264 MinQueryIndex: 1, 265 }, 266 } 267 start := time.Now() 268 var resp structs.AllocListResponse 269 if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp); err != nil { 270 t.Fatalf("err: %v", err) 271 } 272 273 if elapsed := time.Since(start); elapsed < 100*time.Millisecond { 274 t.Fatalf("should block (returned in %s) %#v", elapsed, resp) 275 } 276 if resp.Index != 2 { 277 t.Fatalf("Bad index: %d %d", resp.Index, 2) 278 } 279 if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID { 280 t.Fatalf("bad: %#v", resp.Allocations) 281 } 282 283 // Client updates trigger watches 284 alloc2 := mock.Alloc() 285 alloc2.ID = alloc.ID 286 alloc2.ClientStatus = structs.AllocClientStatusRunning 287 time.AfterFunc(100*time.Millisecond, func() { 288 state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID)) 289 if err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 4, []*structs.Allocation{alloc2}); err != nil { 290 t.Fatalf("err: %v", err) 291 } 292 }) 293 294 req.MinQueryIndex = 3 295 start = time.Now() 296 var resp2 structs.AllocListResponse 297 if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp2); err != nil { 298 t.Fatalf("err: %v", err) 299 } 300 301 if elapsed := time.Since(start); elapsed < 100*time.Millisecond { 302 t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) 303 } 304 if resp2.Index != 4 { 305 t.Fatalf("Bad index: %d %d", resp2.Index, 4) 306 } 307 if len(resp2.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID || 308 resp2.Allocations[0].ClientStatus != structs.AllocClientStatusRunning { 309 t.Fatalf("bad: %#v", resp2.Allocations) 310 } 311} 312 313// TestAllocEndpoint_List_AllNamespaces_OSS asserts that server 314// returns all allocations across namespaces. 315func TestAllocEndpoint_List_AllNamespaces_OSS(t *testing.T) { 316 t.Parallel() 317 318 s1, cleanupS1 := TestServer(t, nil) 319 defer cleanupS1() 320 codec := rpcClient(t, s1) 321 testutil.WaitForLeader(t, s1.RPC) 322 state := s1.fsm.State() 323 324 // two namespaces 325 ns1 := mock.Namespace() 326 ns2 := mock.Namespace() 327 require.NoError(t, state.UpsertNamespaces(900, []*structs.Namespace{ns1, ns2})) 328 329 // Create the allocations 330 alloc1 := mock.Alloc() 331 alloc1.ID = "a" + alloc1.ID[1:] 332 alloc1.Namespace = ns1.Name 333 alloc2 := mock.Alloc() 334 alloc2.ID = "b" + alloc2.ID[1:] 335 alloc2.Namespace = ns2.Name 336 summary1 := mock.JobSummary(alloc1.JobID) 337 summary2 := mock.JobSummary(alloc2.JobID) 338 339 require.NoError(t, state.UpsertJobSummary(999, summary1)) 340 require.NoError(t, state.UpsertJobSummary(999, summary2)) 341 require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})) 342 343 t.Run("looking up all allocations", func(t *testing.T) { 344 get := &structs.AllocListRequest{ 345 QueryOptions: structs.QueryOptions{ 346 Region: "global", 347 Namespace: "*", 348 }, 349 } 350 var resp structs.AllocListResponse 351 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)) 352 require.Equal(t, uint64(1000), resp.Index) 353 require.Len(t, resp.Allocations, 2) 354 require.ElementsMatch(t, 355 []string{resp.Allocations[0].ID, resp.Allocations[1].ID}, 356 []string{alloc1.ID, alloc2.ID}) 357 }) 358 359 t.Run("looking up allocations with prefix", func(t *testing.T) { 360 get := &structs.AllocListRequest{ 361 QueryOptions: structs.QueryOptions{ 362 Region: "global", 363 Namespace: "*", 364 // allocations were constructed above to have non-matching prefix 365 Prefix: alloc1.ID[:4], 366 }, 367 } 368 var resp structs.AllocListResponse 369 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)) 370 require.Equal(t, uint64(1000), resp.Index) 371 require.Len(t, resp.Allocations, 1) 372 require.Equal(t, alloc1.ID, resp.Allocations[0].ID) 373 require.Equal(t, alloc1.Namespace, resp.Allocations[0].Namespace) 374 }) 375 376 t.Run("looking up allocations with mismatch prefix", func(t *testing.T) { 377 // allocations were constructed above to have prefix starting with "a" or "b" 378 badPrefix := "cc" 379 380 get := &structs.AllocListRequest{ 381 QueryOptions: structs.QueryOptions{ 382 Region: "global", 383 Namespace: "*", 384 Prefix: badPrefix, 385 }, 386 } 387 var resp structs.AllocListResponse 388 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp)) 389 require.Equal(t, uint64(1000), resp.Index) 390 require.Empty(t, resp.Allocations) 391 }) 392} 393 394func TestAllocEndpoint_GetAlloc(t *testing.T) { 395 t.Parallel() 396 397 s1, cleanupS1 := TestServer(t, nil) 398 defer cleanupS1() 399 codec := rpcClient(t, s1) 400 testutil.WaitForLeader(t, s1.RPC) 401 402 // Create the register request 403 prevAllocID := uuid.Generate() 404 alloc := mock.Alloc() 405 alloc.RescheduleTracker = &structs.RescheduleTracker{ 406 Events: []*structs.RescheduleEvent{ 407 {RescheduleTime: time.Now().UTC().UnixNano(), PrevNodeID: "boom", PrevAllocID: prevAllocID}, 408 }, 409 } 410 state := s1.fsm.State() 411 state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID)) 412 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc}) 413 if err != nil { 414 t.Fatalf("err: %v", err) 415 } 416 417 // Lookup the alloc 418 get := &structs.AllocSpecificRequest{ 419 AllocID: alloc.ID, 420 QueryOptions: structs.QueryOptions{Region: "global"}, 421 } 422 var resp structs.SingleAllocResponse 423 if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp); err != nil { 424 t.Fatalf("err: %v", err) 425 } 426 if resp.Index != 1000 { 427 t.Fatalf("Bad index: %d %d", resp.Index, 1000) 428 } 429 430 if !reflect.DeepEqual(alloc, resp.Alloc) { 431 t.Fatalf("bad: %#v", resp.Alloc) 432 } 433} 434 435func TestAllocEndpoint_GetAlloc_ACL(t *testing.T) { 436 t.Parallel() 437 438 s1, root, cleanupS1 := TestACLServer(t, nil) 439 defer cleanupS1() 440 codec := rpcClient(t, s1) 441 testutil.WaitForLeader(t, s1.RPC) 442 assert := assert.New(t) 443 444 // Create the alloc 445 alloc := mock.Alloc() 446 allocs := []*structs.Allocation{alloc} 447 summary := mock.JobSummary(alloc.JobID) 448 state := s1.fsm.State() 449 450 assert.Nil(state.UpsertJobSummary(999, summary), "UpsertJobSummary") 451 assert.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, allocs), "UpsertAllocs") 452 453 // Create the namespace policy and tokens 454 validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", 455 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) 456 invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", 457 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs})) 458 459 getReq := func() *structs.AllocSpecificRequest { 460 return &structs.AllocSpecificRequest{ 461 AllocID: alloc.ID, 462 QueryOptions: structs.QueryOptions{ 463 Region: "global", 464 }, 465 } 466 } 467 468 cases := []struct { 469 Name string 470 F func(t *testing.T) 471 }{ 472 // Lookup the alloc without a token and expect failure 473 { 474 Name: "no-token", 475 F: func(t *testing.T) { 476 var resp structs.SingleAllocResponse 477 err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", getReq(), &resp) 478 require.True(t, structs.IsErrUnknownAllocation(err), "expected unknown alloc but found: %v", err) 479 }, 480 }, 481 482 // Try with a valid ACL token 483 { 484 Name: "valid-token", 485 F: func(t *testing.T) { 486 get := getReq() 487 get.AuthToken = validToken.SecretID 488 get.AllocID = alloc.ID 489 var resp structs.SingleAllocResponse 490 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp), "RPC") 491 require.EqualValues(t, resp.Index, 1000, "resp.Index") 492 require.Equal(t, alloc, resp.Alloc, "Returned alloc not equal") 493 }, 494 }, 495 496 // Try with a valid Node.SecretID 497 { 498 Name: "valid-node-secret", 499 F: func(t *testing.T) { 500 node := mock.Node() 501 assert.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 1005, node)) 502 get := getReq() 503 get.AuthToken = node.SecretID 504 get.AllocID = alloc.ID 505 var resp structs.SingleAllocResponse 506 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp), "RPC") 507 require.EqualValues(t, resp.Index, 1000, "resp.Index") 508 require.Equal(t, alloc, resp.Alloc, "Returned alloc not equal") 509 }, 510 }, 511 512 // Try with a invalid token 513 { 514 Name: "invalid-token", 515 F: func(t *testing.T) { 516 get := getReq() 517 get.AuthToken = invalidToken.SecretID 518 get.AllocID = alloc.ID 519 var resp structs.SingleAllocResponse 520 err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp) 521 require.NotNil(t, err, "RPC") 522 require.True(t, structs.IsErrUnknownAllocation(err), "expected unknown alloc but found: %v", err) 523 }, 524 }, 525 526 // Try with a root token 527 { 528 Name: "root-token", 529 F: func(t *testing.T) { 530 get := getReq() 531 get.AuthToken = root.SecretID 532 get.AllocID = alloc.ID 533 var resp structs.SingleAllocResponse 534 require.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp), "RPC") 535 require.EqualValues(t, resp.Index, 1000, "resp.Index") 536 require.Equal(t, alloc, resp.Alloc, "Returned alloc not equal") 537 }, 538 }, 539 } 540 541 for _, tc := range cases { 542 t.Run(tc.Name, tc.F) 543 } 544} 545 546func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) { 547 t.Parallel() 548 549 s1, cleanupS1 := TestServer(t, nil) 550 defer cleanupS1() 551 state := s1.fsm.State() 552 codec := rpcClient(t, s1) 553 testutil.WaitForLeader(t, s1.RPC) 554 555 // Create the allocs 556 alloc1 := mock.Alloc() 557 alloc2 := mock.Alloc() 558 559 // First create an unrelated alloc 560 time.AfterFunc(100*time.Millisecond, func() { 561 state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) 562 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc1}) 563 if err != nil { 564 t.Fatalf("err: %v", err) 565 } 566 }) 567 568 // Create the alloc we are watching later 569 time.AfterFunc(200*time.Millisecond, func() { 570 state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID)) 571 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{alloc2}) 572 if err != nil { 573 t.Fatalf("err: %v", err) 574 } 575 }) 576 577 // Lookup the allocs 578 get := &structs.AllocSpecificRequest{ 579 AllocID: alloc2.ID, 580 QueryOptions: structs.QueryOptions{ 581 Region: "global", 582 MinQueryIndex: 150, 583 }, 584 } 585 var resp structs.SingleAllocResponse 586 start := time.Now() 587 if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp); err != nil { 588 t.Fatalf("err: %v", err) 589 } 590 591 if elapsed := time.Since(start); elapsed < 200*time.Millisecond { 592 t.Fatalf("should block (returned in %s) %#v", elapsed, resp) 593 } 594 if resp.Index != 200 { 595 t.Fatalf("Bad index: %d %d", resp.Index, 200) 596 } 597 if resp.Alloc == nil || resp.Alloc.ID != alloc2.ID { 598 t.Fatalf("bad: %#v", resp.Alloc) 599 } 600} 601 602func TestAllocEndpoint_GetAllocs(t *testing.T) { 603 t.Parallel() 604 605 s1, cleanupS1 := TestServer(t, nil) 606 defer cleanupS1() 607 codec := rpcClient(t, s1) 608 testutil.WaitForLeader(t, s1.RPC) 609 610 // Create the register request 611 alloc := mock.Alloc() 612 alloc2 := mock.Alloc() 613 state := s1.fsm.State() 614 state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) 615 state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) 616 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc, alloc2}) 617 if err != nil { 618 t.Fatalf("err: %v", err) 619 } 620 621 // Lookup the allocs 622 get := &structs.AllocsGetRequest{ 623 AllocIDs: []string{alloc.ID, alloc2.ID}, 624 QueryOptions: structs.QueryOptions{ 625 Region: "global", 626 }, 627 } 628 var resp structs.AllocsGetResponse 629 if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil { 630 t.Fatalf("err: %v", err) 631 } 632 if resp.Index != 1000 { 633 t.Fatalf("Bad index: %d %d", resp.Index, 1000) 634 } 635 636 if len(resp.Allocs) != 2 { 637 t.Fatalf("bad: %#v", resp.Allocs) 638 } 639 640 // Lookup nonexistent allocs. 641 get = &structs.AllocsGetRequest{ 642 AllocIDs: []string{"foo"}, 643 QueryOptions: structs.QueryOptions{Region: "global"}, 644 } 645 if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err == nil { 646 t.Fatalf("expect error") 647 } 648} 649 650func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) { 651 t.Parallel() 652 653 s1, cleanupS1 := TestServer(t, nil) 654 defer cleanupS1() 655 state := s1.fsm.State() 656 codec := rpcClient(t, s1) 657 testutil.WaitForLeader(t, s1.RPC) 658 659 // Create the allocs 660 alloc1 := mock.Alloc() 661 alloc2 := mock.Alloc() 662 663 // First create an unrelated alloc 664 time.AfterFunc(100*time.Millisecond, func() { 665 state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID)) 666 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc1}) 667 if err != nil { 668 t.Fatalf("err: %v", err) 669 } 670 }) 671 672 // Create the alloc we are watching later 673 time.AfterFunc(200*time.Millisecond, func() { 674 state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID)) 675 err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{alloc2}) 676 if err != nil { 677 t.Fatalf("err: %v", err) 678 } 679 }) 680 681 // Lookup the allocs 682 get := &structs.AllocsGetRequest{ 683 AllocIDs: []string{alloc1.ID, alloc2.ID}, 684 QueryOptions: structs.QueryOptions{ 685 Region: "global", 686 MinQueryIndex: 150, 687 }, 688 } 689 var resp structs.AllocsGetResponse 690 start := time.Now() 691 if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil { 692 t.Fatalf("err: %v", err) 693 } 694 695 if elapsed := time.Since(start); elapsed < 200*time.Millisecond { 696 t.Fatalf("should block (returned in %s) %#v", elapsed, resp) 697 } 698 if resp.Index != 200 { 699 t.Fatalf("Bad index: %d %d", resp.Index, 200) 700 } 701 if len(resp.Allocs) != 2 { 702 t.Fatalf("bad: %#v", resp.Allocs) 703 } 704} 705 706func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) { 707 t.Parallel() 708 require := require.New(t) 709 710 s1, _, cleanupS1 := TestACLServer(t, nil) 711 defer cleanupS1() 712 codec := rpcClient(t, s1) 713 testutil.WaitForLeader(t, s1.RPC) 714 715 // Create the register request 716 alloc := mock.Alloc() 717 alloc2 := mock.Alloc() 718 state := s1.fsm.State() 719 require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))) 720 require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) 721 require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc, alloc2})) 722 723 t1 := &structs.DesiredTransition{ 724 Migrate: helper.BoolToPtr(true), 725 } 726 727 // Update the allocs desired status 728 get := &structs.AllocUpdateDesiredTransitionRequest{ 729 Allocs: map[string]*structs.DesiredTransition{ 730 alloc.ID: t1, 731 alloc2.ID: t1, 732 }, 733 Evals: []*structs.Evaluation{ 734 { 735 ID: uuid.Generate(), 736 Namespace: alloc.Namespace, 737 Priority: alloc.Job.Priority, 738 Type: alloc.Job.Type, 739 TriggeredBy: structs.EvalTriggerNodeDrain, 740 JobID: alloc.Job.ID, 741 JobModifyIndex: alloc.Job.ModifyIndex, 742 Status: structs.EvalStatusPending, 743 }, 744 { 745 ID: uuid.Generate(), 746 Namespace: alloc2.Namespace, 747 Priority: alloc2.Job.Priority, 748 Type: alloc2.Job.Type, 749 TriggeredBy: structs.EvalTriggerNodeDrain, 750 JobID: alloc2.Job.ID, 751 JobModifyIndex: alloc2.Job.ModifyIndex, 752 Status: structs.EvalStatusPending, 753 }, 754 }, 755 WriteRequest: structs.WriteRequest{ 756 Region: "global", 757 }, 758 } 759 760 // Try without permissions 761 var resp structs.GenericResponse 762 err := msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp) 763 require.NotNil(err) 764 require.True(structs.IsErrPermissionDenied(err)) 765 766 // Try with permissions 767 get.WriteRequest.AuthToken = s1.getLeaderAcl() 768 var resp2 structs.GenericResponse 769 require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.UpdateDesiredTransition", get, &resp2)) 770 require.NotZero(resp2.Index) 771 772 // Look up the allocations 773 out1, err := state.AllocByID(nil, alloc.ID) 774 require.Nil(err) 775 out2, err := state.AllocByID(nil, alloc.ID) 776 require.Nil(err) 777 e1, err := state.EvalByID(nil, get.Evals[0].ID) 778 require.Nil(err) 779 e2, err := state.EvalByID(nil, get.Evals[1].ID) 780 require.Nil(err) 781 782 require.NotNil(out1.DesiredTransition.Migrate) 783 require.NotNil(out2.DesiredTransition.Migrate) 784 require.NotNil(e1) 785 require.NotNil(e2) 786 require.True(*out1.DesiredTransition.Migrate) 787 require.True(*out2.DesiredTransition.Migrate) 788} 789 790func TestAllocEndpoint_Stop_ACL(t *testing.T) { 791 t.Parallel() 792 require := require.New(t) 793 794 s1, _, cleanupS1 := TestACLServer(t, nil) 795 defer cleanupS1() 796 codec := rpcClient(t, s1) 797 testutil.WaitForLeader(t, s1.RPC) 798 799 // Create the register request 800 alloc := mock.Alloc() 801 alloc2 := mock.Alloc() 802 state := s1.fsm.State() 803 require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))) 804 require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) 805 require.Nil(state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc, alloc2})) 806 807 req := &structs.AllocStopRequest{ 808 AllocID: alloc.ID, 809 } 810 req.Namespace = structs.DefaultNamespace 811 req.Region = alloc.Job.Region 812 813 // Try without permissions 814 var resp structs.AllocStopResponse 815 err := msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp) 816 require.True(structs.IsErrPermissionDenied(err), "expected permissions error, got: %v", err) 817 818 // Try with management permissions 819 req.WriteRequest.AuthToken = s1.getLeaderAcl() 820 var resp2 structs.AllocStopResponse 821 require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp2)) 822 require.NotZero(resp2.Index) 823 824 // Try with alloc-lifecycle permissions 825 validToken := mock.CreatePolicyAndToken(t, state, 1002, "valid", 826 mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle})) 827 req.WriteRequest.AuthToken = validToken.SecretID 828 req.AllocID = alloc2.ID 829 830 var resp3 structs.AllocStopResponse 831 require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp3)) 832 require.NotZero(resp3.Index) 833 834 // Look up the allocations 835 out1, err := state.AllocByID(nil, alloc.ID) 836 require.Nil(err) 837 out2, err := state.AllocByID(nil, alloc2.ID) 838 require.Nil(err) 839 e1, err := state.EvalByID(nil, resp2.EvalID) 840 require.Nil(err) 841 e2, err := state.EvalByID(nil, resp3.EvalID) 842 require.Nil(err) 843 844 require.NotNil(out1.DesiredTransition.Migrate) 845 require.NotNil(out2.DesiredTransition.Migrate) 846 require.NotNil(e1) 847 require.NotNil(e2) 848 require.True(*out1.DesiredTransition.Migrate) 849 require.True(*out2.DesiredTransition.Migrate) 850} 851 852func TestAllocEndpoint_List_AllNamespaces_ACL_OSS(t *testing.T) { 853 t.Parallel() 854 855 s1, root, cleanupS1 := TestACLServer(t, nil) 856 defer cleanupS1() 857 codec := rpcClient(t, s1) 858 testutil.WaitForLeader(t, s1.RPC) 859 state := s1.fsm.State() 860 861 // two namespaces 862 ns1 := mock.Namespace() 863 ns2 := mock.Namespace() 864 require.NoError(t, state.UpsertNamespaces(900, []*structs.Namespace{ns1, ns2})) 865 866 // Create the allocations 867 alloc1 := mock.Alloc() 868 alloc1.ID = "a" + alloc1.ID[1:] 869 alloc1.Namespace = ns1.Name 870 alloc2 := mock.Alloc() 871 alloc2.ID = "b" + alloc2.ID[1:] 872 alloc2.Namespace = ns2.Name 873 summary1 := mock.JobSummary(alloc1.JobID) 874 summary2 := mock.JobSummary(alloc2.JobID) 875 876 require.NoError(t, state.UpsertJobSummary(999, summary1)) 877 require.NoError(t, state.UpsertJobSummary(999, summary2)) 878 require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})) 879 alloc1.CreateIndex = 1000 880 alloc1.ModifyIndex = 1000 881 alloc2.CreateIndex = 1000 882 alloc2.ModifyIndex = 1000 883 884 everythingButReadJob := []string{ 885 acl.NamespaceCapabilityDeny, 886 acl.NamespaceCapabilityListJobs, 887 // acl.NamespaceCapabilityReadJob, 888 acl.NamespaceCapabilitySubmitJob, 889 acl.NamespaceCapabilityDispatchJob, 890 acl.NamespaceCapabilityReadLogs, 891 acl.NamespaceCapabilityReadFS, 892 acl.NamespaceCapabilityAllocExec, 893 acl.NamespaceCapabilityAllocNodeExec, 894 acl.NamespaceCapabilityAllocLifecycle, 895 acl.NamespaceCapabilitySentinelOverride, 896 acl.NamespaceCapabilityCSIRegisterPlugin, 897 acl.NamespaceCapabilityCSIWriteVolume, 898 acl.NamespaceCapabilityCSIReadVolume, 899 acl.NamespaceCapabilityCSIListVolume, 900 acl.NamespaceCapabilityCSIMountVolume, 901 acl.NamespaceCapabilityListScalingPolicies, 902 acl.NamespaceCapabilityReadScalingPolicy, 903 acl.NamespaceCapabilityReadJobScaling, 904 acl.NamespaceCapabilityScaleJob, 905 acl.NamespaceCapabilitySubmitRecommendation, 906 } 907 908 ns1token := mock.CreatePolicyAndToken(t, state, 1001, "ns1", 909 mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})) 910 ns1tokenInsufficient := mock.CreatePolicyAndToken(t, state, 1001, "ns1-insufficient", 911 mock.NamespacePolicy(ns1.Name, "", everythingButReadJob)) 912 ns2token := mock.CreatePolicyAndToken(t, state, 1001, "ns2", 913 mock.NamespacePolicy(ns2.Name, "", []string{acl.NamespaceCapabilityReadJob})) 914 bothToken := mock.CreatePolicyAndToken(t, state, 1001, "nsBoth", 915 mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})+ 916 mock.NamespacePolicy(ns2.Name, "", []string{acl.NamespaceCapabilityReadJob})) 917 918 cases := []struct { 919 Label string 920 Namespace string 921 Token string 922 Allocs []*structs.Allocation 923 Error bool 924 Message string 925 Prefix string 926 }{ 927 { 928 Label: "all namespaces with sufficient token", 929 Namespace: "*", 930 Token: bothToken.SecretID, 931 Allocs: []*structs.Allocation{alloc1, alloc2}, 932 }, 933 { 934 Label: "all namespaces with root token", 935 Namespace: "*", 936 Token: root.SecretID, 937 Allocs: []*structs.Allocation{alloc1, alloc2}, 938 }, 939 { 940 Label: "all namespaces with ns1 token", 941 Namespace: "*", 942 Token: ns1token.SecretID, 943 Allocs: []*structs.Allocation{alloc1}, 944 }, 945 { 946 Label: "all namespaces with ns2 token", 947 Namespace: "*", 948 Token: ns2token.SecretID, 949 Allocs: []*structs.Allocation{alloc2}, 950 }, 951 { 952 Label: "all namespaces with bad token", 953 Namespace: "*", 954 Token: uuid.Generate(), 955 Error: true, 956 Message: structs.ErrTokenNotFound.Error(), 957 }, 958 { 959 Label: "all namespaces with insufficient token", 960 Namespace: "*", 961 Allocs: []*structs.Allocation{}, 962 Token: ns1tokenInsufficient.SecretID, 963 }, 964 { 965 Label: "ns1 with ns1 token", 966 Namespace: ns1.Name, 967 Token: ns1token.SecretID, 968 Allocs: []*structs.Allocation{alloc1}, 969 }, 970 { 971 Label: "ns1 with root token", 972 Namespace: ns1.Name, 973 Token: root.SecretID, 974 Allocs: []*structs.Allocation{alloc1}, 975 }, 976 { 977 Label: "ns1 with ns2 token", 978 Namespace: ns1.Name, 979 Token: ns2token.SecretID, 980 Error: true, 981 }, 982 { 983 Label: "ns1 with invalid token", 984 Namespace: ns1.Name, 985 Token: uuid.Generate(), 986 Error: true, 987 Message: structs.ErrTokenNotFound.Error(), 988 }, 989 { 990 Label: "bad namespace with root token", 991 Namespace: uuid.Generate(), 992 Token: root.SecretID, 993 Allocs: []*structs.Allocation{}, 994 }, 995 { 996 Label: "all namespaces with prefix", 997 Namespace: "*", 998 Prefix: alloc1.ID[:2], 999 Token: root.SecretID, 1000 Allocs: []*structs.Allocation{alloc1}, 1001 }, 1002 } 1003 1004 for _, tc := range cases { 1005 t.Run(tc.Label, func(t *testing.T) { 1006 1007 get := &structs.AllocListRequest{ 1008 QueryOptions: structs.QueryOptions{ 1009 Region: "global", 1010 Namespace: tc.Namespace, 1011 Prefix: tc.Prefix, 1012 AuthToken: tc.Token, 1013 }, 1014 } 1015 var resp structs.AllocListResponse 1016 err := msgpackrpc.CallWithCodec(codec, "Alloc.List", get, &resp) 1017 if tc.Error { 1018 require.Error(t, err) 1019 if tc.Message != "" { 1020 require.Equal(t, err.Error(), tc.Message) 1021 } else { 1022 require.Equal(t, err.Error(), structs.ErrPermissionDenied.Error()) 1023 } 1024 } else { 1025 require.NoError(t, err) 1026 require.Equal(t, uint64(1000), resp.Index) 1027 exp := make([]*structs.AllocListStub, len(tc.Allocs)) 1028 for i, a := range tc.Allocs { 1029 exp[i] = a.Stub(nil) 1030 } 1031 require.ElementsMatch(t, exp, resp.Allocations) 1032 } 1033 }) 1034 } 1035 1036} 1037