1package client 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "time" 8 9 "github.com/hashicorp/nomad/client/devicemanager" 10 "github.com/hashicorp/nomad/client/pluginmanager/csimanager" 11 "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" 12 "github.com/hashicorp/nomad/nomad/structs" 13) 14 15var ( 16 // batchFirstFingerprintsTimeout is the maximum amount of time to wait for 17 // initial fingerprinting to complete before sending a batched Node update 18 batchFirstFingerprintsTimeout = 50 * time.Second 19) 20 21// batchFirstFingerprints waits for the first fingerprint response from all 22// plugin managers and sends a single Node update for all fingerprints. It 23// should only ever be called once 24func (c *Client) batchFirstFingerprints() { 25 ctx, cancel := context.WithTimeout(context.Background(), batchFirstFingerprintsTimeout) 26 defer cancel() 27 28 ch, err := c.pluginManagers.WaitForFirstFingerprint(ctx) 29 if err != nil { 30 c.logger.Warn("failed to batch initial fingerprint updates, switching to incemental updates") 31 goto SEND_BATCH 32 } 33 34 // Wait for fingerprinting to complete or timeout before processing batches 35 select { 36 case <-ch: 37 case <-ctx.Done(): 38 } 39 40SEND_BATCH: 41 c.configLock.Lock() 42 defer c.configLock.Unlock() 43 44 // csi updates 45 var csiChanged bool 46 c.batchNodeUpdates.batchCSIUpdates(func(name string, info *structs.CSIInfo) { 47 if c.updateNodeFromCSIControllerLocked(name, info) { 48 if c.config.Node.CSIControllerPlugins[name].UpdateTime.IsZero() { 49 c.config.Node.CSIControllerPlugins[name].UpdateTime = time.Now() 50 } 51 csiChanged = true 52 } 53 if c.updateNodeFromCSINodeLocked(name, info) { 54 if c.config.Node.CSINodePlugins[name].UpdateTime.IsZero() { 55 c.config.Node.CSINodePlugins[name].UpdateTime = time.Now() 56 } 57 csiChanged = true 58 } 59 }) 60 61 // driver node updates 62 var driverChanged bool 63 c.batchNodeUpdates.batchDriverUpdates(func(driver string, info *structs.DriverInfo) { 64 if c.updateNodeFromDriverLocked(driver, info) { 65 c.config.Node.Drivers[driver] = info 66 if c.config.Node.Drivers[driver].UpdateTime.IsZero() { 67 c.config.Node.Drivers[driver].UpdateTime = time.Now() 68 } 69 driverChanged = true 70 } 71 }) 72 73 // device node updates 74 var devicesChanged bool 75 c.batchNodeUpdates.batchDevicesUpdates(func(devices []*structs.NodeDeviceResource) { 76 if c.updateNodeFromDevicesLocked(devices) { 77 devicesChanged = true 78 } 79 }) 80 81 // only update the node if changes occurred 82 if driverChanged || devicesChanged || csiChanged { 83 c.updateNodeLocked() 84 } 85 86 close(c.fpInitialized) 87} 88 89// updateNodeFromCSI receives a CSIInfo struct for the plugin and updates the 90// node accordingly 91func (c *Client) updateNodeFromCSI(name string, info *structs.CSIInfo) { 92 c.configLock.Lock() 93 defer c.configLock.Unlock() 94 95 changed := false 96 97 if c.updateNodeFromCSIControllerLocked(name, info) { 98 if c.config.Node.CSIControllerPlugins[name].UpdateTime.IsZero() { 99 c.config.Node.CSIControllerPlugins[name].UpdateTime = time.Now() 100 } 101 changed = true 102 } 103 104 if c.updateNodeFromCSINodeLocked(name, info) { 105 if c.config.Node.CSINodePlugins[name].UpdateTime.IsZero() { 106 c.config.Node.CSINodePlugins[name].UpdateTime = time.Now() 107 } 108 changed = true 109 } 110 111 if changed { 112 c.updateNodeLocked() 113 } 114} 115 116// updateNodeFromCSIControllerLocked makes the changes to the node from a csi 117// update but does not send the update to the server. c.configLock must be held 118// before calling this func. 119// 120// It is safe to call for all CSI Updates, but will only perform changes when 121// a ControllerInfo field is present. 122func (c *Client) updateNodeFromCSIControllerLocked(name string, info *structs.CSIInfo) bool { 123 var changed bool 124 if info.ControllerInfo == nil { 125 return false 126 } 127 i := info.Copy() 128 i.NodeInfo = nil 129 130 oldController, hadController := c.config.Node.CSIControllerPlugins[name] 131 if !hadController { 132 // If the controller info has not yet been set, do that here 133 changed = true 134 c.config.Node.CSIControllerPlugins[name] = i 135 } else { 136 // The controller info has already been set, fix it up 137 if !oldController.Equal(i) { 138 c.config.Node.CSIControllerPlugins[name] = i 139 changed = true 140 } 141 142 // If health state has changed, trigger node event 143 if oldController.Healthy != i.Healthy || oldController.HealthDescription != i.HealthDescription { 144 changed = true 145 if i.HealthDescription != "" { 146 event := structs.NewNodeEvent(). 147 SetSubsystem("CSI"). 148 SetMessage(i.HealthDescription). 149 AddDetail("plugin", name). 150 AddDetail("type", "controller") 151 c.triggerNodeEvent(event) 152 } 153 } 154 } 155 156 return changed 157} 158 159// updateNodeFromCSINodeLocked makes the changes to the node from a csi 160// update but does not send the update to the server. c.configLock must be hel 161// before calling this func. 162// 163// It is safe to call for all CSI Updates, but will only perform changes when 164// a NodeInfo field is present. 165func (c *Client) updateNodeFromCSINodeLocked(name string, info *structs.CSIInfo) bool { 166 var changed bool 167 if info.NodeInfo == nil { 168 return false 169 } 170 i := info.Copy() 171 i.ControllerInfo = nil 172 173 oldNode, hadNode := c.config.Node.CSINodePlugins[name] 174 if !hadNode { 175 // If the Node info has not yet been set, do that here 176 changed = true 177 c.config.Node.CSINodePlugins[name] = i 178 } else { 179 // The node info has already been set, fix it up 180 if !oldNode.Equal(info) { 181 c.config.Node.CSINodePlugins[name] = i 182 changed = true 183 } 184 185 // If health state has changed, trigger node event 186 if oldNode.Healthy != i.Healthy || oldNode.HealthDescription != i.HealthDescription { 187 changed = true 188 if i.HealthDescription != "" { 189 event := structs.NewNodeEvent(). 190 SetSubsystem("CSI"). 191 SetMessage(i.HealthDescription). 192 AddDetail("plugin", name). 193 AddDetail("type", "node") 194 c.triggerNodeEvent(event) 195 } 196 } 197 } 198 199 return changed 200} 201 202// updateNodeFromDriver receives a DriverInfo struct for the driver and updates 203// the node accordingly 204func (c *Client) updateNodeFromDriver(name string, info *structs.DriverInfo) { 205 c.configLock.Lock() 206 defer c.configLock.Unlock() 207 208 if c.updateNodeFromDriverLocked(name, info) { 209 c.config.Node.Drivers[name] = info 210 if c.config.Node.Drivers[name].UpdateTime.IsZero() { 211 c.config.Node.Drivers[name].UpdateTime = time.Now() 212 } 213 c.updateNodeLocked() 214 } 215} 216 217// updateNodeFromDriverLocked makes the changes to the node from a driver update 218// but does not send the update to the server. c.configLock must be held before 219// calling this func 220func (c *Client) updateNodeFromDriverLocked(name string, info *structs.DriverInfo) bool { 221 var hasChanged bool 222 223 hadDriver := c.config.Node.Drivers[name] != nil 224 if !hadDriver { 225 // If the driver info has not yet been set, do that here 226 hasChanged = true 227 for attrName, newVal := range info.Attributes { 228 c.config.Node.Attributes[attrName] = newVal 229 } 230 } else { 231 oldVal := c.config.Node.Drivers[name] 232 // The driver info has already been set, fix it up 233 if oldVal.Detected != info.Detected { 234 hasChanged = true 235 } 236 237 // If health state has change, trigger node event 238 if oldVal.Healthy != info.Healthy || oldVal.HealthDescription != info.HealthDescription { 239 hasChanged = true 240 if info.HealthDescription != "" { 241 event := structs.NewNodeEvent(). 242 SetSubsystem("Driver"). 243 SetMessage(info.HealthDescription). 244 AddDetail("driver", name) 245 c.triggerNodeEvent(event) 246 } 247 } 248 249 for attrName, newVal := range info.Attributes { 250 oldVal := c.config.Node.Drivers[name].Attributes[attrName] 251 if oldVal == newVal { 252 continue 253 } 254 255 hasChanged = true 256 if newVal == "" { 257 delete(c.config.Node.Attributes, attrName) 258 } else { 259 c.config.Node.Attributes[attrName] = newVal 260 } 261 } 262 } 263 264 // COMPAT Remove in Nomad 0.10 265 // We maintain the driver enabled attribute until all drivers expose 266 // their attributes as DriverInfo 267 driverName := fmt.Sprintf("driver.%s", name) 268 if info.Detected { 269 c.config.Node.Attributes[driverName] = "1" 270 } else { 271 delete(c.config.Node.Attributes, driverName) 272 } 273 274 return hasChanged 275} 276 277// updateNodeFromFingerprint updates the node with the result of 278// fingerprinting the node from the diff that was created 279func (c *Client) updateNodeFromDevices(devices []*structs.NodeDeviceResource) { 280 c.configLock.Lock() 281 defer c.configLock.Unlock() 282 283 // Not updating node.Resources: the field is deprecated and includes 284 // dispatched task resources and not appropriate for expressing 285 // node available device resources 286 if c.updateNodeFromDevicesLocked(devices) { 287 c.updateNodeLocked() 288 } 289} 290 291// updateNodeFromDevicesLocked updates the node with the results of devices, 292// but does send the update to the server. c.configLock must be held before 293// calling this func 294func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResource) bool { 295 if !structs.DevicesEquals(c.config.Node.NodeResources.Devices, devices) { 296 c.logger.Debug("new devices detected", "devices", len(devices)) 297 c.config.Node.NodeResources.Devices = devices 298 return true 299 } 300 301 return false 302} 303 304// batchNodeUpdates allows for batching multiple Node updates from fingerprinting. 305// Once ready, the batches can be flushed and toggled to stop batching and forward 306// all updates to a configured callback to be performed incrementally 307type batchNodeUpdates struct { 308 // access to driver fields must hold driversMu lock 309 drivers map[string]*structs.DriverInfo 310 driversBatched bool 311 driverCB drivermanager.UpdateNodeDriverInfoFn 312 driversMu sync.Mutex 313 314 // access to devices fields must hold devicesMu lock 315 devices []*structs.NodeDeviceResource 316 devicesBatched bool 317 devicesCB devicemanager.UpdateNodeDevicesFn 318 devicesMu sync.Mutex 319 320 // access to csi fields must hold csiMu lock 321 csiNodePlugins map[string]*structs.CSIInfo 322 csiControllerPlugins map[string]*structs.CSIInfo 323 csiBatched bool 324 csiCB csimanager.UpdateNodeCSIInfoFunc 325 csiMu sync.Mutex 326} 327 328func newBatchNodeUpdates( 329 driverCB drivermanager.UpdateNodeDriverInfoFn, 330 devicesCB devicemanager.UpdateNodeDevicesFn, 331 csiCB csimanager.UpdateNodeCSIInfoFunc) *batchNodeUpdates { 332 333 return &batchNodeUpdates{ 334 drivers: make(map[string]*structs.DriverInfo), 335 driverCB: driverCB, 336 devices: []*structs.NodeDeviceResource{}, 337 devicesCB: devicesCB, 338 csiNodePlugins: make(map[string]*structs.CSIInfo), 339 csiControllerPlugins: make(map[string]*structs.CSIInfo), 340 csiCB: csiCB, 341 } 342} 343 344// updateNodeFromCSI implements csimanager.UpdateNodeCSIInfoFunc and is used in 345// the csi manager to send csi fingerprints to the server. 346func (b *batchNodeUpdates) updateNodeFromCSI(plugin string, info *structs.CSIInfo) { 347 b.csiMu.Lock() 348 defer b.csiMu.Unlock() 349 if b.csiBatched { 350 b.csiCB(plugin, info) 351 return 352 } 353 354 // Only one of these is expected to be set, but a future implementation that 355 // explicitly models monolith plugins with a single fingerprinter may set both 356 if info.ControllerInfo != nil { 357 b.csiControllerPlugins[plugin] = info 358 } 359 360 if info.NodeInfo != nil { 361 b.csiNodePlugins[plugin] = info 362 } 363} 364 365// batchCSIUpdates sends all of the batched CSI updates by calling f for each 366// plugin batched 367func (b *batchNodeUpdates) batchCSIUpdates(f csimanager.UpdateNodeCSIInfoFunc) error { 368 b.csiMu.Lock() 369 defer b.csiMu.Unlock() 370 if b.csiBatched { 371 return fmt.Errorf("csi updates already batched") 372 } 373 374 b.csiBatched = true 375 for plugin, info := range b.csiNodePlugins { 376 f(plugin, info) 377 } 378 for plugin, info := range b.csiControllerPlugins { 379 f(plugin, info) 380 } 381 return nil 382} 383 384// updateNodeFromDriver implements drivermanager.UpdateNodeDriverInfoFn and is 385// used in the driver manager to send driver fingerprints to 386func (b *batchNodeUpdates) updateNodeFromDriver(driver string, info *structs.DriverInfo) { 387 b.driversMu.Lock() 388 defer b.driversMu.Unlock() 389 if b.driversBatched { 390 b.driverCB(driver, info) 391 return 392 } 393 394 b.drivers[driver] = info 395} 396 397// batchDriverUpdates sends all of the batched driver node updates by calling f 398// for each driver batched 399func (b *batchNodeUpdates) batchDriverUpdates(f drivermanager.UpdateNodeDriverInfoFn) error { 400 b.driversMu.Lock() 401 defer b.driversMu.Unlock() 402 if b.driversBatched { 403 return fmt.Errorf("driver updates already batched") 404 } 405 406 b.driversBatched = true 407 for driver, info := range b.drivers { 408 f(driver, info) 409 } 410 return nil 411} 412 413// updateNodeFromDevices implements devicemanager.UpdateNodeDevicesFn and is 414// used in the device manager to send device fingerprints to 415func (b *batchNodeUpdates) updateNodeFromDevices(devices []*structs.NodeDeviceResource) { 416 b.devicesMu.Lock() 417 defer b.devicesMu.Unlock() 418 if b.devicesBatched { 419 b.devicesCB(devices) 420 return 421 } 422 423 b.devices = devices 424} 425 426// batchDevicesUpdates sends the batched device node updates by calling f with 427// the devices 428func (b *batchNodeUpdates) batchDevicesUpdates(f devicemanager.UpdateNodeDevicesFn) error { 429 b.devicesMu.Lock() 430 defer b.devicesMu.Unlock() 431 if b.devicesBatched { 432 return fmt.Errorf("devices updates already batched") 433 } 434 435 b.devicesBatched = true 436 f(b.devices) 437 return nil 438} 439