1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**/
19package resultcache
20
21import (
22	"encoding/json"
23	"sync/atomic"
24	"time"
25
26	"zabbix.com/internal/agent"
27	"zabbix.com/internal/monitor"
28	"zabbix.com/pkg/itemutil"
29	"zabbix.com/pkg/log"
30	"zabbix.com/pkg/plugin"
31	"zabbix.com/pkg/version"
32)
33
// MemoryCache is a result cache that keeps the collected values in memory
// until they are uploaded.
//
// Fields:
//   - results holds the cached values in the order they were added; it is
//     emptied after a successful upload.
//   - maxBufferSize is the value limit taken from the BufferSize agent option;
//     persistent values are limited to half of it.
//   - totalValueNum counts the values added since the last successful upload.
//   - persistValueNum counts the persistent values among them.
//
// maxBufferSize, totalValueNum and persistValueNum are read with atomic loads
// by SlotsAvailable and PersistSlotsAvailable.
type MemoryCache struct {
	*cacheData
	results         []*AgentData
	maxBufferSize   int32
	totalValueNum   int32
	persistValueNum int32
}
41
42func (c *MemoryCache) upload(u Uploader) (err error) {
43	if len(c.results) == 0 {
44		return
45	}
46
47	c.Debugf("upload history data, %d/%d value(s)", len(c.results), cap(c.results))
48
49	request := AgentDataRequest{
50		Request: "agent data",
51		Data:    c.results,
52		Session: c.token,
53		Host:    agent.Options.Hostname,
54		Version: version.Short(),
55	}
56
57	var data []byte
58
59	if data, err = json.Marshal(&request); err != nil {
60		c.Errf("cannot convert cached history to json: %s", err.Error())
61		return
62	}
63
64	timeout := len(c.results) * c.timeout
65	if timeout > 60 {
66		timeout = 60
67	}
68	if err = u.Write(data, time.Duration(timeout)*time.Second); err != nil {
69		if c.lastError == nil || err.Error() != c.lastError.Error() {
70			c.Warningf("history upload to [%s] started to fail: %s", u.Addr(), err)
71			c.lastError = err
72		}
73		return
74	}
75
76	if c.lastError != nil {
77		c.Warningf("history upload to [%s] is working again", u.Addr())
78		c.lastError = nil
79	}
80
81	// clear results slice to ensure that the data is garbage collected
82	c.results[0] = nil
83	for i := 1; i < len(c.results); i *= 2 {
84		copy(c.results[i:], c.results[:i])
85	}
86	c.results = c.results[:0]
87
88	c.totalValueNum = 0
89	c.persistValueNum = 0
90	return
91}
92
93func (c *MemoryCache) flushOutput(u Uploader) {
94	if c.retry != nil {
95		c.retry.Stop()
96		c.retry = nil
97	}
98
99	if c.upload(u) != nil && u.CanRetry() {
100		c.retry = time.AfterFunc(UploadRetryInterval, func() { c.Upload(u) })
101	}
102}
103
104// addResult appends received result at the end of results slice
105func (c *MemoryCache) addResult(result *AgentData) {
106	full := c.persistValueNum >= c.maxBufferSize/2 || c.totalValueNum >= c.maxBufferSize
107	c.results = append(c.results, result)
108	c.totalValueNum++
109	if result.persistent {
110		c.persistValueNum++
111	}
112
113	if c.persistValueNum >= c.maxBufferSize/2 || c.totalValueNum >= c.maxBufferSize {
114		if !full && c.uploader != nil {
115			c.flushOutput(c.uploader)
116		}
117	}
118}
119
120// insertResult attempts to insert the received result into results slice by replacing existing value.
121// If no appropriate target was found it calls addResult to append value.
122func (c *MemoryCache) insertResult(result *AgentData) {
123	index := -1
124	if !result.persistent {
125		for i, r := range c.results {
126			if r.Itemid == result.Itemid {
127				c.Debugf("cache is full, replacing oldest value for itemid:%d", r.Itemid)
128				index = i
129				break
130			}
131		}
132	}
133	if index == -1 && (!result.persistent || c.persistValueNum < c.maxBufferSize/2) {
134		for i, r := range c.results {
135			if !r.persistent {
136				if result.persistent {
137					c.persistValueNum++
138				}
139				c.Debugf("cache is full, removing oldest value for itemid:%d", r.Itemid)
140				index = i
141				break
142			}
143		}
144	}
145	if index == -1 {
146		c.Warningf("cache is full and cannot cannot find a value to replace, adding new instead")
147		c.addResult(result)
148		return
149	}
150
151	copy(c.results[index:], c.results[index+1:])
152	c.results[len(c.results)-1] = result
153}
154
155func (c *MemoryCache) write(r *plugin.Result) {
156	c.lastDataID++
157	var value *string
158	var state *int
159	if r.Error == nil {
160		value = r.Value
161	} else {
162		errmsg := r.Error.Error()
163		value = &errmsg
164		tmp := itemutil.StateNotSupported
165		state = &tmp
166	}
167
168	var clock, ns int
169	if !r.Ts.IsZero() {
170		clock = int(r.Ts.Unix())
171		ns = r.Ts.Nanosecond()
172	}
173
174	data := &AgentData{
175		Id:             c.lastDataID,
176		Itemid:         r.Itemid,
177		LastLogsize:    r.LastLogsize,
178		Mtime:          r.Mtime,
179		Clock:          clock,
180		Ns:             ns,
181		Value:          value,
182		State:          state,
183		EventSource:    r.EventSource,
184		EventID:        r.EventID,
185		EventSeverity:  r.EventSeverity,
186		EventTimestamp: r.EventTimestamp,
187		persistent:     r.Persistent,
188	}
189
190	if c.totalValueNum >= c.maxBufferSize {
191		c.insertResult(data)
192	} else {
193		c.addResult(data)
194	}
195}
196
197func (c *MemoryCache) run() {
198	defer log.PanicHook()
199	c.Debugf("starting memory cache")
200
201	for {
202		u := <-c.input
203		if u == nil {
204			break
205		}
206		switch v := u.(type) {
207		case Uploader:
208			c.flushOutput(v)
209		case *plugin.Result:
210			c.write(v)
211		case *agent.AgentOptions:
212			c.updateOptions(v)
213		}
214	}
215	c.Debugf("memory cache has been stopped")
216	monitor.Unregister(monitor.Output)
217}
218
219func (c *MemoryCache) updateOptions(options *agent.AgentOptions) {
220	c.maxBufferSize = int32(options.BufferSize)
221	c.timeout = options.Timeout
222}
223
224func (c *MemoryCache) init(options *agent.AgentOptions) {
225	c.updateOptions(options)
226	c.results = make([]*AgentData, 0, c.maxBufferSize)
227}
228
229func (c *MemoryCache) Start() {
230	// register with secondary group to stop result cache after other components are stopped
231	monitor.Register(monitor.Output)
232	go c.run()
233}
234
235func (c *MemoryCache) SlotsAvailable() int {
236	slots := atomic.LoadInt32(&c.maxBufferSize) - atomic.LoadInt32(&c.totalValueNum)
237	if slots < 0 {
238		slots = 0
239	}
240
241	return int(slots)
242}
243
244func (c *MemoryCache) PersistSlotsAvailable() int {
245	slots := atomic.LoadInt32(&c.maxBufferSize)/2 - atomic.LoadInt32(&c.persistValueNum)
246	if slots < 0 {
247		slots = 0
248	}
249	return int(slots)
250}
251