1/*
2** Zabbix
3** Copyright (C) 2001-2021 Zabbix SIA
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18**/
19
20package zbxlib
21
22/*
23#cgo CFLAGS: -I${SRCDIR}/../../../../include
24
25#include "common.h"
26#include "sysinfo.h"
27#include "log.h"
28#include "../src/zabbix_agent/metrics.h"
29#include "../src/zabbix_agent/logfiles/logfiles.h"
30
31extern int CONFIG_MAX_LINES_PER_SECOND;
32
33typedef ZBX_ACTIVE_METRIC* ZBX_ACTIVE_METRIC_LP;
34typedef zbx_vector_ptr_t * zbx_vector_ptr_lp_t;
35typedef char * char_lp_t;
36typedef zbx_vector_pre_persistent_t * zbx_vector_pre_persistent_lp_t;
37
38ZBX_ACTIVE_METRIC *new_metric(char *key, zbx_uint64_t lastlogsize, int mtime, int flags)
39{
40	ZBX_ACTIVE_METRIC *metric = malloc(sizeof(ZBX_ACTIVE_METRIC));
41	memset(metric, 0, sizeof(ZBX_ACTIVE_METRIC));
42	metric->key = key;
43	// key_orig is used in error messages, consider using "itemid: <itemid>" instead of the key
44	metric->key_orig = zbx_strdup(NULL, key);
45	metric->lastlogsize = lastlogsize;
46	metric->mtime = mtime;
47	metric->flags = (unsigned char)flags;
48	metric->skip_old_data = (0 != metric->lastlogsize ? 0 : 1);
49	metric->persistent_file_name = NULL;	// initialized but not used in Agent2
50	return metric;
51}
52
53void metric_set_refresh(ZBX_ACTIVE_METRIC *metric, int refresh)
54{
55	metric->refresh = refresh;
56}
57
58void metric_get_meta(ZBX_ACTIVE_METRIC *metric, zbx_uint64_t *lastlogsize, int *mtime)
59{
60	*lastlogsize = metric->lastlogsize;
61	*mtime = metric->mtime;
62}
63
64void metric_set_unsupported(ZBX_ACTIVE_METRIC *metric)
65{
66	metric->state = ITEM_STATE_NOTSUPPORTED;
67	metric->error_count = 0;
68	metric->start_time = 0.0;
69	metric->processed_bytes = 0;
70}
71
72int metric_set_supported(ZBX_ACTIVE_METRIC *metric, zbx_uint64_t lastlogsize_sent, int mtime_sent,
73		zbx_uint64_t lastlogsize_last, int mtime_last)
74{
75	int	ret = FAIL;
76
77	if (0 == metric->error_count)
78	{
79		unsigned char	old_state = metric->state;
80		if (ITEM_STATE_NOTSUPPORTED == metric->state)
81		{
82			metric->state = ITEM_STATE_NORMAL;
83		}
84
85		if (lastlogsize_sent != metric->lastlogsize || mtime_sent != metric->mtime ||
86				(lastlogsize_last == lastlogsize_sent && mtime_last == mtime_sent &&
87						(old_state != metric->state || 0 != (ZBX_METRIC_FLAG_NEW & metric->flags))))
88		{
89			ret = SUCCEED;
90		}
91		metric->flags &= ~ZBX_METRIC_FLAG_NEW;
92	}
93	return ret;
94}
95
96void	metric_free(ZBX_ACTIVE_METRIC *metric)
97{
98	int	i;
99
100	if (NULL == metric)
101		return;
102
103	zbx_free(metric->key);
104	zbx_free(metric->key_orig);
105
106	for (i = 0; i < metric->logfiles_num; i++)
107		zbx_free(metric->logfiles[i].filename);
108
109	zbx_free(metric->logfiles);
110	zbx_free(metric->persistent_file_name);
111	zbx_free(metric);
112}
113
114typedef struct
115{
116	char *value;
117	int state;
118	zbx_uint64_t lastlogsize;
119	int mtime;
120}
121log_value_t;
122
123typedef struct
124{
125	zbx_vector_ptr_t values;
126	int slots;
127}
128log_result_t, *log_result_lp_t;
129
130static log_result_t *new_log_result(int slots)
131{
132	log_result_t *result;
133
134	result = (log_result_t *)zbx_malloc(NULL, sizeof(log_result_t));
135	zbx_vector_ptr_create(&result->values);
136	result->slots = slots;
137	return result;
138}
139
140static void add_log_value(log_result_t *result, const char *value, int state, zbx_uint64_t lastlogsize, int mtime)
141{
142	log_value_t *log;
143	log = (log_value_t *)zbx_malloc(NULL, sizeof(log_value_t));
144	log->value = zbx_strdup(NULL, value);
145	log->state = state;
146	log->lastlogsize = lastlogsize;
147	log->mtime = mtime;
148	zbx_vector_ptr_append(&result->values, log);
149}
150
151static int get_log_value(log_result_t *result, int index, char **value, int *state, zbx_uint64_t *lastlogsize, int *mtime)
152{
153	log_value_t *log;
154
155	if (index == result->values.values_num)
156		return FAIL;
157
158	log = (log_value_t *)result->values.values[index];
159	*value = log->value;
160	*state = log->state;
161	*lastlogsize = log->lastlogsize;
162	*mtime = log->mtime;
163	return SUCCEED;
164}
165
166static void free_log_value(log_value_t *log)
167{
168	zbx_free(log->value);
169	zbx_free(log);
170}
171
172static void free_log_result(log_result_t *result)
173{
174	zbx_vector_ptr_clear_ext(&result->values, (zbx_clean_func_t)free_log_value);
175	zbx_vector_ptr_destroy(&result->values);
176	zbx_free(result);
177}
178
179int	process_value_cb(const char *server, unsigned short port, const char *host, const char *key,
180		const char *value, unsigned char state, zbx_uint64_t *lastlogsize, const int *mtime,
181		unsigned long *timestamp, const char *source, unsigned short *severity, unsigned long *logeventid,
182		unsigned char flags)
183{
184	log_result_t *result = (log_result_t *)server;
185	if (result->values.values_num == result->slots)
186		return FAIL;
187
188	add_log_value(result, value, state, *lastlogsize, *mtime);
189	return SUCCEED;
190}
191
192static zbx_vector_pre_persistent_lp_t new_prep_vec(void)
193{
194	zbx_vector_pre_persistent_lp_t vect;
195
196	vect = (zbx_vector_pre_persistent_lp_t)zbx_malloc(NULL, sizeof(zbx_vector_pre_persistent_t));
197	zbx_vector_pre_persistent_create(vect);
198	return vect;
199}
200
201static void free_prep_vec(zbx_vector_pre_persistent_lp_t vect)
202{
203	// In Agent2 this vector is expected to be empty because 'persistent directory' parameter is not allowed.
204	// Therefore a simplified cleanup is used.
205	zbx_vector_pre_persistent_destroy(vect);
206	zbx_free(vect);
207}
208*/
209import "C"
210
211import (
212	"errors"
213	"time"
214	"unsafe"
215
216	"zabbix.com/pkg/itemutil"
217)
218
219const (
220	MetricFlagPersistent  = 0x01
221	MetricFlagNew         = 0x02
222	MetricFlagLogLog      = 0x04
223	MetricFlagLogLogrt    = 0x08
224	MetricFlagLogEventlog = 0x10
225	MetricFlagLogCount    = 0x20
226	MetricFlagLog         = MetricFlagLogLog | MetricFlagLogLogrt | MetricFlagLogEventlog
227)
228
229type ResultWriter interface {
230	PersistSlotsAvailable() int
231}
232
233type LogItem struct {
234	LastTs  time.Time // the last log value timestamp + 1ns
235	Results []*LogResult
236	Output  ResultWriter
237}
238
239type LogResult struct {
240	Value       *string
241	Ts          time.Time
242	Error       error
243	LastLogsize uint64
244	Mtime       int
245}
246
247func NewActiveMetric(key string, params []string, lastLogsize uint64, mtime int32) (data unsafe.Pointer, err error) {
248	flags := MetricFlagNew | MetricFlagPersistent
249	switch key {
250	case "log":
251		if len(params) >= 9 && params[8] != "" {
252			return nil, errors.New("The ninth parameter (persistent directory) is not supported by Agent2.")
253		}
254		flags |= MetricFlagLogLog
255	case "logrt":
256		if len(params) >= 9 && params[8] != "" {
257			return nil, errors.New("The ninth parameter (persistent directory) is not supported by Agent2.")
258		}
259		flags |= MetricFlagLogLogrt
260	case "log.count":
261		if len(params) >= 8 && params[7] != "" {
262			return nil, errors.New("The eighth parameter (persistent directory) is not supported by Agent2.")
263		}
264		flags |= MetricFlagLogCount | MetricFlagLogLog
265	case "logrt.count":
266		if len(params) >= 8 && params[7] != "" {
267			return nil, errors.New("The eighth parameter (persistent directory) is not supported by Agent2.")
268		}
269		flags |= MetricFlagLogCount | MetricFlagLogLogrt
270	case "eventlog":
271		flags |= MetricFlagLogEventlog
272	default:
273		return nil, errors.New("Unsupported item key.")
274	}
275	ckey := C.CString(itemutil.MakeKey(key, params))
276	return unsafe.Pointer(C.new_metric(ckey, C.zbx_uint64_t(lastLogsize), C.int(mtime), C.int(flags))), nil
277}
278
279func FreeActiveMetric(data unsafe.Pointer) {
280	C.metric_free(C.ZBX_ACTIVE_METRIC_LP(data))
281}
282
283func ProcessLogCheck(data unsafe.Pointer, item *LogItem, refresh int, cblob unsafe.Pointer) {
284	C.metric_set_refresh(C.ZBX_ACTIVE_METRIC_LP(data), C.int(refresh))
285
286	var clastLogsizeSent, clastLogsizeLast C.zbx_uint64_t
287	var cmtimeSent, cmtimeLast C.int
288	C.metric_get_meta(C.ZBX_ACTIVE_METRIC_LP(data), &clastLogsizeSent, &cmtimeSent)
289	clastLogsizeLast = clastLogsizeSent
290	cmtimeLast = cmtimeSent
291
292	result := C.new_log_result(C.int(item.Output.PersistSlotsAvailable()))
293
294	var cerrmsg *C.char
295	cprepVec := C.new_prep_vec() // In Agent2 it is always empty vector. Not used but required for linking.
296	ret := C.process_log_check(C.char_lp_t(unsafe.Pointer(result)), 0, C.zbx_vector_ptr_lp_t(cblob),
297		C.ZBX_ACTIVE_METRIC_LP(data), C.zbx_process_value_func_t(C.process_value_cb), &clastLogsizeSent,
298		&cmtimeSent, &cerrmsg, cprepVec)
299	C.free_prep_vec(cprepVec)
300
301	// add cached results
302	var cvalue *C.char
303	var clastlogsize C.zbx_uint64_t
304	var cstate, cmtime C.int
305	logTs := time.Now()
306	if logTs.Before(item.LastTs) {
307		logTs = item.LastTs
308	}
309	for i := 0; C.get_log_value(result, C.int(i), &cvalue, &cstate, &clastlogsize, &cmtime) != C.FAIL; i++ {
310		var value string
311		var err error
312		if cstate == C.ITEM_STATE_NORMAL {
313			value = C.GoString(cvalue)
314		} else {
315			err = errors.New(C.GoString(cvalue))
316		}
317
318		r := &LogResult{
319			Value:       &value,
320			Ts:          logTs,
321			Error:       err,
322			LastLogsize: uint64(clastlogsize),
323			Mtime:       int(cmtime),
324		}
325
326		item.Results = append(item.Results, r)
327		logTs = logTs.Add(time.Nanosecond)
328	}
329	C.free_log_result(result)
330
331	item.LastTs = logTs
332
333	if ret == C.FAIL {
334		C.metric_set_unsupported(C.ZBX_ACTIVE_METRIC_LP(data))
335
336		var err error
337		if cerrmsg != nil {
338			err = errors.New(C.GoString(cerrmsg))
339			C.free(unsafe.Pointer(cerrmsg))
340		} else {
341			err = errors.New("Unknown error.")
342		}
343		result := &LogResult{
344			Ts:    time.Now(),
345			Error: err,
346		}
347		item.Results = append(item.Results, result)
348	} else {
349		ret := C.metric_set_supported(C.ZBX_ACTIVE_METRIC_LP(data), clastLogsizeSent, cmtimeSent, clastLogsizeLast,
350			cmtimeLast)
351
352		if ret == Succeed {
353			C.metric_get_meta(C.ZBX_ACTIVE_METRIC_LP(data), &clastLogsizeLast, &cmtimeLast)
354			result := &LogResult{
355				Ts:          time.Now(),
356				LastLogsize: uint64(clastLogsizeLast),
357				Mtime:       int(cmtimeLast),
358			}
359			item.Results = append(item.Results, result)
360		}
361	}
362}
363
364func SetMaxLinesPerSecond(num int) {
365	C.CONFIG_MAX_LINES_PER_SECOND = C.int(num)
366}
367