package allocrunner

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/nomad/client/allochealth"
	"github.com/hashicorp/nomad/client/allocwatcher"
	cconsul "github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/stretchr/testify/require"
)

// destroy does a blocking destroy on an alloc runner
func destroy(ar *allocRunner) {
	ar.Destroy()
	<-ar.DestroyCh()
}

// TestAllocRunner_AllocState_Initialized asserts that TaskStates returned by
// AllocState() are initialized even before the AllocRunner has run.
func TestAllocRunner_AllocState_Initialized(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)

	allocState := ar.AllocState()

	require.NotNil(t, allocState)
	require.NotNil(t, allocState.TaskStates[conf.Alloc.Job.TaskGroups[0].Tasks[0].Name])
}
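// The tests below repeatedly poll the MockStateUpdater until the alloc
// reaches a given client status. waitForClientStatus is a minimal sketch of
// a helper that factors out that pattern; it is hypothetical (not used by
// the tests, which inline the loop) and relies only on calls already
// exercised in this file.
func waitForClientStatus(t *testing.T, upd *MockStateUpdater, status string) *structs.Allocation {
	var last *structs.Allocation
	testutil.WaitForResult(func() (bool, error) {
		last = upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.ClientStatus != status {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, status)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
	return last
}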
// TestAllocRunner_TaskLeader_KillTG asserts that when a leader task dies the
// entire task group is killed.
func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

	// Create two tasks in the task group
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Name = "task1"
	task.Driver = "mock_driver"
	task.KillTimeout = 10 * time.Millisecond
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "task2"
	task2.Driver = "mock_driver"
	task2.Leader = true
	task2.Config = map[string]interface{}{
		"run_for": "1s",
	}
	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
	alloc.AllocatedResources.Tasks[task.Name] = tr
	alloc.AllocatedResources.Tasks[task2.Name] = tr

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	// Wait for all tasks to be killed
	upd := conf.StateUpdater.(*MockStateUpdater)
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
		}

		// Task1 should be killed because Task2 exited
		state1 := last.TaskStates[task.Name]
		if state1.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
		}
		if state1.FinishedAt.IsZero() || state1.StartedAt.IsZero() {
			return false, fmt.Errorf("expected to have a start and finish time")
		}
		if len(state1.Events) < 2 {
			// At least have a received and destroyed
			return false, fmt.Errorf("Unexpected number of events")
		}

		found := false
		killingMsg := ""
		for _, e := range state1.Events {
			if e.Type == structs.TaskLeaderDead {
				found = true
			}
			if e.Type == structs.TaskKilling {
				killingMsg = e.DisplayMessage
			}
		}

		if !found {
			return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead)
		}

		expectedKillingMsg := "Sent interrupt. Waiting 10ms before force killing"
		if killingMsg != expectedKillingMsg {
			return false, fmt.Errorf("Unexpected task event message - wanted %q, got %q", expectedKillingMsg, killingMsg)
		}

		// Task Two should be dead
		state2 := last.TaskStates[task2.Name]
		if state2.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
		}
		if state2.FinishedAt.IsZero() || state2.StartedAt.IsZero() {
			return false, fmt.Errorf("expected to have a start and finish time")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
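// hasEventType is a sketch of a small helper for the inline event scans done
// above (and mirrored by hasTaskMainEvent in TestAllocRunner_TaskMain_KillTG
// below). It is hypothetical and illustrative only: it reports whether a task
// state recorded an event of the given type.
func hasEventType(state *structs.TaskState, eventType string) bool {
	for _, e := range state.Events {
		if e.Type == eventType {
			return true
		}
	}
	return false
}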
// TestAllocRunner_Lifecycle_Poststart asserts that a service job with 2
// poststart lifecycle hooks (1 sidecar, 1 ephemeral) starts all 3 tasks, only
// the ephemeral one finishes, and the other 2 exit when the alloc is stopped.
func TestAllocRunner_Lifecycle_Poststart(t *testing.T) {
	alloc := mock.LifecycleAlloc()

	alloc.Job.Type = structs.JobTypeService
	mainTask := alloc.Job.TaskGroups[0].Tasks[0]
	mainTask.Config["run_for"] = "100s"

	sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
	sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
	sidecarTask.Config["run_for"] = "100s"

	ephemeralTask := alloc.Job.TaskGroups[0].Tasks[2]
	ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	upd := conf.StateUpdater.(*MockStateUpdater)

	// Wait for the main and sidecar tasks to be running, and for the
	// ephemeral task to have run and exited.
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}

		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
		}

		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
			return false, fmt.Errorf("expected main task to be running not %s", s)
		}

		if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning {
			return false, fmt.Errorf("expected sidecar task to be running not %s", s)
		}

		if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
		}

		if last.TaskStates[ephemeralTask.Name].Failed {
			return false, fmt.Errorf("expected ephemeral task to be successful not failed")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("error waiting for initial state:\n%v", err)
	})

	// Tell the alloc to stop
	stopAlloc := alloc.Copy()
	stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(stopAlloc)

	// Wait for main and sidecar tasks to stop.
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()

		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("expected alloc to be complete not %s", last.ClientStatus)
		}

		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected main task to be dead not %s", s)
		}

		if last.TaskStates[mainTask.Name].Failed {
			return false, fmt.Errorf("expected main task to be successful not failed")
		}

		if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected sidecar task to be dead not %s", s)
		}

		if last.TaskStates[sidecarTask.Name].Failed {
			return false, fmt.Errorf("expected sidecar task to be successful not failed")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("error waiting for stopped state:\n%v", err)
	})
}
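// stopUpdate is a sketch of a helper for the "tell the alloc to stop"
// sequence inlined above and in several tests below. It is hypothetical
// (the tests do not call it); it simply sends a desired-status-stop update
// to a runner.
func stopUpdate(ar *allocRunner, alloc *structs.Allocation) {
	update := alloc.Copy()
	update.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(update)
}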
// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the
// entire task group is killed.
func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

	// Create four tasks in the task group
	prestart := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	prestart.Name = "prestart-sidecar"
	prestart.Driver = "mock_driver"
	prestart.KillTimeout = 10 * time.Millisecond
	prestart.Lifecycle = &structs.TaskLifecycleConfig{
		Hook:    structs.TaskLifecycleHookPrestart,
		Sidecar: true,
	}

	prestart.Config = map[string]interface{}{
		"run_for": "100s",
	}

	poststart := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	poststart.Name = "poststart-sidecar"
	poststart.Driver = "mock_driver"
	poststart.KillTimeout = 10 * time.Millisecond
	poststart.Lifecycle = &structs.TaskLifecycleConfig{
		Hook:    structs.TaskLifecycleHookPoststart,
		Sidecar: true,
	}

	poststart.Config = map[string]interface{}{
		"run_for": "100s",
	}

	// Two main tasks with distinct names and staggered run durations
	main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	main1.Name = "main1"
	main1.Driver = "mock_driver"
	main1.Config = map[string]interface{}{
		"run_for": "1s",
	}

	main2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	main2.Name = "main2"
	main2.Driver = "mock_driver"
	main2.Config = map[string]interface{}{
		"run_for": "2s",
	}

	alloc.Job.TaskGroups[0].Tasks = []*structs.Task{prestart, poststart, main1, main2}
	alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
		prestart.Name:  tr,
		poststart.Name: tr,
		main1.Name:     tr,
		main2.Name:     tr,
	}

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	hasTaskMainEvent := func(state *structs.TaskState) bool {
		for _, e := range state.Events {
			if e.Type == structs.TaskMainDead {
				return true
			}
		}

		return false
	}

	// Wait for all tasks to be killed
	upd := conf.StateUpdater.(*MockStateUpdater)
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
		}

		var state *structs.TaskState

		// Both sidecars should be killed because the main tasks exited
		state = last.TaskStates[prestart.Name]
		if state == nil {
			return false, fmt.Errorf("could not find state for task %s", prestart.Name)
		}
		if state.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
		}
		if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
			return false, fmt.Errorf("expected to have a start and finish time")
		}
		if len(state.Events) < 2 {
			// At least have a received and destroyed
			return false, fmt.Errorf("Unexpected number of events")
		}

		if !hasTaskMainEvent(state) {
			return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
		}

		state = last.TaskStates[poststart.Name]
		if state == nil {
			return false, fmt.Errorf("could not find state for task %s", poststart.Name)
		}
		if state.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
		}
		if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
			return false, fmt.Errorf("expected to have a start and finish time")
		}
		if len(state.Events) < 2 {
			// At least have a received and destroyed
			return false, fmt.Errorf("Unexpected number of events")
		}

		if !hasTaskMainEvent(state) {
			return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
		}

		// The main tasks should die naturally
		state = last.TaskStates[main1.Name]
		if state.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
		}
		if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
			return false, fmt.Errorf("expected to have a start and finish time")
		}
		if hasTaskMainEvent(state) {
			return false, fmt.Errorf("unexpected event %v in %#+v", structs.TaskMainDead, state.Events)
		}

		state = last.TaskStates[main2.Name]
		if state.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
		}
		if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
			return false, fmt.Errorf("expected to have a start and finish time")
		}
		if hasTaskMainEvent(state) {
			return false, fmt.Errorf("unexpected event %v in %#+v", structs.TaskMainDead, state.Events)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
// TestAllocRunner_Lifecycle_Poststop asserts that a service job with a
// poststop lifecycle hook starts the main task, and that the poststop task
// runs only after the main task is stopped.
func TestAllocRunner_Lifecycle_Poststop(t *testing.T) {
	alloc := mock.LifecycleAlloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]

	alloc.Job.Type = structs.JobTypeService
	mainTask := alloc.Job.TaskGroups[0].Tasks[0]
	mainTask.Config["run_for"] = "100s"

	ephemeralTask := alloc.Job.TaskGroups[0].Tasks[1]
	ephemeralTask.Name = "quit"
	ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststop
	ephemeralTask.Config["run_for"] = "10s"

	alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask}
	alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
		mainTask.Name:      tr,
		ephemeralTask.Name: tr,
	}

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	upd := conf.StateUpdater.(*MockStateUpdater)

	// Wait for main task to be running
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}

		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
		}

		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
			return false, fmt.Errorf("expected main task to be running not %s", s)
		}

		if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStatePending {
			return false, fmt.Errorf("expected ephemeral task to be pending not %s", s)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("error waiting for initial state:\n%v", err)
	})

	// Tell the alloc to stop
	stopAlloc := alloc.Copy()
	stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(stopAlloc)

	// Wait for main task to die & poststop task to run.
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()

		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
		}

		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
			return false, fmt.Errorf("expected main task to be dead not %s", s)
		}

		if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateRunning {
			return false, fmt.Errorf("expected poststop task to be running not %s", s)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("error waiting for poststop state:\n%v", err)
	})
}
func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

	// Create a group service
	tg := alloc.Job.TaskGroups[0]
	tg.Services = []*structs.Service{
		{
			Name: "shutdown_service",
		},
	}

	// Create two tasks in the group
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Name = "follower1"
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "leader"
	task2.Driver = "mock_driver"
	task2.Leader = true
	task2.Config = map[string]interface{}{
		"run_for": "10s",
	}

	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
	alloc.AllocatedResources.Tasks[task.Name] = tr
	alloc.AllocatedResources.Tasks[task2.Name] = tr

	// Set a shutdown delay
	shutdownDelay := 1 * time.Second
	alloc.Job.TaskGroups[0].ShutdownDelay = &shutdownDelay

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	// Wait for tasks to start
	upd := conf.StateUpdater.(*MockStateUpdater)
	last := upd.Last()
	testutil.WaitForResult(func() (bool, error) {
		last = upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if n := len(last.TaskStates); n != 2 {
			return false, fmt.Errorf("Not enough task states (want: 2; found %d)", n)
		}
		for name, state := range last.TaskStates {
			if state.State != structs.TaskStateRunning {
				return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
			}
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Reset updates
	upd.Reset()

	// Stop alloc
	shutdownInit := time.Now()
	update := alloc.Copy()
	update.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(update)

	// Wait for tasks to stop
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}

		fin := last.TaskStates["leader"].FinishedAt

		if fin.IsZero() {
			return false, nil
		}

		return true, nil
	}, func(err error) {
		last := upd.Last()
		for name, state := range last.TaskStates {
			t.Logf("%s: %s", name, state.State)
		}
		t.Fatalf("err: %v", err)
	})

	// Get consul client operations
	consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
	consulOps := consulClient.GetOps()
	var groupRemoveOp cconsul.MockConsulOp
	for _, op := range consulOps {
		// Grab the first deregistration request
		if op.Op == "remove" && op.Name == "group-web" {
			groupRemoveOp = op
			break
		}
	}

	// Ensure the remove operation happened close to shutdown initiation
	require.True(t, groupRemoveOp.OccurredAt.Sub(shutdownInit) < 100*time.Millisecond)

	last = upd.Last()
	minShutdown := shutdownInit.Add(shutdownDelay)
	leaderFinished := last.TaskStates["leader"].FinishedAt
	followerFinished := last.TaskStates["follower1"].FinishedAt

	// Check that both tasks shut down after the earliest possible shutdown time
	require.Greater(t, leaderFinished.UnixNano(), minShutdown.UnixNano())
	require.Greater(t, followerFinished.UnixNano(), minShutdown.UnixNano())

	// Check that there is at least shutdown_delay between the consul remove
	// operation and the leader's finish time
	require.True(t, leaderFinished.Sub(groupRemoveOp.OccurredAt) > shutdownDelay)
}
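// firstConsulOp is a sketch of a helper for scanning MockConsulServiceClient
// operations, like the loop in TestAllocRunner_TaskGroup_ShutdownDelay above.
// It is hypothetical (the test inlines the scan); it returns the first
// recorded operation matching the given op type and name.
func firstConsulOp(ops []cconsul.MockConsulOp, opType, name string) (cconsul.MockConsulOp, bool) {
	for _, op := range ops {
		if op.Op == opType && op.Name == name {
			return op, true
		}
	}
	return cconsul.MockConsulOp{}, false
}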
// TestAllocRunner_TaskLeader_StopTG asserts that when stopping an alloc with a
// leader the leader is stopped before other tasks.
func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

	// Create 3 tasks in the task group
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Name = "follower1"
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "leader"
	task2.Driver = "mock_driver"
	task2.Leader = true
	task2.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task3 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task3.Name = "follower2"
	task3.Driver = "mock_driver"
	task3.Config = map[string]interface{}{
		"run_for": "10s",
	}
	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2, task3)
	alloc.AllocatedResources.Tasks[task.Name] = tr
	alloc.AllocatedResources.Tasks[task2.Name] = tr
	alloc.AllocatedResources.Tasks[task3.Name] = tr

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()

	// Wait for tasks to start
	upd := conf.StateUpdater.(*MockStateUpdater)
	last := upd.Last()
	testutil.WaitForResult(func() (bool, error) {
		last = upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if n := len(last.TaskStates); n != 3 {
			return false, fmt.Errorf("Not enough task states (want: 3; found %d)", n)
		}
		for name, state := range last.TaskStates {
			if state.State != structs.TaskStateRunning {
				return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
			}
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Reset updates
	upd.Reset()

	// Stop alloc
	update := alloc.Copy()
	update.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(update)

	// Wait for tasks to stop
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.TaskStates["leader"].FinishedAt.UnixNano() >= last.TaskStates["follower1"].FinishedAt.UnixNano() {
			return false, fmt.Errorf("expected leader to finish before follower1: %s >= %s",
				last.TaskStates["leader"].FinishedAt, last.TaskStates["follower1"].FinishedAt)
		}
		if last.TaskStates["leader"].FinishedAt.UnixNano() >= last.TaskStates["follower2"].FinishedAt.UnixNano() {
			return false, fmt.Errorf("expected leader to finish before follower2: %s >= %s",
				last.TaskStates["leader"].FinishedAt, last.TaskStates["follower2"].FinishedAt)
		}
		return true, nil
	}, func(err error) {
		last := upd.Last()
		for name, state := range last.TaskStates {
			t.Logf("%s: %s", name, state.State)
		}
		t.Fatalf("err: %v", err)
	})
}
// TestAllocRunner_TaskLeader_StopRestoredTG asserts that when stopping a
// restored task group whose leader failed before the restore, the leader is
// not stopped since it no longer exists.
// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932
func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

	// Create a leader and follower task in the task group
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Name = "follower1"
	task.Driver = "mock_driver"
	task.KillTimeout = 10 * time.Second
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "leader"
	task2.Driver = "mock_driver"
	task2.Leader = true
	task2.KillTimeout = 10 * time.Millisecond
	task2.Config = map[string]interface{}{
		"run_for": "10s",
	}

	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
	alloc.AllocatedResources.Tasks[task.Name] = tr
	alloc.AllocatedResources.Tasks[task2.Name] = tr

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	// Use a memory backed statedb
	conf.StateDB = state.NewMemDB(conf.Logger)

	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)

	// Mimic the client exiting after the leader died but before it could
	// stop the other tasks.
	ar.tasks["leader"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled))
	ar.tasks["follower1"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))

	// Create a new AllocRunner to test RestoreState and Run
	ar2, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar2)

	if err := ar2.Restore(); err != nil {
		t.Fatalf("error restoring state: %v", err)
	}
	ar2.Run()

	// Wait for tasks to be stopped because leader is dead
	testutil.WaitForResult(func() (bool, error) {
		alloc := ar2.Alloc()
		// TODO: this loop does not assert anything; alloc.TaskStates is
		// an empty map here, so the body never executes.
		for task, state := range alloc.TaskStates {
			if state.State != structs.TaskStateDead {
				return false, fmt.Errorf("Task %q should be dead: %v", task, state.State)
			}
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Make sure it GCs properly
	ar2.Destroy()

	select {
	case <-ar2.DestroyCh():
		// exited as expected
	case <-time.After(10 * time.Second):
		t.Fatalf("timed out waiting for AR to GC")
	}
}
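// waitForDestroy is a sketch of a helper for the Destroy/DestroyCh select
// pattern used above and in TestAllocRunner_Destroy and
// TestAllocRunner_PersistState_Destroyed below. It is hypothetical (the
// tests inline the select) and assumes the same 10s timeout those tests use.
func waitForDestroy(t *testing.T, ar *allocRunner) {
	ar.Destroy()
	select {
	case <-ar.DestroyCh():
		// destroyed as expected
	case <-time.After(10 * time.Second):
		require.Fail(t, "timed out waiting for alloc runner to be destroyed")
	}
}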
func TestAllocRunner_Restore_LifecycleHooks(t *testing.T) {
	t.Parallel()

	alloc := mock.LifecycleAlloc()

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	// Use a memory backed statedb
	conf.StateDB = state.NewMemDB(conf.Logger)

	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)

	// All tasks with prestart hooks should be unblocked from running:
	// i.e. the "init" and "side" task hook coordinator channels are closed
	require.Truef(t, isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["init"].Task())), "init channel was open, should be closed")
	require.Truef(t, isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["side"].Task())), "side channel was open, should be closed")

	// Mimic the client dying while the init task is running, then restarting
	// after the init task has finished
	ar.tasks["init"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskTerminated))
	ar.tasks["side"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))

	// Create a new AllocRunner to test RestoreState and Run
	ar2, err := NewAllocRunner(conf)
	require.NoError(t, err)

	if err := ar2.Restore(); err != nil {
		t.Fatalf("error restoring state: %v", err)
	}

	// Restore should resume execution with the correct hook ordering:
	// i.e. the "web" main task hook coordinator channel is closed
	require.Truef(t, isChannelClosed(ar2.taskHookCoordinator.startConditionForTask(ar.tasks["web"].Task())), "web channel was open, should be closed")
}
func TestAllocRunner_Update_Semantics(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	updatedAlloc := func(a *structs.Allocation) *structs.Allocation {
		upd := a.CopySkipJob()
		upd.AllocModifyIndex++

		return upd
	}

	alloc := mock.Alloc()
	alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	ar, err := NewAllocRunner(conf)
	require.NoError(err)

	upd1 := updatedAlloc(alloc)
	ar.Update(upd1)

	// Update was placed into a queue
	require.Len(ar.allocUpdatedCh, 1)

	upd2 := updatedAlloc(alloc)
	ar.Update(upd2)

	// Allocation was _replaced_
	require.Len(ar.allocUpdatedCh, 1)
	queuedAlloc := <-ar.allocUpdatedCh
	require.Equal(upd2, queuedAlloc)

	// Requeueing older alloc is skipped
	ar.Update(upd2)
	ar.Update(upd1)

	queuedAlloc = <-ar.allocUpdatedCh
	require.Equal(upd2, queuedAlloc)

	// Ignore after watch closed
	close(ar.waitCh)

	ar.Update(upd1)

	// Did not queue the update
	require.Len(ar.allocUpdatedCh, 0)
}
// TestAllocRunner_DeploymentHealth_Healthy_Migration asserts that health is
// reported for services that got migrated, not just those that are part of a
// deployment.
func TestAllocRunner_DeploymentHealth_Healthy_Migration(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()

	// Ensure the alloc is *not* part of a deployment
	alloc.DeploymentID = ""

	// Shorten the default migration healthy time
	tg := alloc.Job.TaskGroups[0]
	tg.Migrate = structs.DefaultMigrateStrategy()
	tg.Migrate.MinHealthyTime = 100 * time.Millisecond
	tg.Migrate.HealthCheck = structs.MigrateStrategyHealthStates

	task := tg.Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "30s",
	}

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	go ar.Run()
	defer destroy(ar)

	upd := conf.StateUpdater.(*MockStateUpdater)
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if !last.DeploymentStatus.HasHealth() {
			return false, fmt.Errorf("want deployment status healthy; got unset")
		} else if !*last.DeploymentStatus.Healthy {
			// This is fatal
			t.Fatal("want deployment status healthy; got unhealthy")
		}
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
}

// TestAllocRunner_DeploymentHealth_Healthy_NoChecks asserts that the health
// watcher will mark the allocation as healthy based on task states alone.
func TestAllocRunner_DeploymentHealth_Healthy_NoChecks(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()

	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	// Create a second task that takes longer to become healthy
	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task.Copy())
	alloc.AllocatedResources.Tasks["task2"] = alloc.AllocatedResources.Tasks["web"].Copy()
	task2 := alloc.Job.TaskGroups[0].Tasks[1]
	task2.Name = "task2"
	task2.Config["start_block_for"] = "500ms"

	// Make the alloc be part of a deployment that uses task states for
	// health checks
	alloc.DeploymentID = uuid.Generate()
	alloc.Job.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy()
	alloc.Job.TaskGroups[0].Update.HealthCheck = structs.UpdateStrategyHealthCheck_TaskStates
	alloc.Job.TaskGroups[0].Update.MaxParallel = 1
	alloc.Job.TaskGroups[0].Update.MinHealthyTime = 100 * time.Millisecond

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)

	start, done := time.Now(), time.Time{}
	go ar.Run()
	defer destroy(ar)

	upd := conf.StateUpdater.(*MockStateUpdater)
	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if !last.DeploymentStatus.HasHealth() {
			return false, fmt.Errorf("want deployment status healthy; got unset")
		} else if !*last.DeploymentStatus.Healthy {
			// This is fatal
			t.Fatal("want deployment status healthy; got unhealthy")
		}

		// Capture the done timestamp
		done = last.DeploymentStatus.Timestamp
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})

	if d := done.Sub(start); d < 500*time.Millisecond {
		t.Fatalf("didn't wait for second task group. Only took %v", d)
	}
}
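// The DeploymentHealth tests in this file share one polling shape: wait for
// DeploymentStatus to be set, then fail hard if its health has the wrong
// value. waitForDeploymentHealth is a minimal sketch of that pattern as a
// helper (hypothetical; the tests inline it), using only calls already
// exercised here.
func waitForDeploymentHealth(t *testing.T, upd *MockStateUpdater, wantHealthy bool) *structs.Allocation {
	var last *structs.Allocation
	testutil.WaitForResult(func() (bool, error) {
		last = upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if !last.DeploymentStatus.HasHealth() {
			return false, fmt.Errorf("want deployment status set; got unset")
		}
		if *last.DeploymentStatus.Healthy != wantHealthy {
			// Wrong health value will never fix itself, so fail fast
			t.Fatalf("want deployment healthy=%v; got %v", wantHealthy, *last.DeploymentStatus.Healthy)
		}
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
	return last
}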
// TestAllocRunner_DeploymentHealth_Unhealthy_Checks asserts that the health
// watcher will mark the allocation as unhealthy with failing checks.
func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
	t.Parallel()

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	// Set a service with a check
	task.Services = []*structs.Service{
		{
			Name:      "fakeservice",
			PortLabel: "http",
			Checks: []*structs.ServiceCheck{
				{
					Name:     "fakecheck",
					Type:     structs.ServiceCheckScript,
					Command:  "true",
					Interval: 30 * time.Second,
					Timeout:  5 * time.Second,
				},
			},
		},
	}

	// Make the alloc be part of a deployment
	alloc.DeploymentID = uuid.Generate()
	alloc.Job.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy()
	alloc.Job.TaskGroups[0].Update.HealthCheck = structs.UpdateStrategyHealthCheck_Checks
	alloc.Job.TaskGroups[0].Update.MaxParallel = 1
	alloc.Job.TaskGroups[0].Update.MinHealthyTime = 100 * time.Millisecond
	alloc.Job.TaskGroups[0].Update.HealthyDeadline = 1 * time.Second

	checkUnhealthy := &api.AgentCheck{
		CheckID: uuid.Generate(),
		Status:  api.HealthWarning,
	}

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	// Return a check that never becomes healthy so the healthy deadline
	// is hit
	consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
	consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
		return &consul.AllocRegistration{
			Tasks: map[string]*consul.ServiceRegistrations{
				task.Name: {
					Services: map[string]*consul.ServiceRegistration{
						"123": {
							Service: &api.AgentService{Service: "fakeservice"},
							Checks:  []*api.AgentCheck{checkUnhealthy},
						},
					},
				},
			},
		}, nil
	}

	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	go ar.Run()
	defer destroy(ar)

	var lastUpdate *structs.Allocation
	upd := conf.StateUpdater.(*MockStateUpdater)
	testutil.WaitForResult(func() (bool, error) {
		lastUpdate = upd.Last()
		if lastUpdate == nil {
			return false, fmt.Errorf("No updates")
		}
		if !lastUpdate.DeploymentStatus.HasHealth() {
			return false, fmt.Errorf("want deployment status unhealthy; got unset")
		} else if *lastUpdate.DeploymentStatus.Healthy {
			// This is fatal
			t.Fatal("want deployment status unhealthy; got healthy")
		}
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})

	// Assert that we have an event explaining why we are unhealthy.
	require.Len(t, lastUpdate.TaskStates, 1)
	state := lastUpdate.TaskStates[task.Name]
	require.NotNil(t, state)
	require.NotEmpty(t, state.Events)
	last := state.Events[len(state.Events)-1]
	require.Equal(t, allochealth.AllocHealthEventSource, last.Type)
	require.Contains(t, last.Message, "by deadline")
}
// TestAllocRunner_Destroy asserts that Destroy kills and cleans up a running
// alloc.
func TestAllocRunner_Destroy(t *testing.T) {
	t.Parallel()

	// Ensure task takes some time
	alloc := mock.BatchAlloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Config["run_for"] = "10s"

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	// Use a MemDB to assert alloc state gets cleaned up
	conf.StateDB = state.NewMemDB(conf.Logger)

	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	go ar.Run()

	// Wait for alloc to be running
	testutil.WaitForResult(func() (bool, error) {
		state := ar.AllocState()

		return state.ClientStatus == structs.AllocClientStatusRunning,
			fmt.Errorf("got client status %v; want running", state.ClientStatus)
	}, func(err error) {
		require.NoError(t, err)
	})

	// Assert state was stored
	ls, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, task.Name)
	require.NoError(t, err)
	require.NotNil(t, ls)
	require.NotNil(t, ts)

	// Now destroy
	ar.Destroy()

	select {
	case <-ar.DestroyCh():
		// Destroyed properly!
	case <-time.After(10 * time.Second):
		require.Fail(t, "timed out waiting for alloc to be destroyed")
	}

	// Assert alloc is dead
	state := ar.AllocState()
	require.Equal(t, structs.AllocClientStatusComplete, state.ClientStatus)

	// Assert the state was cleaned
	ls, ts, err = conf.StateDB.GetTaskRunnerState(alloc.ID, task.Name)
	require.NoError(t, err)
	require.Nil(t, ls)
	require.Nil(t, ts)

	// Assert the alloc directory was cleaned
	if _, err := os.Stat(ar.allocDir.AllocDir); err == nil {
		require.Failf(t, "alloc dir still exists", "%v", ar.allocDir.AllocDir)
	} else if !os.IsNotExist(err) {
		require.Failf(t, "expected NotExist error", "found %v", err)
	}
}

func TestAllocRunner_SimpleRun(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	go ar.Run()
	defer destroy(ar)

	// Wait for alloc to be complete
	testutil.WaitForResult(func() (bool, error) {
		state := ar.AllocState()

		if state.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("got status %v; want %v", state.ClientStatus, structs.AllocClientStatusComplete)
		}

		for t, s := range state.TaskStates {
			if s.FinishedAt.IsZero() {
				return false, fmt.Errorf("task %q has zero FinishedAt value", t)
			}
		}

		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})
}
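// assertAllocDirRemoved is a sketch of a helper for the alloc-dir cleanup
// check performed in TestAllocRunner_Destroy above and in
// TestAllocRunner_TerminalUpdate_Destroy below. It is hypothetical (the
// tests inline the check); it fails unless the alloc directory is gone.
func assertAllocDirRemoved(t *testing.T, ar *allocRunner) {
	if _, err := os.Stat(ar.allocDir.AllocDir); err == nil {
		require.Failf(t, "alloc dir still exists", "%v", ar.allocDir.AllocDir)
	} else if !os.IsNotExist(err) {
		require.Failf(t, "expected NotExist error", "found %v", err)
	}
}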
// TestAllocRunner_MoveAllocDir asserts that a rescheduled allocation copies
// ephemeral disk content from the previous alloc run.
func TestAllocRunner_MoveAllocDir(t *testing.T) {
	t.Parallel()

	// Step 1: start and run a task
	alloc := mock.BatchAlloc()
	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	ar.Run()
	defer destroy(ar)

	require.Equal(t, structs.AllocClientStatusComplete, ar.AllocState().ClientStatus)

	// Step 2: modify its directory
	task := alloc.Job.TaskGroups[0].Tasks[0]
	dataFile := filepath.Join(ar.allocDir.SharedDir, "data", "data_file")
	require.NoError(t, ioutil.WriteFile(dataFile, []byte("hello world"), os.ModePerm))
	taskDir := ar.allocDir.TaskDirs[task.Name]
	taskLocalFile := filepath.Join(taskDir.LocalDir, "local_file")
	require.NoError(t, ioutil.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm))

	// Step 3: start a new alloc
	alloc2 := mock.BatchAlloc()
	alloc2.PreviousAllocation = alloc.ID
	alloc2.Job.TaskGroups[0].EphemeralDisk.Sticky = true

	conf2, cleanup := testAllocRunnerConfig(t, alloc2)
	conf2.PrevAllocWatcher, conf2.PrevAllocMigrator = allocwatcher.NewAllocWatcher(allocwatcher.Config{
		Alloc:          alloc2,
		PreviousRunner: ar,
		Logger:         conf2.Logger,
	})
	defer cleanup()
	ar2, err := NewAllocRunner(conf2)
	require.NoError(t, err)

	ar2.Run()
	defer destroy(ar2)

	require.Equal(t, structs.AllocClientStatusComplete, ar2.AllocState().ClientStatus)

	// Ensure that data from ar was moved to ar2
	dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file")
	fileInfo, _ := os.Stat(dataFile)
	require.NotNilf(t, fileInfo, "file %q not found", dataFile)

	taskDir = ar2.allocDir.TaskDirs[task.Name]
	taskLocalFile = filepath.Join(taskDir.LocalDir, "local_file")
	fileInfo, _ = os.Stat(taskLocalFile)
	require.NotNilf(t, fileInfo, "file %q not found", taskLocalFile)
}

// TestAllocRunner_HandlesArtifactFailure ensures that if one task in a task
// group fails to fetch an artifact and exhausts its retries, the other tasks
// in the group can still proceed.
func TestAllocRunner_HandlesArtifactFailure(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()
	rp := &structs.RestartPolicy{
		Mode:     structs.RestartPolicyModeFail,
		Attempts: 1,
		Delay:    time.Nanosecond,
		Interval: time.Hour,
	}
	alloc.Job.TaskGroups[0].RestartPolicy = rp
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy = rp

	// Create a new task with a bad artifact
	badtask := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	badtask.Name = "bad"
	badtask.Artifacts = []*structs.TaskArtifact{
		{GetterSource: "http://127.0.0.1:0/foo/bar/baz"},
	}

	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, badtask)
	alloc.AllocatedResources.Tasks["bad"] = &structs.AllocatedTaskResources{
		Cpu: structs.AllocatedCpuResources{
			CpuShares: 500,
		},
		Memory: structs.AllocatedMemoryResources{
			MemoryMB: 256,
		},
	}

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	go ar.Run()
	defer destroy(ar)

	testutil.WaitForResult(func() (bool, error) {
		state := ar.AllocState()

		switch state.ClientStatus {
		case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed:
			return true, nil
		default:
			return false, fmt.Errorf("got status %v but want terminal", state.ClientStatus)
		}
	}, func(err error) {
		require.NoError(t, err)
	})

	state := ar.AllocState()
	require.Equal(t, structs.AllocClientStatusFailed, state.ClientStatus)
	require.Equal(t, structs.TaskStateDead, state.TaskStates["web"].State)
	require.True(t, state.TaskStates["web"].Successful())
	require.Equal(t, structs.TaskStateDead, state.TaskStates["bad"].State)
	require.True(t, state.TaskStates["bad"].Failed)
}
// Test that the alloc runner kills the other tasks in the task group when one
// task fails.
func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
	alloc := mock.Alloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

	// Create two tasks in the task group
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Name = "task1"
	task.Driver = "mock_driver"
	task.KillTimeout = 10 * time.Millisecond
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}
	// Set a service with a check
	task.Services = []*structs.Service{
		{
			Name:      "fakeservice",
			PortLabel: "http",
			Checks: []*structs.ServiceCheck{
				{
					Name:     "fakecheck",
					Type:     structs.ServiceCheckScript,
					Command:  "true",
					Interval: 30 * time.Second,
					Timeout:  5 * time.Second,
				},
			},
		},
	}

	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "task 2"
	task2.Driver = "mock_driver"
	task2.Config = map[string]interface{}{
		"start_error": "fail task please",
	}
	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
	alloc.AllocatedResources.Tasks[task.Name] = tr
	alloc.AllocatedResources.Tasks[task2.Name] = tr

	// Make the alloc be part of a deployment
	alloc.DeploymentID = uuid.Generate()
	alloc.Job.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy()
	alloc.Job.TaskGroups[0].Update.HealthCheck = structs.UpdateStrategyHealthCheck_Checks
	alloc.Job.TaskGroups[0].Update.MaxParallel = 1
	alloc.Job.TaskGroups[0].Update.MinHealthyTime = 10 * time.Millisecond
	alloc.Job.TaskGroups[0].Update.HealthyDeadline = 2 * time.Second

	checkHealthy := &api.AgentCheck{
		CheckID: uuid.Generate(),
		Status:  api.HealthPassing,
	}

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
	consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
		return &consul.AllocRegistration{
			Tasks: map[string]*consul.ServiceRegistrations{
				task.Name: {
					Services: map[string]*consul.ServiceRegistration{
						"123": {
							Service: &api.AgentService{Service: "fakeservice"},
							Checks:  []*api.AgentCheck{checkHealthy},
						},
					},
				},
			},
		}, nil
	}

	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()
	upd := conf.StateUpdater.(*MockStateUpdater)

	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.ClientStatus != structs.AllocClientStatusFailed {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
		}

		// Task One should be killed
		state1 := last.TaskStates[task.Name]
		if state1.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
		}
		if len(state1.Events) < 2 {
			// At least have a received and destroyed
			return false, fmt.Errorf("Unexpected number of events")
		}

		found := false
		for _, e := range state1.Events {
			if e.Type == structs.TaskSiblingFailed {
				found = true
			}
		}

		if !found {
			return false, fmt.Errorf("Did not find event %v", structs.TaskSiblingFailed)
		}

		// Task Two should be failed
		state2 := last.TaskStates[task2.Name]
		if state2.State != structs.TaskStateDead {
			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
		}
		if !state2.Failed {
			return false, fmt.Errorf("task2 should have failed")
		}

		if !last.DeploymentStatus.HasHealth() {
			return false, fmt.Errorf("Expected deployment health to be non nil")
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
// Test that an alloc becoming terminal destroys the alloc runner.
func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
	t.Parallel()
	alloc := mock.BatchAlloc()
	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
	// Ensure task takes some time
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config["run_for"] = "10s"
	alloc.AllocatedResources.Tasks[task.Name] = tr

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)
	go ar.Run()
	upd := conf.StateUpdater.(*MockStateUpdater)

	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}
		if last.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Update the alloc to be terminal which should cause the alloc runner to
	// stop the tasks and wait for a destroy.
	update := ar.alloc.Copy()
	update.DesiredStatus = structs.AllocDesiredStatusStop
	ar.Update(update)

	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}

		// Check the status has changed.
		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
		}

		// Check the alloc directory still exists
		if _, err := os.Stat(ar.allocDir.AllocDir); err != nil {
			return false, fmt.Errorf("alloc dir destroyed: %v", ar.allocDir.AllocDir)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Send the destroy signal and ensure the AllocRunner cleans up.
	ar.Destroy()

	testutil.WaitForResult(func() (bool, error) {
		last := upd.Last()
		if last == nil {
			return false, fmt.Errorf("No updates")
		}

		// Check the status has changed.
		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
		}

		// Check the alloc directory was cleaned
		if _, err := os.Stat(ar.allocDir.AllocDir); err == nil {
			return false, fmt.Errorf("alloc dir still exists: %v", ar.allocDir.AllocDir)
		} else if !os.IsNotExist(err) {
			return false, fmt.Errorf("stat err: %v", err)
		}

		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}
// TestAllocRunner_PersistState_Destroyed asserts that destroyed allocs are no
// longer persisted.
func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
	t.Parallel()

	alloc := mock.BatchAlloc()
	taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	conf.StateDB = state.NewMemDB(conf.Logger)

	defer cleanup()
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	defer destroy(ar)

	go ar.Run()

	select {
	case <-ar.WaitCh():
	case <-time.After(10 * time.Second):
		require.Fail(t, "timed out waiting for alloc to complete")
	}

	// Test the final persisted state upon completion
	require.NoError(t, ar.PersistState())
	allocs, _, err := conf.StateDB.GetAllAllocations()
	require.NoError(t, err)
	require.Len(t, allocs, 1)
	require.Equal(t, alloc.ID, allocs[0].ID)
	_, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, taskName)
	require.NoError(t, err)
	require.Equal(t, structs.TaskStateDead, ts.State)

	// Check that the DB alloc is empty after destroying the AR
	ar.Destroy()
	select {
	case <-ar.DestroyCh():
	case <-time.After(10 * time.Second):
		require.Fail(t, "timed out waiting for destruction")
	}

	allocs, _, err = conf.StateDB.GetAllAllocations()
	require.NoError(t, err)
	require.Empty(t, allocs)
	_, ts, err = conf.StateDB.GetTaskRunnerState(alloc.ID, taskName)
	require.NoError(t, err)
	require.Nil(t, ts)

	// Check that the DB alloc is empty after persisting the state of the
	// destroyed AR
	require.NoError(t, ar.PersistState())
	allocs, _, err = conf.StateDB.GetAllAllocations()
	require.NoError(t, err)
	require.Empty(t, allocs)
	_, ts, err = conf.StateDB.GetTaskRunnerState(alloc.ID, taskName)
	require.NoError(t, err)
	require.Nil(t, ts)
}