1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**/
19
20package scheduler
21
22import (
23	"errors"
24	"fmt"
25	"reflect"
26	"time"
27
28	"zabbix.com/internal/agent"
29	"zabbix.com/pkg/itemutil"
30	"zabbix.com/pkg/log"
31	"zabbix.com/pkg/plugin"
32	"zabbix.com/pkg/zbxlib"
33)
34
35// task priority within the same second is done by setting nanosecond component
36const (
37	priorityConfiguratorTaskNs = iota
38	priorityStarterTaskNs
39	priorityCollectorTaskNs
40	priorityWatcherTaskNs
41	priorityExporterTaskNs
42	priorityStopperTaskNs
43)
44
45// exporterTaskAccessor is used by clients to track item exporter tasks .
46type exporterTaskAccessor interface {
47	task() *exporterTask
48}
49
50// taskBase implements common task properties and functionality
51type taskBase struct {
52	plugin    *pluginAgent
53	scheduled time.Time
54	index     int
55	active    bool
56	recurring bool
57}
58
59func (t *taskBase) getPlugin() *pluginAgent {
60	return t.plugin
61}
62
63func (t *taskBase) getScheduled() time.Time {
64	return t.scheduled
65}
66
67func (t *taskBase) getWeight() int {
68	return 1
69}
70
71func (t *taskBase) getIndex() int {
72	return t.index
73}
74
75func (t *taskBase) setIndex(index int) {
76	t.index = index
77}
78
79func (t *taskBase) deactivate() {
80	if t.index != -1 {
81		t.plugin.removeTask(t.index)
82	}
83	t.active = false
84}
85
86func (t *taskBase) isActive() bool {
87	return t.active
88}
89
90func (t *taskBase) isRecurring() bool {
91	return t.recurring
92}
93
94// collectorTask provides access to plugin Collector interaface.
95type collectorTask struct {
96	taskBase
97	seed uint64
98}
99
100func (t *collectorTask) perform(s Scheduler) {
101	log.Debugf("plugin %s: executing collector task", t.plugin.name())
102	go func() {
103		collector, _ := t.plugin.impl.(plugin.Collector)
104		if err := collector.Collect(); err != nil {
105			log.Warningf("plugin '%s' collector failed: %s", t.plugin.impl.Name(), err.Error())
106		}
107		s.FinishTask(t)
108	}()
109}
110
111func (t *collectorTask) reschedule(now time.Time) (err error) {
112	collector, _ := t.plugin.impl.(plugin.Collector)
113	period := int64(collector.Period())
114	if period == 0 {
115		return fmt.Errorf("invalid collector interval 0 seconds")
116	}
117	seconds := now.Unix()
118	nextcheck := period*(seconds/period) + int64(t.seed)%period
119	for nextcheck <= seconds {
120		nextcheck += period
121	}
122	t.scheduled = time.Unix(nextcheck, priorityCollectorTaskNs)
123	return
124}
125
126func (t *collectorTask) getWeight() int {
127	return t.plugin.maxCapacity
128}
129
130// exporterTask provides access to plugin Exporter interaface. It's used
131// for active check items.
132type exporterTask struct {
133	taskBase
134	item    clientItem
135	failed  bool
136	updated time.Time
137	client  ClientAccessor
138	meta    plugin.Meta
139	output  plugin.ResultWriter
140}
141
142func (t *exporterTask) perform(s Scheduler) {
143	// pass item key as parameter so it can be safely updated while task is being processed in its goroutine
144	go func(itemkey string) {
145		var result *plugin.Result
146		exporter, _ := t.plugin.impl.(plugin.Exporter)
147		now := time.Now()
148		var key string
149		var params []string
150		var err error
151
152		if key, params, err = itemutil.ParseKey(itemkey); err == nil {
153			var ret interface{}
154			log.Debugf("executing exporter task for itemid:%d key '%s'", t.item.itemid, itemkey)
155
156			if ret, err = exporter.Export(key, params, t); err == nil {
157				log.Debugf("executed exporter task for itemid:%d key '%s'", t.item.itemid, itemkey)
158				if ret != nil {
159					rt := reflect.TypeOf(ret)
160					switch rt.Kind() {
161					case reflect.Slice:
162						fallthrough
163					case reflect.Array:
164						s := reflect.ValueOf(ret)
165						for i := 0; i < s.Len(); i++ {
166							result = itemutil.ValueToResult(t.item.itemid, now, s.Index(i).Interface())
167							t.output.Write(result)
168						}
169					default:
170						result = itemutil.ValueToResult(t.item.itemid, now, ret)
171						t.output.Write(result)
172					}
173				}
174			} else {
175				log.Debugf("failed to execute exporter task for itemid:%d key '%s' error: '%s'",
176					t.item.itemid, itemkey, err.Error())
177			}
178		}
179		if err != nil {
180			result = &plugin.Result{Itemid: t.item.itemid, Error: err, Ts: now}
181			t.output.Write(result)
182		}
183		// set failed state based on last result
184		if result != nil && result.Error != nil {
185			log.Warningf(`check '%s' is not supported: %s`, itemkey, result.Error)
186			t.failed = true
187		} else {
188			t.failed = false
189		}
190
191		s.FinishTask(t)
192	}(t.item.key)
193}
194
195func (t *exporterTask) reschedule(now time.Time) (err error) {
196	var nextcheck time.Time
197	nextcheck, err = zbxlib.GetNextcheck(t.item.itemid, t.item.delay, now)
198	if err != nil {
199		return
200	}
201	t.scheduled = nextcheck.Add(priorityExporterTaskNs)
202	return
203}
204
205func (t *exporterTask) task() (task *exporterTask) {
206	return t
207}
208
209// plugin.ContextProvider interface
210
211func (t *exporterTask) ClientID() (clientid uint64) {
212	return t.client.ID()
213}
214
215func (t *exporterTask) Output() (output plugin.ResultWriter) {
216	return t.output
217}
218
219func (t *exporterTask) ItemID() (itemid uint64) {
220	return t.item.itemid
221}
222
223func (t *exporterTask) Meta() (meta *plugin.Meta) {
224	return &t.meta
225}
226
227func (t *exporterTask) GlobalRegexp() plugin.RegexpMatcher {
228	return t.client.GlobalRegexp()
229}
230
231// directExporterTask provides access to plugin Exporter interaface.
232// It's used for non-recurring exporter requests - single passive checks
233// and internal requests to obtain HostnameItem, HostMetadataItem,
234// HostInterfaceItem etc values.
235type directExporterTask struct {
236	taskBase
237	item   clientItem
238	done   bool
239	expire time.Time
240	client ClientAccessor
241	meta   plugin.Meta
242	output plugin.ResultWriter
243}
244
245func (t *directExporterTask) isRecurring() bool {
246	return !t.done
247}
248func (t *directExporterTask) perform(s Scheduler) {
249	// pass item key as parameter so it can be safely updated while task is being processed in its goroutine
250	go func(itemkey string) {
251		var result *plugin.Result
252		exporter, _ := t.plugin.impl.(plugin.Exporter)
253		now := time.Now()
254		var key string
255		var params []string
256		var err error
257
258		if now.After(t.expire) {
259			err = errors.New("No data available.")
260			log.Debugf("direct exporter task expired for key '%s' error: '%s'", itemkey, err.Error())
261		} else {
262			if key, params, err = itemutil.ParseKey(itemkey); err == nil {
263				var ret interface{}
264				log.Debugf("executing direct exporter task for key '%s'", itemkey)
265
266				if ret, err = exporter.Export(key, params, t); err == nil {
267					log.Debugf("executed direct exporter task for key '%s'", itemkey)
268					if ret != nil {
269						rt := reflect.TypeOf(ret)
270						switch rt.Kind() {
271						case reflect.Slice, reflect.Array:
272							err = errors.New("Multiple return values are not supported for single passive checks")
273						default:
274							result = itemutil.ValueToResult(t.item.itemid, now, ret)
275							t.output.Write(result)
276							t.done = true
277						}
278					}
279				} else {
280					log.Debugf("failed to execute direct exporter task for key '%s' error: '%s'",
281						itemkey, err.Error())
282				}
283			}
284		}
285		if err != nil {
286			result = &plugin.Result{Itemid: t.item.itemid, Error: err, Ts: now}
287			t.output.Write(result)
288			t.done = true
289		}
290
291		s.FinishTask(t)
292	}(t.item.key)
293}
294
295func (t *directExporterTask) reschedule(now time.Time) (err error) {
296	if t.scheduled.IsZero() {
297		t.scheduled = time.Unix(now.Unix(), priorityExporterTaskNs)
298	} else {
299		t.scheduled = time.Unix(now.Unix()+1, priorityExporterTaskNs)
300	}
301	return
302}
303
304// plugin.ContextProvider interface
305
306func (t *directExporterTask) ClientID() (clientid uint64) {
307	return t.client.ID()
308}
309
310func (t *directExporterTask) Output() (output plugin.ResultWriter) {
311	return t.output
312}
313
314func (t *directExporterTask) ItemID() (itemid uint64) {
315	return t.item.itemid
316}
317
318func (t *directExporterTask) Meta() (meta *plugin.Meta) {
319	return &t.meta
320}
321
322func (t *directExporterTask) GlobalRegexp() plugin.RegexpMatcher {
323	return t.client.GlobalRegexp()
324}
325
326// starterTask provides access to plugin Exporter interaface Start() method.
327type starterTask struct {
328	taskBase
329}
330
331func (t *starterTask) perform(s Scheduler) {
332	log.Debugf("plugin %s: executing starter task", t.plugin.name())
333	go func() {
334		runner, _ := t.plugin.impl.(plugin.Runner)
335		runner.Start()
336		s.FinishTask(t)
337	}()
338}
339
340func (t *starterTask) reschedule(now time.Time) (err error) {
341	t.scheduled = time.Unix(now.Unix(), priorityStarterTaskNs)
342	return
343}
344
345func (t *starterTask) getWeight() int {
346	return t.plugin.maxCapacity
347}
348
349// stopperTask provides access to plugin Exporter interaface Start() method.
350type stopperTask struct {
351	taskBase
352}
353
354func (t *stopperTask) perform(s Scheduler) {
355	log.Debugf("plugin %s: executing stopper task", t.plugin.name())
356	go func() {
357		runner, _ := t.plugin.impl.(plugin.Runner)
358		runner.Stop()
359		s.FinishTask(t)
360	}()
361}
362
363func (t *stopperTask) reschedule(now time.Time) (err error) {
364	t.scheduled = time.Unix(now.Unix(), priorityStopperTaskNs)
365	return
366}
367
368func (t *stopperTask) getWeight() int {
369	return t.plugin.maxCapacity
370}
371
372// stopperTask provides access to plugin Watcher interaface.
373type watcherTask struct {
374	taskBase
375	requests []*plugin.Request
376	client   ClientAccessor
377}
378
379func (t *watcherTask) perform(s Scheduler) {
380	log.Debugf("plugin %s: executing watcher task", t.plugin.name())
381	go func() {
382		watcher, _ := t.plugin.impl.(plugin.Watcher)
383		watcher.Watch(t.requests, t)
384		s.FinishTask(t)
385	}()
386}
387
388func (t *watcherTask) reschedule(now time.Time) (err error) {
389	t.scheduled = time.Unix(now.Unix(), priorityWatcherTaskNs)
390	return
391}
392
393func (t *watcherTask) getWeight() int {
394	return t.plugin.maxCapacity
395}
396
397// plugin.ContextProvider interface
398
399func (t *watcherTask) ClientID() (clientid uint64) {
400	return t.client.ID()
401}
402
403func (t *watcherTask) Output() (output plugin.ResultWriter) {
404	return t.client.Output()
405}
406
407func (t *watcherTask) ItemID() (itemid uint64) {
408	return 0
409}
410
411func (t *watcherTask) Meta() (meta *plugin.Meta) {
412	return nil
413}
414
415func (t *watcherTask) GlobalRegexp() plugin.RegexpMatcher {
416	return t.client.GlobalRegexp()
417}
418
419// configuratorTask provides access to plugin Configurator interaface.
420type configuratorTask struct {
421	taskBase
422	options *agent.AgentOptions
423}
424
425func (t *configuratorTask) perform(s Scheduler) {
426	log.Debugf("plugin %s: executing configurator task", t.plugin.name())
427	go func() {
428		config, _ := t.plugin.impl.(plugin.Configurator)
429		config.Configure(agent.GlobalOptions(t.options), t.options.Plugins[t.plugin.name()])
430		s.FinishTask(t)
431	}()
432}
433
434func (t *configuratorTask) reschedule(now time.Time) (err error) {
435	t.scheduled = time.Unix(now.Unix(), priorityConfiguratorTaskNs)
436	return
437}
438
439func (t *configuratorTask) getWeight() int {
440	return t.plugin.maxCapacity
441}
442