1/* 2** Zabbix 3** Copyright (C) 2001-2021 Zabbix SIA 4** 5** This program is free software; you can redistribute it and/or modify 6** it under the terms of the GNU General Public License as published by 7** the Free Software Foundation; either version 2 of the License, or 8** (at your option) any later version. 9** 10** This program is distributed in the hope that it will be useful, 11** but WITHOUT ANY WARRANTY; without even the implied warranty of 12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13** GNU General Public License for more details. 14** 15** You should have received a copy of the GNU General Public License 16** along with this program; if not, write to the Free Software 17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18**/ 19 20package scheduler 21 22import ( 23 "errors" 24 "hash/fnv" 25 "sync/atomic" 26 "time" 27 "unsafe" 28 29 "zabbix.com/internal/agent" 30 "zabbix.com/pkg/glexpr" 31 "zabbix.com/pkg/log" 32 "zabbix.com/pkg/plugin" 33 "zabbix.com/pkg/zbxlib" 34) 35 36// clientItem represents item monitored by client 37type clientItem struct { 38 itemid uint64 39 delay string 40 key string 41} 42 43// pluginInfo is used to track plugin usage by client 44type pluginInfo struct { 45 used time.Time 46 // temporary link to a watcherTask during update 47 watcher *watcherTask 48} 49 50// client represents source of items (metrics) to be queried. 51// Each server for active checks is represented by a separate client. 52// There is a predefined clients to handle: 53// all single passive checks (client id 1) 54// all internal checks (resolving HostnameItem, HostMetadataItem, HostInterfaceItem) (client id 0) 55type client struct { 56 // Client id. Predefined clients have ids < 100, while clients active checks servers (ServerActive) 57 // have auto incrementing id starting with 100. 58 id uint64 59 // A map of itemids to the associated exporter tasks. It's used to update task when item parameters change. 60 exporters map[uint64]exporterTaskAccessor 61 // plugins used by client 62 pluginsInfo map[*pluginAgent]*pluginInfo 63 // server refresh unsupported value 64 refreshUnsupported int 65 // server global regular expression bundle 66 globalRegexp unsafe.Pointer 67 // plugin result sink, can be nil for bulk passive checks (in future) 68 output plugin.ResultWriter 69} 70 71// ClientAccessor interface exports client data required for scheduler tasks. 72type ClientAccessor interface { 73 RefreshUnsupported() int 74 Output() plugin.ResultWriter 75 GlobalRegexp() *glexpr.Bundle 76 ID() uint64 77} 78 79// RefreshUnsupported returns scheduling interval for unsupported items. 80// This function is used only by scheduler, no synchronization is required. 81func (c *client) RefreshUnsupported() int { 82 return c.refreshUnsupported 83} 84 85// GlobalRegexp returns global regular expression bundle. 86// This function is used by tasks to implement ContextProvider interface. 87// It can be accessed by plugins and replaced by scheduler at the same time, 88// so pointer access must be synchronized. The global regexp contents are never changed, 89// only replaced, so pointer synchronization is enough. 90func (c *client) GlobalRegexp() *glexpr.Bundle { 91 return (*glexpr.Bundle)(atomic.LoadPointer(&c.globalRegexp)) 92} 93 94// ID returns client id. 95// While it's used by tasks to implement ContextProvider interface, client ID cannot 96// change, so no synchronization is required. 97func (c *client) ID() uint64 { 98 return c.id 99} 100 101// Output returns client output interface where plugins results can be written. 102// While it's used by tasks to implement ContextProvider interface, client output cannot 103// change, so no synchronization is required. 104func (c *client) Output() plugin.ResultWriter { 105 return c.output 106} 107 108// addRequest requests client to start monitoring/update item described by request 'r' using plugin 'p' (*pluginAgent) 109// with output writer 'sink' 110func (c *client) addRequest(p *pluginAgent, r *plugin.Request, sink plugin.ResultWriter, now time.Time) (err error) { 111 var info *pluginInfo 112 var ok bool 113 114 log.Debugf("[%d] adding new request for key: '%s'", c.id, r.Key) 115 116 if info, ok = c.pluginsInfo[p]; !ok { 117 info = &pluginInfo{} 118 } 119 120 // list of created tasks to be queued 121 tasks := make([]performer, 0, 6) 122 123 // handle Collector interface 124 if col, ok := p.impl.(plugin.Collector); ok { 125 if p.refcount == 0 { 126 // calculate collector seed to avoid scheduling all collectors at the same time 127 h := fnv.New32a() 128 _, _ = h.Write([]byte(p.impl.Name())) 129 task := &collectorTask{ 130 taskBase: taskBase{plugin: p, active: true, recurring: true}, 131 seed: uint64(h.Sum32())} 132 if err = task.reschedule(now); err != nil { 133 return 134 } 135 tasks = append(tasks, task) 136 log.Debugf("[%d] created collector task for plugin %s with collecting interval %d", c.id, p.name(), 137 col.Period()) 138 } 139 } 140 141 // handle Exporter interface 142 if _, ok := p.impl.(plugin.Exporter); ok { 143 var tacc exporterTaskAccessor 144 145 if c.id > agent.MaxBuiltinClientID { 146 var task *exporterTask 147 148 if _, err = zbxlib.GetNextcheck(r.Itemid, r.Delay, now, false, c.refreshUnsupported); err != nil { 149 return err 150 } 151 if tacc, ok = c.exporters[r.Itemid]; ok { 152 task = tacc.task() 153 if task.updated.Equal(now) { 154 return errors.New("duplicate itemid found") 155 } 156 if task.plugin != p { 157 // decativate current exporter task and create new one if the item key has been changed 158 // and the new metric is handled by other plugin 159 task.deactivate() 160 ok = false 161 } 162 } 163 164 if !ok { 165 // create and register new exporter task 166 task = &exporterTask{ 167 taskBase: taskBase{plugin: p, active: true, recurring: true}, 168 item: clientItem{itemid: r.Itemid, delay: r.Delay, key: r.Key}, 169 updated: now, 170 client: c, 171 output: sink, 172 } 173 if err = task.reschedule(now); err != nil { 174 return 175 } 176 c.exporters[r.Itemid] = task 177 tasks = append(tasks, task) 178 log.Debugf("[%d] created exporter task for plugin '%s' itemid:%d key '%s'", 179 c.id, p.name(), task.item.itemid, task.item.key) 180 } else { 181 // update existing exporter task 182 task = tacc.task() 183 task.updated = now 184 task.item.key = r.Key 185 if task.item.delay != r.Delay { 186 task.item.delay = r.Delay 187 if err = task.reschedule(now); err != nil { 188 return 189 } 190 p.tasks.Update(task) 191 log.Debugf("[%d] updated exporter task for plugin '%s' itemid:%d key '%s'", 192 c.id, p.name(), task.item.itemid, task.item.key) 193 } 194 } 195 task.meta.SetLastLogsize(*r.LastLogsize) 196 task.meta.SetMtime(int32(*r.Mtime)) 197 198 } else { 199 // handle single passive check or internal request 200 task := &directExporterTask{ 201 taskBase: taskBase{plugin: p, active: true, recurring: true}, 202 item: clientItem{itemid: r.Itemid, delay: r.Delay, key: r.Key}, 203 expire: now.Add(time.Duration(agent.Options.Timeout) * time.Second), 204 client: c, 205 output: sink, 206 } 207 if err = task.reschedule(now); err != nil { 208 return 209 } 210 tasks = append(tasks, task) 211 log.Debugf("[%d] created direct exporter task for plugin '%s' itemid:%d key '%s'", 212 c.id, p.name(), task.item.itemid, task.item.key) 213 } 214 } 215 216 // handle runner interface for inactive plugins 217 if _, ok := p.impl.(plugin.Runner); ok { 218 if p.refcount == 0 { 219 task := &starterTask{ 220 taskBase: taskBase{plugin: p, active: true}, 221 } 222 if err = task.reschedule(now); err != nil { 223 return 224 } 225 tasks = append(tasks, task) 226 log.Debugf("[%d] created starter task for plugin %s", c.id, p.name()) 227 } 228 } 229 230 // handle Watcher interface (not supported by single passive check or internal requests) 231 if _, ok := p.impl.(plugin.Watcher); ok && c.id > agent.MaxBuiltinClientID { 232 if info.watcher == nil { 233 info.watcher = &watcherTask{ 234 taskBase: taskBase{plugin: p, active: true}, 235 requests: make([]*plugin.Request, 0, 1), 236 client: c, 237 } 238 if err = info.watcher.reschedule(now); err != nil { 239 return 240 } 241 tasks = append(tasks, info.watcher) 242 243 log.Debugf("[%d] created watcher task for plugin %s", c.id, p.name()) 244 } 245 info.watcher.requests = append(info.watcher.requests, r) 246 } 247 248 // handle configurator interface for inactive plugins 249 if _, ok := p.impl.(plugin.Configurator); ok { 250 if p.refcount == 0 { 251 task := &configuratorTask{ 252 taskBase: taskBase{plugin: p, active: true}, 253 options: &agent.Options, 254 } 255 _ = task.reschedule(now) 256 tasks = append(tasks, task) 257 log.Debugf("[%d] created configurator task for plugin %s", c.id, p.name()) 258 } 259 } 260 261 for _, t := range tasks { 262 p.enqueueTask(t) 263 } 264 265 // update plugin usage information 266 if info.used.IsZero() { 267 p.refcount++ 268 c.pluginsInfo[p] = info 269 } 270 info.used = now 271 272 return nil 273} 274 275// cleanup releases unused uplugins. For external clients it's done after update, 276// while for internal clients once per hour. 277func (c *client) cleanup(plugins map[string]*pluginAgent, now time.Time) (released []*pluginAgent) { 278 released = make([]*pluginAgent, 0, len(c.pluginsInfo)) 279 // remove reference to temporary watcher tasks 280 for _, p := range c.pluginsInfo { 281 p.watcher = nil 282 } 283 284 // unmap not monitored exporter tasks 285 for _, tacc := range c.exporters { 286 task := tacc.task() 287 if task.updated.Before(now) { 288 delete(c.exporters, task.item.itemid) 289 log.Debugf("[%d] released unused exporter for itemid:%d", c.id, task.item.itemid) 290 task.deactivate() 291 } 292 } 293 294 var expiry time.Time 295 // Direct requests are handled by special clients with id <= MaxBuiltinClientID. 296 // Such requests have day+hour (to keep once per day checks without expiring) 297 // expiry time before used plugins are released. 298 if c.id > agent.MaxBuiltinClientID { 299 expiry = now 300 } else { 301 expiry = now.Add(-time.Hour * 25) 302 } 303 304 // deactivate plugins 305 for _, p := range plugins { 306 if info, ok := c.pluginsInfo[p]; ok { 307 if info.used.Before(expiry) { 308 // perform empty watch task before releasing plugin, so it could 309 // release internal resources allocated to monitor this client 310 if _, ok := p.impl.(plugin.Watcher); ok && c.id > agent.MaxBuiltinClientID { 311 task := &watcherTask{ 312 taskBase: taskBase{plugin: p, active: true}, 313 requests: make([]*plugin.Request, 0), 314 client: c, 315 } 316 if err := task.reschedule(now); err == nil { 317 p.enqueueTask(task) 318 log.Debugf("[%d] created watcher task for plugin %s", c.id, p.name()) 319 } else { 320 // currently watcher rescheduling cannot fail, but log a warning for future 321 log.Warningf("[%d] cannot reschedule plugin '%s' closing watcher task: %s", 322 c.id, p.impl.Name(), err) 323 } 324 } 325 326 // release plugin 327 released = append(released, p) 328 delete(c.pluginsInfo, p) 329 p.refcount-- 330 // TODO: define uniform time format 331 if c.id > agent.MaxBuiltinClientID { 332 log.Debugf("[%d] released unused plugin %s", c.id, p.name()) 333 } else { 334 log.Debugf("[%d] released plugin %s as not used since %s", c.id, p.name(), 335 info.used.Format(time.Stamp)) 336 } 337 } 338 } 339 } 340 return 341} 342 343// updateExpressions updates server global regular expression bundle 344func (c *client) updateExpressions(expressions []*glexpr.Expression) { 345 // reset expressions if changed 346 glexpr.SortExpressions(expressions) 347 var grxp *glexpr.Bundle 348 if c.globalRegexp != nil { 349 grxp = (*glexpr.Bundle)(atomic.LoadPointer(&c.globalRegexp)) 350 if !grxp.CompareExpressions(expressions) { 351 grxp = nil 352 } 353 } 354 355 if grxp == nil { 356 grxp = glexpr.NewBundle(expressions) 357 atomic.StorePointer(&c.globalRegexp, unsafe.Pointer(grxp)) 358 } 359} 360 361// newClient creates new client 362func newClient(id uint64, output plugin.ResultWriter) (b *client) { 363 b = &client{ 364 id: id, 365 exporters: make(map[uint64]exporterTaskAccessor), 366 pluginsInfo: make(map[*pluginAgent]*pluginInfo), 367 output: output, 368 } 369 370 return 371} 372