1package client
2
3import (
4	"context"
5	"fmt"
6	"sync"
7	"time"
8
9	"github.com/hashicorp/nomad/client/devicemanager"
10	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
11	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
12	"github.com/hashicorp/nomad/nomad/structs"
13)
14
// batchFirstFingerprintsTimeout bounds how long the client waits for initial
// fingerprinting to finish before it sends the batched Node update anyway.
var batchFirstFingerprintsTimeout = 50 * time.Second
20
21// batchFirstFingerprints waits for the first fingerprint response from all
22// plugin managers and sends a single Node update for all fingerprints. It
23// should only ever be called once
24func (c *Client) batchFirstFingerprints() {
25	ctx, cancel := context.WithTimeout(context.Background(), batchFirstFingerprintsTimeout)
26	defer cancel()
27
28	ch, err := c.pluginManagers.WaitForFirstFingerprint(ctx)
29	if err != nil {
30		c.logger.Warn("failed to batch initial fingerprint updates, switching to incemental updates")
31		goto SEND_BATCH
32	}
33
34	// Wait for fingerprinting to complete or timeout before processing batches
35	select {
36	case <-ch:
37	case <-ctx.Done():
38	}
39
40SEND_BATCH:
41	c.configLock.Lock()
42	defer c.configLock.Unlock()
43
44	// csi updates
45	var csiChanged bool
46	c.batchNodeUpdates.batchCSIUpdates(func(name string, info *structs.CSIInfo) {
47		if c.updateNodeFromCSIControllerLocked(name, info) {
48			if c.config.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
49				c.config.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
50			}
51			csiChanged = true
52		}
53		if c.updateNodeFromCSINodeLocked(name, info) {
54			if c.config.Node.CSINodePlugins[name].UpdateTime.IsZero() {
55				c.config.Node.CSINodePlugins[name].UpdateTime = time.Now()
56			}
57			csiChanged = true
58		}
59	})
60
61	// driver node updates
62	var driverChanged bool
63	c.batchNodeUpdates.batchDriverUpdates(func(driver string, info *structs.DriverInfo) {
64		if c.updateNodeFromDriverLocked(driver, info) {
65			c.config.Node.Drivers[driver] = info
66			if c.config.Node.Drivers[driver].UpdateTime.IsZero() {
67				c.config.Node.Drivers[driver].UpdateTime = time.Now()
68			}
69			driverChanged = true
70		}
71	})
72
73	// device node updates
74	var devicesChanged bool
75	c.batchNodeUpdates.batchDevicesUpdates(func(devices []*structs.NodeDeviceResource) {
76		if c.updateNodeFromDevicesLocked(devices) {
77			devicesChanged = true
78		}
79	})
80
81	// only update the node if changes occurred
82	if driverChanged || devicesChanged || csiChanged {
83		c.updateNodeLocked()
84	}
85
86	close(c.fpInitialized)
87}
88
89// updateNodeFromCSI receives a CSIInfo struct for the plugin and updates the
90// node accordingly
91func (c *Client) updateNodeFromCSI(name string, info *structs.CSIInfo) {
92	c.configLock.Lock()
93	defer c.configLock.Unlock()
94
95	changed := false
96
97	if c.updateNodeFromCSIControllerLocked(name, info) {
98		if c.config.Node.CSIControllerPlugins[name].UpdateTime.IsZero() {
99			c.config.Node.CSIControllerPlugins[name].UpdateTime = time.Now()
100		}
101		changed = true
102	}
103
104	if c.updateNodeFromCSINodeLocked(name, info) {
105		if c.config.Node.CSINodePlugins[name].UpdateTime.IsZero() {
106			c.config.Node.CSINodePlugins[name].UpdateTime = time.Now()
107		}
108		changed = true
109	}
110
111	if changed {
112		c.updateNodeLocked()
113	}
114}
115
116// updateNodeFromCSIControllerLocked makes the changes to the node from a csi
117// update but does not send the update to the server. c.configLock must be held
118// before calling this func.
119//
120// It is safe to call for all CSI Updates, but will only perform changes when
121// a ControllerInfo field is present.
122func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CSIInfo) bool {
123	var changed bool
124	if info.ControllerInfo == nil {
125		return false
126	}
127	i := info.Copy()
128	i.NodeInfo = nil
129
130	oldController, hadController := c.config.Node.CSIControllerPlugins[name]
131	if !hadController {
132		// If the controller info has not yet been set, do that here
133		changed = true
134		c.config.Node.CSIControllerPlugins[name] = i
135	} else {
136		// The controller info has already been set, fix it up
137		if !oldController.Equal(i) {
138			c.config.Node.CSIControllerPlugins[name] = i
139			changed = true
140		}
141
142		// If health state has changed, trigger node event
143		if oldController.Healthy != i.Healthy || oldController.HealthDescription != i.HealthDescription {
144			changed = true
145			if i.HealthDescription != "" {
146				event := structs.NewNodeEvent().
147					SetSubsystem("CSI").
148					SetMessage(i.HealthDescription).
149					AddDetail("plugin", name).
150					AddDetail("type", "controller")
151				c.triggerNodeEvent(event)
152			}
153		}
154	}
155
156	return changed
157}
158
159// updateNodeFromCSINodeLocked makes the changes to the node from a csi
160// update but does not send the update to the server. c.configLock must be hel
161// before calling this func.
162//
163// It is safe to call for all CSI Updates, but will only perform changes when
164// a NodeInfo field is present.
165func (c *Client) updateNodeFromCSINodeLocked(name string, info *structs.CSIInfo) bool {
166	var changed bool
167	if info.NodeInfo == nil {
168		return false
169	}
170	i := info.Copy()
171	i.ControllerInfo = nil
172
173	oldNode, hadNode := c.config.Node.CSINodePlugins[name]
174	if !hadNode {
175		// If the Node info has not yet been set, do that here
176		changed = true
177		c.config.Node.CSINodePlugins[name] = i
178	} else {
179		// The node info has already been set, fix it up
180		if !oldNode.Equal(info) {
181			c.config.Node.CSINodePlugins[name] = i
182			changed = true
183		}
184
185		// If health state has changed, trigger node event
186		if oldNode.Healthy != i.Healthy || oldNode.HealthDescription != i.HealthDescription {
187			changed = true
188			if i.HealthDescription != "" {
189				event := structs.NewNodeEvent().
190					SetSubsystem("CSI").
191					SetMessage(i.HealthDescription).
192					AddDetail("plugin", name).
193					AddDetail("type", "node")
194				c.triggerNodeEvent(event)
195			}
196		}
197	}
198
199	return changed
200}
201
202// updateNodeFromDriver receives a DriverInfo struct for the driver and updates
203// the node accordingly
204func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) {
205	c.configLock.Lock()
206	defer c.configLock.Unlock()
207
208	if c.updateNodeFromDriverLocked(name, info) {
209		c.config.Node.Drivers[name] = info
210		if c.config.Node.Drivers[name].UpdateTime.IsZero() {
211			c.config.Node.Drivers[name].UpdateTime = time.Now()
212		}
213		c.updateNodeLocked()
214	}
215}
216
217// updateNodeFromDriverLocked makes the changes to the node from a driver update
218// but does not send the update to the server. c.configLock must be held before
219// calling this func
220func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInfo) bool {
221	var hasChanged bool
222
223	hadDriver := c.config.Node.Drivers[name] != nil
224	if !hadDriver {
225		// If the driver info has not yet been set, do that here
226		hasChanged = true
227		for attrName, newVal := range info.Attributes {
228			c.config.Node.Attributes[attrName] = newVal
229		}
230	} else {
231		oldVal := c.config.Node.Drivers[name]
232		// The driver info has already been set, fix it up
233		if oldVal.Detected != info.Detected {
234			hasChanged = true
235		}
236
237		// If health state has change, trigger node event
238		if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription {
239			hasChanged = true
240			if info.HealthDescription != "" {
241				event := structs.NewNodeEvent().
242					SetSubsystem("Driver").
243					SetMessage(info.HealthDescription).
244					AddDetail("driver", name)
245				c.triggerNodeEvent(event)
246			}
247		}
248
249		for attrName, newVal := range info.Attributes {
250			oldVal := c.config.Node.Drivers[name].Attributes[attrName]
251			if oldVal == newVal {
252				continue
253			}
254
255			hasChanged = true
256			if newVal == "" {
257				delete(c.config.Node.Attributes, attrName)
258			} else {
259				c.config.Node.Attributes[attrName] = newVal
260			}
261		}
262	}
263
264	// COMPAT Remove in Nomad 0.10
265	// We maintain the driver enabled attribute until all drivers expose
266	// their attributes as DriverInfo
267	driverName := fmt.Sprintf("driver.%s", name)
268	if info.Detected {
269		c.config.Node.Attributes[driverName] = "1"
270	} else {
271		delete(c.config.Node.Attributes, driverName)
272	}
273
274	return hasChanged
275}
276
277// updateNodeFromFingerprint updates the node with the result of
278// fingerprinting the node from the diff that was created
279func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
280	c.configLock.Lock()
281	defer c.configLock.Unlock()
282
283	// Not updating node.Resources: the field is deprecated and includes
284	// dispatched task resources and not appropriate for expressing
285	// node available device resources
286	if c.updateNodeFromDevicesLocked(devices) {
287		c.updateNodeLocked()
288	}
289}
290
291// updateNodeFromDevicesLocked updates the node with the results of devices,
292// but does send the update to the server. c.configLock must be held before
293// calling this func
294func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResource) bool {
295	if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) {
296		c.logger.Debug("new devices detected", "devices", len(devices))
297		c.config.Node.NodeResources.Devices = devices
298		return true
299	}
300
301	return false
302}
303
// batchNodeUpdates allows for batching multiple Node updates from fingerprinting.
// Once ready, the batches can be flushed and toggled to stop batching and forward
// all updates to a configured callback to be performed incrementally.
//
// Each of the three groups below (drivers, devices, csi) has its own mutex,
// pending data, callback, and a "batched" flag. The flag starts false, which
// means incoming updates are stored; the matching batch*Updates method sets it
// to true when it flushes the stored data, and from then on incoming updates
// are passed straight to the group's callback.
type batchNodeUpdates struct {
	// access to driver fields must hold driversMu lock
	//
	// drivers holds the latest pending DriverInfo per driver name,
	// driversBatched records that the pending set has been flushed, and
	// driverCB receives updates that arrive after the flush.
	drivers        map[string]*structs.DriverInfo
	driversBatched bool
	driverCB       drivermanager.UpdateNodeDriverInfoFn
	driversMu      sync.Mutex

	// access to devices fields must hold devicesMu lock
	//
	// devices holds the latest pending device list, devicesBatched records
	// that it has been flushed, and devicesCB receives updates that arrive
	// after the flush.
	devices        []*structs.NodeDeviceResource
	devicesBatched bool
	devicesCB      devicemanager.UpdateNodeDevicesFn
	devicesMu      sync.Mutex

	// access to csi fields must hold csiMu lock
	//
	// csiNodePlugins and csiControllerPlugins hold the latest pending CSIInfo
	// per plugin name, keyed separately for fingerprints that carry NodeInfo
	// and ControllerInfo. csiBatched records that both maps have been flushed,
	// and csiCB receives updates that arrive after the flush.
	csiNodePlugins       map[string]*structs.CSIInfo
	csiControllerPlugins map[string]*structs.CSIInfo
	csiBatched           bool
	csiCB                csimanager.UpdateNodeCSIInfoFunc
	csiMu                sync.Mutex
}
327
328func newBatchNodeUpdates(
329	driverCB drivermanager.UpdateNodeDriverInfoFn,
330	devicesCB devicemanager.UpdateNodeDevicesFn,
331	csiCB csimanager.UpdateNodeCSIInfoFunc) *batchNodeUpdates {
332
333	return &batchNodeUpdates{
334		drivers:              make(map[string]*structs.DriverInfo),
335		driverCB:             driverCB,
336		devices:              []*structs.NodeDeviceResource{},
337		devicesCB:            devicesCB,
338		csiNodePlugins:       make(map[string]*structs.CSIInfo),
339		csiControllerPlugins: make(map[string]*structs.CSIInfo),
340		csiCB:                csiCB,
341	}
342}
343
344// updateNodeFromCSI implements csimanager.UpdateNodeCSIInfoFunc and is used in
345// the csi manager to send csi fingerprints to the server.
346func (b *batchNodeUpdates) updateNodeFromCSI(plugin string, info *structs.CSIInfo) {
347	b.csiMu.Lock()
348	defer b.csiMu.Unlock()
349	if b.csiBatched {
350		b.csiCB(plugin, info)
351		return
352	}
353
354	// Only one of these is expected to be set, but a future implementation that
355	// explicitly models monolith plugins with a single fingerprinter may set both
356	if info.ControllerInfo != nil {
357		b.csiControllerPlugins[plugin] = info
358	}
359
360	if info.NodeInfo != nil {
361		b.csiNodePlugins[plugin] = info
362	}
363}
364
365// batchCSIUpdates sends all of the batched CSI updates by calling f  for each
366// plugin batched
367func (b *batchNodeUpdates) batchCSIUpdates(f csimanager.UpdateNodeCSIInfoFunc) error {
368	b.csiMu.Lock()
369	defer b.csiMu.Unlock()
370	if b.csiBatched {
371		return fmt.Errorf("csi updates already batched")
372	}
373
374	b.csiBatched = true
375	for plugin, info := range b.csiNodePlugins {
376		f(plugin, info)
377	}
378	for plugin, info := range b.csiControllerPlugins {
379		f(plugin, info)
380	}
381	return nil
382}
383
384// updateNodeFromDriver implements drivermanager.UpdateNodeDriverInfoFn and is
385// used in the driver manager to send driver fingerprints to
386func (b *batchNodeUpdates) updateNodeFromDriver(driver string, info *structs.DriverInfo) {
387	b.driversMu.Lock()
388	defer b.driversMu.Unlock()
389	if b.driversBatched {
390		b.driverCB(driver, info)
391		return
392	}
393
394	b.drivers[driver] = info
395}
396
397// batchDriverUpdates sends all of the batched driver node updates by calling f
398// for each driver batched
399func (b *batchNodeUpdates) batchDriverUpdates(f drivermanager.UpdateNodeDriverInfoFn) error {
400	b.driversMu.Lock()
401	defer b.driversMu.Unlock()
402	if b.driversBatched {
403		return fmt.Errorf("driver updates already batched")
404	}
405
406	b.driversBatched = true
407	for driver, info := range b.drivers {
408		f(driver, info)
409	}
410	return nil
411}
412
413// updateNodeFromDevices implements devicemanager.UpdateNodeDevicesFn and is
414// used in the device manager to send device fingerprints to
415func (b *batchNodeUpdates) updateNodeFromDevices(devices []*structs.NodeDeviceResource) {
416	b.devicesMu.Lock()
417	defer b.devicesMu.Unlock()
418	if b.devicesBatched {
419		b.devicesCB(devices)
420		return
421	}
422
423	b.devices = devices
424}
425
426// batchDevicesUpdates sends the batched device node updates by calling f with
427// the devices
428func (b *batchNodeUpdates) batchDevicesUpdates(f devicemanager.UpdateNodeDevicesFn) error {
429	b.devicesMu.Lock()
430	defer b.devicesMu.Unlock()
431	if b.devicesBatched {
432		return fmt.Errorf("devices updates already batched")
433	}
434
435	b.devicesBatched = true
436	f(b.devices)
437	return nil
438}
439