/*
** Zabbix
** Copyright (C) 2001-2021 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/

package scheduler

import (
	"container/heap"
	"errors"
	"fmt"
	"math"
	"sort"
	"time"

	"zabbix.com/internal/agent"
	"zabbix.com/internal/agent/alias"
	"zabbix.com/internal/agent/keyaccess"
	"zabbix.com/internal/monitor"
	"zabbix.com/pkg/conf"
	"zabbix.com/pkg/glexpr"
	"zabbix.com/pkg/itemutil"
	"zabbix.com/pkg/log"
	"zabbix.com/pkg/plugin"
)

const (
	// number of seconds to wait for plugins to finish during scheduler shutdown
	shutdownTimeout = 5
	// inactive shutdown value (stored in Manager.shutdownSeconds while no shutdown is in progress)
	shutdownInactive = -1
)

// Manager implements Scheduler interface and manages plugin interface usage.
// All state is owned by the run() goroutine; the exported methods talk to it
// by sending messages through the input channel.
type Manager struct {
	// incoming messages for the run() loop: *updateRequest, performer (finished task),
	// *queryRequest or nil (stop request)
	input chan interface{}
	// plugin agents indexed by metric key; several keys can share the same agent
	plugins map[string]*pluginAgent
	// heap of plugins that have tasks waiting to be performed
	pluginQueue pluginHeap
	// registered clients indexed by client id
	clients map[uint64]*client
	// key alias manager, used to resolve requested keys before parsing
	aliases *alias.Manager
	// number of active tasks (running in their own goroutines)
	activeTasksNum int
	// number of seconds left on shutdown timer, shutdownInactive when not shutting down
	shutdownSeconds int
}

// updateRequest contains list of metrics monitored by a client and additional client configuration data.
62type updateRequest struct { 63 clientID uint64 64 sink plugin.ResultWriter 65 requests []*plugin.Request 66 refreshUnsupported int 67 expressions []*glexpr.Expression 68} 69 70// queryRequest contains status/debug query request. 71type queryRequest struct { 72 command string 73 sink chan string 74} 75 76type Scheduler interface { 77 UpdateTasks(clientID uint64, writer plugin.ResultWriter, refreshUnsupported int, expressions []*glexpr.Expression, 78 requests []*plugin.Request) 79 FinishTask(task performer) 80 PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error) 81 Query(command string) (status string) 82} 83 84// cleanupClient performs deactivation of plugins the client is not using anymore. 85// It's called after client update and once per hour for the client associated to 86// single passive checks. 87func (m *Manager) cleanupClient(c *client, now time.Time) { 88 // get a list of plugins the client stopped using 89 released := c.cleanup(m.plugins, now) 90 for _, p := range released { 91 // check if the plugin is used by other clients 92 if p.refcount != 0 { 93 continue 94 } 95 log.Debugf("[%d] deactivate unused plugin %s", c.id, p.name()) 96 97 // deactivate recurring tasks 98 for deactivate := true; deactivate; { 99 deactivate = false 100 for _, t := range p.tasks { 101 if t.isActive() && t.isRecurring() { 102 t.deactivate() 103 // deactivation can change tasks ordering, so repeat the iteration if task was deactivated 104 deactivate = true 105 break 106 } 107 } 108 } 109 110 // queue stopper task if plugin has Runner interface 111 if _, ok := p.impl.(plugin.Runner); ok { 112 task := &stopperTask{ 113 taskBase: taskBase{plugin: p, active: true}, 114 } 115 if err := task.reschedule(now); err != nil { 116 log.Debugf("[%d] cannot schedule stopper task for plugin %s", c.id, p.name()) 117 continue 118 } 119 p.enqueueTask(task) 120 log.Debugf("[%d] created stopper task for plugin %s", c.id, p.name()) 121 122 if p.queued() { 123 
m.pluginQueue.Update(p) 124 } 125 } 126 127 // queue plugin if there are still some tasks left to be finished before deactivating 128 if len(p.tasks) != 0 { 129 if !p.queued() { 130 heap.Push(&m.pluginQueue, p) 131 } 132 } 133 } 134} 135 136// processUpdateRequest processes client update request. It's being used for multiple requests 137// (active checks on a server) and also for direct requets (single passive and internal checks). 138func (m *Manager) processUpdateRequest(update *updateRequest, now time.Time) { 139 log.Debugf("[%d] processing update request (%d requests)", update.clientID, len(update.requests)) 140 141 // immediately fail direct checks and ignore bulk requests when shutting down 142 if m.shutdownSeconds != shutdownInactive { 143 if update.clientID <= agent.MaxBuiltinClientID { 144 if len(update.requests) == 1 { 145 update.sink.Write(&plugin.Result{ 146 Itemid: update.requests[0].Itemid, 147 Error: errors.New("Cannot obtain item value during shutdown process."), 148 Ts: now, 149 }) 150 } else { 151 log.Warningf("[%d] direct checks can contain only single request while received %d requests", 152 update.clientID, len(update.requests)) 153 } 154 } 155 return 156 } 157 158 var c *client 159 var ok bool 160 if c, ok = m.clients[update.clientID]; !ok { 161 if len(update.requests) == 0 { 162 log.Debugf("[%d] skipping empty update for unregistered client", update.clientID) 163 return 164 } 165 log.Debugf("[%d] registering new client", update.clientID) 166 c = newClient(update.clientID, update.sink) 167 m.clients[update.clientID] = c 168 } 169 170 c.refreshUnsupported = update.refreshUnsupported 171 c.updateExpressions(update.expressions) 172 173 for _, r := range update.requests { 174 var key string 175 var params []string 176 var err error 177 var p *pluginAgent 178 179 r.Key = m.aliases.Get(r.Key) 180 if key, params, err = itemutil.ParseKey(r.Key); err == nil { 181 p, ok = m.plugins[key] 182 if ok && update.clientID != agent.LocalChecksClientID { 183 ok 
= keyaccess.CheckRules(key, params) 184 } 185 if !ok { 186 err = fmt.Errorf("Unknown metric %s", key) 187 } else { 188 err = c.addRequest(p, r, update.sink, now) 189 } 190 } 191 192 if err != nil { 193 if c.id > agent.MaxBuiltinClientID { 194 if tacc, ok := c.exporters[r.Itemid]; ok { 195 log.Debugf("deactivate exporter task for item %d because of error: %s", r.Itemid, err) 196 tacc.task().deactivate() 197 } 198 } 199 update.sink.Write(&plugin.Result{Itemid: r.Itemid, Error: err, Ts: now}) 200 log.Debugf("[%d] cannot monitor metric \"%s\": %s", update.clientID, r.Key, err.Error()) 201 continue 202 } 203 204 if !p.queued() { 205 heap.Push(&m.pluginQueue, p) 206 } else { 207 m.pluginQueue.Update(p) 208 } 209 } 210 211 m.cleanupClient(c, now) 212} 213 214// processQueue processes queued plugins/tasks 215func (m *Manager) processQueue(now time.Time) { 216 seconds := now.Unix() 217 for p := m.pluginQueue.Peek(); p != nil; p = m.pluginQueue.Peek() { 218 if task := p.peekTask(); task != nil { 219 if task.getScheduled().Unix() > seconds { 220 break 221 } 222 223 heap.Pop(&m.pluginQueue) 224 if !p.hasCapacity() { 225 // plugin has no free capacity for the next task, keep the plugin out of queue 226 // until active tasks finishes and the required capacity is released 227 continue 228 } 229 230 // take the task out of plugin tasks queue and perform it 231 m.activeTasksNum++ 232 p.reserveCapacity(p.popTask()) 233 task.perform(m) 234 235 // if the plugin has capacity for the next task put it back into plugin queue 236 if !p.hasCapacity() { 237 continue 238 } 239 heap.Push(&m.pluginQueue, p) 240 } else { 241 // plugins with empty task queue should not be in Manager queue 242 heap.Pop(&m.pluginQueue) 243 } 244 } 245} 246 247// processFinishRequest handles finished tasks 248func (m *Manager) processFinishRequest(task performer) { 249 m.activeTasksNum-- 250 p := task.getPlugin() 251 p.releaseCapacity(task) 252 if p.active() && task.isActive() && task.isRecurring() { 253 if err := 
task.reschedule(time.Now()); err != nil { 254 log.Warningf("cannot reschedule plugin %s: %s", p.impl.Name(), err) 255 } else { 256 p.enqueueTask(task) 257 } 258 } 259 if !p.queued() && p.hasCapacity() { 260 heap.Push(&m.pluginQueue, p) 261 } 262} 263 264// rescheduleQueue reschedules all queued tasks. This is done whenever time 265// difference between ticks exceeds limits (for example during daylight saving changes). 266func (m *Manager) rescheduleQueue(now time.Time) { 267 // easier to rebuild queues than update each element 268 queue := make(pluginHeap, 0, len(m.pluginQueue)) 269 for _, p := range m.pluginQueue { 270 tasks := p.tasks 271 p.tasks = make(performerHeap, 0, len(tasks)) 272 for _, t := range tasks { 273 if err := t.reschedule(now); err == nil { 274 p.enqueueTask(t) 275 } 276 } 277 heap.Push(&queue, p) 278 } 279 m.pluginQueue = queue 280} 281 282// deactivatePlugins removes all tasks and creates stopper tasks for active runner plugins 283func (m *Manager) deactivatePlugins() { 284 m.shutdownSeconds = shutdownTimeout 285 286 m.pluginQueue = make(pluginHeap, 0, len(m.pluginQueue)) 287 for _, p := range m.plugins { 288 if p.refcount != 0 { 289 p.tasks = make(performerHeap, 0) 290 if _, ok := p.impl.(plugin.Runner); ok { 291 task := &stopperTask{ 292 taskBase: taskBase{plugin: p, active: true}, 293 } 294 p.enqueueTask(task) 295 heap.Push(&m.pluginQueue, p) 296 p.refcount = 0 297 log.Debugf("created final stopper task for plugin %s", p.name()) 298 } 299 p.refcount = 0 300 } 301 } 302} 303 304// run() is the main worker loop running in own goroutine until stopped 305func (m *Manager) run() { 306 defer log.PanicHook() 307 log.Debugf("starting manager") 308 // Adjust ticker creation at the 0 nanosecond timestamp. In reality it will have at least 309 // some microseconds, which will be enough to include all scheduled tasks at this second 310 // even with nanosecond priority adjustment. 
311 lastTick := time.Now() 312 cleaned := lastTick 313 time.Sleep(time.Duration(1e9 - lastTick.Nanosecond())) 314 ticker := time.NewTicker(time.Second) 315run: 316 for { 317 select { 318 case <-ticker.C: 319 now := time.Now() 320 diff := now.Sub(lastTick) 321 interval := time.Second * 10 322 if diff <= -interval || diff >= interval { 323 log.Warningf("detected %d time difference between queue checks, rescheduling tasks", 324 int(math.Abs(float64(diff))/1e9)) 325 m.rescheduleQueue(now) 326 } 327 lastTick = now 328 m.processQueue(now) 329 if m.shutdownSeconds != shutdownInactive { 330 m.shutdownSeconds-- 331 if m.shutdownSeconds == 0 { 332 break run 333 } 334 } else { 335 // cleanup plugins used by passive checks 336 if now.Sub(cleaned) >= time.Hour { 337 if passive, ok := m.clients[0]; ok { 338 m.cleanupClient(passive, now) 339 } 340 // remove inactive clients 341 for _, client := range m.clients { 342 if len(client.pluginsInfo) == 0 { 343 delete(m.clients, client.ID()) 344 } 345 } 346 cleaned = now 347 } 348 } 349 case u := <-m.input: 350 if u == nil { 351 m.deactivatePlugins() 352 if m.activeTasksNum+len(m.pluginQueue) == 0 { 353 break run 354 } 355 m.processQueue(time.Now()) 356 } 357 switch v := u.(type) { 358 case *updateRequest: 359 m.processUpdateRequest(v, time.Now()) 360 m.processQueue(time.Now()) 361 case performer: 362 m.processFinishRequest(v) 363 if m.shutdownSeconds != shutdownInactive && m.activeTasksNum+len(m.pluginQueue) == 0 { 364 break run 365 } 366 m.processQueue(time.Now()) 367 case *queryRequest: 368 if response, err := m.processQuery(v); err != nil { 369 v.sink <- "cannot process request: " + err.Error() 370 } else { 371 v.sink <- response 372 } 373 } 374 } 375 } 376 log.Debugf("manager has been stopped") 377 monitor.Unregister(monitor.Scheduler) 378} 379 380type pluginCapacity struct { 381 Capacity int `conf:"optional"` 382} 383 384func (m *Manager) init() { 385 m.input = make(chan interface{}, 10) 386 m.pluginQueue = make(pluginHeap, 0, 
len(plugin.Metrics)) 387 m.clients = make(map[uint64]*client) 388 m.plugins = make(map[string]*pluginAgent) 389 m.shutdownSeconds = shutdownInactive 390 391 metrics := make([]*plugin.Metric, 0, len(plugin.Metrics)) 392 393 for _, metric := range plugin.Metrics { 394 metrics = append(metrics, metric) 395 } 396 sort.Slice(metrics, func(i, j int) bool { 397 return metrics[i].Plugin.Name() < metrics[j].Plugin.Name() 398 }) 399 400 pagent := &pluginAgent{} 401 for _, metric := range metrics { 402 if metric.Plugin != pagent.impl { 403 capacity := metric.Plugin.Capacity() 404 var opts pluginCapacity 405 optsRaw := agent.Options.Plugins[metric.Plugin.Name()] 406 if optsRaw != nil { 407 if err := conf.Unmarshal(optsRaw, &opts, false); err != nil { 408 log.Warningf("invalid plugin %s configuration: %s", metric.Plugin.Name(), err) 409 log.Warningf("using default plugin capacity settings: %d", plugin.DefaultCapacity) 410 capacity = plugin.DefaultCapacity 411 } else { 412 if opts.Capacity != 0 { 413 capacity = opts.Capacity 414 } 415 } 416 } 417 418 if capacity > metric.Plugin.Capacity() { 419 log.Warningf("lowering the plugin %s capacity to %d as the configured capacity %d exceeds limits", 420 metric.Plugin.Name(), metric.Plugin.Capacity(), capacity) 421 capacity = metric.Plugin.Capacity() 422 } 423 424 pagent = &pluginAgent{ 425 impl: metric.Plugin, 426 tasks: make(performerHeap, 0), 427 maxCapacity: capacity, 428 usedCapacity: 0, 429 index: -1, 430 refcount: 0, 431 } 432 433 interfaces := "" 434 if _, ok := metric.Plugin.(plugin.Exporter); ok { 435 interfaces += "exporter, " 436 } 437 if _, ok := metric.Plugin.(plugin.Collector); ok { 438 interfaces += "collector, " 439 } 440 if _, ok := metric.Plugin.(plugin.Runner); ok { 441 interfaces += "runner, " 442 } 443 if _, ok := metric.Plugin.(plugin.Watcher); ok { 444 interfaces += "watcher, " 445 } 446 if _, ok := metric.Plugin.(plugin.Configurator); ok { 447 interfaces += "configurator, " 448 } 449 interfaces = 
interfaces[:len(interfaces)-2] 450 log.Infof("using plugin '%s' providing following interfaces: %s", metric.Plugin.Name(), interfaces) 451 } 452 m.plugins[metric.Key] = pagent 453 } 454} 455func (m *Manager) Start() { 456 monitor.Register(monitor.Scheduler) 457 go m.run() 458} 459 460func (m *Manager) Stop() { 461 m.input <- nil 462} 463 464func (m *Manager) UpdateTasks(clientID uint64, writer plugin.ResultWriter, refreshUnsupported int, 465 expressions []*glexpr.Expression, requests []*plugin.Request) { 466 467 m.input <- &updateRequest{clientID: clientID, 468 sink: writer, 469 requests: requests, 470 refreshUnsupported: refreshUnsupported, 471 expressions: expressions, 472 } 473} 474 475type resultWriter chan *plugin.Result 476 477func (r resultWriter) Write(result *plugin.Result) { 478 r <- result 479} 480 481func (r resultWriter) Flush() { 482} 483 484func (r resultWriter) SlotsAvailable() int { 485 return 1 486} 487 488func (r resultWriter) PersistSlotsAvailable() int { 489 return 1 490} 491 492func (m *Manager) PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error) { 493 var lastLogsize uint64 494 var mtime int 495 496 w := make(resultWriter, 1) 497 498 m.UpdateTasks(clientID, w, 0, nil, []*plugin.Request{{Key: key, LastLogsize: &lastLogsize, Mtime: &mtime}}) 499 500 select { 501 case r := <-w: 502 if r.Error == nil { 503 if r.Value != nil { 504 result = *r.Value 505 } else { 506 // single metric requests do not support empty values, return error instead 507 err = errors.New("No values have been gathered yet.") 508 } 509 } else { 510 err = r.Error 511 } 512 case <-time.After(timeout): 513 err = fmt.Errorf("Timeout occurred while gathering data.") 514 } 515 return 516} 517 518func (m *Manager) FinishTask(task performer) { 519 m.input <- task 520} 521 522func (m *Manager) Query(command string) (status string) { 523 request := &queryRequest{command: command, sink: make(chan string)} 524 m.input <- request 525 return 
<-request.sink 526} 527 528func (m *Manager) validatePlugins(options *agent.AgentOptions) (err error) { 529 for _, p := range plugin.Plugins { 530 if c, ok := p.(plugin.Configurator); ok { 531 if err = c.Validate(options.Plugins[p.Name()]); err != nil { 532 return fmt.Errorf("invalid plugin %s configuration: %s", p.Name(), err) 533 } 534 } 535 } 536 return 537} 538 539func (m *Manager) configure(options *agent.AgentOptions) (err error) { 540 m.aliases, err = alias.NewManager(options) 541 return 542} 543 544func NewManager(options *agent.AgentOptions) (mannager *Manager, err error) { 545 var m Manager 546 m.init() 547 if err = m.validatePlugins(options); err != nil { 548 return 549 } 550 return &m, m.configure(options) 551} 552