package client

import (
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"testing"
	"time"

	memdb "github.com/hashicorp/go-memdb"
	trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
	"github.com/hashicorp/nomad/client/config"
	consulApi "github.com/hashicorp/nomad/client/consul"
	"github.com/hashicorp/nomad/client/fingerprint"
	"github.com/hashicorp/nomad/client/state"
	"github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/helper/pluginutils/catalog"
	"github.com/hashicorp/nomad/helper/pluginutils/singleton"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	nconfig "github.com/hashicorp/nomad/nomad/structs/config"
	"github.com/hashicorp/nomad/plugins/device"
	psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/stretchr/testify/assert"

	cstate "github.com/hashicorp/nomad/client/state"
	"github.com/stretchr/testify/require"
)

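// testACLServer starts a test server with ACLs enabled and returns the server,
// its RPC address, the bootstrap ACL token, and a cleanup function.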
func testACLServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, *structs.ACLToken, func()) {
	server, token, cleanup := nomad.TestACLServer(t, cb)
	return server, server.GetConfig().RPCAddr.String(), token, cleanup
}

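// testServer starts a test server and returns the server, its RPC address, and
// a cleanup function.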
func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string, func()) {
	server, cleanup := nomad.TestServer(t, cb)
	return server, server.GetConfig().RPCAddr.String(), cleanup
}

func TestClient_StartStop(t *testing.T) {
	t.Parallel()
	client, cleanup := TestClient(t, nil)
	defer cleanup()
	if err := client.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}
}

// Certain labels for metrics are dependent on client initial setup. This test
// ensures that the client has properly initialized before we assign values to labels.
func TestClient_BaseLabels(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)

	client, cleanup := TestClient(t, nil)
	if err := client.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}
	defer cleanup()

	// directly invoke this function, as otherwise this will fail on a CI build
	// due to a race condition
	client.emitStats()

	baseLabels := client.baseLabels
	assert.NotEqual(0, len(baseLabels))

	nodeID := client.Node().ID
	for _, e := range baseLabels {
		if e.Name == "node_id" {
			assert.Equal(nodeID, e.Value)
		}
	}
}

func TestClient_RPC(t *testing.T) {
	t.Parallel()

	_, addr, cleanupS1 := testServer(t, nil)
	defer cleanupS1()

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
	})
	defer cleanupC1()

	// RPC should succeed
	testutil.WaitForResult(func() (bool, error) {
		var out struct{}
		err := c1.RPC("Status.Ping", struct{}{}, &out)
		return err == nil, err
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

func TestClient_RPC_FireRetryWatchers(t *testing.T) {
	t.Parallel()

	_, addr, cleanupS1 := testServer(t, nil)
	defer cleanupS1()

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
	})
	defer cleanupC1()

	watcher := c1.rpcRetryWatcher()

	// RPC should succeed
	testutil.WaitForResult(func() (bool, error) {
		var out struct{}
		err := c1.RPC("Status.Ping", struct{}{}, &out)
		return err == nil, err
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	select {
	case <-watcher:
	default:
		t.Fatal("watcher should be fired")
	}
}

func TestClient_RPC_Passthrough(t *testing.T) {
	t.Parallel()

	s1, _, cleanupS1 := testServer(t, nil)
	defer cleanupS1()

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.RPCHandler = s1
	})
	defer cleanupC1()

	// RPC should succeed
	testutil.WaitForResult(func() (bool, error) {
		var out struct{}
		err := c1.RPC("Status.Ping", struct{}{}, &out)
		return err == nil, err
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

func TestClient_Fingerprint(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	// Ensure we are fingerprinting
	testutil.WaitForResult(func() (bool, error) {
		node := c.Node()
		if _, ok := node.Attributes["kernel.name"]; !ok {
			return false, fmt.Errorf("Expected value for kernel.name")
		}
		if _, ok := node.Attributes["cpu.arch"]; !ok {
			return false, fmt.Errorf("Expected value for cpu.arch")
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

// TestClient_Fingerprint_Periodic asserts that driver node attributes are
// periodically fingerprinted.
func TestClient_Fingerprint_Periodic(t *testing.T) {
	t.Parallel()

	c1, cleanup := TestClient(t, func(c *config.Config) {
		confs := []*nconfig.PluginConfig{
			{
				Name: "mock_driver",
				Config: map[string]interface{}{
					"shutdown_periodic_after":    true,
					"shutdown_periodic_duration": time.Second,
				},
			},
		}
		c.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", nil, confs)
	})
	defer cleanup()

	node := c1.config.Node
	{
		// Ensure the mock driver is registered on the client
		testutil.WaitForResult(func() (bool, error) {
			c1.configLock.Lock()
			defer c1.configLock.Unlock()

			// assert that the driver is set on the node attributes
			mockDriverInfoAttr := node.Attributes["driver.mock_driver"]
			if mockDriverInfoAttr == "" {
				return false, fmt.Errorf("mock driver is empty when it should be set on the node attributes")
			}

			mockDriverInfo := node.Drivers["mock_driver"]

			// assert that the Driver information for the node is also set correctly
			if mockDriverInfo == nil {
				return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
			}
			if !mockDriverInfo.Detected {
				return false, fmt.Errorf("mock driver should be set as detected")
			}
			if !mockDriverInfo.Healthy {
				return false, fmt.Errorf("mock driver should be set as healthy")
			}
			if mockDriverInfo.HealthDescription == "" {
				return false, fmt.Errorf("mock driver description should not be empty")
			}
			return true, nil
		}, func(err error) {
			t.Fatalf("err: %v", err)
		})
	}

	{
		testutil.WaitForResult(func() (bool, error) {
			c1.configLock.Lock()
			defer c1.configLock.Unlock()
			mockDriverInfo := node.Drivers["mock_driver"]
			// assert that the Driver information for the node is also set correctly
			if mockDriverInfo == nil {
				return false, fmt.Errorf("mock driver is nil when it should be set on node Drivers")
			}
			if mockDriverInfo.Detected {
				return false, fmt.Errorf("mock driver should not be set as detected")
			}
			if mockDriverInfo.Healthy {
				return false, fmt.Errorf("mock driver should not be set as healthy")
			}
			if mockDriverInfo.HealthDescription == "" {
				return false, fmt.Errorf("mock driver description should not be empty")
			}
			return true, nil
		}, func(err error) {
			t.Fatalf("err: %v", err)
		})
	}
}

// TestClient_MixedTLS asserts that when a server is running with TLS enabled
// it will reject any RPC connections from clients that lack TLS. See #2525
func TestClient_MixedTLS(t *testing.T) {
	t.Parallel()
	const (
		cafile  = "../helper/tlsutil/testdata/ca.pem"
		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
	)
	s1, addr, cleanupS1 := testServer(t, func(c *nomad.Config) {
		c.TLSConfig = &nconfig.TLSConfig{
			EnableHTTP:           true,
			EnableRPC:            true,
			VerifyServerHostname: true,
			CAFile:               cafile,
			CertFile:             foocert,
			KeyFile:              fookey,
		}
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanup := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
	})
	defer cleanup()

	req := structs.NodeSpecificRequest{
		NodeID:       c1.Node().ID,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}
	var out structs.SingleNodeResponse
	testutil.AssertUntil(100*time.Millisecond,
		func() (bool, error) {
			err := c1.RPC("Node.GetNode", &req, &out)
			if err == nil {
				return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out)
			}
			return true, nil
		},
		func(err error) {
			t.Fatalf(err.Error())
		},
	)
}

// TestClient_BadTLS asserts that when a client and server are running with TLS
// enabled -- but their certificates are signed by different CAs -- they're
// unable to communicate.
func TestClient_BadTLS(t *testing.T) {
	t.Parallel()

	const (
		cafile  = "../helper/tlsutil/testdata/ca.pem"
		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
		badca   = "../helper/tlsutil/testdata/ca-bad.pem"
		badcert = "../helper/tlsutil/testdata/nomad-bad.pem"
		badkey  = "../helper/tlsutil/testdata/nomad-bad-key.pem"
	)
	s1, addr, cleanupS1 := testServer(t, func(c *nomad.Config) {
		c.TLSConfig = &nconfig.TLSConfig{
			EnableHTTP:           true,
			EnableRPC:            true,
			VerifyServerHostname: true,
			CAFile:               cafile,
			CertFile:             foocert,
			KeyFile:              fookey,
		}
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
		c.TLSConfig = &nconfig.TLSConfig{
			EnableHTTP:           true,
			EnableRPC:            true,
			VerifyServerHostname: true,
			CAFile:               badca,
			CertFile:             badcert,
			KeyFile:              badkey,
		}
	})
	defer cleanupC1()

	req := structs.NodeSpecificRequest{
		NodeID:       c1.Node().ID,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}
	var out structs.SingleNodeResponse
	testutil.AssertUntil(100*time.Millisecond,
		func() (bool, error) {
			err := c1.RPC("Node.GetNode", &req, &out)
			if err == nil {
				return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", out)
			}
			return true, nil
		},
		func(err error) {
			t.Fatalf(err.Error())
		},
	)
}

func TestClient_Register(t *testing.T) {
	t.Parallel()

	s1, _, cleanupS1 := testServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.RPCHandler = s1
	})
	defer cleanupC1()

	req := structs.NodeSpecificRequest{
		NodeID:       c1.Node().ID,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}
	var out structs.SingleNodeResponse

	// Register should succeed
	testutil.WaitForResult(func() (bool, error) {
		err := s1.RPC("Node.GetNode", &req, &out)
		if err != nil {
			return false, err
		}
		if out.Node == nil {
			return false, fmt.Errorf("missing reg")
		}
		return out.Node.ID == req.NodeID, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

func TestClient_Heartbeat(t *testing.T) {
	t.Parallel()

	s1, _, cleanupS1 := testServer(t, func(c *nomad.Config) {
		c.MinHeartbeatTTL = 50 * time.Millisecond
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.RPCHandler = s1
	})
	defer cleanupC1()

	req := structs.NodeSpecificRequest{
		NodeID:       c1.Node().ID,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}
	var out structs.SingleNodeResponse

	// Register should succeed
	testutil.WaitForResult(func() (bool, error) {
		err := s1.RPC("Node.GetNode", &req, &out)
		if err != nil {
			return false, err
		}
		if out.Node == nil {
			return false, fmt.Errorf("missing reg")
		}
		return out.Node.Status == structs.NodeStatusReady, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

// TestClient_UpdateAllocStatus asserts that once allocations are running, the
// client sends status updates to the server.
func TestClient_UpdateAllocStatus(t *testing.T) {
	t.Parallel()

	s1, _, cleanupS1 := testServer(t, nil)
	defer cleanupS1()

	_, cleanup := TestClient(t, func(c *config.Config) {
		c.RPCHandler = s1
	})
	defer cleanup()

	job := mock.Job()
	// allow the job to run on any node, including the test client, which may not be a Linux box
	job.Constraints = nil
	job.TaskGroups[0].Count = 1
	task := job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}
	task.Services = nil

	// WaitForRunning polls the server until the ClientStatus is running
	testutil.WaitForRunning(t, s1.RPC, job)
}

func TestClient_WatchAllocs(t *testing.T) {
	t.Parallel()

	s1, _, cleanupS1 := testServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanup := TestClient(t, func(c *config.Config) {
		c.RPCHandler = s1
	})
	defer cleanup()

	// Wait until the node is ready
	waitTilNodeReady(c1, t)

	// Create mock allocations
	job := mock.Job()
	job.TaskGroups[0].Count = 3
	job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "10s",
	}
	alloc1 := mock.Alloc()
	alloc1.JobID = job.ID
	alloc1.Job = job
	alloc1.NodeID = c1.Node().ID
	alloc2 := mock.Alloc()
	alloc2.NodeID = c1.Node().ID
	alloc2.JobID = job.ID
	alloc2.Job = job

	state := s1.State()
	if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job); err != nil {
		t.Fatal(err)
	}
	if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
		t.Fatal(err)
	}
	err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1, alloc2})
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Both allocations should get registered
	testutil.WaitForResult(func() (bool, error) {
		c1.allocLock.RLock()
		num := len(c1.allocs)
		c1.allocLock.RUnlock()
		return num == 2, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Delete one allocation
	if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Update the other allocation. Have to make a copy because the allocs are
	// shared in memory in the test and the modify index would be updated in the
	// alloc runner.
	alloc2_2 := alloc2.Copy()
	alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 104, []*structs.Allocation{alloc2_2}); err != nil {
		t.Fatalf("err upserting stopped alloc: %v", err)
	}

	// One allocation should get GC'd and removed
	testutil.WaitForResult(func() (bool, error) {
		c1.allocLock.RLock()
		num := len(c1.allocs)
		c1.allocLock.RUnlock()
		return num == 1, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// One allocation should get updated
	testutil.WaitForResult(func() (bool, error) {
		c1.allocLock.RLock()
		ar := c1.allocs[alloc2.ID]
		c1.allocLock.RUnlock()
		return ar.Alloc().DesiredStatus == structs.AllocDesiredStatusStop, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

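// waitTilNodeReady blocks until the client's node reports a ready status,
// failing the test on timeout.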
func waitTilNodeReady(client *Client, t *testing.T) {
	testutil.WaitForResult(func() (bool, error) {
		n := client.Node()
		if n.Status != structs.NodeStatusReady {
			return false, fmt.Errorf("node not registered")
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}

func TestClient_SaveRestoreState(t *testing.T) {
	t.Parallel()

	s1, _, cleanupS1 := testServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.DevMode = false
		c.RPCHandler = s1
	})
	defer cleanupC1()

	// Wait until the node is ready
	waitTilNodeReady(c1, t)

	// Create mock allocations
	job := mock.Job()
	alloc1 := mock.Alloc()
	alloc1.NodeID = c1.Node().ID
	alloc1.Job = job
	alloc1.JobID = job.ID
	alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "10s",
	}
	alloc1.ClientStatus = structs.AllocClientStatusRunning

	state := s1.State()
	if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job); err != nil {
		t.Fatal(err)
	}
	if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil {
		t.Fatal(err)
	}
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Allocations should get registered
	testutil.WaitForResult(func() (bool, error) {
		c1.allocLock.RLock()
		ar := c1.allocs[alloc1.ID]
		c1.allocLock.RUnlock()
		if ar == nil {
			return false, fmt.Errorf("nil alloc runner")
		}
		if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Shutdown the client, saves state
	if err := c1.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Create a new client
	logger := testlog.HCLogger(t)
	c1.config.Logger = logger
	consulCatalog := consul.NewMockCatalog(logger)
	mockService := consulApi.NewMockConsulServiceClient(t, logger)

	// ensure we use non-shutdown driver instances
	c1.config.PluginLoader = catalog.TestPluginLoaderWithOptions(t, "", c1.config.Options, nil)
	c1.config.PluginSingletonLoader = singleton.NewSingletonLoader(logger, c1.config.PluginLoader)

	c2, err := NewClient(c1.config, consulCatalog, nil, mockService, nil)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer c2.Shutdown()

	// Ensure the allocation is running
	testutil.WaitForResult(func() (bool, error) {
		c2.allocLock.RLock()
		ar := c2.allocs[alloc1.ID]
		c2.allocLock.RUnlock()
		status := ar.Alloc().ClientStatus
		alive := status == structs.AllocClientStatusRunning || status == structs.AllocClientStatusPending
		if !alive {
			return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc())
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Destroy all the allocations
	for _, ar := range c2.getAllocRunners() {
		ar.Destroy()
	}

	for _, ar := range c2.getAllocRunners() {
		<-ar.DestroyCh()
	}
}

func TestClient_AddAllocError(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	s1, _, cleanupS1 := testServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanupC1 := TestClient(t, func(c *config.Config) {
		c.DevMode = false
		c.RPCHandler = s1
	})
	defer cleanupC1()

	// Wait until the node is ready
	waitTilNodeReady(c1, t)

	// Create mock allocation with invalid task group name
	job := mock.Job()
	alloc1 := mock.Alloc()
	alloc1.NodeID = c1.Node().ID
	alloc1.Job = job
	alloc1.JobID = job.ID
	alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "10s",
	}
	alloc1.ClientStatus = structs.AllocClientStatusPending

	// Set these two fields to nil to cause alloc runner creation to fail
	alloc1.AllocatedResources = nil
	alloc1.TaskResources = nil

	state := s1.State()
	err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job)
	require.Nil(err)

	err = state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID))
	require.Nil(err)

	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1})
	require.Nil(err)

	// Push this alloc update to the client
	allocUpdates := &allocUpdates{
		pulled: map[string]*structs.Allocation{
			alloc1.ID: alloc1,
		},
	}
	c1.runAllocs(allocUpdates)

	// Ensure the allocation has been marked as invalid and failed on the server
	testutil.WaitForResult(func() (bool, error) {
		c1.allocLock.RLock()
		ar := c1.allocs[alloc1.ID]
		_, isInvalid := c1.invalidAllocs[alloc1.ID]
		c1.allocLock.RUnlock()
		if ar != nil {
			return false, fmt.Errorf("expected nil alloc runner")
		}
		if !isInvalid {
			return false, fmt.Errorf("expected alloc to be marked as invalid")
		}
		alloc, err := s1.State().AllocByID(nil, alloc1.ID)
		require.Nil(err)
		failed := alloc.ClientStatus == structs.AllocClientStatusFailed
		if !failed {
			return false, fmt.Errorf("Expected failed client status, but got %v", alloc.ClientStatus)
		}
		return true, nil
	}, func(err error) {
		require.NoError(err)
	})
}

func TestClient_Init(t *testing.T) {
	t.Parallel()
	dir, err := ioutil.TempDir("", "nomad")
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	defer os.RemoveAll(dir)
	allocDir := filepath.Join(dir, "alloc")

	client := &Client{
		config: &config.Config{
			AllocDir:       allocDir,
			StateDBFactory: cstate.GetStateDBFactory(true),
		},
		logger: testlog.HCLogger(t),
	}

	if err := client.init(); err != nil {
		t.Fatalf("err: %s", err)
	}

	if _, err := os.Stat(allocDir); err != nil {
		t.Fatalf("err: %s", err)
	}
}

func TestClient_BlockedAllocations(t *testing.T) {
	t.Parallel()

	s1, _, cleanupS1 := testServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	c1, cleanup := TestClient(t, func(c *config.Config) {
		c.RPCHandler = s1
	})
	defer cleanup()

	// Wait for the node to be ready
	state := s1.State()
	testutil.WaitForResult(func() (bool, error) {
		ws := memdb.NewWatchSet()
		out, err := state.NodeByID(ws, c1.Node().ID)
		if err != nil {
			return false, err
		}
		if out == nil || out.Status != structs.NodeStatusReady {
			return false, fmt.Errorf("bad node: %#v", out)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Add an allocation
	alloc := mock.Alloc()
	alloc.NodeID = c1.Node().ID
	alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	alloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"kill_after":  "1s",
		"run_for":     "100s",
		"exit_code":   0,
		"exit_signal": 0,
	}

	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
	state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})

	// Wait until the client downloads and starts the allocation
	testutil.WaitForResult(func() (bool, error) {
		ws := memdb.NewWatchSet()
		out, err := state.AllocByID(ws, alloc.ID)
		if err != nil {
			return false, err
		}
		if out == nil || out.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("bad alloc: %#v", out)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Add a new chained alloc
	alloc2 := alloc.Copy()
	alloc2.ID = uuid.Generate()
	alloc2.Job = alloc.Job
	alloc2.JobID = alloc.JobID
	alloc2.PreviousAllocation = alloc.ID
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 200, []*structs.Allocation{alloc2}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure that the chained allocation is being tracked as blocked
	testutil.WaitForResult(func() (bool, error) {
		ar := c1.getAllocRunners()[alloc2.ID]
		if ar == nil {
			return false, fmt.Errorf("alloc 2's alloc runner does not exist")
		}
		if !ar.IsWaiting() {
			return false, fmt.Errorf("alloc 2 is not blocked")
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Change the desired state of the parent alloc to stop
	alloc1 := alloc.Copy()
	alloc1.DesiredStatus = structs.AllocDesiredStatusStop
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 300, []*structs.Allocation{alloc1}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure that there are no blocked allocations
	testutil.WaitForResult(func() (bool, error) {
		for id, ar := range c1.getAllocRunners() {
			if ar.IsWaiting() {
				return false, fmt.Errorf("%q still blocked", id)
			}
			if ar.IsMigrating() {
				return false, fmt.Errorf("%q still migrating", id)
			}
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Destroy all the allocations
	for _, ar := range c1.getAllocRunners() {
		ar.Destroy()
	}

	for _, ar := range c1.getAllocRunners() {
		<-ar.DestroyCh()
	}
}

func TestClient_ValidateMigrateToken_ValidToken(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)

	c, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
	})
	defer cleanup()

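	// A token generated from the alloc ID and the client's secret node ID
	// should validate successfully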
	alloc := mock.Alloc()
	validToken, err := structs.GenerateMigrateToken(alloc.ID, c.secretNodeID())
	assert.Nil(err)

	assert.Equal(c.ValidateMigrateToken(alloc.ID, validToken), true)
}

func TestClient_ValidateMigrateToken_InvalidToken(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)

	c, cleanup := TestClient(t, func(c *config.Config) {
		c.ACLEnabled = true
	})
	defer cleanup()

	assert.Equal(c.ValidateMigrateToken("", ""), false)

	alloc := mock.Alloc()
	assert.Equal(c.ValidateMigrateToken(alloc.ID, alloc.ID), false)
	assert.Equal(c.ValidateMigrateToken(alloc.ID, ""), false)
}

func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)

	c, cleanup := TestClient(t, func(c *config.Config) {})
	defer cleanup()

	assert.Equal(c.ValidateMigrateToken("", ""), true)
}

func TestClient_ReloadTLS_UpgradePlaintextToTLS(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)

	s1, addr, cleanupS1 := testServer(t, func(c *nomad.Config) {
		c.Region = "global"
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	const (
		cafile  = "../helper/tlsutil/testdata/ca.pem"
		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
	)

	c1, cleanup := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
	})
	defer cleanup()

	// Registering a node over plaintext should succeed
	{
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "global"},
		}

		testutil.WaitForResult(func() (bool, error) {
			var out structs.SingleNodeResponse
			err := c1.RPC("Node.GetNode", &req, &out)
			if err != nil {
				return false, fmt.Errorf("client RPC failed when it should have succeeded:\n%+v", err)
			}
			return true, nil
		},
			func(err error) {
				t.Fatalf(err.Error())
			},
		)
	}

	newConfig := &nconfig.TLSConfig{
		EnableHTTP:           true,
		EnableRPC:            true,
		VerifyServerHostname: true,
		CAFile:               cafile,
		CertFile:             foocert,
		KeyFile:              fookey,
	}

	err := c1.reloadTLSConnections(newConfig)
	assert.Nil(err)

	// Registering a node over plaintext should fail after the node has upgraded
	// to TLS
	{
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "global"},
		}
		testutil.WaitForResult(func() (bool, error) {
			var out structs.SingleNodeResponse
			err := c1.RPC("Node.GetNode", &req, &out)
			if err == nil {
				return false, fmt.Errorf("client RPC succeeded when it should have failed:\n%+v", err)
			}
			return true, nil
		},
			func(err error) {
				t.Fatalf(err.Error())
			},
		)
	}
}

func TestClient_ReloadTLS_DowngradeTLSToPlaintext(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)

	s1, addr, cleanupS1 := testServer(t, func(c *nomad.Config) {
		c.Region = "global"
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	const (
		cafile  = "../helper/tlsutil/testdata/ca.pem"
		foocert = "../helper/tlsutil/testdata/nomad-foo.pem"
		fookey  = "../helper/tlsutil/testdata/nomad-foo-key.pem"
	)

	c1, cleanup := TestClient(t, func(c *config.Config) {
		c.Servers = []string{addr}
		c.TLSConfig = &nconfig.TLSConfig{
			EnableHTTP:           true,
			EnableRPC:            true,
			VerifyServerHostname: true,
			CAFile:               cafile,
			CertFile:             foocert,
			KeyFile:              fookey,
		}
	})
	defer cleanup()

	// assert that when one node is running in encrypted mode, an RPC request to a
	// node running in plaintext mode should fail
	{
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "global"},
		}
		testutil.WaitForResult(func() (bool, error) {
			var out structs.SingleNodeResponse
			err := c1.RPC("Node.GetNode", &req, &out)
			if err == nil {
				return false, fmt.Errorf("client RPC succeeded when it should have failed :\n%+v", err)
			}
			return true, nil
		}, func(err error) {
			t.Fatalf(err.Error())
		},
		)
	}

	newConfig := &nconfig.TLSConfig{}

	err := c1.reloadTLSConnections(newConfig)
	assert.Nil(err)

	// assert that when both nodes are in plaintext mode, an RPC request should
	// succeed
	{
		req := structs.NodeSpecificRequest{
			NodeID:       c1.Node().ID,
			QueryOptions: structs.QueryOptions{Region: "global"},
		}
		testutil.WaitForResult(func() (bool, error) {
			var out structs.SingleNodeResponse
			err := c1.RPC("Node.GetNode", &req, &out)
			if err != nil {
				return false, fmt.Errorf("client RPC failed when it should have succeeded:\n%+v", err)
			}
			return true, nil
		}, func(err error) {
			t.Fatalf(err.Error())
		},
		)
	}
}

// TestClient_ServerList tests client methods that interact with the internal
// nomad server list.
func TestClient_ServerList(t *testing.T) {
	t.Parallel()
	client, cleanup := TestClient(t, func(c *config.Config) {})
	defer cleanup()

	if s := client.GetServers(); len(s) != 0 {
		t.Fatalf("expected server list to be empty but found: %+q", s)
	}
	if _, err := client.SetServers(nil); err != noServersErr {
		t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
	}
	if _, err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
		t.Fatalf("expected setting a bad server to return an error")
	}
	if _, err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
		t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
	}
	s := client.GetServers()
	if len(s) != 0 {
		t.Fatalf("expected 2 servers but received: %+q", s)
	}
}

func TestClient_UpdateNodeFromDevicesAccumulates(t *testing.T) {
	t.Parallel()
	client, cleanup := TestClient(t, func(c *config.Config) {})
	defer cleanup()

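	// Apply two partial fingerprint updates and a device update; each should
	// merge into the node's resources rather than replace them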
	client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
		NodeResources: &structs.NodeResources{
			Cpu: structs.NodeCpuResources{CpuShares: 123},
		},
	})

	client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
		NodeResources: &structs.NodeResources{
			Memory: structs.NodeMemoryResources{MemoryMB: 1024},
		},
	})

	client.updateNodeFromDevices([]*structs.NodeDeviceResource{
		{
			Vendor: "vendor",
			Type:   "type",
		},
	})

	// initial check
	expectedResources := &structs.NodeResources{
		// computed through test client initialization
		Networks:     client.configCopy.Node.NodeResources.Networks,
		NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks,
		Disk:         client.configCopy.Node.NodeResources.Disk,

		// injected
		Cpu: structs.NodeCpuResources{
			CpuShares:          123,
			ReservableCpuCores: client.configCopy.Node.NodeResources.Cpu.ReservableCpuCores,
			TotalCpuCores:      client.configCopy.Node.NodeResources.Cpu.TotalCpuCores,
		},
		Memory: structs.NodeMemoryResources{MemoryMB: 1024},
		Devices: []*structs.NodeDeviceResource{
			{
				Vendor: "vendor",
				Type:   "type",
			},
		},
	}

	assert.EqualValues(t, expectedResources, client.configCopy.Node.NodeResources)

	// overrides of values

	client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
		NodeResources: &structs.NodeResources{
			Memory: structs.NodeMemoryResources{MemoryMB: 2048},
		},
	})

	client.updateNodeFromDevices([]*structs.NodeDeviceResource{
		{
			Vendor: "vendor",
			Type:   "type",
		},
		{
			Vendor: "vendor2",
			Type:   "type2",
		},
	})

	expectedResources2 := &structs.NodeResources{
		// computed through test client initialization
		Networks:     client.configCopy.Node.NodeResources.Networks,
		NodeNetworks: client.configCopy.Node.NodeResources.NodeNetworks,
		Disk:         client.configCopy.Node.NodeResources.Disk,

		// injected
		Cpu: structs.NodeCpuResources{
			CpuShares:          123,
			ReservableCpuCores: client.configCopy.Node.NodeResources.Cpu.ReservableCpuCores,
			TotalCpuCores:      client.configCopy.Node.NodeResources.Cpu.TotalCpuCores,
		},
		Memory: structs.NodeMemoryResources{MemoryMB: 2048},
		Devices: []*structs.NodeDeviceResource{
			{
				Vendor: "vendor",
				Type:   "type",
			},
			{
				Vendor: "vendor2",
				Type:   "type2",
			},
		},
	}

	assert.EqualValues(t, expectedResources2, client.configCopy.Node.NodeResources)
}

// TestClient_UpdateNodeFromFingerprintKeepsConfig asserts manually configured
// network interfaces take precedence over fingerprinted ones.
func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) {
	t.Parallel()
	if runtime.GOOS != "linux" {
		t.Skip("assertions assume linux platform")
	}

	// Client without network configured updates to match fingerprint
	client, cleanup := TestClient(t, nil)
	defer cleanup()

	client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
		NodeResources: &structs.NodeResources{
			Cpu:      structs.NodeCpuResources{CpuShares: 123},
			Networks: []*structs.NetworkResource{{Mode: "host", Device: "any-interface"}},
		},
		Resources: &structs.Resources{
			CPU: 80,
		},
	})
	idx := len(client.config.Node.NodeResources.Networks) - 1
	require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares)
	require.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[idx].Device)
	require.Equal(t, 80, client.config.Node.Resources.CPU)

	// Look up an interface. client.Node starts with a hardcoded value, eth0,
	// and is only updated asynchronously by the fingerprinter.
	// Any network device will do for this test.
	interfaces, err := net.Interfaces()
	require.NoError(t, err)
	require.NotEmpty(t, interfaces)
	dev := interfaces[0].Name

	// Client with network interface configured keeps the config
	// setting on update
	name := "TestClient_UpdateNodeFromFingerprintKeepsConfig2"
	client, cleanup = TestClient(t, func(c *config.Config) {
		c.NetworkInterface = dev
		c.Node.Name = name
		c.Options["fingerprint.denylist"] = "network"
		// Node is already a mock.Node, with a device
		c.Node.NodeResources.Networks[0].Device = dev
	})
	defer cleanup()
	client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
		NodeResources: &structs.NodeResources{
			Cpu: structs.NodeCpuResources{CpuShares: 123},
			Networks: []*structs.NetworkResource{
				{Mode: "host", Device: "any-interface", MBits: 20},
			},
		},
	})
	require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares)
	// only the configured device is kept
	require.Equal(t, 2, len(client.config.Node.NodeResources.Networks))
	require.Equal(t, dev, client.config.Node.NodeResources.Networks[0].Device)
	require.Equal(t, "bridge", client.config.Node.NodeResources.Networks[1].Mode)

	// Network speed is applied to all NetworkResources
	client.config.NetworkInterface = ""
	client.config.NetworkSpeed = 100
	client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
		NodeResources: &structs.NodeResources{
			Cpu: structs.NodeCpuResources{CpuShares: 123},
			Networks: []*structs.NetworkResource{
				{Mode: "host", Device: "any-interface", MBits: 20},
			},
		},
		Resources: &structs.Resources{
			CPU: 80,
		},
	})
	assert.Equal(t, 3, len(client.config.Node.NodeResources.Networks))
	assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[2].Device)
	assert.Equal(t, 100, client.config.Node.NodeResources.Networks[2].MBits)
	assert.Equal(t, 0, client.config.Node.NodeResources.Networks[1].MBits)
}

// Support multiple IP addresses (e.g. IPv4 and IPv6) on the configured network interface
func Test_UpdateNodeFromFingerprintMultiIP(t *testing.T) {
	t.Parallel()

	var dev string
	switch runtime.GOOS {
	case "linux":
		dev = "lo"
	case "darwin":
		dev = "lo0"
	}

	// Client without network configured updates to match fingerprint
	client, cleanup := TestClient(t, func(c *config.Config) {
		c.NetworkInterface = dev
		c.Options["fingerprint.denylist"] = "network,cni,bridge"
		c.Node.Resources.Networks = c.Node.NodeResources.Networks
	})
	defer cleanup()

	client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{
		NodeResources: &structs.NodeResources{
			Cpu: structs.NodeCpuResources{CpuShares: 123},
			Networks: []*structs.NetworkResource{
				{Device: dev, IP: "127.0.0.1"},
				{Device: dev, IP: "::1"},
			},
		},
	})

	nets := structs.Networks{
		mock.Node().NodeResources.Networks[0],
		{Device: dev, IP: "127.0.0.1"},
		{Device: dev, IP: "::1"},
	}

	require.Equal(t, nets, client.config.Node.NodeResources.Networks)
}

func TestClient_computeAllocatedDeviceStats(t *testing.T) {
	logger := testlog.HCLogger(t)
	c := &Client{logger: logger}

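	// newDeviceStats builds a DeviceStats fixture whose summary is the given
	// string, so results are easy to identify in the assertions below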
	newDeviceStats := func(strValue string) *device.DeviceStats {
		return &device.DeviceStats{
			Summary: &psstructs.StatValue{
				StringVal: &strValue,
			},
		}
	}

	allocatedDevices := []*structs.AllocatedDeviceResource{
		{
			Vendor:    "vendor",
			Type:      "type",
			Name:      "name",
			DeviceIDs: []string{"d2", "d3", "notfoundid"},
		},
		{
			Vendor:    "vendor2",
			Type:      "type2",
			Name:      "name2",
			DeviceIDs: []string{"a2"},
		},
		{
			Vendor:    "vendor_notfound",
			Type:      "type_notfound",
			Name:      "name_notfound",
			DeviceIDs: []string{"d3"},
		},
	}

	hostDeviceGroupStats := []*device.DeviceGroupStats{
		{
			Vendor: "vendor",
			Type:   "type",
			Name:   "name",
			InstanceStats: map[string]*device.DeviceStats{
				"unallocated": newDeviceStats("unallocated"),
				"d2":          newDeviceStats("d2"),
				"d3":          newDeviceStats("d3"),
			},
		},
		{
			Vendor: "vendor2",
			Type:   "type2",
			Name:   "name2",
			InstanceStats: map[string]*device.DeviceStats{
				"a2": newDeviceStats("a2"),
			},
		},
		{
			Vendor: "vendor_unused",
			Type:   "type_unused",
			Name:   "name_unused",
			InstanceStats: map[string]*device.DeviceStats{
				"unallocated_unused": newDeviceStats("unallocated_unused"),
			},
		},
	}

	// test some edge conditions
	assert.Empty(t, c.computeAllocatedDeviceGroupStats(nil, nil))
	assert.Empty(t, c.computeAllocatedDeviceGroupStats(nil, hostDeviceGroupStats))
	assert.Empty(t, c.computeAllocatedDeviceGroupStats(allocatedDevices, nil))

	// actual test
	result := c.computeAllocatedDeviceGroupStats(allocatedDevices, hostDeviceGroupStats)
	sort.Slice(result, func(i, j int) bool {
		return result[i].Vendor < result[j].Vendor
	})

	expected := []*device.DeviceGroupStats{
		{
			Vendor: "vendor",
			Type:   "type",
			Name:   "name",
			InstanceStats: map[string]*device.DeviceStats{
				"d2": newDeviceStats("d2"),
				"d3": newDeviceStats("d3"),
			},
		},
		{
			Vendor: "vendor2",
			Type:   "type2",
			Name:   "name2",
			InstanceStats: map[string]*device.DeviceStats{
				"a2": newDeviceStats("a2"),
			},
		},
	}

	assert.EqualValues(t, expected, result)
}

func TestClient_getAllocatedResources(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	client, cleanup := TestClient(t, nil)
	defer cleanup()

	allocStops := mock.BatchAlloc()
	allocStops.Job.TaskGroups[0].Count = 1
	allocStops.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	allocStops.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":   "1ms",
		"exit_code": "0",
	}
	allocStops.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	allocStops.AllocatedResources.Shared.DiskMB = 64
	allocStops.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 64}
	allocStops.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 64}
	require.Nil(client.addAlloc(allocStops, ""))

	allocFails := mock.BatchAlloc()
	allocFails.Job.TaskGroups[0].Count = 1
	allocFails.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	allocFails.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":   "1ms",
		"exit_code": "1",
	}
	allocFails.Job.TaskGroups[0].RestartPolicy.Attempts = 0
	allocFails.AllocatedResources.Shared.DiskMB = 128
	allocFails.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 128}
	allocFails.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 128}
	require.Nil(client.addAlloc(allocFails, ""))

	allocRuns := mock.Alloc()
	allocRuns.Job.TaskGroups[0].Count = 1
	allocRuns.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	allocRuns.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "3s",
	}
	allocRuns.AllocatedResources.Shared.DiskMB = 256
	allocRuns.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 256}
	allocRuns.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 256}
	require.Nil(client.addAlloc(allocRuns, ""))

	allocPends := mock.Alloc()
	allocPends.Job.TaskGroups[0].Count = 1
	allocPends.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	allocPends.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for":         "5s",
		"start_block_for": "10s",
	}
	allocPends.AllocatedResources.Shared.DiskMB = 512
	allocPends.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 512}
	allocPends.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 512}
	require.Nil(client.addAlloc(allocPends, ""))

	// wait for allocStops and allocFails to finish, allocRuns to be running, and allocPends to be pending
	testutil.WaitForResult(func() (bool, error) {
		as, err := client.GetAllocState(allocPends.ID)
		if err != nil {
			return false, err
		} else if as.ClientStatus != structs.AllocClientStatusPending {
			return false, fmt.Errorf("allocPends not yet pending: %#v", as)
		}

		as, err = client.GetAllocState(allocRuns.ID)
		if err != nil {
			return false, err
		} else if as.ClientStatus != structs.AllocClientStatusRunning {
			return false, fmt.Errorf("allocRuns not yet running: %#v", as)
		}

		as, err = client.GetAllocState(allocStops.ID)
		if err != nil {
			return false, err
		} else if as.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("allocStops not yet complete: %#v", as)
		}

		as, err = client.GetAllocState(allocFails.ID)
		if err != nil {
			return false, err
		} else if as.ClientStatus != structs.AllocClientStatusFailed {
			return false, fmt.Errorf("allocFails not yet failed: %#v", as)
		}

		return true, nil
	}, func(err error) {
		require.NoError(err)
	})

	result := client.getAllocatedResources(client.config.Node)

	// Ignore comparing networks for now
	result.Flattened.Networks = nil

	expected := structs.ComparableResources{
		Flattened: structs.AllocatedTaskResources{
			Cpu: structs.AllocatedCpuResources{
				CpuShares:     768,
				ReservedCores: []uint16{},
			},
			Memory: structs.AllocatedMemoryResources{
				MemoryMB:    768,
				MemoryMaxMB: 768,
			},
			Networks: nil,
		},
		Shared: structs.AllocatedSharedResources{
			DiskMB: 768,
		},
	}

	assert.EqualValues(t, expected, *result)
}

func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
	t.Parallel()
	client, cleanup := TestClient(t, nil)
	defer cleanup()

	// initial update
	{
		info := &structs.DriverInfo{
			Detected:          true,
			Healthy:           false,
			HealthDescription: "not healthy at start",
			Attributes: map[string]string{
				"node.mock.testattr1": "val1",
			},
		}
		client.updateNodeFromDriver("mock", info)
		n := client.config.Node

		updatedInfo := *n.Drivers["mock"]
		// compare without update time
		updatedInfo.UpdateTime = info.UpdateTime
		assert.EqualValues(t, updatedInfo, *info)

		// check node attributes
		assert.Equal(t, "val1", n.Attributes["node.mock.testattr1"])
	}

	// second update: the driver is now healthy
	{
		info := &structs.DriverInfo{
			Detected:          true,
			Healthy:           true,
			HealthDescription: "healthy",
			Attributes: map[string]string{
				"node.mock.testattr1": "val2",
			},
		}
		client.updateNodeFromDriver("mock", info)
		n := client.Node()

		updatedInfo := *n.Drivers["mock"]
		// compare without update time
		updatedInfo.UpdateTime = info.UpdateTime
		assert.EqualValues(t, updatedInfo, *info)

		// check node attributes are updated
		assert.Equal(t, "val2", n.Attributes["node.mock.testattr1"])

		// update once more with the same info, updateTime shouldn't change
		client.updateNodeFromDriver("mock", info)
		un := client.Node()
		assert.EqualValues(t, n, un)
	}

	// update once more to unhealthy because why not
	{
		info := &structs.DriverInfo{
			Detected:          true,
			Healthy:           false,
			HealthDescription: "lost track",
			Attributes: map[string]string{
				"node.mock.testattr1": "",
			},
		}
		client.updateNodeFromDriver("mock", info)
		n := client.Node()

		updatedInfo := *n.Drivers["mock"]
		// compare without update time
		updatedInfo.UpdateTime = info.UpdateTime
		assert.EqualValues(t, updatedInfo, *info)

		// check node attributes are updated
		assert.Equal(t, "", n.Attributes["node.mock.testattr1"])

		// update once more with the same info, updateTime shouldn't change
		client.updateNodeFromDriver("mock", info)
		un := client.Node()
		assert.EqualValues(t, n, un)
	}
}

// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
func TestClient_hasLocalState(t *testing.T) {
	t.Parallel()

	c, cleanup := TestClient(t, nil)
	defer cleanup()

	c.stateDB = state.NewMemDB(c.logger)

	t.Run("plain alloc", func(t *testing.T) {
		alloc := mock.BatchAlloc()
		c.stateDB.PutAllocation(alloc)

		require.False(t, c.hasLocalState(alloc))
	})

	t.Run("alloc with a task with local state", func(t *testing.T) {
		alloc := mock.BatchAlloc()
		taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name
		ls := &trstate.LocalState{}

		c.stateDB.PutAllocation(alloc)
		c.stateDB.PutTaskRunnerLocalState(alloc.ID, taskName, ls)

		require.True(t, c.hasLocalState(alloc))
	})

	t.Run("alloc with a task with task state", func(t *testing.T) {
		alloc := mock.BatchAlloc()
		taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name
		ts := &structs.TaskState{
			State: structs.TaskStateRunning,
		}

		c.stateDB.PutAllocation(alloc)
		c.stateDB.PutTaskState(alloc.ID, taskName, ts)

		require.True(t, c.hasLocalState(alloc))
	})
}

func Test_verifiedTasks(t *testing.T) {
	t.Parallel()
	logger := testlog.HCLogger(t)

	// produce a result and check against expected tasks and/or error output
	try := func(t *testing.T, a *structs.Allocation, tasks, expTasks []string, expErr string) {
		result, err := verifiedTasks(logger, a, tasks)
		if expErr != "" {
			require.EqualError(t, err, expErr)
		} else {
			require.NoError(t, err)
			require.Equal(t, expTasks, result)
		}
	}

	// create an alloc with TaskGroup=g1, tasks configured given g1Tasks
	alloc := func(g1Tasks []string) *structs.Allocation {
		var tasks []*structs.Task
		for _, taskName := range g1Tasks {
			tasks = append(tasks, &structs.Task{Name: taskName})
		}

		return &structs.Allocation{
			Job: &structs.Job{
				TaskGroups: []*structs.TaskGroup{
					{Name: "g0", Tasks: []*structs.Task{{Name: "g0t1"}}},
					{Name: "g1", Tasks: tasks},
				},
			},
			TaskGroup: "g1",
		}
	}

	t.Run("nil alloc", func(t *testing.T) {
		tasks := []string{"g1t1"}
		try(t, nil, tasks, nil, "nil allocation")
	})

	t.Run("missing task names", func(t *testing.T) {
		var tasks []string
		tgTasks := []string{"g1t1"}
		try(t, alloc(tgTasks), tasks, nil, "missing task names")
	})

	t.Run("missing group", func(t *testing.T) {
		tasks := []string{"g1t1"}
		a := alloc(tasks)
		a.TaskGroup = "other"
		try(t, a, tasks, nil, "group name in allocation is not present in job")
	})

	t.Run("nonexistent task", func(t *testing.T) {
		tasks := []string{"missing"}
		try(t, alloc([]string{"task1"}), tasks, nil, `task "missing" not found in allocation`)
	})

	t.Run("matching task", func(t *testing.T) {
		tasks := []string{"g1t1"}
		try(t, alloc(tasks), tasks, tasks, "")
	})

	t.Run("matching task subset", func(t *testing.T) {
		tasks := []string{"g1t1", "g1t3"}
		tgTasks := []string{"g1t1", "g1t2", "g1t3"}
		try(t, alloc(tgTasks), tasks, tasks, "")
	})
}