1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**/
19
20package scheduler
21
22import (
23	"errors"
24	"fmt"
25	"hash/fnv"
26	"sync/atomic"
27	"time"
28	"unsafe"
29
30	"zabbix.com/internal/agent"
31	"zabbix.com/pkg/glexpr"
32	"zabbix.com/pkg/log"
33	"zabbix.com/pkg/plugin"
34	"zabbix.com/pkg/zbxlib"
35)
36
37// clientItem represents item monitored by client
38type clientItem struct {
39	itemid uint64
40	delay  string
41	key    string
42}
43
44// pluginInfo is used to track plugin usage by client
45type pluginInfo struct {
46	used time.Time
47	// temporary link to a watcherTask during update
48	watcher *watcherTask
49}
50
51// client represents source of items (metrics) to be queried.
52// Each server for active checks is represented by a separate client.
53// There is a predefined clients to handle:
54//    all single passive checks (client id 1)
55//    all internal checks (resolving HostnameItem, HostMetadataItem, HostInterfaceItem) (client id 0)
56type client struct {
57	// Client id. Predefined clients have ids < 100, while clients active checks servers (ServerActive)
58	// have auto incrementing id starting with 100.
59	id uint64
60	// A map of itemids to the associated exporter tasks. It's used to update task when item parameters change.
61	exporters map[uint64]exporterTaskAccessor
62	// plugins used by client
63	pluginsInfo map[*pluginAgent]*pluginInfo
64	// server global regular expression bundle
65	globalRegexp unsafe.Pointer
66	// plugin result sink, can be nil for bulk passive checks (in future)
67	output plugin.ResultWriter
68}
69
70// ClientAccessor interface exports client data required for scheduler tasks.
71type ClientAccessor interface {
72	Output() plugin.ResultWriter
73	GlobalRegexp() *glexpr.Bundle
74	ID() uint64
75}
76
77// GlobalRegexp returns global regular expression bundle.
78// This function is used by tasks to implement ContextProvider interface.
79// It can be accessed by plugins and replaced by scheduler at the same time,
80// so pointer access must be synchronized. The global regexp contents are never changed,
81// only replaced, so pointer synchronization is enough.
82func (c *client) GlobalRegexp() *glexpr.Bundle {
83	return (*glexpr.Bundle)(atomic.LoadPointer(&c.globalRegexp))
84}
85
86// ID returns client id.
87// While it's used by tasks to implement ContextProvider interface, client ID cannot
88// change, so no synchronization is required.
89func (c *client) ID() uint64 {
90	return c.id
91}
92
93// Output returns client output interface where plugins results can be written.
94// While it's used by tasks to implement ContextProvider interface, client output cannot
95// change, so no synchronization is required.
96func (c *client) Output() plugin.ResultWriter {
97	return c.output
98}
99
100// addRequest requests client to start monitoring/update item described by request 'r' using plugin 'p' (*pluginAgent)
101// with output writer 'sink'
102func (c *client) addRequest(p *pluginAgent, r *plugin.Request, sink plugin.ResultWriter, now time.Time) (err error) {
103	var info *pluginInfo
104	var ok bool
105
106	log.Debugf("[%d] adding new request for key: '%s'", c.id, r.Key)
107
108	if info, ok = c.pluginsInfo[p]; !ok {
109		info = &pluginInfo{}
110	}
111
112	// list of created tasks to be queued
113	tasks := make([]performer, 0, 6)
114
115	// handle Collector interface
116	if col, ok := p.impl.(plugin.Collector); ok {
117		if p.refcount == 0 {
118			// calculate collector seed to avoid scheduling all collectors at the same time
119			h := fnv.New32a()
120			_, _ = h.Write([]byte(p.impl.Name()))
121			task := &collectorTask{
122				taskBase: taskBase{plugin: p, active: true, recurring: true},
123				seed:     uint64(h.Sum32())}
124			if err = task.reschedule(now); err != nil {
125				return
126			}
127			tasks = append(tasks, task)
128			log.Debugf("[%d] created collector task for plugin %s with collecting interval %d", c.id, p.name(),
129				col.Period())
130		}
131	}
132
133	// handle Exporter interface
134	if _, ok := p.impl.(plugin.Exporter); ok {
135		var tacc exporterTaskAccessor
136
137		if c.id > agent.MaxBuiltinClientID {
138			var task *exporterTask
139
140			if _, err = zbxlib.GetNextcheck(r.Itemid, r.Delay, now); err != nil {
141				return err
142			}
143			if tacc, ok = c.exporters[r.Itemid]; ok {
144				task = tacc.task()
145				if task.updated.Equal(now) {
146					return errors.New("duplicate itemid found")
147				}
148				if task.plugin != p {
149					// decativate current exporter task and create new one if the item key has been changed
150					// and the new metric is handled by other plugin
151					task.deactivate()
152					ok = false
153				}
154			}
155
156			if !ok {
157				// create and register new exporter task
158				task = &exporterTask{
159					taskBase: taskBase{plugin: p, active: true, recurring: true},
160					item:     clientItem{itemid: r.Itemid, delay: r.Delay, key: r.Key},
161					updated:  now,
162					client:   c,
163					output:   sink,
164				}
165				if err = task.reschedule(now); err != nil {
166					return
167				}
168				c.exporters[r.Itemid] = task
169				tasks = append(tasks, task)
170				log.Debugf("[%d] created exporter task for plugin '%s' itemid:%d key '%s'",
171					c.id, p.name(), task.item.itemid, task.item.key)
172			} else {
173				// update existing exporter task
174				task = tacc.task()
175				task.updated = now
176				task.item.key = r.Key
177				if task.item.delay != r.Delay {
178					task.item.delay = r.Delay
179					if err = task.reschedule(now); err != nil {
180						return
181					}
182					p.tasks.Update(task)
183					log.Debugf("[%d] updated exporter task for plugin '%s' itemid:%d key '%s'",
184						c.id, p.name(), task.item.itemid, task.item.key)
185				}
186			}
187			task.meta.SetLastLogsize(*r.LastLogsize)
188			task.meta.SetMtime(int32(*r.Mtime))
189
190		} else {
191			// handle single passive check or internal request
192			task := &directExporterTask{
193				taskBase: taskBase{plugin: p, active: true, recurring: true},
194				item:     clientItem{itemid: r.Itemid, delay: r.Delay, key: r.Key},
195				expire:   now.Add(time.Duration(agent.Options.Timeout) * time.Second),
196				client:   c,
197				output:   sink,
198			}
199			if err = task.reschedule(now); err != nil {
200				return
201			}
202			tasks = append(tasks, task)
203			log.Debugf("[%d] created direct exporter task for plugin '%s' itemid:%d key '%s'",
204				c.id, p.name(), task.item.itemid, task.item.key)
205		}
206	} else if c.id <= agent.MaxBuiltinClientID {
207		return fmt.Errorf(`The "%s" key is not supported in test or single passive check mode`, r.Key)
208	}
209
210	// handle runner interface for inactive plugins
211	if _, ok := p.impl.(plugin.Runner); ok {
212		if p.refcount == 0 {
213			task := &starterTask{
214				taskBase: taskBase{plugin: p, active: true},
215			}
216			if err = task.reschedule(now); err != nil {
217				return
218			}
219			tasks = append(tasks, task)
220			log.Debugf("[%d] created starter task for plugin %s", c.id, p.name())
221		}
222	}
223
224	// handle Watcher interface (not supported by single passive check or internal requests)
225	if _, ok := p.impl.(plugin.Watcher); ok && c.id > agent.MaxBuiltinClientID {
226		if info.watcher == nil {
227			info.watcher = &watcherTask{
228				taskBase: taskBase{plugin: p, active: true},
229				requests: make([]*plugin.Request, 0, 1),
230				client:   c,
231			}
232			if err = info.watcher.reschedule(now); err != nil {
233				return
234			}
235			tasks = append(tasks, info.watcher)
236
237			log.Debugf("[%d] created watcher task for plugin %s", c.id, p.name())
238		}
239		info.watcher.requests = append(info.watcher.requests, r)
240	}
241
242	// handle configurator interface for inactive plugins
243	if _, ok := p.impl.(plugin.Configurator); ok {
244		if p.refcount == 0 {
245			task := &configuratorTask{
246				taskBase: taskBase{plugin: p, active: true},
247				options:  &agent.Options,
248			}
249			_ = task.reschedule(now)
250			tasks = append(tasks, task)
251			log.Debugf("[%d] created configurator task for plugin %s", c.id, p.name())
252		}
253	}
254
255	for _, t := range tasks {
256		p.enqueueTask(t)
257	}
258
259	// update plugin usage information
260	if info.used.IsZero() {
261		p.refcount++
262		c.pluginsInfo[p] = info
263	}
264	info.used = now
265
266	return nil
267}
268
269// cleanup releases unused uplugins. For external clients it's done after update,
270// while for internal clients once per hour.
271func (c *client) cleanup(plugins map[string]*pluginAgent, now time.Time) (released []*pluginAgent) {
272	released = make([]*pluginAgent, 0, len(c.pluginsInfo))
273	// remove reference to temporary watcher tasks
274	for _, p := range c.pluginsInfo {
275		p.watcher = nil
276	}
277
278	// unmap not monitored exporter tasks
279	for _, tacc := range c.exporters {
280		task := tacc.task()
281		if task.updated.Before(now) {
282			delete(c.exporters, task.item.itemid)
283			log.Debugf("[%d] released unused exporter for itemid:%d", c.id, task.item.itemid)
284			task.deactivate()
285		}
286	}
287
288	var expiry time.Time
289	// Direct requests are handled by special clients with id <= MaxBuiltinClientID.
290	// Such requests have day+hour (to keep once per day checks without expiring)
291	// expiry time before used plugins are released.
292	if c.id > agent.MaxBuiltinClientID {
293		expiry = now
294	} else {
295		expiry = now.Add(-time.Hour * 25)
296	}
297
298	// deactivate plugins
299	for _, p := range plugins {
300		if info, ok := c.pluginsInfo[p]; ok {
301			if info.used.Before(expiry) {
302				// perform empty watch task before releasing plugin, so it could
303				// release internal resources allocated to monitor this client
304				if _, ok := p.impl.(plugin.Watcher); ok && c.id > agent.MaxBuiltinClientID {
305					task := &watcherTask{
306						taskBase: taskBase{plugin: p, active: true},
307						requests: make([]*plugin.Request, 0),
308						client:   c,
309					}
310					if err := task.reschedule(now); err == nil {
311						p.enqueueTask(task)
312						log.Debugf("[%d] created watcher task for plugin %s", c.id, p.name())
313					} else {
314						// currently watcher rescheduling cannot fail, but log a warning for future
315						log.Warningf("[%d] cannot reschedule plugin '%s' closing watcher task: %s",
316							c.id, p.impl.Name(), err)
317					}
318				}
319
320				// release plugin
321				released = append(released, p)
322				delete(c.pluginsInfo, p)
323				p.refcount--
324				// TODO: define uniform time format
325				if c.id > agent.MaxBuiltinClientID {
326					log.Debugf("[%d] released unused plugin %s", c.id, p.name())
327				} else {
328					log.Debugf("[%d] released plugin %s as not used since %s", c.id, p.name(),
329						info.used.Format(time.Stamp))
330				}
331			}
332		}
333	}
334	return
335}
336
337// updateExpressions updates server global regular expression bundle
338func (c *client) updateExpressions(expressions []*glexpr.Expression) {
339	// reset expressions if changed
340	glexpr.SortExpressions(expressions)
341	var grxp *glexpr.Bundle
342	if c.globalRegexp != nil {
343		grxp = (*glexpr.Bundle)(atomic.LoadPointer(&c.globalRegexp))
344		if !grxp.CompareExpressions(expressions) {
345			grxp = nil
346		}
347	}
348
349	if grxp == nil {
350		grxp = glexpr.NewBundle(expressions)
351		atomic.StorePointer(&c.globalRegexp, unsafe.Pointer(grxp))
352	}
353}
354
355// newClient creates new client
356func newClient(id uint64, output plugin.ResultWriter) (b *client) {
357	b = &client{
358		id:          id,
359		exporters:   make(map[uint64]exporterTaskAccessor),
360		pluginsInfo: make(map[*pluginAgent]*pluginInfo),
361		output:      output,
362	}
363
364	return
365}
366