1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 #include "daemon.h"
22 #include "db.h"
23 #include "log.h"
24 #include "zbxipcservice.h"
25 #include "zbxself.h"
26 #include "dbcache.h"
27 #include "proxy.h"
28 #include "../events.h"
29 
30 #include "lld_worker.h"
31 #include "lld_protocol.h"
32 
33 extern unsigned char	process_type, program_type;
34 extern int		server_num, process_num;
35 
36 /******************************************************************************
37  *                                                                            *
38  * Function: lld_register_worker                                              *
39  *                                                                            *
40  * Purpose: registers lld worker with lld manager                             *
41  *                                                                            *
42  * Parameters: socket - [IN] the connections socket                           *
43  *                                                                            *
44  ******************************************************************************/
lld_register_worker(zbx_ipc_socket_t * socket)45 static void	lld_register_worker(zbx_ipc_socket_t *socket)
46 {
47 	pid_t	ppid;
48 
49 	ppid = getppid();
50 
51 	zbx_ipc_socket_write(socket, ZBX_IPC_LLD_REGISTER, (unsigned char *)&ppid, sizeof(ppid));
52 }
53 
54 /******************************************************************************
55  *                                                                            *
56  * Function: lld_process_task                                                 *
57  *                                                                            *
58  * Purpose: processes lld task and updates rule state/error in configuration  *
59  *          cache and database                                                *
60  *                                                                            *
61  * Parameters: message - [IN] the message with LLD request                    *
62  *                                                                            *
63  ******************************************************************************/
lld_process_task(zbx_ipc_message_t * message)64 static void	lld_process_task(zbx_ipc_message_t *message)
65 {
66 	zbx_uint64_t		itemid, hostid, lastlogsize;
67 	char			*value, *error;
68 	zbx_timespec_t		ts;
69 	zbx_item_diff_t		diff;
70 	DC_ITEM			item;
71 	int			errcode, mtime;
72 	unsigned char		state, meta;
73 
74 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
75 
76 	zbx_lld_deserialize_item_value(message->data, &itemid, &hostid, &value, &ts, &meta, &lastlogsize, &mtime, &error);
77 
78 	DCconfig_get_items_by_itemids(&item, &itemid, &errcode, 1);
79 	if (SUCCEED != errcode)
80 		goto out;
81 
82 	zabbix_log(LOG_LEVEL_DEBUG, "processing discovery rule:" ZBX_FS_UI64, itemid);
83 
84 	diff.flags = ZBX_FLAGS_ITEM_DIFF_UNSET;
85 
86 	if (NULL != error || NULL != value)
87 	{
88 		if (NULL == error && SUCCEED == lld_process_discovery_rule(itemid, value, &error))
89 			state = ITEM_STATE_NORMAL;
90 		else
91 			state = ITEM_STATE_NOTSUPPORTED;
92 
93 		if (state != item.state)
94 		{
95 			diff.state = state;
96 			diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
97 
98 			if (ITEM_STATE_NORMAL == state)
99 			{
100 				zabbix_log(LOG_LEVEL_WARNING, "discovery rule \"%s:%s\" became supported",
101 						item.host.host, item.key_orig);
102 
103 				zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_LLDRULE, itemid, &ts,
104 						ITEM_STATE_NORMAL, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL,
105 						NULL);
106 			}
107 			else
108 			{
109 				zabbix_log(LOG_LEVEL_WARNING, "discovery rule \"%s:%s\" became not supported: %s",
110 						item.host.host, item.key_orig, error);
111 
112 				zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_LLDRULE, itemid, &ts,
113 						ITEM_STATE_NOTSUPPORTED, NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0,
114 						NULL, error);
115 			}
116 
117 			zbx_process_events(NULL, NULL);
118 			zbx_clean_events();
119 		}
120 
121 		/* with successful LLD processing LLD error will be set to empty string */
122 		if (NULL != error && 0 != strcmp(error, item.error))
123 		{
124 			diff.error = error;
125 			diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR;
126 		}
127 	}
128 
129 	if (0 != meta)
130 	{
131 		if (item.lastlogsize != lastlogsize)
132 		{
133 			diff.lastlogsize = lastlogsize;
134 			diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE;
135 		}
136 		if (item.mtime != mtime)
137 		{
138 			diff.mtime = mtime;
139 			diff.flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
140 		}
141 	}
142 
143 	if (ZBX_FLAGS_ITEM_DIFF_UNSET != diff.flags)
144 	{
145 		zbx_vector_ptr_t	diffs;
146 		char			*sql = NULL;
147 		size_t			sql_alloc = 0, sql_offset = 0;
148 
149 		zbx_vector_ptr_create(&diffs);
150 		diff.itemid = itemid;
151 		zbx_vector_ptr_append(&diffs, &diff);
152 
153 		DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
154 		zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, &diffs, ZBX_FLAGS_ITEM_DIFF_UPDATE_DB);
155 		DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
156 		if (16 < sql_offset)
157 			DBexecute("%s", sql);
158 
159 		DCconfig_items_apply_changes(&diffs);
160 
161 		zbx_vector_ptr_destroy(&diffs);
162 		zbx_free(sql);
163 	}
164 
165 	DCconfig_clean_items(&item, &errcode, 1);
166 out:
167 	zbx_free(value);
168 	zbx_free(error);
169 
170 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
171 }
172 
173 
ZBX_THREAD_ENTRY(lld_worker_thread,args)174 ZBX_THREAD_ENTRY(lld_worker_thread, args)
175 {
176 #define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
177 				/* once in STAT_INTERVAL seconds */
178 
179 	char			*error = NULL;
180 	zbx_ipc_socket_t	lld_socket;
181 	zbx_ipc_message_t	message;
182 	double			time_stat, time_idle = 0, time_now, time_read;
183 	zbx_uint64_t		processed_num = 0;
184 
185 	process_type = ((zbx_thread_args_t *)args)->process_type;
186 	server_num = ((zbx_thread_args_t *)args)->server_num;
187 	process_num = ((zbx_thread_args_t *)args)->process_num;
188 
189 	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
190 			server_num, get_process_type_string(process_type), process_num);
191 
192 	zbx_setproctitle("%s [connecting to the database]", get_process_type_string(process_type));
193 
194 	zbx_ipc_message_init(&message);
195 
196 	if (FAIL == zbx_ipc_socket_open(&lld_socket, ZBX_IPC_SERVICE_LLD, SEC_PER_MIN, &error))
197 	{
198 		zabbix_log(LOG_LEVEL_CRIT, "cannot connect to lld manager service: %s", error);
199 		zbx_free(error);
200 		exit(EXIT_FAILURE);
201 	}
202 
203 	lld_register_worker(&lld_socket);
204 
205 	time_stat = zbx_time();
206 
207 
208 	DBconnect(ZBX_DB_CONNECT_NORMAL);
209 
210 	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
211 
212 	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
213 
214 	while (ZBX_IS_RUNNING())
215 	{
216 		time_now = zbx_time();
217 
218 		if (STAT_INTERVAL < time_now - time_stat)
219 		{
220 			zbx_setproctitle("%s #%d [processed " ZBX_FS_UI64 " LLD rules, idle " ZBX_FS_DBL " sec during "
221 					ZBX_FS_DBL " sec]", get_process_type_string(process_type), process_num,
222 					processed_num, time_idle, time_now - time_stat);
223 
224 			time_stat = time_now;
225 			time_idle = 0;
226 			processed_num = 0;
227 		}
228 
229 		update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
230 		if (SUCCEED != zbx_ipc_socket_read(&lld_socket, &message))
231 		{
232 			zabbix_log(LOG_LEVEL_CRIT, "cannot read LLD manager service request");
233 			exit(EXIT_FAILURE);
234 		}
235 		update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
236 
237 		time_read = zbx_time();
238 		time_idle += time_read - time_now;
239 		zbx_update_env(time_read);
240 
241 		switch (message.code)
242 		{
243 			case ZBX_IPC_LLD_TASK:
244 				lld_process_task(&message);
245 				zbx_ipc_socket_write(&lld_socket, ZBX_IPC_LLD_DONE, NULL, 0);
246 				processed_num++;
247 				break;
248 		}
249 
250 		zbx_ipc_message_clean(&message);
251 	}
252 
253 	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
254 
255 	while (1)
256 		zbx_sleep(SEC_PER_MIN);
257 
258 	DBclose();
259 
260 	zbx_ipc_socket_close(&lld_socket);
261 }
262