1package allocrunner
2
3import (
4	"fmt"
5	"io/ioutil"
6	"os"
7	"path/filepath"
8	"testing"
9	"time"
10
11	"github.com/hashicorp/consul/api"
12	"github.com/hashicorp/nomad/client/allochealth"
13	"github.com/hashicorp/nomad/client/allocwatcher"
14	cconsul "github.com/hashicorp/nomad/client/consul"
15	"github.com/hashicorp/nomad/client/state"
16	"github.com/hashicorp/nomad/command/agent/consul"
17	"github.com/hashicorp/nomad/helper/uuid"
18	"github.com/hashicorp/nomad/nomad/mock"
19	"github.com/hashicorp/nomad/nomad/structs"
20	"github.com/hashicorp/nomad/testutil"
21	"github.com/stretchr/testify/require"
22)
23
24// destroy does a blocking destroy on an alloc runner
25func destroy(ar *allocRunner) {
26	ar.Destroy()
27	<-ar.DestroyCh()
28}
29
// TestAllocRunner_AllocState_Initialized asserts that the TaskStates returned
// by AllocState() are initialized even before the AllocRunner has run.
32func TestAllocRunner_AllocState_Initialized(t *testing.T) {
33	t.Parallel()
34
35	alloc := mock.Alloc()
36	alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
37	conf, cleanup := testAllocRunnerConfig(t, alloc)
38	defer cleanup()
39
40	ar, err := NewAllocRunner(conf)
41	require.NoError(t, err)
42
43	allocState := ar.AllocState()
44
45	require.NotNil(t, allocState)
46	require.NotNil(t, allocState.TaskStates[conf.Alloc.Job.TaskGroups[0].Tasks[0].Name])
47}
48
49// TestAllocRunner_TaskLeader_KillTG asserts that when a leader task dies the
50// entire task group is killed.
51func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
52	t.Parallel()
53
54	alloc := mock.BatchAlloc()
55	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
56	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
57	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
58
59	// Create two tasks in the task group
60	task := alloc.Job.TaskGroups[0].Tasks[0]
61	task.Name = "task1"
62	task.Driver = "mock_driver"
63	task.KillTimeout = 10 * time.Millisecond
64	task.Config = map[string]interface{}{
65		"run_for": "10s",
66	}
67
68	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
69	task2.Name = "task2"
70	task2.Driver = "mock_driver"
71	task2.Leader = true
72	task2.Config = map[string]interface{}{
73		"run_for": "1s",
74	}
75	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
76	alloc.AllocatedResources.Tasks[task.Name] = tr
77	alloc.AllocatedResources.Tasks[task2.Name] = tr
78
79	conf, cleanup := testAllocRunnerConfig(t, alloc)
80	defer cleanup()
81	ar, err := NewAllocRunner(conf)
82	require.NoError(t, err)
83	defer destroy(ar)
84	go ar.Run()
85
86	// Wait for all tasks to be killed
87	upd := conf.StateUpdater.(*MockStateUpdater)
88	testutil.WaitForResult(func() (bool, error) {
89		last := upd.Last()
90		if last == nil {
91			return false, fmt.Errorf("No updates")
92		}
93		if last.ClientStatus != structs.AllocClientStatusComplete {
94			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
95		}
96
97		// Task1 should be killed because Task2 exited
98		state1 := last.TaskStates[task.Name]
99		if state1.State != structs.TaskStateDead {
100			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
101		}
102		if state1.FinishedAt.IsZero() || state1.StartedAt.IsZero() {
103			return false, fmt.Errorf("expected to have a start and finish time")
104		}
105		if len(state1.Events) < 2 {
106			// At least have a received and destroyed
107			return false, fmt.Errorf("Unexpected number of events")
108		}
109
110		found := false
111		killingMsg := ""
112		for _, e := range state1.Events {
113			if e.Type == structs.TaskLeaderDead {
114				found = true
115			}
116			if e.Type == structs.TaskKilling {
117				killingMsg = e.DisplayMessage
118			}
119		}
120
121		if !found {
122			return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead)
123		}
124
125		expectedKillingMsg := "Sent interrupt. Waiting 10ms before force killing"
126		if killingMsg != expectedKillingMsg {
			return false, fmt.Errorf("Unexpected task event message - wanted %q, got %q", expectedKillingMsg, killingMsg)
128		}
129
130		// Task Two should be dead
131		state2 := last.TaskStates[task2.Name]
132		if state2.State != structs.TaskStateDead {
133			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
134		}
135		if state2.FinishedAt.IsZero() || state2.StartedAt.IsZero() {
136			return false, fmt.Errorf("expected to have a start and finish time")
137		}
138
139		return true, nil
140	}, func(err error) {
141		t.Fatalf("err: %v", err)
142	})
143}
144
145// TestAllocRunner_Lifecycle_Poststart asserts that a service job with 2
146// poststart lifecycle hooks (1 sidecar, 1 ephemeral) starts all 3 tasks, only
147// the ephemeral one finishes, and the other 2 exit when the alloc is stopped.
148func TestAllocRunner_Lifecycle_Poststart(t *testing.T) {
149	alloc := mock.LifecycleAlloc()
150
151	alloc.Job.Type = structs.JobTypeService
152	mainTask := alloc.Job.TaskGroups[0].Tasks[0]
153	mainTask.Config["run_for"] = "100s"
154
155	sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
156	sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
157	sidecarTask.Config["run_for"] = "100s"
158
159	ephemeralTask := alloc.Job.TaskGroups[0].Tasks[2]
160	ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
161
162	conf, cleanup := testAllocRunnerConfig(t, alloc)
163	defer cleanup()
164	ar, err := NewAllocRunner(conf)
165	require.NoError(t, err)
166	defer destroy(ar)
167	go ar.Run()
168
169	upd := conf.StateUpdater.(*MockStateUpdater)
170
171	// Wait for main and sidecar tasks to be running, and that the
172	// ephemeral task ran and exited.
173	testutil.WaitForResult(func() (bool, error) {
174		last := upd.Last()
175		if last == nil {
176			return false, fmt.Errorf("No updates")
177		}
178
179		if last.ClientStatus != structs.AllocClientStatusRunning {
180			return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
181		}
182
183		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
184			return false, fmt.Errorf("expected main task to be running not %s", s)
185		}
186
187		if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning {
188			return false, fmt.Errorf("expected sidecar task to be running not %s", s)
189		}
190
191		if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
192			return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
193		}
194
195		if last.TaskStates[ephemeralTask.Name].Failed {
196			return false, fmt.Errorf("expected ephemeral task to be successful not failed")
197		}
198
199		return true, nil
200	}, func(err error) {
201		t.Fatalf("error waiting for initial state:\n%v", err)
202	})
203
204	// Tell the alloc to stop
205	stopAlloc := alloc.Copy()
206	stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
207	ar.Update(stopAlloc)
208
209	// Wait for main and sidecar tasks to stop.
210	testutil.WaitForResult(func() (bool, error) {
211		last := upd.Last()
212
213		if last.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("expected alloc to be complete not %s", last.ClientStatus)
215		}
216
217		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
218			return false, fmt.Errorf("expected main task to be dead not %s", s)
219		}
220
221		if last.TaskStates[mainTask.Name].Failed {
222			return false, fmt.Errorf("expected main task to be successful not failed")
223		}
224
225		if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead {
226			return false, fmt.Errorf("expected sidecar task to be dead not %s", s)
227		}
228
229		if last.TaskStates[sidecarTask.Name].Failed {
230			return false, fmt.Errorf("expected sidecar task to be successful not failed")
231		}
232
233		return true, nil
234	}, func(err error) {
		t.Fatalf("error waiting for stopped state:\n%v", err)
236	})
237}
238
239// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the
240// entire task group is killed.
241func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
242	t.Parallel()
243
244	alloc := mock.BatchAlloc()
245	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
246	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
247	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
248
249	// Create four tasks in the task group
250	prestart := alloc.Job.TaskGroups[0].Tasks[0].Copy()
251	prestart.Name = "prestart-sidecar"
252	prestart.Driver = "mock_driver"
253	prestart.KillTimeout = 10 * time.Millisecond
254	prestart.Lifecycle = &structs.TaskLifecycleConfig{
255		Hook:    structs.TaskLifecycleHookPrestart,
256		Sidecar: true,
257	}
258
259	prestart.Config = map[string]interface{}{
260		"run_for": "100s",
261	}
262
263	poststart := alloc.Job.TaskGroups[0].Tasks[0].Copy()
264	poststart.Name = "poststart-sidecar"
265	poststart.Driver = "mock_driver"
266	poststart.KillTimeout = 10 * time.Millisecond
267	poststart.Lifecycle = &structs.TaskLifecycleConfig{
268		Hook:    structs.TaskLifecycleHookPoststart,
269		Sidecar: true,
270	}
271
272	poststart.Config = map[string]interface{}{
273		"run_for": "100s",
274	}
275
	// Create two main tasks with distinct names so each gets its own task
	// state and allocated resources entry
	main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	main1.Name = "main1"
	main1.Driver = "mock_driver"
	main1.Config = map[string]interface{}{
		"run_for": "1s",
	}

	main2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
	main2.Name = "main2"
	main2.Driver = "mock_driver"
	main2.Config = map[string]interface{}{
		"run_for": "2s",
	}
290
291	alloc.Job.TaskGroups[0].Tasks = []*structs.Task{prestart, poststart, main1, main2}
292	alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
293		prestart.Name:  tr,
294		poststart.Name: tr,
295		main1.Name:     tr,
296		main2.Name:     tr,
297	}
298
299	conf, cleanup := testAllocRunnerConfig(t, alloc)
300	defer cleanup()
301	ar, err := NewAllocRunner(conf)
302	require.NoError(t, err)
303	defer destroy(ar)
304	go ar.Run()
305
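	// hasTaskMainEvent returns true if the task state includes a TaskMainDead
	// event; the sidecar tasks below should have one, while the main tasks,
	// which exit on their own, should not.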
306	hasTaskMainEvent := func(state *structs.TaskState) bool {
307		for _, e := range state.Events {
308			if e.Type == structs.TaskMainDead {
309				return true
310			}
311		}
312
313		return false
314	}
315
316	// Wait for all tasks to be killed
317	upd := conf.StateUpdater.(*MockStateUpdater)
318	testutil.WaitForResult(func() (bool, error) {
319		last := upd.Last()
320		if last == nil {
321			return false, fmt.Errorf("No updates")
322		}
323		if last.ClientStatus != structs.AllocClientStatusComplete {
324			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
325		}
326
327		var state *structs.TaskState
328
		// both sidecars should be killed because the main tasks exited
330		state = last.TaskStates[prestart.Name]
331		if state == nil {
332			return false, fmt.Errorf("could not find state for task %s", prestart.Name)
333		}
334		if state.State != structs.TaskStateDead {
335			return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
336		}
337		if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
338			return false, fmt.Errorf("expected to have a start and finish time")
339		}
340		if len(state.Events) < 2 {
341			// At least have a received and destroyed
342			return false, fmt.Errorf("Unexpected number of events")
343		}
344
345		if !hasTaskMainEvent(state) {
346			return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
347		}
348
349		state = last.TaskStates[poststart.Name]
350		if state == nil {
351			return false, fmt.Errorf("could not find state for task %s", poststart.Name)
352		}
353		if state.State != structs.TaskStateDead {
354			return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
355		}
356		if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
357			return false, fmt.Errorf("expected to have a start and finish time")
358		}
359		if len(state.Events) < 2 {
360			// At least have a received and destroyed
361			return false, fmt.Errorf("Unexpected number of events")
362		}
363
364		if !hasTaskMainEvent(state) {
365			return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
366		}
367
		// main tasks should die naturally
369		state = last.TaskStates[main1.Name]
370		if state.State != structs.TaskStateDead {
371			return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
372		}
373		if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
374			return false, fmt.Errorf("expected to have a start and finish time")
375		}
376		if hasTaskMainEvent(state) {
377			return false, fmt.Errorf("unexpected event %#+v in %v", structs.TaskMainDead, state.Events)
378		}
379
380		state = last.TaskStates[main2.Name]
381		if state.State != structs.TaskStateDead {
382			return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
383		}
384		if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
385			return false, fmt.Errorf("expected to have a start and finish time")
386		}
387		if hasTaskMainEvent(state) {
388			return false, fmt.Errorf("unexpected event %v in %#+v", structs.TaskMainDead, state.Events)
389		}
390
391		return true, nil
392	}, func(err error) {
393		t.Fatalf("err: %v", err)
394	})
395}
396
// TestAllocRunner_Lifecycle_Poststop asserts that a service job with a
// poststop lifecycle hook keeps the poststop task pending while the main task
// runs, and only starts it after the alloc is stopped and the main task exits.
400func TestAllocRunner_Lifecycle_Poststop(t *testing.T) {
401	alloc := mock.LifecycleAlloc()
402	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
403
404	alloc.Job.Type = structs.JobTypeService
405	mainTask := alloc.Job.TaskGroups[0].Tasks[0]
406	mainTask.Config["run_for"] = "100s"
407
408	ephemeralTask := alloc.Job.TaskGroups[0].Tasks[1]
409	ephemeralTask.Name = "quit"
410	ephemeralTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststop
411	ephemeralTask.Config["run_for"] = "10s"
412
413	alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask}
414	alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
415		mainTask.Name:      tr,
416		ephemeralTask.Name: tr,
417	}
418
419	conf, cleanup := testAllocRunnerConfig(t, alloc)
420	defer cleanup()
421	ar, err := NewAllocRunner(conf)
422	require.NoError(t, err)
423	defer destroy(ar)
424	go ar.Run()
425
426	upd := conf.StateUpdater.(*MockStateUpdater)
427
428	// Wait for main task to be running
429	testutil.WaitForResult(func() (bool, error) {
430		last := upd.Last()
431		if last == nil {
432			return false, fmt.Errorf("No updates")
433		}
434
435		if last.ClientStatus != structs.AllocClientStatusRunning {
436			return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
437		}
438
439		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
440			return false, fmt.Errorf("expected main task to be running not %s", s)
441		}
442
443		if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStatePending {
444			return false, fmt.Errorf("expected ephemeral task to be pending not %s", s)
445		}
446
447		return true, nil
448	}, func(err error) {
449		t.Fatalf("error waiting for initial state:\n%v", err)
450	})
451
452	// Tell the alloc to stop
453	stopAlloc := alloc.Copy()
454	stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
455	ar.Update(stopAlloc)
456
457	// Wait for main task to die & poststop task to run.
458	testutil.WaitForResult(func() (bool, error) {
459		last := upd.Last()
460
461		if last.ClientStatus != structs.AllocClientStatusRunning {
462			return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
463		}
464
465		if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
466			return false, fmt.Errorf("expected main task to be dead not %s", s)
467		}
468
469		if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateRunning {
470			return false, fmt.Errorf("expected poststop task to be running not %s", s)
471		}
472
473		return true, nil
474	}, func(err error) {
		t.Fatalf("error waiting for poststop task to run:\n%v", err)
476	})
477
478}
479
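// TestAllocRunner_TaskGroup_ShutdownDelay asserts that a task group's
// shutdown_delay is respected: group services are deregistered from Consul
// first and tasks are only killed after the delay has elapsed.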
480func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
481	t.Parallel()
482
483	alloc := mock.Alloc()
484	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
485	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
486	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
487
488	// Create a group service
489	tg := alloc.Job.TaskGroups[0]
490	tg.Services = []*structs.Service{
491		{
492			Name: "shutdown_service",
493		},
494	}
495
	// Create two tasks in the task group
497	task := alloc.Job.TaskGroups[0].Tasks[0]
498	task.Name = "follower1"
499	task.Driver = "mock_driver"
500	task.Config = map[string]interface{}{
501		"run_for": "10s",
502	}
503
504	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
505	task2.Name = "leader"
506	task2.Driver = "mock_driver"
507	task2.Leader = true
508	task2.Config = map[string]interface{}{
509		"run_for": "10s",
510	}
511
512	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
513	alloc.AllocatedResources.Tasks[task.Name] = tr
514	alloc.AllocatedResources.Tasks[task2.Name] = tr
515
516	// Set a shutdown delay
517	shutdownDelay := 1 * time.Second
518	alloc.Job.TaskGroups[0].ShutdownDelay = &shutdownDelay
519
520	conf, cleanup := testAllocRunnerConfig(t, alloc)
521	defer cleanup()
522	ar, err := NewAllocRunner(conf)
523	require.NoError(t, err)
524	defer destroy(ar)
525	go ar.Run()
526
527	// Wait for tasks to start
528	upd := conf.StateUpdater.(*MockStateUpdater)
529	last := upd.Last()
530	testutil.WaitForResult(func() (bool, error) {
531		last = upd.Last()
532		if last == nil {
533			return false, fmt.Errorf("No updates")
534		}
535		if n := len(last.TaskStates); n != 2 {
536			return false, fmt.Errorf("Not enough task states (want: 2; found %d)", n)
537		}
538		for name, state := range last.TaskStates {
539			if state.State != structs.TaskStateRunning {
540				return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
541			}
542		}
543		return true, nil
544	}, func(err error) {
545		t.Fatalf("err: %v", err)
546	})
547
548	// Reset updates
549	upd.Reset()
550
551	// Stop alloc
552	shutdownInit := time.Now()
553	update := alloc.Copy()
554	update.DesiredStatus = structs.AllocDesiredStatusStop
555	ar.Update(update)
556
557	// Wait for tasks to stop
558	testutil.WaitForResult(func() (bool, error) {
559		last := upd.Last()
560		if last == nil {
561			return false, fmt.Errorf("No updates")
562		}
563
564		fin := last.TaskStates["leader"].FinishedAt
565
566		if fin.IsZero() {
567			return false, nil
568		}
569
570		return true, nil
571	}, func(err error) {
572		last := upd.Last()
573		for name, state := range last.TaskStates {
574			t.Logf("%s: %s", name, state.State)
575		}
576		t.Fatalf("err: %v", err)
577	})
578
579	// Get consul client operations
580	consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
581	consulOpts := consulClient.GetOps()
582	var groupRemoveOp cconsul.MockConsulOp
583	for _, op := range consulOpts {
584		// Grab the first deregistration request
585		if op.Op == "remove" && op.Name == "group-web" {
586			groupRemoveOp = op
587			break
588		}
589	}
590
591	// Ensure remove operation is close to shutdown initiation
592	require.True(t, groupRemoveOp.OccurredAt.Sub(shutdownInit) < 100*time.Millisecond)
593
594	last = upd.Last()
595	minShutdown := shutdownInit.Add(task.ShutdownDelay)
596	leaderFinished := last.TaskStates["leader"].FinishedAt
597	followerFinished := last.TaskStates["follower1"].FinishedAt
598
599	// Check that both tasks shut down after min possible shutdown time
600	require.Greater(t, leaderFinished.UnixNano(), minShutdown.UnixNano())
601	require.Greater(t, followerFinished.UnixNano(), minShutdown.UnixNano())
602
603	// Check that there is at least shutdown_delay between consul
604	// remove operation and task finished at time
605	require.True(t, leaderFinished.Sub(groupRemoveOp.OccurredAt) > shutdownDelay)
606}
607
608// TestAllocRunner_TaskLeader_StopTG asserts that when stopping an alloc with a
609// leader the leader is stopped before other tasks.
610func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
611	t.Parallel()
612
613	alloc := mock.Alloc()
614	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
615	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
616	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
617
618	// Create 3 tasks in the task group
619	task := alloc.Job.TaskGroups[0].Tasks[0]
620	task.Name = "follower1"
621	task.Driver = "mock_driver"
622	task.Config = map[string]interface{}{
623		"run_for": "10s",
624	}
625
626	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
627	task2.Name = "leader"
628	task2.Driver = "mock_driver"
629	task2.Leader = true
630	task2.Config = map[string]interface{}{
631		"run_for": "10s",
632	}
633
634	task3 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
635	task3.Name = "follower2"
636	task3.Driver = "mock_driver"
637	task3.Config = map[string]interface{}{
638		"run_for": "10s",
639	}
640	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2, task3)
641	alloc.AllocatedResources.Tasks[task.Name] = tr
642	alloc.AllocatedResources.Tasks[task2.Name] = tr
643	alloc.AllocatedResources.Tasks[task3.Name] = tr
644
645	conf, cleanup := testAllocRunnerConfig(t, alloc)
646	defer cleanup()
647	ar, err := NewAllocRunner(conf)
648	require.NoError(t, err)
649	defer destroy(ar)
650	go ar.Run()
651
652	// Wait for tasks to start
653	upd := conf.StateUpdater.(*MockStateUpdater)
654	last := upd.Last()
655	testutil.WaitForResult(func() (bool, error) {
656		last = upd.Last()
657		if last == nil {
658			return false, fmt.Errorf("No updates")
659		}
660		if n := len(last.TaskStates); n != 3 {
661			return false, fmt.Errorf("Not enough task states (want: 3; found %d)", n)
662		}
663		for name, state := range last.TaskStates {
664			if state.State != structs.TaskStateRunning {
665				return false, fmt.Errorf("Task %q is not running yet (it's %q)", name, state.State)
666			}
667		}
668		return true, nil
669	}, func(err error) {
670		t.Fatalf("err: %v", err)
671	})
672
673	// Reset updates
674	upd.Reset()
675
676	// Stop alloc
677	update := alloc.Copy()
678	update.DesiredStatus = structs.AllocDesiredStatusStop
679	ar.Update(update)
680
681	// Wait for tasks to stop
682	testutil.WaitForResult(func() (bool, error) {
683		last := upd.Last()
684		if last == nil {
685			return false, fmt.Errorf("No updates")
686		}
687		if last.TaskStates["leader"].FinishedAt.UnixNano() >= last.TaskStates["follower1"].FinishedAt.UnixNano() {
688			return false, fmt.Errorf("expected leader to finish before follower1: %s >= %s",
689				last.TaskStates["leader"].FinishedAt, last.TaskStates["follower1"].FinishedAt)
690		}
691		if last.TaskStates["leader"].FinishedAt.UnixNano() >= last.TaskStates["follower2"].FinishedAt.UnixNano() {
692			return false, fmt.Errorf("expected leader to finish before follower2: %s >= %s",
693				last.TaskStates["leader"].FinishedAt, last.TaskStates["follower2"].FinishedAt)
694		}
695		return true, nil
696	}, func(err error) {
697		last := upd.Last()
698		for name, state := range last.TaskStates {
699			t.Logf("%s: %s", name, state.State)
700		}
701		t.Fatalf("err: %v", err)
702	})
703}
704
// TestAllocRunner_TaskLeader_StopRestoredTG asserts that when stopping a
// restored task group whose leader failed before the restore, the leader is
// not stopped because it no longer exists.
708// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932
709func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
710	t.Parallel()
711
712	alloc := mock.Alloc()
713	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
714	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
715	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
716
717	// Create a leader and follower task in the task group
718	task := alloc.Job.TaskGroups[0].Tasks[0]
719	task.Name = "follower1"
720	task.Driver = "mock_driver"
721	task.KillTimeout = 10 * time.Second
722	task.Config = map[string]interface{}{
723		"run_for": "10s",
724	}
725
726	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
727	task2.Name = "leader"
728	task2.Driver = "mock_driver"
729	task2.Leader = true
730	task2.KillTimeout = 10 * time.Millisecond
731	task2.Config = map[string]interface{}{
732		"run_for": "10s",
733	}
734
735	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
736	alloc.AllocatedResources.Tasks[task.Name] = tr
737	alloc.AllocatedResources.Tasks[task2.Name] = tr
738
739	conf, cleanup := testAllocRunnerConfig(t, alloc)
740	defer cleanup()
741
742	// Use a memory backed statedb
743	conf.StateDB = state.NewMemDB(conf.Logger)
744
745	ar, err := NewAllocRunner(conf)
746	require.NoError(t, err)
747
	// Mimic Nomad exiting after the leader died but before it was able to stop
	// the other tasks.
749	ar.tasks["leader"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled))
750	ar.tasks["follower1"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
751
752	// Create a new AllocRunner to test RestoreState and Run
753	ar2, err := NewAllocRunner(conf)
754	require.NoError(t, err)
755	defer destroy(ar2)
756
757	if err := ar2.Restore(); err != nil {
758		t.Fatalf("error restoring state: %v", err)
759	}
760	ar2.Run()
761
762	// Wait for tasks to be stopped because leader is dead
763	testutil.WaitForResult(func() (bool, error) {
764		alloc := ar2.Alloc()
		// TODO: this check is currently a no-op: alloc.TaskStates is an empty
		// map, so the loop body never runs.
766		for task, state := range alloc.TaskStates {
767			if state.State != structs.TaskStateDead {
768				return false, fmt.Errorf("Task %q should be dead: %v", task, state.State)
769			}
770		}
771		return true, nil
772	}, func(err error) {
773		t.Fatalf("err: %v", err)
774	})
775
776	// Make sure it GCs properly
777	ar2.Destroy()
778
779	select {
780	case <-ar2.DestroyCh():
781		// exited as expected
782	case <-time.After(10 * time.Second):
783		t.Fatalf("timed out waiting for AR to GC")
784	}
785}
786
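// TestAllocRunner_Restore_LifecycleHooks asserts that the task hook
// coordinator sets the correct task start conditions when an alloc runner is
// restored from persisted state.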
787func TestAllocRunner_Restore_LifecycleHooks(t *testing.T) {
788	t.Parallel()
789
790	alloc := mock.LifecycleAlloc()
791
792	conf, cleanup := testAllocRunnerConfig(t, alloc)
793	defer cleanup()
794
795	// Use a memory backed statedb
796	conf.StateDB = state.NewMemDB(conf.Logger)
797
798	ar, err := NewAllocRunner(conf)
799	require.NoError(t, err)
800
	// We should see that tasks with Prestart hooks are not blocked from running:
	// i.e. the "init" and "side" task hook coordinator channels are closed
	require.Truef(t, isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["init"].Task())), "init channel was open, should be closed")
	require.Truef(t, isChannelClosed(ar.taskHookCoordinator.startConditionForTask(ar.tasks["side"].Task())), "side channel was open, should be closed")
807
808	// Mimic client dies while init task running, and client restarts after init task finished
809	ar.tasks["init"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskTerminated))
810	ar.tasks["side"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
811
812	// Create a new AllocRunner to test RestoreState and Run
813	ar2, err := NewAllocRunner(conf)
814	require.NoError(t, err)
815
816	if err := ar2.Restore(); err != nil {
817		t.Fatalf("error restoring state: %v", err)
818	}
819
820	// We want to see Restore resume execution with correct hook ordering:
821	// i.e. we should see the "web" main task hook coordinator channel is closed
822	require.Truef(t, isChannelClosed(ar2.taskHookCoordinator.startConditionForTask(ar.tasks["web"].Task())), "web channel was open, should be closed")
823}
824
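// TestAllocRunner_Update_Semantics asserts that alloc updates are queued in a
// single-element buffer where newer updates replace older ones and updates are
// ignored once the runner has finished.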
825func TestAllocRunner_Update_Semantics(t *testing.T) {
826	t.Parallel()
827	require := require.New(t)
828
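	// updatedAlloc returns a copy of the alloc with a bumped AllocModifyIndex
	// to simulate a newer server-side version of the allocation.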
829	updatedAlloc := func(a *structs.Allocation) *structs.Allocation {
830		upd := a.CopySkipJob()
831		upd.AllocModifyIndex++
832
833		return upd
834	}
835
836	alloc := mock.Alloc()
837	alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
838	conf, cleanup := testAllocRunnerConfig(t, alloc)
839	defer cleanup()
840
841	ar, err := NewAllocRunner(conf)
842	require.NoError(err)
843
844	upd1 := updatedAlloc(alloc)
845	ar.Update(upd1)
846
847	// Update was placed into a queue
848	require.Len(ar.allocUpdatedCh, 1)
849
850	upd2 := updatedAlloc(alloc)
851	ar.Update(upd2)
852
853	// Allocation was _replaced_
854
855	require.Len(ar.allocUpdatedCh, 1)
856	queuedAlloc := <-ar.allocUpdatedCh
857	require.Equal(upd2, queuedAlloc)
858
859	// Requeueing older alloc is skipped
860	ar.Update(upd2)
861	ar.Update(upd1)
862
863	queuedAlloc = <-ar.allocUpdatedCh
864	require.Equal(upd2, queuedAlloc)
865
866	// Ignore after watch closed
867
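	// Closing waitCh simulates the alloc runner having finished running, so
	// subsequent updates should be dropped rather than queued.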
868	close(ar.waitCh)
869
870	ar.Update(upd1)
871
872	// Did not queue the update
873	require.Len(ar.allocUpdatedCh, 0)
874}
875
// TestAllocRunner_DeploymentHealth_Healthy_Migration asserts that health is
// reported for allocations that are migrated, not just those that are part of
// a deployment.
878func TestAllocRunner_DeploymentHealth_Healthy_Migration(t *testing.T) {
879	t.Parallel()
880
881	alloc := mock.Alloc()
882
883	// Ensure the alloc is *not* part of a deployment
884	alloc.DeploymentID = ""
885
886	// Shorten the default migration healthy time
887	tg := alloc.Job.TaskGroups[0]
888	tg.Migrate = structs.DefaultMigrateStrategy()
889	tg.Migrate.MinHealthyTime = 100 * time.Millisecond
890	tg.Migrate.HealthCheck = structs.MigrateStrategyHealthStates
891
892	task := tg.Tasks[0]
893	task.Driver = "mock_driver"
894	task.Config = map[string]interface{}{
895		"run_for": "30s",
896	}
897
898	conf, cleanup := testAllocRunnerConfig(t, alloc)
899	defer cleanup()
900
901	ar, err := NewAllocRunner(conf)
902	require.NoError(t, err)
903	go ar.Run()
904	defer destroy(ar)
905
906	upd := conf.StateUpdater.(*MockStateUpdater)
907	testutil.WaitForResult(func() (bool, error) {
908		last := upd.Last()
909		if last == nil {
910			return false, fmt.Errorf("No updates")
911		}
912		if !last.DeploymentStatus.HasHealth() {
			return false, fmt.Errorf("want deployment status healthy; got unset")
914		} else if !*last.DeploymentStatus.Healthy {
915			// This is fatal
916			t.Fatal("want deployment status healthy; got unhealthy")
917		}
918		return true, nil
919	}, func(err error) {
920		require.NoError(t, err)
921	})
922}
923
924// TestAllocRunner_DeploymentHealth_Healthy_NoChecks asserts that the health
925// watcher will mark the allocation as healthy based on task states alone.
926func TestAllocRunner_DeploymentHealth_Healthy_NoChecks(t *testing.T) {
927	t.Parallel()
928
929	alloc := mock.Alloc()
930
931	task := alloc.Job.TaskGroups[0].Tasks[0]
932	task.Driver = "mock_driver"
933	task.Config = map[string]interface{}{
934		"run_for": "10s",
935	}
936
937	// Create a task that takes longer to become healthy
938	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task.Copy())
939	alloc.AllocatedResources.Tasks["task2"] = alloc.AllocatedResources.Tasks["web"].Copy()
940	task2 := alloc.Job.TaskGroups[0].Tasks[1]
941	task2.Name = "task2"
942	task2.Config["start_block_for"] = "500ms"
943
944	// Make the alloc be part of a deployment that uses task states for
945	// health checks
946	alloc.DeploymentID = uuid.Generate()
947	alloc.Job.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy()
948	alloc.Job.TaskGroups[0].Update.HealthCheck = structs.UpdateStrategyHealthCheck_TaskStates
949	alloc.Job.TaskGroups[0].Update.MaxParallel = 1
950	alloc.Job.TaskGroups[0].Update.MinHealthyTime = 100 * time.Millisecond
951
952	conf, cleanup := testAllocRunnerConfig(t, alloc)
953	defer cleanup()
954
955	ar, err := NewAllocRunner(conf)
956	require.NoError(t, err)
957
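	// Record the start time and capture the deployment health timestamp below
	// to verify the health watcher waited for the slower second task.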
958	start, done := time.Now(), time.Time{}
959	go ar.Run()
960	defer destroy(ar)
961
962	upd := conf.StateUpdater.(*MockStateUpdater)
963	testutil.WaitForResult(func() (bool, error) {
964		last := upd.Last()
965		if last == nil {
966			return false, fmt.Errorf("No updates")
967		}
968		if !last.DeploymentStatus.HasHealth() {
			return false, fmt.Errorf("want deployment status healthy; got unset")
970		} else if !*last.DeploymentStatus.Healthy {
971			// This is fatal
972			t.Fatal("want deployment status healthy; got unhealthy")
973		}
974
975		// Capture the done timestamp
976		done = last.DeploymentStatus.Timestamp
977		return true, nil
978	}, func(err error) {
979		require.NoError(t, err)
980	})
981
982	if d := done.Sub(start); d < 500*time.Millisecond {
		t.Fatalf("didn't wait for the second task to start. Only took %v", d)
984	}
985}
986
987// TestAllocRunner_DeploymentHealth_Unhealthy_Checks asserts that the health
988// watcher will mark the allocation as unhealthy with failing checks.
989func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
990	t.Parallel()
991
992	alloc := mock.Alloc()
993	task := alloc.Job.TaskGroups[0].Tasks[0]
994	task.Driver = "mock_driver"
995	task.Config = map[string]interface{}{
996		"run_for": "10s",
997	}
998
999	// Set a service with check
1000	task.Services = []*structs.Service{
1001		{
			Name:      "fakeservice",
1003			PortLabel: "http",
1004			Checks: []*structs.ServiceCheck{
1005				{
1006					Name:     "fakecheck",
1007					Type:     structs.ServiceCheckScript,
1008					Command:  "true",
1009					Interval: 30 * time.Second,
1010					Timeout:  5 * time.Second,
1011				},
1012			},
1013		},
1014	}
1015
1016	// Make the alloc be part of a deployment
1017	alloc.DeploymentID = uuid.Generate()
1018	alloc.Job.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy()
1019	alloc.Job.TaskGroups[0].Update.HealthCheck = structs.UpdateStrategyHealthCheck_Checks
1020	alloc.Job.TaskGroups[0].Update.MaxParallel = 1
1021	alloc.Job.TaskGroups[0].Update.MinHealthyTime = 100 * time.Millisecond
1022	alloc.Job.TaskGroups[0].Update.HealthyDeadline = 1 * time.Second
1023
1024	checkUnhealthy := &api.AgentCheck{
1025		CheckID: uuid.Generate(),
1026		Status:  api.HealthWarning,
1027	}
1028
1029	conf, cleanup := testAllocRunnerConfig(t, alloc)
1030	defer cleanup()
1031
	// Return an unhealthy check so the alloc never becomes healthy before the
	// deadline
1033	consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
1034	consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
1035		return &consul.AllocRegistration{
1036			Tasks: map[string]*consul.ServiceRegistrations{
1037				task.Name: {
1038					Services: map[string]*consul.ServiceRegistration{
1039						"123": {
1040							Service: &api.AgentService{Service: "fakeservice"},
1041							Checks:  []*api.AgentCheck{checkUnhealthy},
1042						},
1043					},
1044				},
1045			},
1046		}, nil
1047	}
1048
1049	ar, err := NewAllocRunner(conf)
1050	require.NoError(t, err)
1051	go ar.Run()
1052	defer destroy(ar)
1053
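	// Capture the last update outside the wait loop so its task events can be
	// inspected after the health assertion.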
1054	var lastUpdate *structs.Allocation
1055	upd := conf.StateUpdater.(*MockStateUpdater)
1056	testutil.WaitForResult(func() (bool, error) {
1057		lastUpdate = upd.Last()
1058		if lastUpdate == nil {
1059			return false, fmt.Errorf("No updates")
1060		}
1061		if !lastUpdate.DeploymentStatus.HasHealth() {
1062			return false, fmt.Errorf("want deployment status unhealthy; got unset")
1063		} else if *lastUpdate.DeploymentStatus.Healthy {
1064			// This is fatal
1065			t.Fatal("want deployment status unhealthy; got healthy")
1066		}
1067		return true, nil
1068	}, func(err error) {
1069		require.NoError(t, err)
1070	})
1071
1072	// Assert that we have an event explaining why we are unhealthy.
1073	require.Len(t, lastUpdate.TaskStates, 1)
1074	state := lastUpdate.TaskStates[task.Name]
1075	require.NotNil(t, state)
1076	require.NotEmpty(t, state.Events)
1077	last := state.Events[len(state.Events)-1]
1078	require.Equal(t, allochealth.AllocHealthEventSource, last.Type)
1079	require.Contains(t, last.Message, "by deadline")
1080}
1081
1082// TestAllocRunner_Destroy asserts that Destroy kills and cleans up a running
1083// alloc.
1084func TestAllocRunner_Destroy(t *testing.T) {
1085	t.Parallel()
1086
1087	// Ensure task takes some time
1088	alloc := mock.BatchAlloc()
1089	task := alloc.Job.TaskGroups[0].Tasks[0]
1090	task.Config["run_for"] = "10s"
1091
1092	conf, cleanup := testAllocRunnerConfig(t, alloc)
1093	defer cleanup()
1094
1095	// Use a MemDB to assert alloc state gets cleaned up
1096	conf.StateDB = state.NewMemDB(conf.Logger)
1097
1098	ar, err := NewAllocRunner(conf)
1099	require.NoError(t, err)
1100	go ar.Run()
1101
1102	// Wait for alloc to be running
1103	testutil.WaitForResult(func() (bool, error) {
1104		state := ar.AllocState()
1105
1106		return state.ClientStatus == structs.AllocClientStatusRunning,
1107			fmt.Errorf("got client status %v; want running", state.ClientStatus)
1108	}, func(err error) {
1109		require.NoError(t, err)
1110	})
1111
1112	// Assert state was stored
1113	ls, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, task.Name)
1114	require.NoError(t, err)
1115	require.NotNil(t, ls)
1116	require.NotNil(t, ts)
1117
1118	// Now destroy
1119	ar.Destroy()
1120
1121	select {
1122	case <-ar.DestroyCh():
1123		// Destroyed properly!
1124	case <-time.After(10 * time.Second):
1125		require.Fail(t, "timed out waiting for alloc to be destroyed")
1126	}
1127
1128	// Assert alloc is dead
1129	state := ar.AllocState()
1130	require.Equal(t, structs.AllocClientStatusComplete, state.ClientStatus)
1131
1132	// Assert the state was cleaned
1133	ls, ts, err = conf.StateDB.GetTaskRunnerState(alloc.ID, task.Name)
1134	require.NoError(t, err)
1135	require.Nil(t, ls)
1136	require.Nil(t, ts)
1137
1138	// Assert the alloc directory was cleaned
1139	if _, err := os.Stat(ar.allocDir.AllocDir); err == nil {
		require.Failf(t, "alloc dir still exists", "%v", ar.allocDir.AllocDir)
1141	} else if !os.IsNotExist(err) {
1142		require.Failf(t, "expected NotExist error", "found %v", err)
1143	}
1144}
1145
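// TestAllocRunner_SimpleRun asserts that a batch alloc runs to completion and
// every task state records a finish time.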
1146func TestAllocRunner_SimpleRun(t *testing.T) {
1147	t.Parallel()
1148
1149	alloc := mock.BatchAlloc()
1150
1151	conf, cleanup := testAllocRunnerConfig(t, alloc)
1152	defer cleanup()
1153	ar, err := NewAllocRunner(conf)
1154	require.NoError(t, err)
1155	go ar.Run()
1156	defer destroy(ar)
1157
1158	// Wait for alloc to be running
1159	testutil.WaitForResult(func() (bool, error) {
1160		state := ar.AllocState()
1161
1162		if state.ClientStatus != structs.AllocClientStatusComplete {
1163			return false, fmt.Errorf("got status %v; want %v", state.ClientStatus, structs.AllocClientStatusComplete)
1164		}
1165
1166		for t, s := range state.TaskStates {
1167			if s.FinishedAt.IsZero() {
1168				return false, fmt.Errorf("task %q has zero FinishedAt value", t)
1169			}
1170		}
1171
1172		return true, nil
1173	}, func(err error) {
1174		require.NoError(t, err)
1175	})
1176
1177}
1178
// TestAllocRunner_MoveAllocDir asserts that a rescheduled allocation copies
// ephemeral disk content from the previous alloc run.
1181func TestAllocRunner_MoveAllocDir(t *testing.T) {
1182	t.Parallel()
1183
1184	// Step 1: start and run a task
1185	alloc := mock.BatchAlloc()
1186	conf, cleanup := testAllocRunnerConfig(t, alloc)
1187	defer cleanup()
1188	ar, err := NewAllocRunner(conf)
1189	require.NoError(t, err)
1190	ar.Run()
1191	defer destroy(ar)
1192
1193	require.Equal(t, structs.AllocClientStatusComplete, ar.AllocState().ClientStatus)
1194
1195	// Step 2. Modify its directory
1196	task := alloc.Job.TaskGroups[0].Tasks[0]
1197	dataFile := filepath.Join(ar.allocDir.SharedDir, "data", "data_file")
	require.NoError(t, ioutil.WriteFile(dataFile, []byte("hello world"), os.ModePerm))
1199	taskDir := ar.allocDir.TaskDirs[task.Name]
1200	taskLocalFile := filepath.Join(taskDir.LocalDir, "local_file")
	require.NoError(t, ioutil.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm))
1202
1203	// Step 3. Start a new alloc
1204	alloc2 := mock.BatchAlloc()
1205	alloc2.PreviousAllocation = alloc.ID
1206	alloc2.Job.TaskGroups[0].EphemeralDisk.Sticky = true
1207
1208	conf2, cleanup := testAllocRunnerConfig(t, alloc2)
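	// Wire the new runner to the previous one so the sticky ephemeral disk
	// contents are migrated from ar's alloc dir.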
1209	conf2.PrevAllocWatcher, conf2.PrevAllocMigrator = allocwatcher.NewAllocWatcher(allocwatcher.Config{
1210		Alloc:          alloc2,
1211		PreviousRunner: ar,
1212		Logger:         conf2.Logger,
1213	})
1214	defer cleanup()
1215	ar2, err := NewAllocRunner(conf2)
1216	require.NoError(t, err)
1217
1218	ar2.Run()
1219	defer destroy(ar2)
1220
1221	require.Equal(t, structs.AllocClientStatusComplete, ar2.AllocState().ClientStatus)
1222
1223	// Ensure that data from ar was moved to ar2
1224	dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file")
1225	fileInfo, _ := os.Stat(dataFile)
1226	require.NotNilf(t, fileInfo, "file %q not found", dataFile)
1227
1228	taskDir = ar2.allocDir.TaskDirs[task.Name]
1229	taskLocalFile = filepath.Join(taskDir.LocalDir, "local_file")
1230	fileInfo, _ = os.Stat(taskLocalFile)
	require.NotNilf(t, fileInfo, "file %q not found", taskLocalFile)
1232
1233}
1234
// TestAllocRunner_HandlesArtifactFailure ensures that when one task in a task
// group fails to fetch an artifact, the other tasks in the group are still
// able to run to completion.
1238func TestAllocRunner_HandlesArtifactFailure(t *testing.T) {
1239	t.Parallel()
1240
1241	alloc := mock.BatchAlloc()
1242	rp := &structs.RestartPolicy{
1243		Mode:     structs.RestartPolicyModeFail,
1244		Attempts: 1,
1245		Delay:    time.Nanosecond,
1246		Interval: time.Hour,
1247	}
1248	alloc.Job.TaskGroups[0].RestartPolicy = rp
1249	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy = rp
1250
1251	// Create a new task with a bad artifact
1252	badtask := alloc.Job.TaskGroups[0].Tasks[0].Copy()
1253	badtask.Name = "bad"
1254	badtask.Artifacts = []*structs.TaskArtifact{
1255		{GetterSource: "http://127.0.0.1:0/foo/bar/baz"},
1256	}
1257
1258	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, badtask)
1259	alloc.AllocatedResources.Tasks["bad"] = &structs.AllocatedTaskResources{
1260		Cpu: structs.AllocatedCpuResources{
1261			CpuShares: 500,
1262		},
1263		Memory: structs.AllocatedMemoryResources{
1264			MemoryMB: 256,
1265		},
1266	}
1267
1268	conf, cleanup := testAllocRunnerConfig(t, alloc)
1269	defer cleanup()
1270	ar, err := NewAllocRunner(conf)
1271	require.NoError(t, err)
1272	go ar.Run()
1273	defer destroy(ar)
1274
1275	testutil.WaitForResult(func() (bool, error) {
1276		state := ar.AllocState()
1277
1278		switch state.ClientStatus {
1279		case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed:
1280			return true, nil
1281		default:
1282			return false, fmt.Errorf("got status %v but want terminal", state.ClientStatus)
1283		}
1284
1285	}, func(err error) {
1286		require.NoError(t, err)
1287	})
1288
1289	state := ar.AllocState()
1290	require.Equal(t, structs.AllocClientStatusFailed, state.ClientStatus)
1291	require.Equal(t, structs.TaskStateDead, state.TaskStates["web"].State)
1292	require.True(t, state.TaskStates["web"].Successful())
1293	require.Equal(t, structs.TaskStateDead, state.TaskStates["bad"].State)
1294	require.True(t, state.TaskStates["bad"].Failed)
1295}
1296
1297// Test that alloc runner kills tasks in task group when another task fails
1298func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
1299	alloc := mock.Alloc()
1300	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
1301	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
1302	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
1303
1304	// Create two tasks in the task group
1305	task := alloc.Job.TaskGroups[0].Tasks[0]
1306	task.Name = "task1"
1307	task.Driver = "mock_driver"
1308	task.KillTimeout = 10 * time.Millisecond
1309	task.Config = map[string]interface{}{
1310		"run_for": "10s",
1311	}
1312	// Set a service with check
1313	task.Services = []*structs.Service{
1314		{
			Name:      "fakeservice",
1316			PortLabel: "http",
1317			Checks: []*structs.ServiceCheck{
1318				{
1319					Name:     "fakecheck",
1320					Type:     structs.ServiceCheckScript,
1321					Command:  "true",
1322					Interval: 30 * time.Second,
1323					Timeout:  5 * time.Second,
1324				},
1325			},
1326		},
1327	}
1328
1329	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
1330	task2.Name = "task 2"
1331	task2.Driver = "mock_driver"
1332	task2.Config = map[string]interface{}{
1333		"start_error": "fail task please",
1334	}
1335	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
1336	alloc.AllocatedResources.Tasks[task.Name] = tr
1337	alloc.AllocatedResources.Tasks[task2.Name] = tr
1338
1339	// Make the alloc be part of a deployment
1340	alloc.DeploymentID = uuid.Generate()
1341	alloc.Job.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy()
1342	alloc.Job.TaskGroups[0].Update.HealthCheck = structs.UpdateStrategyHealthCheck_Checks
1343	alloc.Job.TaskGroups[0].Update.MaxParallel = 1
1344	alloc.Job.TaskGroups[0].Update.MinHealthyTime = 10 * time.Millisecond
1345	alloc.Job.TaskGroups[0].Update.HealthyDeadline = 2 * time.Second
1346
1347	checkHealthy := &api.AgentCheck{
1348		CheckID: uuid.Generate(),
1349		Status:  api.HealthPassing,
1350	}
1351
1352	conf, cleanup := testAllocRunnerConfig(t, alloc)
1353	defer cleanup()
1354
1355	consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
1356	consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
1357		return &consul.AllocRegistration{
1358			Tasks: map[string]*consul.ServiceRegistrations{
1359				task.Name: {
1360					Services: map[string]*consul.ServiceRegistration{
1361						"123": {
1362							Service: &api.AgentService{Service: "fakeservice"},
1363							Checks:  []*api.AgentCheck{checkHealthy},
1364						},
1365					},
1366				},
1367			},
1368		}, nil
1369	}
1370
1371	ar, err := NewAllocRunner(conf)
1372	require.NoError(t, err)
1373	defer destroy(ar)
1374	go ar.Run()
1375	upd := conf.StateUpdater.(*MockStateUpdater)
1376
1377	testutil.WaitForResult(func() (bool, error) {
1378		last := upd.Last()
1379		if last == nil {
1380			return false, fmt.Errorf("No updates")
1381		}
1382		if last.ClientStatus != structs.AllocClientStatusFailed {
1383			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
1384		}
1385
		// Task One should be killed because Task Two failed
1387		state1 := last.TaskStates[task.Name]
1388		if state1.State != structs.TaskStateDead {
1389			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
1390		}
1391		if len(state1.Events) < 2 {
1392			// At least have a received and destroyed
1393			return false, fmt.Errorf("Unexpected number of events")
1394		}
1395
1396		found := false
1397		for _, e := range state1.Events {
			if e.Type == structs.TaskSiblingFailed {
1399				found = true
1400			}
1401		}
1402
1403		if !found {
1404			return false, fmt.Errorf("Did not find event %v", structs.TaskSiblingFailed)
1405		}
1406
1407		// Task Two should be failed
1408		state2 := last.TaskStates[task2.Name]
1409		if state2.State != structs.TaskStateDead {
1410			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
1411		}
1412		if !state2.Failed {
1413			return false, fmt.Errorf("task2 should have failed")
1414		}
1415
1416		if !last.DeploymentStatus.HasHealth() {
1417			return false, fmt.Errorf("Expected deployment health to be non nil")
1418		}
1419
1420		return true, nil
1421	}, func(err error) {
		require.Failf(t, "err", "%v", err)
1423	})
1424}
1425
// Test that an alloc moving to a terminal desired status stops its tasks and
// the alloc runner is only cleaned up once it is destroyed
1427func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
1428	t.Parallel()
1429	alloc := mock.BatchAlloc()
1430	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
1431	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
1432	alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
1433	// Ensure task takes some time
1434	task := alloc.Job.TaskGroups[0].Tasks[0]
1435	task.Driver = "mock_driver"
1436	task.Config["run_for"] = "10s"
1437	alloc.AllocatedResources.Tasks[task.Name] = tr
1438
1439	conf, cleanup := testAllocRunnerConfig(t, alloc)
1440	defer cleanup()
1441	ar, err := NewAllocRunner(conf)
1442	require.NoError(t, err)
1443	defer destroy(ar)
1444	go ar.Run()
1445	upd := conf.StateUpdater.(*MockStateUpdater)
1446
1447	testutil.WaitForResult(func() (bool, error) {
1448		last := upd.Last()
1449		if last == nil {
1450			return false, fmt.Errorf("No updates")
1451		}
1452		if last.ClientStatus != structs.AllocClientStatusRunning {
1453			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
1454		}
1455		return true, nil
1456	}, func(err error) {
		require.Failf(t, "err", "%v", err)
1458	})
1459
1460	// Update the alloc to be terminal which should cause the alloc runner to
1461	// stop the tasks and wait for a destroy.
1462	update := ar.alloc.Copy()
1463	update.DesiredStatus = structs.AllocDesiredStatusStop
1464	ar.Update(update)
1465
1466	testutil.WaitForResult(func() (bool, error) {
1467		last := upd.Last()
1468		if last == nil {
1469			return false, fmt.Errorf("No updates")
1470		}
1471
1472		// Check the status has changed.
1473		if last.ClientStatus != structs.AllocClientStatusComplete {
1474			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
1475		}
1476
1477		// Check the alloc directory still exists
1478		if _, err := os.Stat(ar.allocDir.AllocDir); err != nil {
1479			return false, fmt.Errorf("alloc dir destroyed: %v", ar.allocDir.AllocDir)
1480		}
1481
1482		return true, nil
1483	}, func(err error) {
		require.Failf(t, "err", "%v", err)
1485	})
1486
1487	// Send the destroy signal and ensure the AllocRunner cleans up.
1488	ar.Destroy()
1489
1490	testutil.WaitForResult(func() (bool, error) {
1491		last := upd.Last()
1492		if last == nil {
1493			return false, fmt.Errorf("No updates")
1494		}
1495
1496		// Check the status has changed.
1497		if last.ClientStatus != structs.AllocClientStatusComplete {
1498			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
1499		}
1500
1501		// Check the alloc directory was cleaned
1502		if _, err := os.Stat(ar.allocDir.AllocDir); err == nil {
1503			return false, fmt.Errorf("alloc dir still exists: %v", ar.allocDir.AllocDir)
1504		} else if !os.IsNotExist(err) {
1505			return false, fmt.Errorf("stat err: %v", err)
1506		}
1507
1508		return true, nil
1509	}, func(err error) {
		require.Failf(t, "err", "%v", err)
1511	})
1512}
1513
// TestAllocRunner_PersistState_Destroyed asserts that an alloc's state is no
// longer persisted once the alloc runner has been destroyed
1515func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
1516	t.Parallel()
1517
1518	alloc := mock.BatchAlloc()
1519	taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name
1520
1521	conf, cleanup := testAllocRunnerConfig(t, alloc)
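	// Use a memory backed statedb so the persisted state can be inspected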
1522	conf.StateDB = state.NewMemDB(conf.Logger)
1523
1524	defer cleanup()
1525	ar, err := NewAllocRunner(conf)
1526	require.NoError(t, err)
1527	defer destroy(ar)
1528
1529	go ar.Run()
1530
1531	select {
1532	case <-ar.WaitCh():
1533	case <-time.After(10 * time.Second):
1534		require.Fail(t, "timed out waiting for alloc to complete")
1535	}
1536
1537	// test final persisted state upon completion
1538	require.NoError(t, ar.PersistState())
1539	allocs, _, err := conf.StateDB.GetAllAllocations()
1540	require.NoError(t, err)
1541	require.Len(t, allocs, 1)
1542	require.Equal(t, alloc.ID, allocs[0].ID)
1543	_, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, taskName)
1544	require.NoError(t, err)
1545	require.Equal(t, structs.TaskStateDead, ts.State)
1546
1547	// check that DB alloc is empty after destroying AR
1548	ar.Destroy()
1549	select {
1550	case <-ar.DestroyCh():
1551	case <-time.After(10 * time.Second):
		require.Fail(t, "timed out waiting for destruction")
1553	}
1554
1555	allocs, _, err = conf.StateDB.GetAllAllocations()
1556	require.NoError(t, err)
1557	require.Empty(t, allocs)
1558	_, ts, err = conf.StateDB.GetTaskRunnerState(alloc.ID, taskName)
1559	require.NoError(t, err)
1560	require.Nil(t, ts)
1561
1562	// check that DB alloc is empty after persisting state of destroyed AR
1563	ar.PersistState()
1564	allocs, _, err = conf.StateDB.GetAllAllocations()
1565	require.NoError(t, err)
1566	require.Empty(t, allocs)
1567	_, ts, err = conf.StateDB.GetTaskRunnerState(alloc.ID, taskName)
1568	require.NoError(t, err)
1569	require.Nil(t, ts)
1570}
1571