/*
** Zabbix
** Copyright (C) 2001-2021 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/

package scheduler

import (
	"container/heap"
	"errors"
	"fmt"
	"math"
	"sort"
	"time"

	"zabbix.com/internal/agent"
	"zabbix.com/internal/agent/alias"
	"zabbix.com/internal/agent/keyaccess"
	"zabbix.com/internal/monitor"
	"zabbix.com/pkg/conf"
	"zabbix.com/pkg/glexpr"
	"zabbix.com/pkg/itemutil"
	"zabbix.com/pkg/log"
	"zabbix.com/pkg/plugin"
)

const (
	// number of seconds to wait for plugins to finish during scheduler shutdown
	shutdownTimeout = 5
	// inactive shutdown value: shutdownSeconds holds it while no shutdown is in progress
	shutdownInactive = -1
)

// Manager implements Scheduler interface and manages plugin interface usage.
// All of its state is modified from the worker loop (run), which receives
// work through the input channel.
type Manager struct {
	// requests for the worker loop: *updateRequest, performer (a finished task),
	// *queryRequest, or nil to start the shutdown
	input chan interface{}
	// registered plugins indexed by metric key
	plugins map[string]*pluginAgent
	// heap of plugins that have tasks waiting to be performed
	pluginQueue pluginHeap
	// known clients indexed by client ID
	clients map[uint64]*client
	// alias manager used to resolve item keys before they are parsed
	aliases *alias.Manager
	// number of active tasks (running in their own goroutines)
	activeTasksNum int
	// number of seconds left on shutdown timer
	shutdownSeconds int
}

// updateRequest contains list of metrics monitored by a client and additional client configuration data.
62type updateRequest struct { 63 clientID uint64 64 sink plugin.ResultWriter 65 requests []*plugin.Request 66 expressions []*glexpr.Expression 67} 68 69// queryRequest contains status/debug query request. 70type queryRequest struct { 71 command string 72 sink chan string 73} 74 75type Scheduler interface { 76 UpdateTasks(clientID uint64, writer plugin.ResultWriter, expressions []*glexpr.Expression, 77 requests []*plugin.Request) 78 FinishTask(task performer) 79 PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error) 80 Query(command string) (status string) 81} 82 83// cleanupClient performs deactivation of plugins the client is not using anymore. 84// It's called after client update and once per hour for the client associated to 85// single passive checks. 86func (m *Manager) cleanupClient(c *client, now time.Time) { 87 // get a list of plugins the client stopped using 88 released := c.cleanup(m.plugins, now) 89 for _, p := range released { 90 // check if the plugin is used by other clients 91 if p.refcount != 0 { 92 continue 93 } 94 log.Debugf("[%d] deactivate unused plugin %s", c.id, p.name()) 95 96 // deactivate recurring tasks 97 for deactivate := true; deactivate; { 98 deactivate = false 99 for _, t := range p.tasks { 100 if t.isActive() && t.isRecurring() { 101 t.deactivate() 102 // deactivation can change tasks ordering, so repeat the iteration if task was deactivated 103 deactivate = true 104 break 105 } 106 } 107 } 108 109 // queue stopper task if plugin has Runner interface 110 if _, ok := p.impl.(plugin.Runner); ok { 111 task := &stopperTask{ 112 taskBase: taskBase{plugin: p, active: true}, 113 } 114 if err := task.reschedule(now); err != nil { 115 log.Debugf("[%d] cannot schedule stopper task for plugin %s", c.id, p.name()) 116 continue 117 } 118 p.enqueueTask(task) 119 log.Debugf("[%d] created stopper task for plugin %s", c.id, p.name()) 120 121 if p.queued() { 122 m.pluginQueue.Update(p) 123 } 124 } 125 126 // queue 
plugin if there are still some tasks left to be finished before deactivating 127 if len(p.tasks) != 0 { 128 if !p.queued() { 129 heap.Push(&m.pluginQueue, p) 130 } 131 } 132 } 133} 134 135// processUpdateRequest processes client update request. It's being used for multiple requests 136// (active checks on a server) and also for direct requets (single passive and internal checks). 137func (m *Manager) processUpdateRequest(update *updateRequest, now time.Time) { 138 log.Debugf("[%d] processing update request (%d requests)", update.clientID, len(update.requests)) 139 140 // immediately fail direct checks and ignore bulk requests when shutting down 141 if m.shutdownSeconds != shutdownInactive { 142 if update.clientID <= agent.MaxBuiltinClientID { 143 if len(update.requests) == 1 { 144 update.sink.Write(&plugin.Result{ 145 Itemid: update.requests[0].Itemid, 146 Error: errors.New("Cannot obtain item value during shutdown process."), 147 Ts: now, 148 }) 149 } else { 150 log.Warningf("[%d] direct checks can contain only single request while received %d requests", 151 update.clientID, len(update.requests)) 152 } 153 } 154 return 155 } 156 157 var c *client 158 var ok bool 159 if c, ok = m.clients[update.clientID]; !ok { 160 if len(update.requests) == 0 { 161 log.Debugf("[%d] skipping empty update for unregistered client", update.clientID) 162 return 163 } 164 log.Debugf("[%d] registering new client", update.clientID) 165 c = newClient(update.clientID, update.sink) 166 m.clients[update.clientID] = c 167 } 168 169 c.updateExpressions(update.expressions) 170 171 for _, r := range update.requests { 172 var key string 173 var params []string 174 var err error 175 var p *pluginAgent 176 177 r.Key = m.aliases.Get(r.Key) 178 if key, params, err = itemutil.ParseKey(r.Key); err == nil { 179 p, ok = m.plugins[key] 180 if ok && update.clientID != agent.LocalChecksClientID { 181 ok = keyaccess.CheckRules(key, params) 182 } 183 if !ok { 184 err = fmt.Errorf("Unknown metric %s", key) 185 
} else { 186 err = c.addRequest(p, r, update.sink, now) 187 } 188 } 189 190 if err != nil { 191 if c.id > agent.MaxBuiltinClientID { 192 if tacc, ok := c.exporters[r.Itemid]; ok { 193 log.Debugf("deactivate exporter task for item %d because of error: %s", r.Itemid, err) 194 tacc.task().deactivate() 195 } 196 } 197 update.sink.Write(&plugin.Result{Itemid: r.Itemid, Error: err, Ts: now}) 198 log.Debugf("[%d] cannot monitor metric \"%s\": %s", update.clientID, r.Key, err.Error()) 199 continue 200 } 201 202 if !p.queued() { 203 heap.Push(&m.pluginQueue, p) 204 } else { 205 m.pluginQueue.Update(p) 206 } 207 } 208 209 m.cleanupClient(c, now) 210} 211 212// processQueue processes queued plugins/tasks 213func (m *Manager) processQueue(now time.Time) { 214 seconds := now.Unix() 215 for p := m.pluginQueue.Peek(); p != nil; p = m.pluginQueue.Peek() { 216 if task := p.peekTask(); task != nil { 217 if task.getScheduled().Unix() > seconds { 218 break 219 } 220 221 heap.Pop(&m.pluginQueue) 222 if !p.hasCapacity() { 223 // plugin has no free capacity for the next task, keep the plugin out of queue 224 // until active tasks finishes and the required capacity is released 225 continue 226 } 227 228 // take the task out of plugin tasks queue and perform it 229 m.activeTasksNum++ 230 p.reserveCapacity(p.popTask()) 231 task.perform(m) 232 233 // if the plugin has capacity for the next task put it back into plugin queue 234 if !p.hasCapacity() { 235 continue 236 } 237 heap.Push(&m.pluginQueue, p) 238 } else { 239 // plugins with empty task queue should not be in Manager queue 240 heap.Pop(&m.pluginQueue) 241 } 242 } 243} 244 245// processFinishRequest handles finished tasks 246func (m *Manager) processFinishRequest(task performer) { 247 m.activeTasksNum-- 248 p := task.getPlugin() 249 p.releaseCapacity(task) 250 if p.active() && task.isActive() && task.isRecurring() { 251 if err := task.reschedule(time.Now()); err != nil { 252 log.Warningf("cannot reschedule plugin %s: %s", 
p.impl.Name(), err) 253 } else { 254 p.enqueueTask(task) 255 } 256 } 257 if !p.queued() && p.hasCapacity() { 258 heap.Push(&m.pluginQueue, p) 259 } 260} 261 262// rescheduleQueue reschedules all queued tasks. This is done whenever time 263// difference between ticks exceeds limits (for example during daylight saving changes). 264func (m *Manager) rescheduleQueue(now time.Time) { 265 // easier to rebuild queues than update each element 266 queue := make(pluginHeap, 0, len(m.pluginQueue)) 267 for _, p := range m.pluginQueue { 268 tasks := p.tasks 269 p.tasks = make(performerHeap, 0, len(tasks)) 270 for _, t := range tasks { 271 if err := t.reschedule(now); err == nil { 272 p.enqueueTask(t) 273 } 274 } 275 heap.Push(&queue, p) 276 } 277 m.pluginQueue = queue 278} 279 280// deactivatePlugins removes all tasks and creates stopper tasks for active runner plugins 281func (m *Manager) deactivatePlugins() { 282 m.shutdownSeconds = shutdownTimeout 283 284 m.pluginQueue = make(pluginHeap, 0, len(m.pluginQueue)) 285 for _, p := range m.plugins { 286 if p.refcount != 0 { 287 p.tasks = make(performerHeap, 0) 288 if _, ok := p.impl.(plugin.Runner); ok { 289 task := &stopperTask{ 290 taskBase: taskBase{plugin: p, active: true}, 291 } 292 p.enqueueTask(task) 293 heap.Push(&m.pluginQueue, p) 294 p.refcount = 0 295 log.Debugf("created final stopper task for plugin %s", p.name()) 296 } 297 p.refcount = 0 298 } 299 } 300} 301 302// run() is the main worker loop running in own goroutine until stopped 303func (m *Manager) run() { 304 defer log.PanicHook() 305 log.Debugf("starting manager") 306 // Adjust ticker creation at the 0 nanosecond timestamp. In reality it will have at least 307 // some microseconds, which will be enough to include all scheduled tasks at this second 308 // even with nanosecond priority adjustment. 
309 lastTick := time.Now() 310 cleaned := lastTick 311 time.Sleep(time.Duration(1e9 - lastTick.Nanosecond())) 312 ticker := time.NewTicker(time.Second) 313run: 314 for { 315 select { 316 case <-ticker.C: 317 now := time.Now() 318 diff := now.Sub(lastTick) 319 interval := time.Second * 10 320 if diff <= -interval || diff >= interval { 321 log.Warningf("detected %d time difference between queue checks, rescheduling tasks", 322 int(math.Abs(float64(diff))/1e9)) 323 m.rescheduleQueue(now) 324 } 325 lastTick = now 326 m.processQueue(now) 327 if m.shutdownSeconds != shutdownInactive { 328 m.shutdownSeconds-- 329 if m.shutdownSeconds == 0 { 330 break run 331 } 332 } else { 333 // cleanup plugins used by passive checks 334 if now.Sub(cleaned) >= time.Hour { 335 if passive, ok := m.clients[0]; ok { 336 m.cleanupClient(passive, now) 337 } 338 // remove inactive clients 339 for _, client := range m.clients { 340 if len(client.pluginsInfo) == 0 { 341 delete(m.clients, client.ID()) 342 } 343 } 344 cleaned = now 345 } 346 } 347 case u := <-m.input: 348 if u == nil { 349 m.deactivatePlugins() 350 if m.activeTasksNum+len(m.pluginQueue) == 0 { 351 break run 352 } 353 m.processQueue(time.Now()) 354 } 355 switch v := u.(type) { 356 case *updateRequest: 357 m.processUpdateRequest(v, time.Now()) 358 m.processQueue(time.Now()) 359 case performer: 360 m.processFinishRequest(v) 361 if m.shutdownSeconds != shutdownInactive && m.activeTasksNum+len(m.pluginQueue) == 0 { 362 break run 363 } 364 m.processQueue(time.Now()) 365 case *queryRequest: 366 if response, err := m.processQuery(v); err != nil { 367 v.sink <- "cannot process request: " + err.Error() 368 } else { 369 v.sink <- response 370 } 371 } 372 } 373 } 374 log.Debugf("manager has been stopped") 375 monitor.Unregister(monitor.Scheduler) 376} 377 378type pluginCapacity struct { 379 Capacity int `conf:"optional"` 380} 381 382func (m *Manager) init() { 383 m.input = make(chan interface{}, 10) 384 m.pluginQueue = make(pluginHeap, 0, 
len(plugin.Metrics)) 385 m.clients = make(map[uint64]*client) 386 m.plugins = make(map[string]*pluginAgent) 387 m.shutdownSeconds = shutdownInactive 388 389 metrics := make([]*plugin.Metric, 0, len(plugin.Metrics)) 390 391 for _, metric := range plugin.Metrics { 392 metrics = append(metrics, metric) 393 } 394 sort.Slice(metrics, func(i, j int) bool { 395 return metrics[i].Plugin.Name() < metrics[j].Plugin.Name() 396 }) 397 398 pagent := &pluginAgent{} 399 for _, metric := range metrics { 400 if metric.Plugin != pagent.impl { 401 capacity := metric.Plugin.Capacity() 402 var opts pluginCapacity 403 optsRaw := agent.Options.Plugins[metric.Plugin.Name()] 404 if optsRaw != nil { 405 if err := conf.Unmarshal(optsRaw, &opts, false); err != nil { 406 log.Warningf("invalid plugin %s configuration: %s", metric.Plugin.Name(), err) 407 log.Warningf("using default plugin capacity settings: %d", plugin.DefaultCapacity) 408 capacity = plugin.DefaultCapacity 409 } else { 410 if opts.Capacity != 0 { 411 capacity = opts.Capacity 412 } 413 } 414 } 415 416 if capacity > metric.Plugin.Capacity() { 417 log.Warningf("lowering the plugin %s capacity to %d as the configured capacity %d exceeds limits", 418 metric.Plugin.Name(), metric.Plugin.Capacity(), capacity) 419 capacity = metric.Plugin.Capacity() 420 } 421 422 pagent = &pluginAgent{ 423 impl: metric.Plugin, 424 tasks: make(performerHeap, 0), 425 maxCapacity: capacity, 426 usedCapacity: 0, 427 index: -1, 428 refcount: 0, 429 } 430 431 interfaces := "" 432 if _, ok := metric.Plugin.(plugin.Exporter); ok { 433 interfaces += "exporter, " 434 } 435 if _, ok := metric.Plugin.(plugin.Collector); ok { 436 interfaces += "collector, " 437 } 438 if _, ok := metric.Plugin.(plugin.Runner); ok { 439 interfaces += "runner, " 440 } 441 if _, ok := metric.Plugin.(plugin.Watcher); ok { 442 interfaces += "watcher, " 443 } 444 if _, ok := metric.Plugin.(plugin.Configurator); ok { 445 interfaces += "configurator, " 446 } 447 interfaces = 
interfaces[:len(interfaces)-2] 448 log.Infof("using plugin '%s' providing following interfaces: %s", metric.Plugin.Name(), interfaces) 449 } 450 m.plugins[metric.Key] = pagent 451 } 452} 453func (m *Manager) Start() { 454 monitor.Register(monitor.Scheduler) 455 go m.run() 456} 457 458func (m *Manager) Stop() { 459 m.input <- nil 460} 461 462func (m *Manager) UpdateTasks(clientID uint64, writer plugin.ResultWriter, 463 expressions []*glexpr.Expression, requests []*plugin.Request) { 464 465 m.input <- &updateRequest{clientID: clientID, 466 sink: writer, 467 requests: requests, 468 expressions: expressions, 469 } 470} 471 472type resultWriter chan *plugin.Result 473 474func (r resultWriter) Write(result *plugin.Result) { 475 r <- result 476} 477 478func (r resultWriter) Flush() { 479} 480 481func (r resultWriter) SlotsAvailable() int { 482 return 1 483} 484 485func (r resultWriter) PersistSlotsAvailable() int { 486 return 1 487} 488 489func (m *Manager) PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error) { 490 var lastLogsize uint64 491 var mtime int 492 493 w := make(resultWriter, 1) 494 495 m.UpdateTasks(clientID, w, nil, []*plugin.Request{{Key: key, LastLogsize: &lastLogsize, Mtime: &mtime}}) 496 497 select { 498 case r := <-w: 499 if r.Error == nil { 500 if r.Value != nil { 501 result = *r.Value 502 } else { 503 // single metric requests do not support empty values, return error instead 504 err = errors.New("No values have been gathered yet.") 505 } 506 } else { 507 err = r.Error 508 } 509 case <-time.After(timeout): 510 err = fmt.Errorf("Timeout occurred while gathering data.") 511 } 512 return 513} 514 515func (m *Manager) FinishTask(task performer) { 516 m.input <- task 517} 518 519func (m *Manager) Query(command string) (status string) { 520 request := &queryRequest{command: command, sink: make(chan string)} 521 m.input <- request 522 return <-request.sink 523} 524 525func (m *Manager) validatePlugins(options 
*agent.AgentOptions) (err error) { 526 for _, p := range plugin.Plugins { 527 if c, ok := p.(plugin.Configurator); ok { 528 if err = c.Validate(options.Plugins[p.Name()]); err != nil { 529 return fmt.Errorf("invalid plugin %s configuration: %s", p.Name(), err) 530 } 531 } 532 } 533 return 534} 535 536func (m *Manager) configure(options *agent.AgentOptions) (err error) { 537 m.aliases, err = alias.NewManager(options) 538 return 539} 540 541func NewManager(options *agent.AgentOptions) (mannager *Manager, err error) { 542 var m Manager 543 m.init() 544 if err = m.validatePlugins(options); err != nil { 545 return 546 } 547 return &m, m.configure(options) 548} 549