1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "daemon.h"
22 #include "db.h"
23 #include "log.h"
24 #include "zbxipcservice.h"
25 #include "zbxself.h"
26 #include "dbcache.h"
27 #include "proxy.h"
28 #include "../events.h"
29
30 #include "lld_worker.h"
31 #include "lld_protocol.h"
32
/* process identification globals defined outside of this file; process_type, */
/* server_num and process_num are assigned from the thread arguments when the */
/* worker thread starts, program_type is only read here */
extern unsigned char	process_type;
extern unsigned char	program_type;
extern int		server_num;
extern int		process_num;
35
36 /******************************************************************************
37 * *
38 * Function: lld_register_worker *
39 * *
40 * Purpose: registers lld worker with lld manager *
41 * *
42 * Parameters: socket - [IN] the connections socket *
43 * *
44 ******************************************************************************/
lld_register_worker(zbx_ipc_socket_t * socket)45 static void lld_register_worker(zbx_ipc_socket_t *socket)
46 {
47 pid_t ppid;
48
49 ppid = getppid();
50
51 zbx_ipc_socket_write(socket, ZBX_IPC_LLD_REGISTER, (unsigned char *)&ppid, sizeof(ppid));
52 }
53
54 /******************************************************************************
55 * *
56 * Function: lld_process_task *
57 * *
58 * Purpose: processes lld task and updates rule state/error in configuration *
59 * cache and database *
60 * *
61 * Parameters: message - [IN] the message with LLD request *
62 * *
63 ******************************************************************************/
lld_process_task(zbx_ipc_message_t * message)64 static void lld_process_task(zbx_ipc_message_t *message)
65 {
66 zbx_uint64_t itemid, hostid, lastlogsize;
67 char *value, *error;
68 zbx_timespec_t ts;
69 zbx_item_diff_t diff;
70 DC_ITEM item;
71 int errcode, mtime;
72 unsigned char state, meta;
73
74 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
75
76 zbx_lld_deserialize_item_value(message->data, &itemid, &hostid, &value, &ts, &meta, &lastlogsize, &mtime, &error);
77
78 DCconfig_get_items_by_itemids(&item, &itemid, &errcode, 1);
79 if (SUCCEED != errcode)
80 goto out;
81
82 zabbix_log(LOG_LEVEL_DEBUG, "processing discovery rule:" ZBX_FS_UI64, itemid);
83
84 diff.flags = ZBX_FLAGS_ITEM_DIFF_UNSET;
85
86 if (NULL != error || NULL != value)
87 {
88 if (NULL == error && SUCCEED == lld_process_discovery_rule(itemid, value, &error))
89 state = ITEM_STATE_NORMAL;
90 else
91 state = ITEM_STATE_NOTSUPPORTED;
92
93 if (state != item.state)
94 {
95 diff.state = state;
96 diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
97
98 if (ITEM_STATE_NORMAL == state)
99 {
100 zabbix_log(LOG_LEVEL_WARNING, "discovery rule \"%s:%s\" became supported",
101 item.host.host, item.key_orig);
102
103 zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_LLDRULE, itemid, &ts,
104 ITEM_STATE_NORMAL, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL,
105 NULL);
106 }
107 else
108 {
109 zabbix_log(LOG_LEVEL_WARNING, "discovery rule \"%s:%s\" became not supported: %s",
110 item.host.host, item.key_orig, error);
111
112 zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_LLDRULE, itemid, &ts,
113 ITEM_STATE_NOTSUPPORTED, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0,
114 NULL, error);
115 }
116
117 zbx_process_events(NULL, NULL);
118 zbx_clean_events();
119 }
120
121 /* with successful LLD processing LLD error will be set to empty string */
122 if (NULL != error && 0 != strcmp(error, item.error))
123 {
124 diff.error = error;
125 diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR;
126 }
127 }
128
129 if (0 != meta)
130 {
131 if (item.lastlogsize != lastlogsize)
132 {
133 diff.lastlogsize = lastlogsize;
134 diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE;
135 }
136 if (item.mtime != mtime)
137 {
138 diff.mtime = mtime;
139 diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
140 }
141 }
142
143 if (ZBX_FLAGS_ITEM_DIFF_UNSET != diff.flags)
144 {
145 zbx_vector_ptr_t diffs;
146 char *sql = NULL;
147 size_t sql_alloc = 0, sql_offset = 0;
148
149 zbx_vector_ptr_create(&diffs);
150 diff.itemid = itemid;
151 zbx_vector_ptr_append(&diffs, &diff);
152
153 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
154 zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, &diffs, ZBX_FLAGS_ITEM_DIFF_UPDATE_DB);
155 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
156 if (16 < sql_offset)
157 DBexecute("%s", sql);
158
159 DCconfig_items_apply_changes(&diffs);
160
161 zbx_vector_ptr_destroy(&diffs);
162 zbx_free(sql);
163 }
164
165 DCconfig_clean_items(&item, &errcode, 1);
166 out:
167 zbx_free(value);
168 zbx_free(error);
169
170 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
171 }
172
173
ZBX_THREAD_ENTRY(lld_worker_thread,args)174 ZBX_THREAD_ENTRY(lld_worker_thread, args)
175 {
176 #define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */
177 /* once in STAT_INTERVAL seconds */
178
179 char *error = NULL;
180 zbx_ipc_socket_t lld_socket;
181 zbx_ipc_message_t message;
182 double time_stat, time_idle = 0, time_now, time_read;
183 zbx_uint64_t processed_num = 0;
184
185 process_type = ((zbx_thread_args_t *)args)->process_type;
186 server_num = ((zbx_thread_args_t *)args)->server_num;
187 process_num = ((zbx_thread_args_t *)args)->process_num;
188
189 zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
190 server_num, get_process_type_string(process_type), process_num);
191
192 zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type));
193
194 zbx_ipc_message_init(&message);
195
196 if (FAIL == zbx_ipc_socket_open(&lld_socket, ZBX_IPC_SERVICE_LLD, SEC_PER_MIN, &error))
197 {
198 zabbix_log(LOG_LEVEL_CRIT, "cannot connect to lld manager service: %s", error);
199 zbx_free(error);
200 exit(EXIT_FAILURE);
201 }
202
203 lld_register_worker(&lld_socket);
204
205 time_stat = zbx_time();
206
207
208 DBconnect(ZBX_DB_CONNECT_NORMAL);
209
210 zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
211
212 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
213
214 while (ZBX_IS_RUNNING())
215 {
216 time_now = zbx_time();
217
218 if (STAT_INTERVAL < time_now - time_stat)
219 {
220 zbx_setproctitle("%s #%d [processed " ZBX_FS_UI64 " LLD rules, idle " ZBX_FS_DBL " sec during "
221 ZBX_FS_DBL " sec]", get_process_type_string(process_type), process_num,
222 processed_num, time_idle, time_now - time_stat);
223
224 time_stat = time_now;
225 time_idle = 0;
226 processed_num = 0;
227 }
228
229 update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
230 if (SUCCEED != zbx_ipc_socket_read(&lld_socket, &message))
231 {
232 zabbix_log(LOG_LEVEL_CRIT, "cannot read LLD manager service request");
233 exit(EXIT_FAILURE);
234 }
235 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
236
237 time_read = zbx_time();
238 time_idle += time_read - time_now;
239 zbx_update_env(time_read);
240
241 switch (message.code)
242 {
243 case ZBX_IPC_LLD_TASK:
244 lld_process_task(&message);
245 zbx_ipc_socket_write(&lld_socket, ZBX_IPC_LLD_DONE, NULL, 0);
246 processed_num++;
247 break;
248 }
249
250 zbx_ipc_message_clean(&message);
251 }
252
253 zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
254
255 while (1)
256 zbx_sleep(SEC_PER_MIN);
257
258 DBclose();
259
260 zbx_ipc_socket_close(&lld_socket);
261 }
262