1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "db.h"
22 #include "log.h"
23 #include "sysinfo.h"
24 #include "zbxserver.h"
25 #include "zbxtasks.h"
26
27 #include "proxy.h"
28 #include "dbcache.h"
29 #include "discovery.h"
30 #include "zbxalgo.h"
31 #include "preproc.h"
32 #include "../zbxcrypto/tls_tcp_active.h"
33 #include "zbxlld.h"
34 #include "events.h"
35
36 extern char *CONFIG_SERVER;
37
38 /* the space reserved in json buffer to hold at least one record plus service data */
39 #define ZBX_DATA_JSON_RESERVED (HISTORY_TEXT_VALUE_LEN * 4 + ZBX_KIBIBYTE * 4)
40
41 #define ZBX_DATA_JSON_RECORD_LIMIT (ZBX_MAX_RECV_DATA_SIZE - ZBX_DATA_JSON_RESERVED)
42 #define ZBX_DATA_JSON_BATCH_LIMIT ((ZBX_MAX_RECV_DATA_SIZE - ZBX_DATA_JSON_RESERVED) / 2)
43
44 /* the maximum number of values processed in one batch */
45 #define ZBX_HISTORY_VALUES_MAX 256
46
47 typedef struct
48 {
49 zbx_uint64_t druleid;
50 zbx_vector_uint64_t dcheckids;
51 zbx_vector_ptr_t ips;
52 }
53 zbx_drule_t;
54
55 typedef struct
56 {
57 char ip[INTERFACE_IP_LEN_MAX];
58 zbx_vector_ptr_t services;
59 }
60 zbx_drule_ip_t;
61
62 extern unsigned int configured_tls_accept_modes;
63
64 typedef struct
65 {
66 const char *field;
67 const char *tag;
68 zbx_json_type_t jt;
69 const char *default_value;
70 }
71 zbx_history_field_t;
72
73 typedef struct
74 {
75 const char *table, *lastidfield;
76 zbx_history_field_t fields[ZBX_MAX_FIELDS];
77 }
78 zbx_history_table_t;
79
80 typedef struct
81 {
82 zbx_uint64_t id;
83 size_t offset;
84 }
85 zbx_id_offset_t;
86
87
88 typedef int (*zbx_client_item_validator_t)(DC_ITEM *item, zbx_socket_t *sock, void *args, char **error);
89
90 typedef struct
91 {
92 zbx_uint64_t hostid;
93 int value;
94 }
95 zbx_host_rights_t;
96
97 static zbx_history_table_t dht = {
98 "proxy_dhistory", "dhistory_lastid",
99 {
100 {"clock", ZBX_PROTO_TAG_CLOCK, ZBX_JSON_TYPE_INT, NULL},
101 {"druleid", ZBX_PROTO_TAG_DRULE, ZBX_JSON_TYPE_INT, NULL},
102 {"dcheckid", ZBX_PROTO_TAG_DCHECK, ZBX_JSON_TYPE_INT, NULL},
103 {"ip", ZBX_PROTO_TAG_IP, ZBX_JSON_TYPE_STRING, NULL},
104 {"dns", ZBX_PROTO_TAG_DNS, ZBX_JSON_TYPE_STRING, NULL},
105 {"port", ZBX_PROTO_TAG_PORT, ZBX_JSON_TYPE_INT, "0"},
106 {"value", ZBX_PROTO_TAG_VALUE, ZBX_JSON_TYPE_STRING, ""},
107 {"status", ZBX_PROTO_TAG_STATUS, ZBX_JSON_TYPE_INT, "0"},
108 {NULL}
109 }
110 };
111
112 static zbx_history_table_t areg = {
113 "proxy_autoreg_host", "autoreg_host_lastid",
114 {
115 {"clock", ZBX_PROTO_TAG_CLOCK, ZBX_JSON_TYPE_INT, NULL},
116 {"host", ZBX_PROTO_TAG_HOST, ZBX_JSON_TYPE_STRING, NULL},
117 {"listen_ip", ZBX_PROTO_TAG_IP, ZBX_JSON_TYPE_STRING, ""},
118 {"listen_dns", ZBX_PROTO_TAG_DNS, ZBX_JSON_TYPE_STRING, ""},
119 {"listen_port", ZBX_PROTO_TAG_PORT, ZBX_JSON_TYPE_STRING, "0"},
120 {"host_metadata", ZBX_PROTO_TAG_HOST_METADATA, ZBX_JSON_TYPE_STRING, ""},
121 {"flags", ZBX_PROTO_TAG_FLAGS, ZBX_JSON_TYPE_STRING, "0"},
122 {"tls_accepted", ZBX_PROTO_TAG_TLS_ACCEPTED, ZBX_JSON_TYPE_INT, "0"},
123 {NULL}
124 }
125 };
126
127 static const char *availability_tag_available[ZBX_AGENT_MAX] = {ZBX_PROTO_TAG_AVAILABLE,
128 ZBX_PROTO_TAG_SNMP_AVAILABLE, ZBX_PROTO_TAG_IPMI_AVAILABLE,
129 ZBX_PROTO_TAG_JMX_AVAILABLE};
130 static const char *availability_tag_error[ZBX_AGENT_MAX] = {ZBX_PROTO_TAG_ERROR,
131 ZBX_PROTO_TAG_SNMP_ERROR, ZBX_PROTO_TAG_IPMI_ERROR,
132 ZBX_PROTO_TAG_JMX_ERROR};
133
134 /******************************************************************************
135 * *
136 * Function: zbx_proxy_check_permissions *
137 * *
138 * Purpose: check proxy connection permissions (encryption configuration and *
139 * if peer proxy address is allowed) *
140 * *
141 * Parameters: *
142 * proxy - [IN] the proxy data *
143 * sock - [IN] connection socket context *
144 * error - [OUT] error message *
145 * *
146 * Return value: *
147 * SUCCEED - connection permission check was successful *
148 * FAIL - otherwise *
149 * *
150 ******************************************************************************/
zbx_proxy_check_permissions(const DC_PROXY * proxy,const zbx_socket_t * sock,char ** error)151 int zbx_proxy_check_permissions(const DC_PROXY *proxy, const zbx_socket_t *sock, char **error)
152 {
153 #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL)
154 zbx_tls_conn_attr_t attr;
155 #endif
156 if ('\0' != *proxy->proxy_address && FAIL == zbx_tcp_check_allowed_peers(sock, proxy->proxy_address))
157 {
158 *error = zbx_strdup(*error, "connection is not allowed");
159 return FAIL;
160 }
161
162 #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL)
163 if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type)
164 {
165 if (SUCCEED != zbx_tls_get_attr_cert(sock, &attr))
166 {
167 *error = zbx_strdup(*error, "internal error: cannot get connection attributes");
168 THIS_SHOULD_NEVER_HAPPEN;
169 return FAIL;
170 }
171 }
172 #if defined(HAVE_GNUTLS) || (defined(HAVE_OPENSSL) && defined(HAVE_OPENSSL_WITH_PSK))
173 else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type)
174 {
175 if (SUCCEED != zbx_tls_get_attr_psk(sock, &attr))
176 {
177 *error = zbx_strdup(*error, "internal error: cannot get connection attributes");
178 THIS_SHOULD_NEVER_HAPPEN;
179 return FAIL;
180 }
181 }
182 #endif
183 else if (ZBX_TCP_SEC_UNENCRYPTED != sock->connection_type)
184 {
185 *error = zbx_strdup(*error, "internal error: invalid connection type");
186 THIS_SHOULD_NEVER_HAPPEN;
187 return FAIL;
188 }
189 #endif
190 if (0 == ((unsigned int)proxy->tls_accept & sock->connection_type))
191 {
192 *error = zbx_dsprintf(NULL, "connection of type \"%s\" is not allowed for proxy \"%s\"",
193 zbx_tcp_connection_type_name(sock->connection_type), proxy->host);
194 return FAIL;
195 }
196
197 #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL)
198 if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type)
199 {
200 /* simplified match, not compliant with RFC 4517, 4518 */
201 if ('\0' != *proxy->tls_issuer && 0 != strcmp(proxy->tls_issuer, attr.issuer))
202 {
203 *error = zbx_dsprintf(*error, "proxy \"%s\" certificate issuer does not match", proxy->host);
204 return FAIL;
205 }
206
207 /* simplified match, not compliant with RFC 4517, 4518 */
208 if ('\0' != *proxy->tls_subject && 0 != strcmp(proxy->tls_subject, attr.subject))
209 {
210 *error = zbx_dsprintf(*error, "proxy \"%s\" certificate subject does not match", proxy->host);
211 return FAIL;
212 }
213 }
214 #if defined(HAVE_GNUTLS) || (defined(HAVE_OPENSSL) && defined(HAVE_OPENSSL_WITH_PSK))
215 else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type)
216 {
217 if (strlen(proxy->tls_psk_identity) != attr.psk_identity_len ||
218 0 != memcmp(proxy->tls_psk_identity, attr.psk_identity, attr.psk_identity_len))
219 {
220 *error = zbx_dsprintf(*error, "proxy \"%s\" is using false PSK identity", proxy->host);
221 return FAIL;
222 }
223 }
224 #endif
225 #endif
226 return SUCCEED;
227 }
228
229 /******************************************************************************
230 * *
231 * Function: zbx_host_check_permissions *
232 * *
233 * Purpose: checks host connection permissions (encryption configuration) *
234 * *
235 * Parameters: *
236 * host - [IN] the host data *
237 * sock - [IN] connection socket context *
238 * error - [OUT] error message *
239 * *
240 * Return value: *
241 * SUCCEED - connection permission check was successful *
242 * FAIL - otherwise *
243 * *
244 ******************************************************************************/
zbx_host_check_permissions(const DC_HOST * host,const zbx_socket_t * sock,char ** error)245 static int zbx_host_check_permissions(const DC_HOST *host, const zbx_socket_t *sock, char **error)
246 {
247 #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL)
248 zbx_tls_conn_attr_t attr;
249
250 if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type)
251 {
252 if (SUCCEED != zbx_tls_get_attr_cert(sock, &attr))
253 {
254 *error = zbx_strdup(*error, "internal error: cannot get connection attributes");
255 THIS_SHOULD_NEVER_HAPPEN;
256 return FAIL;
257 }
258 }
259 #if defined(HAVE_GNUTLS) || (defined(HAVE_OPENSSL) && defined(HAVE_OPENSSL_WITH_PSK))
260 else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type)
261 {
262 if (SUCCEED != zbx_tls_get_attr_psk(sock, &attr))
263 {
264 *error = zbx_strdup(*error, "internal error: cannot get connection attributes");
265 THIS_SHOULD_NEVER_HAPPEN;
266 return FAIL;
267 }
268 }
269 #endif
270 else if (ZBX_TCP_SEC_UNENCRYPTED != sock->connection_type)
271 {
272 *error = zbx_strdup(*error, "internal error: invalid connection type");
273 THIS_SHOULD_NEVER_HAPPEN;
274 return FAIL;
275 }
276 #endif
277 if (0 == ((unsigned int)host->tls_accept & sock->connection_type))
278 {
279 *error = zbx_dsprintf(NULL, "connection of type \"%s\" is not allowed for host \"%s\"",
280 zbx_tcp_connection_type_name(sock->connection_type), host->host);
281 return FAIL;
282 }
283
284 #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL)
285 if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type)
286 {
287 /* simplified match, not compliant with RFC 4517, 4518 */
288 if ('\0' != *host->tls_issuer && 0 != strcmp(host->tls_issuer, attr.issuer))
289 {
290 *error = zbx_dsprintf(*error, "host \"%s\" certificate issuer does not match", host->host);
291 return FAIL;
292 }
293
294 /* simplified match, not compliant with RFC 4517, 4518 */
295 if ('\0' != *host->tls_subject && 0 != strcmp(host->tls_subject, attr.subject))
296 {
297 *error = zbx_dsprintf(*error, "host \"%s\" certificate subject does not match", host->host);
298 return FAIL;
299 }
300 }
301 #if defined(HAVE_GNUTLS) || (defined(HAVE_OPENSSL) && defined(HAVE_OPENSSL_WITH_PSK))
302 else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type)
303 {
304 if (strlen(host->tls_psk_identity) != attr.psk_identity_len ||
305 0 != memcmp(host->tls_psk_identity, attr.psk_identity, attr.psk_identity_len))
306 {
307 *error = zbx_dsprintf(*error, "host \"%s\" is using false PSK identity", host->host);
308 return FAIL;
309 }
310 }
311 #endif
312 #endif
313 return SUCCEED;
314 }
315
316 /******************************************************************************
317 * *
318 * Function: get_active_proxy_from_request *
319 * *
320 * Purpose: *
321 * Extract a proxy name from JSON and find the proxy ID in configuration *
322 * cache, and check access rights. The proxy must be configured in active *
323 * mode. *
324 * *
325 * Parameters: *
326 * jp - [IN] JSON with the proxy name *
327 * proxy - [OUT] the proxy data *
328 * error - [OUT] error message *
329 * *
330 * Return value: *
331 * SUCCEED - proxy ID was found in database *
332 * FAIL - an error occurred (e.g. an unknown proxy, the proxy is *
333 * configured in passive mode or access denied) *
334 * *
335 ******************************************************************************/
get_active_proxy_from_request(struct zbx_json_parse * jp,DC_PROXY * proxy,char ** error)336 int get_active_proxy_from_request(struct zbx_json_parse *jp, DC_PROXY *proxy, char **error)
337 {
338 char *ch_error, host[HOST_HOST_LEN_MAX];
339
340 if (SUCCEED != zbx_json_value_by_name(jp, ZBX_PROTO_TAG_HOST, host, HOST_HOST_LEN_MAX, NULL))
341 {
342 *error = zbx_strdup(*error, "missing name of proxy");
343 return FAIL;
344 }
345
346 if (SUCCEED != zbx_check_hostname(host, &ch_error))
347 {
348 *error = zbx_dsprintf(*error, "invalid proxy name \"%s\": %s", host, ch_error);
349 zbx_free(ch_error);
350 return FAIL;
351 }
352
353 return zbx_dc_get_active_proxy_by_name(host, proxy, error);
354 }
355
356 /******************************************************************************
357 * *
358 * Function: check_access_passive_proxy *
359 * *
360 * Purpose: *
361 * Check access rights to a passive proxy for the given connection and *
362 * send a response if denied. *
363 * *
364 * Parameters: *
365 * sock - [IN] connection socket context *
366 * send_response - [IN] to send or not to send a response to server. *
367 * Value: ZBX_SEND_RESPONSE or *
368 * ZBX_DO_NOT_SEND_RESPONSE *
369 * req - [IN] request, included into error message *
370 * *
371 * Return value: *
372 * SUCCEED - access is allowed *
373 * FAIL - access is denied *
374 * *
375 ******************************************************************************/
check_access_passive_proxy(zbx_socket_t * sock,int send_response,const char * req)376 int check_access_passive_proxy(zbx_socket_t *sock, int send_response, const char *req)
377 {
378 char *msg = NULL;
379
380 if (FAIL == zbx_tcp_check_allowed_peers(sock, CONFIG_SERVER))
381 {
382 zabbix_log(LOG_LEVEL_WARNING, "%s from server \"%s\" is not allowed: %s", req, sock->peer,
383 zbx_socket_strerror());
384
385 if (ZBX_SEND_RESPONSE == send_response)
386 zbx_send_proxy_response(sock, FAIL, "connection is not allowed", CONFIG_TIMEOUT);
387
388 return FAIL;
389 }
390
391 if (0 == (configured_tls_accept_modes & sock->connection_type))
392 {
393 msg = zbx_dsprintf(NULL, "%s over connection of type \"%s\" is not allowed", req,
394 zbx_tcp_connection_type_name(sock->connection_type));
395
396 zabbix_log(LOG_LEVEL_WARNING, "%s from server \"%s\" by proxy configuration parameter \"TLSAccept\"",
397 msg, sock->peer);
398
399 if (ZBX_SEND_RESPONSE == send_response)
400 zbx_send_proxy_response(sock, FAIL, msg, CONFIG_TIMEOUT);
401
402 zbx_free(msg);
403 return FAIL;
404 }
405
406 #if defined(HAVE_GNUTLS) || defined(HAVE_OPENSSL)
407 if (ZBX_TCP_SEC_TLS_CERT == sock->connection_type)
408 {
409 if (SUCCEED == zbx_check_server_issuer_subject(sock, &msg))
410 return SUCCEED;
411
412 zabbix_log(LOG_LEVEL_WARNING, "%s from server \"%s\" is not allowed: %s", req, sock->peer, msg);
413
414 if (ZBX_SEND_RESPONSE == send_response)
415 zbx_send_proxy_response(sock, FAIL, "certificate issuer or subject mismatch", CONFIG_TIMEOUT);
416
417 zbx_free(msg);
418 return FAIL;
419 }
420 else if (ZBX_TCP_SEC_TLS_PSK == sock->connection_type)
421 {
422 if (0 != (ZBX_PSK_FOR_PROXY & zbx_tls_get_psk_usage()))
423 return SUCCEED;
424
425 zabbix_log(LOG_LEVEL_WARNING, "%s from server \"%s\" is not allowed: it used PSK which is not"
426 " configured for proxy communication with server", req, sock->peer);
427
428 if (ZBX_SEND_RESPONSE == send_response)
429 zbx_send_proxy_response(sock, FAIL, "wrong PSK used", CONFIG_TIMEOUT);
430
431 return FAIL;
432 }
433 #endif
434 return SUCCEED;
435 }
436
437 /******************************************************************************
438 * *
439 * Function: proxyconfig_add_row *
440 * *
441 * Purpose: add database row to the proxy config json data *
442 * *
443 * Parameters: j - [OUT] the output json *
444 * row - [IN] the database row to add *
445 * table - [IN] the table configuration *
446 * *
447 ******************************************************************************/
proxyconfig_add_row(struct zbx_json * j,const DB_ROW row,const ZBX_TABLE * table)448 static void proxyconfig_add_row(struct zbx_json *j, const DB_ROW row, const ZBX_TABLE *table)
449 {
450 int fld = 0, i;
451
452 zbx_json_addstring(j, NULL, row[fld++], ZBX_JSON_TYPE_INT);
453
454 for (i = 0; 0 != table->fields[i].name; i++)
455 {
456 if (0 == (table->fields[i].flags & ZBX_PROXY))
457 continue;
458
459 switch (table->fields[i].type)
460 {
461 case ZBX_TYPE_INT:
462 case ZBX_TYPE_UINT:
463 case ZBX_TYPE_ID:
464 if (SUCCEED != DBis_null(row[fld]))
465 zbx_json_addstring(j, NULL, row[fld], ZBX_JSON_TYPE_INT);
466 else
467 zbx_json_addstring(j, NULL, NULL, ZBX_JSON_TYPE_NULL);
468 break;
469 default:
470 zbx_json_addstring(j, NULL, row[fld], ZBX_JSON_TYPE_STRING);
471 break;
472 }
473 fld++;
474 }
475 }
476
477 typedef struct
478 {
479 zbx_uint64_t itemid;
480 zbx_uint64_t master_itemid;
481 char *buffer;
482 }
483 zbx_proxy_item_config_t;
484
485 /******************************************************************************
486 * *
487 * Function: get_proxyconfig_table_items *
488 * *
489 * Purpose: prepare items table proxy configuration data *
490 * *
491 ******************************************************************************/
get_proxyconfig_table_items(zbx_uint64_t proxy_hostid,struct zbx_json * j,const ZBX_TABLE * table,zbx_hashset_t * itemids)492 static int get_proxyconfig_table_items(zbx_uint64_t proxy_hostid, struct zbx_json *j, const ZBX_TABLE *table,
493 zbx_hashset_t *itemids)
494 {
495 char *sql = NULL;
496 size_t sql_alloc = 4 * ZBX_KIBIBYTE, sql_offset = 0;
497 int f, fld, fld_type = -1, fld_key = -1, fld_master = -1, ret = SUCCEED;
498 DB_RESULT result;
499 DB_ROW row;
500 zbx_hashset_t proxy_items;
501 zbx_vector_ptr_t items;
502 zbx_uint64_t itemid;
503 zbx_hashset_iter_t iter;
504 struct zbx_json json_array;
505
506 zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxy_hostid:" ZBX_FS_UI64, __func__, proxy_hostid);
507
508 zbx_json_addobject(j, table->table);
509 zbx_json_addarray(j, "fields");
510
511 sql = (char *)zbx_malloc(sql, sql_alloc);
512
513 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select t.%s", table->recid);
514
515 zbx_json_addstring(j, NULL, table->recid, ZBX_JSON_TYPE_STRING);
516
517 for (f = 0, fld = 1; 0 != table->fields[f].name; f++)
518 {
519 if (0 == (table->fields[f].flags & ZBX_PROXY))
520 continue;
521
522 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ",t.");
523 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, table->fields[f].name);
524
525 zbx_json_addstring(j, NULL, table->fields[f].name, ZBX_JSON_TYPE_STRING);
526
527 if (0 == strcmp(table->fields[f].name, "type"))
528 fld_type = fld;
529 else if (0 == strcmp(table->fields[f].name, "key_"))
530 fld_key = fld;
531 else if (0 == strcmp(table->fields[f].name, "master_itemid"))
532 fld_master = fld;
533 fld++;
534 }
535
536 if (-1 == fld_type || -1 == fld_key || -1 == fld_master)
537 {
538 THIS_SHOULD_NEVER_HAPPEN;
539 exit(EXIT_FAILURE);
540 }
541
542 zbx_json_close(j); /* fields */
543
544 zbx_json_addarray(j, "data");
545
546 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
547 " from items t,hosts r where t.hostid=r.hostid"
548 " and r.proxy_hostid=" ZBX_FS_UI64
549 " and r.status in (%d,%d)"
550 " and t.flags<>%d"
551 " and t.type in (%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d)"
552 " order by t.%s",
553 proxy_hostid,
554 HOST_STATUS_MONITORED, HOST_STATUS_NOT_MONITORED,
555 ZBX_FLAG_DISCOVERY_PROTOTYPE,
556 ITEM_TYPE_ZABBIX, ITEM_TYPE_ZABBIX_ACTIVE, ITEM_TYPE_SNMP, ITEM_TYPE_IPMI, ITEM_TYPE_TRAPPER,
557 ITEM_TYPE_SIMPLE, ITEM_TYPE_HTTPTEST, ITEM_TYPE_EXTERNAL, ITEM_TYPE_DB_MONITOR, ITEM_TYPE_SSH,
558 ITEM_TYPE_TELNET, ITEM_TYPE_JMX, ITEM_TYPE_SNMPTRAP, ITEM_TYPE_INTERNAL,
559 ITEM_TYPE_HTTPAGENT, ITEM_TYPE_DEPENDENT,
560 table->recid);
561
562 if (NULL == (result = DBselect("%s", sql)))
563 {
564 ret = FAIL;
565 goto skip_data;
566 }
567
568 zbx_hashset_create(&proxy_items, 1000, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
569 zbx_json_initarray(&json_array, 256);
570
571 while (NULL != (row = DBfetch(result)))
572 {
573 if (SUCCEED == is_item_processed_by_server(atoi(row[fld_type]), row[fld_key]))
574 continue;
575
576 if (SUCCEED != DBis_null(row[fld_master]))
577 {
578 zbx_proxy_item_config_t proxy_item_local, *proxy_item;
579
580 ZBX_STR2UINT64(proxy_item_local.itemid, row[0]);
581 ZBX_STR2UINT64(proxy_item_local.master_itemid, row[fld_master]);
582 proxy_item = zbx_hashset_insert(&proxy_items, &proxy_item_local, sizeof(proxy_item_local));
583
584 proxyconfig_add_row(&json_array, row, table);
585
586 proxy_item->buffer = zbx_malloc(NULL, json_array.buffer_size + 1);
587 memcpy(proxy_item->buffer, json_array.buffer, json_array.buffer_size + 1);
588
589 zbx_json_cleanarray(&json_array);
590 }
591 else
592 {
593 ZBX_STR2UINT64(itemid, row[0]);
594 zbx_hashset_insert(itemids, &itemid, sizeof(itemid));
595
596 zbx_json_addarray(j, NULL);
597 proxyconfig_add_row(j, row, table);
598 zbx_json_close(j);
599 }
600 }
601 DBfree_result(result);
602 zbx_json_free(&json_array);
603
604 /* flush cached dependent items */
605
606 zbx_vector_ptr_create(&items);
607 while (0 != proxy_items.num_data)
608 {
609 zbx_proxy_item_config_t *proxy_item;
610 int i;
611
612 zbx_hashset_iter_reset(&proxy_items, &iter);
613 while (NULL != (proxy_item = (zbx_proxy_item_config_t *)zbx_hashset_iter_next(&iter)))
614 {
615 if (NULL == zbx_hashset_search(&proxy_items, &proxy_item->master_itemid))
616 zbx_vector_ptr_append(&items, proxy_item);
617 }
618
619 if (0 == items.values_num)
620 {
621 THIS_SHOULD_NEVER_HAPPEN;
622 exit(EXIT_FAILURE);
623 }
624
625 zbx_vector_ptr_sort(&items, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
626 for (i = 0; i < items.values_num; i++)
627 {
628 proxy_item = (zbx_proxy_item_config_t *)items.values[i];
629 if (NULL != zbx_hashset_search(itemids, &proxy_item->master_itemid))
630 {
631 zbx_hashset_insert(itemids, &proxy_item->itemid, sizeof(itemid));
632 zbx_json_addraw(j, NULL, proxy_item->buffer);
633 }
634 zbx_free(proxy_item->buffer);
635 zbx_hashset_remove_direct(&proxy_items, proxy_item);
636 }
637
638 zbx_vector_ptr_clear(&items);
639 }
640 zbx_vector_ptr_destroy(&items);
641 zbx_hashset_destroy(&proxy_items);
642 skip_data:
643 zbx_free(sql);
644
645 zbx_json_close(j); /* data */
646 zbx_json_close(j); /* table->table */
647
648 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
649
650 return ret;
651 }
652
653 /******************************************************************************
654 * *
655 * Function: get_proxyconfig_table_items *
656 * *
657 * Purpose: prepare items table proxy configuration data *
658 * *
659 ******************************************************************************/
get_proxyconfig_table_items_ext(zbx_uint64_t proxy_hostid,const zbx_hashset_t * itemids,struct zbx_json * j,const ZBX_TABLE * table)660 static int get_proxyconfig_table_items_ext(zbx_uint64_t proxy_hostid, const zbx_hashset_t *itemids,
661 struct zbx_json *j, const ZBX_TABLE *table)
662 {
663 char *sql = NULL;
664 size_t sql_alloc = 4 * ZBX_KIBIBYTE, sql_offset = 0;
665 int f, ret = SUCCEED, index = 1, itemid_index = 0;
666 DB_RESULT result;
667 DB_ROW row;
668
669 zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:%s", __func__, table->table);
670
671 zbx_json_addobject(j, table->table);
672 zbx_json_addarray(j, "fields");
673
674 sql = (char *)zbx_malloc(sql, sql_alloc);
675
676 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select t.%s", table->recid);
677
678 zbx_json_addstring(j, NULL, table->recid, ZBX_JSON_TYPE_STRING);
679
680 for (f = 0; 0 != table->fields[f].name; f++)
681 {
682 if (0 == (table->fields[f].flags & ZBX_PROXY))
683 continue;
684
685 /* either the table uses itemid as primary key, then it will be stored in the */
686 /* first (0) column as record id, or it will have reference to items table */
687 /* through itemid field */
688 if (0 == strcmp(table->fields[f].name, "itemid"))
689 itemid_index = index;
690
691 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ",t.");
692 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, table->fields[f].name);
693 zbx_json_addstring(j, NULL, table->fields[f].name, ZBX_JSON_TYPE_STRING);
694 index++;
695 }
696
697 zbx_json_close(j); /* fields */
698
699 zbx_json_addarray(j, "data");
700
701 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
702 " from %s t,items i,hosts h"
703 " where t.itemid=i.itemid"
704 " and i.hostid=h.hostid"
705 " and h.proxy_hostid=" ZBX_FS_UI64,
706 table->table, proxy_hostid);
707 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " order by t.");
708 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, table->recid);
709
710 if (NULL == (result = DBselect("%s", sql)))
711 {
712 ret = FAIL;
713 goto skip_data;
714 }
715
716 while (NULL != (row = DBfetch(result)))
717 {
718 zbx_uint64_t itemid;
719
720 ZBX_STR2UINT64(itemid, row[itemid_index]);
721 if (NULL != zbx_hashset_search((zbx_hashset_t *)itemids, &itemid))
722 {
723 zbx_json_addarray(j, NULL);
724 proxyconfig_add_row(j, row, table);
725 zbx_json_close(j);
726 }
727 }
728 DBfree_result(result);
729 skip_data:
730 zbx_free(sql);
731
732 zbx_json_close(j); /* data */
733 zbx_json_close(j); /* table->table */
734
735 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
736
737 return ret;
738 }
739
740 /******************************************************************************
741 * *
742 * Function: get_proxyconfig_table *
743 * *
744 * Purpose: prepare proxy configuration data *
745 * *
746 ******************************************************************************/
get_proxyconfig_table(zbx_uint64_t proxy_hostid,struct zbx_json * j,const ZBX_TABLE * table,zbx_vector_uint64_t * hosts,zbx_vector_uint64_t * httptests)747 static int get_proxyconfig_table(zbx_uint64_t proxy_hostid, struct zbx_json *j, const ZBX_TABLE *table,
748 zbx_vector_uint64_t *hosts, zbx_vector_uint64_t *httptests)
749 {
750 char *sql = NULL;
751 size_t sql_alloc = 4 * ZBX_KIBIBYTE, sql_offset = 0;
752 int f, ret = SUCCEED;
753 DB_RESULT result;
754 DB_ROW row;
755
756 zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxy_hostid:" ZBX_FS_UI64 " table:'%s'",
757 __func__, proxy_hostid, table->table);
758
759 zbx_json_addobject(j, table->table);
760 zbx_json_addarray(j, "fields");
761
762 sql = (char *)zbx_malloc(sql, sql_alloc);
763
764 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select t.%s", table->recid);
765
766 zbx_json_addstring(j, NULL, table->recid, ZBX_JSON_TYPE_STRING);
767
768 for (f = 0; 0 != table->fields[f].name; f++)
769 {
770 if (0 == (table->fields[f].flags & ZBX_PROXY))
771 continue;
772
773 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ",t.");
774 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, table->fields[f].name);
775
776 zbx_json_addstring(j, NULL, table->fields[f].name, ZBX_JSON_TYPE_STRING);
777 }
778
779 zbx_json_close(j); /* fields */
780
781 zbx_json_addarray(j, "data");
782
783 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " from %s t", table->table);
784
785 if (SUCCEED == str_in_list("hosts,interface,hosts_templates,hostmacro", table->table, ','))
786 {
787 if (0 == hosts->values_num)
788 goto skip_data;
789
790 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " where");
791 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "t.hostid", hosts->values, hosts->values_num);
792 }
793 else if (0 == strcmp(table->table, "interface_snmp"))
794 {
795 if (0 == hosts->values_num)
796 goto skip_data;
797
798 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ",interface h where t.interfaceid=h.interfaceid and");
799 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "h.hostid", hosts->values, hosts->values_num);
800 }
801 else if (0 == strcmp(table->table, "drules"))
802 {
803 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
804 " where t.proxy_hostid=" ZBX_FS_UI64
805 " and t.status=%d",
806 proxy_hostid, DRULE_STATUS_MONITORED);
807 }
808 else if (0 == strcmp(table->table, "dchecks"))
809 {
810 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
811 ",drules r where t.druleid=r.druleid"
812 " and r.proxy_hostid=" ZBX_FS_UI64
813 " and r.status=%d",
814 proxy_hostid, DRULE_STATUS_MONITORED);
815 }
816 else if (0 == strcmp(table->table, "hstgrp"))
817 {
818 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ",config r where t.groupid=r.discovery_groupid");
819 }
820 else if (SUCCEED == str_in_list("httptest,httptest_field,httptestitem,httpstep", table->table, ','))
821 {
822 if (0 == httptests->values_num)
823 goto skip_data;
824
825 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " where");
826 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "t.httptestid",
827 httptests->values, httptests->values_num);
828 }
829 else if (SUCCEED == str_in_list("httpstepitem,httpstep_field", table->table, ','))
830 {
831 if (0 == httptests->values_num)
832 goto skip_data;
833
834 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset,
835 ",httpstep r where t.httpstepid=r.httpstepid"
836 " and");
837 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "r.httptestid",
838 httptests->values, httptests->values_num);
839 }
840
841 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " order by t.");
842 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, table->recid);
843
844 if (NULL == (result = DBselect("%s", sql)))
845 {
846 ret = FAIL;
847 goto skip_data;
848 }
849
850 while (NULL != (row = DBfetch(result)))
851 {
852 zbx_json_addarray(j, NULL);
853 proxyconfig_add_row(j, row, table);
854 zbx_json_close(j);
855 }
856 DBfree_result(result);
857 skip_data:
858 zbx_free(sql);
859
860 zbx_json_close(j); /* data */
861 zbx_json_close(j); /* table->table */
862
863 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
864
865 return ret;
866 }
867
get_proxy_monitored_hosts(zbx_uint64_t proxy_hostid,zbx_vector_uint64_t * hosts)868 static void get_proxy_monitored_hosts(zbx_uint64_t proxy_hostid, zbx_vector_uint64_t *hosts)
869 {
870 DB_RESULT result;
871 DB_ROW row;
872 zbx_uint64_t hostid, *ids = NULL;
873 int ids_alloc = 0, ids_num = 0;
874 char *sql = NULL;
875 size_t sql_alloc = 512, sql_offset;
876
877 sql = (char *)zbx_malloc(sql, sql_alloc * sizeof(char));
878
879 result = DBselect(
880 "select hostid"
881 " from hosts"
882 " where proxy_hostid=" ZBX_FS_UI64
883 " and status in (%d,%d)"
884 " and flags<>%d",
885 proxy_hostid, HOST_STATUS_MONITORED, HOST_STATUS_NOT_MONITORED, ZBX_FLAG_DISCOVERY_PROTOTYPE);
886
887 while (NULL != (row = DBfetch(result)))
888 {
889 ZBX_STR2UINT64(hostid, row[0]);
890
891 zbx_vector_uint64_append(hosts, hostid);
892 uint64_array_add(&ids, &ids_alloc, &ids_num, hostid, 64);
893 }
894 DBfree_result(result);
895
896 while (0 != ids_num)
897 {
898 sql_offset = 0;
899 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset,
900 "select distinct templateid"
901 " from hosts_templates"
902 " where");
903 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "hostid", ids, ids_num);
904
905 ids_num = 0;
906
907 result = DBselect("%s", sql);
908
909 while (NULL != (row = DBfetch(result)))
910 {
911 ZBX_STR2UINT64(hostid, row[0]);
912
913 zbx_vector_uint64_append(hosts, hostid);
914 uint64_array_add(&ids, &ids_alloc, &ids_num, hostid, 64);
915 }
916 DBfree_result(result);
917 }
918
919 zbx_free(ids);
920 zbx_free(sql);
921
922 zbx_vector_uint64_sort(hosts, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
923 }
924
get_proxy_monitored_httptests(zbx_uint64_t proxy_hostid,zbx_vector_uint64_t * httptests)925 static void get_proxy_monitored_httptests(zbx_uint64_t proxy_hostid, zbx_vector_uint64_t *httptests)
926 {
927 DB_RESULT result;
928 DB_ROW row;
929 zbx_uint64_t httptestid;
930
931 result = DBselect(
932 "select httptestid"
933 " from httptest t,hosts h"
934 " where t.hostid=h.hostid"
935 " and t.status=%d"
936 " and h.proxy_hostid=" ZBX_FS_UI64
937 " and h.status=%d",
938 HTTPTEST_STATUS_MONITORED, proxy_hostid, HOST_STATUS_MONITORED);
939
940 while (NULL != (row = DBfetch(result)))
941 {
942 ZBX_STR2UINT64(httptestid, row[0]);
943
944 zbx_vector_uint64_append(httptests, httptestid);
945 }
946 DBfree_result(result);
947
948 zbx_vector_uint64_sort(httptests, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
949 }
950
951 /******************************************************************************
952 * *
953 * Function: get_proxyconfig_data *
954 * *
955 * Purpose: prepare proxy configuration data *
956 * *
957 ******************************************************************************/
get_proxyconfig_data(zbx_uint64_t proxy_hostid,struct zbx_json * j,char ** error)958 int get_proxyconfig_data(zbx_uint64_t proxy_hostid, struct zbx_json *j, char **error)
959 {
960 static const char *proxytable[] =
961 {
962 "globalmacro",
963 "hosts",
964 "interface",
965 "interface_snmp",
966 "hosts_templates",
967 "hostmacro",
968 "items",
969 "item_rtdata",
970 "item_preproc",
971 "drules",
972 "dchecks",
973 "regexps",
974 "expressions",
975 "hstgrp",
976 "config",
977 "httptest",
978 "httptestitem",
979 "httptest_field",
980 "httpstep",
981 "httpstepitem",
982 "httpstep_field",
983 "config_autoreg_tls",
984 NULL
985 };
986
987 int i, ret = FAIL;
988 const ZBX_TABLE *table;
989 zbx_vector_uint64_t hosts, httptests;
990 zbx_hashset_t itemids;
991
992 zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxy_hostid:" ZBX_FS_UI64, __func__, proxy_hostid);
993
994 zbx_hashset_create(&itemids, 1000, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
995 zbx_vector_uint64_create(&hosts);
996 zbx_vector_uint64_create(&httptests);
997
998 DBbegin();
999 get_proxy_monitored_hosts(proxy_hostid, &hosts);
1000 get_proxy_monitored_httptests(proxy_hostid, &httptests);
1001
1002 for (i = 0; NULL != proxytable[i]; i++)
1003 {
1004 table = DBget_table(proxytable[i]);
1005
1006 if (0 == strcmp(proxytable[i], "items"))
1007 {
1008 ret = get_proxyconfig_table_items(proxy_hostid, j, table, &itemids);
1009 }
1010 else if (0 == strcmp(proxytable[i], "item_preproc") || 0 == strcmp(proxytable[i], "item_rtdata"))
1011 {
1012 if (0 != itemids.num_data)
1013 ret = get_proxyconfig_table_items_ext(proxy_hostid, &itemids, j, table);
1014 }
1015 else
1016 ret = get_proxyconfig_table(proxy_hostid, j, table, &hosts, &httptests);
1017
1018 if (SUCCEED != ret)
1019 {
1020 *error = zbx_dsprintf(*error, "failed to get data from table \"%s\"", table->table);
1021 goto out;
1022 }
1023 }
1024
1025 ret = SUCCEED;
1026 out:
1027 DBcommit();
1028 zbx_vector_uint64_destroy(&httptests);
1029 zbx_vector_uint64_destroy(&hosts);
1030 zbx_hashset_destroy(&itemids);
1031
1032 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1033
1034 return ret;
1035 }
1036
1037 /******************************************************************************
1038 * *
1039 * Function: remember_record *
1040 * *
1041 * Purpose: A record is stored as a sequence of fields and flag bytes for *
1042 * handling NULL values. A field is stored as a null-terminated *
1043 * string to preserve field boundaries. If a field value can be NULL *
1044 * a flag byte is inserted after the field to distinguish between *
1045 * empty string and NULL value. The flag byte can be '\1' *
1046 * (not NULL value) or '\2' (NULL value). *
1047 * *
1048 * Examples of representation: *
1049 * \0\2 - the field can be NULL and it is NULL *
1050 * \0\1 - the field can be NULL but is empty string *
1051 * abc\0\1 - the field can be NULL but is a string "abc" *
1052 * \0 - the field can not be NULL and is empty string *
1053 * abc\0 - the field can not be NULL and is a string "abc" *
1054 * *
1055 ******************************************************************************/
remember_record(const ZBX_FIELD ** fields,int fields_count,char ** recs,size_t * recs_alloc,size_t * recs_offset,DB_ROW row)1056 static void remember_record(const ZBX_FIELD **fields, int fields_count, char **recs, size_t *recs_alloc,
1057 size_t *recs_offset, DB_ROW row)
1058 {
1059 int f;
1060
1061 for (f = 0; f < fields_count; f++)
1062 {
1063 if (0 != (fields[f]->flags & ZBX_NOTNULL))
1064 {
1065 zbx_strcpy_alloc(recs, recs_alloc, recs_offset, row[f]);
1066 *recs_offset += sizeof(char);
1067 }
1068 else if (SUCCEED != DBis_null(row[f]))
1069 {
1070 zbx_strcpy_alloc(recs, recs_alloc, recs_offset, row[f]);
1071 *recs_offset += sizeof(char);
1072 zbx_chrcpy_alloc(recs, recs_alloc, recs_offset, '\1');
1073 }
1074 else
1075 {
1076 zbx_strcpy_alloc(recs, recs_alloc, recs_offset, "");
1077 *recs_offset += sizeof(char);
1078 zbx_chrcpy_alloc(recs, recs_alloc, recs_offset, '\2');
1079 }
1080 }
1081 }
1082
id_offset_hash_func(const void * data)1083 static zbx_hash_t id_offset_hash_func(const void *data)
1084 {
1085 const zbx_id_offset_t *p = (zbx_id_offset_t *)data;
1086
1087 return ZBX_DEFAULT_UINT64_HASH_ALGO(&p->id, sizeof(zbx_uint64_t), ZBX_DEFAULT_HASH_SEED);
1088 }
1089
id_offset_compare_func(const void * d1,const void * d2)1090 static int id_offset_compare_func(const void *d1, const void *d2)
1091 {
1092 const zbx_id_offset_t *p1 = (zbx_id_offset_t *)d1, *p2 = (zbx_id_offset_t *)d2;
1093
1094 return ZBX_DEFAULT_UINT64_COMPARE_FUNC(&p1->id, &p2->id);
1095 }
1096
1097 /******************************************************************************
1098 * *
1099 * Function: find_field_by_name *
1100 * *
1101 * Purpose: find a number of the field *
1102 * *
1103 ******************************************************************************/
find_field_by_name(const ZBX_FIELD ** fields,int fields_count,const char * field_name)1104 static int find_field_by_name(const ZBX_FIELD **fields, int fields_count, const char *field_name)
1105 {
1106 int f;
1107
1108 for (f = 0; f < fields_count; f++)
1109 {
1110 if (0 == strcmp(fields[f]->name, field_name))
1111 break;
1112 }
1113
1114 return f;
1115 }
1116
1117 /******************************************************************************
1118 * *
1119 * Function: compare_nth_field *
1120 * *
1121 * Purpose: This function compares a value from JSON record with the value *
1122 * of the n-th field of DB record. For description how DB record is *
1123 * stored in memory see comments in function remember_record(). *
1124 * *
1125 * Comparing deals with 4 cases: *
1126 * - JSON value is not NULL, DB value is not NULL *
1127 * - JSON value is not NULL, DB value is NULL *
1128 * - JSON value is NULL, DB value is NULL *
1129 * - JSON value is NULL, DB value is not NULL *
1130 * *
1131 ******************************************************************************/
compare_nth_field(const ZBX_FIELD ** fields,const char * rec_data,int n,const char * str,int is_null,int * last_n,size_t * last_pos)1132 static int compare_nth_field(const ZBX_FIELD **fields, const char *rec_data, int n, const char *str, int is_null,
1133 int *last_n, size_t *last_pos)
1134 {
1135 int i = *last_n, null_in_db = 0;
1136 const char *p = rec_data + *last_pos, *field_start = NULL;
1137
1138 do /* find starting position of the n-th field */
1139 {
1140 field_start = p;
1141 while ('\0' != *p++)
1142 ;
1143
1144 null_in_db = 0;
1145
1146 if (0 == (fields[i++]->flags & ZBX_NOTNULL)) /* field could be NULL */
1147 {
1148 if ('\2' == *p && (rec_data == p - 1 || '\0' == *(p - 2) || '\1' == *(p - 2) ||
1149 '\2' == *(p - 2))) /* field value is NULL */
1150 {
1151 null_in_db = 1;
1152 p++;
1153 }
1154 else if ('\1' == *p)
1155 {
1156 p++;
1157 }
1158 else
1159 {
1160 THIS_SHOULD_NEVER_HAPPEN;
1161 *last_n = 0;
1162 *last_pos = 0;
1163 return 1;
1164 }
1165 }
1166 }
1167 while (n >= i);
1168
1169 *last_n = i; /* preserve number of field and its start position */
1170 *last_pos = (size_t)(p - rec_data); /* across calls to avoid searching from start */
1171
1172 if (0 == is_null) /* value in JSON is not NULL */
1173 {
1174 if (0 == null_in_db)
1175 return strcmp(field_start, str);
1176 else
1177 return 1;
1178 }
1179 else
1180 {
1181 if ('\0' == *str)
1182 {
1183 if (1 == null_in_db)
1184 return 0; /* fields are "equal" - both contain NULL */
1185 else
1186 return 1;
1187 }
1188 else
1189 {
1190 THIS_SHOULD_NEVER_HAPPEN;
1191 *last_n = 0;
1192 *last_pos = 0;
1193 return 1;
1194 }
1195 }
1196 }
1197
1198 /******************************************************************************
1199 * *
1200 * Function: process_proxyconfig_table *
1201 * *
1202 * Purpose: update configuration table *
1203 * *
1204 * Parameters: ... *
1205 * del - [OUT] ids of the removed records that must be deleted *
1206 * from database *
1207 * *
1208 * Return value: SUCCEED - processed successfully *
1209 * FAIL - an error occurred *
1210 * *
1211 ******************************************************************************/
process_proxyconfig_table(const ZBX_TABLE * table,struct zbx_json_parse * jp_obj,zbx_vector_uint64_t * del,char ** error)1212 static int process_proxyconfig_table(const ZBX_TABLE *table, struct zbx_json_parse *jp_obj,
1213 zbx_vector_uint64_t *del, char **error)
1214 {
1215 int f, fields_count, ret = FAIL, id_field_nr = 0, move_out = 0,
1216 move_field_nr = 0;
1217 const ZBX_FIELD *fields[ZBX_MAX_FIELDS];
1218 struct zbx_json_parse jp_data, jp_row;
1219 const char *p, *pf;
1220 zbx_uint64_t recid, *p_recid = NULL;
1221 zbx_vector_uint64_t ins, moves, availability_hostids;
1222 char *buf = NULL, *esc, *sql = NULL, *recs = NULL;
1223 size_t sql_alloc = 4 * ZBX_KIBIBYTE, sql_offset,
1224 recs_alloc = 20 * ZBX_KIBIBYTE, recs_offset = 0,
1225 buf_alloc = 0;
1226 DB_RESULT result;
1227 DB_ROW row;
1228 zbx_hashset_t h_id_offsets, h_del;
1229 zbx_hashset_iter_t iter;
1230 zbx_id_offset_t id_offset, *p_id_offset = NULL;
1231 zbx_db_insert_t db_insert;
1232 zbx_vector_ptr_t values;
1233 static zbx_vector_ptr_t skip_fields, availability_fields;
1234 static const ZBX_TABLE *table_items, *table_hosts;
1235
1236 zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s'", __func__, table->table);
1237
1238 /************************************************************************************/
1239 /* T1. RECEIVED JSON (jp_obj) DATA FORMAT */
1240 /************************************************************************************/
1241 /* Line | Data | Corresponding structure in DB */
1242 /* -----+-------------------------------------------+------------------------------ */
1243 /* 1 | { | */
1244 /* 2 | "hosts": { | first table */
1245 /* 3 | "fields": [ | list of table's columns */
1246 /* 4 | "hostid", | first column */
1247 /* 5 | "host", | second column */
1248 /* 6 | ... | ...columns */
1249 /* 7 | ], | */
1250 /* 8 | "data": [ | the table data */
1251 /* 9 | [ | first entry */
1252 /* 10 | 1, | value for first column */
1253 /* 11 | "zbx01", | value for second column */
1254 /* 12 | ... | ...values */
1255 /* 13 | ], | */
1256 /* 14 | [ | second entry */
1257 /* 15 | 2, | value for first column */
1258 /* 16 | "zbx02", | value for second column */
1259 /* 17 | ... | ...values */
1260 /* 18 | ], | */
1261 /* 19 | ... | ...entries */
1262 /* 20 | ] | */
1263 /* 21 | }, | */
1264 /* 22 | "items": { | second table */
1265 /* 23 | ... | ... */
1266 /* 24 | }, | */
1267 /* 25 | ... | ...tables */
1268 /* 26 | } | */
1269 /************************************************************************************/
1270
1271 if (NULL == table_items)
1272 {
1273 table_items = DBget_table("item_rtdata");
1274
1275 /* do not update existing lastlogsize and mtime fields */
1276 zbx_vector_ptr_create(&skip_fields);
1277 zbx_vector_ptr_append(&skip_fields, (void *)DBget_field(table_items, "lastlogsize"));
1278 zbx_vector_ptr_append(&skip_fields, (void *)DBget_field(table_items, "mtime"));
1279 zbx_vector_ptr_sort(&skip_fields, ZBX_DEFAULT_PTR_COMPARE_FUNC);
1280 }
1281
1282 if (NULL == table_hosts)
1283 {
1284 table_hosts = DBget_table("hosts");
1285
1286 /* do not update existing lastlogsize and mtime fields */
1287 zbx_vector_ptr_create(&availability_fields);
1288 zbx_vector_ptr_append(&availability_fields, (void *)DBget_field(table_hosts, "available"));
1289 zbx_vector_ptr_append(&availability_fields, (void *)DBget_field(table_hosts, "snmp_available"));
1290 zbx_vector_ptr_append(&availability_fields, (void *)DBget_field(table_hosts, "ipmi_available"));
1291 zbx_vector_ptr_append(&availability_fields, (void *)DBget_field(table_hosts, "jmx_available"));
1292 zbx_vector_ptr_sort(&availability_fields, ZBX_DEFAULT_PTR_COMPARE_FUNC);
1293 }
1294
1295 /* get table columns (line 3 in T1) */
1296 if (FAIL == zbx_json_brackets_by_name(jp_obj, "fields", &jp_data))
1297 {
1298 *error = zbx_strdup(*error, zbx_json_strerror());
1299 goto out;
1300 }
1301
1302 p = NULL;
1303 /* iterate column names (lines 4-6 in T1) */
1304 for (fields_count = 0; NULL != (p = zbx_json_next_value_dyn(&jp_data, p, &buf, &buf_alloc, NULL)); fields_count++)
1305 {
1306 if (NULL == (fields[fields_count] = DBget_field(table, buf)))
1307 {
1308 *error = zbx_dsprintf(*error, "invalid field name \"%s.%s\"", table->table, buf);
1309 goto out;
1310 }
1311
1312 if (0 == (fields[fields_count]->flags & ZBX_PROXY) &&
1313 (0 != strcmp(table->recid, buf) || ZBX_TYPE_ID != fields[fields_count]->type))
1314 {
1315 *error = zbx_dsprintf(*error, "unexpected field \"%s.%s\"", table->table, buf);
1316 goto out;
1317 }
1318 }
1319
1320 if (0 == fields_count)
1321 {
1322 *error = zbx_dsprintf(*error, "empty list of field names");
1323 goto out;
1324 }
1325
1326 /* get the entries (line 8 in T1) */
1327 if (FAIL == zbx_json_brackets_by_name(jp_obj, ZBX_PROTO_TAG_DATA, &jp_data))
1328 {
1329 *error = zbx_strdup(*error, zbx_json_strerror());
1330 goto out;
1331 }
1332
1333 /* all records will be stored in one large string */
1334 recs = (char *)zbx_malloc(recs, recs_alloc);
1335
1336 /* hash set as index for fast access to records via IDs */
1337 zbx_hashset_create(&h_id_offsets, 10000, id_offset_hash_func, id_offset_compare_func);
1338
1339 /* a hash set as a list for finding records to be deleted */
1340 zbx_hashset_create(&h_del, 10000, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1341
1342 sql = (char *)zbx_malloc(sql, sql_alloc);
1343
1344 sql_offset = 0;
1345 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "select ");
1346
1347 /* make a string with a list of fields for SELECT */
1348 for (f = 0; f < fields_count; f++)
1349 {
1350 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, fields[f]->name);
1351 zbx_chrcpy_alloc(&sql, &sql_alloc, &sql_offset, ',');
1352 }
1353
1354 sql_offset--;
1355 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, " from ");
1356 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, table->table);
1357
1358 /* Find a number of the ID field. Usually the 1st field. */
1359 id_field_nr = find_field_by_name(fields, fields_count, table->recid);
1360
1361 /* select all existing records */
1362 result = DBselect("%s", sql);
1363
1364 while (NULL != (row = DBfetch(result)))
1365 {
1366 ZBX_STR2UINT64(recid, row[id_field_nr]);
1367
1368 id_offset.id = recid;
1369 id_offset.offset = recs_offset;
1370
1371 zbx_hashset_insert(&h_id_offsets, &id_offset, sizeof(id_offset));
1372 zbx_hashset_insert(&h_del, &recid, sizeof(recid));
1373
1374 remember_record(fields, fields_count, &recs, &recs_alloc, &recs_offset, row);
1375 }
1376 DBfree_result(result);
1377
1378 /* these tables have unique indices, need special preparation to avoid conflicts during inserts/updates */
1379 if (0 == strcmp("globalmacro", table->table))
1380 {
1381 move_out = 1;
1382 move_field_nr = find_field_by_name(fields, fields_count, "macro");
1383 }
1384 else if (0 == strcmp("hosts_templates", table->table))
1385 {
1386 move_out = 1;
1387 move_field_nr = find_field_by_name(fields, fields_count, "templateid");
1388 }
1389 else if (0 == strcmp("hostmacro", table->table))
1390 {
1391 move_out = 1;
1392 move_field_nr = find_field_by_name(fields, fields_count, "macro");
1393 }
1394 else if (0 == strcmp("items", table->table))
1395 {
1396 move_out = 1;
1397 move_field_nr = find_field_by_name(fields, fields_count, "key_");
1398 }
1399 else if (0 == strcmp("drules", table->table))
1400 {
1401 move_out = 1;
1402 move_field_nr = find_field_by_name(fields, fields_count, "name");
1403 }
1404 else if (0 == strcmp("regexps", table->table))
1405 {
1406 move_out = 1;
1407 move_field_nr = find_field_by_name(fields, fields_count, "name");
1408 }
1409 else if (0 == strcmp("httptest", table->table))
1410 {
1411 move_out = 1;
1412 move_field_nr = find_field_by_name(fields, fields_count, "name");
1413 }
1414
1415 zbx_vector_uint64_create(&ins);
1416
1417 if (1 == move_out)
1418 zbx_vector_uint64_create(&moves);
1419
1420 zbx_vector_uint64_create(&availability_hostids);
1421
1422 p = NULL;
1423 /* iterate the entries (lines 9, 14 and 19 in T1) */
1424 while (NULL != (p = zbx_json_next(&jp_data, p)))
1425 {
1426 if (FAIL == zbx_json_brackets_open(p, &jp_row) ||
1427 NULL == (pf = zbx_json_next_value_dyn(&jp_row, NULL, &buf, &buf_alloc, NULL)))
1428 {
1429 *error = zbx_strdup(*error, zbx_json_strerror());
1430 goto clean2;
1431 }
1432
1433 /* check whether we need to update existing entry or insert a new one */
1434
1435 ZBX_STR2UINT64(recid, buf);
1436
1437 if (NULL != zbx_hashset_search(&h_del, &recid))
1438 {
1439 zbx_hashset_remove(&h_del, &recid);
1440
1441 if (1 == move_out)
1442 {
1443 int last_n = 0;
1444 size_t last_pos = 0;
1445 zbx_json_type_t type;
1446
1447 /* locate a copy of this record as found in database */
1448 id_offset.id = recid;
1449 if (NULL == (p_id_offset = (zbx_id_offset_t *)zbx_hashset_search(&h_id_offsets, &id_offset)))
1450 {
1451 THIS_SHOULD_NEVER_HAPPEN;
1452 goto clean2;
1453 }
1454
1455 /* find the field requiring special preprocessing in JSON record */
1456 f = 1;
1457 while (NULL != (pf = zbx_json_next_value_dyn(&jp_row, pf, &buf, &buf_alloc, &type)))
1458 {
1459 /* parse values for the entry (lines 10-12 in T1) */
1460
1461 if (fields_count == f)
1462 {
1463 *error = zbx_dsprintf(*error, "invalid number of fields \"%.*s\"",
1464 (int)(jp_row.end - jp_row.start + 1), jp_row.start);
1465 goto clean2;
1466 }
1467
1468 if (move_field_nr == f)
1469 break;
1470 f++;
1471 }
1472
1473 if (0 != compare_nth_field(fields, recs + p_id_offset->offset, move_field_nr, buf,
1474 (ZBX_JSON_TYPE_NULL == type), &last_n, &last_pos))
1475 {
1476 zbx_vector_uint64_append(&moves, recid);
1477 }
1478 }
1479 }
1480 else
1481 zbx_vector_uint64_append(&ins, recid);
1482 }
1483
1484 /* copy IDs of records to be deleted from hash set to vector */
1485 zbx_hashset_iter_reset(&h_del, &iter);
1486 while (NULL != (p_recid = (uint64_t *)zbx_hashset_iter_next(&iter)))
1487 zbx_vector_uint64_append(del, *p_recid);
1488 zbx_vector_uint64_sort(del, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1489
1490 zbx_vector_uint64_sort(&ins, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1491
1492 if (1 == move_out)
1493 {
1494 /* special preprocessing for 'hosts_templates' table to eliminate conflicts */
1495 /* in the 'hostid, templateid' unique index */
1496 if (0 == strcmp("hosts_templates", table->table))
1497 {
1498 /* Making the 'hostid, templateid' combination unique to avoid collisions when new records */
1499 /* are inserted and existing ones are updated is a bit complex. Let's take a simpler approach */
1500 /* - delete affected old records and insert the new ones. */
1501 if (0 != moves.values_num)
1502 {
1503 zbx_vector_uint64_append_array(&ins, moves.values, moves.values_num);
1504 zbx_vector_uint64_sort(&ins, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1505 zbx_vector_uint64_append_array(del, moves.values, moves.values_num);
1506 zbx_vector_uint64_sort(del, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1507 }
1508
1509 if (0 != del->values_num)
1510 {
1511 sql_offset = 0;
1512 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "delete from %s where", table->table);
1513 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, table->recid, del->values,
1514 del->values_num);
1515
1516 if (ZBX_DB_OK > DBexecute("%s", sql))
1517 goto clean2;
1518
1519 zbx_vector_uint64_clear(del);
1520 }
1521 }
1522 else
1523 {
1524 /* force index field update for removed records to avoid potential conflicts */
1525 if (0 != del->values_num)
1526 zbx_vector_uint64_append_array(&moves, del->values, del->values_num);
1527
1528 /* special preprocessing for 'globalmacro', 'hostmacro', 'items', 'drules', 'regexps' and */
1529 /* 'httptest' tables to eliminate conflicts in the 'macro', 'hostid,macro', 'hostid,key_', */
1530 /* 'name', 'name' and 'hostid,name' unique indices */
1531 if (0 < moves.values_num)
1532 {
1533 sql_offset = 0;
1534 #ifdef HAVE_MYSQL
1535 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1536 "update %s set %s=concat('#',%s) where",
1537 table->table, fields[move_field_nr]->name, table->recid);
1538 #else
1539 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update %s set %s='#'||%s where",
1540 table->table, fields[move_field_nr]->name, table->recid);
1541 #endif
1542 zbx_vector_uint64_sort(&moves, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1543 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, table->recid, moves.values,
1544 moves.values_num);
1545
1546 if (ZBX_DB_OK > DBexecute("%s", sql))
1547 goto clean2;
1548 }
1549 }
1550 }
1551
1552 /* apply insert operations */
1553
1554 if (0 != ins.values_num)
1555 {
1556 zbx_vector_ptr_create(&values);
1557 zbx_db_insert_prepare_dyn(&db_insert, table, fields, fields_count);
1558
1559 p = NULL;
1560 /* iterate the entries (lines 9, 14 and 19 in T1) */
1561 while (NULL != (p = zbx_json_next(&jp_data, p)))
1562 {
1563 zbx_json_type_t type;
1564 zbx_db_value_t *value;
1565
1566 if (FAIL == zbx_json_brackets_open(p, &jp_row))
1567 {
1568 *error = zbx_dsprintf(*error, "invalid data format: %s", zbx_json_strerror());
1569 goto clean;
1570 }
1571
1572 pf = zbx_json_next_value_dyn(&jp_row, NULL, &buf, &buf_alloc, NULL);
1573
1574 /* check whether we need to insert a new entry or update an existing one */
1575 ZBX_STR2UINT64(recid, buf);
1576 if (FAIL == zbx_vector_uint64_bsearch(&ins, recid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
1577 continue;
1578
1579 /* add the id field */
1580 value = (zbx_db_value_t *)zbx_malloc(NULL, sizeof(zbx_db_value_t));
1581 value->ui64 = recid;
1582 zbx_vector_ptr_append(&values, value);
1583
1584 /* add the rest of fields */
1585 for (f = 1; NULL != (pf = zbx_json_next_value_dyn(&jp_row, pf, &buf, &buf_alloc, &type));
1586 f++)
1587 {
1588 if (f == fields_count)
1589 {
1590 *error = zbx_dsprintf(*error, "invalid number of fields \"%.*s\"",
1591 (int)(jp_row.end - jp_row.start + 1), jp_row.start);
1592 goto clean;
1593 }
1594
1595 if (ZBX_JSON_TYPE_NULL == type && 0 != (fields[f]->flags & ZBX_NOTNULL))
1596 {
1597 *error = zbx_dsprintf(*error, "column \"%s.%s\" cannot be null",
1598 table->table, fields[f]->name);
1599 goto clean;
1600 }
1601
1602 value = (zbx_db_value_t *)zbx_malloc(NULL, sizeof(zbx_db_value_t));
1603
1604 switch (fields[f]->type)
1605 {
1606 case ZBX_TYPE_INT:
1607 value->i32 = atoi(buf);
1608 break;
1609 case ZBX_TYPE_UINT:
1610 ZBX_STR2UINT64(value->ui64, buf);
1611 break;
1612 case ZBX_TYPE_ID:
1613 if (ZBX_JSON_TYPE_NULL != type)
1614 ZBX_STR2UINT64(value->ui64, buf);
1615 else
1616 value->ui64 = 0;
1617 break;
1618 case ZBX_TYPE_FLOAT:
1619 value->dbl = atof(buf);
1620 break;
1621 case ZBX_TYPE_CHAR:
1622 case ZBX_TYPE_TEXT:
1623 case ZBX_TYPE_SHORTTEXT:
1624 case ZBX_TYPE_LONGTEXT:
1625 value->str = zbx_strdup(NULL, buf);
1626 break;
1627 default:
1628 *error = zbx_dsprintf(*error, "unsupported field type %d in \"%s.%s\"",
1629 (int)fields[f]->type, table->table, fields[f]->name);
1630 zbx_free(value);
1631 goto clean;
1632
1633 }
1634
1635 zbx_vector_ptr_append(&values, value);
1636 }
1637
1638 zbx_db_insert_add_values_dyn(&db_insert, (const zbx_db_value_t **)values.values,
1639 values.values_num);
1640
1641 for (f = 0; f < fields_count; f++)
1642 {
1643 switch (fields[f]->type)
1644 {
1645 case ZBX_TYPE_CHAR:
1646 case ZBX_TYPE_TEXT:
1647 case ZBX_TYPE_SHORTTEXT:
1648 case ZBX_TYPE_LONGTEXT:
1649 value = (zbx_db_value_t *)values.values[f];
1650 zbx_free(value->str);
1651 }
1652 }
1653 zbx_vector_ptr_clear_ext(&values, zbx_ptr_free);
1654
1655 if (f != fields_count)
1656 {
1657 *error = zbx_dsprintf(*error, "invalid number of fields \"%.*s\"",
1658 (int)(jp_row.end - jp_row.start + 1), jp_row.start);
1659 goto clean;
1660 }
1661 }
1662
1663 if (FAIL == zbx_db_insert_execute(&db_insert))
1664 goto clean;
1665 }
1666
1667 /* apply update operations */
1668
1669 sql_offset = 0;
1670 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
1671
1672 p = NULL;
1673 /* iterate the entries (lines 9, 14 and 19 in T1) */
1674 while (NULL != (p = zbx_json_next(&jp_data, p)))
1675 {
1676 int rec_differ = 0; /* how many fields differ */
1677 int last_n = 0;
1678 size_t tmp_offset = sql_offset, last_pos = 0;
1679 zbx_json_type_t type;
1680
1681 if (FAIL == zbx_json_brackets_open(p, &jp_row))
1682 {
1683 *error = zbx_dsprintf(*error, "invalid data format: %s", zbx_json_strerror());
1684 goto clean;
1685 }
1686
1687 pf = zbx_json_next_value_dyn(&jp_row, NULL, &buf, &buf_alloc, NULL);
1688
1689 /* check whether we need to insert a new entry or update an existing one */
1690 ZBX_STR2UINT64(recid, buf);
1691 if (FAIL != zbx_vector_uint64_bsearch(&ins, recid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
1692 continue;
1693
1694
1695 if (1 == fields_count) /* only primary key given, no update needed */
1696 continue;
1697
1698 /* locate a copy of this record as found in database */
1699 id_offset.id = recid;
1700 if (NULL == (p_id_offset = (zbx_id_offset_t *)zbx_hashset_search(&h_id_offsets, &id_offset)))
1701 {
1702 THIS_SHOULD_NEVER_HAPPEN;
1703 goto clean;
1704 }
1705
1706 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update %s set ", table->table);
1707
1708 for (f = 1; NULL != (pf = zbx_json_next_value_dyn(&jp_row, pf, &buf, &buf_alloc, &type));
1709 f++)
1710 {
1711 /* parse values for the entry (lines 10-12 in T1) */
1712
1713 if (f == fields_count)
1714 {
1715 *error = zbx_dsprintf(*error, "invalid number of fields \"%.*s\"",
1716 (int)(jp_row.end - jp_row.start + 1), jp_row.start);
1717 goto clean;
1718 }
1719
1720 if (ZBX_JSON_TYPE_NULL == type && 0 != (fields[f]->flags & ZBX_NOTNULL))
1721 {
1722 *error = zbx_dsprintf(*error, "column \"%s.%s\" cannot be null",
1723 table->table, fields[f]->name);
1724 goto clean;
1725 }
1726
1727 /* do not update existing lastlogsize and mtime fields */
1728 if (FAIL != zbx_vector_ptr_bsearch(&skip_fields, fields[f],
1729 ZBX_DEFAULT_PTR_COMPARE_FUNC))
1730 {
1731 continue;
1732 }
1733
1734 if (0 == compare_nth_field(fields, recs + p_id_offset->offset, f, buf,
1735 (ZBX_JSON_TYPE_NULL == type), &last_n, &last_pos))
1736 {
1737 continue;
1738 }
1739
1740 if (table == table_hosts && FAIL != zbx_vector_ptr_bsearch(&availability_fields,
1741 fields[f], ZBX_DEFAULT_PTR_COMPARE_FUNC))
1742 {
1743 /* host availability on server differs from local (proxy) availability - */
1744 /* reset availability timestamp to re-send availability data to server */
1745 zbx_vector_uint64_append(&availability_hostids, recid);
1746 continue;
1747 }
1748
1749 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "%s=", fields[f]->name);
1750 rec_differ++;
1751
1752 if (ZBX_JSON_TYPE_NULL == type)
1753 {
1754 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, "null,");
1755 continue;
1756 }
1757
1758 switch (fields[f]->type)
1759 {
1760 case ZBX_TYPE_INT:
1761 case ZBX_TYPE_UINT:
1762 case ZBX_TYPE_ID:
1763 case ZBX_TYPE_FLOAT:
1764 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "%s,", buf);
1765 break;
1766 default:
1767 esc = DBdyn_escape_string(buf);
1768 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "'%s',", esc);
1769 zbx_free(esc);
1770 }
1771 }
1772
1773 if (f != fields_count)
1774 {
1775 *error = zbx_dsprintf(*error, "invalid number of fields \"%.*s\"",
1776 (int)(jp_row.end - jp_row.start + 1), jp_row.start);
1777 goto clean;
1778 }
1779
1780 sql_offset--;
1781
1782 if (0 != rec_differ)
1783 {
1784 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " where %s=" ZBX_FS_UI64 ";\n",
1785 table->recid, recid);
1786
1787 if (SUCCEED != DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset))
1788 goto clean;
1789 }
1790 else
1791 {
1792 sql_offset = tmp_offset; /* discard this update, all fields are the same */
1793 *(sql + sql_offset) = '\0';
1794 }
1795 }
1796
1797 if (16 < sql_offset) /* in ORACLE always present begin..end; */
1798 {
1799 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
1800
1801 if (ZBX_DB_OK > DBexecute("%s", sql))
1802 goto clean;
1803 }
1804
1805 /* delete operations are performed by the caller using the returned del vector */
1806
1807 if (0 != availability_hostids.values_num)
1808 {
1809 zbx_vector_uint64_sort(&availability_hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1810 zbx_vector_uint64_uniq(&availability_hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1811 DCtouch_hosts_availability(&availability_hostids);
1812 }
1813
1814 ret = SUCCEED;
1815 clean:
1816 if (0 != ins.values_num)
1817 {
1818 zbx_db_insert_clean(&db_insert);
1819 zbx_vector_ptr_destroy(&values);
1820 }
1821 clean2:
1822 zbx_hashset_destroy(&h_id_offsets);
1823 zbx_hashset_destroy(&h_del);
1824 zbx_vector_uint64_destroy(&availability_hostids);
1825 zbx_vector_uint64_destroy(&ins);
1826 if (1 == move_out)
1827 zbx_vector_uint64_destroy(&moves);
1828 zbx_free(sql);
1829 zbx_free(recs);
1830 out:
1831 zbx_free(buf);
1832
1833 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1834
1835 return ret;
1836 }
1837
1838 /******************************************************************************
1839 * *
1840 * Function: process_proxyconfig *
1841 * *
1842 * Purpose: update configuration *
1843 * *
1844 ******************************************************************************/
process_proxyconfig(struct zbx_json_parse * jp_data)1845 int process_proxyconfig(struct zbx_json_parse *jp_data)
1846 {
1847 typedef struct
1848 {
1849 const ZBX_TABLE *table;
1850 zbx_vector_uint64_t ids;
1851 }
1852 table_ids_t;
1853
1854 char buf[ZBX_TABLENAME_LEN_MAX];
1855 const char *p = NULL;
1856 struct zbx_json_parse jp_obj;
1857 char *error = NULL;
1858 int i, ret = SUCCEED;
1859
1860 table_ids_t *table_ids;
1861 zbx_vector_ptr_t tables_proxy;
1862 const ZBX_TABLE *table;
1863
1864 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1865
1866 zbx_vector_ptr_create(&tables_proxy);
1867
1868 DBbegin();
1869
1870 /* iterate the tables (lines 2, 22 and 25 in T1) */
1871 while (NULL != (p = zbx_json_pair_next(jp_data, p, buf, sizeof(buf))) && SUCCEED == ret)
1872 {
1873 if (FAIL == zbx_json_brackets_open(p, &jp_obj))
1874 {
1875 error = zbx_strdup(error, zbx_json_strerror());
1876 ret = FAIL;
1877 break;
1878 }
1879
1880 if (NULL == (table = DBget_table(buf)))
1881 {
1882 error = zbx_dsprintf(error, "invalid table name \"%s\"", buf);
1883 ret = FAIL;
1884 break;
1885 }
1886
1887 table_ids = (table_ids_t *)zbx_malloc(NULL, sizeof(table_ids_t));
1888 table_ids->table = table;
1889 zbx_vector_uint64_create(&table_ids->ids);
1890 zbx_vector_ptr_append(&tables_proxy, table_ids);
1891
1892 ret = process_proxyconfig_table(table, &jp_obj, &table_ids->ids, &error);
1893 }
1894
1895 if (SUCCEED == ret)
1896 {
1897 char *sql = NULL;
1898 size_t sql_alloc = 512, sql_offset = 0;
1899
1900 sql = (char *)zbx_malloc(sql, sql_alloc * sizeof(char));
1901
1902 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
1903
1904 for (i = tables_proxy.values_num - 1; 0 <= i; i--)
1905 {
1906 table_ids = (table_ids_t *)tables_proxy.values[i];
1907
1908 if (0 == table_ids->ids.values_num)
1909 continue;
1910
1911 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "delete from %s where",
1912 table_ids->table->table);
1913 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, table_ids->table->recid,
1914 table_ids->ids.values, table_ids->ids.values_num);
1915 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ";\n");
1916 }
1917
1918 if (sql_offset > 16) /* in ORACLE always present begin..end; */
1919 {
1920 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
1921
1922 if (ZBX_DB_OK > DBexecute("%s", sql))
1923 ret = FAIL;
1924 }
1925
1926 zbx_free(sql);
1927 }
1928
1929 for (i = 0; i < tables_proxy.values_num; i++)
1930 {
1931 table_ids = (table_ids_t *)tables_proxy.values[i];
1932
1933 zbx_vector_uint64_destroy(&table_ids->ids);
1934 zbx_free(table_ids);
1935 }
1936 zbx_vector_ptr_destroy(&tables_proxy);
1937
1938 if (SUCCEED != (ret = DBend(ret)))
1939 {
1940 zabbix_log(LOG_LEVEL_ERR, "failed to update local proxy configuration copy: %s",
1941 (NULL == error ? "database error" : error));
1942 }
1943
1944 zbx_free(error);
1945
1946 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1947
1948 return ret;
1949 }
1950
1951 /******************************************************************************
1952 * *
1953 * Function: get_host_availability_data *
1954 * *
1955 * Return value: SUCCEED - processed successfully *
1956 * FAIL - no host availability has been changed *
1957 * *
1958 ******************************************************************************/
get_host_availability_data(struct zbx_json * json,int * ts)1959 int get_host_availability_data(struct zbx_json *json, int *ts)
1960 {
1961 int i, j, ret = FAIL;
1962 zbx_vector_ptr_t hosts;
1963 zbx_host_availability_t *ha;
1964
1965 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1966
1967 zbx_vector_ptr_create(&hosts);
1968
1969 if (SUCCEED != DCget_hosts_availability(&hosts, ts))
1970 goto out;
1971
1972 zbx_json_addarray(json, ZBX_PROTO_TAG_HOST_AVAILABILITY);
1973
1974 for (i = 0; i < hosts.values_num; i++)
1975 {
1976 ha = (zbx_host_availability_t *)hosts.values[i];
1977
1978 zbx_json_addobject(json, NULL);
1979 zbx_json_adduint64(json, ZBX_PROTO_TAG_HOSTID, ha->hostid);
1980
1981 for (j = 0; j < ZBX_AGENT_MAX; j++)
1982 {
1983 zbx_json_adduint64(json, availability_tag_available[j], ha->agents[j].available);
1984 zbx_json_addstring(json, availability_tag_error[j], ha->agents[j].error, ZBX_JSON_TYPE_STRING);
1985 }
1986
1987 zbx_json_close(json);
1988 }
1989
1990 zbx_json_close(json);
1991
1992 ret = SUCCEED;
1993 out:
1994 zbx_vector_ptr_clear_ext(&hosts, (zbx_mem_free_func_t)zbx_host_availability_free);
1995 zbx_vector_ptr_destroy(&hosts);
1996
1997 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
1998
1999 return ret;
2000 }
2001
2002 /******************************************************************************
2003 * *
2004 * Function: process_host_availability_contents *
2005 * *
2006 * Purpose: parses host availability data contents and processes it *
2007 * *
2008 * Return value: SUCCEED - processed successfully *
2009 * FAIL - an error occurred *
2010 * *
2011 ******************************************************************************/
process_host_availability_contents(struct zbx_json_parse * jp_data,char ** error)2012 static int process_host_availability_contents(struct zbx_json_parse *jp_data, char **error)
2013 {
2014 zbx_uint64_t hostid;
2015 struct zbx_json_parse jp_row;
2016 const char *p = NULL;
2017 char *tmp = NULL;
2018 size_t tmp_alloc = 129;
2019 zbx_host_availability_t *ha = NULL;
2020 zbx_vector_ptr_t hosts;
2021 int i, ret;
2022
2023 tmp = (char *)zbx_malloc(NULL, tmp_alloc);
2024
2025 zbx_vector_ptr_create(&hosts);
2026
2027 while (NULL != (p = zbx_json_next(jp_data, p))) /* iterate the host entries */
2028 {
2029 if (SUCCEED != (ret = zbx_json_brackets_open(p, &jp_row)))
2030 {
2031 *error = zbx_strdup(*error, zbx_json_strerror());
2032 goto out;
2033 }
2034
2035 if (SUCCEED != (ret = zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_HOSTID, &tmp, &tmp_alloc,
2036 NULL)))
2037 {
2038 *error = zbx_strdup(*error, zbx_json_strerror());
2039 goto out;
2040 }
2041
2042 if (SUCCEED != (ret = is_uint64(tmp, &hostid)))
2043 {
2044 *error = zbx_strdup(*error, "hostid is not a valid numeric");
2045 goto out;
2046 }
2047
2048 ha = (zbx_host_availability_t *)zbx_malloc(NULL, sizeof(zbx_host_availability_t));
2049 zbx_host_availability_init(ha, hostid);
2050
2051 for (i = 0; i < ZBX_AGENT_MAX; i++)
2052 {
2053 if (SUCCEED != zbx_json_value_by_name_dyn(&jp_row, availability_tag_available[i], &tmp,
2054 &tmp_alloc, NULL))
2055 {
2056 continue;
2057 }
2058
2059 ha->agents[i].available = atoi(tmp);
2060 ha->agents[i].flags |= ZBX_FLAGS_AGENT_STATUS_AVAILABLE;
2061 }
2062
2063 for (i = 0; i < ZBX_AGENT_MAX; i++)
2064 {
2065 if (SUCCEED != zbx_json_value_by_name_dyn(&jp_row, availability_tag_error[i], &tmp, &tmp_alloc,
2066 NULL))
2067 continue;
2068
2069 ha->agents[i].error = zbx_strdup(NULL, tmp);
2070 ha->agents[i].flags |= ZBX_FLAGS_AGENT_STATUS_ERROR;
2071 }
2072
2073 if (SUCCEED != (ret = zbx_host_availability_is_set(ha)))
2074 {
2075 zbx_free(ha);
2076 *error = zbx_dsprintf(*error, "no availability data for \"hostid\":" ZBX_FS_UI64, hostid);
2077 goto out;
2078 }
2079
2080 zbx_vector_ptr_append(&hosts, ha);
2081 }
2082
2083 if (0 < hosts.values_num && SUCCEED == DCset_hosts_availability(&hosts))
2084 {
2085 char *sql = NULL;
2086 size_t sql_alloc = 4 * ZBX_KIBIBYTE, sql_offset = 0;
2087
2088 sql = (char *)zbx_malloc(sql, sql_alloc);
2089
2090 DBbegin();
2091 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
2092
2093 for (i = 0; i < hosts.values_num; i++)
2094 {
2095 if (SUCCEED != zbx_sql_add_host_availability(&sql, &sql_alloc, &sql_offset,
2096 (zbx_host_availability_t *)hosts.values[i]))
2097 {
2098 continue;
2099 }
2100
2101 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ";\n");
2102 DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
2103 }
2104
2105 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
2106
2107 if (16 < sql_offset)
2108 DBexecute("%s", sql);
2109
2110 DBcommit();
2111
2112 zbx_free(sql);
2113 }
2114
2115 ret = SUCCEED;
2116 out:
2117 zbx_vector_ptr_clear_ext(&hosts, (zbx_mem_free_func_t)zbx_host_availability_free);
2118 zbx_vector_ptr_destroy(&hosts);
2119
2120 zbx_free(tmp);
2121
2122 return ret;
2123 }
2124
2125 /******************************************************************************
2126 * *
2127 * Function: process_host_availability *
2128 * *
2129 * Purpose: update proxy hosts availability *
2130 * *
2131 * Return value: SUCCEED - processed successfully *
2132 * FAIL - an error occurred *
2133 * *
2134 ******************************************************************************/
process_host_availability(struct zbx_json_parse * jp,char ** error)2135 int process_host_availability(struct zbx_json_parse *jp, char **error)
2136 {
2137 struct zbx_json_parse jp_data;
2138 int ret;
2139
2140 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2141
2142 if (SUCCEED != (ret = zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_DATA, &jp_data)))
2143 {
2144 *error = zbx_strdup(*error, zbx_json_strerror());
2145 goto out;
2146 }
2147
2148 if (SUCCEED == zbx_json_object_is_empty(&jp_data))
2149 goto out;
2150
2151 ret = process_host_availability_contents(&jp_data, error);
2152
2153 out:
2154 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
2155
2156 return ret;
2157 }
2158
2159 /******************************************************************************
2160 * *
2161 * Function: proxy_get_lastid *
2162 * *
2163 ******************************************************************************/
proxy_get_lastid(const char * table_name,const char * lastidfield,zbx_uint64_t * lastid)2164 static void proxy_get_lastid(const char *table_name, const char *lastidfield, zbx_uint64_t *lastid)
2165 {
2166 DB_RESULT result;
2167 DB_ROW row;
2168
2169 zabbix_log(LOG_LEVEL_DEBUG, "In %s() field:'%s.%s'", __func__, table_name, lastidfield);
2170
2171 result = DBselect("select nextid from ids where table_name='%s' and field_name='%s'",
2172 table_name, lastidfield);
2173
2174 if (NULL == (row = DBfetch(result)))
2175 *lastid = 0;
2176 else
2177 ZBX_STR2UINT64(*lastid, row[0]);
2178 DBfree_result(result);
2179
2180 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():" ZBX_FS_UI64, __func__, *lastid);
2181 }
2182
2183 /******************************************************************************
2184 * *
2185 * Function: proxy_set_lastid *
2186 * *
2187 ******************************************************************************/
proxy_set_lastid(const char * table_name,const char * lastidfield,const zbx_uint64_t lastid)2188 static void proxy_set_lastid(const char *table_name, const char *lastidfield, const zbx_uint64_t lastid)
2189 {
2190 DB_RESULT result;
2191
2192 zabbix_log(LOG_LEVEL_DEBUG, "In %s() [%s.%s:" ZBX_FS_UI64 "]", __func__, table_name, lastidfield, lastid);
2193
2194 result = DBselect("select 1 from ids where table_name='%s' and field_name='%s'",
2195 table_name, lastidfield);
2196
2197 if (NULL == DBfetch(result))
2198 {
2199 DBexecute("insert into ids (table_name,field_name,nextid) values ('%s','%s'," ZBX_FS_UI64 ")",
2200 table_name, lastidfield, lastid);
2201 }
2202 else
2203 {
2204 DBexecute("update ids set nextid=" ZBX_FS_UI64 " where table_name='%s' and field_name='%s'",
2205 lastid, table_name, lastidfield);
2206 }
2207 DBfree_result(result);
2208
2209 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2210 }
2211
proxy_set_hist_lastid(const zbx_uint64_t lastid)2212 void proxy_set_hist_lastid(const zbx_uint64_t lastid)
2213 {
2214 proxy_set_lastid("proxy_history", "history_lastid", lastid);
2215 }
2216
proxy_set_dhis_lastid(const zbx_uint64_t lastid)2217 void proxy_set_dhis_lastid(const zbx_uint64_t lastid)
2218 {
2219 proxy_set_lastid(dht.table, dht.lastidfield, lastid);
2220 }
2221
proxy_set_areg_lastid(const zbx_uint64_t lastid)2222 void proxy_set_areg_lastid(const zbx_uint64_t lastid)
2223 {
2224 proxy_set_lastid(areg.table, areg.lastidfield, lastid);
2225 }
2226
proxy_get_delay(const zbx_uint64_t lastid)2227 int proxy_get_delay(const zbx_uint64_t lastid)
2228 {
2229 DB_RESULT result;
2230 DB_ROW row;
2231 char *sql = NULL;
2232 int ts = 0;
2233
2234 zabbix_log(LOG_LEVEL_DEBUG, "In %s() [lastid=" ZBX_FS_UI64 "]", __func__, lastid);
2235
2236 sql = zbx_dsprintf(sql, "select write_clock from proxy_history where id>" ZBX_FS_UI64 " order by id asc",
2237 lastid);
2238
2239 result = DBselectN(sql, 1);
2240 zbx_free(sql);
2241
2242 if (NULL != (row = DBfetch(result)))
2243 ts = (int)time(NULL) - atoi(row[0]);
2244
2245 DBfree_result(result);
2246
2247 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2248
2249 return ts;
2250 }
2251
2252 /******************************************************************************
2253 * *
2254 * Function: proxy_get_history_data_simple *
2255 * *
2256 * Purpose: Get history data from the database. *
2257 * *
2258 ******************************************************************************/
proxy_get_history_data_simple(struct zbx_json * j,const char * proto_tag,const zbx_history_table_t * ht,zbx_uint64_t * lastid,zbx_uint64_t * id,int * records_num,int * more)2259 static void proxy_get_history_data_simple(struct zbx_json *j, const char *proto_tag, const zbx_history_table_t *ht,
2260 zbx_uint64_t *lastid, zbx_uint64_t *id, int *records_num, int *more)
2261 {
2262 size_t offset = 0;
2263 int f, records_num_last = *records_num, retries = 1;
2264 char sql[MAX_STRING_LEN];
2265 DB_RESULT result;
2266 DB_ROW row;
2267 struct timespec t_sleep = { 0, 100000000L }, t_rem;
2268
2269 zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s'", __func__, ht->table);
2270
2271 *more = ZBX_PROXY_DATA_DONE;
2272
2273 offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, "select id");
2274
2275 for (f = 0; NULL != ht->fields[f].field; f++)
2276 offset += zbx_snprintf(sql + offset, sizeof(sql) - offset, ",%s", ht->fields[f].field);
2277 try_again:
2278 zbx_snprintf(sql + offset, sizeof(sql) - offset, " from %s where id>" ZBX_FS_UI64 " order by id",
2279 ht->table, *id);
2280
2281 result = DBselectN(sql, ZBX_MAX_HRECORDS);
2282
2283 while (NULL != (row = DBfetch(result)))
2284 {
2285 ZBX_STR2UINT64(*lastid, row[0]);
2286
2287 if (1 < *lastid - *id)
2288 {
2289 /* At least one record is missing. It can happen if some DB syncer process has */
2290 /* started but not yet committed a transaction or a rollback occurred in a DB syncer. */
2291 if (0 < retries--)
2292 {
2293 DBfree_result(result);
2294 zabbix_log(LOG_LEVEL_DEBUG, "%s() " ZBX_FS_UI64 " record(s) missing."
2295 " Waiting " ZBX_FS_DBL " sec, retrying.",
2296 __func__, *lastid - *id - 1,
2297 t_sleep.tv_sec + t_sleep.tv_nsec / 1e9);
2298 nanosleep(&t_sleep, &t_rem);
2299 goto try_again;
2300 }
2301 else
2302 {
2303 zabbix_log(LOG_LEVEL_DEBUG, "%s() " ZBX_FS_UI64 " record(s) missing. No more retries.",
2304 __func__, *lastid - *id - 1);
2305 }
2306 }
2307
2308 if (0 == *records_num)
2309 zbx_json_addarray(j, proto_tag);
2310
2311 zbx_json_addobject(j, NULL);
2312
2313 for (f = 0; NULL != ht->fields[f].field; f++)
2314 {
2315 if (NULL != ht->fields[f].default_value && 0 == strcmp(row[f + 1], ht->fields[f].default_value))
2316 continue;
2317
2318 zbx_json_addstring(j, ht->fields[f].tag, row[f + 1], ht->fields[f].jt);
2319 }
2320
2321 (*records_num)++;
2322
2323 zbx_json_close(j);
2324
2325 /* stop gathering data to avoid exceeding the maximum packet size */
2326 if (ZBX_DATA_JSON_RECORD_LIMIT < j->buffer_offset)
2327 {
2328 *more = ZBX_PROXY_DATA_MORE;
2329 break;
2330 }
2331
2332 *id = *lastid;
2333 }
2334 DBfree_result(result);
2335
2336 if (ZBX_MAX_HRECORDS == *records_num - records_num_last)
2337 *more = ZBX_PROXY_DATA_MORE;
2338
2339 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%d lastid:" ZBX_FS_UI64 " more:%d size:" ZBX_FS_SIZE_T,
2340 __func__, *records_num - records_num_last, *lastid, *more,
2341 (zbx_fs_size_t)j->buffer_offset);
2342 }
2343
2344 typedef struct
2345 {
2346 zbx_uint64_t id;
2347 zbx_uint64_t itemid;
2348 zbx_uint64_t lastlogsize;
2349 size_t source_offset;
2350 size_t value_offset;
2351 int clock;
2352 int ns;
2353 int timestamp;
2354 int severity;
2355 int logeventid;
2356 int mtime;
2357 unsigned char state;
2358 unsigned char flags;
2359 }
2360 zbx_history_data_t;
2361
2362 /******************************************************************************
2363 * *
2364 * Function: proxy_get_history_data *
2365 * *
2366 * Purpose: read proxy history data from the database *
2367 * *
2368 * Parameters: lastid - [IN] the id of last processed proxy *
2369 * history record *
2370 * data - [IN/OUT] the proxy history data buffer *
2371 * data_alloc - [IN/OUT] the size of proxy history data *
2372 * buffer *
2373 * string_buffer - [IN/OUT] the string buffer *
2374 * string_buffer_size - [IN/OUT] the size of string buffer *
2375 * more - [OUT] set to ZBX_PROXY_DATA_MORE if there *
2376 * might be more data to read *
2377 * *
2378 * Return value: The number of records read. *
2379 * *
2380 ******************************************************************************/
proxy_get_history_data(zbx_uint64_t lastid,zbx_history_data_t ** data,size_t * data_alloc,char ** string_buffer,size_t * string_buffer_alloc,int * more)2381 static int proxy_get_history_data(zbx_uint64_t lastid, zbx_history_data_t **data, size_t *data_alloc,
2382 char **string_buffer, size_t *string_buffer_alloc, int *more)
2383 {
2384
2385 DB_RESULT result;
2386 DB_ROW row;
2387 char *sql = NULL;
2388 size_t sql_alloc = 0, sql_offset = 0, data_num = 0;
2389 size_t string_buffer_offset = 0;
2390 zbx_uint64_t id;
2391 int retries = 1, total_retries = 10;
2392 struct timespec t_sleep = { 0, 100000000L }, t_rem;
2393 zbx_history_data_t *hd;
2394
2395 zabbix_log(LOG_LEVEL_DEBUG, "In %s() lastid:" ZBX_FS_UI64, __func__, lastid);
2396
2397 try_again:
2398 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
2399 "select id,itemid,clock,ns,timestamp,source,severity,"
2400 "value,logeventid,state,lastlogsize,mtime,flags"
2401 " from proxy_history"
2402 " where id>" ZBX_FS_UI64
2403 " order by id",
2404 lastid);
2405
2406 result = DBselectN(sql, ZBX_MAX_HRECORDS - data_num);
2407
2408 zbx_free(sql);
2409
2410 while (NULL != (row = DBfetch(result)))
2411 {
2412 ZBX_STR2UINT64(id, row[0]);
2413
2414 if (1 < id - lastid)
2415 {
2416 /* At least one record is missing. It can happen if some DB syncer process has */
2417 /* started but not yet committed a transaction or a rollback occurred in a DB syncer. */
2418 if (0 < retries--)
2419 {
2420 /* limit the number of total retries to avoid being stuck */
2421 /* in history full of 'holes' for a long time */
2422 if (0 >= total_retries--)
2423 break;
2424
2425 DBfree_result(result);
2426 zabbix_log(LOG_LEVEL_DEBUG, "%s() " ZBX_FS_UI64 " record(s) missing."
2427 " Waiting " ZBX_FS_DBL " sec, retrying.",
2428 __func__, id - lastid - 1,
2429 t_sleep.tv_sec + t_sleep.tv_nsec / 1e9);
2430 nanosleep(&t_sleep, &t_rem);
2431 goto try_again;
2432 }
2433 else
2434 {
2435 zabbix_log(LOG_LEVEL_DEBUG, "%s() " ZBX_FS_UI64 " record(s) missing. No more retries.",
2436 __func__, id - lastid - 1);
2437 }
2438 }
2439
2440 retries = 1;
2441
2442 if (*data_alloc == data_num)
2443 {
2444 *data_alloc *= 2;
2445 *data = (zbx_history_data_t *)zbx_realloc(*data, sizeof(zbx_history_data_t) * *data_alloc);
2446 }
2447
2448 hd = *data + data_num++;
2449 hd->id = id;
2450 ZBX_STR2UINT64(hd->itemid, row[1]);
2451 ZBX_STR2UCHAR(hd->flags, row[12]);
2452 hd->clock = atoi(row[2]);
2453 hd->ns = atoi(row[3]);
2454
2455 if (PROXY_HISTORY_FLAG_NOVALUE != (hd->flags & PROXY_HISTORY_MASK_NOVALUE))
2456 {
2457 ZBX_STR2UCHAR(hd->state, row[9]);
2458
2459 if (0 == (hd->flags & PROXY_HISTORY_FLAG_NOVALUE))
2460 {
2461 size_t len1, len2;
2462
2463 hd->timestamp = atoi(row[4]);
2464 hd->severity = atoi(row[6]);
2465 hd->logeventid = atoi(row[8]);
2466
2467 len1 = strlen(row[5]) + 1;
2468 len2 = strlen(row[7]) + 1;
2469
2470 if (*string_buffer_alloc < string_buffer_offset + len1 + len2)
2471 {
2472 while (*string_buffer_alloc < string_buffer_offset + len1 + len2)
2473 *string_buffer_alloc += ZBX_KIBIBYTE;
2474
2475 *string_buffer = (char *)zbx_realloc(*string_buffer, *string_buffer_alloc);
2476 }
2477
2478 hd->source_offset = string_buffer_offset;
2479 memcpy(*string_buffer + hd->source_offset, row[5], len1);
2480 string_buffer_offset += len1;
2481
2482 hd->value_offset = string_buffer_offset;
2483 memcpy(*string_buffer + hd->value_offset, row[7], len2);
2484 string_buffer_offset += len2;
2485 }
2486
2487 if (0 != (hd->flags & PROXY_HISTORY_FLAG_META))
2488 {
2489 ZBX_STR2UINT64(hd->lastlogsize, row[10]);
2490 hd->mtime = atoi(row[11]);
2491 }
2492 }
2493
2494 lastid = id;
2495 }
2496 DBfree_result(result);
2497
2498 if (ZBX_MAX_HRECORDS != data_num && 1 == retries)
2499 *more = ZBX_PROXY_DATA_DONE;
2500
2501 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() data_num:" ZBX_FS_SIZE_T, __func__, data_num);
2502
2503 return data_num;
2504 }
2505
2506 /******************************************************************************
2507 * *
2508 * Function: proxy_add_hist_data *
2509 * *
2510 * Purpose: add history records to output json *
2511 * *
2512 * Parameters: j - [IN] the json output buffer *
2513 * records_num - [IN] the total number of records added *
2514 * dc_items - [IN] the item configuration data *
2515 * errcodes - [IN] the item configuration status codes *
2516 * records - [IN] the records to add *
2517 * string_buffer - [IN] the string buffer holding string values *
2518 * lastid - [OUT] the id of last added record *
2519 * *
2520 * Return value: The total number of records added. *
2521 * *
2522 ******************************************************************************/
proxy_add_hist_data(struct zbx_json * j,int records_num,const DC_ITEM * dc_items,const int * errcodes,const zbx_vector_ptr_t * records,const char * string_buffer,zbx_uint64_t * lastid)2523 static int proxy_add_hist_data(struct zbx_json *j, int records_num, const DC_ITEM *dc_items, const int *errcodes,
2524 const zbx_vector_ptr_t *records, const char *string_buffer, zbx_uint64_t *lastid)
2525 {
2526 int i;
2527 const zbx_history_data_t *hd;
2528
2529 for (i = records->values_num - 1; i >= 0; i--)
2530 {
2531 hd = (const zbx_history_data_t *)records->values[i];
2532 *lastid = hd->id;
2533
2534 if (SUCCEED != errcodes[i])
2535 continue;
2536
2537 if (ITEM_STATUS_ACTIVE != dc_items[i].status)
2538 continue;
2539
2540 if (HOST_STATUS_MONITORED != dc_items[i].host.status)
2541 continue;
2542
2543 if (PROXY_HISTORY_FLAG_NOVALUE == (hd->flags & PROXY_HISTORY_MASK_NOVALUE))
2544 {
2545 if (SUCCEED != zbx_is_counted_in_item_queue(dc_items[i].type, dc_items[i].key_orig))
2546 continue;
2547 }
2548
2549 if (0 == records_num)
2550 zbx_json_addarray(j, ZBX_PROTO_TAG_HISTORY_DATA);
2551
2552 zbx_json_addobject(j, NULL);
2553 zbx_json_adduint64(j, ZBX_PROTO_TAG_ID, hd->id);
2554 zbx_json_adduint64(j, ZBX_PROTO_TAG_ITEMID, hd->itemid);
2555 zbx_json_adduint64(j, ZBX_PROTO_TAG_CLOCK, hd->clock);
2556 zbx_json_adduint64(j, ZBX_PROTO_TAG_NS, hd->ns);
2557
2558 if (PROXY_HISTORY_FLAG_NOVALUE != (hd->flags & PROXY_HISTORY_MASK_NOVALUE))
2559 {
2560 if (ITEM_STATE_NORMAL != hd->state)
2561 zbx_json_adduint64(j, ZBX_PROTO_TAG_STATE, hd->state);
2562
2563 if (0 == (hd->flags & PROXY_HISTORY_FLAG_NOVALUE))
2564 {
2565 if (0 != hd->timestamp)
2566 zbx_json_adduint64(j, ZBX_PROTO_TAG_LOGTIMESTAMP, hd->timestamp);
2567
2568 if ('\0' != string_buffer[hd->source_offset])
2569 {
2570 zbx_json_addstring(j, ZBX_PROTO_TAG_LOGSOURCE,
2571 string_buffer + hd->source_offset, ZBX_JSON_TYPE_STRING);
2572 }
2573
2574 if (0 != hd->severity)
2575 zbx_json_adduint64(j, ZBX_PROTO_TAG_LOGSEVERITY, hd->severity);
2576
2577 if (0 != hd->logeventid)
2578 zbx_json_adduint64(j, ZBX_PROTO_TAG_LOGEVENTID, hd->logeventid);
2579
2580 zbx_json_addstring(j, ZBX_PROTO_TAG_VALUE, string_buffer + hd->value_offset,
2581 ZBX_JSON_TYPE_STRING);
2582 }
2583
2584 if (0 != (hd->flags & PROXY_HISTORY_FLAG_META))
2585 {
2586 zbx_json_adduint64(j, ZBX_PROTO_TAG_LASTLOGSIZE, hd->lastlogsize);
2587 zbx_json_adduint64(j, ZBX_PROTO_TAG_MTIME, hd->mtime);
2588 }
2589 }
2590
2591 zbx_json_close(j);
2592 records_num++;
2593
2594 /* stop gathering data to avoid exceeding the maximum packet size */
2595 if (ZBX_DATA_JSON_RECORD_LIMIT < j->buffer_offset)
2596 break;
2597 }
2598
2599 return records_num;
2600 }
2601
proxy_get_hist_data(struct zbx_json * j,zbx_uint64_t * lastid,int * more)2602 int proxy_get_hist_data(struct zbx_json *j, zbx_uint64_t *lastid, int *more)
2603 {
2604 int records_num = 0, data_num, i, *errcodes = NULL, items_alloc = 0;
2605 zbx_uint64_t id;
2606 zbx_hashset_t itemids_added;
2607 zbx_history_data_t *data;
2608 char *string_buffer;
2609 size_t data_alloc = 16, string_buffer_alloc = ZBX_KIBIBYTE;
2610 zbx_vector_uint64_t itemids;
2611 zbx_vector_ptr_t records;
2612 DC_ITEM *dc_items = 0;
2613
2614 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2615
2616 zbx_vector_uint64_create(&itemids);
2617 zbx_vector_ptr_create(&records);
2618 data = (zbx_history_data_t *)zbx_malloc(NULL, data_alloc * sizeof(zbx_history_data_t));
2619 string_buffer = (char *)zbx_malloc(NULL, string_buffer_alloc);
2620
2621 *more = ZBX_PROXY_DATA_MORE;
2622 proxy_get_lastid("proxy_history", "history_lastid", &id);
2623
2624 zbx_hashset_create(&itemids_added, data_alloc, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
2625
2626 /* get history data in batches by ZBX_MAX_HRECORDS records and stop if: */
2627 /* 1) there are no more data to read */
2628 /* 2) we have retrieved more than the total maximum number of records */
2629 /* 3) we have gathered more than half of the maximum packet size */
2630 while (ZBX_DATA_JSON_BATCH_LIMIT > j->buffer_offset && ZBX_MAX_HRECORDS_TOTAL > records_num &&
2631 0 != (data_num = proxy_get_history_data(id, &data, &data_alloc, &string_buffer,
2632 &string_buffer_alloc, more)))
2633 {
2634 zbx_vector_uint64_reserve(&itemids, data_num);
2635 zbx_vector_ptr_reserve(&records, data_num);
2636
2637 /* filter out duplicate novalue updates */
2638 for (i = data_num - 1; i >= 0; i--)
2639 {
2640 if (PROXY_HISTORY_FLAG_NOVALUE == (data[i].flags & PROXY_HISTORY_MASK_NOVALUE))
2641 {
2642 if (NULL != zbx_hashset_search(&itemids_added, &data[i].itemid))
2643 continue;
2644
2645 zbx_hashset_insert(&itemids_added, &data[i].itemid, sizeof(data[i].itemid));
2646 }
2647
2648 zbx_vector_ptr_append(&records, &data[i]);
2649 zbx_vector_uint64_append(&itemids, data[i].itemid);
2650 }
2651
2652 /* append history records to json */
2653
2654 if (itemids.values_num > items_alloc)
2655 {
2656 items_alloc = itemids.values_num;
2657 dc_items = (DC_ITEM *)zbx_realloc(dc_items, items_alloc * sizeof(DC_ITEM));
2658 errcodes = (int *)zbx_realloc(errcodes, items_alloc * sizeof(int));
2659 }
2660
2661 DCconfig_get_items_by_itemids(dc_items, itemids.values, errcodes, itemids.values_num);
2662
2663 records_num = proxy_add_hist_data(j, records_num, dc_items, errcodes, &records, string_buffer, lastid);
2664 DCconfig_clean_items(dc_items, errcodes, itemids.values_num);
2665
2666 /* got less data than requested - either no more data to read or the history is full of */
2667 /* holes. In this case send retrieved data before attempting to read/wait for more data */
2668 if (ZBX_MAX_HRECORDS > data_num)
2669 break;
2670
2671 zbx_vector_uint64_clear(&itemids);
2672 zbx_vector_ptr_clear(&records);
2673 zbx_hashset_clear(&itemids_added);
2674 id = *lastid;
2675 }
2676
2677 if (0 != records_num)
2678 zbx_json_close(j);
2679
2680 zbx_hashset_destroy(&itemids_added);
2681
2682 zbx_free(dc_items);
2683 zbx_free(errcodes);
2684 zbx_free(data);
2685 zbx_free(string_buffer);
2686 zbx_vector_ptr_destroy(&records);
2687 zbx_vector_uint64_destroy(&itemids);
2688
2689 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() lastid:" ZBX_FS_UI64 " records_num:%d size:~" ZBX_FS_SIZE_T " more:%d",
2690 __func__, *lastid, records_num, j->buffer_offset, *more);
2691
2692 return records_num;
2693 }
2694
proxy_get_dhis_data(struct zbx_json * j,zbx_uint64_t * lastid,int * more)2695 int proxy_get_dhis_data(struct zbx_json *j, zbx_uint64_t *lastid, int *more)
2696 {
2697 int records_num = 0;
2698 zbx_uint64_t id;
2699
2700 proxy_get_lastid(dht.table, dht.lastidfield, &id);
2701
2702 /* get history data in batches by ZBX_MAX_HRECORDS records and stop if: */
2703 /* 1) there are no more data to read */
2704 /* 2) we have retrieved more than the total maximum number of records */
2705 /* 3) we have gathered more than half of the maximum packet size */
2706 while (ZBX_DATA_JSON_BATCH_LIMIT > j->buffer_offset)
2707 {
2708 proxy_get_history_data_simple(j, ZBX_PROTO_TAG_DISCOVERY_DATA, &dht, lastid, &id, &records_num, more);
2709
2710 if (ZBX_PROXY_DATA_DONE == *more || ZBX_MAX_HRECORDS_TOTAL <= records_num)
2711 break;
2712 }
2713
2714 if (0 != records_num)
2715 zbx_json_close(j);
2716
2717 return records_num;
2718 }
2719
proxy_get_areg_data(struct zbx_json * j,zbx_uint64_t * lastid,int * more)2720 int proxy_get_areg_data(struct zbx_json *j, zbx_uint64_t *lastid, int *more)
2721 {
2722 int records_num = 0;
2723 zbx_uint64_t id;
2724
2725 proxy_get_lastid(areg.table, areg.lastidfield, &id);
2726
2727 /* get history data in batches by ZBX_MAX_HRECORDS records and stop if: */
2728 /* 1) there are no more data to read */
2729 /* 2) we have retrieved more than the total maximum number of records */
2730 /* 3) we have gathered more than half of the maximum packet size */
2731 while (ZBX_DATA_JSON_BATCH_LIMIT > j->buffer_offset)
2732 {
2733 proxy_get_history_data_simple(j, ZBX_PROTO_TAG_AUTOREGISTRATION, &areg, lastid, &id, &records_num,
2734 more);
2735
2736 if (ZBX_PROXY_DATA_DONE == *more || ZBX_MAX_HRECORDS_TOTAL <= records_num)
2737 break;
2738 }
2739
2740 if (0 != records_num)
2741 zbx_json_close(j);
2742
2743 return records_num;
2744 }
2745
calc_timestamp(const char * line,int * timestamp,const char * format)2746 void calc_timestamp(const char *line, int *timestamp, const char *format)
2747 {
2748 int hh, mm, ss, yyyy, dd, MM;
2749 int hhc = 0, mmc = 0, ssc = 0, yyyyc = 0, ddc = 0, MMc = 0;
2750 int i, num;
2751 struct tm tm;
2752 time_t t;
2753
2754 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2755
2756 hh = mm = ss = yyyy = dd = MM = 0;
2757
2758 for (i = 0; '\0' != format[i] && '\0' != line[i]; i++)
2759 {
2760 if (0 == isdigit(line[i]))
2761 continue;
2762
2763 num = (int)line[i] - 48;
2764
2765 switch ((char)format[i])
2766 {
2767 case 'h':
2768 hh = 10 * hh + num;
2769 hhc++;
2770 break;
2771 case 'm':
2772 mm = 10 * mm + num;
2773 mmc++;
2774 break;
2775 case 's':
2776 ss = 10 * ss + num;
2777 ssc++;
2778 break;
2779 case 'y':
2780 yyyy = 10 * yyyy + num;
2781 yyyyc++;
2782 break;
2783 case 'd':
2784 dd = 10 * dd + num;
2785 ddc++;
2786 break;
2787 case 'M':
2788 MM = 10 * MM + num;
2789 MMc++;
2790 break;
2791 }
2792 }
2793
2794 zabbix_log(LOG_LEVEL_DEBUG, "%s() %02d:%02d:%02d %02d/%02d/%04d", __func__, hh, mm, ss, MM, dd, yyyy);
2795
2796 /* seconds can be ignored, no ssc here */
2797 if (0 != hhc && 0 != mmc && 0 != yyyyc && 0 != ddc && 0 != MMc)
2798 {
2799 tm.tm_sec = ss;
2800 tm.tm_min = mm;
2801 tm.tm_hour = hh;
2802 tm.tm_mday = dd;
2803 tm.tm_mon = MM - 1;
2804 tm.tm_year = yyyy - 1900;
2805 tm.tm_isdst = -1;
2806
2807 if (0 < (t = mktime(&tm)))
2808 *timestamp = t;
2809 }
2810
2811 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() timestamp:%d", __func__, *timestamp);
2812 }
2813
2814 /******************************************************************************
2815 * *
2816 * Function: process_item_value *
2817 * *
2818 * Purpose: processes item value depending on proxy/flags settings *
2819 * *
2820 * Parameters: item - [IN] the item to process *
2821 * result - [IN] the item result *
2822 * *
2823 * Comments: Values gathered by server are sent to the preprocessing manager, *
2824 * while values received from proxy are already preprocessed and *
2825 * must be either directly stored to history cache or sent to lld *
2826 * manager. *
2827 * *
2828 ******************************************************************************/
process_item_value(const DC_ITEM * item,AGENT_RESULT * result,zbx_timespec_t * ts,int * h_num,char * error)2829 static void process_item_value(const DC_ITEM *item, AGENT_RESULT *result, zbx_timespec_t *ts, int *h_num,
2830 char *error)
2831 {
2832 if (0 == item->host.proxy_hostid)
2833 {
2834 zbx_preprocess_item_value(item->itemid, item->host.hostid, item->value_type, item->flags, result, ts,
2835 item->state, error);
2836 *h_num = 0;
2837 }
2838 else
2839 {
2840 if (0 != (ZBX_FLAG_DISCOVERY_RULE & item->flags))
2841 {
2842 zbx_lld_process_agent_result(item->itemid, item->host.hostid, result, ts, error);
2843 *h_num = 0;
2844 }
2845 else
2846 {
2847 dc_add_history(item->itemid, item->value_type, item->flags, result, ts, item->state, error);
2848 *h_num = 1;
2849 }
2850 }
2851 }
2852
2853 /******************************************************************************
2854 * *
2855 * Function: process_history_data_value *
2856 * *
2857 * Purpose: process single value from incoming history data *
2858 * *
2859 * Parameters: item - [IN] the item to process *
2860 * value - [IN] the value to process *
2861 * hval - [OUT] indication that value was added to history *
2862 * *
2863 * Return value: SUCCEED - the value was processed successfully *
2864 * FAIL - otherwise *
2865 * *
2866 ******************************************************************************/
process_history_data_value(DC_ITEM * item,zbx_agent_value_t * value,int * h_num)2867 static int process_history_data_value(DC_ITEM *item, zbx_agent_value_t *value, int *h_num)
2868 {
2869 if (ITEM_STATUS_ACTIVE != item->status)
2870 return FAIL;
2871
2872 if (HOST_STATUS_MONITORED != item->host.status)
2873 return FAIL;
2874
2875 /* update item nextcheck during maintenance */
2876 if (SUCCEED == in_maintenance_without_data_collection(item->host.maintenance_status,
2877 item->host.maintenance_type, item->type) &&
2878 item->host.maintenance_from <= value->ts.sec)
2879 {
2880 return SUCCEED;
2881 }
2882
2883 if (NULL == value->value && ITEM_STATE_NOTSUPPORTED == value->state)
2884 {
2885 THIS_SHOULD_NEVER_HAPPEN;
2886 return FAIL;
2887 }
2888
2889 if (ITEM_STATE_NOTSUPPORTED == value->state ||
2890 (NULL != value->value && 0 == strcmp(value->value, ZBX_NOTSUPPORTED)))
2891 {
2892 zabbix_log(LOG_LEVEL_DEBUG, "item [%s:%s] error: %s", item->host.host, item->key_orig, value->value);
2893
2894 item->state = ITEM_STATE_NOTSUPPORTED;
2895 process_item_value(item, NULL, &value->ts, h_num, value->value);
2896 }
2897 else
2898 {
2899 AGENT_RESULT result;
2900
2901 init_result(&result);
2902
2903 if (NULL != value->value)
2904 {
2905 if (ITEM_VALUE_TYPE_LOG == item->value_type)
2906 {
2907 zbx_log_t *log;
2908
2909 log = (zbx_log_t *)zbx_malloc(NULL, sizeof(zbx_log_t));
2910 log->value = zbx_strdup(NULL, value->value);
2911 zbx_replace_invalid_utf8(log->value);
2912
2913 if (0 == value->timestamp)
2914 {
2915 log->timestamp = 0;
2916 calc_timestamp(log->value, &log->timestamp, item->logtimefmt);
2917 }
2918 else
2919 log->timestamp = value->timestamp;
2920
2921 log->logeventid = value->logeventid;
2922 log->severity = value->severity;
2923
2924 if (NULL != value->source)
2925 {
2926 log->source = zbx_strdup(NULL, value->source);
2927 zbx_replace_invalid_utf8(log->source);
2928 }
2929 else
2930 log->source = NULL;
2931
2932 SET_LOG_RESULT(&result, log);
2933 }
2934 else
2935 set_result_type(&result, ITEM_VALUE_TYPE_TEXT, value->value);
2936 }
2937
2938 if (0 != value->meta)
2939 set_result_meta(&result, value->lastlogsize, value->mtime);
2940
2941 if (0 != ISSET_VALUE(&result) || 0 != ISSET_META(&result))
2942 {
2943 item->state = ITEM_STATE_NORMAL;
2944 process_item_value(item, &result, &value->ts, h_num, NULL);
2945 }
2946
2947 free_result(&result);
2948 }
2949
2950 return SUCCEED;
2951 }
2952
2953 /******************************************************************************
2954 * *
2955 * Function: process_history_data *
2956 * *
2957 * Purpose: process new item values *
2958 * *
2959 * Parameters: items - [IN] the items to process *
2960 * values - [IN] the item values value to process *
2961 * errcodes - [IN/OUT] in - item configuration error code *
2962 * (FAIL - item/host was not found) *
2963 * out - value processing result *
2964 * (SUCCEED - processed, FAIL - error) *
2965 * values_num - [IN] the number of items/values to process *
2966 * nodata_win - [IN/OUT] proxy communication delay info *
2967 * *
2968 * Return value: the number of processed values *
2969 * *
2970 ******************************************************************************/
process_history_data(DC_ITEM * items,zbx_agent_value_t * values,int * errcodes,size_t values_num,zbx_proxy_suppress_t * nodata_win)2971 int process_history_data(DC_ITEM *items, zbx_agent_value_t *values, int *errcodes, size_t values_num,
2972 zbx_proxy_suppress_t *nodata_win)
2973 {
2974 size_t i;
2975 int processed_num = 0, history_num;
2976
2977 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2978
2979 for (i = 0; i < values_num; i++)
2980 {
2981 if (SUCCEED != errcodes[i])
2982 continue;
2983
2984 history_num = 0;
2985
2986 if (SUCCEED != process_history_data_value(&items[i], &values[i], &history_num))
2987 {
2988 /* clean failed items to avoid updating their runtime data */
2989 DCconfig_clean_items(&items[i], &errcodes[i], 1);
2990 errcodes[i] = FAIL;
2991 continue;
2992 }
2993
2994 if (0 != items[i].host.proxy_hostid && NULL != nodata_win &&
2995 0 != (nodata_win->flags & ZBX_PROXY_SUPPRESS_ACTIVE) && 0 < history_num)
2996 {
2997 if (values[i].ts.sec <= nodata_win->period_end)
2998 {
2999 nodata_win->values_num++;
3000 }
3001 else
3002 {
3003 nodata_win->flags &= (~ZBX_PROXY_SUPPRESS_MORE);
3004 }
3005
3006 zabbix_log(LOG_LEVEL_TRACE, "%s() flags:%d values_num:%d value_time:%d period_end:%d",
3007 __func__, nodata_win->flags, nodata_win->values_num, values[i].ts.sec,
3008 nodata_win->period_end);
3009 }
3010
3011 processed_num++;
3012 }
3013
3014 if (0 < processed_num)
3015 zbx_dc_items_update_nextcheck(items, values, errcodes, values_num);
3016
3017 zbx_preprocessor_flush();
3018 dc_flush_history();
3019
3020 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() processed:%d", __func__, processed_num);
3021
3022 return processed_num;
3023 }
3024
3025 /******************************************************************************
3026 * *
3027 * Function: zbx_agent_values_clean *
3028 * *
3029 * Purpose: frees resources allocated to store agent values *
3030 * *
3031 * Parameters: values - [IN] the values to clean *
3032 * values_num - [IN] the number of items in values array *
3033 * *
3034 ******************************************************************************/
zbx_agent_values_clean(zbx_agent_value_t * values,size_t values_num)3035 static void zbx_agent_values_clean(zbx_agent_value_t *values, size_t values_num)
3036 {
3037 size_t i;
3038
3039 for (i = 0; i < values_num; i++)
3040 {
3041 zbx_free(values[i].value);
3042 zbx_free(values[i].source);
3043 }
3044 }
3045
3046 /******************************************************************************
3047 * *
3048 * Function: log_client_timediff *
3049 * *
3050 * Purpose: calculates difference between server and client (proxy, active *
3051 * agent or sender) time and log it *
3052 * *
3053 * Parameters: level - [IN] log level *
3054 * jp - [IN] JSON with clock, [ns] fields *
3055 * ts_recv - [IN] the connection timestamp *
3056 * *
3057 ******************************************************************************/
log_client_timediff(int level,struct zbx_json_parse * jp,const zbx_timespec_t * ts_recv)3058 static void log_client_timediff(int level, struct zbx_json_parse *jp, const zbx_timespec_t *ts_recv)
3059 {
3060 char tmp[32];
3061 zbx_timespec_t client_timediff;
3062 int sec, ns;
3063
3064 if (SUCCEED != ZBX_CHECK_LOG_LEVEL(level))
3065 return;
3066
3067 if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_CLOCK, tmp, sizeof(tmp), NULL))
3068 {
3069 sec = atoi(tmp);
3070 client_timediff.sec = ts_recv->sec - sec;
3071
3072 if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_NS, tmp, sizeof(tmp), NULL))
3073 {
3074 ns = atoi(tmp);
3075 client_timediff.ns = ts_recv->ns - ns;
3076
3077 if (client_timediff.sec > 0 && client_timediff.ns < 0)
3078 {
3079 client_timediff.sec--;
3080 client_timediff.ns += 1000000000;
3081 }
3082 else if (client_timediff.sec < 0 && client_timediff.ns > 0)
3083 {
3084 client_timediff.sec++;
3085 client_timediff.ns -= 1000000000;
3086 }
3087
3088 zabbix_log(level, "%s(): timestamp from json %d seconds and %d nanosecond, "
3089 "delta time from json %d seconds and %d nanosecond",
3090 __func__, sec, ns, client_timediff.sec, client_timediff.ns);
3091 }
3092 else
3093 {
3094 zabbix_log(level, "%s(): timestamp from json %d seconds, "
3095 "delta time from json %d seconds", __func__, sec, client_timediff.sec);
3096 }
3097 }
3098 }
3099
3100 /******************************************************************************
3101 * *
3102 * Function: parse_history_data_row_value *
3103 * *
3104 * Purpose: parses agent value from history data json row *
3105 * *
3106 * Parameters: jp_row - [IN] JSON with history data row *
3107 * unique_shift - [IN/OUT] auto increment nanoseconds to ensure *
3108 * unique value of timestamps *
3109 * av - [OUT] the agent value *
3110 * *
3111 * Return value: SUCCEED - the value was parsed successfully *
3112 * FAIL - otherwise *
3113 * *
3114 ******************************************************************************/
parse_history_data_row_value(const struct zbx_json_parse * jp_row,zbx_timespec_t * unique_shift,zbx_agent_value_t * av)3115 static int parse_history_data_row_value(const struct zbx_json_parse *jp_row, zbx_timespec_t *unique_shift,
3116 zbx_agent_value_t *av)
3117 {
3118 char *tmp = NULL;
3119 size_t tmp_alloc = 0;
3120 int ret = FAIL;
3121
3122 memset(av, 0, sizeof(zbx_agent_value_t));
3123
3124 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_CLOCK, &tmp, &tmp_alloc, NULL))
3125 {
3126 if (FAIL == is_uint31(tmp, &av->ts.sec))
3127 goto out;
3128
3129 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_NS, &tmp, &tmp_alloc, NULL))
3130 {
3131 if (FAIL == is_uint_n_range(tmp, tmp_alloc, &av->ts.ns, sizeof(av->ts.ns),
3132 0LL, 999999999LL))
3133 {
3134 goto out;
3135 }
3136 }
3137 else
3138 {
3139 /* ensure unique value timestamp (clock, ns) if only clock is available */
3140
3141 av->ts.sec += unique_shift->sec;
3142 av->ts.ns = unique_shift->ns++;
3143
3144 if (unique_shift->ns > 999999999)
3145 {
3146 unique_shift->sec++;
3147 unique_shift->ns = 0;
3148 }
3149 }
3150 }
3151 else
3152 zbx_timespec(&av->ts);
3153
3154 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_STATE, &tmp, &tmp_alloc, NULL))
3155 av->state = (unsigned char)atoi(tmp);
3156
3157 /* Unsupported item meta information must be ignored for backwards compatibility. */
3158 /* New agents will not send meta information for items in unsupported state. */
3159 if (ITEM_STATE_NOTSUPPORTED != av->state)
3160 {
3161 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LASTLOGSIZE, &tmp, &tmp_alloc, NULL))
3162 {
3163 av->meta = 1; /* contains meta information */
3164
3165 is_uint64(tmp, &av->lastlogsize);
3166
3167 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_MTIME, &tmp, &tmp_alloc, NULL))
3168 av->mtime = atoi(tmp);
3169 }
3170 }
3171
3172 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_VALUE, &tmp, &tmp_alloc, NULL))
3173 av->value = zbx_strdup(av->value, tmp);
3174
3175 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LOGTIMESTAMP, &tmp, &tmp_alloc, NULL))
3176 av->timestamp = atoi(tmp);
3177
3178 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LOGSOURCE, &tmp, &tmp_alloc, NULL))
3179 av->source = zbx_strdup(av->source, tmp);
3180
3181 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LOGSEVERITY, &tmp, &tmp_alloc, NULL))
3182 av->severity = atoi(tmp);
3183
3184 if (SUCCEED == zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_LOGEVENTID, &tmp, &tmp_alloc, NULL))
3185 av->logeventid = atoi(tmp);
3186
3187 if (SUCCEED != zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_ID, &tmp, &tmp_alloc, NULL) ||
3188 SUCCEED != is_uint64(tmp, &av->id))
3189 {
3190 av->id = 0;
3191 }
3192
3193 zbx_free(tmp);
3194
3195 ret = SUCCEED;
3196 out:
3197 return ret;
3198 }
3199
3200 /******************************************************************************
3201 * *
3202 * Function: parse_history_data_row_itemid *
3203 * *
3204 * Purpose: parses item identifier from history data json row *
3205 * *
3206 * Parameters: jp_row - [IN] JSON with history data row *
3207 * itemid - [OUT] the item identifier *
3208 * *
3209 * Return value: SUCCEED - the item identifier was parsed successfully *
3210 * FAIL - otherwise *
3211 * *
3212 ******************************************************************************/
parse_history_data_row_itemid(const struct zbx_json_parse * jp_row,zbx_uint64_t * itemid)3213 static int parse_history_data_row_itemid(const struct zbx_json_parse *jp_row, zbx_uint64_t *itemid)
3214 {
3215 char buffer[MAX_ID_LEN + 1];
3216
3217 if (SUCCEED != zbx_json_value_by_name(jp_row, ZBX_PROTO_TAG_ITEMID, buffer, sizeof(buffer), NULL))
3218 return FAIL;
3219
3220 if (SUCCEED != is_uint64(buffer, itemid))
3221 return FAIL;
3222
3223 return SUCCEED;
3224 }
3225 /******************************************************************************
3226 * *
3227 * Function: parse_history_data_row_hostkey *
3228 * *
3229 * Purpose: parses host,key pair from history data json row *
3230 * *
3231 * Parameters: jp_row - [IN] JSON with history data row *
3232 * hk - [OUT] the host,key pair *
3233 * *
3234 * Return value: SUCCEED - the host,key pair was parsed successfully *
3235 * FAIL - otherwise *
3236 * *
3237 ******************************************************************************/
parse_history_data_row_hostkey(const struct zbx_json_parse * jp_row,zbx_host_key_t * hk)3238 static int parse_history_data_row_hostkey(const struct zbx_json_parse *jp_row, zbx_host_key_t *hk)
3239 {
3240 size_t str_alloc;
3241
3242 str_alloc = 0;
3243 zbx_free(hk->host);
3244
3245 if (SUCCEED != zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_HOST, &hk->host, &str_alloc, NULL))
3246 return FAIL;
3247
3248 str_alloc = 0;
3249 zbx_free(hk->key);
3250
3251 if (SUCCEED != zbx_json_value_by_name_dyn(jp_row, ZBX_PROTO_TAG_KEY, &hk->key, &str_alloc, NULL))
3252 {
3253 zbx_free(hk->host);
3254 return FAIL;
3255 }
3256
3257 return SUCCEED;
3258 }
3259
3260 /******************************************************************************
3261 * *
3262 * Function: parse_history_data *
3263 * *
3264 * Purpose: parses up to ZBX_HISTORY_VALUES_MAX item values and host,key *
3265 * pairs from history data json *
3266 * *
3267 * Parameters: jp_data - [IN] JSON with history data array *
3268 * pnext - [IN/OUT] the pointer to the next item in json, *
3269 * NULL - no more data left *
3270 * values - [OUT] the item values *
3271 * hostkeys - [OUT] the corresponding host,key pairs *
3272 * values_num - [OUT] number of elements in values and hostkeys *
3273 * arrays *
3274 * parsed_num - [OUT] the number of values parsed *
3275 * unique_shift - [IN/OUT] auto increment nanoseconds to ensure *
3276 * unique value of timestamps *
3277 * *
3278 * Return value: SUCCEED - values were parsed successfully *
3279 * FAIL - an error occurred *
3280 * *
3281 ******************************************************************************/
parse_history_data(struct zbx_json_parse * jp_data,const char ** pnext,zbx_agent_value_t * values,zbx_host_key_t * hostkeys,int * values_num,int * parsed_num,zbx_timespec_t * unique_shift)3282 static int parse_history_data(struct zbx_json_parse *jp_data, const char **pnext, zbx_agent_value_t *values,
3283 zbx_host_key_t *hostkeys, int *values_num, int *parsed_num, zbx_timespec_t *unique_shift)
3284 {
3285 struct zbx_json_parse jp_row;
3286 int ret = FAIL;
3287
3288 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
3289
3290 *values_num = 0;
3291 *parsed_num = 0;
3292
3293 if (NULL == *pnext)
3294 {
3295 if (NULL == (*pnext = zbx_json_next(jp_data, *pnext)) && *values_num < ZBX_HISTORY_VALUES_MAX)
3296 {
3297 ret = SUCCEED;
3298 goto out;
3299 }
3300 }
3301
3302 /* iterate the history data rows */
3303 do
3304 {
3305 if (FAIL == zbx_json_brackets_open(*pnext, &jp_row))
3306 {
3307 zabbix_log(LOG_LEVEL_WARNING, "%s", zbx_json_strerror());
3308 goto out;
3309 }
3310
3311 (*parsed_num)++;
3312
3313 if (SUCCEED != parse_history_data_row_hostkey(&jp_row, &hostkeys[*values_num]))
3314 continue;
3315
3316 if (SUCCEED != parse_history_data_row_value(&jp_row, unique_shift, &values[*values_num]))
3317 continue;
3318
3319 (*values_num)++;
3320 }
3321 while (NULL != (*pnext = zbx_json_next(jp_data, *pnext)) && *values_num < ZBX_HISTORY_VALUES_MAX);
3322
3323 ret = SUCCEED;
3324 out:
3325 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s processed:%d/%d", __func__, zbx_result_string(ret),
3326 *values_num, *parsed_num);
3327
3328 return ret;
3329 }
3330
3331 /******************************************************************************
3332 * *
3333 * Function: parse_history_data_by_itemids *
3334 * *
3335 * Purpose: parses up to ZBX_HISTORY_VALUES_MAX item values and item *
3336 * identifiers from history data json *
3337 * *
3338 * Parameters: jp_data - [IN] JSON with history data array *
3339 * pnext - [IN/OUT] the pointer to the next item in *
3340 * json, NULL - no more data left *
3341 * values - [OUT] the item values *
3342 * itemids - [OUT] the corresponding item identifiers *
3343 * values_num - [OUT] number of elements in values and itemids *
3344 * arrays *
3345 * parsed_num - [OUT] the number of values parsed *
3346 * unique_shift - [IN/OUT] auto increment nanoseconds to ensure *
3347 * unique value of timestamps *
3348 * info - [OUT] address of a pointer to the info string *
3349 * (should be freed by the caller) *
3350 * *
3351 * Return value: SUCCEED - values were parsed successfully *
3352 * FAIL - an error occurred *
3353 * *
3354 * Comments: This function is used to parse the new proxy history data *
3355 * protocol introduced in Zabbix v3.3. *
3356 * *
3357 ******************************************************************************/
parse_history_data_by_itemids(struct zbx_json_parse * jp_data,const char ** pnext,zbx_agent_value_t * values,zbx_uint64_t * itemids,int * values_num,int * parsed_num,zbx_timespec_t * unique_shift,char ** error)3358 static int parse_history_data_by_itemids(struct zbx_json_parse *jp_data, const char **pnext,
3359 zbx_agent_value_t *values, zbx_uint64_t *itemids, int *values_num, int *parsed_num,
3360 zbx_timespec_t *unique_shift, char **error)
3361 {
3362 struct zbx_json_parse jp_row;
3363 int ret = FAIL;
3364
3365 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
3366
3367 *values_num = 0;
3368 *parsed_num = 0;
3369
3370 if (NULL == *pnext)
3371 {
3372 if (NULL == (*pnext = zbx_json_next(jp_data, *pnext)) && *values_num < ZBX_HISTORY_VALUES_MAX)
3373 {
3374 ret = SUCCEED;
3375 goto out;
3376 }
3377 }
3378
3379 /* iterate the history data rows */
3380 do
3381 {
3382 if (FAIL == zbx_json_brackets_open(*pnext, &jp_row))
3383 {
3384 *error = zbx_strdup(*error, zbx_json_strerror());
3385 goto out;
3386 }
3387
3388 (*parsed_num)++;
3389
3390 if (SUCCEED != parse_history_data_row_itemid(&jp_row, &itemids[*values_num]))
3391 continue;
3392
3393 if (SUCCEED != parse_history_data_row_value(&jp_row, unique_shift, &values[*values_num]))
3394 continue;
3395
3396 (*values_num)++;
3397 }
3398 while (NULL != (*pnext = zbx_json_next(jp_data, *pnext)) && *values_num < ZBX_HISTORY_VALUES_MAX);
3399
3400 ret = SUCCEED;
3401 out:
3402 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s processed:%d/%d", __func__, zbx_result_string(ret),
3403 *values_num, *parsed_num);
3404
3405 return ret;
3406 }
3407
3408 /******************************************************************************
3409 * *
3410 * Function: proxy_item_validator *
3411 * *
3412 * Purpose: validates item received from proxy *
3413 * *
3414 * Parameters: item - [IN/OUT] the item data *
3415 * sock - [IN] the connection socket *
3416 * args - [IN] the validator arguments *
3417 * error - unused *
3418 * *
3419 * Return value: SUCCEED - the validation was successful *
3420 * FAIL - otherwise *
3421 * *
3422 ******************************************************************************/
proxy_item_validator(DC_ITEM * item,zbx_socket_t * sock,void * args,char ** error)3423 static int proxy_item_validator(DC_ITEM *item, zbx_socket_t *sock, void *args, char **error)
3424 {
3425 zbx_uint64_t *proxyid = (zbx_uint64_t *)args;
3426
3427 ZBX_UNUSED(sock);
3428 ZBX_UNUSED(error);
3429
3430 /* don't process item if its host was assigned to another proxy */
3431 if (item->host.proxy_hostid != *proxyid)
3432 return FAIL;
3433
3434 /* don't process aggregate/calculated items coming from proxy */
3435 if (ITEM_TYPE_AGGREGATE == item->type || ITEM_TYPE_CALCULATED == item->type)
3436 return FAIL;
3437
3438 return SUCCEED;
3439 }
3440
3441 /******************************************************************************
3442 * *
3443 * Function: process_history_data_by_itemids *
3444 * *
3445 * Purpose: parses history data array and process the data *
3446 * *
3447 * Parameters: proxy - [IN] the proxy *
3448 * jp_data - [IN] JSON with history data array *
3449 * session - [IN] the data session *
3450 * nodata_win - [OUT] counter of delayed values *
3451 * info - [OUT] address of a pointer to the info *
3452 * string (should be freed by the caller) *
3453 * mode - [IN] item retrieve mode is used to retrieve only *
3454 * necessary data to reduce time spent holding *
3455 * read lock *
3456 * *
3457 * Return value: SUCCEED - processed successfully *
3458 * FAIL - an error occurred *
3459 * *
3460 * Comments: This function is used to parse the new proxy history data *
3461 * protocol introduced in Zabbix v3.3. *
3462 * *
3463 ******************************************************************************/
process_history_data_by_itemids(zbx_socket_t * sock,zbx_client_item_validator_t validator_func,void * validator_args,struct zbx_json_parse * jp_data,zbx_data_session_t * session,zbx_proxy_suppress_t * nodata_win,char ** info,unsigned int mode)3464 static int process_history_data_by_itemids(zbx_socket_t *sock, zbx_client_item_validator_t validator_func,
3465 void *validator_args, struct zbx_json_parse *jp_data, zbx_data_session_t *session,
3466 zbx_proxy_suppress_t *nodata_win, char **info, unsigned int mode)
3467 {
3468 const char *pnext = NULL;
3469 int ret = SUCCEED, processed_num = 0, total_num = 0, values_num, read_num, i, *errcodes;
3470 double sec;
3471 DC_ITEM *items;
3472 char *error = NULL;
3473 zbx_uint64_t itemids[ZBX_HISTORY_VALUES_MAX];
3474 zbx_agent_value_t values[ZBX_HISTORY_VALUES_MAX];
3475 zbx_timespec_t unique_shift = {0, 0};
3476
3477 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
3478
3479 items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * ZBX_HISTORY_VALUES_MAX);
3480 errcodes = (int *)zbx_malloc(NULL, sizeof(int) * ZBX_HISTORY_VALUES_MAX);
3481
3482 sec = zbx_time();
3483
3484 while (SUCCEED == parse_history_data_by_itemids(jp_data, &pnext, values, itemids, &values_num, &read_num,
3485 &unique_shift, &error) && 0 != values_num)
3486 {
3487 DCconfig_get_items_by_itemids_partial(items, itemids, errcodes, (size_t)values_num, mode);
3488
3489 for (i = 0; i < values_num; i++)
3490 {
3491 if (SUCCEED != errcodes[i])
3492 continue;
3493
3494 /* check and discard if duplicate data */
3495 if (NULL != session && 0 != values[i].id && values[i].id <= session->last_valueid)
3496 {
3497 DCconfig_clean_items(&items[i], &errcodes[i], 1);
3498 errcodes[i] = FAIL;
3499 continue;
3500 }
3501
3502 if (SUCCEED != validator_func(&items[i], sock, validator_args, &error))
3503 {
3504 if (NULL != error)
3505 {
3506 zabbix_log(LOG_LEVEL_WARNING, "%s", error);
3507 zbx_free(error);
3508 }
3509
3510 DCconfig_clean_items(&items[i], &errcodes[i], 1);
3511 errcodes[i] = FAIL;
3512 }
3513 }
3514
3515 processed_num += process_history_data(items, values, errcodes, values_num, nodata_win);
3516
3517 total_num += read_num;
3518
3519 if (NULL != session)
3520 session->last_valueid = values[values_num - 1].id;
3521
3522 DCconfig_clean_items(items, errcodes, values_num);
3523 zbx_agent_values_clean(values, values_num);
3524
3525 if (NULL == pnext)
3526 break;
3527 }
3528
3529 zbx_free(errcodes);
3530 zbx_free(items);
3531
3532 if (NULL == error)
3533 {
3534 ret = SUCCEED;
3535 *info = zbx_dsprintf(*info, "processed: %d; failed: %d; total: %d; seconds spent: " ZBX_FS_DBL,
3536 processed_num, total_num - processed_num, total_num, zbx_time() - sec);
3537 }
3538 else
3539 {
3540 zbx_free(*info);
3541 *info = error;
3542 }
3543
3544 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
3545
3546 return ret;
3547 }
3548
3549 /******************************************************************************
3550 * *
3551 * Function: agent_item_validator *
3552 * *
3553 * Purpose: validates item received from active agent *
3554 * *
3555 * Parameters: item - [IN] the item data *
3556 * sock - [IN] the connection socket *
3557 * args - [IN] the validator arguments *
3558 * error - [OUT] the error message *
3559 * *
3560 * Return value: SUCCEED - the validation was successful *
3561 * FAIL - otherwise *
3562 * *
3563 ******************************************************************************/
agent_item_validator(DC_ITEM * item,zbx_socket_t * sock,void * args,char ** error)3564 static int agent_item_validator(DC_ITEM *item, zbx_socket_t *sock, void *args, char **error)
3565 {
3566 zbx_host_rights_t *rights = (zbx_host_rights_t *)args;
3567
3568 if (0 != item->host.proxy_hostid)
3569 return FAIL;
3570
3571 if (ITEM_TYPE_ZABBIX_ACTIVE != item->type)
3572 return FAIL;
3573
3574 if (rights->hostid != item->host.hostid)
3575 {
3576 rights->hostid = item->host.hostid;
3577 rights->value = zbx_host_check_permissions(&item->host, sock, error);
3578 }
3579
3580 return rights->value;
3581 }
3582
3583 /******************************************************************************
3584 * *
3585 * Function: sender_item_validator *
3586 * *
3587 * Purpose: validates item received from sender *
3588 * *
3589 * Parameters: item - [IN] the item data *
3590 * sock - [IN] the connection socket *
3591 * args - [IN] the validator arguments *
3592 * error - [OUT] the error message *
3593 * *
3594 * Return value: SUCCEED - the validation was successful *
3595 * FAIL - otherwise *
3596 * *
3597 ******************************************************************************/
sender_item_validator(DC_ITEM * item,zbx_socket_t * sock,void * args,char ** error)3598 static int sender_item_validator(DC_ITEM *item, zbx_socket_t *sock, void *args, char **error)
3599 {
3600 zbx_host_rights_t *rights;
3601 char key_short[VALUE_ERRMSG_MAX * ZBX_MAX_BYTES_IN_UTF8_CHAR + 1];
3602
3603 if (0 != item->host.proxy_hostid)
3604 return FAIL;
3605
3606 switch(item->type)
3607 {
3608 case ITEM_TYPE_HTTPAGENT:
3609 if (0 == item->allow_traps)
3610 {
3611 *error = zbx_dsprintf(*error, "cannot process HTTP agent item \"%s\" trap:"
3612 " trapping is not enabled", zbx_truncate_itemkey(item->key_orig,
3613 VALUE_ERRMSG_MAX, key_short, sizeof(key_short)));
3614 return FAIL;
3615 }
3616 break;
3617 case ITEM_TYPE_TRAPPER:
3618 break;
3619 default:
3620 *error = zbx_dsprintf(*error, "cannot process item \"%s\" trap:"
3621 " item type \"%d\" cannot be used with traps",
3622 zbx_truncate_itemkey(item->key_orig, VALUE_ERRMSG_MAX, key_short,
3623 sizeof(key_short)), item->type);
3624 return FAIL;
3625 }
3626
3627 if ('\0' != *item->trapper_hosts) /* list of allowed hosts not empty */
3628 {
3629 char *allowed_peers;
3630 int ret;
3631
3632 allowed_peers = zbx_strdup(NULL, item->trapper_hosts);
3633 substitute_simple_macros(NULL, NULL, NULL, NULL, NULL, NULL, item, NULL, NULL,
3634 &allowed_peers, MACRO_TYPE_ALLOWED_HOSTS, NULL, 0);
3635 ret = zbx_tcp_check_allowed_peers(sock, allowed_peers);
3636 zbx_free(allowed_peers);
3637
3638 if (FAIL == ret)
3639 {
3640 *error = zbx_dsprintf(*error, "cannot process item \"%s\" trap: %s",
3641 zbx_truncate_itemkey(item->key_orig, VALUE_ERRMSG_MAX, key_short,
3642 sizeof(key_short)), zbx_socket_strerror());
3643 return FAIL;
3644 }
3645 }
3646
3647 rights = (zbx_host_rights_t *)args;
3648
3649 if (rights->hostid != item->host.hostid)
3650 {
3651 rights->hostid = item->host.hostid;
3652 rights->value = zbx_host_check_permissions(&item->host, sock, error);
3653 }
3654
3655 return rights->value;
3656 }
3657
process_history_data_by_keys(zbx_socket_t * sock,zbx_client_item_validator_t validator_func,void * validator_args,char ** info,struct zbx_json_parse * jp_data,const char * token)3658 static void process_history_data_by_keys(zbx_socket_t *sock, zbx_client_item_validator_t validator_func,
3659 void *validator_args, char **info, struct zbx_json_parse *jp_data, const char *token)
3660 {
3661 int values_num, read_num, processed_num = 0, total_num = 0, i;
3662 zbx_timespec_t unique_shift = {0, 0};
3663 const char *pnext = NULL;
3664 char *error = NULL;
3665 zbx_host_key_t *hostkeys;
3666 DC_ITEM *items;
3667 zbx_data_session_t *session = NULL;
3668 zbx_uint64_t last_hostid = 0;
3669 zbx_agent_value_t values[ZBX_HISTORY_VALUES_MAX];
3670 int errcodes[ZBX_HISTORY_VALUES_MAX];
3671 double sec;
3672
3673 sec = zbx_time();
3674
3675 items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * ZBX_HISTORY_VALUES_MAX);
3676 hostkeys = (zbx_host_key_t *)zbx_malloc(NULL, sizeof(zbx_host_key_t) * ZBX_HISTORY_VALUES_MAX);
3677 memset(hostkeys, 0, sizeof(zbx_host_key_t) * ZBX_HISTORY_VALUES_MAX);
3678
3679 while (SUCCEED == parse_history_data(jp_data, &pnext, values, hostkeys, &values_num, &read_num,
3680 &unique_shift) && 0 != values_num)
3681 {
3682 DCconfig_get_items_by_keys(items, hostkeys, errcodes, values_num);
3683
3684 for (i = 0; i < values_num; i++)
3685 {
3686 if (SUCCEED != errcodes[i])
3687 {
3688 zabbix_log(LOG_LEVEL_DEBUG, "cannot retrieve key \"%s\" on host \"%s\" from "
3689 "configuration cache", hostkeys[i].key, hostkeys[i].host);
3690 continue;
3691 }
3692
3693 if (last_hostid != items[i].host.hostid)
3694 {
3695 last_hostid = items[i].host.hostid;
3696
3697 if (NULL != token)
3698 session = zbx_dc_get_or_create_data_session(last_hostid, token);
3699 }
3700
3701 /* check and discard if duplicate data */
3702 if (NULL != session && 0 != values[i].id && values[i].id <= session->last_valueid)
3703 {
3704 DCconfig_clean_items(&items[i], &errcodes[i], 1);
3705 errcodes[i] = FAIL;
3706 continue;
3707 }
3708
3709 if (SUCCEED != validator_func(&items[i], sock, validator_args, &error))
3710 {
3711 if (NULL != error)
3712 {
3713 zabbix_log(LOG_LEVEL_WARNING, "%s", error);
3714 zbx_free(error);
3715 }
3716 else
3717 {
3718 zabbix_log(LOG_LEVEL_DEBUG, "unknown validation error for item \"%s\"",
3719 (NULL == items[i].key) ? items[i].key_orig : items[i].key);
3720 }
3721
3722 DCconfig_clean_items(&items[i], &errcodes[i], 1);
3723 errcodes[i] = FAIL;
3724 }
3725
3726 if (NULL != session)
3727 session->last_valueid = values[i].id;
3728 }
3729
3730 processed_num += process_history_data(items, values, errcodes, values_num, NULL);
3731 total_num += read_num;
3732
3733 DCconfig_clean_items(items, errcodes, values_num);
3734 zbx_agent_values_clean(values, values_num);
3735
3736 if (NULL == pnext)
3737 break;
3738 }
3739
3740 for (i = 0; i < ZBX_HISTORY_VALUES_MAX; i++)
3741 {
3742 zbx_free(hostkeys[i].host);
3743 zbx_free(hostkeys[i].key);
3744 }
3745
3746 zbx_free(hostkeys);
3747 zbx_free(items);
3748
3749 *info = zbx_dsprintf(*info, "processed: %d; failed: %d; total: %d; seconds spent: " ZBX_FS_DBL,
3750 processed_num, total_num - processed_num, total_num, zbx_time() - sec);
3751 }
3752
3753 /******************************************************************************
3754 * *
3755 * Function: process_client_history_data *
3756 * *
3757 * Purpose: process history data sent by proxy/agent/sender *
3758 * *
3759 * Parameters: sock - [IN] the connection socket *
3760 * jp - [IN] JSON with historical data *
3761 * ts - [IN] the client connection timestamp *
3762 * validator_func - [IN] the item validator callback function *
3763 * validator_args - [IN] the user arguments passed to validator *
3764 * function *
3765 * info - [OUT] address of a pointer to the info string *
3766 * (should be freed by the caller) *
3767 * *
3768 * Return value: SUCCEED - processed successfully *
3769 * FAIL - an error occurred *
3770 * *
3771 ******************************************************************************/
process_client_history_data(zbx_socket_t * sock,struct zbx_json_parse * jp,zbx_timespec_t * ts,zbx_client_item_validator_t validator_func,void * validator_args,char ** info)3772 static int process_client_history_data(zbx_socket_t *sock, struct zbx_json_parse *jp, zbx_timespec_t *ts,
3773 zbx_client_item_validator_t validator_func, void *validator_args, char **info)
3774 {
3775 int ret;
3776 char *token = NULL;
3777 size_t token_alloc = 0;
3778 struct zbx_json_parse jp_data;
3779 char tmp[MAX_STRING_LEN];
3780 int version;
3781
3782 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
3783
3784 log_client_timediff(LOG_LEVEL_DEBUG, jp, ts);
3785
3786 if (SUCCEED != (ret = zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_DATA, &jp_data)))
3787 {
3788 *info = zbx_strdup(*info, zbx_json_strerror());
3789 goto out;
3790 }
3791
3792 if (SUCCEED == zbx_json_value_by_name_dyn(jp, ZBX_PROTO_TAG_SESSION, &token, &token_alloc, NULL))
3793 {
3794 size_t token_len;
3795
3796 if (ZBX_DATA_SESSION_TOKEN_SIZE != (token_len = strlen(token)))
3797 {
3798 *info = zbx_dsprintf(*info, "invalid session token length %d", (int)token_len);
3799 ret = FAIL;
3800 goto out;
3801 }
3802 }
3803
3804 if (SUCCEED != zbx_json_value_by_name(jp, ZBX_PROTO_TAG_VERSION, tmp, sizeof(tmp), NULL) ||
3805 FAIL == (version = zbx_get_component_version(tmp)))
3806 {
3807 version = ZBX_COMPONENT_VERSION(4, 2);
3808 }
3809
3810 if (ZBX_COMPONENT_VERSION(4, 4) <= version &&
3811 SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_HOST, tmp, sizeof(tmp), NULL))
3812 {
3813 zbx_data_session_t *session;
3814 zbx_uint64_t hostid;
3815
3816 if (SUCCEED != DCconfig_get_hostid_by_name(tmp, &hostid))
3817 {
3818 *info = zbx_dsprintf(*info, "unknown host '%s'", tmp);
3819 ret = SUCCEED;
3820 goto out;
3821 }
3822
3823 if (NULL == token)
3824 session = NULL;
3825 else
3826 session = zbx_dc_get_or_create_data_session(hostid, token);
3827
3828 if (SUCCEED != (ret = process_history_data_by_itemids(sock, validator_func, validator_args, &jp_data,
3829 session, NULL, info, ZBX_ITEM_GET_ALL)))
3830 {
3831 goto out;
3832 }
3833 }
3834 else
3835 process_history_data_by_keys(sock, validator_func, validator_args, info, &jp_data, token);
3836 out:
3837 zbx_free(token);
3838
3839 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
3840
3841 return ret;
3842 }
3843
3844 /******************************************************************************
3845 * *
3846 * Function: process_agent_history_data *
3847 * *
3848 * Purpose: process history data received from Zabbix active agent *
3849 * *
3850 * Parameters: sock - [IN] the connection socket *
3851 * jp - [IN] the JSON with history data *
3852 * ts - [IN] the connection timestamp *
3853 * info - [OUT] address of a pointer to the info string *
3854 * (should be freed by the caller) *
3855 * *
3856 * Return value: SUCCEED - processed successfully *
3857 * FAIL - an error occurred *
3858 * *
3859 ******************************************************************************/
process_agent_history_data(zbx_socket_t * sock,struct zbx_json_parse * jp,zbx_timespec_t * ts,char ** info)3860 int process_agent_history_data(zbx_socket_t *sock, struct zbx_json_parse *jp, zbx_timespec_t *ts, char **info)
3861 {
3862 zbx_host_rights_t rights = {0};
3863
3864 return process_client_history_data(sock, jp, ts, agent_item_validator, &rights, info);
3865 }
3866
3867 /******************************************************************************
3868 * *
3869 * Function: process_sender_history_data *
3870 * *
3871 * Purpose: process history data received from Zabbix sender *
3872 * *
3873 * Parameters: sock - [IN] the connection socket *
3874 * jp - [IN] the JSON with history data *
3875 * ts - [IN] the connection timestamp *
3876 * info - [OUT] address of a pointer to the info string *
3877 * (should be freed by the caller) *
3878 * *
3879 * Return value: SUCCEED - processed successfully *
3880 * FAIL - an error occurred *
3881 * *
3882 ******************************************************************************/
process_sender_history_data(zbx_socket_t * sock,struct zbx_json_parse * jp,zbx_timespec_t * ts,char ** info)3883 int process_sender_history_data(zbx_socket_t *sock, struct zbx_json_parse *jp, zbx_timespec_t *ts, char **info)
3884 {
3885 zbx_host_rights_t rights = {0};
3886
3887 return process_client_history_data(sock, jp, ts, sender_item_validator, &rights, info);
3888 }
3889
zbx_drule_ip_free(zbx_drule_ip_t * ip)3890 static void zbx_drule_ip_free(zbx_drule_ip_t *ip)
3891 {
3892 zbx_vector_ptr_clear_ext(&ip->services, zbx_ptr_free);
3893 zbx_vector_ptr_destroy(&ip->services);
3894 zbx_free(ip);
3895 }
3896
zbx_drule_free(zbx_drule_t * drule)3897 static void zbx_drule_free(zbx_drule_t *drule)
3898 {
3899 zbx_vector_ptr_clear_ext(&drule->ips, (zbx_clean_func_t)zbx_drule_ip_free);
3900 zbx_vector_ptr_destroy(&drule->ips);
3901 zbx_vector_uint64_destroy(&drule->dcheckids);
3902 zbx_free(drule);
3903 }
3904
3905 /******************************************************************************
3906 * *
3907 * Function: process_services *
3908 * *
3909 * Purpose: process services discovered on IP address *
3910 * *
3911 * Parameters: drule_ptr - [IN] discovery rule structure *
3912 * ip_discovered_ptr - [IN] vector of ip addresses *
3913 * *
3914 ******************************************************************************/
process_services(const zbx_vector_ptr_t * services,const char * ip,zbx_uint64_t druleid,zbx_vector_uint64_t * dcheckids,zbx_uint64_t unique_dcheckid,int * processed_num,int ip_idx)3915 static int process_services(const zbx_vector_ptr_t *services, const char *ip, zbx_uint64_t druleid,
3916 zbx_vector_uint64_t *dcheckids, zbx_uint64_t unique_dcheckid, int *processed_num, int ip_idx)
3917 {
3918 DB_DHOST dhost;
3919 zbx_service_t *service;
3920 int services_num, ret = FAIL, i, dchecks = 0;
3921 zbx_vector_ptr_t services_old;
3922 DB_DRULE drule = {.druleid = druleid, .unique_dcheckid = unique_dcheckid};
3923
3924 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
3925
3926 memset(&dhost, 0, sizeof(dhost));
3927
3928 zbx_vector_ptr_create(&services_old);
3929
3930 /* find host update */
3931 for (i = *processed_num; i < services->values_num; i++)
3932 {
3933 service = (zbx_service_t *)services->values[i];
3934
3935 zabbix_log(LOG_LEVEL_DEBUG, "%s() druleid:" ZBX_FS_UI64 " dcheckid:" ZBX_FS_UI64 " unique_dcheckid:"
3936 ZBX_FS_UI64 " time:'%s %s' ip:'%s' dns:'%s' port:%hu status:%d value:'%s'",
3937 __func__, drule.druleid, service->dcheckid, drule.unique_dcheckid,
3938 zbx_date2str(service->itemtime), zbx_time2str(service->itemtime), ip, service->dns,
3939 service->port, service->status, service->value);
3940
3941 if (0 == service->dcheckid)
3942 break;
3943
3944 dchecks++;
3945 }
3946
3947 /* stop processing current discovery rule and save proxy history until host update is available */
3948 if (i == services->values_num)
3949 {
3950 for (i = *processed_num; i < services->values_num; i++)
3951 {
3952 char *ip_esc, *dns_esc, *value_esc;
3953
3954 service = (zbx_service_t *)services->values[i];
3955
3956 ip_esc = DBdyn_escape_field("proxy_dhistory", "ip", ip);
3957 dns_esc = DBdyn_escape_field("proxy_dhistory", "dns", service->dns);
3958 value_esc = DBdyn_escape_field("proxy_dhistory", "value", service->value);
3959
3960 DBexecute("insert into proxy_dhistory (clock,druleid,ip,port,value,status,dcheckid,dns)"
3961 " values (%d," ZBX_FS_UI64 ",'%s',%d,'%s',%d," ZBX_FS_UI64 ",'%s')",
3962 (int)service->itemtime, drule.druleid, ip_esc, service->port,
3963 value_esc, service->status, service->dcheckid, dns_esc);
3964 zbx_free(value_esc);
3965 zbx_free(dns_esc);
3966 zbx_free(ip_esc);
3967 }
3968
3969 goto fail;
3970 }
3971
3972 services_num = i;
3973
3974 if (0 == *processed_num && 0 == ip_idx)
3975 {
3976 DB_RESULT result;
3977 DB_ROW row;
3978 zbx_uint64_t dcheckid;
3979
3980 result = DBselect(
3981 "select dcheckid,clock,port,value,status,dns,ip"
3982 " from proxy_dhistory"
3983 " where druleid=" ZBX_FS_UI64
3984 " order by id",
3985 drule.druleid);
3986
3987 for (i = 0; NULL != (row = DBfetch(result)); i++)
3988 {
3989 if (SUCCEED == DBis_null(row[0]))
3990 continue;
3991
3992 ZBX_STR2UINT64(dcheckid, row[0]);
3993
3994 if (0 == strcmp(ip, row[6]))
3995 {
3996 service = (zbx_service_t *)zbx_malloc(NULL, sizeof(zbx_service_t));
3997 service->dcheckid = dcheckid;
3998 service->itemtime = (time_t)atoi(row[1]);
3999 service->port = atoi(row[2]);
4000 zbx_strlcpy_utf8(service->value, row[3], MAX_DISCOVERED_VALUE_SIZE);
4001 service->status = atoi(row[4]);
4002 zbx_strlcpy(service->dns, row[5], INTERFACE_DNS_LEN_MAX);
4003 zbx_vector_ptr_append(&services_old, service);
4004 zbx_vector_uint64_append(dcheckids, service->dcheckid);
4005 dchecks++;
4006 }
4007 }
4008 DBfree_result(result);
4009
4010 if (0 != i)
4011 {
4012 DBexecute("delete from proxy_dhistory"
4013 " where druleid=" ZBX_FS_UI64,
4014 drule.druleid);
4015 }
4016
4017 zbx_vector_uint64_sort(dcheckids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
4018 zbx_vector_uint64_uniq(dcheckids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
4019
4020 if (SUCCEED != DBlock_druleid(drule.druleid))
4021 {
4022 zabbix_log(LOG_LEVEL_DEBUG, "druleid:" ZBX_FS_UI64 " does not exist", drule.druleid);
4023 goto fail;
4024 }
4025
4026 if (SUCCEED != DBlock_ids("dchecks", "dcheckid", dcheckids))
4027 {
4028 zabbix_log(LOG_LEVEL_DEBUG, "checks are not available for druleid:" ZBX_FS_UI64, drule.druleid);
4029 goto fail;
4030 }
4031 }
4032
4033 if (0 == dchecks)
4034 {
4035 zabbix_log(LOG_LEVEL_DEBUG, "cannot process host update without services");
4036 goto fail;
4037 }
4038
4039 for (i = 0; i < services_old.values_num; i++)
4040 {
4041 service = (zbx_service_t *)services_old.values[i];
4042
4043 if (FAIL == zbx_vector_uint64_bsearch(dcheckids, service->dcheckid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
4044 {
4045 zabbix_log(LOG_LEVEL_DEBUG, "dcheckid:" ZBX_FS_UI64 " does not exist", service->dcheckid);
4046 continue;
4047 }
4048
4049 discovery_update_service(&drule, service->dcheckid, &dhost, ip, service->dns, service->port,
4050 service->status, service->value, service->itemtime);
4051 }
4052
4053 for (;*processed_num < services_num; (*processed_num)++)
4054 {
4055 service = (zbx_service_t *)services->values[*processed_num];
4056
4057 if (FAIL == zbx_vector_uint64_bsearch(dcheckids, service->dcheckid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
4058 {
4059 zabbix_log(LOG_LEVEL_DEBUG, "dcheckid:" ZBX_FS_UI64 " does not exist", service->dcheckid);
4060 continue;
4061 }
4062
4063 discovery_update_service(&drule, service->dcheckid, &dhost, ip, service->dns, service->port,
4064 service->status, service->value, service->itemtime);
4065 }
4066
4067 service = (zbx_service_t *)services->values[(*processed_num)++];
4068 discovery_update_host(&dhost, service->status, service->itemtime);
4069
4070 ret = SUCCEED;
4071 fail:
4072 zbx_vector_ptr_clear_ext(&services_old, zbx_ptr_free);
4073 zbx_vector_ptr_destroy(&services_old);
4074
4075 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
4076
4077 return ret;
4078 }
4079
4080 /******************************************************************************
4081 * *
4082 * Function: process_discovery_data_contents *
4083 * *
4084 * Purpose: parse discovery data contents and process it *
4085 * *
4086 * Parameters: jp_data - [IN] JSON with discovery data *
4087 * error - [OUT] address of a pointer to the info *
4088 * string (should be freed by the caller) *
4089 * *
4090 * Return value: SUCCEED - processed successfully *
4091 * FAIL - an error occurred *
4092 * *
4093 ******************************************************************************/
process_discovery_data_contents(struct zbx_json_parse * jp_data,char ** error)4094 static int process_discovery_data_contents(struct zbx_json_parse *jp_data, char **error)
4095 {
4096 DB_RESULT result;
4097 DB_ROW row;
4098 zbx_uint64_t dcheckid, druleid;
4099 struct zbx_json_parse jp_row;
4100 int status, ret = SUCCEED, i, j;
4101 unsigned short port;
4102 const char *p = NULL;
4103 char ip[INTERFACE_IP_LEN_MAX],
4104 tmp[MAX_STRING_LEN], *value = NULL, dns[INTERFACE_DNS_LEN_MAX];
4105 time_t itemtime;
4106 size_t value_alloc = MAX_DISCOVERED_VALUE_SIZE;
4107 zbx_vector_ptr_t drules;
4108 zbx_drule_t *drule;
4109 zbx_drule_ip_t *drule_ip;
4110 zbx_service_t *service;
4111
4112 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4113
4114 value = (char *)zbx_malloc(value, value_alloc);
4115
4116 zbx_vector_ptr_create(&drules);
4117
4118 while (NULL != (p = zbx_json_next(jp_data, p)))
4119 {
4120 if (FAIL == zbx_json_brackets_open(p, &jp_row))
4121 goto json_parse_error;
4122
4123 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_CLOCK, tmp, sizeof(tmp), NULL))
4124 goto json_parse_error;
4125
4126 itemtime = atoi(tmp);
4127
4128 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DRULE, tmp, sizeof(tmp), NULL))
4129 goto json_parse_error;
4130
4131 ZBX_STR2UINT64(druleid, tmp);
4132
4133 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DCHECK, tmp, sizeof(tmp), NULL))
4134 goto json_parse_error;
4135
4136 if ('\0' != *tmp)
4137 ZBX_STR2UINT64(dcheckid, tmp);
4138 else
4139 dcheckid = 0;
4140
4141 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_IP, ip, sizeof(ip), NULL))
4142 goto json_parse_error;
4143
4144 if (SUCCEED != is_ip(ip))
4145 {
4146 zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid IP address", __func__, ip);
4147 continue;
4148 }
4149
4150 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_PORT, tmp, sizeof(tmp), NULL))
4151 {
4152 port = 0;
4153 }
4154 else if (FAIL == is_ushort(tmp, &port))
4155 {
4156 zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid port", __func__, tmp);
4157 continue;
4158 }
4159
4160 if (SUCCEED != zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_VALUE, &value, &value_alloc, NULL))
4161 *value = '\0';
4162
4163 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DNS, dns, sizeof(dns), NULL))
4164 {
4165 *dns = '\0';
4166 }
4167 else if ('\0' != *dns && FAIL == zbx_validate_hostname(dns))
4168 {
4169 zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid hostname", __func__, dns);
4170 continue;
4171 }
4172
4173 if (SUCCEED == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_STATUS, tmp, sizeof(tmp), NULL))
4174 status = atoi(tmp);
4175 else
4176 status = 0;
4177
4178 if (FAIL == (i = zbx_vector_ptr_search(&drules, &druleid, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC)))
4179 {
4180 drule = (zbx_drule_t *)zbx_malloc(NULL, sizeof(zbx_drule_t));
4181 drule->druleid = druleid;
4182 zbx_vector_ptr_create(&drule->ips);
4183 zbx_vector_uint64_create(&drule->dcheckids);
4184 zbx_vector_ptr_append(&drules, drule);
4185 }
4186 else
4187 drule = drules.values[i];
4188
4189 if (FAIL == (i = zbx_vector_ptr_search(&drule->ips, ip, ZBX_DEFAULT_STR_COMPARE_FUNC)))
4190 {
4191 drule_ip = (zbx_drule_ip_t *)zbx_malloc(NULL, sizeof(zbx_drule_ip_t));
4192 zbx_strlcpy(drule_ip->ip, ip, INTERFACE_IP_LEN_MAX);
4193 zbx_vector_ptr_create(&drule_ip->services);
4194 zbx_vector_ptr_append(&drule->ips, drule_ip);
4195 }
4196 else
4197 drule_ip = drule->ips.values[i];
4198
4199 service = (zbx_service_t *)zbx_malloc(NULL, sizeof(zbx_service_t));
4200 if (0 != (service->dcheckid = dcheckid))
4201 zbx_vector_uint64_append(&drule->dcheckids, service->dcheckid);
4202 service->port = port;
4203 service->status = status;
4204 zbx_strlcpy_utf8(service->value, value, MAX_DISCOVERED_VALUE_SIZE);
4205 zbx_strlcpy(service->dns, dns, INTERFACE_DNS_LEN_MAX);
4206 service->itemtime = itemtime;
4207 zbx_vector_ptr_append(&drule_ip->services, service);
4208
4209 continue;
4210 json_parse_error:
4211 *error = zbx_strdup(*error, zbx_json_strerror());
4212 ret = FAIL;
4213 goto json_parse_return;
4214 }
4215
4216 for (i = 0; i < drules.values_num; i++)
4217 {
4218 zbx_uint64_t unique_dcheckid;
4219 int ret2 = SUCCEED;
4220
4221 drule = (zbx_drule_t *)drules.values[i];
4222
4223 DBbegin();
4224 result = DBselect(
4225 "select dcheckid"
4226 " from dchecks"
4227 " where druleid=" ZBX_FS_UI64
4228 " and uniq=1",
4229 drule->druleid);
4230
4231 if (NULL != (row = DBfetch(result)))
4232 ZBX_STR2UINT64(unique_dcheckid, row[0]);
4233 else
4234 unique_dcheckid = 0;
4235 DBfree_result(result);
4236 for (j = 0; j < drule->ips.values_num && SUCCEED == ret2; j++)
4237 {
4238 int processed_num = 0;
4239
4240 drule_ip = (zbx_drule_ip_t *)drule->ips.values[j];
4241
4242 while (processed_num != drule_ip->services.values_num)
4243 {
4244 if (FAIL == (ret2 = process_services(&drule_ip->services, drule_ip->ip, drule->druleid,
4245 &drule->dcheckids, unique_dcheckid, &processed_num, j)))
4246 {
4247 break;
4248 }
4249 }
4250 }
4251
4252 zbx_process_events(NULL, NULL);
4253 zbx_clean_events();
4254 DBcommit();
4255 }
4256 json_parse_return:
4257 zbx_free(value);
4258
4259 zbx_vector_ptr_clear_ext(&drules, (zbx_clean_func_t)zbx_drule_free);
4260 zbx_vector_ptr_destroy(&drules);
4261
4262 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
4263
4264 return ret;
4265 }
4266
4267 /******************************************************************************
4268 * *
4269 * Function: process_autoregistration_contents *
4270 * *
4271 * Purpose: parse autoregistration data contents and process it *
4272 * *
4273 * Parameters: jp_data - [IN] JSON with autoregistration data *
4274 * proxy_hostid - [IN] proxy identifier from database *
4275 * error - [OUT] address of a pointer to the info *
4276 * string (should be freed by the caller) *
4277 * *
4278 * Return value: SUCCEED - processed successfully *
4279 * FAIL - an error occurred *
4280 * *
4281 ******************************************************************************/
process_autoregistration_contents(struct zbx_json_parse * jp_data,zbx_uint64_t proxy_hostid,char ** error)4282 static int process_autoregistration_contents(struct zbx_json_parse *jp_data, zbx_uint64_t proxy_hostid,
4283 char **error)
4284 {
4285 struct zbx_json_parse jp_row;
4286 int ret = SUCCEED;
4287 const char *p = NULL;
4288 time_t itemtime;
4289 char host[HOST_HOST_LEN_MAX], ip[INTERFACE_IP_LEN_MAX], dns[INTERFACE_DNS_LEN_MAX],
4290 tmp[MAX_STRING_LEN], *host_metadata = NULL;
4291 unsigned short port;
4292 size_t host_metadata_alloc = 1; /* for at least NUL-termination char */
4293 zbx_vector_ptr_t autoreg_hosts;
4294 zbx_conn_flags_t flags = ZBX_CONN_DEFAULT;
4295
4296 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4297
4298 zbx_vector_ptr_create(&autoreg_hosts);
4299 host_metadata = (char *)zbx_malloc(host_metadata, host_metadata_alloc);
4300
4301 while (NULL != (p = zbx_json_next(jp_data, p)))
4302 {
4303 unsigned int connection_type;
4304
4305 if (FAIL == (ret = zbx_json_brackets_open(p, &jp_row)))
4306 break;
4307
4308 if (FAIL == (ret = zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_CLOCK, tmp, sizeof(tmp), NULL)))
4309 break;
4310
4311 itemtime = atoi(tmp);
4312
4313 if (FAIL == (ret = zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_HOST, host, sizeof(host), NULL)))
4314 break;
4315
4316 if (FAIL == zbx_check_hostname(host, NULL))
4317 {
4318 zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid Zabbix host name", __func__, host);
4319 continue;
4320 }
4321
4322 if (FAIL == zbx_json_value_by_name_dyn(&jp_row, ZBX_PROTO_TAG_HOST_METADATA,
4323 &host_metadata, &host_metadata_alloc, NULL))
4324 {
4325 *host_metadata = '\0';
4326 }
4327
4328 if (FAIL != zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_FLAGS, tmp, sizeof(tmp), NULL))
4329 {
4330 int flags_int;
4331
4332 flags_int = atoi(tmp);
4333
4334 switch (flags_int)
4335 {
4336 case ZBX_CONN_DEFAULT:
4337 case ZBX_CONN_IP:
4338 case ZBX_CONN_DNS:
4339 flags = (zbx_conn_flags_t)flags_int;
4340 break;
4341 default:
4342 flags = ZBX_CONN_DEFAULT;
4343 zabbix_log(LOG_LEVEL_WARNING, "wrong flags value: %d for host \"%s\":",
4344 flags_int, host);
4345 }
4346 }
4347
4348 if (FAIL == (ret = zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_IP, ip, sizeof(ip), NULL)))
4349 {
4350 if (ZBX_CONN_DNS == flags)
4351 {
4352 *ip = '\0';
4353 ret = SUCCEED;
4354 }
4355 else
4356 break;
4357 }
4358 else if (SUCCEED != is_ip(ip))
4359 {
4360 zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid IP address", __func__, ip);
4361 continue;
4362 }
4363
4364 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_DNS, dns, sizeof(dns), NULL))
4365 {
4366 *dns = '\0';
4367 }
4368 else if ('\0' != *dns && FAIL == zbx_validate_hostname(dns))
4369 {
4370 zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid hostname", __func__, dns);
4371 continue;
4372 }
4373
4374 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_PORT, tmp, sizeof(tmp), NULL))
4375 {
4376 port = ZBX_DEFAULT_AGENT_PORT;
4377 }
4378 else if (FAIL == is_ushort(tmp, &port))
4379 {
4380 zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid port", __func__, tmp);
4381 continue;
4382 }
4383
4384 if (FAIL == zbx_json_value_by_name(&jp_row, ZBX_PROTO_TAG_TLS_ACCEPTED, tmp, sizeof(tmp), NULL))
4385 {
4386 connection_type = ZBX_TCP_SEC_UNENCRYPTED;
4387 }
4388 else if (FAIL == is_uint32(tmp, &connection_type) || (ZBX_TCP_SEC_UNENCRYPTED != connection_type &&
4389 ZBX_TCP_SEC_TLS_PSK != connection_type && ZBX_TCP_SEC_TLS_CERT != connection_type))
4390 {
4391 zabbix_log(LOG_LEVEL_WARNING, "%s(): \"%s\" is not a valid value for \""
4392 ZBX_PROTO_TAG_TLS_ACCEPTED "\"", __func__, tmp);
4393 continue;
4394 }
4395
4396 DBregister_host_prepare(&autoreg_hosts, host, ip, dns, port, connection_type, host_metadata, flags,
4397 itemtime);
4398 }
4399
4400 if (0 != autoreg_hosts.values_num)
4401 {
4402 DBbegin();
4403 DBregister_host_flush(&autoreg_hosts, proxy_hostid);
4404 DBcommit();
4405 }
4406
4407 zbx_free(host_metadata);
4408 DBregister_host_clean(&autoreg_hosts);
4409 zbx_vector_ptr_destroy(&autoreg_hosts);
4410
4411 if (SUCCEED != ret)
4412 *error = zbx_strdup(*error, zbx_json_strerror());
4413
4414 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
4415
4416 return ret;
4417 }
4418
4419 /******************************************************************************
4420 * *
4421 * Function: proxy_get_history_count *
4422 * *
4423 * Purpose: get the number of values waiting to be sent to the sever *
4424 * *
4425 * Return value: the number of history values *
4426 * *
4427 ******************************************************************************/
proxy_get_history_count(void)4428 int proxy_get_history_count(void)
4429 {
4430 DB_RESULT result;
4431 DB_ROW row;
4432 zbx_uint64_t id;
4433 int count = 0;
4434
4435 proxy_get_lastid("proxy_history", "history_lastid", &id);
4436
4437 result = DBselect(
4438 "select count(*)"
4439 " from proxy_history"
4440 " where id>" ZBX_FS_UI64,
4441 id);
4442
4443 if (NULL != (row = DBfetch(result)))
4444 count = atoi(row[0]);
4445
4446 DBfree_result(result);
4447
4448 return count;
4449 }
4450
4451 /******************************************************************************
4452 * *
4453 * Function: zbx_get_proxy_protocol_version *
4454 * *
4455 * Purpose: extracts protocol version from json data *
4456 * *
4457 * Parameters: *
4458 * jp - [IN] JSON with the proxy version *
4459 * *
4460 * Return value: The protocol version. *
4461 * SUCCEED - proxy version was successfully extracted *
4462 * FAIL - otherwise *
4463 * *
4464 ******************************************************************************/
zbx_get_proxy_protocol_version(struct zbx_json_parse * jp)4465 int zbx_get_proxy_protocol_version(struct zbx_json_parse *jp)
4466 {
4467 char value[MAX_STRING_LEN];
4468 int version;
4469
4470 if (NULL != jp && SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_VERSION, value, sizeof(value), NULL) &&
4471 -1 != (version = zbx_get_component_version(value)))
4472 {
4473 return version;
4474 }
4475 else
4476 return ZBX_COMPONENT_VERSION(3, 2);
4477 }
4478
4479 /******************************************************************************
4480 * *
4481 * Function: process_tasks_contents *
4482 * *
4483 * Purpose: parse tasks contents and saves the received tasks *
4484 * *
4485 * Parameters: jp_tasks - [IN] JSON with tasks data *
4486 * *
4487 ******************************************************************************/
process_tasks_contents(struct zbx_json_parse * jp_tasks)4488 static void process_tasks_contents(struct zbx_json_parse *jp_tasks)
4489 {
4490 zbx_vector_ptr_t tasks;
4491
4492 zbx_vector_ptr_create(&tasks);
4493
4494 zbx_tm_json_deserialize_tasks(jp_tasks, &tasks);
4495
4496 DBbegin();
4497 zbx_tm_save_tasks(&tasks);
4498 DBcommit();
4499
4500 zbx_vector_ptr_clear_ext(&tasks, (zbx_clean_func_t)zbx_tm_task_free);
4501 zbx_vector_ptr_destroy(&tasks);
4502 }
4503
4504 /******************************************************************************
4505 * *
4506 * Function: zbx_strcatnl_alloc *
4507 * *
4508 * Purpose: appends text to the string on a new line *
4509 * *
4510 ******************************************************************************/
zbx_strcatnl_alloc(char ** info,size_t * info_alloc,size_t * info_offset,const char * text)4511 static void zbx_strcatnl_alloc(char **info, size_t *info_alloc, size_t *info_offset, const char *text)
4512 {
4513 if (0 != *info_offset)
4514 zbx_chrcpy_alloc(info, info_alloc, info_offset, '\n');
4515
4516 zbx_strcpy_alloc(info, info_alloc, info_offset, text);
4517 }
4518
4519 /******************************************************************************
4520 * *
4521 * Function: check_proxy_nodata *
4522 * *
4523 * Purpose: detect lost connection with proxy and calculate suppression *
4524 * window if possible *
4525 * *
4526 * Parameters: ts - [IN] timestamp when the proxy connection was *
4527 * established *
4528 * proxy_staus - [IN] - active or passive proxy *
4529 * diff - [IN/OUT] the properties to update *
4530 * *
4531 ******************************************************************************/
check_proxy_nodata(zbx_timespec_t * ts,unsigned char proxy_status,zbx_proxy_diff_t * diff)4532 static void check_proxy_nodata(zbx_timespec_t *ts, unsigned char proxy_status, zbx_proxy_diff_t *diff)
4533 {
4534 int delay;
4535
4536 if (0 != (diff->nodata_win.flags & ZBX_PROXY_SUPPRESS_ACTIVE))
4537 {
4538 diff->nodata_win.values_num = 0; /* reset counter of new suppress values received from proxy */
4539 return; /* only for current packet */
4540 }
4541
4542 delay = ts->sec - diff->lastaccess;
4543
4544 if ((HOST_STATUS_PROXY_PASSIVE == proxy_status &&
4545 (2 * CONFIG_PROXYDATA_FREQUENCY) < delay && NET_DELAY_MAX < delay) ||
4546 (HOST_STATUS_PROXY_ACTIVE == proxy_status && NET_DELAY_MAX < delay))
4547 {
4548 diff->nodata_win.values_num = 0;
4549 diff->nodata_win.period_end = ts->sec;
4550 diff->flags |= ZBX_FLAGS_PROXY_DIFF_UPDATE_SUPPRESS_WIN;
4551 diff->nodata_win.flags |= ZBX_PROXY_SUPPRESS_ENABLE;
4552 }
4553 }
4554
4555 /******************************************************************************
4556 * *
4557 * Function: check_proxy_nodata_empty *
4558 * *
4559 * Purpose: detect lack of data during lost connectivity *
4560 * *
4561 * Parameters: ts - [IN] timestamp when the proxy connection was *
4562 * established *
4563 * proxy_staus - [IN] - active or passive proxy *
4564 * diff - [IN/OUT] the properties to update *
4565 * *
4566 ******************************************************************************/
check_proxy_nodata_empty(zbx_timespec_t * ts,unsigned char proxy_status,zbx_proxy_diff_t * diff)4567 static void check_proxy_nodata_empty(zbx_timespec_t *ts, unsigned char proxy_status, zbx_proxy_diff_t *diff)
4568 {
4569 int delay_empty;
4570
4571 if (0 != (diff->nodata_win.flags & ZBX_PROXY_SUPPRESS_EMPTY) && 0 != diff->nodata_win.values_num)
4572 diff->nodata_win.flags &= (~ZBX_PROXY_SUPPRESS_EMPTY);
4573
4574 if (0 == (diff->nodata_win.flags & ZBX_PROXY_SUPPRESS_EMPTY) || 0 != diff->nodata_win.values_num)
4575 return;
4576
4577 delay_empty = ts->sec - diff->nodata_win.period_end;
4578
4579 if (HOST_STATUS_PROXY_PASSIVE == proxy_status ||
4580 (HOST_STATUS_PROXY_ACTIVE == proxy_status && NET_DELAY_MAX < delay_empty))
4581 {
4582 diff->nodata_win.period_end = 0;
4583 diff->nodata_win.flags = ZBX_PROXY_SUPPRESS_DISABLE;
4584 }
4585 }
4586
4587 /******************************************************************************
4588 * *
4589 * Function: process_proxy_data *
4590 * *
4591 * Purpose: process 'proxy data' request *
4592 * *
4593 * Parameters: proxy - [IN] the source proxy *
4594 * jp - [IN] JSON with proxy data *
4595 * proxy_hostid - [IN] proxy identifier from database *
4596 * ts - [IN] timestamp when the proxy connection was *
4597 * established *
4598 * proxy_status - [IN] active or passive proxy mode *
4599 * more - [OUT] available data flag *
4600 * error - [OUT] address of a pointer to the info string *
4601 * (should be freed by the caller) *
4602 * *
4603 * Return value: SUCCEED - processed successfully *
4604 * FAIL - an error occurred *
4605 * *
4606 ******************************************************************************/
process_proxy_data(const DC_PROXY * proxy,struct zbx_json_parse * jp,zbx_timespec_t * ts,unsigned char proxy_status,int * more,char ** error)4607 int process_proxy_data(const DC_PROXY *proxy, struct zbx_json_parse *jp, zbx_timespec_t *ts,
4608 unsigned char proxy_status, int *more, char **error)
4609 {
4610 struct zbx_json_parse jp_data;
4611 int ret = SUCCEED, flags_old;
4612 char *error_step = NULL, value[MAX_STRING_LEN];
4613 size_t error_alloc = 0, error_offset = 0;
4614 zbx_proxy_diff_t proxy_diff;
4615
4616
4617 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4618
4619 proxy_diff.flags = ZBX_FLAGS_PROXY_DIFF_UNSET;
4620 proxy_diff.hostid = proxy->hostid;
4621
4622 if (SUCCEED != (ret = DCget_proxy_nodata_win(proxy_diff.hostid, &proxy_diff.nodata_win,
4623 &proxy_diff.lastaccess)))
4624 {
4625 zabbix_log(LOG_LEVEL_WARNING, "cannot get proxy communication delay");
4626 ret = FAIL;
4627 goto out;
4628 }
4629
4630 if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_MORE, value, sizeof(value), NULL))
4631 proxy_diff.more_data = atoi(value);
4632 else
4633 proxy_diff.more_data = ZBX_PROXY_DATA_DONE;
4634
4635 if (NULL != more)
4636 *more = proxy_diff.more_data;
4637
4638 if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_PROXY_DELAY, value, sizeof(value), NULL))
4639 proxy_diff.proxy_delay = atoi(value);
4640 else
4641 proxy_diff.proxy_delay = 0;
4642
4643 proxy_diff.flags |= ZBX_FLAGS_PROXY_DIFF_UPDATE_PROXYDELAY;
4644 flags_old = proxy_diff.nodata_win.flags;
4645 check_proxy_nodata(ts, proxy_status, &proxy_diff); /* first packet can be empty for active proxy */
4646
4647 zabbix_log(LOG_LEVEL_DEBUG, "%s() flag_win:%d/%d flag:%d proxy_status:%d period_end:%d delay:%d"
4648 " timestamp:%d lastaccess:%d proxy_delay:%d more:%d", __func__, proxy_diff.nodata_win.flags,
4649 flags_old, (int)proxy_diff.flags, proxy_status, proxy_diff.nodata_win.period_end,
4650 ts->sec - proxy_diff.lastaccess, ts->sec, proxy_diff.lastaccess, proxy_diff.proxy_delay,
4651 proxy_diff.more_data);
4652
4653 if (ZBX_FLAGS_PROXY_DIFF_UNSET != proxy_diff.flags)
4654 zbx_dc_update_proxy(&proxy_diff);
4655
4656 if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_HOST_AVAILABILITY, &jp_data))
4657 {
4658 if (SUCCEED != (ret = process_host_availability_contents(&jp_data, &error_step)))
4659 zbx_strcatnl_alloc(error, &error_alloc, &error_offset, error_step);
4660 }
4661
4662 flags_old = proxy_diff.nodata_win.flags;
4663
4664 if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_HISTORY_DATA, &jp_data))
4665 {
4666 zbx_data_session_t *session = NULL;
4667
4668 if (SUCCEED == zbx_json_value_by_name(jp, ZBX_PROTO_TAG_SESSION, value, sizeof(value), NULL))
4669 {
4670 size_t token_len;
4671
4672 if (ZBX_DATA_SESSION_TOKEN_SIZE != (token_len = strlen(value)))
4673 {
4674 *error = zbx_dsprintf(*error, "invalid session token length %d", (int)token_len);
4675 ret = FAIL;
4676 goto out;
4677 }
4678
4679 session = zbx_dc_get_or_create_data_session(proxy->hostid, value);
4680 }
4681
4682 if (SUCCEED != (ret = process_history_data_by_itemids(NULL, proxy_item_validator,
4683 (void *)&proxy->hostid, &jp_data, session, &proxy_diff.nodata_win, &error_step,
4684 ZBX_ITEM_GET_PROCESS)))
4685 {
4686 zbx_strcatnl_alloc(error, &error_alloc, &error_offset, error_step);
4687 }
4688 }
4689
4690 if (0 != (proxy_diff.nodata_win.flags & ZBX_PROXY_SUPPRESS_ACTIVE))
4691 {
4692 check_proxy_nodata_empty(ts, proxy_status, &proxy_diff);
4693
4694 if (0 < proxy_diff.nodata_win.values_num || flags_old != proxy_diff.nodata_win.flags)
4695 proxy_diff.flags |= ZBX_FLAGS_PROXY_DIFF_UPDATE_SUPPRESS_WIN;
4696
4697 zabbix_log(LOG_LEVEL_DEBUG, "Result of %s() flag_win:%d/%d flag:%d values_num:%d",
4698 __func__, proxy_diff.nodata_win.flags, flags_old, (int)proxy_diff.flags,
4699 proxy_diff.nodata_win.values_num);
4700 }
4701
4702 if (ZBX_FLAGS_PROXY_DIFF_UNSET != proxy_diff.flags)
4703 zbx_dc_update_proxy(&proxy_diff);
4704
4705 if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_DISCOVERY_DATA, &jp_data))
4706 {
4707 if (SUCCEED != (ret = process_discovery_data_contents(&jp_data, &error_step)))
4708 zbx_strcatnl_alloc(error, &error_alloc, &error_offset, error_step);
4709 }
4710
4711 if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_AUTOREGISTRATION, &jp_data))
4712 {
4713 if (SUCCEED != (ret = process_autoregistration_contents(&jp_data, proxy->hostid, &error_step)))
4714 zbx_strcatnl_alloc(error, &error_alloc, &error_offset, error_step);
4715 }
4716
4717 if (SUCCEED == zbx_json_brackets_by_name(jp, ZBX_PROTO_TAG_TASKS, &jp_data))
4718 process_tasks_contents(&jp_data);
4719
4720 out:
4721 zbx_free(error_step);
4722 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
4723
4724 return ret;
4725 }
4726
4727 /******************************************************************************
4728 * *
4729 * Function: zbx_db_flush_proxy_lastaccess *
4730 * *
4731 * Purpose: flushes lastaccess changes for proxies every *
4732 * ZBX_PROXY_LASTACCESS_UPDATE_FREQUENCY seconds *
4733 * *
4734 ******************************************************************************/
zbx_db_flush_proxy_lastaccess(void)4735 static void zbx_db_flush_proxy_lastaccess(void)
4736 {
4737 zbx_vector_uint64_pair_t lastaccess;
4738
4739 zbx_vector_uint64_pair_create(&lastaccess);
4740
4741 zbx_dc_get_proxy_lastaccess(&lastaccess);
4742
4743 if (0 != lastaccess.values_num)
4744 {
4745 char *sql;
4746 size_t sql_alloc = 256, sql_offset = 0;
4747 int i;
4748
4749 sql = (char *)zbx_malloc(NULL, sql_alloc);
4750
4751 DBbegin();
4752 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
4753
4754 for (i = 0; i < lastaccess.values_num; i++)
4755 {
4756 zbx_uint64_pair_t *pair = &lastaccess.values[i];
4757
4758 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "update hosts"
4759 " set lastaccess=%d"
4760 " where hostid=" ZBX_FS_UI64 ";\n",
4761 (int)pair->second, pair->first);
4762
4763 DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
4764 }
4765
4766 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
4767
4768 if (16 < sql_offset) /* in ORACLE always present begin..end; */
4769 DBexecute("%s", sql);
4770
4771 DBcommit();
4772
4773 zbx_free(sql);
4774 }
4775
4776 zbx_vector_uint64_pair_destroy(&lastaccess);
4777 }
4778
4779 /******************************************************************************
4780 * *
4781 * Function: zbx_update_proxy_data *
4782 * *
4783 * Purpose: updates proxy runtime properties in cache and database. *
4784 * *
4785 * Parameters: proxy - [IN/OUT] the proxy *
4786 * version - [IN] the proxy version *
4787 * lastaccess - [IN] the last proxy access time *
4788 * compress - [IN] 1 if proxy is using data compression, *
4789 * 0 otherwise *
4790 * flags_add - [IN] additional flags for update proxy *
4791 * *
4792 * Comments: The proxy parameter properties are also updated. *
4793 * *
4794 ******************************************************************************/
zbx_update_proxy_data(DC_PROXY * proxy,int version,int lastaccess,int compress,zbx_uint64_t flags_add)4795 void zbx_update_proxy_data(DC_PROXY *proxy, int version, int lastaccess, int compress, zbx_uint64_t flags_add)
4796 {
4797 zbx_proxy_diff_t diff;
4798
4799 diff.hostid = proxy->hostid;
4800 diff.flags = ZBX_FLAGS_PROXY_DIFF_UPDATE | flags_add;
4801 diff.version = version;
4802 diff.lastaccess = lastaccess;
4803 diff.compress = compress;
4804
4805 zbx_dc_update_proxy(&diff);
4806
4807 if (0 != (diff.flags & ZBX_FLAGS_PROXY_DIFF_UPDATE_VERSION) && 0 != proxy->version)
4808 {
4809 zabbix_log(LOG_LEVEL_DEBUG, "proxy \"%s\" protocol version updated from %d.%d to %d.%d", proxy->host,
4810 ZBX_COMPONENT_VERSION_MAJOR(proxy->version),
4811 ZBX_COMPONENT_VERSION_MINOR(proxy->version),
4812 ZBX_COMPONENT_VERSION_MAJOR(diff.version),
4813 ZBX_COMPONENT_VERSION_MINOR(diff.version));
4814 }
4815
4816 proxy->version = version;
4817 proxy->auto_compress = compress;
4818 proxy->lastaccess = lastaccess;
4819
4820 if (0 != (diff.flags & ZBX_FLAGS_PROXY_DIFF_UPDATE_COMPRESS))
4821 DBexecute("update hosts set auto_compress=%d where hostid=" ZBX_FS_UI64, diff.compress, diff.hostid);
4822
4823 zbx_db_flush_proxy_lastaccess();
4824 }
4825 /******************************************************************************
4826 * *
4827 * Function: zbx_update_proxy_lasterror *
4828 * *
4829 * Purpose: flushes last_version_error_time changes runtime *
4830 * variable for proxies structures *
4831 * *
4832 ******************************************************************************/
zbx_update_proxy_lasterror(DC_PROXY * proxy)4833 static void zbx_update_proxy_lasterror(DC_PROXY *proxy)
4834 {
4835 zbx_proxy_diff_t diff;
4836
4837 diff.hostid = proxy->hostid;
4838 diff.flags = ZBX_FLAGS_PROXY_DIFF_UPDATE_LASTERROR;
4839 diff.lastaccess = time(NULL);
4840 diff.last_version_error_time = proxy->last_version_error_time;
4841
4842 zbx_dc_update_proxy(&diff);
4843 }
4844 /******************************************************************************
4845 * *
4846 * Function: zbx_check_protocol_version *
4847 * *
4848 * Purpose: check server and proxy versions and compatibility rules *
4849 * *
4850 * Parameters: *
4851 * proxy - [IN] the source proxy *
4852 * version - [IN] the version of proxy *
4853 * *
4854 * Return value: *
4855 * SUCCEED - no compatibility issue *
4856 * FAIL - compatibility check fault *
4857 * *
4858 ******************************************************************************/
zbx_check_protocol_version(DC_PROXY * proxy,int version)4859 int zbx_check_protocol_version(DC_PROXY *proxy, int version)
4860 {
4861 int server_version;
4862 int ret = SUCCEED;
4863 int now;
4864 int print_log = 0;
4865
4866 /* warn if another proxy version is used and proceed with compatibility rules*/
4867 if ((server_version = ZBX_COMPONENT_VERSION(ZABBIX_VERSION_MAJOR, ZABBIX_VERSION_MINOR)) != version)
4868 {
4869 now = (int)time(NULL);
4870
4871 if (proxy->last_version_error_time <= now)
4872 {
4873 print_log = 1;
4874 proxy->last_version_error_time = now + 5 * SEC_PER_MIN;
4875 zbx_update_proxy_lasterror(proxy);
4876 }
4877
4878 /* don't accept pre 4.2 data */
4879 if (ZBX_COMPONENT_VERSION(4, 2) > version)
4880 {
4881 if (1 == print_log)
4882 {
4883 zabbix_log(LOG_LEVEL_WARNING, "cannot process proxy \"%s\":"
4884 " protocol version %d.%d is not supported anymore",
4885 proxy->host, ZBX_COMPONENT_VERSION_MAJOR(version),
4886 ZBX_COMPONENT_VERSION_MINOR(version));
4887 }
4888 ret = FAIL;
4889 goto out;
4890 }
4891
4892 if (1 == print_log)
4893 {
4894 zabbix_log(LOG_LEVEL_WARNING, "proxy \"%s\" protocol version %d.%d differs from server version"
4895 " %d.%d", proxy->host, ZBX_COMPONENT_VERSION_MAJOR(version),
4896 ZBX_COMPONENT_VERSION_MINOR(version),
4897 ZABBIX_VERSION_MAJOR, ZABBIX_VERSION_MINOR);
4898 }
4899
4900 if (version > server_version)
4901 {
4902 if (1 == print_log)
4903 zabbix_log(LOG_LEVEL_WARNING, "cannot accept proxy data");
4904 ret = FAIL;
4905 }
4906
4907 }
4908 out:
4909 return ret;
4910 }
4911