1/* 2** Zabbix 3** Copyright (C) 2001-2021 Zabbix SIA 4** 5** This program is free software; you can redistribute it and/or modify 6** it under the terms of the GNU General Public License as published by 7** the Free Software Foundation; either version 2 of the License, or 8** (at your option) any later version. 9** 10** This program is distributed in the hope that it will be useful, 11** but WITHOUT ANY WARRANTY; without even the implied warranty of 12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13** GNU General Public License for more details. 14** 15** You should have received a copy of the GNU General Public License 16** along with this program; if not, write to the Free Software 17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18**/ 19 20package scheduler 21 22import ( 23 "errors" 24 "fmt" 25 "hash/fnv" 26 "sync/atomic" 27 "time" 28 "unsafe" 29 30 "zabbix.com/internal/agent" 31 "zabbix.com/pkg/glexpr" 32 "zabbix.com/pkg/log" 33 "zabbix.com/pkg/plugin" 34 "zabbix.com/pkg/zbxlib" 35) 36 37// clientItem represents item monitored by client 38type clientItem struct { 39 itemid uint64 40 delay string 41 key string 42} 43 44// pluginInfo is used to track plugin usage by client 45type pluginInfo struct { 46 used time.Time 47 // temporary link to a watcherTask during update 48 watcher *watcherTask 49} 50 51// client represents source of items (metrics) to be queried. 52// Each server for active checks is represented by a separate client. 53// There is a predefined clients to handle: 54// all single passive checks (client id 1) 55// all internal checks (resolving HostnameItem, HostMetadataItem, HostInterfaceItem) (client id 0) 56type client struct { 57 // Client id. Predefined clients have ids < 100, while clients active checks servers (ServerActive) 58 // have auto incrementing id starting with 100. 59 id uint64 60 // A map of itemids to the associated exporter tasks. It's used to update task when item parameters change. 61 exporters map[uint64]exporterTaskAccessor 62 // plugins used by client 63 pluginsInfo map[*pluginAgent]*pluginInfo 64 // server global regular expression bundle 65 globalRegexp unsafe.Pointer 66 // plugin result sink, can be nil for bulk passive checks (in future) 67 output plugin.ResultWriter 68} 69 70// ClientAccessor interface exports client data required for scheduler tasks. 71type ClientAccessor interface { 72 Output() plugin.ResultWriter 73 GlobalRegexp() *glexpr.Bundle 74 ID() uint64 75} 76 77// GlobalRegexp returns global regular expression bundle. 78// This function is used by tasks to implement ContextProvider interface. 79// It can be accessed by plugins and replaced by scheduler at the same time, 80// so pointer access must be synchronized. The global regexp contents are never changed, 81// only replaced, so pointer synchronization is enough. 82func (c *client) GlobalRegexp() *glexpr.Bundle { 83 return (*glexpr.Bundle)(atomic.LoadPointer(&c.globalRegexp)) 84} 85 86// ID returns client id. 87// While it's used by tasks to implement ContextProvider interface, client ID cannot 88// change, so no synchronization is required. 89func (c *client) ID() uint64 { 90 return c.id 91} 92 93// Output returns client output interface where plugins results can be written. 94// While it's used by tasks to implement ContextProvider interface, client output cannot 95// change, so no synchronization is required. 96func (c *client) Output() plugin.ResultWriter { 97 return c.output 98} 99 100// addRequest requests client to start monitoring/update item described by request 'r' using plugin 'p' (*pluginAgent) 101// with output writer 'sink' 102func (c *client) addRequest(p *pluginAgent, r *plugin.Request, sink plugin.ResultWriter, now time.Time) (err error) { 103 var info *pluginInfo 104 var ok bool 105 106 log.Debugf("[%d] adding new request for key: '%s'", c.id, r.Key) 107 108 if info, ok = c.pluginsInfo[p]; !ok { 109 info = &pluginInfo{} 110 } 111 112 // list of created tasks to be queued 113 tasks := make([]performer, 0, 6) 114 115 // handle Collector interface 116 if col, ok := p.impl.(plugin.Collector); ok { 117 if p.refcount == 0 { 118 // calculate collector seed to avoid scheduling all collectors at the same time 119 h := fnv.New32a() 120 _, _ = h.Write([]byte(p.impl.Name())) 121 task := &collectorTask{ 122 taskBase: taskBase{plugin: p, active: true, recurring: true}, 123 seed: uint64(h.Sum32())} 124 if err = task.reschedule(now); err != nil { 125 return 126 } 127 tasks = append(tasks, task) 128 log.Debugf("[%d] created collector task for plugin %s with collecting interval %d", c.id, p.name(), 129 col.Period()) 130 } 131 } 132 133 // handle Exporter interface 134 if _, ok := p.impl.(plugin.Exporter); ok { 135 var tacc exporterTaskAccessor 136 137 if c.id > agent.MaxBuiltinClientID { 138 var task *exporterTask 139 140 if _, err = zbxlib.GetNextcheck(r.Itemid, r.Delay, now); err != nil { 141 return err 142 } 143 if tacc, ok = c.exporters[r.Itemid]; ok { 144 task = tacc.task() 145 if task.updated.Equal(now) { 146 return errors.New("duplicate itemid found") 147 } 148 if task.plugin != p { 149 // decativate current exporter task and create new one if the item key has been changed 150 // and the new metric is handled by other plugin 151 task.deactivate() 152 ok = false 153 } 154 } 155 156 if !ok { 157 // create and register new exporter task 158 task = &exporterTask{ 159 taskBase: taskBase{plugin: p, active: true, recurring: true}, 160 item: clientItem{itemid: r.Itemid, delay: r.Delay, key: r.Key}, 161 updated: now, 162 client: c, 163 output: sink, 164 } 165 if err = task.reschedule(now); err != nil { 166 return 167 } 168 c.exporters[r.Itemid] = task 169 tasks = append(tasks, task) 170 log.Debugf("[%d] created exporter task for plugin '%s' itemid:%d key '%s'", 171 c.id, p.name(), task.item.itemid, task.item.key) 172 } else { 173 // update existing exporter task 174 task = tacc.task() 175 task.updated = now 176 task.item.key = r.Key 177 if task.item.delay != r.Delay { 178 task.item.delay = r.Delay 179 if err = task.reschedule(now); err != nil { 180 return 181 } 182 p.tasks.Update(task) 183 log.Debugf("[%d] updated exporter task for plugin '%s' itemid:%d key '%s'", 184 c.id, p.name(), task.item.itemid, task.item.key) 185 } 186 } 187 task.meta.SetLastLogsize(*r.LastLogsize) 188 task.meta.SetMtime(int32(*r.Mtime)) 189 190 } else { 191 // handle single passive check or internal request 192 task := &directExporterTask{ 193 taskBase: taskBase{plugin: p, active: true, recurring: true}, 194 item: clientItem{itemid: r.Itemid, delay: r.Delay, key: r.Key}, 195 expire: now.Add(time.Duration(agent.Options.Timeout) * time.Second), 196 client: c, 197 output: sink, 198 } 199 if err = task.reschedule(now); err != nil { 200 return 201 } 202 tasks = append(tasks, task) 203 log.Debugf("[%d] created direct exporter task for plugin '%s' itemid:%d key '%s'", 204 c.id, p.name(), task.item.itemid, task.item.key) 205 } 206 } else if c.id <= agent.MaxBuiltinClientID { 207 return fmt.Errorf(`The "%s" key is not supported in test or single passive check mode`, r.Key) 208 } 209 210 // handle runner interface for inactive plugins 211 if _, ok := p.impl.(plugin.Runner); ok { 212 if p.refcount == 0 { 213 task := &starterTask{ 214 taskBase: taskBase{plugin: p, active: true}, 215 } 216 if err = task.reschedule(now); err != nil { 217 return 218 } 219 tasks = append(tasks, task) 220 log.Debugf("[%d] created starter task for plugin %s", c.id, p.name()) 221 } 222 } 223 224 // handle Watcher interface (not supported by single passive check or internal requests) 225 if _, ok := p.impl.(plugin.Watcher); ok && c.id > agent.MaxBuiltinClientID { 226 if info.watcher == nil { 227 info.watcher = &watcherTask{ 228 taskBase: taskBase{plugin: p, active: true}, 229 requests: make([]*plugin.Request, 0, 1), 230 client: c, 231 } 232 if err = info.watcher.reschedule(now); err != nil { 233 return 234 } 235 tasks = append(tasks, info.watcher) 236 237 log.Debugf("[%d] created watcher task for plugin %s", c.id, p.name()) 238 } 239 info.watcher.requests = append(info.watcher.requests, r) 240 } 241 242 // handle configurator interface for inactive plugins 243 if _, ok := p.impl.(plugin.Configurator); ok { 244 if p.refcount == 0 { 245 task := &configuratorTask{ 246 taskBase: taskBase{plugin: p, active: true}, 247 options: &agent.Options, 248 } 249 _ = task.reschedule(now) 250 tasks = append(tasks, task) 251 log.Debugf("[%d] created configurator task for plugin %s", c.id, p.name()) 252 } 253 } 254 255 for _, t := range tasks { 256 p.enqueueTask(t) 257 } 258 259 // update plugin usage information 260 if info.used.IsZero() { 261 p.refcount++ 262 c.pluginsInfo[p] = info 263 } 264 info.used = now 265 266 return nil 267} 268 269// cleanup releases unused uplugins. For external clients it's done after update, 270// while for internal clients once per hour. 271func (c *client) cleanup(plugins map[string]*pluginAgent, now time.Time) (released []*pluginAgent) { 272 released = make([]*pluginAgent, 0, len(c.pluginsInfo)) 273 // remove reference to temporary watcher tasks 274 for _, p := range c.pluginsInfo { 275 p.watcher = nil 276 } 277 278 // unmap not monitored exporter tasks 279 for _, tacc := range c.exporters { 280 task := tacc.task() 281 if task.updated.Before(now) { 282 delete(c.exporters, task.item.itemid) 283 log.Debugf("[%d] released unused exporter for itemid:%d", c.id, task.item.itemid) 284 task.deactivate() 285 } 286 } 287 288 var expiry time.Time 289 // Direct requests are handled by special clients with id <= MaxBuiltinClientID. 290 // Such requests have day+hour (to keep once per day checks without expiring) 291 // expiry time before used plugins are released. 292 if c.id > agent.MaxBuiltinClientID { 293 expiry = now 294 } else { 295 expiry = now.Add(-time.Hour * 25) 296 } 297 298 // deactivate plugins 299 for _, p := range plugins { 300 if info, ok := c.pluginsInfo[p]; ok { 301 if info.used.Before(expiry) { 302 // perform empty watch task before releasing plugin, so it could 303 // release internal resources allocated to monitor this client 304 if _, ok := p.impl.(plugin.Watcher); ok && c.id > agent.MaxBuiltinClientID { 305 task := &watcherTask{ 306 taskBase: taskBase{plugin: p, active: true}, 307 requests: make([]*plugin.Request, 0), 308 client: c, 309 } 310 if err := task.reschedule(now); err == nil { 311 p.enqueueTask(task) 312 log.Debugf("[%d] created watcher task for plugin %s", c.id, p.name()) 313 } else { 314 // currently watcher rescheduling cannot fail, but log a warning for future 315 log.Warningf("[%d] cannot reschedule plugin '%s' closing watcher task: %s", 316 c.id, p.impl.Name(), err) 317 } 318 } 319 320 // release plugin 321 released = append(released, p) 322 delete(c.pluginsInfo, p) 323 p.refcount-- 324 // TODO: define uniform time format 325 if c.id > agent.MaxBuiltinClientID { 326 log.Debugf("[%d] released unused plugin %s", c.id, p.name()) 327 } else { 328 log.Debugf("[%d] released plugin %s as not used since %s", c.id, p.name(), 329 info.used.Format(time.Stamp)) 330 } 331 } 332 } 333 } 334 return 335} 336 337// updateExpressions updates server global regular expression bundle 338func (c *client) updateExpressions(expressions []*glexpr.Expression) { 339 // reset expressions if changed 340 glexpr.SortExpressions(expressions) 341 var grxp *glexpr.Bundle 342 if c.globalRegexp != nil { 343 grxp = (*glexpr.Bundle)(atomic.LoadPointer(&c.globalRegexp)) 344 if !grxp.CompareExpressions(expressions) { 345 grxp = nil 346 } 347 } 348 349 if grxp == nil { 350 grxp = glexpr.NewBundle(expressions) 351 atomic.StorePointer(&c.globalRegexp, unsafe.Pointer(grxp)) 352 } 353} 354 355// newClient creates new client 356func newClient(id uint64, output plugin.ResultWriter) (b *client) { 357 b = &client{ 358 id: id, 359 exporters: make(map[uint64]exporterTaskAccessor), 360 pluginsInfo: make(map[*pluginAgent]*pluginInfo), 361 output: output, 362 } 363 364 return 365} 366