/*
** Zabbix
** Copyright (C) 2001-2021 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
**/

package scheduler

import (
	"container/heap"
	"errors"
	"fmt"
	"math"
	"sort"
	"time"

	"zabbix.com/internal/agent"
	"zabbix.com/internal/agent/alias"
	"zabbix.com/internal/agent/keyaccess"
	"zabbix.com/internal/monitor"
	"zabbix.com/pkg/conf"
	"zabbix.com/pkg/glexpr"
	"zabbix.com/pkg/itemutil"
	"zabbix.com/pkg/log"
	"zabbix.com/pkg/plugin"
)

const (
	// number of seconds to wait for plugins to finish during scheduler shutdown
	shutdownTimeout = 5
	// shutdown timer value indicating that no shutdown is in progress
	shutdownInactive = -1
)

// Manager implements the Scheduler interface and manages plugin interface usage.
type Manager struct {
	input       chan interface{}
	plugins     map[string]*pluginAgent
	pluginQueue pluginHeap
	clients     map[uint64]*client
	aliases     *alias.Manager
	// number of active tasks (running in their own goroutines)
	activeTasksNum int
	// number of seconds left on shutdown timer
	shutdownSeconds int
}

// updateRequest contains a list of metrics monitored by a client and additional client configuration data.
type updateRequest struct {
	clientID           uint64
	sink               plugin.ResultWriter
	requests           []*plugin.Request
	refreshUnsupported int
	expressions        []*glexpr.Expression
}

// queryRequest contains a status/debug query request.
type queryRequest struct {
	command string
	sink    chan string
}

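// Scheduler provides the scheduling interface used by the agent: updating client metric requests,
// performing direct checks, finishing tasks and querying scheduler status.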
type Scheduler interface {
	UpdateTasks(clientID uint64, writer plugin.ResultWriter, refreshUnsupported int, expressions []*glexpr.Expression,
		requests []*plugin.Request)
	FinishTask(task performer)
	PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error)
	Query(command string) (status string)
}

// cleanupClient deactivates plugins the client is no longer using.
// It's called after a client update and once per hour for the client associated with
// single passive checks.
func (m *Manager) cleanupClient(c *client, now time.Time) {
	// get a list of plugins the client stopped using
	released := c.cleanup(m.plugins, now)
	for _, p := range released {
		// check if the plugin is used by other clients
		if p.refcount != 0 {
			continue
		}
		log.Debugf("[%d] deactivate unused plugin %s", c.id, p.name())

		// deactivate recurring tasks
		for deactivate := true; deactivate; {
			deactivate = false
			for _, t := range p.tasks {
				if t.isActive() && t.isRecurring() {
					t.deactivate()
					// deactivation can change task ordering, so restart the iteration if a task was deactivated
					deactivate = true
					break
				}
			}
		}

		// queue a stopper task if the plugin implements the Runner interface
		if _, ok := p.impl.(plugin.Runner); ok {
			task := &stopperTask{
				taskBase: taskBase{plugin: p, active: true},
			}
			if err := task.reschedule(now); err != nil {
				log.Debugf("[%d] cannot schedule stopper task for plugin %s", c.id, p.name())
				continue
			}
			p.enqueueTask(task)
			log.Debugf("[%d] created stopper task for plugin %s", c.id, p.name())

			if p.queued() {
				m.pluginQueue.Update(p)
			}
		}

		// queue the plugin if there are still tasks left to be finished before deactivating
		if len(p.tasks) != 0 {
			if !p.queued() {
				heap.Push(&m.pluginQueue, p)
			}
		}
	}
}

// processUpdateRequest processes a client update request. It is used both for bulk requests
// (active checks from a server) and for direct requests (single passive and internal checks).
func (m *Manager) processUpdateRequest(update *updateRequest, now time.Time) {
	log.Debugf("[%d] processing update request (%d requests)", update.clientID, len(update.requests))

	// immediately fail direct checks and ignore bulk requests when shutting down
	if m.shutdownSeconds != shutdownInactive {
		if update.clientID <= agent.MaxBuiltinClientID {
			if len(update.requests) == 1 {
				update.sink.Write(&plugin.Result{
					Itemid: update.requests[0].Itemid,
					Error:  errors.New("Cannot obtain item value during shutdown process."),
					Ts:     now,
				})
			} else {
				log.Warningf("[%d] direct checks can contain only a single request while %d requests were received",
					update.clientID, len(update.requests))
			}
		}
		return
	}

	var c *client
	var ok bool
	if c, ok = m.clients[update.clientID]; !ok {
		if len(update.requests) == 0 {
			log.Debugf("[%d] skipping empty update for unregistered client", update.clientID)
			return
		}
		log.Debugf("[%d] registering new client", update.clientID)
		c = newClient(update.clientID, update.sink)
		m.clients[update.clientID] = c
	}

	c.refreshUnsupported = update.refreshUnsupported
	c.updateExpressions(update.expressions)

	for _, r := range update.requests {
		var key string
		var params []string
		var err error
		var p *pluginAgent

		r.Key = m.aliases.Get(r.Key)
		if key, params, err = itemutil.ParseKey(r.Key); err == nil {
			p, ok = m.plugins[key]
			if ok && update.clientID != agent.LocalChecksClientID {
				ok = keyaccess.CheckRules(key, params)
			}
			if !ok {
				err = fmt.Errorf("Unknown metric %s", key)
			} else {
				err = c.addRequest(p, r, update.sink, now)
			}
		}

		if err != nil {
			if c.id > agent.MaxBuiltinClientID {
				if tacc, ok := c.exporters[r.Itemid]; ok {
					log.Debugf("deactivate exporter task for item %d because of error: %s", r.Itemid, err)
					tacc.task().deactivate()
				}
			}
			update.sink.Write(&plugin.Result{Itemid: r.Itemid, Error: err, Ts: now})
			log.Debugf("[%d] cannot monitor metric \"%s\": %s", update.clientID, r.Key, err.Error())
			continue
		}

		if !p.queued() {
			heap.Push(&m.pluginQueue, p)
		} else {
			m.pluginQueue.Update(p)
		}
	}

	m.cleanupClient(c, now)
}

// processQueue processes queued plugins/tasks
func (m *Manager) processQueue(now time.Time) {
	seconds := now.Unix()
	for p := m.pluginQueue.Peek(); p != nil; p = m.pluginQueue.Peek() {
		if task := p.peekTask(); task != nil {
			if task.getScheduled().Unix() > seconds {
				break
			}

			heap.Pop(&m.pluginQueue)
			if !p.hasCapacity() {
				// the plugin has no free capacity for the next task, keep it out of the queue
				// until an active task finishes and the required capacity is released
				continue
			}

			// take the task out of the plugin task queue and perform it
			m.activeTasksNum++
			p.reserveCapacity(p.popTask())
			task.perform(m)

			// if the plugin has capacity for the next task, put it back into the plugin queue
			if !p.hasCapacity() {
				continue
			}
			heap.Push(&m.pluginQueue, p)
		} else {
			// plugins with an empty task queue should not be in the manager queue
			heap.Pop(&m.pluginQueue)
		}
	}
}

// processFinishRequest handles finished tasks
func (m *Manager) processFinishRequest(task performer) {
	m.activeTasksNum--
	p := task.getPlugin()
	p.releaseCapacity(task)
	if p.active() && task.isActive() && task.isRecurring() {
		if err := task.reschedule(time.Now()); err != nil {
			log.Warningf("cannot reschedule plugin %s: %s", p.impl.Name(), err)
		} else {
			p.enqueueTask(task)
		}
	}
	if !p.queued() && p.hasCapacity() {
		heap.Push(&m.pluginQueue, p)
	}
}

// rescheduleQueue reschedules all queued tasks. This is done whenever the time
// difference between ticks exceeds the limit (for example, during daylight saving time changes).
func (m *Manager) rescheduleQueue(now time.Time) {
	// it's easier to rebuild the queues than to update each element
	queue := make(pluginHeap, 0, len(m.pluginQueue))
	for _, p := range m.pluginQueue {
		tasks := p.tasks
		p.tasks = make(performerHeap, 0, len(tasks))
		for _, t := range tasks {
			if err := t.reschedule(now); err == nil {
				p.enqueueTask(t)
			}
		}
		heap.Push(&queue, p)
	}
	m.pluginQueue = queue
}

// deactivatePlugins removes all tasks and creates stopper tasks for active runner plugins
func (m *Manager) deactivatePlugins() {
	m.shutdownSeconds = shutdownTimeout

	m.pluginQueue = make(pluginHeap, 0, len(m.pluginQueue))
	for _, p := range m.plugins {
		if p.refcount != 0 {
			p.tasks = make(performerHeap, 0)
			if _, ok := p.impl.(plugin.Runner); ok {
				task := &stopperTask{
					taskBase: taskBase{plugin: p, active: true},
				}
				p.enqueueTask(task)
				heap.Push(&m.pluginQueue, p)
				log.Debugf("created final stopper task for plugin %s", p.name())
			}
			p.refcount = 0
		}
	}
}

// run() is the main worker loop running in its own goroutine until stopped
func (m *Manager) run() {
	defer log.PanicHook()
	log.Debugf("starting manager")
	// Align ticker creation to the start of the next second (the 0 nanosecond timestamp). In reality
	// the tick will arrive at least a few microseconds past the second boundary, which is enough to
	// include all tasks scheduled for that second even with nanosecond priority adjustment.
	lastTick := time.Now()
	cleaned := lastTick
	time.Sleep(time.Duration(1e9 - lastTick.Nanosecond()))
	ticker := time.NewTicker(time.Second)
run:
	for {
		select {
		case <-ticker.C:
			now := time.Now()
			diff := now.Sub(lastTick)
			interval := time.Second * 10
			if diff <= -interval || diff >= interval {
				log.Warningf("detected %d second(s) time difference between queue checks, rescheduling tasks",
					int(math.Abs(float64(diff))/1e9))
				m.rescheduleQueue(now)
			}
			lastTick = now
			m.processQueue(now)
			if m.shutdownSeconds != shutdownInactive {
				m.shutdownSeconds--
				if m.shutdownSeconds == 0 {
					break run
				}
			} else {
				// cleanup plugins used by passive checks
				if now.Sub(cleaned) >= time.Hour {
					if passive, ok := m.clients[0]; ok {
						m.cleanupClient(passive, now)
					}
					// remove inactive clients
					for _, client := range m.clients {
						if len(client.pluginsInfo) == 0 {
							delete(m.clients, client.ID())
						}
					}
					cleaned = now
				}
			}
		case u := <-m.input:
			if u == nil {
				m.deactivatePlugins()
				if m.activeTasksNum+len(m.pluginQueue) == 0 {
					break run
				}
				m.processQueue(time.Now())
			}
			switch v := u.(type) {
			case *updateRequest:
				m.processUpdateRequest(v, time.Now())
				m.processQueue(time.Now())
			case performer:
				m.processFinishRequest(v)
				if m.shutdownSeconds != shutdownInactive && m.activeTasksNum+len(m.pluginQueue) == 0 {
					break run
				}
				m.processQueue(time.Now())
			case *queryRequest:
				if response, err := m.processQuery(v); err != nil {
					v.sink <- "cannot process request: " + err.Error()
				} else {
					v.sink <- response
				}
			}
		}
	}
	log.Debugf("manager has been stopped")
	monitor.Unregister(monitor.Scheduler)
}

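// pluginCapacity is used to read the optional Capacity setting from a plugin configuration section.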
type pluginCapacity struct {
	Capacity int `conf:"optional"`
}

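// init prepares the manager for startup: it creates the input channel and the client/plugin maps,
// and registers a plugin agent for every known metric, applying configured capacity limits.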
func (m *Manager) init() {
	m.input = make(chan interface{}, 10)
	m.pluginQueue = make(pluginHeap, 0, len(plugin.Metrics))
	m.clients = make(map[uint64]*client)
	m.plugins = make(map[string]*pluginAgent)
	m.shutdownSeconds = shutdownInactive

	metrics := make([]*plugin.Metric, 0, len(plugin.Metrics))

	for _, metric := range plugin.Metrics {
		metrics = append(metrics, metric)
	}
	sort.Slice(metrics, func(i, j int) bool {
		return metrics[i].Plugin.Name() < metrics[j].Plugin.Name()
	})

	pagent := &pluginAgent{}
	for _, metric := range metrics {
		if metric.Plugin != pagent.impl {
			capacity := metric.Plugin.Capacity()
			var opts pluginCapacity
			optsRaw := agent.Options.Plugins[metric.Plugin.Name()]
			if optsRaw != nil {
				if err := conf.Unmarshal(optsRaw, &opts, false); err != nil {
					log.Warningf("invalid plugin %s configuration: %s", metric.Plugin.Name(), err)
					log.Warningf("using default plugin capacity settings: %d", plugin.DefaultCapacity)
					capacity = plugin.DefaultCapacity
				} else {
					if opts.Capacity != 0 {
						capacity = opts.Capacity
					}
				}
			}

			if capacity > metric.Plugin.Capacity() {
				log.Warningf("lowering the plugin %s capacity to %d as the configured capacity %d exceeds limits",
					metric.Plugin.Name(), metric.Plugin.Capacity(), capacity)
				capacity = metric.Plugin.Capacity()
			}

			pagent = &pluginAgent{
				impl:         metric.Plugin,
				tasks:        make(performerHeap, 0),
				maxCapacity:  capacity,
				usedCapacity: 0,
				index:        -1,
				refcount:     0,
			}

			interfaces := ""
			if _, ok := metric.Plugin.(plugin.Exporter); ok {
				interfaces += "exporter, "
			}
			if _, ok := metric.Plugin.(plugin.Collector); ok {
				interfaces += "collector, "
			}
			if _, ok := metric.Plugin.(plugin.Runner); ok {
				interfaces += "runner, "
			}
			if _, ok := metric.Plugin.(plugin.Watcher); ok {
				interfaces += "watcher, "
			}
			if _, ok := metric.Plugin.(plugin.Configurator); ok {
				interfaces += "configurator, "
			}
			interfaces = interfaces[:len(interfaces)-2]
			log.Infof("using plugin '%s' providing following interfaces: %s", metric.Plugin.Name(), interfaces)
		}
		m.plugins[metric.Key] = pagent
	}
}
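
// Start registers the scheduler with the monitor and launches the manager worker goroutine.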
func (m *Manager) Start() {
	monitor.Register(monitor.Scheduler)
	go m.run()
}

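// Stop initiates scheduler shutdown by sending a nil message to the manager worker loop.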
func (m *Manager) Stop() {
	m.input <- nil
}

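// UpdateTasks queues an update request containing the client's current metric requests and configuration.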
func (m *Manager) UpdateTasks(clientID uint64, writer plugin.ResultWriter, refreshUnsupported int,
	expressions []*glexpr.Expression, requests []*plugin.Request) {

	m.input <- &updateRequest{clientID: clientID,
		sink:               writer,
		requests:           requests,
		refreshUnsupported: refreshUnsupported,
		expressions:        expressions,
	}
}

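// resultWriter is a single-result plugin.ResultWriter implementation used by PerformTask
// to collect the outcome of a direct check.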
type resultWriter chan *plugin.Result

func (r resultWriter) Write(result *plugin.Result) {
	r <- result
}

func (r resultWriter) Flush() {
}

func (r resultWriter) SlotsAvailable() int {
	return 1
}

func (r resultWriter) PersistSlotsAvailable() int {
	return 1
}

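// PerformTask performs a direct check of the specified key on behalf of the given client and waits
// up to timeout for the result.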
func (m *Manager) PerformTask(key string, timeout time.Duration, clientID uint64) (result string, err error) {
	var lastLogsize uint64
	var mtime int

	w := make(resultWriter, 1)

	m.UpdateTasks(clientID, w, 0, nil, []*plugin.Request{{Key: key, LastLogsize: &lastLogsize, Mtime: &mtime}})

	select {
	case r := <-w:
		if r.Error == nil {
			if r.Value != nil {
				result = *r.Value
			} else {
				// single metric requests do not support empty values, return an error instead
				err = errors.New("No values have been gathered yet.")
			}
		} else {
			err = r.Error
		}
	case <-time.After(timeout):
		err = errors.New("Timeout occurred while gathering data.")
	}
	return
}

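// FinishTask passes a finished task back to the manager loop so that its capacity is released
// and recurring tasks are rescheduled.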
func (m *Manager) FinishTask(task performer) {
	m.input <- task
}

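// Query sends a status/debug query to the manager loop and returns the response.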
func (m *Manager) Query(command string) (status string) {
	request := &queryRequest{command: command, sink: make(chan string)}
	m.input <- request
	return <-request.sink
}

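// validatePlugins validates the configuration of every plugin implementing the Configurator interface.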
func (m *Manager) validatePlugins(options *agent.AgentOptions) (err error) {
	for _, p := range plugin.Plugins {
		if c, ok := p.(plugin.Configurator); ok {
			if err = c.Validate(options.Plugins[p.Name()]); err != nil {
				return fmt.Errorf("invalid plugin %s configuration: %s", p.Name(), err)
			}
		}
	}
	return
}

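// configure initializes the alias manager from the agent options.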
func (m *Manager) configure(options *agent.AgentOptions) (err error) {
	m.aliases, err = alias.NewManager(options)
	return
}

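// NewManager creates and initializes a scheduler manager, validates plugin configuration and
// applies the agent options.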
func NewManager(options *agent.AgentOptions) (manager *Manager, err error) {
	var m Manager
	m.init()
	if err = m.validatePlugins(options); err != nil {
		return
	}
	return &m, m.configure(options)
}