1/* 2** Zabbix 3** Copyright (C) 2001-2021 Zabbix SIA 4** 5** This program is free software; you can redistribute it and/or modify 6** it under the terms of the GNU General Public License as published by 7** the Free Software Foundation; either version 2 of the License, or 8** (at your option) any later version. 9** 10** This program is distributed in the hope that it will be useful, 11** but WITHOUT ANY WARRANTY; without even the implied warranty of 12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13** GNU General Public License for more details. 14** 15** You should have received a copy of the GNU General Public License 16** along with this program; if not, write to the Free Software 17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18**/ 19 20package zbxlib 21 22/* 23#cgo CFLAGS: -I${SRCDIR}/../../../../include 24 25#include "common.h" 26#include "sysinfo.h" 27#include "log.h" 28#include "../src/zabbix_agent/metrics.h" 29#include "../src/zabbix_agent/logfiles/logfiles.h" 30 31extern int CONFIG_MAX_LINES_PER_SECOND; 32 33typedef ZBX_ACTIVE_METRIC* ZBX_ACTIVE_METRIC_LP; 34typedef zbx_vector_ptr_t * zbx_vector_ptr_lp_t; 35typedef char * char_lp_t; 36typedef zbx_vector_pre_persistent_t * zbx_vector_pre_persistent_lp_t; 37 38ZBX_ACTIVE_METRIC *new_metric(char *key, zbx_uint64_t lastlogsize, int mtime, int flags) 39{ 40 ZBX_ACTIVE_METRIC *metric = malloc(sizeof(ZBX_ACTIVE_METRIC)); 41 memset(metric, 0, sizeof(ZBX_ACTIVE_METRIC)); 42 metric->key = key; 43 // key_orig is used in error messages, consider using "itemid: <itemid>" instead of the key 44 metric->key_orig = zbx_strdup(NULL, key); 45 metric->lastlogsize = lastlogsize; 46 metric->mtime = mtime; 47 metric->flags = (unsigned char)flags; 48 metric->skip_old_data = (0 != metric->lastlogsize ? 0 : 1); 49 metric->persistent_file_name = NULL; // initialized but not used in Agent2 50 return metric; 51} 52 53void metric_set_refresh(ZBX_ACTIVE_METRIC *metric, int refresh) 54{ 55 metric->refresh = refresh; 56} 57 58void metric_get_meta(ZBX_ACTIVE_METRIC *metric, zbx_uint64_t *lastlogsize, int *mtime) 59{ 60 *lastlogsize = metric->lastlogsize; 61 *mtime = metric->mtime; 62} 63 64void metric_set_unsupported(ZBX_ACTIVE_METRIC *metric) 65{ 66 metric->state = ITEM_STATE_NOTSUPPORTED; 67 metric->error_count = 0; 68 metric->start_time = 0.0; 69 metric->processed_bytes = 0; 70} 71 72int metric_set_supported(ZBX_ACTIVE_METRIC *metric, zbx_uint64_t lastlogsize_sent, int mtime_sent, 73 zbx_uint64_t lastlogsize_last, int mtime_last) 74{ 75 int ret = FAIL; 76 77 if (0 == metric->error_count) 78 { 79 unsigned char old_state = metric->state; 80 if (ITEM_STATE_NOTSUPPORTED == metric->state) 81 { 82 metric->state = ITEM_STATE_NORMAL; 83 } 84 85 if (lastlogsize_sent != metric->lastlogsize || mtime_sent != metric->mtime || 86 (lastlogsize_last == lastlogsize_sent && mtime_last == mtime_sent && 87 (old_state != metric->state || 0 != (ZBX_METRIC_FLAG_NEW & metric->flags)))) 88 { 89 ret = SUCCEED; 90 } 91 metric->flags &= ~ZBX_METRIC_FLAG_NEW; 92 } 93 return ret; 94} 95 96void metric_free(ZBX_ACTIVE_METRIC *metric) 97{ 98 int i; 99 100 if (NULL == metric) 101 return; 102 103 zbx_free(metric->key); 104 zbx_free(metric->key_orig); 105 106 for (i = 0; i < metric->logfiles_num; i++) 107 zbx_free(metric->logfiles[i].filename); 108 109 zbx_free(metric->logfiles); 110 zbx_free(metric->persistent_file_name); 111 zbx_free(metric); 112} 113 114typedef struct 115{ 116 char *value; 117 int state; 118 zbx_uint64_t lastlogsize; 119 int mtime; 120} 121log_value_t; 122 123typedef struct 124{ 125 zbx_vector_ptr_t values; 126 int slots; 127} 128log_result_t, *log_result_lp_t; 129 130static log_result_t *new_log_result(int slots) 131{ 132 log_result_t *result; 133 134 result = (log_result_t *)zbx_malloc(NULL, sizeof(log_result_t)); 135 zbx_vector_ptr_create(&result->values); 136 result->slots = slots; 137 return result; 138} 139 140static void add_log_value(log_result_t *result, const char *value, int state, zbx_uint64_t lastlogsize, int mtime) 141{ 142 log_value_t *log; 143 log = (log_value_t *)zbx_malloc(NULL, sizeof(log_value_t)); 144 log->value = zbx_strdup(NULL, value); 145 log->state = state; 146 log->lastlogsize = lastlogsize; 147 log->mtime = mtime; 148 zbx_vector_ptr_append(&result->values, log); 149} 150 151static int get_log_value(log_result_t *result, int index, char **value, int *state, zbx_uint64_t *lastlogsize, int *mtime) 152{ 153 log_value_t *log; 154 155 if (index == result->values.values_num) 156 return FAIL; 157 158 log = (log_value_t *)result->values.values[index]; 159 *value = log->value; 160 *state = log->state; 161 *lastlogsize = log->lastlogsize; 162 *mtime = log->mtime; 163 return SUCCEED; 164} 165 166static void free_log_value(log_value_t *log) 167{ 168 zbx_free(log->value); 169 zbx_free(log); 170} 171 172static void free_log_result(log_result_t *result) 173{ 174 zbx_vector_ptr_clear_ext(&result->values, (zbx_clean_func_t)free_log_value); 175 zbx_vector_ptr_destroy(&result->values); 176 zbx_free(result); 177} 178 179int process_value_cb(const char *server, unsigned short port, const char *host, const char *key, 180 const char *value, unsigned char state, zbx_uint64_t *lastlogsize, const int *mtime, 181 unsigned long *timestamp, const char *source, unsigned short *severity, unsigned long *logeventid, 182 unsigned char flags) 183{ 184 log_result_t *result = (log_result_t *)server; 185 if (result->values.values_num == result->slots) 186 return FAIL; 187 188 add_log_value(result, value, state, *lastlogsize, *mtime); 189 return SUCCEED; 190} 191 192static zbx_vector_pre_persistent_lp_t new_prep_vec(void) 193{ 194 zbx_vector_pre_persistent_lp_t vect; 195 196 vect = (zbx_vector_pre_persistent_lp_t)zbx_malloc(NULL, sizeof(zbx_vector_pre_persistent_t)); 197 zbx_vector_pre_persistent_create(vect); 198 return vect; 199} 200 201static void free_prep_vec(zbx_vector_pre_persistent_lp_t vect) 202{ 203 // In Agent2 this vector is expected to be empty because 'persistent directory' parameter is not allowed. 204 // Therefore a simplified cleanup is used. 205 zbx_vector_pre_persistent_destroy(vect); 206 zbx_free(vect); 207} 208*/ 209import "C" 210 211import ( 212 "errors" 213 "time" 214 "unsafe" 215 216 "zabbix.com/pkg/itemutil" 217) 218 219const ( 220 MetricFlagPersistent = 0x01 221 MetricFlagNew = 0x02 222 MetricFlagLogLog = 0x04 223 MetricFlagLogLogrt = 0x08 224 MetricFlagLogEventlog = 0x10 225 MetricFlagLogCount = 0x20 226 MetricFlagLog = MetricFlagLogLog | MetricFlagLogLogrt | MetricFlagLogEventlog 227) 228 229type ResultWriter interface { 230 PersistSlotsAvailable() int 231} 232 233type LogItem struct { 234 LastTs time.Time // the last log value timestamp + 1ns 235 Results []*LogResult 236 Output ResultWriter 237} 238 239type LogResult struct { 240 Value *string 241 Ts time.Time 242 Error error 243 LastLogsize uint64 244 Mtime int 245} 246 247func NewActiveMetric(key string, params []string, lastLogsize uint64, mtime int32) (data unsafe.Pointer, err error) { 248 flags := MetricFlagNew | MetricFlagPersistent 249 switch key { 250 case "log": 251 if len(params) >= 9 && params[8] != "" { 252 return nil, errors.New("The ninth parameter (persistent directory) is not supported by Agent2.") 253 } 254 flags |= MetricFlagLogLog 255 case "logrt": 256 if len(params) >= 9 && params[8] != "" { 257 return nil, errors.New("The ninth parameter (persistent directory) is not supported by Agent2.") 258 } 259 flags |= MetricFlagLogLogrt 260 case "log.count": 261 if len(params) >= 8 && params[7] != "" { 262 return nil, errors.New("The eighth parameter (persistent directory) is not supported by Agent2.") 263 } 264 flags |= MetricFlagLogCount | MetricFlagLogLog 265 case "logrt.count": 266 if len(params) >= 8 && params[7] != "" { 267 return nil, errors.New("The eighth parameter (persistent directory) is not supported by Agent2.") 268 } 269 flags |= MetricFlagLogCount | MetricFlagLogLogrt 270 case "eventlog": 271 flags |= MetricFlagLogEventlog 272 default: 273 return nil, errors.New("Unsupported item key.") 274 } 275 ckey := C.CString(itemutil.MakeKey(key, params)) 276 return unsafe.Pointer(C.new_metric(ckey, C.zbx_uint64_t(lastLogsize), C.int(mtime), C.int(flags))), nil 277} 278 279func FreeActiveMetric(data unsafe.Pointer) { 280 C.metric_free(C.ZBX_ACTIVE_METRIC_LP(data)) 281} 282 283func ProcessLogCheck(data unsafe.Pointer, item *LogItem, refresh int, cblob unsafe.Pointer) { 284 C.metric_set_refresh(C.ZBX_ACTIVE_METRIC_LP(data), C.int(refresh)) 285 286 var clastLogsizeSent, clastLogsizeLast C.zbx_uint64_t 287 var cmtimeSent, cmtimeLast C.int 288 C.metric_get_meta(C.ZBX_ACTIVE_METRIC_LP(data), &clastLogsizeSent, &cmtimeSent) 289 clastLogsizeLast = clastLogsizeSent 290 cmtimeLast = cmtimeSent 291 292 result := C.new_log_result(C.int(item.Output.PersistSlotsAvailable())) 293 294 var cerrmsg *C.char 295 cprepVec := C.new_prep_vec() // In Agent2 it is always empty vector. Not used but required for linking. 296 ret := C.process_log_check(C.char_lp_t(unsafe.Pointer(result)), 0, C.zbx_vector_ptr_lp_t(cblob), 297 C.ZBX_ACTIVE_METRIC_LP(data), C.zbx_process_value_func_t(C.process_value_cb), &clastLogsizeSent, 298 &cmtimeSent, &cerrmsg, cprepVec) 299 C.free_prep_vec(cprepVec) 300 301 // add cached results 302 var cvalue *C.char 303 var clastlogsize C.zbx_uint64_t 304 var cstate, cmtime C.int 305 logTs := time.Now() 306 if logTs.Before(item.LastTs) { 307 logTs = item.LastTs 308 } 309 for i := 0; C.get_log_value(result, C.int(i), &cvalue, &cstate, &clastlogsize, &cmtime) != C.FAIL; i++ { 310 var value string 311 var err error 312 if cstate == C.ITEM_STATE_NORMAL { 313 value = C.GoString(cvalue) 314 } else { 315 err = errors.New(C.GoString(cvalue)) 316 } 317 318 r := &LogResult{ 319 Value: &value, 320 Ts: logTs, 321 Error: err, 322 LastLogsize: uint64(clastlogsize), 323 Mtime: int(cmtime), 324 } 325 326 item.Results = append(item.Results, r) 327 logTs = logTs.Add(time.Nanosecond) 328 } 329 C.free_log_result(result) 330 331 item.LastTs = logTs 332 333 if ret == C.FAIL { 334 C.metric_set_unsupported(C.ZBX_ACTIVE_METRIC_LP(data)) 335 336 var err error 337 if cerrmsg != nil { 338 err = errors.New(C.GoString(cerrmsg)) 339 C.free(unsafe.Pointer(cerrmsg)) 340 } else { 341 err = errors.New("Unknown error.") 342 } 343 result := &LogResult{ 344 Ts: time.Now(), 345 Error: err, 346 } 347 item.Results = append(item.Results, result) 348 } else { 349 ret := C.metric_set_supported(C.ZBX_ACTIVE_METRIC_LP(data), clastLogsizeSent, cmtimeSent, clastLogsizeLast, 350 cmtimeLast) 351 352 if ret == Succeed { 353 C.metric_get_meta(C.ZBX_ACTIVE_METRIC_LP(data), &clastLogsizeLast, &cmtimeLast) 354 result := &LogResult{ 355 Ts: time.Now(), 356 LastLogsize: uint64(clastLogsizeLast), 357 Mtime: int(cmtimeLast), 358 } 359 item.Results = append(item.Results, result) 360 } 361 } 362} 363 364func SetMaxLinesPerSecond(num int) { 365 C.CONFIG_MAX_LINES_PER_SECOND = C.int(num) 366} 367