/*
** Zabbix
** Copyright (C) 2001-2021 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/

package scheduler

import (
	"errors"
	"fmt"
	"reflect"
	"time"

	"zabbix.com/internal/agent"
	"zabbix.com/pkg/itemutil"
	"zabbix.com/pkg/log"
	"zabbix.com/pkg/plugin"
	"zabbix.com/pkg/zbxlib"
)

// Task priority within the same second is done by setting the nanosecond
// component of the scheduled time. The task reschedule methods use these
// values as the nanosecond part of the timestamp, so a smaller value yields
// an earlier timestamp within the same second.
const (
	priorityConfiguratorTaskNs = iota
	priorityStarterTaskNs
	priorityCollectorTaskNs
	priorityWatcherTaskNs
	priorityExporterTaskNs
	priorityStopperTaskNs
)

// exporterTaskAccessor is used by clients to track item exporter tasks.
46type exporterTaskAccessor interface { 47 task() *exporterTask 48} 49 50// taskBase implements common task properties and functionality 51type taskBase struct { 52 plugin *pluginAgent 53 scheduled time.Time 54 index int 55 active bool 56 recurring bool 57} 58 59func (t *taskBase) getPlugin() *pluginAgent { 60 return t.plugin 61} 62 63func (t *taskBase) getScheduled() time.Time { 64 return t.scheduled 65} 66 67func (t *taskBase) getWeight() int { 68 return 1 69} 70 71func (t *taskBase) getIndex() int { 72 return t.index 73} 74 75func (t *taskBase) setIndex(index int) { 76 t.index = index 77} 78 79func (t *taskBase) deactivate() { 80 if t.index != -1 { 81 t.plugin.removeTask(t.index) 82 } 83 t.active = false 84} 85 86func (t *taskBase) isActive() bool { 87 return t.active 88} 89 90func (t *taskBase) isRecurring() bool { 91 return t.recurring 92} 93 94// collectorTask provides access to plugin Collector interaface. 95type collectorTask struct { 96 taskBase 97 seed uint64 98} 99 100func (t *collectorTask) perform(s Scheduler) { 101 log.Debugf("plugin %s: executing collector task", t.plugin.name()) 102 go func() { 103 collector, _ := t.plugin.impl.(plugin.Collector) 104 if err := collector.Collect(); err != nil { 105 log.Warningf("plugin '%s' collector failed: %s", t.plugin.impl.Name(), err.Error()) 106 } 107 s.FinishTask(t) 108 }() 109} 110 111func (t *collectorTask) reschedule(now time.Time) (err error) { 112 collector, _ := t.plugin.impl.(plugin.Collector) 113 period := int64(collector.Period()) 114 if period == 0 { 115 return fmt.Errorf("invalid collector interval 0 seconds") 116 } 117 seconds := now.Unix() 118 nextcheck := period*(seconds/period) + int64(t.seed)%period 119 for nextcheck <= seconds { 120 nextcheck += period 121 } 122 t.scheduled = time.Unix(nextcheck, priorityCollectorTaskNs) 123 return 124} 125 126func (t *collectorTask) getWeight() int { 127 return t.plugin.maxCapacity 128} 129 130// exporterTask provides access to plugin Exporter interaface. 
It's used 131// for active check items. 132type exporterTask struct { 133 taskBase 134 item clientItem 135 failed bool 136 updated time.Time 137 client ClientAccessor 138 meta plugin.Meta 139 output plugin.ResultWriter 140} 141 142func (t *exporterTask) perform(s Scheduler) { 143 // pass item key as parameter so it can be safely updated while task is being processed in its goroutine 144 go func(itemkey string) { 145 var result *plugin.Result 146 exporter, _ := t.plugin.impl.(plugin.Exporter) 147 now := time.Now() 148 var key string 149 var params []string 150 var err error 151 152 if key, params, err = itemutil.ParseKey(itemkey); err == nil { 153 var ret interface{} 154 log.Debugf("executing exporter task for itemid:%d key '%s'", t.item.itemid, itemkey) 155 156 if ret, err = exporter.Export(key, params, t); err == nil { 157 log.Debugf("executed exporter task for itemid:%d key '%s'", t.item.itemid, itemkey) 158 if ret != nil { 159 rt := reflect.TypeOf(ret) 160 switch rt.Kind() { 161 case reflect.Slice: 162 fallthrough 163 case reflect.Array: 164 s := reflect.ValueOf(ret) 165 for i := 0; i < s.Len(); i++ { 166 result = itemutil.ValueToResult(t.item.itemid, now, s.Index(i).Interface()) 167 t.output.Write(result) 168 } 169 default: 170 result = itemutil.ValueToResult(t.item.itemid, now, ret) 171 t.output.Write(result) 172 } 173 } 174 } else { 175 log.Debugf("failed to execute exporter task for itemid:%d key '%s' error: '%s'", 176 t.item.itemid, itemkey, err.Error()) 177 } 178 } 179 if err != nil { 180 result = &plugin.Result{Itemid: t.item.itemid, Error: err, Ts: now} 181 t.output.Write(result) 182 } 183 // set failed state based on last result 184 if result != nil && result.Error != nil { 185 log.Warningf(`check '%s' is not supported: %s`, itemkey, result.Error) 186 t.failed = true 187 } else { 188 t.failed = false 189 } 190 191 s.FinishTask(t) 192 }(t.item.key) 193} 194 195func (t *exporterTask) reschedule(now time.Time) (err error) { 196 var nextcheck time.Time 197 
nextcheck, err = zbxlib.GetNextcheck(t.item.itemid, t.item.delay, now, t.failed, t.client.RefreshUnsupported()) 198 if err != nil { 199 return 200 } 201 t.scheduled = nextcheck.Add(priorityExporterTaskNs) 202 return 203} 204 205func (t *exporterTask) task() (task *exporterTask) { 206 return t 207} 208 209// plugin.ContextProvider interface 210 211func (t *exporterTask) ClientID() (clientid uint64) { 212 return t.client.ID() 213} 214 215func (t *exporterTask) Output() (output plugin.ResultWriter) { 216 return t.output 217} 218 219func (t *exporterTask) ItemID() (itemid uint64) { 220 return t.item.itemid 221} 222 223func (t *exporterTask) Meta() (meta *plugin.Meta) { 224 return &t.meta 225} 226 227func (t *exporterTask) GlobalRegexp() plugin.RegexpMatcher { 228 return t.client.GlobalRegexp() 229} 230 231// directExporterTask provides access to plugin Exporter interaface. 232// It's used for non-recurring exporter requests - single passive checks 233// and internal requests to obtain HostnameItem, HostMetadataItem, 234// HostInterfaceItem etc values. 
235type directExporterTask struct { 236 taskBase 237 item clientItem 238 done bool 239 expire time.Time 240 client ClientAccessor 241 meta plugin.Meta 242 output plugin.ResultWriter 243} 244 245func (t *directExporterTask) isRecurring() bool { 246 return !t.done 247} 248func (t *directExporterTask) perform(s Scheduler) { 249 // pass item key as parameter so it can be safely updated while task is being processed in its goroutine 250 go func(itemkey string) { 251 var result *plugin.Result 252 exporter, _ := t.plugin.impl.(plugin.Exporter) 253 now := time.Now() 254 var key string 255 var params []string 256 var err error 257 258 if now.After(t.expire) { 259 err = errors.New("No data available.") 260 log.Debugf("direct exporter task expired for key '%s' error: '%s'", itemkey, err.Error()) 261 } else { 262 if key, params, err = itemutil.ParseKey(itemkey); err == nil { 263 var ret interface{} 264 log.Debugf("executing direct exporter task for key '%s'", itemkey) 265 266 if ret, err = exporter.Export(key, params, t); err == nil { 267 log.Debugf("executed direct exporter task for key '%s'", itemkey) 268 if ret != nil { 269 rt := reflect.TypeOf(ret) 270 switch rt.Kind() { 271 case reflect.Slice, reflect.Array: 272 err = errors.New("Multiple return values are not supported for single passive checks") 273 default: 274 result = itemutil.ValueToResult(t.item.itemid, now, ret) 275 t.output.Write(result) 276 t.done = true 277 } 278 } 279 } else { 280 log.Debugf("failed to execute direct exporter task for key '%s' error: '%s'", 281 itemkey, err.Error()) 282 } 283 } 284 } 285 if err != nil { 286 result = &plugin.Result{Itemid: t.item.itemid, Error: err, Ts: now} 287 t.output.Write(result) 288 t.done = true 289 } 290 291 s.FinishTask(t) 292 }(t.item.key) 293} 294 295func (t *directExporterTask) reschedule(now time.Time) (err error) { 296 if t.scheduled.IsZero() { 297 t.scheduled = time.Unix(now.Unix(), priorityExporterTaskNs) 298 } else { 299 t.scheduled = time.Unix(now.Unix()+1, 
priorityExporterTaskNs) 300 } 301 return 302} 303 304// plugin.ContextProvider interface 305 306func (t *directExporterTask) ClientID() (clientid uint64) { 307 return t.client.ID() 308} 309 310func (t *directExporterTask) Output() (output plugin.ResultWriter) { 311 return t.output 312} 313 314func (t *directExporterTask) ItemID() (itemid uint64) { 315 return t.item.itemid 316} 317 318func (t *directExporterTask) Meta() (meta *plugin.Meta) { 319 return &t.meta 320} 321 322func (t *directExporterTask) GlobalRegexp() plugin.RegexpMatcher { 323 return t.client.GlobalRegexp() 324} 325 326// starterTask provides access to plugin Exporter interaface Start() method. 327type starterTask struct { 328 taskBase 329} 330 331func (t *starterTask) perform(s Scheduler) { 332 log.Debugf("plugin %s: executing starter task", t.plugin.name()) 333 go func() { 334 runner, _ := t.plugin.impl.(plugin.Runner) 335 runner.Start() 336 s.FinishTask(t) 337 }() 338} 339 340func (t *starterTask) reschedule(now time.Time) (err error) { 341 t.scheduled = time.Unix(now.Unix(), priorityStarterTaskNs) 342 return 343} 344 345func (t *starterTask) getWeight() int { 346 return t.plugin.maxCapacity 347} 348 349// stopperTask provides access to plugin Exporter interaface Start() method. 350type stopperTask struct { 351 taskBase 352} 353 354func (t *stopperTask) perform(s Scheduler) { 355 log.Debugf("plugin %s: executing stopper task", t.plugin.name()) 356 go func() { 357 runner, _ := t.plugin.impl.(plugin.Runner) 358 runner.Stop() 359 s.FinishTask(t) 360 }() 361} 362 363func (t *stopperTask) reschedule(now time.Time) (err error) { 364 t.scheduled = time.Unix(now.Unix(), priorityStopperTaskNs) 365 return 366} 367 368func (t *stopperTask) getWeight() int { 369 return t.plugin.maxCapacity 370} 371 372// stopperTask provides access to plugin Watcher interaface. 
373type watcherTask struct { 374 taskBase 375 requests []*plugin.Request 376 client ClientAccessor 377} 378 379func (t *watcherTask) perform(s Scheduler) { 380 log.Debugf("plugin %s: executing watcher task", t.plugin.name()) 381 go func() { 382 watcher, _ := t.plugin.impl.(plugin.Watcher) 383 watcher.Watch(t.requests, t) 384 s.FinishTask(t) 385 }() 386} 387 388func (t *watcherTask) reschedule(now time.Time) (err error) { 389 t.scheduled = time.Unix(now.Unix(), priorityWatcherTaskNs) 390 return 391} 392 393func (t *watcherTask) getWeight() int { 394 return t.plugin.maxCapacity 395} 396 397// plugin.ContextProvider interface 398 399func (t *watcherTask) ClientID() (clientid uint64) { 400 return t.client.ID() 401} 402 403func (t *watcherTask) Output() (output plugin.ResultWriter) { 404 return t.client.Output() 405} 406 407func (t *watcherTask) ItemID() (itemid uint64) { 408 return 0 409} 410 411func (t *watcherTask) Meta() (meta *plugin.Meta) { 412 return nil 413} 414 415func (t *watcherTask) GlobalRegexp() plugin.RegexpMatcher { 416 return t.client.GlobalRegexp() 417} 418 419// configuratorTask provides access to plugin Configurator interaface. 420type configuratorTask struct { 421 taskBase 422 options *agent.AgentOptions 423} 424 425func (t *configuratorTask) perform(s Scheduler) { 426 log.Debugf("plugin %s: executing configurator task", t.plugin.name()) 427 go func() { 428 config, _ := t.plugin.impl.(plugin.Configurator) 429 config.Configure(agent.GlobalOptions(t.options), t.options.Plugins[t.plugin.name()]) 430 s.FinishTask(t) 431 }() 432} 433 434func (t *configuratorTask) reschedule(now time.Time) (err error) { 435 t.scheduled = time.Unix(now.Unix(), priorityConfiguratorTaskNs) 436 return 437} 438 439func (t *configuratorTask) getWeight() int { 440 return t.plugin.maxCapacity 441} 442