1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**/
19
20package scheduler
21
22import (
23	"container/heap"
24	"errors"
25	"fmt"
26	"math"
27	"sort"
28	"time"
29
30	"zabbix.com/internal/agent"
31	"zabbix.com/internal/agent/alias"
32	"zabbix.com/internal/agent/keyaccess"
33	"zabbix.com/internal/monitor"
34	"zabbix.com/pkg/conf"
35	"zabbix.com/pkg/glexpr"
36	"zabbix.com/pkg/itemutil"
37	"zabbix.com/pkg/log"
38	"zabbix.com/pkg/plugin"
39)
40
// Scheduler shutdown settings.
const (
	// shutdownTimeout is the number of seconds (main loop ticks) to wait for
	// plugins to finish during scheduler shutdown.
	shutdownTimeout = 5
	// shutdownInactive is the shutdown timer value meaning that no shutdown is
	// in progress.
	shutdownInactive = -1
)
47
48// Manager implements Scheduler interface and manages plugin interface usage.
49type Manager struct {
50	input       chan interface{}
51	plugins     map[string]*pluginAgent
52	pluginQueue pluginHeap
53	clients     map[uint64]*client
54	aliases     *alias.Manager
55	// number of active tasks (running in their own goroutines)
56	activeTasksNum int
57	// number of seconds left on shutdown timer
58	shutdownSeconds int
59}
60
61// updateRequest contains list of metrics monitored by a client and additional client configuration data.
62type updateRequest struct {
63	clientID uint64
64	sink     plugin.ResultWriter
65	requests []*plugin.Request
66	expressions []*glexpr.Expression
67}
68
// queryRequest carries a status/debug query for the scheduler main loop.
type queryRequest struct {
	// query command to execute
	command string
	// channel receiving the textual reply
	sink chan string
}
74
75type Scheduler interface {
76	UpdateTasks(clientID uint64, writer plugin.ResultWriter, expressions []*glexpr.Expression,
77		requests []*plugin.Request)
78	FinishTask(task performer)
79	PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error)
80	Query(command string) (status string)
81}
82
83// cleanupClient performs deactivation of plugins the client is not using anymore.
84// It's called after client update and once per hour for the client associated to
85// single passive checks.
86func (m *Manager) cleanupClient(c *client, now time.Time) {
87	// get a list of plugins the client stopped using
88	released := c.cleanup(m.plugins, now)
89	for _, p := range released {
90		// check if the plugin is used by other clients
91		if p.refcount != 0 {
92			continue
93		}
94		log.Debugf("[%d] deactivate unused plugin %s", c.id, p.name())
95
96		// deactivate recurring tasks
97		for deactivate := true; deactivate; {
98			deactivate = false
99			for _, t := range p.tasks {
100				if t.isActive() && t.isRecurring() {
101					t.deactivate()
102					// deactivation can change tasks ordering, so repeat the iteration if task was deactivated
103					deactivate = true
104					break
105				}
106			}
107		}
108
109		// queue stopper task if plugin has Runner interface
110		if _, ok := p.impl.(plugin.Runner); ok {
111			task := &stopperTask{
112				taskBase: taskBase{plugin: p, active: true},
113			}
114			if err := task.reschedule(now); err != nil {
115				log.Debugf("[%d] cannot schedule stopper task for plugin %s", c.id, p.name())
116				continue
117			}
118			p.enqueueTask(task)
119			log.Debugf("[%d] created stopper task for plugin %s", c.id, p.name())
120
121			if p.queued() {
122				m.pluginQueue.Update(p)
123			}
124		}
125
126		// queue plugin if there are still some tasks left to be finished before deactivating
127		if len(p.tasks) != 0 {
128			if !p.queued() {
129				heap.Push(&m.pluginQueue, p)
130			}
131		}
132	}
133}
134
135// processUpdateRequest processes client update request. It's being used for multiple requests
136// (active checks on a server) and also for direct requets (single passive and internal checks).
137func (m *Manager) processUpdateRequest(update *updateRequest, now time.Time) {
138	log.Debugf("[%d] processing update request (%d requests)", update.clientID, len(update.requests))
139
140	// immediately fail direct checks and ignore bulk requests when shutting down
141	if m.shutdownSeconds != shutdownInactive {
142		if update.clientID <= agent.MaxBuiltinClientID {
143			if len(update.requests) == 1 {
144				update.sink.Write(&plugin.Result{
145					Itemid: update.requests[0].Itemid,
146					Error:  errors.New("Cannot obtain item value during shutdown process."),
147					Ts:     now,
148				})
149			} else {
150				log.Warningf("[%d] direct checks can contain only single request while received %d requests",
151					update.clientID, len(update.requests))
152			}
153		}
154		return
155	}
156
157	var c *client
158	var ok bool
159	if c, ok = m.clients[update.clientID]; !ok {
160		if len(update.requests) == 0 {
161			log.Debugf("[%d] skipping empty update for unregistered client", update.clientID)
162			return
163		}
164		log.Debugf("[%d] registering new client", update.clientID)
165		c = newClient(update.clientID, update.sink)
166		m.clients[update.clientID] = c
167	}
168
169	c.updateExpressions(update.expressions)
170
171	for _, r := range update.requests {
172		var key string
173		var params []string
174		var err error
175		var p *pluginAgent
176
177		r.Key = m.aliases.Get(r.Key)
178		if key, params, err = itemutil.ParseKey(r.Key); err == nil {
179			p, ok = m.plugins[key]
180			if ok && update.clientID != agent.LocalChecksClientID {
181				ok = keyaccess.CheckRules(key, params)
182			}
183			if !ok {
184				err = fmt.Errorf("Unknown metric %s", key)
185			} else {
186				err = c.addRequest(p, r, update.sink, now)
187			}
188		}
189
190		if err != nil {
191			if c.id > agent.MaxBuiltinClientID {
192				if tacc, ok := c.exporters[r.Itemid]; ok {
193					log.Debugf("deactivate exporter task for item %d because of error: %s", r.Itemid, err)
194					tacc.task().deactivate()
195				}
196			}
197			update.sink.Write(&plugin.Result{Itemid: r.Itemid, Error: err, Ts: now})
198			log.Debugf("[%d] cannot monitor metric \"%s\": %s", update.clientID, r.Key, err.Error())
199			continue
200		}
201
202		if !p.queued() {
203			heap.Push(&m.pluginQueue, p)
204		} else {
205			m.pluginQueue.Update(p)
206		}
207	}
208
209	m.cleanupClient(c, now)
210}
211
212// processQueue processes queued plugins/tasks
213func (m *Manager) processQueue(now time.Time) {
214	seconds := now.Unix()
215	for p := m.pluginQueue.Peek(); p != nil; p = m.pluginQueue.Peek() {
216		if task := p.peekTask(); task != nil {
217			if task.getScheduled().Unix() > seconds {
218				break
219			}
220
221			heap.Pop(&m.pluginQueue)
222			if !p.hasCapacity() {
223				// plugin has no free capacity for the next task, keep the plugin out of queue
224				// until active tasks finishes and the required capacity is released
225				continue
226			}
227
228			// take the task out of plugin tasks queue and perform it
229			m.activeTasksNum++
230			p.reserveCapacity(p.popTask())
231			task.perform(m)
232
233			// if the plugin has capacity for the next task put it back into plugin queue
234			if !p.hasCapacity() {
235				continue
236			}
237			heap.Push(&m.pluginQueue, p)
238		} else {
239			// plugins with empty task queue should not be in Manager queue
240			heap.Pop(&m.pluginQueue)
241		}
242	}
243}
244
245// processFinishRequest handles finished tasks
246func (m *Manager) processFinishRequest(task performer) {
247	m.activeTasksNum--
248	p := task.getPlugin()
249	p.releaseCapacity(task)
250	if p.active() && task.isActive() && task.isRecurring() {
251		if err := task.reschedule(time.Now()); err != nil {
252			log.Warningf("cannot reschedule plugin %s: %s", p.impl.Name(), err)
253		} else {
254			p.enqueueTask(task)
255		}
256	}
257	if !p.queued() && p.hasCapacity() {
258		heap.Push(&m.pluginQueue, p)
259	}
260}
261
262// rescheduleQueue reschedules all queued tasks. This is done whenever time
263// difference between ticks exceeds limits (for example during daylight saving changes).
264func (m *Manager) rescheduleQueue(now time.Time) {
265	// easier to rebuild queues than update each element
266	queue := make(pluginHeap, 0, len(m.pluginQueue))
267	for _, p := range m.pluginQueue {
268		tasks := p.tasks
269		p.tasks = make(performerHeap, 0, len(tasks))
270		for _, t := range tasks {
271			if err := t.reschedule(now); err == nil {
272				p.enqueueTask(t)
273			}
274		}
275		heap.Push(&queue, p)
276	}
277	m.pluginQueue = queue
278}
279
280// deactivatePlugins removes all tasks and creates stopper tasks for active runner plugins
281func (m *Manager) deactivatePlugins() {
282	m.shutdownSeconds = shutdownTimeout
283
284	m.pluginQueue = make(pluginHeap, 0, len(m.pluginQueue))
285	for _, p := range m.plugins {
286		if p.refcount != 0 {
287			p.tasks = make(performerHeap, 0)
288			if _, ok := p.impl.(plugin.Runner); ok {
289				task := &stopperTask{
290					taskBase: taskBase{plugin: p, active: true},
291				}
292				p.enqueueTask(task)
293				heap.Push(&m.pluginQueue, p)
294				p.refcount = 0
295				log.Debugf("created final stopper task for plugin %s", p.name())
296			}
297			p.refcount = 0
298		}
299	}
300}
301
302// run() is the main worker loop running in own goroutine until stopped
303func (m *Manager) run() {
304	defer log.PanicHook()
305	log.Debugf("starting manager")
306	// Adjust ticker creation at the 0 nanosecond timestamp. In reality it will have at least
307	// some microseconds, which will be enough to include all scheduled tasks at this second
308	// even with nanosecond priority adjustment.
309	lastTick := time.Now()
310	cleaned := lastTick
311	time.Sleep(time.Duration(1e9 - lastTick.Nanosecond()))
312	ticker := time.NewTicker(time.Second)
313run:
314	for {
315		select {
316		case <-ticker.C:
317			now := time.Now()
318			diff := now.Sub(lastTick)
319			interval := time.Second * 10
320			if diff <= -interval || diff >= interval {
321				log.Warningf("detected %d time difference between queue checks, rescheduling tasks",
322					int(math.Abs(float64(diff))/1e9))
323				m.rescheduleQueue(now)
324			}
325			lastTick = now
326			m.processQueue(now)
327			if m.shutdownSeconds != shutdownInactive {
328				m.shutdownSeconds--
329				if m.shutdownSeconds == 0 {
330					break run
331				}
332			} else {
333				// cleanup plugins used by passive checks
334				if now.Sub(cleaned) >= time.Hour {
335					if passive, ok := m.clients[0]; ok {
336						m.cleanupClient(passive, now)
337					}
338					// remove inactive clients
339					for _, client := range m.clients {
340						if len(client.pluginsInfo) == 0 {
341							delete(m.clients, client.ID())
342						}
343					}
344					cleaned = now
345				}
346			}
347		case u := <-m.input:
348			if u == nil {
349				m.deactivatePlugins()
350				if m.activeTasksNum+len(m.pluginQueue) == 0 {
351					break run
352				}
353				m.processQueue(time.Now())
354			}
355			switch v := u.(type) {
356			case *updateRequest:
357				m.processUpdateRequest(v, time.Now())
358				m.processQueue(time.Now())
359			case performer:
360				m.processFinishRequest(v)
361				if m.shutdownSeconds != shutdownInactive && m.activeTasksNum+len(m.pluginQueue) == 0 {
362					break run
363				}
364				m.processQueue(time.Now())
365			case *queryRequest:
366				if response, err := m.processQuery(v); err != nil {
367					v.sink <- "cannot process request: " + err.Error()
368				} else {
369					v.sink <- response
370				}
371			}
372		}
373	}
374	log.Debugf("manager has been stopped")
375	monitor.Unregister(monitor.Scheduler)
376}
377
// pluginCapacity holds the optional capacity setting read from a plugin's
// configuration section; 0 means that no capacity was configured.
type pluginCapacity struct {
	Capacity int `conf:"optional"`
}
381
382func (m *Manager) init() {
383	m.input = make(chan interface{}, 10)
384	m.pluginQueue = make(pluginHeap, 0, len(plugin.Metrics))
385	m.clients = make(map[uint64]*client)
386	m.plugins = make(map[string]*pluginAgent)
387	m.shutdownSeconds = shutdownInactive
388
389	metrics := make([]*plugin.Metric, 0, len(plugin.Metrics))
390
391	for _, metric := range plugin.Metrics {
392		metrics = append(metrics, metric)
393	}
394	sort.Slice(metrics, func(i, j int) bool {
395		return metrics[i].Plugin.Name() < metrics[j].Plugin.Name()
396	})
397
398	pagent := &pluginAgent{}
399	for _, metric := range metrics {
400		if metric.Plugin != pagent.impl {
401			capacity := metric.Plugin.Capacity()
402			var opts pluginCapacity
403			optsRaw := agent.Options.Plugins[metric.Plugin.Name()]
404			if optsRaw != nil {
405				if err := conf.Unmarshal(optsRaw, &opts, false); err != nil {
406					log.Warningf("invalid plugin %s configuration: %s", metric.Plugin.Name(), err)
407					log.Warningf("using default plugin capacity settings: %d", plugin.DefaultCapacity)
408					capacity = plugin.DefaultCapacity
409				} else {
410					if opts.Capacity != 0 {
411						capacity = opts.Capacity
412					}
413				}
414			}
415
416			if capacity > metric.Plugin.Capacity() {
417				log.Warningf("lowering the plugin %s capacity to %d as the configured capacity %d exceeds limits",
418					metric.Plugin.Name(), metric.Plugin.Capacity(), capacity)
419				capacity = metric.Plugin.Capacity()
420			}
421
422			pagent = &pluginAgent{
423				impl:         metric.Plugin,
424				tasks:        make(performerHeap, 0),
425				maxCapacity:  capacity,
426				usedCapacity: 0,
427				index:        -1,
428				refcount:     0,
429			}
430
431			interfaces := ""
432			if _, ok := metric.Plugin.(plugin.Exporter); ok {
433				interfaces += "exporter, "
434			}
435			if _, ok := metric.Plugin.(plugin.Collector); ok {
436				interfaces += "collector, "
437			}
438			if _, ok := metric.Plugin.(plugin.Runner); ok {
439				interfaces += "runner, "
440			}
441			if _, ok := metric.Plugin.(plugin.Watcher); ok {
442				interfaces += "watcher, "
443			}
444			if _, ok := metric.Plugin.(plugin.Configurator); ok {
445				interfaces += "configurator, "
446			}
447			interfaces = interfaces[:len(interfaces)-2]
448			log.Infof("using plugin '%s' providing following interfaces: %s", metric.Plugin.Name(), interfaces)
449		}
450		m.plugins[metric.Key] = pagent
451	}
452}
453func (m *Manager) Start() {
454	monitor.Register(monitor.Scheduler)
455	go m.run()
456}
457
458func (m *Manager) Stop() {
459	m.input <- nil
460}
461
462func (m *Manager) UpdateTasks(clientID uint64, writer plugin.ResultWriter,
463	expressions []*glexpr.Expression, requests []*plugin.Request) {
464
465	m.input <- &updateRequest{clientID: clientID,
466		sink:     writer,
467		requests: requests,
468		expressions: expressions,
469	}
470}
471
472type resultWriter chan *plugin.Result
473
474func (r resultWriter) Write(result *plugin.Result) {
475	r <- result
476}
477
478func (r resultWriter) Flush() {
479}
480
481func (r resultWriter) SlotsAvailable() int {
482	return 1
483}
484
485func (r resultWriter) PersistSlotsAvailable() int {
486	return 1
487}
488
489func (m *Manager) PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error) {
490	var lastLogsize uint64
491	var mtime int
492
493	w := make(resultWriter, 1)
494
495	m.UpdateTasks(clientID, w, nil, []*plugin.Request{{Key: key, LastLogsize: &lastLogsize, Mtime: &mtime}})
496
497	select {
498	case r := <-w:
499		if r.Error == nil {
500			if r.Value != nil {
501				result = *r.Value
502			} else {
503				// single metric requests do not support empty values, return error instead
504				err = errors.New("No values have been gathered yet.")
505			}
506		} else {
507			err = r.Error
508		}
509	case <-time.After(timeout):
510		err = fmt.Errorf("Timeout occurred while gathering data.")
511	}
512	return
513}
514
515func (m *Manager) FinishTask(task performer) {
516	m.input <- task
517}
518
519func (m *Manager) Query(command string) (status string) {
520	request := &queryRequest{command: command, sink: make(chan string)}
521	m.input <- request
522	return <-request.sink
523}
524
525func (m *Manager) validatePlugins(options *agent.AgentOptions) (err error) {
526	for _, p := range plugin.Plugins {
527		if c, ok := p.(plugin.Configurator); ok {
528			if err = c.Validate(options.Plugins[p.Name()]); err != nil {
529				return fmt.Errorf("invalid plugin %s configuration: %s", p.Name(), err)
530			}
531		}
532	}
533	return
534}
535
536func (m *Manager) configure(options *agent.AgentOptions) (err error) {
537	m.aliases, err = alias.NewManager(options)
538	return
539}
540
541func NewManager(options *agent.AgentOptions) (mannager *Manager, err error) {
542	var m Manager
543	m.init()
544	if err = m.validatePlugins(options); err != nil {
545		return
546	}
547	return &m, m.configure(options)
548}
549