1/* 2** Zabbix 3** Copyright (C) 2001-2021 Zabbix SIA 4** 5** This program is free software; you can redistribute it and/or modify 6** it under the terms of the GNU General Public License as published by 7** the Free Software Foundation; either version 2 of the License, or 8** (at your option) any later version. 9** 10** This program is distributed in the hope that it will be useful, 11** but WITHOUT ANY WARRANTY; without even the implied warranty of 12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13** GNU General Public License for more details. 14** 15** You should have received a copy of the GNU General Public License 16** along with this program; if not, write to the Free Software 17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18**/ 19package resultcache 20 21import ( 22 "encoding/json" 23 "sync/atomic" 24 "time" 25 26 "zabbix.com/internal/agent" 27 "zabbix.com/internal/monitor" 28 "zabbix.com/pkg/itemutil" 29 "zabbix.com/pkg/log" 30 "zabbix.com/pkg/plugin" 31 "zabbix.com/pkg/version" 32) 33 34type MemoryCache struct { 35 *cacheData 36 results []*AgentData 37 maxBufferSize int32 38 totalValueNum int32 39 persistValueNum int32 40} 41 42func (c *MemoryCache) upload(u Uploader) (err error) { 43 if len(c.results) == 0 { 44 return 45 } 46 47 c.Debugf("upload history data, %d/%d value(s)", len(c.results), cap(c.results)) 48 49 request := AgentDataRequest{ 50 Request: "agent data", 51 Data: c.results, 52 Session: c.token, 53 Host: agent.Options.Hostname, 54 Version: version.Short(), 55 } 56 57 var data []byte 58 59 if data, err = json.Marshal(&request); err != nil { 60 c.Errf("cannot convert cached history to json: %s", err.Error()) 61 return 62 } 63 64 timeout := len(c.results) * c.timeout 65 if timeout > 60 { 66 timeout = 60 67 } 68 if err = u.Write(data, time.Duration(timeout)*time.Second); err != nil { 69 if c.lastError == nil || err.Error() != c.lastError.Error() { 70 c.Warningf("history upload to [%s] started to fail: %s", u.Addr(), err) 71 c.lastError = err 72 } 73 return 74 } 75 76 if c.lastError != nil { 77 c.Warningf("history upload to [%s] is working again", u.Addr()) 78 c.lastError = nil 79 } 80 81 // clear results slice to ensure that the data is garbage collected 82 c.results[0] = nil 83 for i := 1; i < len(c.results); i *= 2 { 84 copy(c.results[i:], c.results[:i]) 85 } 86 c.results = c.results[:0] 87 88 c.totalValueNum = 0 89 c.persistValueNum = 0 90 return 91} 92 93func (c *MemoryCache) flushOutput(u Uploader) { 94 if c.retry != nil { 95 c.retry.Stop() 96 c.retry = nil 97 } 98 99 if c.upload(u) != nil && u.CanRetry() { 100 c.retry = time.AfterFunc(UploadRetryInterval, func() { c.Upload(u) }) 101 } 102} 103 104// addResult appends received result at the end of results slice 105func (c *MemoryCache) addResult(result *AgentData) { 106 full := c.persistValueNum >= c.maxBufferSize/2 || c.totalValueNum >= c.maxBufferSize 107 c.results = append(c.results, result) 108 c.totalValueNum++ 109 if result.persistent { 110 c.persistValueNum++ 111 } 112 113 if c.persistValueNum >= c.maxBufferSize/2 || c.totalValueNum >= c.maxBufferSize { 114 if !full && c.uploader != nil { 115 c.flushOutput(c.uploader) 116 } 117 } 118} 119 120// insertResult attempts to insert the received result into results slice by replacing existing value. 121// If no appropriate target was found it calls addResult to append value. 122func (c *MemoryCache) insertResult(result *AgentData) { 123 index := -1 124 if !result.persistent { 125 for i, r := range c.results { 126 if r.Itemid == result.Itemid { 127 c.Debugf("cache is full, replacing oldest value for itemid:%d", r.Itemid) 128 index = i 129 break 130 } 131 } 132 } 133 if index == -1 && (!result.persistent || c.persistValueNum < c.maxBufferSize/2) { 134 for i, r := range c.results { 135 if !r.persistent { 136 if result.persistent { 137 c.persistValueNum++ 138 } 139 c.Debugf("cache is full, removing oldest value for itemid:%d", r.Itemid) 140 index = i 141 break 142 } 143 } 144 } 145 if index == -1 { 146 c.Warningf("cache is full and cannot cannot find a value to replace, adding new instead") 147 c.addResult(result) 148 return 149 } 150 151 copy(c.results[index:], c.results[index+1:]) 152 c.results[len(c.results)-1] = result 153} 154 155func (c *MemoryCache) write(r *plugin.Result) { 156 c.lastDataID++ 157 var value *string 158 var state *int 159 if r.Error == nil { 160 value = r.Value 161 } else { 162 errmsg := r.Error.Error() 163 value = &errmsg 164 tmp := itemutil.StateNotSupported 165 state = &tmp 166 } 167 168 var clock, ns int 169 if !r.Ts.IsZero() { 170 clock = int(r.Ts.Unix()) 171 ns = r.Ts.Nanosecond() 172 } 173 174 data := &AgentData{ 175 Id: c.lastDataID, 176 Itemid: r.Itemid, 177 LastLogsize: r.LastLogsize, 178 Mtime: r.Mtime, 179 Clock: clock, 180 Ns: ns, 181 Value: value, 182 State: state, 183 EventSource: r.EventSource, 184 EventID: r.EventID, 185 EventSeverity: r.EventSeverity, 186 EventTimestamp: r.EventTimestamp, 187 persistent: r.Persistent, 188 } 189 190 if c.totalValueNum >= c.maxBufferSize { 191 c.insertResult(data) 192 } else { 193 c.addResult(data) 194 } 195} 196 197func (c *MemoryCache) run() { 198 defer log.PanicHook() 199 c.Debugf("starting memory cache") 200 201 for { 202 u := <-c.input 203 if u == nil { 204 break 205 } 206 switch v := u.(type) { 207 case Uploader: 208 c.flushOutput(v) 209 case *plugin.Result: 210 c.write(v) 211 case *agent.AgentOptions: 212 c.updateOptions(v) 213 } 214 } 215 c.Debugf("memory cache has been stopped") 216 monitor.Unregister(monitor.Output) 217} 218 219func (c *MemoryCache) updateOptions(options *agent.AgentOptions) { 220 c.maxBufferSize = int32(options.BufferSize) 221 c.timeout = options.Timeout 222} 223 224func (c *MemoryCache) init(options *agent.AgentOptions) { 225 c.updateOptions(options) 226 c.results = make([]*AgentData, 0, c.maxBufferSize) 227} 228 229func (c *MemoryCache) Start() { 230 // register with secondary group to stop result cache after other components are stopped 231 monitor.Register(monitor.Output) 232 go c.run() 233} 234 235func (c *MemoryCache) SlotsAvailable() int { 236 slots := atomic.LoadInt32(&c.maxBufferSize) - atomic.LoadInt32(&c.totalValueNum) 237 if slots < 0 { 238 slots = 0 239 } 240 241 return int(slots) 242} 243 244func (c *MemoryCache) PersistSlotsAvailable() int { 245 slots := atomic.LoadInt32(&c.maxBufferSize)/2 - atomic.LoadInt32(&c.persistValueNum) 246 if slots < 0 { 247 slots = 0 248 } 249 return int(slots) 250} 251