1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**/
19
20package scheduler
21
22import (
23	"errors"
24	"hash/fnv"
25	"sync/atomic"
26	"time"
27	"unsafe"
28
29	"zabbix.com/internal/agent"
30	"zabbix.com/pkg/glexpr"
31	"zabbix.com/pkg/log"
32	"zabbix.com/pkg/plugin"
33	"zabbix.com/pkg/zbxlib"
34)
35
36// clientItem represents item monitored by client
37type clientItem struct {
38	itemid uint64
39	delay  string
40	key    string
41}
42
43// pluginInfo is used to track plugin usage by client
44type pluginInfo struct {
45	used time.Time
46	// temporary link to a watcherTask during update
47	watcher *watcherTask
48}
49
50// client represents source of items (metrics) to be queried.
51// Each server for active checks is represented by a separate client.
52// There is a predefined clients to handle:
53//    all single passive checks (client id 1)
54//    all internal checks (resolving HostnameItem, HostMetadataItem, HostInterfaceItem) (client id 0)
55type client struct {
56	// Client id. Predefined clients have ids < 100, while clients active checks servers (ServerActive)
57	// have auto incrementing id starting with 100.
58	id uint64
59	// A map of itemids to the associated exporter tasks. It's used to update task when item parameters change.
60	exporters map[uint64]exporterTaskAccessor
61	// plugins used by client
62	pluginsInfo map[*pluginAgent]*pluginInfo
63	// server refresh unsupported value
64	refreshUnsupported int
65	// server global regular expression bundle
66	globalRegexp unsafe.Pointer
67	// plugin result sink, can be nil for bulk passive checks (in future)
68	output plugin.ResultWriter
69}
70
71// ClientAccessor interface exports client data required for scheduler tasks.
72type ClientAccessor interface {
73	RefreshUnsupported() int
74	Output() plugin.ResultWriter
75	GlobalRegexp() *glexpr.Bundle
76	ID() uint64
77}
78
79// RefreshUnsupported returns scheduling interval for unsupported items.
80// This function is used only by scheduler, no synchronization is required.
81func (c *client) RefreshUnsupported() int {
82	return c.refreshUnsupported
83}
84
85// GlobalRegexp returns global regular expression bundle.
86// This function is used by tasks to implement ContextProvider interface.
87// It can be accessed by plugins and replaced by scheduler at the same time,
88// so pointer access must be synchronized. The global regexp contents are never changed,
89// only replaced, so pointer synchronization is enough.
90func (c *client) GlobalRegexp() *glexpr.Bundle {
91	return (*glexpr.Bundle)(atomic.LoadPointer(&c.globalRegexp))
92}
93
94// ID returns client id.
95// While it's used by tasks to implement ContextProvider interface, client ID cannot
96// change, so no synchronization is required.
97func (c *client) ID() uint64 {
98	return c.id
99}
100
101// Output returns client output interface where plugins results can be written.
102// While it's used by tasks to implement ContextProvider interface, client output cannot
103// change, so no synchronization is required.
104func (c *client) Output() plugin.ResultWriter {
105	return c.output
106}
107
108// addRequest requests client to start monitoring/update item described by request 'r' using plugin 'p' (*pluginAgent)
109// with output writer 'sink'
110func (c *client) addRequest(p *pluginAgent, r *plugin.Request, sink plugin.ResultWriter, now time.Time) (err error) {
111	var info *pluginInfo
112	var ok bool
113
114	log.Debugf("[%d] adding new request for key: '%s'", c.id, r.Key)
115
116	if info, ok = c.pluginsInfo[p]; !ok {
117		info = &pluginInfo{}
118	}
119
120	// list of created tasks to be queued
121	tasks := make([]performer, 0, 6)
122
123	// handle Collector interface
124	if col, ok := p.impl.(plugin.Collector); ok {
125		if p.refcount == 0 {
126			// calculate collector seed to avoid scheduling all collectors at the same time
127			h := fnv.New32a()
128			_, _ = h.Write([]byte(p.impl.Name()))
129			task := &collectorTask{
130				taskBase: taskBase{plugin: p, active: true, recurring: true},
131				seed:     uint64(h.Sum32())}
132			if err = task.reschedule(now); err != nil {
133				return
134			}
135			tasks = append(tasks, task)
136			log.Debugf("[%d] created collector task for plugin %s with collecting interval %d", c.id, p.name(),
137				col.Period())
138		}
139	}
140
141	// handle Exporter interface
142	if _, ok := p.impl.(plugin.Exporter); ok {
143		var tacc exporterTaskAccessor
144
145		if c.id > agent.MaxBuiltinClientID {
146			var task *exporterTask
147
148			if _, err = zbxlib.GetNextcheck(r.Itemid, r.Delay, now, false, c.refreshUnsupported); err != nil {
149				return err
150			}
151			if tacc, ok = c.exporters[r.Itemid]; ok {
152				task = tacc.task()
153				if task.updated.Equal(now) {
154					return errors.New("duplicate itemid found")
155				}
156				if task.plugin != p {
157					// decativate current exporter task and create new one if the item key has been changed
158					// and the new metric is handled by other plugin
159					task.deactivate()
160					ok = false
161				}
162			}
163
164			if !ok {
165				// create and register new exporter task
166				task = &exporterTask{
167					taskBase: taskBase{plugin: p, active: true, recurring: true},
168					item:     clientItem{itemid: r.Itemid, delay: r.Delay, key: r.Key},
169					updated:  now,
170					client:   c,
171					output:   sink,
172				}
173				if err = task.reschedule(now); err != nil {
174					return
175				}
176				c.exporters[r.Itemid] = task
177				tasks = append(tasks, task)
178				log.Debugf("[%d] created exporter task for plugin '%s' itemid:%d key '%s'",
179					c.id, p.name(), task.item.itemid, task.item.key)
180			} else {
181				// update existing exporter task
182				task = tacc.task()
183				task.updated = now
184				task.item.key = r.Key
185				if task.item.delay != r.Delay {
186					task.item.delay = r.Delay
187					if err = task.reschedule(now); err != nil {
188						return
189					}
190					p.tasks.Update(task)
191					log.Debugf("[%d] updated exporter task for plugin '%s' itemid:%d key '%s'",
192						c.id, p.name(), task.item.itemid, task.item.key)
193				}
194			}
195			task.meta.SetLastLogsize(*r.LastLogsize)
196			task.meta.SetMtime(int32(*r.Mtime))
197
198		} else {
199			// handle single passive check or internal request
200			task := &directExporterTask{
201				taskBase: taskBase{plugin: p, active: true, recurring: true},
202				item:     clientItem{itemid: r.Itemid, delay: r.Delay, key: r.Key},
203				expire:   now.Add(time.Duration(agent.Options.Timeout) * time.Second),
204				client:   c,
205				output:   sink,
206			}
207			if err = task.reschedule(now); err != nil {
208				return
209			}
210			tasks = append(tasks, task)
211			log.Debugf("[%d] created direct exporter task for plugin '%s' itemid:%d key '%s'",
212				c.id, p.name(), task.item.itemid, task.item.key)
213		}
214	}
215
216	// handle runner interface for inactive plugins
217	if _, ok := p.impl.(plugin.Runner); ok {
218		if p.refcount == 0 {
219			task := &starterTask{
220				taskBase: taskBase{plugin: p, active: true},
221			}
222			if err = task.reschedule(now); err != nil {
223				return
224			}
225			tasks = append(tasks, task)
226			log.Debugf("[%d] created starter task for plugin %s", c.id, p.name())
227		}
228	}
229
230	// handle Watcher interface (not supported by single passive check or internal requests)
231	if _, ok := p.impl.(plugin.Watcher); ok && c.id > agent.MaxBuiltinClientID {
232		if info.watcher == nil {
233			info.watcher = &watcherTask{
234				taskBase: taskBase{plugin: p, active: true},
235				requests: make([]*plugin.Request, 0, 1),
236				client:   c,
237			}
238			if err = info.watcher.reschedule(now); err != nil {
239				return
240			}
241			tasks = append(tasks, info.watcher)
242
243			log.Debugf("[%d] created watcher task for plugin %s", c.id, p.name())
244		}
245		info.watcher.requests = append(info.watcher.requests, r)
246	}
247
248	// handle configurator interface for inactive plugins
249	if _, ok := p.impl.(plugin.Configurator); ok {
250		if p.refcount == 0 {
251			task := &configuratorTask{
252				taskBase: taskBase{plugin: p, active: true},
253				options:  &agent.Options,
254			}
255			_ = task.reschedule(now)
256			tasks = append(tasks, task)
257			log.Debugf("[%d] created configurator task for plugin %s", c.id, p.name())
258		}
259	}
260
261	for _, t := range tasks {
262		p.enqueueTask(t)
263	}
264
265	// update plugin usage information
266	if info.used.IsZero() {
267		p.refcount++
268		c.pluginsInfo[p] = info
269	}
270	info.used = now
271
272	return nil
273}
274
275// cleanup releases unused uplugins. For external clients it's done after update,
276// while for internal clients once per hour.
277func (c *client) cleanup(plugins map[string]*pluginAgent, now time.Time) (released []*pluginAgent) {
278	released = make([]*pluginAgent, 0, len(c.pluginsInfo))
279	// remove reference to temporary watcher tasks
280	for _, p := range c.pluginsInfo {
281		p.watcher = nil
282	}
283
284	// unmap not monitored exporter tasks
285	for _, tacc := range c.exporters {
286		task := tacc.task()
287		if task.updated.Before(now) {
288			delete(c.exporters, task.item.itemid)
289			log.Debugf("[%d] released unused exporter for itemid:%d", c.id, task.item.itemid)
290			task.deactivate()
291		}
292	}
293
294	var expiry time.Time
295	// Direct requests are handled by special clients with id <= MaxBuiltinClientID.
296	// Such requests have day+hour (to keep once per day checks without expiring)
297	// expiry time before used plugins are released.
298	if c.id > agent.MaxBuiltinClientID {
299		expiry = now
300	} else {
301		expiry = now.Add(-time.Hour * 25)
302	}
303
304	// deactivate plugins
305	for _, p := range plugins {
306		if info, ok := c.pluginsInfo[p]; ok {
307			if info.used.Before(expiry) {
308				// perform empty watch task before releasing plugin, so it could
309				// release internal resources allocated to monitor this client
310				if _, ok := p.impl.(plugin.Watcher); ok && c.id > agent.MaxBuiltinClientID {
311					task := &watcherTask{
312						taskBase: taskBase{plugin: p, active: true},
313						requests: make([]*plugin.Request, 0),
314						client:   c,
315					}
316					if err := task.reschedule(now); err == nil {
317						p.enqueueTask(task)
318						log.Debugf("[%d] created watcher task for plugin %s", c.id, p.name())
319					} else {
320						// currently watcher rescheduling cannot fail, but log a warning for future
321						log.Warningf("[%d] cannot reschedule plugin '%s' closing watcher task: %s",
322							c.id, p.impl.Name(), err)
323					}
324				}
325
326				// release plugin
327				released = append(released, p)
328				delete(c.pluginsInfo, p)
329				p.refcount--
330				// TODO: define uniform time format
331				if c.id > agent.MaxBuiltinClientID {
332					log.Debugf("[%d] released unused plugin %s", c.id, p.name())
333				} else {
334					log.Debugf("[%d] released plugin %s as not used since %s", c.id, p.name(),
335						info.used.Format(time.Stamp))
336				}
337			}
338		}
339	}
340	return
341}
342
343// updateExpressions updates server global regular expression bundle
344func (c *client) updateExpressions(expressions []*glexpr.Expression) {
345	// reset expressions if changed
346	glexpr.SortExpressions(expressions)
347	var grxp *glexpr.Bundle
348	if c.globalRegexp != nil {
349		grxp = (*glexpr.Bundle)(atomic.LoadPointer(&c.globalRegexp))
350		if !grxp.CompareExpressions(expressions) {
351			grxp = nil
352		}
353	}
354
355	if grxp == nil {
356		grxp = glexpr.NewBundle(expressions)
357		atomic.StorePointer(&c.globalRegexp, unsafe.Pointer(grxp))
358	}
359}
360
361// newClient creates new client
362func newClient(id uint64, output plugin.ResultWriter) (b *client) {
363	b = &client{
364		id:          id,
365		exporters:   make(map[uint64]exporterTaskAccessor),
366		pluginsInfo: make(map[*pluginAgent]*pluginInfo),
367		output:      output,
368	}
369
370	return
371}
372